No More Silos: Integrating Databases and Apache Kafka @rmoff Berlin Buzzwords 2021 #bbuzz

Photo by Emily Morter on Unsplash @rmoff | #bbuzz

Analytics - Database Offload: RDBMS → HDFS / S3 / BigQuery etc. @rmoff | #bbuzz

Real-time Event Stream Enrichment: order events joined with customer data (CDC from the RDBMS) in stream processing → enriched customer orders @rmoff | #bbuzz

Evolve processing from old systems to new: Existing App + RDBMS → Stream Processing → New App @rmoff | #bbuzz

“ But streaming…I’ve just got data in a database…right? @rmoff | #bbuzz

@rmoff

“ Bold claim: all your data is event streams @rmoff | #bbuzz

Human generated events A Sale A Stock movement @rmoff | #bbuzz

Machine generated events Networking IoT Applications @rmoff | #bbuzz

Databases @rmoff | #bbuzz

Do you think that’s a table you are querying? @rmoff | #bbuzz

The Stream Table Duality
Table (Account ID | Balance): 12345 | €50 @rmoff | #bbuzz

Time → The Stream Table Duality
Stream (Account ID | Amount): 12345 | +€50
Table (Account ID | Balance): 12345 | €50 @rmoff | #bbuzz

Time → The Stream Table Duality
Stream (Account ID | Amount): 12345 | +€50, 12345 | +€25
Table (Account ID | Balance): 12345 | €75 @rmoff | #bbuzz

Time → The Stream Table Duality
Stream (Account ID | Amount): 12345 | +€50, 12345 | +€25, 12345 | -€60
Table (Account ID | Balance): 12345 | €15 @rmoff | #bbuzz
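To make the duality concrete, here is a minimal sketch of the same account activity as Kafka records; the topic name and field names are illustrative, the amounts (in EUR) are the ones from the slides. The stream is every movement, keyed by account ID; the table is the latest aggregated balance per key:

Stream (e.g. topic account-movements, one record per event):
  { "accountId": "12345", "amount": 50 }
  { "accountId": "12345", "amount": 25 }
  { "accountId": "12345", "amount": -60 }

Table (latest state per key, the running aggregate of the stream):
  { "accountId": "12345", "balance": 15 }

Replaying the stream from the beginning always rebuilds the same table.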

The truth is the log. The database is a cache of a subset of the log. —Pat Helland Immutability Changes Everything http://cidrdb.org/cidr2015/Papers/CIDR15_Paper16.pdf Photo by Bobby Burch on Unsplash @rmoff | #bbuzz

Photo by Vadim Sherbakov on Unsplash @rmoff | #bbuzz

Streaming Integration with Kafka Connect: Sources (syslog, ...) → Kafka Connect → Kafka Brokers @rmoff | #bbuzz

Streaming Integration with Kafka Connect: Kafka Brokers → Kafka Connect → Sinks (Amazon, Google, ...) @rmoff | #bbuzz

Streaming Integration with Kafka Connect: external systems (syslog, Amazon, Google, ...) ↔ Kafka Connect ↔ Kafka Brokers @rmoff | #bbuzz
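The sink side (and the database-offload pattern from earlier) is also just configuration. As a rough sketch, an S3 sink connector might look like the following; the topic, region, and bucket names are placeholders, and the full option list is on the connector's page at confluent.io/hub:

{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "topics": "orders",
  "s3.region": "eu-central-1",
  "s3.bucket.name": "my-offload-bucket",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "flush.size": "1000"
}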

Look Ma, No Code!
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://asgard:3306/demo",
  "table.whitelist": "sales,orders,customers"
}
@rmoff | #bbuzz
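To run it, wrap that config in a name and POST it to the Kafka Connect REST API (port 8083 by default). A minimal sketch of the payload for the /connectors endpoint, with the connector name and credentials as placeholders:

{
  "name": "jdbc-source-demo",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://asgard:3306/demo",
    "connection.user": "connect_user",
    "connection.password": "<password>",
    "table.whitelist": "sales,orders,customers"
  }
}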

Kafka Connect basics: Source → Kafka Connect → Kafka @rmoff | #bbuzz

Connectors: Source → Connector (inside Kafka Connect) → Kafka @rmoff | #bbuzz

Connectors: Source → Connector (native data → Connect Record) → Kafka @rmoff | #bbuzz

Converters: Source → Connector (native data → Connect Record) → Converter (Connect Record → bytes[]) → Kafka @rmoff | #bbuzz

Serialisation & Schemas: Avro 👍 Protobuf 👍 JSON Schema 👍 JSON/CSV 😬 https://rmoff.dev/qcon-schemas @rmoff | #bbuzz

The Confluent Schema Registry: Source → Kafka Connect → Avro message → Kafka Connect → Target, with the Avro schema itself stored in and fetched from the Schema Registry @rmoff | #bbuzz
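Which converter gets used is, again, just configuration. A common per-connector setup is Avro values with the Confluent Schema Registry; a minimal sketch, where the registry URL is whatever your environment uses:

{
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}

The same properties can also be set once in the Connect worker configuration and inherited by every connector.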

Single Message Transforms: Source → Connector → Transform(s) → Converter → Kafka (inside Kafka Connect) @rmoff | #bbuzz

Extensible: Connector, Transform(s), and Converter are all pluggable @rmoff | #bbuzz
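Transforms are configured the same way. As an illustrative fragment, this uses the built-in RegexRouter SMT to add a prefix to every topic a source connector writes to; the alias addTopicPrefix and the mysql- prefix are arbitrary choices:

{
  "transforms": "addTopicPrefix",
  "transforms.addTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.addTopicPrefix.regex": "(.*)",
  "transforms.addTopicPrefix.replacement": "mysql-$1"
}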

hub.confluent.io @rmoff | #bbuzz

Change-Data-Capture (CDC) Query-based Log-based @rmoff | #bbuzz

Query-based CDC SELECT * FROM my_table WHERE ts_col > previous ts @rmoff | #bbuzz

Log-based CDC
#051024 17:24:13 server id 1   end_log_pos 98
# Position  Timestamp    Type  Master ID     Size          Master Pos    Flags
  00000004  9d fc 5c 43  0f    01 00 00 00   5e 00 00 00   62 00 00 00   00 00
  00000017  04 00 35 2e 30 2e 31 35  2d 64 65 62 75 67 2d 6c  |..5.0.15.debug.l|
  00000027  6f 67 00 00 00 00 00 00  00 00 00 00 00 00 00 00  |og..............|
  00000037  00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00  |................|
  00000047  00 00 00 00 9d fc 5c 43  13 38 0d 00 08 00 12 00  |.......C.8......|
  00000057  04 04 04 04 12 00 00 4b  00 04 1a                 |.......K...|
@rmoff | #bbuzz

Log-based CDC Transaction log @rmoff | #bbuzz

Demo Time! https://rmoff.dev/cdc-demo @rmoff | #bbuzz

@rmoff | #bbuzz

Query-based CDC SELECT * FROM my_table WHERE ts_col > previous ts @rmoff | #bbuzz

Query-based CDC
CREATE TABLE my_table (
  ID INT,
  FOO VARCHAR,
  BAR VARCHAR,
  WIBBLE VARCHAR,
  TS_COL TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)
SELECT * FROM my_table WHERE ts_col > previous ts @rmoff | #bbuzz
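This is the pattern that the JDBC source connector's timestamp mode automates: it remembers the last timestamp it saw and polls for anything newer. A sketch of the matching configuration for the table above, with connection details and topic prefix as placeholders:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://asgard:3306/demo",
  "table.whitelist": "my_table",
  "mode": "timestamp",
  "timestamp.column.name": "TS_COL",
  "topic.prefix": "mysql-",
  "poll.interval.ms": "1000"
}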

Query-based CDC INSERT SELECT * FROM my_table WHERE ts_col > previous ts @rmoff | #bbuzz

Query-based CDC UPDATE @rmoff | #bbuzz

Query-based CDC UPDATE SELECT * FROM my_table WHERE ts_col > previous ts @rmoff | #bbuzz

Query-based CDC DELETE @rmoff | #bbuzz

Query-based CDC DELETE: SELECT * FROM my_table WHERE ts_col > previous ts → Nope (the deleted row never shows up) @rmoff | #bbuzz

Query-based CDC
orders (orderID | status | address | updateTS):
SELECT * FROM orders WHERE updateTS > previous ts
⏰ now = 10:54:00, previous ts = 10:53:30 @rmoff | #bbuzz

Query-based CDC
orders (orderID | status | address | updateTS): 42 | SHIPPED | 29 Acacia Road | 10:54:29
SELECT * FROM orders WHERE updateTS > previous ts
⏰ now = 10:54:30, previous ts = 10:54:00
{ "orderID": 42, "status": "SHIPPED", "address": "29 Acacia Road", "updateTS": "10:54:29" } @rmoff | #bbuzz

Query-based CDC
orders (orderID | status | address | updateTS): 42 | PENDING | (null) | 10:54:01
INSERT INTO orders (orderID, status) VALUES (42, 'PENDING'); @rmoff | #bbuzz

Query-based CDC
orders (orderID | status | address | updateTS): 42 | PENDING | 1640 Riverside Drive | 10:54:20
UPDATE orders SET address = '1640 Riverside Drive' WHERE orderID = 42; @rmoff | #bbuzz

Query-based CDC
orders (orderID | status | address | updateTS): 42 | PENDING | 29 Acacia Road | 10:54:25
UPDATE orders SET address = '29 Acacia Road' WHERE orderID = 42; @rmoff | #bbuzz

Query-based CDC
orders (orderID | status | address | updateTS): 42 | SHIPPED | 29 Acacia Road | 10:54:29
UPDATE orders SET status = 'SHIPPED' WHERE orderID = 42; @rmoff | #bbuzz

Query-based CDC
What the polling query captures (orderID | status | address | updateTS):
  42 | SHIPPED | 29 Acacia Road | 10:54:29
What actually happened (orderID | status | address | updateTS):
  42 | PENDING | (null) | 10:54:01
  42 | PENDING | 1640 Riverside Drive | 10:54:20
  42 | PENDING | 29 Acacia Road | 10:54:25
  42 | SHIPPED | 29 Acacia Road | 10:54:29 @rmoff | #bbuzz

Query-based CDC
orders (orderID | status | address | updateTS): 42 | SHIPPED | 29 Acacia Road | 10:54:29
SELECT * FROM orders WHERE updateTS > previous ts
⏰ now = 10:54:30, previous ts = 10:54:00
{ "orderID": 42, "status": "SHIPPED", "address": "29 Acacia Road", "updateTS": "10:54:29" } @rmoff | #bbuzz

Query-based CDC
✅ Usually easier to set up
✅ Requires fewer permissions
🛑 Needs specific columns in the source schema to track changes
🛑 Impact of polling the DB (or a higher-latency tradeoff)
🛑 Can't track deletes
🛑 Can't track multiple events within one polling interval
Photo by Matese Fields on Unsplash
Read more: http://cnfl.io/kafka-cdc @rmoff | #bbuzz

@rmoff | #bbuzz

Log-based CDC Transaction log @rmoff | #bbuzz

Log-based CDC UPDATE Transaction log @rmoff | #bbuzz

Log-based CDC DELETE Transaction log @rmoff | #bbuzz

Log-based CDC Commit log @rmoff | #bbuzz

Photo by Sebastian Pociecha on Unsplash
Log-based CDC
✅ Greater data fidelity
✅ Lower latency
✅ Lower impact on source
🛑 More setup steps
🛑 Higher system privileges required
🛑 For proprietary databases, usually $$$
Read more: http://cnfl.io/kafka-cdc @rmoff | #bbuzz

tl;dr : which tool do I use? Query-based CDC: confluent.io/hub @rmoff | #bbuzz

tl;dr : which tool do I use? Log-based CDC - Oracle confluent.io/hub @rmoff | #bbuzz

tl;dr : which tool do I use? Log-based CDC - MySQL, Postgres, MongoDB, etc confluent.io/hub @rmoff | #bbuzz
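For MySQL, Postgres, MongoDB and friends, the usual answer is Debezium, which itself runs as a Kafka Connect source connector. A rough sketch of a Debezium MySQL configuration is below; hostnames, credentials, and topic names are placeholders, and exact property names vary a little between Debezium versions:

{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "database.hostname": "asgard",
  "database.port": "3306",
  "database.user": "debezium",
  "database.password": "<password>",
  "database.server.id": "42",
  "database.server.name": "asgard",
  "table.include.list": "demo.orders",
  "database.history.kafka.bootstrap.servers": "kafka:9092",
  "database.history.kafka.topic": "dbhistory.demo"
}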

Want to learn more? CTAs, not CATs (sorry, not sorry) @rmoff | #bbuzz

Learn Kafka. Start building with Apache Kafka at Confluent Developer. developer.confluent.io

#EOF @rmoff rmoff.dev/talks youtube.com/rmoff