🚢 All at Sea with Streams: Using Kafka to Detect Patterns in the Behaviour of Ships #kafkasummit @rmoff
A presentation at Kafka Summit Americas in September 2021 by Robin Moffatt
Further reading
• This talk is built on concepts explored in detail in these two blogs:
  • https://www.confluent.co.uk/blog/streaming-etl-and-analytics-for-real-time-location-tracking/
  • https://www.confluent.co.uk/blog/streaming-data-with-confluent-and-ksqldb-for-new-use-cases-with-ais/
Planes
Trains
Automobiles
…and Ships!
Handling different message types in the same stream
Use ksqlDB to route messages based on type
ksql> CREATE OR REPLACE STREAM AIS_MSG_TYPE_1_2_3 WITH (FORMAT='AVRO') AS
        SELECT CAST(EXTRACTJSONFIELD(msg,'$.type')        AS INT)     AS msg_type,
               CAST(EXTRACTJSONFIELD(msg,'$.mmsi')        AS VARCHAR) AS mmsi,
               CAST(EXTRACTJSONFIELD(msg,'$.status_text') AS VARCHAR) AS status_text,
               CAST(EXTRACTJSONFIELD(msg,'$.speed')       AS DOUBLE)  AS speed,
               CAST(EXTRACTJSONFIELD(msg,'$.course')      AS DOUBLE)  AS course,
               CAST(EXTRACTJSONFIELD(msg,'$.heading')     AS INT)     AS heading
          FROM AIS_RAW
         WHERE EXTRACTJSONFIELD(msg,'$.type') IN ('1','2','3','18','27')
         PARTITION BY CAST(EXTRACTJSONFIELD(msg,'$.mmsi') AS VARCHAR);
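The routing rule in the statement above can be illustrated outside ksqlDB. What follows is a minimal Python sketch, not the talk's implementation: the function name `route` and the sample messages are hypothetical, and only the list of message types, the selected fields, and the use of the MMSI as the key are taken from the slide.

```python
import json

# Message types taken from the WHERE clause on the slide (position reports).
POSITION_TYPES = {"1", "2", "3", "18", "27"}

def route(raw_msg):
    """Hypothetical helper: return (stream_name, key, fields), or None
    when the message type is not one that this stream selects."""
    msg = json.loads(raw_msg)
    if str(msg.get("type")) not in POSITION_TYPES:
        return None  # another statement would have to pick this type up
    fields = {
        "msg_type": int(msg["type"]),
        "mmsi": str(msg["mmsi"]),          # cast to a string, as on the slide
        "status_text": msg.get("status_text"),
        "speed": msg.get("speed"),
        "course": msg.get("course"),
        "heading": msg.get("heading"),
    }
    # PARTITION BY mmsi: the MMSI becomes the message key.
    return ("AIS_MSG_TYPE_1_2_3", fields["mmsi"], fields)

# Made-up sample messages, for illustration only.
position = route('{"type": 1, "mmsi": 255805587, "status_text": "Moored", "speed": 0.0}')
static = route('{"type": 5, "mmsi": 255805587, "shipname": "NCL AVEROY"}')
print(position)
print(static)
```

Keying every routed message by MMSI is what makes later per-ship aggregations and joins possible, since all messages for one ship end up on the same partition.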
ksql> CREATE OR REPLACE STREAM AIS_MSG_TYPE_5 WITH (FORMAT='AVRO') AS
        SELECT CAST(EXTRACTJSONFIELD(msg,'$.type')          AS INT)     AS msg_type,
               CAST(EXTRACTJSONFIELD(msg,'$.mmsi')          AS VARCHAR) AS mmsi,
               CAST(EXTRACTJSONFIELD(msg,'$.callsign')      AS VARCHAR) AS callsign,
               CAST(EXTRACTJSONFIELD(msg,'$.shipname')      AS VARCHAR) AS shipname_raw,
               CONCAT(CAST(EXTRACTJSONFIELD(msg,'$.shipname') AS VARCHAR),
                      ' (',
                      CAST(EXTRACTJSONFIELD(msg,'$.callsign') AS VARCHAR),
                      ')')                                              AS shipname,
               CAST(EXTRACTJSONFIELD(msg,'$.shiptype_text') AS VARCHAR) AS shiptype_text,
               CAST(EXTRACTJSONFIELD(msg,'$.destination')   AS VARCHAR) AS destination
          FROM AIS_RAW
         WHERE EXTRACTJSONFIELD(msg,'$.type') = '5'
         PARTITION BY CAST(EXTRACTJSONFIELD(msg,'$.mmsi') AS VARCHAR);
Build a materialized view of state from a Kafka topic
ksql> CREATE TABLE SHIP_INFO AS
        SELECT MMSI,
               MAX(ROWTIME)                  AS LAST_INFO_PING_TS,
               LATEST_BY_OFFSET(SHIPNAME)    AS SHIPNAME,
               LATEST_BY_OFFSET(DRAUGHT)     AS DRAUGHT,
               LATEST_BY_OFFSET(DESTINATION) AS DESTINATION
          FROM AIS_MSG_TYPE_5
         GROUP BY MMSI
         EMIT CHANGES;
ksql> SELECT MMSI,
             TIMESTAMPTOSTRING(LAST_INFO_PING_TS,'HH:mm:ss','Europe/London') AS LAST_INFO_PING_TS,
             SHIPNAME,
             DRAUGHT,
             DESTINATION
        FROM SHIP_INFO
       WHERE MMSI = '255805587';
+----------+------------------+------------------+--------+------------+
|MMSI      |LAST_INFO_PING_TS |SHIPNAME          |DRAUGHT |DESTINATION |
+----------+------------------+------------------+--------+------------+
|255805587 |11:17:17          |NCL AVEROY (CQHL) |7.5     |SVELGEN     |
+----------+------------------+------------------+--------+------------+
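To make the idea of a materialized view concrete, here is a small Python sketch of what the aggregation above computes: one row per MMSI, holding the highest timestamp seen and the most recent value of each attribute. It is an illustration only. The function `build_ship_info` and the event timestamps are hypothetical, and the sketch assumes that null values are skipped when picking the latest value, which is understood to be the default behaviour of LATEST_BY_OFFSET.

```python
def build_ship_info(events):
    """Fold a sequence of type 5 events (in offset order) into a table
    keyed by MMSI. Hypothetical helper, for illustration only."""
    table = {}
    for ev in events:
        row = table.setdefault(ev["MMSI"], {"LAST_INFO_PING_TS": ev["ROWTIME"]})
        # MAX(ROWTIME): keep the largest timestamp seen for this ship.
        row["LAST_INFO_PING_TS"] = max(row["LAST_INFO_PING_TS"], ev["ROWTIME"])
        for col in ("SHIPNAME", "DRAUGHT", "DESTINATION"):
            if ev.get(col) is not None:
                row[col] = ev[col]        # latest non-null value wins
            else:
                row.setdefault(col, None)  # assumption: nulls are ignored
    return table

# Ship values are taken from the slides; the timestamps are made up.
events = [
    {"MMSI": "255805587", "ROWTIME": 1000, "SHIPNAME": "NCL AVEROY (CQHL)",
     "DRAUGHT": 7.5, "DESTINATION": "SVELGEN"},
    {"MMSI": "255805587", "ROWTIME": 2000, "SHIPNAME": "NCL AVEROY (CQHL)",
     "DRAUGHT": None, "DESTINATION": "FLORO"},
]
ship_info = build_ship_info(events)
print(ship_info["255805587"])
```

The table always reflects the current state per key, whereas the underlying topic still holds every individual update.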
AIS_MSG_TYPE_1_2_3
SHIP_INFO
AIS_MSG_TYPE_1_2_3 SHIP_INFO
Join the stream to the table
ksql> CREATE STREAM SHIP_STATUS_REPORTS
        WITH (KAFKA_TOPIC='SHIP_STATUS_REPORTS_V00') AS
        SELECT STATUS_REPORT.ROWTIME AS STATUS_TS,
               STATUS_REPORT.*,
               SHIP_INFO.*
          FROM AIS_MSG_TYPE_1_2_3 STATUS_REPORT
               LEFT JOIN SHIP_INFO SHIP_INFO
               ON STATUS_REPORT.MMSI = SHIP_INFO.MMSI;
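The effect of the stream-table join can be sketched in a few lines of Python: each status report is looked up against the current state of the ship table by MMSI, and because it is a LEFT JOIN, a report with no matching ship still produces an output row with empty ship columns. The helper `left_join`, the column tuple, and the sample values are hypothetical; the `STATUS_REPORT_` and `SHIP_INFO_` prefixes follow the column names visible in the talk's query output.

```python
SHIP_INFO_COLUMNS = ("MMSI", "SHIPNAME", "DRAUGHT", "DESTINATION")

def left_join(status_report, ship_info):
    """Enrich one status report with the ship's current details.
    Hypothetical helper mirroring the LEFT JOIN for illustration."""
    info = ship_info.get(status_report["MMSI"])  # None when the ship is unknown
    out = {"STATUS_TS": status_report["ROWTIME"]}
    for col, value in status_report.items():
        if col != "ROWTIME":
            out["STATUS_REPORT_" + col] = value
    for col in SHIP_INFO_COLUMNS:
        out["SHIP_INFO_" + col] = info.get(col) if info else None
    return out

# Made-up table state and events, for illustration only.
ship_info = {
    "255805587": {"MMSI": "255805587", "SHIPNAME": "NCL AVEROY (CQHL)",
                  "DRAUGHT": 7.5, "DESTINATION": "SVELGEN"},
}
known = left_join({"MMSI": "255805587", "ROWTIME": 1000, "STATUS_TEXT": "Moored"}, ship_info)
unknown = left_join({"MMSI": "999999999", "ROWTIME": 2000, "STATUS_TEXT": "Moored"}, ship_info)
print(known)
print(unknown)
```

The lookup uses whatever the table holds at the moment the status report arrives, so the same ship can be enriched with different destinations over time.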
Ship movements (msg type 1) enriched with ship details (msg type 5)
ksql> SELECT TIMESTAMPTOSTRING(STATUS_TS,'HH:mm:ss','Europe/Oslo') AS STATUS_TS,
             SHIP_LOCATION,
             STATUS_REPORT_STATUS_TEXT,
             SHIP_INFO_SHIPNAME,
             SHIP_INFO_DRAUGHT,
             SHIP_INFO_DESTINATION_LIST
        FROM SHIP_STATUS_REPORTS
       WHERE SHIP_INFO_MMSI = '255805587'
        EMIT CHANGES;
+----------+------------------------------+--------------------------+-------------------+------------------+----------------------+
|STATUS_TS |SHIP_LOCATION                 |STATUS_REPORT_STATUS_TEXT |SHIP_INFO_SHIPNAME |SHIP_INFO_DRAUGHT |SHIP_INFO_DESTINATION |
+----------+------------------------------+--------------------------+-------------------+------------------+----------------------+
|11:37:47  |{lat=61.773223, lon=5.294023} |Moored                    |NCL AVEROY (CQHL)  |7.5               |[SVELGEN]             |
[…]
|17:16:45  |{lat=61.939807, lon=5.143242} |Under way using engine    |NCL AVEROY (CQHL)  |7.5               |[FLORO]               |
[…]
|23:05:25  |{lat=62.468148, lon=6.137387} |Under way using engine    |NCL AVEROY (CQHL)  |8.1               |[ALESUND]             |
[…]
|23:11:04  |{lat=62.468122, lon=6.13745}  |Under way using engine    |NCL AVEROY (CQHL)  |8.1               |[ALESUND]             |
[…]
|23:35:47  |{lat=62.468125, lon=6.137473} |Moored                    |NCL AVEROY (CQHL)  |8.1               |[ALESUND]             |
[…]
Stream the enriched Kafka topic to Elasticsearch
Identifying Transshipping Behaviour
Miller NA, Roan A, Hochberg T, Amos J and Kroodsma DA (2018) Identifying Global Patterns of Transshipment Behavior. Front. Mar. Sci. 5:240. doi: 10.3389/fmars.2018.00240
Credit: Francisco Blaha
Additional ship data
https://globalfishingwatch.org/data-download/datasets/public-miller-frontiers-marine-science-2018
One-time CSV data load into Kafka
$ ccloud kafka topic create reefers
$ tr -d '\r' < Reefer.csv | \
    awk -F';' ' { print $2 "\x1c" $1 } '| \
    docker run --rm --interactive edenhill/kafkacat:1.6.0 \
      kafkacat -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
        -X ssl.ca.location=./etc/ssl/cert.pem -X api.version.request=true \
        -b $CCLOUD_BROKER:9092 \
        -X sasl.username="$CCLOUD_API_KEY" \
        -X sasl.password="$CCLOUD_API_SECRET" \
        -t reefers -K$'\x1c' -P
$ ccloud kafka topic create transshipment-vessels
$ tr -d '\r' < transshipment-vessels-v20170717.csv | \
    sed "s/,/$(printf '\x1c')/" | \
    docker run --rm --interactive edenhill/kafkacat:1.6.0 \
      kafkacat -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
        -X ssl.ca.location=./etc/ssl/cert.pem -X api.version.request=true \
        -b $CCLOUD_BROKER:9092 \
        -X sasl.username="$CCLOUD_API_KEY" \
        -X sasl.password="$CCLOUD_API_SECRET" \
        -t transshipment-vessels -K$'\x1c' -P
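The two pipelines above differ only in how they turn a CSV line into a key and a value separated by the `\x1c` character that kafkacat is told to split on. The Python sketch below shows that text transformation in isolation. The function names and sample lines are hypothetical, nothing is assumed about what the CSV columns actually contain, and the first function assumes each line has at least two fields.

```python
SEP = "\x1c"  # the key/value separator passed to kafkacat with -K

def key_by_second_column(line, delimiter=";"):
    """Mirror of the awk step: emit field 2 as the key and field 1 as the value."""
    fields = line.replace("\r", "").rstrip("\n").split(delimiter)
    return fields[1] + SEP + fields[0]

def key_by_first_column(line, delimiter=","):
    """Mirror of the sed step: only the first delimiter becomes the separator,
    so the first field is the key and the rest of the line is the value."""
    clean = line.replace("\r", "").rstrip("\n")
    return clean.replace(delimiter, SEP, 1)

# Made-up lines, for illustration only.
a = key_by_second_column("first;second\r\n")
b = key_by_first_column("first,second,third\r\n")
print(repr(a))
print(repr(b))
```

Using a control character as the separator avoids clashes with commas or semicolons that appear inside the data itself.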
Pattern matching (part 1)
Miller NA, Roan A, Hochberg T, Amos J and Kroodsma DA (2018) Identifying Global Patterns of Transshipment Behavior. Front. Mar. Sci. 5:240. doi: 10.3389/fmars.2018.00240
Pattern matching with ksqlDB
Preparing the ship movements stream
CREATE STREAM SHIP_STATUS_REPORTS
  WITH (KAFKA_TOPIC='SHIP_STATUS_REPORTS_V00') AS
SELECT STATUS_REPORT.ROWTIME AS STATUS_TS,
       STATUS_REPORT.*,
       SHIP_INFO.*,
       1 AS DUMMY
  FROM AIS_MSG_TYPE_1_2_3 STATUS_REPORT
       LEFT JOIN SHIP_INFO SHIP_INFO
       ON STATUS_REPORT.MMSI = SHIP_INFO.MMSI;
CREATE STREAM REEFER_MOVEMENTS AS
SELECT *
  FROM SHIP_STATUS_REPORTS
 WHERE (SHIP_INFO_IS_TRANSSHIPPER=1
        OR SHIP_INFO_IS_REEFER=1);
Stream-Stream Join with GEO_DISTANCE
CREATE STREAM REEFERS_AND_VESSELS_WITHIN_500M
  WITH (KAFKA_TOPIC='REEFERS_AND_VESSELS_WITHIN_500M_V00') AS
SELECT V.STATUS_REPORT_MMSI AS FISHING_VESSEL_MMSI,
       R.STATUS_REPORT_MMSI AS REEFER_MMSI,
       V.SHIP_LOCATION      AS FISHING_VESSEL_LOCATION,
       R.SHIP_LOCATION      AS REEFER_LOCATION,
       GEO_DISTANCE(V.STATUS_REPORT_LAT, V.STATUS_REPORT_LON,
                    R.STATUS_REPORT_LAT, R.STATUS_REPORT_LON, 'KM') AS DISTANCE_KM,
       CASE
         WHEN GEO_DISTANCE(V.STATUS_REPORT_LAT, V.STATUS_REPORT_LON,
                           R.STATUS_REPORT_LAT, R.STATUS_REPORT_LON, 'KM') < 0.5
              AND (R.STATUS_REPORT_SPEED < 2
                   AND V.STATUS_REPORT_SPEED < 2)
         THEN 1
         ELSE 0
       END AS IN_RANGE_AND_SPEED
  FROM SHIP_STATUS_REPORTS V
       INNER JOIN REEFER_MOVEMENTS R
       WITHIN 1 MINUTE
       ON R.DUMMY = V.DUMMY
 WHERE V.SHIP_INFO_SHIPTYPE_TEXT = 'Fishing'
   AND V.STATUS_REPORT_MMSI != R.STATUS_REPORT_MMSI
   AND GEO_DISTANCE(V.STATUS_REPORT_LAT, V.STATUS_REPORT_LON,
                    R.STATUS_REPORT_LAT, R.STATUS_REPORT_LON, 'KM') < 1
PARTITION BY V.STATUS_REPORT_MMSI;
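The distance test and the flag in the query above can be checked by hand with a short Python sketch. It assumes a haversine great-circle distance on a sphere of radius 6371 km, which may differ slightly from what GEO_DISTANCE does internally; the function names and the speeds in the sample call are hypothetical, while the coordinates are one fishing vessel and reefer pair from the talk's results.

```python
import math

EARTH_RADIUS_KM = 6371.0  # assumption: mean Earth radius

def geo_distance_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two points given in decimal degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))

def in_range_and_speed(distance_km, reefer_speed, vessel_speed):
    """Same rule as the CASE expression: closer than 0.5 km, both slower than 2."""
    return 1 if distance_km < 0.5 and reefer_speed < 2 and vessel_speed < 2 else 0

# Coordinates from the talk's result table; the speeds are made up.
d = geo_distance_km(62.324023, 5.66427, 62.321373, 5.653208)
flag = in_range_and_speed(d, reefer_speed=0.1, vessel_speed=0.3)
print(round(d, 3), flag)
```

For this pair the sketch gives roughly 0.64 km, so the pair passes the 1 km filter in the WHERE clause but is flagged 0 because it is outside the 0.5 km threshold.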
Vessels identified as being near each other
+--------------------+------------+-------------------------------+------------------------------+--------------------+-------------------+
|FISHING_VESSEL_MMSI |REEFER_MMSI |FISHING_VESSEL_LOCATION        |REEFER_LOCATION               |DISTANCE_KM         |IN_RANGE_AND_SPEED |
+--------------------+------------+-------------------------------+------------------------------+--------------------+-------------------+
|258036000           |257430000   |{lat=62.324023, lon=5.66427}   |{lat=62.321373, lon=5.653208} |0.642853083812366   |0                  |
|273844800           |273440860   |{lat=69.728918, lon=30.034538} |{lat=69.72819, lon=30.03179}  |0.1332701769571591  |1                  |
|273433220           |273440860   |{lat=69.72493, lon=30.023542}  |{lat=69.72819, lon=30.03179}  |0.4820709202116538  |1                  |
|273433400           |273440860   |{lat=69.727357, lon=30.03141}  |{lat=69.72819, lon=30.03179}  |0.09377524348557457 |1                  |
|257810500           |258211000   |{lat=62.55565, lon=6.26555}    |{lat=62.548723, lon=6.276892} |0.9649975921607864  |0                  |
Vessels identified using ksqlDB streamed to Elasticsearch and Kibana
Pattern matching (part 2)
Miller NA, Roan A, Hochberg T, Amos J and Kroodsma DA (2018) Identifying Global Patterns of Transshipment Behavior. Front. Mar. Sci. 5:240. doi: 10.3389/fmars.2018.00240
Session windowed aggregations with ksqlDB
SELECT FISHING_VESSEL_MMSI,
       REEFER_MMSI,
       TIMESTAMPTOSTRING(MIN(ROWTIME),'HH:mm:ss','Europe/Oslo') AS FIRST_TIME,
       TIMESTAMPTOSTRING(MAX(ROWTIME),'HH:mm:ss','Europe/Oslo') AS LAST_TIME,
       (MAX(ROWTIME) - MIN(ROWTIME)) / 1000 AS DIFF_SEC,
       MAX(DISTANCE_KM) AS FURTHEST_DISTANCE,
       COUNT(*) AS EVENT_CT
  FROM REEFERS_AND_VESSELS_WITHIN_1KM
       WINDOW SESSION (10 MINUTES)
 WHERE IN_RANGE_AND_SPEED=1
 GROUP BY FISHING_VESSEL_MMSI, REEFER_MMSI
  EMIT CHANGES;
+--------------------+------------+-----------+----------+---------+-------------------+---------+
|FISHING_VESSEL_MMSI |REEFER_MMSI |FIRST_TIME |LAST_TIME |DIFF_SEC |FURTHEST_DISTANCE  |EVENT_CT |
+--------------------+------------+-----------+----------+---------+-------------------+---------+
|273433220           |273440860   |11:57:14   |12:33:13  |2159     |0.4846046047267392 |13       |
|258036000           |257430000   |13:01:07   |13:04:54  |227      |0.4997634317289095 |4        |
|257888000           |258211000   |11:57:54   |13:33:45  |5751     |0.3039245344432804 |58       |
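A session window groups events for the same key for as long as they keep arriving within an inactivity gap, and closes once the gap is exceeded. The Python sketch below illustrates that grouping for the 10 minute gap used above. It is a simplification, not how ksqlDB implements it: the function `session_windows` is hypothetical, events are processed in batch rather than incrementally, the exact handling of an event that lands precisely on the gap boundary is an assumption, and the sample MMSI pair and timestamps are made up.

```python
GAP_MS = 10 * 60 * 1000  # 10 minute inactivity gap, as in WINDOW SESSION (10 MINUTES)

def session_windows(events, gap_ms=GAP_MS):
    """Group (key, ts_ms, distance_km) events into sessions per key.
    Hypothetical batch helper, for illustration only."""
    by_key = {}
    for key, ts, distance_km in events:
        by_key.setdefault(key, []).append((ts, distance_km))
    sessions = []
    for key, rows in by_key.items():
        current = None
        for ts, distance_km in sorted(rows):
            if current is not None and ts - current["LAST_TS"] <= gap_ms:
                # Still within the gap: extend the open session.
                current["LAST_TS"] = ts
                current["FURTHEST_DISTANCE"] = max(current["FURTHEST_DISTANCE"], distance_km)
                current["EVENT_CT"] += 1
            else:
                # Gap exceeded (or first event): close the old session, open a new one.
                if current is not None:
                    sessions.append(current)
                current = {"KEY": key, "FIRST_TS": ts, "LAST_TS": ts,
                           "FURTHEST_DISTANCE": distance_km, "EVENT_CT": 1}
        if current is not None:
            sessions.append(current)
    for s in sessions:
        s["DIFF_SEC"] = (s["LAST_TS"] - s["FIRST_TS"]) // 1000
    return sessions

# Made-up events for one vessel pair: two pings 5 minutes apart, then one 25 minutes later.
pair = ("111111111", "222222222")
events = [(pair, 0, 0.30), (pair, 5 * 60 * 1000, 0.45), (pair, 30 * 60 * 1000, 0.20)]
sessions = session_windows(events)
for s in sessions:
    print(s)
```

Because the window length is driven by the data rather than fixed in advance, the duration of each session is a direct measure of how long two vessels stayed close together.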
CREATE TABLE REEFERS_AND_VESSELS_CLOSE_FOR_GT_2HR
  WITH (KAFKA_TOPIC='REEFERS_AND_VESSELS_CLOSE_FOR_GT_2HR_V00') AS
SELECT FISHING_VESSEL_MMSI,
       REEFER_MMSI,
       STRUCT("lat":=LATEST_BY_OFFSET(FISHING_VESSEL_LAT),"lon":=LATEST_BY_OFFSET(FISHING_VESSEL_LON)) AS LATEST_FISHING_VESSEL_LOCATION,
       STRUCT("lat":=LATEST_BY_OFFSET(REEFER_LAT), "lon":=LATEST_BY_OFFSET(REEFER_LON))                AS LATEST_REEFER_LOCATION,
       MIN(DISTANCE_KM)                                         AS CLOSEST_DISTANCE_KM,
       MAX(DISTANCE_KM)                                         AS FURTHEST_DISTANCE_KM,
       MIN(FISHING_VESSEL_TS)                                   AS FIRST_TS,
       MAX(FISHING_VESSEL_TS)                                   AS LAST_TS,
       (MAX(FISHING_VESSEL_TS) - MIN(FISHING_VESSEL_TS)) / 1000 AS DIFF_SEC
  FROM REEFERS_AND_VESSELS_WITHIN_1KM
       WINDOW SESSION (10 MINUTES)
 WHERE IN_RANGE_AND_SPEED = 1
 GROUP BY FISHING_VESSEL_MMSI, REEFER_MMSI
HAVING (MAX(ROWTIME) - MIN(ROWTIME)) / 1000 > 7200;
Recap
developer.confluent.io
Deep-dive articles
Over 10 hours of free courses
• Apache Kafka 101
• Kafka Connect 101
• Kafka Streams 101
• ksqlDB 101
• Inside ksqlDB
• Spring Framework and Kafka
• Building Data Pipelines with Kafka
• Event Sourcing with Kafka
• Data Mesh 101
Plus: Hands-on Quick Starts and Client Language Guides + Event Streaming Patterns + More
#EOF @rmoff http://youtube.com/rmoff