🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

A presentation at Confluent Online Talk in December 2019 in by Robin Moffatt

Slide 1

Slide 1

🚂 On Track with Apache Kafka®: Building a Streaming ETL solution with Rail Data @rmoff @confluentinc

Slide 2

Slide 2

Slide 3

Slide 3

h/t @kennybastani / @gamussa

Slide 4

Slide 4

Slide 5

Slide 5

@rmoff @confluentinc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 6

Slide 6

@rmoff @confluentinc https:!//wiki.openraildata.com 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 7

Slide 7

Real-time visualisation of rail data @rmoff @confluentinc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 8

Slide 8

Real-time visualisation of rail data @rmoff @confluentinc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 9

Slide 9

Why do trains get cancelled? @rmoff @confluentinc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 10

Slide 10

@rmoff @confluentinc Do cancellation reasons vary by Train Operator? 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 11

Slide 11

@rmoff @confluentinc Tell me when trains are delayed at my station 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 12

Slide 12

@rmoff @confluentinc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 13

Slide 13

@rmoff @confluentinc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 14

Slide 14

ig nf Co Co nf ig @rmoff @confluentinc SQL 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 15

Slide 15

Code! @rmoff @confluentinc http://rmoff.dev/kafka-trains-code-01 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 16

Slide 16

@rmoff @confluentinc Getting the data in 📥 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 17

Slide 17

@rmoff @confluentinc Data Sources Train Movements Continuous feed Schedules Updated Daily Reference data Static 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 18

Slide 18

@rmoff @confluentinc Streaming Integration with Kafka Connect Amazon S3 syslog Google BigQuery Kafka Connect Kafka Brokers 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 19

Slide 19

kafkacat @rmoff @confluentinc • Kafka CLI producer & consumer • Also metadata inspection • Integrates beautifully with unix pipes! curl -s “https:”//my.api.endpoint” | \ kafkacat -b localhost:9092 -t topic -P • Get it! • https:!//github.com/edenhill/kafkacat/ • apt-get install kafkacat • brew install kafkacat • https:!//hub.docker.com/r/confluentinc/cp-kafkacat/ https://rmoff.net/2018/05/10/quick-n-easy-population-of-realistic-test-data-into-kafka/ 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 20

Slide 20

https://wiki.openraildata.com/index.php?title=Train_Movements Movement data @rmoff @confluentinc A train’s movements Arrived / departed Where What time Variation from timetable A train was cancelled Where When Why A train was ‘activated’ Links a train to a schedule 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 21

Slide 21

Ingest ActiveMQ with Kafka Connect “connector.class” : “activemq.url” : “jms.destination.type”: “jms.destination.name”: “kafka.topic” : @rmoff @confluentinc “io.confluent.connect.activemq.ActiveMQSourceConnector”, “tcp:”//datafeeds.networkrail.co.uk:61619”, “topic”, “TRAIN_MVT_EA_TOC”, “networkrail_TRAIN_MVT”, Kafka Movement events Kafka Connect Northern Trains GNER* TransPennine networkrail_TRAIN_MVT

  • Remember them? 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 22

Slide 22

@rmoff @confluentinc Envelope Payload 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 23

Slide 23

Message transformation Extract element Convert JSON @rmoff @confluentinc Explode Array kafkacat -b localhost:9092 -G tm_explode networkrail_TRAIN_MVT -u | \ jq -c ‘.text|fromjson[]’ | \ kafkacat -b localhost:9092 -t networkrail_TRAIN_MVT_X -T -P 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 24

Slide 24

https://wiki.openraildata.com/index.php?title=SCHEDULE Schedule data @rmoff @confluentinc Train routes Type of train (diesel/electric) Classes, sleeping accomodation, reservations, etc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 25

Slide 25

Load JSON from S3 into Kafka @rmoff @confluentinc curl -s -L -u “$USER:$PW” “$FEED_URL” | \ gunzip | \ kafkacat -b localhost -P -t CIF_FULL_DAILY 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 26

Slide 26

https://wiki.openraildata.com/index.php?title=SCHEDULE Reference data @rmoff @confluentinc Train company names Location codes Train type codes Location geocodes 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 27

Slide 27

Geocoding @rmoff @confluentinc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 28

Slide 28

Static reference data @rmoff @confluentinc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 29

Slide 29

@rmoff @confluentinc AA:{“canx_reason_code”:”AA”,”canx_reason”:”Waiting acceptance into off Network Terminal or Yard”} AC:{“canx_reason_code”:”AC”,”canx_reason”:”Waiting train preparation or completion of TOPS list/RT3973”} AE:{“canx_reason_code”:”AE”,”canx_reason”:”Congestion in off Network Terminal or Yard”} AG:{“canx_reason_code”:”AG”,”canx_reason”:”Wagon load incident including adjusting loads or open door”} AJ:{“canx_reason_code”:”AJ”,”canx_reason”:”Waiting Customer’s traffic including documentation”} DA:{“canx_reason_code”:”DA”,”canx_reason”:”Non Technical Fleet Holding Code”} DB:{“canx_reason_code”:”DB”,”canx_reason”:”Train Operations Holding Code”} DC:{“canx_reason_code”:”DC”,”canx_reason”:”Train Crew Causes Holding Code”} DD:{“canx_reason_code”:”DD”,”canx_reason”:”Technical Fleet Holding Code”} DE:{“canx_reason_code”:”DE”,”canx_reason”:”Station Causes Holding Code”} DF:{“canx_reason_code”:”DF”,”canx_reason”:”External Causes Holding Code”} DG:{“canx_reason_code”:”DG”,”canx_reason”:”Freight Terminal and or Yard Holding Code”} DH:{“canx_reason_code”:”DH”,”canx_reason”:”Adhesion and or Autumn Holding Code”} FA:{“canx_reason_code”:”FA”,”canx_reason”:”Dangerous goods incident”} FC:{“canx_reason_code”:”FC”,”canx_reason”:”Freight train driver”} kafkacat -l canx_reason_code.dat -b localhost:9092 -t canx_reason_code -P -K: 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 30

Slide 30

@rmoff @confluentinc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 31

Slide 31

@rmoff @confluentinc Transforming the data 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 32

Slide 32

@rmoff @confluentinc Movement Activation Schedule Location 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 33

Slide 33

@rmoff @confluentinc Streams of events Time 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 34

Slide 34

Stream Processing with KSQL @rmoff @confluentinc Stream: widgets Stream: widgets_red 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 35

Slide 35

Stream Processing with KSQL @rmoff @confluentinc Stream: widgets CREATE STREAM widgets_red AS SELECT * FROM widgets WHERE colour=’RED’; Stream: widgets_red 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 36

Slide 36

@rmoff @confluentinc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 37

Slide 37

Event data @rmoff @confluentinc ActiveMQ Kafka Connect NETWORKRAIL_TRAIN_MVT_X 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 38

Slide 38

@rmoff @confluentinc Message routing with KSQL TRAIN_ACTIVATIONS Time NETWORKRAIL_TRAIN_MVT_X ’ 1 0 00 ’ = E YP T _ G MS TRAIN_CANCELLATIONS Time MSG _TY Time MSG_TYPE=’0002’ PE= ‘00 03’ TRAIN_MOVEMENTS Time 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 39

Slide 39

@rmoff @confluentinc Split a stream of data (message routing) CREATE STREAM TRAIN_MOVEMENTS_00 AS SELECT * FROM NETWORKRAIL_TRAIN_MVT_X s t n e v e t n e m e ’ v 3 o 0 0 0 M ’ = E P Y T _ G MS WHERE header!->msg_type = ‘0003’; NETWORKRAIL_TRAIN_MVT_X Source topic CREATE STREAM TRAIN_CANCELLATIONS_00 AS SELECT * FROM NETWORKRAIL_TRAIN_MVT_X WHERE header!->msg_type = ‘0002’; Can cella t MSG _TY ion e v e PE= n t ‘00 s 0 2’ 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 40

Slide 40

@rmoff @confluentinc Joining events to lookup data (stream-table joins) TS 10:00:01 LOC_ID 42 TS 10:00:02 TRAIN_MOVEMENTS (Stream) LOCATION (Table) ID DESCRIPTION 42 MENSTON 43 Ilkley LOC_ID 43 TS 10:00:01 LOC_ID 42 LOC_DESC Menston TS 10:00:02 LOC_ID 43 CREATE STREAM TRAIN_MOVEMENTS_ENRICHED AS SELECT TM.LOC_ID AS LOC_ID, TM.TS AS TS, L.DESCRIPTION AS LOC_DESC, […] FROM TRAIN_MOVEMENTS TM TRAIN_MOVEMENTS_ENRICHED LEFT JOIN LOCATION L (Stream) ON TM.LOC_ID = L.ID; LOC_DESC Ilkley 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 41

Slide 41

@rmoff @confluentinc Decode values, and generate composite key +———————-+————————+————-+——————————-+————————+————————————+ | CIF_train_uid | sched_start_dt | stp_ind | SCHEDULE_KEY | CIF_power_type | POWER_TYPE | +———————-+————————+————-+——————————-+————————+————————————+ | Y82535 | 2019-05-20 | P | Y82535/2019-05-20/P | E | Electric | | Y82537 | 2019-05-20 | P | Y82537/2019-05-20/P | DMU | Other | | Y82542 | 2019-05-20 | P | Y82542/2019-05-20/P | D | Diesel | | Y82535 | 2019-05-24 | P | Y82535/2019-05-24/P | EMU | Electric Multiple Unit | | Y82542 | 2019-05-24 | P | Y82542/2019-05-24/P | HST | High Speed Train | CASE SELECT JsonScheduleV1”->CIF_train_uid + ‘/’ WHEN JsonScheduleV1”->schedule_segment”->CIF_power_type = ‘D’

  • JsonScheduleV1”->schedule_start_date + ‘/’ THEN
  • JsonScheduleV1”->CIF_stp_indicator AS SCHEDULE_KEY, ‘Diesel’ WHEN JsonScheduleV1”->schedule_segment”->CIF_power_type = ‘E’ THEN ‘Electric’ WHEN JsonScheduleV1”->schedule_segment”->CIF_power_type = ‘ED’ THEN ‘Electro-Diesel’ END AS POWER_TYPE 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 42

Slide 42

Schemas @rmoff @confluentinc CREATE STREAM SCHEDULE_RAW ( TiplocV1 STRUCT<transaction_type tiploc_code NALCO STANOX crs_code description tps_description WITH (KAFKA_TOPIC=’CIF_FULL_DAILY’, VALUE_FORMAT=’JSON’); VARCHAR, VARCHAR, VARCHAR, VARCHAR, VARCHAR, VARCHAR, VARCHAR>) 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 43

Slide 43

@rmoff @confluentinc Handling JSON data CREATE STREAM NETWORKRAIL_TRAIN_MVT_X ( header STRUCT< msg_type VARCHAR, […] >, body VARCHAR) WITH (KAFKA_TOPIC=’networkrail_TRAIN_MVT_X’, VALUE_FORMAT=’JSON’); CREATE STREAM TRAIN_CANCELLATIONS_00 AS SELECT HEADER, EXTRACTJSONFIELD(body,’$.canx_timestamp’) AS canx_timestamp, EXTRACTJSONFIELD(body,’$.canx_reason_code’) AS canx_reason_code, […] FROM networkrail_TRAIN_MVT_X WHERE header”->msg_type = ‘0002’; 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 44

Slide 44

@rmoff @confluentinc Serialisation & Schemas Avro -> Confluent Schema Registry Protobuf JSON CSV https://qconnewyork.com/system/files/presentation-slides/qcon_17_-_schemas_and_apis.pdf 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 45

Slide 45

@rmoff @confluentinc The Confluent Schema Registry Avro Schema Schema Registry Target Source KSQL Avro Message Avro Message Kafka Connect 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 46

Slide 46

@rmoff @confluentinc Reserialise data CREATE STREAM TRAIN_CANCELLATIONS_00 WITH (VALUE_FORMAT=’AVRO’) AS SELECT * FROM networkrail_TRAIN_MVT_X WHERE header”->msg_type = ‘0002’; 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 47

Slide 47

@rmoff @confluentinc Flatten and re-key a stream { } “TiplocV1”: { “transaction_type”: “Create”, “tiploc_code”: “ABWD”, “nalco”: “513100”, “stanox”: “88601”, “crs_code”: “ABW”, “description”: “ABBEY WOOD”, “tps_description”: “ABBEY WOOD” } Access nested CREATE STREAM TIPLOC_FLAT_KEYED SELECT TiplocV1”->TRANSACTION_TYPE TiplocV1”->TIPLOC_CODE TiplocV1”->NALCO TiplocV1”->STANOX TiplocV1”->CRS_CODE TiplocV1”->DESCRIPTION TiplocV1”->TPS_DESCRIPTION FROM SCHEDULE_RAW PARTITION BY TIPLOC_CODE; element AS AS AS AS AS AS AS TRANSACTION_TYPE , TIPLOC_CODE , NALCO , STANOX , CRS_CODE , DESCRIPTION , TPS_DESCRIPTION y e e g k n g a n h i C n ksql> DESCRIBE TIPLOC_FLAT_KEYED; io t i t r a p Name : TIPLOC_FLAT_KEYED Field | Type ——————————————————————-TRANSACTION_TYPE | VARCHAR(STRING) TIPLOC_CODE | VARCHAR(STRING) NALCO | VARCHAR(STRING) STANOX | VARCHAR(STRING) CRS_CODE | VARCHAR(STRING) DESCRIPTION | VARCHAR(STRING) TPS_DESCRIPTION | VARCHAR(STRING) ——————————————————————— 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 48

Slide 48

@rmoff @confluentinc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 49

Slide 49

@rmoff @confluentinc Using the data 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 50

Slide 50

@rmoff @confluentinc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 51

Slide 51

@rmoff @confluentinc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 52

Slide 52

@rmoff @confluentinc Streaming Integration with Kafka Connect Amazon S3 syslog Google BigQuery Kafka Connect Kafka Brokers 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 53

Slide 53

@rmoff @confluentinc Kafka -> Elasticsearch { “connector.class”: “io.confluent.connect.elasticsearch.ElasticsearchSinkConnector”, “topics”: “TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00”, “connection.url”: “http:”//elasticsearch:9200”, “type.name”: “type.name=kafkaconnect”, “key.ignore”: “false”, “schema.ignore”: “true”, “key.converter”: “org.apache.kafka.connect.storage.StringConverter” } 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 54

Slide 54

@rmoff @confluentinc Kibana for real-time visualisation of rail data 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 55

Slide 55

Kibana for real-time analysis of rail data @rmoff @confluentinc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 56

Slide 56

@rmoff @confluentinc Kafka -> Postgres { “connector.class”: “io.confluent.connect.jdbc.JdbcSinkConnector”, “key.converter”: “org.apache.kafka.connect.storage.StringConverter”, “connection.url”: “jdbc:postgresql:”//postgres:5432/”, “connection.user”: “postgres”, “connection.password”: “postgres”, “auto.create”: true, “auto.evolve”: true, “insert.mode”: “upsert”, “pk.mode”: “record_key”, “pk.fields”: “MESSAGE_KEY”, “topics”: “TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00”, “transforms”: “dropArrays”, “transforms.dropArrays.type”: “org.apache.kafka.connect.transforms.ReplaceField$Value”, “transforms.dropArrays.blacklist”: “SCHEDULE_SEGMENT_LOCATION, HEADER” } 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 57

Slide 57

Kafka -> Postgres @rmoff @confluentinc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 58

Slide 58

But do you even need a database? @rmoff @confluentinc TRAIN_MOVEMENTS Time SELECT TIMESTAMPTOSTRING(WINDOWSTART(),’yyyy-MM-dd’) AS WINDOW_START_TS, SUM(CASE WHEN TOC = ‘Arriva Trains Northern’ THEN 1 ELSE 0 SUM(CASE WHEN TOC = ‘East Midlands Trains’ THEN 1 ELSE 0 SUM(CASE WHEN TOC = ‘London North Eastern Railway’ THEN 1 ELSE 0 SUM(CASE WHEN TOC = ‘TransPennine Express’ THEN 1 ELSE 0 FROM TRAIN_MOVEMENTS WINDOW TUMBLING (SIZE 1 DAY) GROUP BY VARIATION_STATUS; TOC_STATS Aggregate Time Aggregate VARIATION_STATUS, END) AS Arriva_CT, END) AS EastMidlands_CT, END) AS LNER_CT, END) AS TransPennineExpress_CT +——————+——————-+————+—————+———+———-+ | Date | Variation | Arriva | East Mid | LNER | TPE | +——————+——————-+————+—————+———+———-+ | 2019-07-02 | OFF ROUTE | 46 | 78 | 20 | 167 | | 2019-07-02 | ON TIME | 19083 | 3568 | 1509 | 2916 | | 2019-07-02 | LATE | 30850 | 7953 | 5420 | 9042 | | 2019-07-02 | EARLY | 11478 | 3518 | 1349 | 2096 | | 2019-07-03 | OFF ROUTE | 79 | 25 | 41 | 213 | | 2019-07-03 | ON TIME | 19512 | 4247 | 1915 | 2936 | | 2019-07-03 | LATE | 37357 | 8258 | 5342 | 11016 | | 2019-07-03 | EARLY | 11825 | 4574 | 1888 | 2094 | 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 59

Slide 59

{ @rmoff @confluentinc Kafka -> S3 “connector.class”: “io.confluent.connect.s3.S3SinkConnector”, “key.converter”:”org.apache.kafka.connect.storage.StringConverter”, “tasks.max”: “1”, “topics”: “TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00”, “s3.region”: “us-west-2”, “s3.bucket.name”: “rmoff-rail-streaming-demo”, “flush.size”: “65536”, “storage.class”: “io.confluent.connect.s3.storage.S3Storage”, “format.class”: “io.confluent.connect.s3.format.avro.AvroFormat”, “schema.generator.class”: “io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator”, “schema.compatibility”: “NONE”, “partitioner.class”: “io.confluent.connect.storage.partitioner.TimeBasedPartitioner”, “path.format”:“‘year’=YYYY/’month’=MM/’day’=dd”, “timestamp.extractor”:”Record”, “partition.duration.ms”: 1800000, “locale”:”en”, “timezone”: “UTC” } 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 60

Slide 60

@rmoff @confluentinc Kafka -> S3 $ aws s3 ls s3:!//rmoff-rail-streaming-demo/topics/TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00/year=2019/month=07/day=07/ 2019-07-07 10:05:41 2019-07-07 19:47:11 200995548 TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00+0+0004963416.avro 87631494 TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00+0+0005007151.avro 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 61

Slide 61

@rmoff @confluentinc Infering table config from S3 files with AWS Glue 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 62

Slide 62

@rmoff @confluentinc Inferring table config from S3 files with AWS Glue 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 63

Slide 63

Analysis with AWS Athena @rmoff @confluentinc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 64

Slide 64

@rmoff @confluentinc Graph analysis { “connector.class”: “streams.kafka.connect.sink.Neo4jSinkConnector”, “topics”: “TRAIN_CANCELLATIONS_02”, “key.converter”:”org.apache.kafka.connect.storage.StringConverter”, “errors.tolerance”: “all”, “errors.deadletterqueue.topic.name”: “sink-neo4j-train-00_dlq”, “errors.deadletterqueue.topic.replication.factor”: 1, “errors.deadletterqueue.context.headers.enable”:true, “neo4j.server.uri”: “bolt:”//neo4j:7687”, “neo4j.authentication.basic.username”: “neo4j”, “neo4j.authentication.basic.password”: “connect”, “neo4j.topic.cypher.TRAIN_CANCELLATIONS_02”: “MERGE (train:train{id: event.TRAIN_ID}) MERGE (toc:toc{toc: event.TOC}) MERGE (canx_reason:canx_reason{reason: event.CANX_REASON}) MERGE (canx_loc:canx_loc{location: coalesce(event.CANCELLATION_LOCATION,’<unknown>’)}) MERGE (train)[:OPERATED_BY]!->(toc) MERGE (canx_loc)!<-[:CANCELLED_AT{reason:event.CANX_REASON, time:event.CANX_TIMESTAMP}]-(train)-[:CANCELLED_BECAUSE]!->(canx_reason)” } 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 65

Slide 65

Graph relationships @rmoff @confluentinc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 66

Slide 66

@rmoff @confluentinc Tell me when trains are delayed at my station 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 67

Slide 67

Event-driven Alerting - logic TRAIN_MOVEMENTS @rmoff @confluentinc Time ALERT_CONFIG Key/Value state CREATE STREAM TRAINS_DELAYED_ALERT AS SELECT […] FROM TRAIN_MOVEMENTS T TRAIN_DELAYED_ALERT Time INNER JOIN ALERT_CONFIG A ON T.LOC_NLCDESC = A.STATION WHERE TIMETABLE_VARIATION > A.ALERT_OVER_MINS; 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 68

Slide 68

@rmoff @confluentinc Sending messages to Telegram curl -X POST -H ‘Content-Type: application/json’ -d ‘{“chat_id”: “-364377679”, “text”: “This is a test from curl”}’ https:”//api.telegram.org/botxxxxx/sendMessage { } “connector.class”: “io.confluent.connect.http.HttpSinkConnector”, “request.method”: “post”, “http.api.url”: “https:!//api.telegram.org/[…]/sendMessage”, “headers”: “Content-Type: application/json”, “topics”: “TRAINS_DELAYED_ALERT_TG”, “tasks.max”: “1”, “batch.prefix”: “{“chat_id”:”-364377679”,”parse_mode”: “markdown”,”, “batch.suffix”: “}”, “batch.max.size”: “1”, “regex.patterns”:”.\{MESSAGE=(.)\}.*”, “regex.replacements”: “”text”:”$1”“, “regex.separator”: “~”, “key.converter”: “org.apache.kafka.connect.storage.StringConverter” Kafka Kafka Connect HTTP Sink Telegram 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 69

Slide 69

Event-driven Alerting - config @rmoff @confluentinc Compacted topic $ kafkacat -b localhost:9092 -t alert_config -P -K: “<<EOF LEEDS:{“STATION”:”LEEDS”,”ALERT_OVER_MINS”:”45”} EOF CREATE TABLE ALERT_CONFIG (STATION VARCHAR, ALERT_OVER_MINS INT) WITH (KAFKA_TOPIC=’alert_config’, VALUE_FORMAT=’JSON’, KEY=’STATION’); CREATE STREAM ALERT_CONFIG_STREAM (STATION VARCHAR, ALERT_OVER_MINS INT) WITH (KAFKA_TOPIC=’alert_config’, VALUE_FORMAT=’JSON’); ksql> SELECT STATION, ALERT_OVER_MINS FROM ALERT_CONFIG WHERE STATION=’LEEDS’; ksql> SELECT STATION, ALERT_OVER_MINS FROM ALERT_CONFIG_STREAM WHERE STATION=’LEEDS’; LEEDS | 45 LEEDS | 45 $ kafkacat -b localhost:9092 -t alert_config -P -K: “<<EOF LEEDS:{“STATION”:”LEEDS”,”ALERT_OVER_MINS”:”20”} EOF ksql> SELECT STATION, ALERT_OVER_MINS FROM ALERT_CONFIG WHERE STATION=’LEEDS’; LEEDS | 20 ksql> SELECT STATION, ALERT_OVER_MINS FROM ALERT_CONFIG_STREAM WHERE STATION=’LEEDS’; LEEDS | 45 LEEDS | 20 *

  • until topic compaction runs 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 70

Slide 70

@rmoff @confluentinc Running it & monitoring & maintenance 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 71

Slide 71

Consumer lag @rmoff @confluentinc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 72

Slide 72

System health & Throughput @rmoff @confluentinc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 73

Slide 73

@rmoff @confluentinc Restart failing connectors rmoff@proxmox01 > crontab -l “*/5 * * * * /home/rmoff/restart_failed_connector_tasks.sh rmoff@proxmox01 > cat /home/rmoff/restart_failed_connector_tasks.sh “#!/usr/bin/env bash # @rmoff / June 6, 2019

Restart any connector tasks that are FAILED curl -s “http:”//localhost:8083/connectors” | \ jq ‘.[]’ | \ xargs -I{connector_name} curl -s “http:”//localhost:8083/connectors/”{connector_name}”/status” | \ jq -c -M ‘[select(.tasks[].state”==”FAILED”) | .name,”§±§”,.tasks[].id]’ | \ grep -v “[]”| \ sed -e ‘s/^[“”//g’| sed -e ‘s/”,”§±§”,//tasks”//g’|sed -e ‘s/]$”//g’| \ xargs -I{connector_and_task} curl -X POST “http:”//localhost:8083/connectors/”{connector_and_task}”/ restart”

🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 74

Slide 74

Is everything running? @rmoff @confluentinc http://rmoff.dev/kafka-trains-code-01 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 75

Slide 75

@rmoff @confluentinc check_latest_timestamp.sh Start at latest Just read one message message kafkacat -b localhost:9092 -t networkrail_TRAIN_MVT_X -o-1 -c1 -C | \ jq ‘.header.msg_queue_timestamp’ | \ sed -e ‘s/”“//g’ | \ sed -e ‘s/000$”//g’ | \ Convert from epoch to humanxargs -Ifoo date “—date=@foo readable timestamp format $ ./check_latest_timestamp.sh Fri Jul 5 16:28:09 BST 2019 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 76

Slide 76

Kafka Connect @rmoff @confluentinc Kafka Kafka Connect KSQL ✅ Stream raw data ✅ Stream enriched data ✅ Historic data & data replay 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 77

Slide 77

ig nf Co Co nf ig @rmoff @confluentinc SQL 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 78

Slide 78

Fully Managed Kafka as a Service

Slide 79

Slide 79

Free Books! @rmoff @confluentinc http://cnfl.io/book-bundle 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 80

Slide 80

@rmoff @confluentinc #EOF 💬 Join the Confluent Community Slack group at http://cnfl.io/slack https://talks.rmoff.net