Integrating Oracle and Kafka Robin Moffatt | #ACEsAtHome | @rmoff

$ whoami • Robin Moffatt (@rmoff) • Senior Developer Advocate at Confluent (Apache Kafka, not Wikis 😉) • Working in data & analytics since 2001 • Oracle ACE Director (Alumnus) http://rmoff.dev/talks · http://rmoff.dev/blog · http://rmoff.dev/youtube

Kafka is an Event Streaming Platform: apps connect to Kafka for request-response, messaging, changelogs, stream processing, and streaming data pipelines (e.g. into a DWH or Hadoop)

What is an Event Streaming Platform? Producers · Consumers · Connectors · The Log · Streaming Engine

Immutable Event Log: messages are added at the end of the log (old → new)

Topics (e.g. Clicks, Orders, Customers) are similar in concept to tables in a database

Partitions (e.g. Clicks: p0, p1, p2): messages are guaranteed to be strictly ordered within a partition

Messages are just K/V bytes plus headers and a timestamp: Header | Timestamp | Key | Value

Serialisation & Schemas: JSON · Avro · Protobuf · JSON Schema · CSV — prefer a format with schema support (Avro, Protobuf, JSON Schema 👍; plain JSON/CSV 😬) https://qconnewyork.com/system/files/presentation-slides/qcon_17_-_schemas_and_apis.pdf
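
One way to see this in practice: ksqlDB lets you state the value format when you declare a stream over a topic, and with Avro, Protobuf, or JSON Schema the schema is registered in Schema Registry for you. A minimal sketch, assuming a 'widgets' topic and invented column names:

-- Hedged sketch: topic, columns, and partition count are illustrative assumptions
CREATE STREAM WIDGETS (
    SENSOR_ID       VARCHAR,
    PRODUCTION_LINE VARCHAR,
    WIDGET_TYPE     VARCHAR,
    TEMP_CELCIUS    INT,
    WIDGET_WEIGHT_G INT
  ) WITH (
    KAFKA_TOPIC  = 'widgets',
    VALUE_FORMAT = 'AVRO',   -- schema stored in Schema Registry
    PARTITIONS   = 3
  );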

Consumers have a position all of their own (old → new): Sally is here

Consumers have a position all of their own: Fred is here · Sally is here

Consumers have a position all of their own: George is here · Fred is here · Sally is here

Free Books! https://rmoff.dev/aces

Database Offload: RDBMS <x> / RDBMS <y> → Kafka → Amazon S3
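
A hedged sketch of the landing side of this pattern (bucket, region, and topic are placeholders; the properties follow the Confluent S3 sink connector), declared from ksqlDB:

-- Illustrative sketch only: topic, bucket, and region are assumptions
CREATE SINK CONNECTOR s3_offload WITH (
    'connector.class' = 'io.confluent.connect.s3.S3SinkConnector',
    'topics'          = 'orders',
    's3.bucket.name'  = 'rmoff-demo-offload',
    's3.region'       = 'us-east-1',
    'storage.class'   = 'io.confluent.connect.s3.storage.S3Storage',
    'format.class'    = 'io.confluent.connect.s3.format.avro.AvroFormat',
    'flush.size'      = '1000'
  );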

Evolve processing from old systems to new: the existing app <x> keeps writing to the RDBMS, and CDC streams its data into Kafka for stream processing

Evolve processing from old systems to new: new apps <y> are then built against the stream, alongside the existing app <x>

Streaming Integration with Kafka Connect: Sources (e.g. syslog) → Kafka Connect → Kafka Brokers

Streaming Integration with Kafka Connect: Kafka Brokers → Kafka Connect → Sinks (e.g. Amazon, Google services)

Streaming Integration with Kafka Connect: sources (syslog, …) and sinks (Amazon, Google, …) on either side of the Kafka brokers

Look Ma, No Code!

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://asgard:3306/demo",
  "table.whitelist": "sales,orders,customers"
}

https://docs.confluent.io/current/
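
The same configuration can also be declared from ksqlDB rather than posted to the Kafka Connect REST API; a sketch of the equivalent statement, reusing the demo connection details above:

-- Sketch: CREATE SOURCE CONNECTOR passes the config through to Kafka Connect
CREATE SOURCE CONNECTOR jdbc_demo WITH (
    'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
    'connection.url'  = 'jdbc:mysql://asgard:3306/demo',
    'table.whitelist' = 'sales,orders,customers'
  );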

Extensible: Connector → Transform(s) → Converter

hub.confluent.io

Single Kafka Connect node: one worker runs all the tasks (S3 Task #1, JDBC Task #1, JDBC Task #2); offsets, config, and status are stored in Kafka topics

Kafka Connect - scalable and fault-tolerant: add workers to the Kafka Connect cluster and the tasks are distributed across them

Automatic fault tolerance: if a worker fails, its tasks are rebalanced onto the remaining workers

Change-Data-Capture (CDC): Query-based · Log-based

Query-based CDC: SELECT * FROM my_table WHERE ts_col > previous ts

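A hedged sketch of what the polling approach looks like with the Kafka Connect JDBC source connector in timestamp mode (connection details, table, and column names are illustrative):

-- Sketch: query-based CDC with the JDBC source connector
CREATE SOURCE CONNECTOR jdbc_my_table WITH (
    'connector.class'       = 'io.confluent.connect.jdbc.JdbcSourceConnector',
    'connection.url'        = 'jdbc:mysql://asgard:3306/demo',
    'mode'                  = 'timestamp',
    'timestamp.column.name' = 'ts_col',
    'table.whitelist'       = 'my_table',
    'topic.prefix'          = 'mysql-'   -- each table lands in topic mysql-<table>
  );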

Log-based CDC (a raw transaction log entry, e.g. a MySQL binlog):

#051024 17:24:13 server id 1  end_log_pos 98
# Position  Timestamp   Type   Master ID      Size          Master Pos    Flags
# 00000004  9d fc 5c 43   0f   01 00 00 00    5e 00 00 00   62 00 00 00   00 00
# 00000017  04 00 35 2e 30 2e 31 35  2d 64 65 62 75 67 2d 6c |..5.0.15.debug.l|
# 00000027  6f 67 00 00 00 00 00 00  00 00 00 00 00 00 00 00 |og..............|
# 00000037  00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00 |................|
# 00000047  00 00 00 00 9d fc 5c 43  13 38 0d 00 08 00 12 00 |.......C.8......|
# 00000057  04 04 04 04 12 00 00 4b  00 04 1a                |.......K...|

Log-based CDC: Transaction log

DEMO (Photo by Tom Moffatt)

Read more: http://cnfl.io/kafka-cdc

Query-based CDC
✅ Usually easier to set up
✅ Requires fewer permissions
🛑 Needs specific columns in the source schema to track changes
🛑 Impact of polling the DB (or a higher-latency tradeoff)
🛑 Can't track deletes
🛑 Can't track multiple events between polling intervals
(Photo by Matese Fields on Unsplash)

Query-based CDC: SELECT * FROM my_table WHERE ts_col > previous ts

...which relies on the source table having a suitable timestamp column:

CREATE TABLE my_table (
  ID     INT,
  FOO    VARCHAR,
  BAR    VARCHAR,
  WIBBLE VARCHAR,
  TS_COL TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                   ON UPDATE CURRENT_TIMESTAMP
)

Query-based CDC: an INSERT sets ts_col, so the next polling query picks the new row up

Query-based CDC: an UPDATE bumps ts_col, so the changed row is picked up by the next polling query

Query-based CDC: a DELETE? Nope - the row is gone, so SELECT * FROM my_table WHERE ts_col > previous ts never sees it

Query-based CDC: polling the orders table

SELECT * FROM orders WHERE updateTS > previous ts

orderID | status  | address        | updateTS
42      | SHIPPED | 29 Acacia Road | 10:54:29

→ {"orderID": 42, "status": "SHIPPED", "address": "29 Acacia Road", "updateTS": "10:54:29"}

How that row got to its current state:

orderID=42  status=PENDING                                updateTS=10:54:00
INSERT INTO orders (orderID, status) VALUES (42, 'PENDING');

orderID=42  status=PENDING  address=1640 Riverside Drive  updateTS=10:54:20
UPDATE orders SET address = '1640 Riverside Drive' WHERE orderID = 42;

orderID=42  status=PENDING  address=29 Acacia Road        updateTS=10:54:25
UPDATE orders SET address = '29 Acacia Road' WHERE orderID = 42;

orderID=42  status=SHIPPED  address=29 Acacia Road        updateTS=10:54:29
UPDATE orders SET status = 'SHIPPED' WHERE orderID = 42;

What actually happened (four events) vs. what the polling query captures:

orderID | status  | address              | updateTS
42      | PENDING | -                    | 10:54:00
42      | PENDING | 1640 Riverside Drive | 10:54:20
42      | PENDING | 29 Acacia Road       | 10:54:25
42      | SHIPPED | 29 Acacia Road       | 10:54:29

→ captured: only {"orderID": 42, "status": "SHIPPED", "address": "29 Acacia Road", "updateTS": "10:54:29"}

Event-driven: Query-based · Log-based

Log-based CDC
✅ Greater data fidelity
✅ Lower latency
✅ Lower impact on the source
🛑 More setup steps
🛑 Higher system privileges required
🛑 For proprietary databases, usually $$$
Read more: http://cnfl.io/kafka-cdc
(Photo by Sebastian Pociecha on Unsplash)

Log-based CDC: changes are read from the transaction log

Log-based CDC: an UPDATE is written to the transaction log

Log-based CDC: a DELETE is written to the transaction log too

Log-based CDC: Immutable event log

tl;dr: which tool do I use? • If you want query-based CDC, the choice is simple: confluent.io/hub

Which Log-Based CDC Tool?
• Open source RDBMS (e.g. MySQL, PostgreSQL): Debezium (plus paid options)
• Proprietary RDBMS (e.g. Oracle, MS SQL, DB2): Oracle GoldenGate · Debezium + XStream · Attunity · HVR · SQData · tcVISION
• Mainframe (e.g. VSAM, IMS): IBM InfoSphere Data Replication · Attunity · SQData · tcVISION
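
For the open-source route, a hedged sketch of a Debezium MySQL connector declared from ksqlDB (hostnames, credentials, server id, and topic names are all placeholders, not values from the talk):

-- Illustrative sketch only: log-based CDC with Debezium
CREATE SOURCE CONNECTOR mysql_cdc WITH (
    'connector.class'       = 'io.debezium.connector.mysql.MySqlConnector',
    'database.hostname'     = 'asgard',
    'database.port'         = '3306',
    'database.user'         = 'debezium',
    'database.password'     = 'dbz-password',
    'database.server.id'    = '42',
    'database.server.name'  = 'asgard',
    'database.history.kafka.bootstrap.servers' = 'kafka:9092',
    'database.history.kafka.topic'             = 'dbhistory.asgard'
  );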

{
  "reading_ts": "2020-02-14T12:19:27Z",
  "sensor_id": "aa-101",
  "production_line": "w01",
  "widget_type": "acme94",
  "temp_celcius": 23,
  "widget_weight_g": 100
}
(Photo by Franck V. on Unsplash)

Streams of events, over time

Stream Processing: Stream: widgets → Stream: widgets_red

Stream Processing with ksqlDB: Stream: widgets → ksqlDB → Stream: widgets_red

CREATE STREAM widgets_red AS
  SELECT * FROM widgets WHERE colour='RED';
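
A hedged usage sketch: the derived stream is just another stream, so it can be consumed with a push query (or by any other Kafka consumer):

-- Emit rows continuously as they arrive on the derived stream
SELECT * FROM widgets_red EMIT CHANGES;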


Different ways to process the widget readings shown above:

SELECT * FROM WIDGETS
  WHERE WEIGHT_G > 120

SELECT COUNT(*) FROM WIDGETS
  GROUP BY PRODUCTION_LINE

SELECT AVG(TEMP_CELCIUS) AS TEMP FROM WIDGETS
  GROUP BY SENSOR_ID HAVING TEMP > 20

CREATE SINK CONNECTOR dw WITH (
    'connector.class' = 'S3Connector',
    'topics'          = 'widgets'
  …);
(→ object store, data warehouse, RDBMS)

Stream Processing with ksqlDB: a source stream...

...feeding Analytics

...feeding Applications / Microservices

Stream Processing with ksqlDB: ...SUM(TXN_AMT) GROUP BY AC_ID — e.g. transactions for AC_ID=42 in, AC_ID=42 BALANCE=94.00 out to Applications / Microservices
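
A hedged sketch of the aggregation behind that slide (the stream and column names beyond AC_ID and TXN_AMT are assumptions): a ksqlDB table maintaining a running balance per account, which the applications can then query.

-- Sketch: running balance per account from a stream of transactions
CREATE TABLE ACCOUNT_BALANCE AS
  SELECT AC_ID,
         SUM(TXN_AMT) AS BALANCE
  FROM   TRANSACTIONS
  GROUP BY AC_ID;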

Lookups and Joins with ksqlDB

ORDERS (stream):
{"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5}

ITEMS (lookup):
{"id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9}

CREATE STREAM ORDERS_ENRICHED AS
  SELECT O.*, I.*,
         O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE
  FROM   ORDERS O
         INNER JOIN ITEMS I
         ON O.ITEMID = I.ID;

ORDERS_ENRICHED:
{
  "ordertime": 1560070133853,
  "orderid": 67,
  "itemid": "Item_9",
  "orderunits": 5,
  "make": "Boyle-McDermott",
  "model": "Apiaceae",
  "unit_cost": 19.9,
  "total_order_value": 99.5
}

Streams & Tables

Streams and Tables

Kafka topic (k/v bytes):
{"event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds"}
{"event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London"}
{"event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield"}
{"event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds"}

ksqlDB Stream (topic + schema):
+---------------------+--------+-----------+
|EVENT_TS             |PERSON  |LOCATION   |
+---------------------+--------+-----------+
|2020-02-17 15:22:00  |robin   |Leeds      |
|2020-02-17 17:23:00  |robin   |London     |
|2020-02-17 22:23:00  |robin   |Wakefield  |
|2020-02-18 09:00:00  |robin   |Leeds      |

ksqlDB Table (state for a given key; topic + schema):
+--------+-----------+
|PERSON  |LOCATION   |
+--------+-----------+
|robin   |Leeds      |  → London → Wakefield → Leeds as each event arrives
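
The slides that follow query a stream called MOVEMENTS; a hedged sketch of how it could be declared over the topic above (the topic name 'movements' is an assumption):

-- Sketch: register a stream over the movements topic
CREATE STREAM MOVEMENTS (
    EVENT_TS VARCHAR,
    PERSON   VARCHAR,
    LOCATION VARCHAR
  ) WITH (
    KAFKA_TOPIC  = 'movements',
    VALUE_FORMAT = 'JSON'
  );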

Stateful aggregations in ksqlDB (over the same movement events)

SELECT PERSON, COUNT(*) AS LOCATION_CHANGES
  FROM MOVEMENTS GROUP BY PERSON;
+--------+------------------+
|PERSON  | LOCATION_CHANGES |
+--------+------------------+
|robin   | 4                |

SELECT PERSON, COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS
  FROM MOVEMENTS GROUP BY PERSON;
+--------+------------------+
|PERSON  | UNIQUE_LOCATIONS |
+--------+------------------+
|robin   | 3                |

Aggregations can be across the entire input, or windowed (TUMBLING, HOPPING, SESSION) — see the sketch below
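
A hedged sketch of the windowed variant mentioned above, counting location changes per person per hour with a tumbling window:

-- Sketch: the same aggregation, windowed
SELECT PERSON,
       COUNT(*) AS LOCATION_CHANGES
  FROM MOVEMENTS
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY PERSON
  EMIT CHANGES;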

Stateful aggregations in ksqlDB

CREATE TABLE PERSON_MOVEMENTS AS
  SELECT PERSON,
         COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS,
         COUNT(*)                 AS LOCATION_CHANGES
  FROM   MOVEMENTS
  GROUP BY PERSON;

Kafka topic (the movement events above) → PERSON_MOVEMENTS table, backed by an internal ksqlDB state store

Pull and Push queries in ksqlDB (against the PERSON_MOVEMENTS table built above)

Pull query - point-in-time value, then the query terminates:

ksql> SELECT LOCATION_CHANGES, UNIQUE_LOCATIONS
        FROM PERSON_MOVEMENTS
       WHERE ROWKEY='robin';
+------------------+------------------+
|LOCATION_CHANGES  |UNIQUE_LOCATIONS  |
+------------------+------------------+
|3                 |3                 |
Query terminated
ksql>

Push query - emits every change, never exits:

ksql> SELECT LOCATION_CHANGES, UNIQUE_LOCATIONS
        FROM PERSON_MOVEMENTS
       WHERE ROWKEY='robin'
       EMIT CHANGES;
+------------------+------------------+
|LOCATION_CHANGES  |UNIQUE_LOCATIONS  |
+------------------+------------------+
|1                 |1                 |
|2                 |2                 |
|3                 |3                 |
|4                 |3                 |
Press CTRL-C to interrupt

ksqlDB REST API: the same table can be queried over HTTP

curl -s -X "POST" "http://localhost:8088/query" \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -d '{"ksql":"SELECT UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='\''robin'\'';"}'

{"value":"3"}

Pull and Push queries in ksqlDB

             | Pull query           | Push query
Tells you:   | Point-in-time value  | All value changes
Exits:       | Immediately          | Never

Want to learn more? CTAs, not CATs (sorry, not sorry)

Free Books! https://rmoff.dev/aces

Fully Managed Kafka as a Service: Confluent Cloud — https://rmoff.dev/ccloud
$50 USD off your bill each calendar month for the first three months when you sign up
Free money! Promo code 60DEVADV gives an additional $60 towards your bill 😄
* Limited availability. Activate by 11th September 2020. Expires after 90 days of activation. Any unused promo value on the expiration date will be forfeited.

Learn Kafka. Start building with Apache Kafka at Confluent Developer. developer.confluent.io

Confluent Community Slack group cnfl.io/slack

Further reading / watching
• Kafka as a Platform: the Ecosystem from the Ground Up — https://rmoff.dev/kafka101
• Apache Kafka and ksqlDB in Action: Let's Build a Streaming Data Pipeline! — https://rmoff.dev/ljc-kafka-01
• From Zero to Hero with Kafka Connect — https://rmoff.dev/ljc-kafka-02
• Introduction to ksqlDB — https://rmoff.dev/ljc-kafka-03
• The Changing Face of ETL: Event-Driven Architectures for Data Engineers — https://rmoff.dev/oredev19-changing-face-of-etl
• 🚂 On Track with Apache Kafka: Building a Streaming Platform solution with Rail Data — https://rmoff.dev/oredev19-on-track-with-kafka
• More talks: http://rmoff.dev/youtube

#EOF @rmoff #ACEsAtHome rmoff.dev/talks