with Apac he Kafka and KSQL @rmoff Photo by Freddie Collins on Unsplash M T A D d u a r F n o i t c e et
A presentation at London Kafka Meetup in April 2019 in London, UK by Robin Moffatt
with Apac he Kafka and KSQL @rmoff Photo by Freddie Collins on Unsplash M T A D d u a r F n o i t c e et
hoto by Mirza Babic on Unsplash Spotting fraud in realtime @rmoff ATM Fraud Detection with Apache Kafka and KSQL
Photo by Lasaye Hommes on Unsplash
Inbound stream of ATM data @rmoff • Account id • Location • Amount • https://github.com/rmoff/gess ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Demo! ATM Fraud Detection with Apache Kafka and KSQL
Spot patterns within this stream Ac. ID A42 Transaction ID Time xxx116d91d6-ef17 11:56:58 @rmoff ATM Midland A42 116d91d6-ef17 11:58:19 Halifax A42 09c2f660-ef17 19:31:11 Lloyds ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Spot patterns within this stream Ac. ID A42 Transaction ID Time xxx116d91d6-ef17 11:56:58 ATM Midland A42 116d91d6-ef17 11:58:19 Halifax A42 09c2f660-ef17 19:31:11 Lloyds Legit Legit ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Spot patterns within this stream Ac. ID A42 Transaction ID Time xxx116d91d6-ef17 11:56:58 ATM Midland A42 116d91d6-ef17 11:58:19 Halifax A42 09c2f660-ef17 19:31:11 Lloyds Legit Dodgy! Legit ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Spot patterns within this stream Ac. ID A42 Transaction ID Time xxx116d91d6-ef17 11:56:58 ATM Midland A42 116d91d6-ef17 11:58:19 Halifax A42 09c2f660-ef17 19:31:11 Lloyds Legit Dodgy! Legit ATM Fraud Detection with Apache Kafka and KSQL
Inbound stream of ATM data @rmoff • Account id • Location • Amount • https://github.com/rmoff/gess ATM Fraud Detection with Apache Kafka and KSQL
KSQL : Stream Processing with SQL @rmoff SELECT TXN_ID, ATM, CUSTOMER_NAME, CUSTOMER_PHONE FROM ATM_POSSIBLE_FRAUD; ATM Fraud Detection with Apache Kafka and KSQL
@rmoff ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Elasticsearch Customer details ATM fraud txns with customer details Notification service
@rmoff KSQL is the Streaming SQL Engine for Apache Kafka ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Filter messages with KSQL completedOrders orders → → → → → → → → → → → 01, £10.00, 05, £10.00, 06, £24.00, 02, £12.33, 04, £5.50, → COMPLETE COMPLETE COMPLETE PENDING COMPLETE CREATE STREAM completedOrders AS SELECT * FROM orders WHERE status=’COMPLETE’; → → → → → → → → → → → 01, £10.00, 06, £24.00, 02, £12.33, 04, £5.50, → COMPLETE COMPLETE COMPLETE COMPLETE ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Drop columns with KSQL customer → → → → → → → → → → →→ {“id”:1, {“id”:2, {“id”:3, “name”:”Dana Lidgerton”, “name”:”Milo Wellsman”, “name”:”Dolph Cleeton”, “card”:”5048370182840140} “card”:”3557977885537506} “card”:”3586303633007251} CREATE STREAM customerNoCC AS SELECT ID, NAME customerNoCC FROM customer; → → → → → → → → → → →→ {“id”:1, {“id”:2, {“id”:3, “name”:”Dana Lidgerton”} “name”:”Milo Wellsman”} “name”:”Dolph Cleeton”} ATM Fraud Detection with Apache Kafka and KSQL
Stateful aggregation with KSQL @rmoff customersByCountry customer → → → → → → → → → → →→ {“id”:1, {“id”:2, {“id”:3, “name”:”Dana Lidgerton”, “name”:”Milo Wellsman”, “name”:”Dolph Cleeton”, “country”:”UK”} “country”:”UK”} “country”:”Germany”} CREATE STREAM customersByCountry AS SELECT country, COUNT(*) AS customerCount FROM customer WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY country; → → → → → → → → → → →→ {“country”:”UK”, {“country”:”Germany”, “customerCount”:2} “customerCount”:1} ATM Fraud Detection with Apache Kafka and KSQL
KSQL for Data Transformation @rmoff CREATE STREAM pageviews WITH (PARTITIONS=4, VALUE_FORMAT=’AVRO’) AS SELECT * FROM pageviews_json; ATM Fraud Detection with Apache Kafka and KSQL
@rmoff KSQL in Development and Production Interactive KSQL for development and testing Headless KSQL for Production REST Desired KSQL queries have been identified “Hmm, let me try out this idea…” ATM Fraud Detection with Apache Kafka and KSQL
KSQL supports JOINs @rmoff CREATE STREAM vip_actions AS SELECT userid, page, action FROM clickstream c LEFT JOIN users u ON c.userid = u.user_id WHERE u.level = ‘Platinum’; ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Stream Stream joins Shipments Orders order.id = shipment.order_id Leadtime shipment_ts - order_ts CREATE STREAM MISSED_SLA AS SELECT * FROM ORDER_SHIPMENTS WHERE LEADTIME_HR > 3; ATM Fraud Detection with Apache Kafka and KSQL
Stream Stream joins @rmoff ATM transactions ATM Fraud Detection with Apache Kafka and KSQL
Stream Stream joins @rmoff ATM transactions ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Demo! ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Self-Join (Cartesian product) T Ac. ID Transaction ID A42 A42 A42 T1 Ac. ID Transaction ID A42 A42 A42 Time Time ATM xxx116d91d6-ef17 11:56:58 Midland 116d91d6-ef17 11:58:19 Halifax 09c2f660-ef17 19:31:11 Lloyds ATM xxx116d91d6-ef17 11:56:58 Midland 116d91d6-ef17 11:58:19 Halifax 09c2f660-ef17 19:31:11 Lloyds T2 Ac. ID Transaction ID A42 A42 A42 Time ATM xxx116d91d6-ef17 11:56:58 Midland 116d91d6-ef17 11:58:19 Halifax 09c2f660-ef17 19:31:11 Lloyds ATM Fraud Detection with Apache Kafka and KSQL
Self-Join (Cartesian product) T1 Ac. ID Transaction ID A42 Time ATM xxx116d91d6-ef17 11:56:58 Midland @rmoff T2 Ac. ID Transaction ID A42 Time ATM xxx116d91d6-ef17 11:56:58 Midland A42 116d91d6-ef17 11:58:19 Halifax A42 116d91d6-ef17 11:58:19 Halifax A42 09c2f660-ef17 19:31:11 Lloyds A42 09c2f660-ef17 19:31:11 Lloyds ATM_TXNS T1 INNER JOIN ATM_TXNS T2 ON T1.ACCOUNT_ID = T2.ACCOUNT_ID ATM Fraud Detection with Apache Kafka and KSQL
Self-Join (Cartesian product) T1 Ac. ID Transaction ID A42 Time ATM xxx116d91d6-ef17 11:56:58 Midland @rmoff T2 Ac. ID Transaction ID A42 Time ATM xxx116d91d6-ef17 11:56:58 Midland A42 116d91d6-ef17 11:58:19 Halifax A42 116d91d6-ef17 11:58:19 Halifax A42 09c2f660-ef17 19:31:11 Lloyds A42 09c2f660-ef17 19:31:11 Lloyds FROM ATM_TXNS T1 INNER JOIN ATM_TXNS T2 WITHIN 10 MINUTES ON T1.ACCOUNT_ID = T2.ACCOUNT_ID ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Self-Join T1 Txn ID T2 Txn ID T1 Time T2 Time T1 ATM T2 ATM 11:56:58 11:58:19 Midland Halifax xxx116d91d6-ef17 116d91d6-ef17 116d91d6-ef17 xxx116d91d6-ef17 11:58:19 11:56:58 Halifax Midland xxx116d91d6-ef17 xxx116d91d6-ef17 11:56:58 11:56:58 Midland Midland 116d91d6-ef17 116d91d6-ef17 11:58:19 Halifax Halifax 11:58:19 ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Self-Join T1 Txn ID T2 Txn ID T1 Time T2 Time T1 ATM T2 ATM 11:56:58 11:58:19 Midland Halifax xxx116d91d6-ef17 116d91d6-ef17 116d91d6-ef17 xxx116d91d6-ef17 11:58:19 11:56:58 Halifax Midland xxx116d91d6-ef17 xxx116d91d6-ef17 11:56:58 11:56:58 Midland Midland 116d91d6-ef17 116d91d6-ef17 11:58:19 Halifax Halifax 11:58:19 Self join on same txn IDs ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Exclude joins on the same txn T1 Txn ID T2 Txn ID T1 Time T2 Time T1 ATM T2 ATM 11:56:58 11:58:19 Midland Halifax 11:56:58 Halifax Midland xxx116d91d6-ef17 116d91d6-ef17 116d91d6-ef17 xxx116d91d6-ef17 11:58:19 WHERE T1.TRANSACTION_ID != T2.TRANSACTION_ID ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Exclude joins on the same txn T1 Txn ID T2 Txn ID T1 Time T2 Time T1 ATM T2 ATM 11:56:58 11:58:19 Midland Halifax 11:56:58 Halifax Midland xxx116d91d6-ef17 116d91d6-ef17 116d91d6-ef17 xxx116d91d6-ef17 11:58:19 Duplicate results (A:B / B:A) ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Join Windows T1 Ac. ID Transaction ID A42 Time ATM xxx116d91d6-ef17 11:56:58 Midland T2 Ac. ID Transaction ID A42 Time ATM xxx116d91d6-ef17 11:56:58 Midland A42 116d91d6-ef17 11:58:19 Halifax A42 116d91d6-ef17 11:58:19 Halifax A42 09c2f660-ef17 19:31:11 Lloyds A42 09c2f660-ef17 19:31:11 Lloyds WHERE WITHIN 10 MINUTES T1.TRANSACTION_ID != T2.TRANSACTION_ID ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Join Windows T1 Ac. ID Transaction ID A42 Time ATM xxx116d91d6-ef17 11:56:58 Midland T2 Ac. ID Transaction ID A42 Time ATM xxx116d91d6-ef17 11:56:58 Midland A42 116d91d6-ef17 11:58:19 Halifax A42 116d91d6-ef17 11:58:19 Halifax A42 09c2f660-ef17 19:31:11 Lloyds A42 09c2f660-ef17 19:31:11 Lloyds WHERE WITHIN 10 MINUTES T1.TRANSACTION_ID != T2.TRANSACTION_ID ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Join Windows T1 Ac. ID Transaction ID A42 Time ATM xxx116d91d6-ef17 11:56:58 Midland T2 Ac. ID Transaction ID A42 Time ATM xxx116d91d6-ef17 11:56:58 Midland A42 116d91d6-ef17 11:58:19 Halifax A42 116d91d6-ef17 11:58:19 Halifax A42 09c2f660-ef17 19:31:11 Lloyds A42 09c2f660-ef17 19:31:11 Lloyds WHERE WITHIN 10 MINUTES T1.TRANSACTION_ID != T2.TRANSACTION_ID ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Only join forward T1 Ac. ID Transaction ID A42 Time ATM xxx116d91d6-ef17 11:56:58 Midland T2 Ac. ID Transaction ID A42 Time ATM xxx116d91d6-ef17 11:56:58 Midland A42 116d91d6-ef17 11:58:19 Halifax A42 116d91d6-ef17 11:58:19 Halifax A42 09c2f660-ef17 19:31:11 Lloyds A42 09c2f660-ef17 19:31:11 Lloyds WITHIN (0 MINUTES, 10 MINUTES) WHERE T1.TRANSACTION_ID != T2.TRANSACTION_ID ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Only join forward T1 Ac. ID Transaction ID A42 Time ATM xxx116d91d6-ef17 11:56:58 Midland T2 Ac. ID Transaction ID A42 Time ATM xxx116d91d6-ef17 11:56:58 Midland A42 116d91d6-ef17 11:58:19 Halifax A42 116d91d6-ef17 11:58:19 Halifax A42 09c2f660-ef17 19:31:11 Lloyds A42 09c2f660-ef17 19:31:11 Lloyds WITHIN (0 MINUTES, 10 MINUTES) WHERE T1.TRANSACTION_ID != T2.TRANSACTION_ID ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Only join forward T1 Txn ID T2 Txn ID xxx116d91d6-ef17 116d91d6-ef17 T1 Time T2 Time T1 ATM T2 ATM 11:56:58 11:58:19 Midland Halifax WITHIN (0 MINUTES, 10 MINUTES) Ignore events in the right-hand stream prior to those in the left ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Only join forward T1 Txn ID xxx116d91d6-ef17 T2 Txn ID 116d91d6-ef17 T1 Time T2 Time T1 ATM T2 ATM 11:56:58 11:58:19 Midland Halifax Legit Dodgy! ATM Fraud Detection with Apache Kafka and KSQL
Photo by Esteban Lopez on Unsplash @rmoff ATM Fraud Detection with Apache Kafka and KSQL
Calcuate distance between ATMs @rmoff TX1 TX2 GEO_DISTANCE(TX1.location->lat, TX1.location->lon, TX2.location->lat, TX2.location->lon, ‘KM’) ATM Fraud Detection with Apache Kafka and KSQL
Calculate time between transactions @rmoff TX2.ROWTIME - TX1.ROWTIME AS MILLISECONDS_DIFFERENCE (TX2.ROWTIME - TX1.ROWTIME) / 1000 / 60 / 60 AS HOURS_DIFFERENCE ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Photo by Esteban Lopez on Unsplash GEO_DISTANCE(…) / HOURS_DIFFERENCE AS KMH_REQUIRED ATM Fraud Detection with Apache Kafka and KSQL
@rmoff So speaking of time… ksql> PRINT ‘atm_txns_gess’ ; Format:JSON Kafka message { “ROWTIME”: 1544116309152, timestamp “ROWKEY”: “null”, 2018-12-06 17:11:49 “account_id”: “a218”, “timestamp”: “2018-12-06 17:09:58 +0000”, “atm”: “HSBC”, …} Event time ATM Fraud Detection with Apache Kafka and KSQL
CREATE STREAM ATM_TXNS_GESS (account_id VARCHAR, timestamp VARCHAR, … WITH (KAFKA_TOPIC=’atm_txns_gess’, TIMESTAMP=’timestamp’, TIMESTAMP_FORMAT= ‘yyyy-MM-dd HH:mm:ss X’); ksql> PRINT ‘atm_txns_gess’ ; Format:JSON { “ROWTIME”: 1544116309152, “ROWKEY”: “null”, “account_id”: “a218”, “timestamp”: “2018-12-06 17:09:58 +0000”,
But what about the account holder? 👤 @rmoff ATM Fraud Detection with Apache Kafka and KSQL
Photo by Samuel Zeller on Unsplash @rmoff ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Elasticsearch Customer details ATM fraud txns with customer details Notification service
@rmoff Streaming Integration with Kafka Connect syslog flat file CSV JSON Sources MQTT Tasks Workers Kafka Connect Kafka Brokers ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Streaming Integration with Kafka Connect Amazon S3 Sinks MQTT Tasks Workers Kafka Connect Kafka Brokers ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Streaming Integration with Kafka Connect Amazon S3 syslog flat file CSV JSON Sources Sinks MQTT MQTT Tasks Workers Kafka Connect Kafka Brokers ATM Fraud Detection with Apache Kafka and KSQL
Confluent Hub @rmoff • One-stop place to discover and download : • Connectors • Transformations • Converters hub.confluent.io ATM Fraud Detection with Apache Kafka and KSQL
Time The Table Stream Duality Stream Account ID Amount 12345 + €50 12345
@rmoff The truth is the log. The database is a cache of a subset of the log. —Pat Helland Immutability Changes Everything http://cidrdb.org/cidr2015/Papers/CIDR15_Paper16.pdf Photo by Bobby Burch on Unsplash ATM Fraud Detection with Apache Kafka and KSQL
Demo Time! Customer details @rmoff t c e n n o C a m k u f i a z K e b e D ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Spot patterns within this stream Ac. ID A42 Transaction ID Time xxx116d91d6-ef17 11:56:58 ATM Midland A42 116d91d6-ef17 11:58:19 Halifax A42 09c2f660-ef17 19:31:11 Lloyds Legit Dodgy! Legit ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Suspect Transactions Ac. ID T1 Time ATM T2 Time ATM A42 11:56:58 Midland 11:58:19 Halifax Dodgy! ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Suspect Transactions Name Phone Robin M 1234 567 Ac. ID T1 Time ATM T2 Time ATM A42 11:56:58 Midland 11:58:19 Halifax ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Elasticsearch Customer details ATM fraud txns with customer details Notification service
@rmoff Elasticsearch atm_txns_gess accounts Customer details ATM fraud txns ATM_POSSIBLE_FRAUD_ENRICHED with customer details Notification service
Kafka Connect + Neo4j @rmoff ATM Fraud Detection with Apache Kafka and KSQL
Kafka Connect + Neo4j @rmoff ATM Fraud Detection with Apache Kafka and KSQL
Kafka Connect + KSQL + Neo4j @rmoff ATM Fraud Detection with Apache Kafka and KSQL
Kafka Connect + KSQL + Neo4j @rmoff ATM Fraud Detection with Apache Kafka and KSQL
Photo by Joshua Rodriguez on Unsplash What can we do with it? @rmoff ATM Fraud Detection with Apache Kafka and KSQL
Realtime Operations View & Analysis @rmoff ATM Fraud Detection with Apache Kafka and KSQL
Graph analysis @rmoff ATM Fraud Detection with Apache Kafka and KSQL
Push notification to the customer @rmoff ATM Fraud Detection with Apache Kafka and KSQL
@rmoff Confluent Community Components Apache Kafka with a bunch of cool stuff! For free! Log Events Database Changes loT Data Web Events … Confluent Platform Data Integration Real-time Applications Monitoring & Administration Confluent Control Center | Security Confluent Platform Transformations Hadoop Operations Replicator | Auto Data Balancing Custom Apps Database Data Compatibility Schema Registry SQL Stream Processing KSQL Analytics Data Warehouse Development and Connectivity Clients | Connectors | REST Proxy | CLI CRM Monitoring Apache Kafka® Core | Connect API | Streams API … CUSTOMER SELF-MANAGED Datacenter Public Cloud … CONFLUENT FULLY-MANAGED Confluent Cloud ATM Fraud Detection with Apache Kafka and KSQL
Discount code! KS19Comm25 69 1
@rmoff http://cnfl.io/book-bundle ATM Fraud Detection with Apache Kafka and KSQL
https://www.confluent.io/ksql http://cnfl.io/demo-scene http://cnfl.io/book-bundle http://cnfl.io/slack @rmoff
@rmoff Resources • CDC Spreadsheet #EOF • Blog: No More Silos: How to Integrate your Databases with Apache Kafka and CDC • #partner-engineering on Slack for questions • BD team (#partners / partners@confluent.io) can help with introductions on a given sales op ATM Fraud Detection with Apache Kafka and KSQL