🚒 All at Sea with Streams - Using Kafka to Detect Patterns in the Behaviour of Ships

A presentation at Kafka Summit Americas in September 2021 by Robin Moffatt

Slide 1

🚒 All at Sea with Streams - Using Kafka to Detect Patterns in the Behaviour of Ships #kafkasummit @rmoff

Slide 2

Further reading
• This talk is built on concepts explored in detail in these two blogs:
  • https://www.confluent.co.uk/blog/streaming-etl-and-analytics-for-real-time-location-tracking/
  • https://www.confluent.co.uk/blog/streaming-data-with-confluent-and-ksqldb-for-new-use-cases-with-ais/

Slide 3

Planes

Slide 4

Trains

Slide 5

Automobiles

Slide 6

…and Ships!

Slide 7

Slide 8

Slide 9

Handling different message types in the same stream

Slide 10

Slide 11

Slide 12

Use ksqlDB to route messages based on type

ksql> CREATE OR REPLACE STREAM AIS_MSG_TYPE_1_2_3 WITH (FORMAT='AVRO') AS
        SELECT CAST(EXTRACTJSONFIELD(msg,'$.type') AS INT) AS msg_type,
               CAST(EXTRACTJSONFIELD(msg,'$.mmsi') AS VARCHAR) AS mmsi,
               CAST(EXTRACTJSONFIELD(msg,'$.status_text') AS VARCHAR) AS status_text,
               CAST(EXTRACTJSONFIELD(msg,'$.speed') AS DOUBLE) AS speed,
               CAST(EXTRACTJSONFIELD(msg,'$.course') AS DOUBLE) AS course,
               CAST(EXTRACTJSONFIELD(msg,'$.heading') AS INT) AS heading
          FROM AIS_RAW
         WHERE EXTRACTJSONFIELD(msg,'$.type') IN ('1','2','3','18','27')
        PARTITION BY CAST(EXTRACTJSONFIELD(msg,'$.mmsi') AS VARCHAR);
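
To make the routing concrete, here is a small Python model of what this query and the type 5 query that follows do with each raw message. It is a sketch, not ksqlDB code: the function name `route` is hypothetical, and it assumes each raw message is a JSON object with `type` and `mmsi` fields, as the EXTRACTJSONFIELD paths imply.

```python
import json
from typing import Optional, Tuple

# Message types the first query routes to AIS_MSG_TYPE_1_2_3.
POSITION_TYPES = {"1", "2", "3", "18", "27"}


def route(raw_msg: str) -> Optional[Tuple[str, str]]:
    """Return (target stream, message key) for one raw AIS message, or None to drop it."""
    msg = json.loads(raw_msg)
    # EXTRACTJSONFIELD returns text, so the WHERE clauses compare the type as a string.
    msg_type = str(msg.get("type"))
    # PARTITION BY ... mmsi: the MMSI, as a string, becomes the key of the output message.
    key = str(msg.get("mmsi"))
    if msg_type in POSITION_TYPES:
        return "AIS_MSG_TYPE_1_2_3", key
    if msg_type == "5":
        return "AIS_MSG_TYPE_5", key
    # Any other message type is selected by neither query.
    return None


print(route('{"type": 1, "mmsi": 255805587, "speed": 0.0}'))
# prints ('AIS_MSG_TYPE_1_2_3', '255805587')
```

Keying every output message by MMSI means all messages about one vessel end up with the same key, which is what the later aggregation and join by MMSI rely on.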

Slide 13

Slide 14

ksql> CREATE OR REPLACE STREAM AIS_MSG_TYPE_5 WITH (FORMAT='AVRO') AS
        SELECT CAST(EXTRACTJSONFIELD(msg,'$.type') AS INT) AS msg_type,
               CAST(EXTRACTJSONFIELD(msg,'$.mmsi') AS VARCHAR) AS mmsi,
               CAST(EXTRACTJSONFIELD(msg,'$.callsign') AS VARCHAR) AS callsign,
               CAST(EXTRACTJSONFIELD(msg,'$.shipname') AS VARCHAR) AS shipname_raw,
               CONCAT(CAST(EXTRACTJSONFIELD(msg,'$.shipname') AS VARCHAR),
                      ' (',
                      CAST(EXTRACTJSONFIELD(msg,'$.callsign') AS VARCHAR),
                      ')') AS shipname,
               CAST(EXTRACTJSONFIELD(msg,'$.shiptype_text') AS VARCHAR) AS shiptype_text,
               CAST(EXTRACTJSONFIELD(msg,'$.destination') AS VARCHAR) AS destination
          FROM AIS_RAW
         WHERE EXTRACTJSONFIELD(msg,'$.type') = '5'
        PARTITION BY CAST(EXTRACTJSONFIELD(msg,'$.mmsi') AS VARCHAR);
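
The CONCAT expression builds a display name such as "NCL AVEROY (CQHL)" from the ship name and call sign. A minimal Python sketch of the whole projection follows; `project_type_5` is a hypothetical name, the JSON field names are assumed to match the EXTRACTJSONFIELD paths, and ksqlDB's handling of null inputs to CONCAT is not modelled.

```python
import json


def project_type_5(raw_msg: str) -> dict:
    """Project one type 5 AIS message onto the AIS_MSG_TYPE_5 columns."""
    msg = json.loads(raw_msg)
    shipname_raw = msg.get("shipname")
    callsign = msg.get("callsign")
    return {
        "msg_type": int(msg["type"]),
        "mmsi": str(msg.get("mmsi")),
        "callsign": callsign,
        "shipname_raw": shipname_raw,
        # CONCAT(shipname, ' (', callsign, ')'); null handling is not modelled here.
        "shipname": f"{shipname_raw} ({callsign})",
        "shiptype_text": msg.get("shiptype_text"),
        "destination": msg.get("destination"),
    }


row = project_type_5('{"type": 5, "mmsi": 255805587, "shipname": "NCL AVEROY", "callsign": "CQHL"}')
print(row["shipname"])  # prints NCL AVEROY (CQHL)
```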

Slide 15

Build a materialized view of state from a Kafka topic

ksql> CREATE TABLE SHIP_INFO AS
        SELECT MMSI,
               MAX(ROWTIME) AS LAST_INFO_PING_TS,
               LATEST_BY_OFFSET(SHIPNAME) AS SHIPNAME,
               LATEST_BY_OFFSET(DRAUGHT) AS DRAUGHT,
               LATEST_BY_OFFSET(DESTINATION) AS DESTINATION
          FROM AIS_MSG_TYPE_5
         GROUP BY MMSI
         EMIT CHANGES;

ksql> SELECT MMSI,
             TIMESTAMPTOSTRING(LAST_INFO_PING_TS,'HH:mm:ss','Europe/London') AS LAST_INFO_PING_TS,
             SHIPNAME,
             DRAUGHT,
             DESTINATION
        FROM SHIP_INFO
       WHERE MMSI = '255805587';
+----------+-----------------+------------------+-------+-----------+
|MMSI      |LAST_INFO_PING_TS|SHIPNAME          |DRAUGHT|DESTINATION|
+----------+-----------------+------------------+-------+-----------+
|255805587 |11:17:17         |NCL AVEROY (CQHL) |7.5    |SVELGEN    |
+----------+-----------------+------------------+-------+-----------+
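
SHIP_INFO folds the type 5 stream into one row per MMSI. The Python sketch below models that aggregation; `build_ship_info` is a hypothetical name, and it assumes events arrive in offset order and that every message carries all three columns (null handling is not modelled). It shows why MAX(ROWTIME) and LATEST_BY_OFFSET can disagree: the first keeps the largest timestamp seen, the second keeps the value from the last message read, whatever its timestamp.

```python
from typing import Dict, Iterable, Tuple


def build_ship_info(events: Iterable[Tuple[int, dict]]) -> Dict[str, dict]:
    """Fold (rowtime_ms, type 5 record) events, in offset order, into one row per MMSI."""
    table: Dict[str, dict] = {}
    for rowtime, rec in events:
        row = table.get(rec["mmsi"])
        if row is None:
            row = {"last_info_ping_ts": rowtime}
            table[rec["mmsi"]] = row
        # MAX(ROWTIME): keep the largest timestamp seen for this MMSI.
        row["last_info_ping_ts"] = max(row["last_info_ping_ts"], rowtime)
        # LATEST_BY_OFFSET(col): the most recently read message wins.
        for col in ("shipname", "draught", "destination"):
            row[col] = rec[col]
    return table
```

With a table built this way, the pull query above corresponds to a plain key lookup such as `table["255805587"]`.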

Slide 16

Slide 17

AIS_MSG_TYPE_1_2_3

Slide 18

SHIP_INFO

Slide 19

AIS_MSG_TYPE_1_2_3 SHIP_INFO

Slide 20

Join the stream to the table

ksql> CREATE STREAM SHIP_STATUS_REPORTS WITH (KAFKA_TOPIC='SHIP_STATUS_REPORTS_V00') AS
        SELECT STATUS_REPORT.ROWTIME AS STATUS_TS,
               STATUS_REPORT.*,
               SHIP_INFO.*
          FROM AIS_MSG_TYPE_1_2_3 STATUS_REPORT
               LEFT JOIN SHIP_INFO SHIP_INFO
               ON STATUS_REPORT.MMSI = SHIP_INFO.MMSI;
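
In a stream-table join, each status report is matched against the SHIP_INFO row as it stands when the report is processed, and an update to the table on its own produces no join output. The Python sketch below replays a hypothetical interleaved log of table updates and stream events to show this; `enrich` and the event layout are illustrative assumptions, not ksqlDB internals.

```python
from typing import Iterable, List, Optional, Tuple


def enrich(events: Iterable[Tuple[str, dict]]) -> List[dict]:
    """Replay an interleaved log of ("info", row) and ("status", report) events."""
    ship_info: dict = {}  # table state: the current SHIP_INFO row for each MMSI
    joined: List[dict] = []
    for kind, rec in events:
        if kind == "info":
            # Table side: upsert the row. This alone emits nothing from the join.
            ship_info[rec["mmsi"]] = rec
        elif kind == "status":
            # Stream side: look up the table as it is right now. LEFT JOIN keeps
            # the report even when no ship info has been seen yet (info is None).
            info: Optional[dict] = ship_info.get(rec["mmsi"])
            joined.append({"status_report": rec, "ship_info": info})
    return joined
```

A status report that arrives before any type 5 message for its MMSI is therefore still emitted, just with empty SHIP_INFO columns.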

Slide 21

Ship movements (msg type 1) enriched with ship details (msg type 5)

ksql> SELECT TIMESTAMPTOSTRING(STATUS_TS,'HH:mm:ss','Europe/Oslo') AS STATUS_TS,
             SHIP_LOCATION,
             STATUS_REPORT_STATUS_TEXT,
             SHIP_INFO_SHIPNAME,
             SHIP_INFO_DRAUGHT,
             SHIP_INFO_DESTINATION_LIST
        FROM SHIP_STATUS_REPORTS
       WHERE SHIP_INFO_MMSI = '255805587'
       EMIT CHANGES;
+---------+-----------------------------+-------------------------+------------------+-----------------+---------------------+
|STATUS_TS|SHIP_LOCATION                |STATUS_REPORT_STATUS_TEXT|SHIP_INFO_SHIPNAME|SHIP_INFO_DRAUGHT|SHIP_INFO_DESTINATION|
+---------+-----------------------------+-------------------------+------------------+-----------------+---------------------+
|11:37:47 |{lat=61.773223, lon=5.294023}|Moored                   |NCL AVEROY (CQHL) |7.5              |[SVELGEN]            |
[…]
|17:16:45 |{lat=61.939807, lon=5.143242}|Under way using engine   |NCL AVEROY (CQHL) |7.5              |[FLORO]              |
[…]
|23:05:25 |{lat=62.468148, lon=6.137387}|Under way using engine   |NCL AVEROY (CQHL) |8.1              |[ALESUND]            |
[…]
|23:11:04 |{lat=62.468122, lon=6.13745} |Under way using engine   |NCL AVEROY (CQHL) |8.1              |[ALESUND]            |
[…]
|23:35:47 |{lat=62.468125, lon=6.137473}|Moored                   |NCL AVEROY (CQHL) |8.1              |[ALESUND]            |
[…]

Slide 22

Stream the enriched Kafka topic to Elasticsearch

Slide 23

Slide 24

Identifying Transshipping Behaviour
Miller NA, Roan A, Hochberg T, Amos J and Kroodsma DA (2018) Identifying Global Patterns of Transshipment Behavior. Front. Mar. Sci. 5:240. doi: 10.3389/fmars.2018.00240

Slide 25

Credit: Francisco Blaha

Slide 26

Additional ship data: https://globalfishingwatch.org/data-download/datasets/public-miller-frontiers-marine-science-2018

Slide 27

One-time CSV data load into Kafka

$ ccloud kafka topic create reefers
$ tr -d '\r' < Reefer.csv | \
    awk -F";" '{ print $2 "\x1c" $1 }' | \
    docker run --rm --interactive edenhill/kafkacat:1.6.0 \
      kafkacat -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
               -X ssl.ca.location=./etc/ssl/cert.pem -X api.version.request=true \
               -b $CCLOUD_BROKER:9092 \
               -X sasl.username="$CCLOUD_API_KEY" \
               -X sasl.password="$CCLOUD_API_SECRET" \
               -t reefers -K$'\x1c' -P

$ ccloud kafka topic create transshipment-vessels
$ tr -d '\r' < transshipment-vessels-v20170717.csv | \
    sed "s/,/$(printf '\x1c')/" | \
    docker run --rm --interactive edenhill/kafkacat:1.6.0 \
      kafkacat -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
               -X ssl.ca.location=./etc/ssl/cert.pem -X api.version.request=true \
               -b $CCLOUD_BROKER:9092 \
               -X sasl.username="$CCLOUD_API_KEY" \
               -X sasl.password="$CCLOUD_API_SECRET" \
               -t transshipment-vessels -K$'\x1c' -P
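
The `tr`, `awk` and `sed` steps exist to give each CSV line a message key: kafkacat's `-K` option names the key delimiter, here the ASCII file separator (0x1C). For Reefer.csv the second semicolon-separated field becomes the key and the first the value; for the transshipment file everything before the first comma becomes the key. A Python sketch of the same transformations, with hypothetical function names; that `-K` splits at the first delimiter is an assumption of the sketch.

```python
from typing import Tuple

SEP = "\x1c"  # ASCII file separator, passed to kafkacat as the key delimiter via -K


def reefer_record(line: str) -> str:
    # tr -d '\r' removes carriage returns; awk -F";" '{ print $2 "\x1c" $1 }'
    # then prints field 2, the separator and field 1 (any other fields are dropped).
    fields = line.replace("\r", "").split(";")
    second = fields[1] if len(fields) > 1 else ""  # awk prints an empty $2 if absent
    return second + SEP + fields[0]


def transshipment_record(line: str) -> str:
    # tr -d '\r', then sed "s/,/$(printf '\x1c')/" replaces only the FIRST comma.
    return line.replace("\r", "").replace(",", SEP, 1)


def kafkacat_key_value(record: str) -> Tuple[str, str]:
    # Assumed -K behaviour: text before the first delimiter is the key, the rest the value.
    key, _, value = record.partition(SEP)
    return key, value


print(kafkacat_key_value(transshipment_record("123,x,y\r")))  # prints ('123', 'x,y')
```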

Slide 28

Pattern matching (part 1)
Miller NA, Roan A, Hochberg T, Amos J and Kroodsma DA (2018) Identifying Global Patterns of Transshipment Behavior. Front. Mar. Sci. 5:240. doi: 10.3389/fmars.2018.00240

Slide 29

Pattern matching with ksqlDB

Slide 30

Preparing the ship movements stream

CREATE STREAM SHIP_STATUS_REPORTS WITH (KAFKA_TOPIC='SHIP_STATUS_REPORTS_V00') AS
  SELECT STATUS_REPORT.ROWTIME AS STATUS_TS,
         STATUS_REPORT.*,
         SHIP_INFO.*,
         1 AS DUMMY
    FROM AIS_MSG_TYPE_1_2_3 STATUS_REPORT
         LEFT JOIN SHIP_INFO SHIP_INFO
         ON STATUS_REPORT.MMSI = SHIP_INFO.MMSI;

CREATE STREAM REEFER_MOVEMENTS AS
  SELECT *
    FROM SHIP_STATUS_REPORTS
   WHERE (SHIP_INFO_IS_TRANSSHIPPER=1 OR SHIP_INFO_IS_REEFER=1);

Slide 31

Stream-Stream Join with GEO_DISTANCE

CREATE STREAM REEFERS_AND_VESSELS_WITHIN_500M
  WITH (KAFKA_TOPIC='REEFERS_AND_VESSELS_WITHIN_500M_V00') AS
  SELECT V.STATUS_REPORT_MMSI AS FISHING_VESSEL_MMSI,
         R.STATUS_REPORT_MMSI AS REEFER_MMSI,
         V.SHIP_LOCATION AS FISHING_VESSEL_LOCATION,
         R.SHIP_LOCATION AS REEFER_LOCATION,
         GEO_DISTANCE(V.STATUS_REPORT_LAT, V.STATUS_REPORT_LON,
                      R.STATUS_REPORT_LAT, R.STATUS_REPORT_LON, 'KM') AS DISTANCE_KM,
         CASE
           WHEN GEO_DISTANCE(V.STATUS_REPORT_LAT, V.STATUS_REPORT_LON,
                             R.STATUS_REPORT_LAT, R.STATUS_REPORT_LON, 'KM') < 0.5
                AND (R.STATUS_REPORT_SPEED < 2 AND V.STATUS_REPORT_SPEED < 2)
             THEN 1
           ELSE 0
         END AS IN_RANGE_AND_SPEED
    FROM SHIP_STATUS_REPORTS V
         INNER JOIN REEFER_MOVEMENTS R
         WITHIN 1 MINUTE
         ON R.DUMMY = V.DUMMY
   WHERE V.SHIP_INFO_SHIPTYPE_TEXT = 'Fishing'
     AND V.STATUS_REPORT_MMSI != R.STATUS_REPORT_MMSI
     AND GEO_DISTANCE(V.STATUS_REPORT_LAT, V.STATUS_REPORT_LON,
                      R.STATUS_REPORT_LAT, R.STATUS_REPORT_LON, 'KM') < 1
  PARTITION BY V.STATUS_REPORT_MMSI;
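
Joining on the constant DUMMY column gives every record the same join key, so the only things that restrict which reports get paired are the one-minute window and the WHERE clause. A simplified Python model of that pairing follows; `windowed_pairs` and the record layout are hypothetical, the window boundary is treated as inclusive (an assumption), and the GEO_DISTANCE filter is left out.

```python
from typing import List, Tuple

WINDOW_MS = 60_000  # WITHIN 1 MINUTE (boundary treated as inclusive: an assumption)


def windowed_pairs(vessels: List[dict], reefers: List[dict]) -> List[Tuple[str, str]]:
    """Return (fishing vessel MMSI, reefer MMSI) pairs that the join would emit.

    Every record carries DUMMY = 1, so all vessel reports share a join key with all
    reefer reports; only the time window and the WHERE clause narrow the pairs.
    The GEO_DISTANCE < 1 km filter from the WHERE clause is not modelled here.
    """
    pairs = []
    for v in vessels:
        if v["shiptype_text"] != "Fishing":  # WHERE V.SHIP_INFO_SHIPTYPE_TEXT = 'Fishing'
            continue
        for r in reefers:
            within_window = abs(v["ts"] - r["ts"]) <= WINDOW_MS
            different_ship = v["mmsi"] != r["mmsi"]  # never pair a ship with itself
            if within_window and different_ship:
                pairs.append((v["mmsi"], r["mmsi"]))
    return pairs
```

The nested loop is deliberate: with a constant key the join is effectively a windowed cross join. Since every record shares one key, this pattern presumably also concentrates all of the join work in a single partition, which is worth bearing in mind at higher volumes.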

Slide 32

Vessels identified as being near each other

+-------------------+-----------+------------------------------+-----------------------------+-------------------+------------------+
|FISHING_VESSEL_MMSI|REEFER_MMSI|FISHING_VESSEL_LOCATION       |REEFER_LOCATION              |DISTANCE_KM        |IN_RANGE_AND_SPEED|
+-------------------+-----------+------------------------------+-----------------------------+-------------------+------------------+
|258036000          |257430000  |{lat=62.324023, lon=5.66427}  |{lat=62.321373, lon=5.653208}|0.642853083812366  |0                 |
|273844800          |273440860  |{lat=69.728918, lon=30.034538}|{lat=69.72819, lon=30.03179} |0.1332701769571591 |1                 |
|273433220          |273440860  |{lat=69.72493, lon=30.023542} |{lat=69.72819, lon=30.03179} |0.4820709202116538 |1                 |
|273433400          |273440860  |{lat=69.727357, lon=30.03141} |{lat=69.72819, lon=30.03179} |0.09377524348557457|1                 |
|257810500          |258211000  |{lat=62.55565, lon=6.26555}   |{lat=62.548723, lon=6.276892}|0.9649975921607864 |0                 |
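
DISTANCE_KM is the great-circle distance between the two reported positions. The Python sketch below uses the haversine formula with an assumed mean Earth radius of 6371 km; it is not ksqlDB's own GEO_DISTANCE code, but for the rows above it agrees closely with the DISTANCE_KM values. `geo_distance_km` and `in_range_and_speed` are hypothetical names; the second mirrors the CASE expression.

```python
import math

EARTH_RADIUS_KM = 6371.0  # assumed mean Earth radius


def geo_distance_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in km between two points in decimal degrees (haversine)."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = math.radians(lat2 - lat1)
    d_lambda = math.radians(lon2 - lon1)
    a = math.sin(d_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


def in_range_and_speed(distance_km: float, reefer_speed: float, vessel_speed: float) -> int:
    # CASE WHEN distance < 0.5 AND both speeds < 2 THEN 1 ELSE 0 END
    return 1 if distance_km < 0.5 and reefer_speed < 2 and vessel_speed < 2 else 0


# Second row of the output above: fishing vessel 273844800 and reefer 273440860.
d = geo_distance_km(69.728918, 30.034538, 69.72819, 30.03179)
print(round(d, 3))  # prints 0.133
```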

Slide 33

Vessels identified using ksqlDB streamed to Elasticsearch and Kibana

Slide 34

Pattern matching (part 2)
Miller NA, Roan A, Hochberg T, Amos J and Kroodsma DA (2018) Identifying Global Patterns of Transshipment Behavior. Front. Mar. Sci. 5:240. doi: 10.3389/fmars.2018.00240

Slide 35

Session-windowed aggregations with ksqlDB

SELECT FISHING_VESSEL_MMSI,
       REEFER_MMSI,
       TIMESTAMPTOSTRING(MIN(ROWTIME),'HH:mm:ss','Europe/Oslo') AS FIRST_TIME,
       TIMESTAMPTOSTRING(MAX(ROWTIME),'HH:mm:ss','Europe/Oslo') AS LAST_TIME,
       (MAX(ROWTIME) - MIN(ROWTIME)) / 1000 AS DIFF_SEC,
       MAX(DISTANCE_KM) AS FURTHEST_DISTANCE,
       COUNT(*) AS EVENT_CT
  FROM REEFERS_AND_VESSELS_WITHIN_1KM
       WINDOW SESSION (10 MINUTES)
 WHERE IN_RANGE_AND_SPEED=1
 GROUP BY FISHING_VESSEL_MMSI, REEFER_MMSI
 EMIT CHANGES;
+-------------------+-----------+----------+---------+--------+------------------+--------+
|FISHING_VESSEL_MMSI|REEFER_MMSI|FIRST_TIME|LAST_TIME|DIFF_SEC|FURTHEST_DISTANCE |EVENT_CT|
+-------------------+-----------+----------+---------+--------+------------------+--------+
|273433220          |273440860  |11:57:14  |12:33:13 |2159    |0.4846046047267392|13      |
|258036000          |257430000  |13:01:07  |13:04:54 |227     |0.4997634317289095|4       |
|257888000          |258211000  |11:57:54  |13:33:45 |5751    |0.3039245344432804|58      |
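
A session window keeps growing for as long as events for the same key keep arriving within the inactivity gap (10 minutes here); a longer pause starts a new session. A Python sketch of how one fishing vessel and reefer pair's event times would be grouped; `sessions` is a hypothetical name, and treating a gap of exactly 10 minutes as "within" is an assumption.

```python
from typing import List, Tuple

GAP_MS = 10 * 60 * 1000  # WINDOW SESSION (10 MINUTES)


def sessions(timestamps_ms: List[int]) -> List[Tuple[int, int, int]]:
    """Group one key's event times into (first_ts, last_ts, event_count) sessions.

    Sorting first gives the final sessions once every (possibly out-of-order) event has
    arrived; with EMIT CHANGES, ksqlDB instead emits updates as sessions grow and merge.
    """
    result: List[List[int]] = []
    for ts in sorted(timestamps_ms):
        # Extend the current session if this event falls within the inactivity gap.
        if result and ts - result[-1][1] <= GAP_MS:
            result[-1][1] = ts
            result[-1][2] += 1
        else:
            result.append([ts, ts, 1])
    return [(first, last, count) for first, last, count in result]
```

DIFF_SEC in the query corresponds to `(last_ts - first_ts) // 1000` for each session, and EVENT_CT to the count.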

Slide 36

CREATE TABLE REEFERS_AND_VESSELS_CLOSE_FOR_GT_2HR
  WITH (KAFKA_TOPIC='REEFERS_AND_VESSELS_CLOSE_FOR_GT_2HR_V00') AS
  SELECT FISHING_VESSEL_MMSI,
         REEFER_MMSI,
         STRUCT("lat":=LATEST_BY_OFFSET(FISHING_VESSEL_LAT),
                "lon":=LATEST_BY_OFFSET(FISHING_VESSEL_LON)) AS LATEST_FISHING_VESSEL_LOCATION,
         STRUCT("lat":=LATEST_BY_OFFSET(REEFER_LAT),
                "lon":=LATEST_BY_OFFSET(REEFER_LON)) AS LATEST_REEFER_LOCATION,
         MIN(DISTANCE_KM) AS CLOSEST_DISTANCE_KM,
         MAX(DISTANCE_KM) AS FURTHEST_DISTANCE_KM,
         MIN(FISHING_VESSEL_TS) AS FIRST_TS,
         MAX(FISHING_VESSEL_TS) AS LAST_TS,
         (MAX(FISHING_VESSEL_TS) - MIN(FISHING_VESSEL_TS)) / 1000 AS DIFF_SEC
    FROM REEFERS_AND_VESSELS_WITHIN_1KM
         WINDOW SESSION (10 MINUTES)
   WHERE IN_RANGE_AND_SPEED = 1
   GROUP BY FISHING_VESSEL_MMSI, REEFER_MMSI
  HAVING (MAX(ROWTIME) - MIN(ROWTIME)) / 1000 > 7200;
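
The HAVING clause keeps only sessions in which the two vessels stayed close for more than two hours (7200 seconds). The Python sketch below summarises one session and applies that filter; `summarise_session` is a hypothetical name, and for simplicity it assumes the row timestamp and FISHING_VESSEL_TS are the same value (`ts`, in epoch milliseconds).

```python
from typing import List, Optional

TWO_HOURS_SEC = 7200


def summarise_session(events: List[dict]) -> Optional[dict]:
    """Aggregate one session's events; return None unless it lasts more than two hours."""
    first_ts = min(e["ts"] for e in events)
    last_ts = max(e["ts"] for e in events)
    diff_sec = (last_ts - first_ts) // 1000  # integer division, like BIGINT / 1000
    if diff_sec <= TWO_HOURS_SEC:  # HAVING ... > 7200
        return None
    return {
        "closest_distance_km": min(e["distance_km"] for e in events),
        "furthest_distance_km": max(e["distance_km"] for e in events),
        "first_ts": first_ts,
        "last_ts": last_ts,
        "diff_sec": diff_sec,
    }
```

By this measure none of the three sessions in the previous output would qualify: the longest lasted 5751 seconds.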

Slide 37

Slide 38

Slide 39

Recap

Slide 40

Slide 41

Slide 42

Slide 43

Slide 44

Slide 45

Slide 46

Slide 47

Slide 48

Deep-dive articles, including:
• Kafka Performance
• Testing Apache Kafka
• Explore Kafka's Internals
Over 10 hours of free courses:
• Apache Kafka 101
• Kafka Connect 101
• Kafka Streams 101
• ksqlDB 101
• Inside ksqlDB
• Spring Framework and Kafka
• Building Data Pipelines with Kafka
• Event Sourcing with Kafka
• Data Mesh 101
Plus: Hands-on Quick Starts and Client Language Guides + Event Streaming Patterns + More
developer.confluent.io

Slide 49

#EOF @rmoff http://youtube.com/rmoff