Introduction to ksqlDB Robin Moffatt #ljcjug @rmoff
A presentation at LJC Virtual Meetup in May 2020 by Robin Moffatt
Apache Kafka: Producer, Consumer, The Log, Connectors, Streaming Engine
{ "reading_ts": "2020-02-14T12:19:27Z", "sensor_id": "aa-101", "production_line": "w01", "widget_type": "acme94", "temp_celcius": 23, "widget_weight_g": 100 }
(Photo by Franck V. on Unsplash)
Streams of events, ordered in time
Stream Processing: Stream: widgets → Stream: widgets_red
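Stream Processing with Kafka Streams (Stream: widgets → Stream: widgets_red)
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

// stringSerde and widgetsSerde are Serdes defined elsewhere
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("widgets", Consumed.with(stringSerde, widgetsSerde))
       .filter((key, widget) -> widget.getColour().equals("RED"))
       .to("widgets_red", Produced.with(stringSerde, widgetsSerde));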
Stream Processing with ksqlDB (Stream: widgets → ksqlDB → Stream: widgets_red)
CREATE STREAM widgets_red AS SELECT * FROM widgets WHERE colour='RED';
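The filter is stateless: each event is checked on its own and is either passed through unchanged or dropped. A minimal plain-Java sketch of that behaviour with no Kafka dependencies, assuming a hypothetical `Widget` class whose `colour` field mirrors `getColour()` in the Kafka Streams version:

```java
import java.util.List;
import java.util.stream.Collectors;

final class WidgetFilter {

    // Hypothetical stand-in for a deserialised message from the "widgets" topic.
    static final class Widget {
        final String sensorId;
        final String colour;

        Widget(String sensorId, String colour) {
            this.sensorId = sensorId;
            this.colour = colour;
        }
    }

    // Stateless filter, equivalent in spirit to WHERE colour='RED'.
    // Matching events keep their original order; "RED".equals(...) tolerates a null colour.
    static List<Widget> redOnly(List<Widget> widgets) {
        return widgets.stream()
                .filter(w -> "RED".equals(w.colour))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Widget> input = List.of(
                new Widget("aa-101", "RED"),
                new Widget("aa-102", "BLUE"),
                new Widget("aa-103", "RED"));
        for (Widget w : redOnly(input)) {
            System.out.println(w.sensorId + " " + w.colour); // aa-101 RED, then aa-103 RED
        }
    }
}
```

In a real deployment the input and output are the `widgets` and `widgets_red` topics, and ksqlDB applies the predicate continuously as events arrive rather than over a finite list.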
Querying the widgets stream, and streaming it to other systems
SELECT * FROM WIDGETS WHERE WIDGET_WEIGHT_G > 120 EMIT CHANGES;
SELECT COUNT(*) FROM WIDGETS GROUP BY PRODUCTION_LINE EMIT CHANGES;
SELECT AVG(TEMP_CELCIUS) AS TEMP FROM WIDGETS GROUP BY SENSOR_ID HAVING TEMP>20 EMIT CHANGES;
CREATE SINK CONNECTOR dw WITH (
  'connector.class' = 'S3Connector',
  'topics'          = 'widgets'
  …);
Sink targets: object store, data warehouse, RDBMS
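As a rough mental model for the last query, the sketch below computes `AVG(TEMP_CELCIUS)` per `SENSOR_ID` over a finite list and then applies the `HAVING` filter to the aggregated value. It is a batch illustration with a hypothetical `Reading` class and made-up temperatures; ksqlDB instead keeps a running sum and count per sensor and emits an updated average as each reading arrives.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SensorTemperature {

    // Hypothetical minimal reading: only the two fields the query uses.
    static final class Reading {
        final String sensorId;
        final int tempCelcius;

        Reading(String sensorId, int tempCelcius) {
            this.sensorId = sensorId;
            this.tempCelcius = tempCelcius;
        }
    }

    // Batch sketch of:
    //   SELECT AVG(TEMP_CELCIUS) AS TEMP FROM WIDGETS GROUP BY SENSOR_ID HAVING TEMP>20
    // Returns sensorId -> average, keeping only groups that pass HAVING.
    static Map<String, Double> averagesAbove(List<Reading> readings, double threshold) {
        // GROUP BY: accumulate [sum, count] per sensor, in first-seen order.
        Map<String, long[]> sumAndCount = new LinkedHashMap<>();
        for (Reading r : readings) {
            long[] acc = sumAndCount.computeIfAbsent(r.sensorId, k -> new long[2]);
            acc[0] += r.tempCelcius;
            acc[1]++;
        }
        // AVG per group, then HAVING filters on the aggregated value.
        Map<String, Double> result = new LinkedHashMap<>();
        for (Map.Entry<String, long[]> e : sumAndCount.entrySet()) {
            double avg = (double) e.getValue()[0] / e.getValue()[1];
            if (avg > threshold) {
                result.put(e.getKey(), avg);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Reading> readings = List.of(
                new Reading("aa-101", 23), new Reading("aa-102", 18),
                new Reading("aa-101", 25), new Reading("aa-102", 20));
        System.out.println(averagesAbove(readings, 20)); // {aa-101=24.0}
    }
}
```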
ksqlDB: the event streaming database purpose-built for stream processing applications.
Stream Processing with ksqlDB: Source stream
Stream Processing with ksqlDB: Source stream → Analytics
Stream Processing with ksqlDB: Source stream → Applications / Microservices
Stream Processing with ksqlDB: Source stream → …SUM(TXN_AMT) GROUP BY AC_ID → Applications / Microservices
(an application looks up AC_ID=42 and gets BALANCE=94.00)
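The balance lookup can be thought of as a table kept up to date by the `SUM(TXN_AMT) GROUP BY AC_ID` aggregation. A minimal sketch of that idea, assuming hypothetical transaction amounts held in cents (the slide only shows the resulting `BALANCE=94.00`): each transaction updates the state for its key and yields the new row, and an application can read the current value at any time.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class AccountBalances {

    // Current SUM(TXN_AMT) per AC_ID, in cents to avoid floating-point drift.
    private final Map<Integer, Long> balanceCents = new HashMap<>();

    // Apply one transaction and return the updated row, the way a
    // materialised aggregate emits a new value for the key on every event.
    String apply(int acId, long txnAmtCents) {
        long updated = balanceCents.merge(acId, txnAmtCents, Long::sum);
        return "AC_ID=" + acId + " BALANCE=" + BigDecimal.valueOf(updated, 2).toPlainString();
    }

    // Point-in-time lookup of the current balance (null if the account is unknown).
    Long currentCents(int acId) {
        return balanceCents.get(acId);
    }

    public static void main(String[] args) {
        AccountBalances table = new AccountBalances();
        List<String> changelog = new ArrayList<>();
        changelog.add(table.apply(42, 5000)); // +50.00 (hypothetical amount)
        changelog.add(table.apply(7, 1000));  // a different account
        changelog.add(table.apply(42, 4400)); // +44.00 (hypothetical amount)
        changelog.forEach(System.out::println);
        // AC_ID=42 BALANCE=50.00
        // AC_ID=7 BALANCE=10.00
        // AC_ID=42 BALANCE=94.00
    }
}
```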
DEMO: https://rmoff.dev/ksqldb-demo (Photo by Raoul Droog on Unsplash)
Interacting with ksqlDB Photo by Tim Mossholder on Unsplash
ksqlDB - Confluent Control Center
ksqlDB - REST API
ksqlDB - Native client (coming soon)
What else can ksqlDB do? (Photo by Sereja Ris on Unsplash)
Lookups and Joins with ksqlDB
ORDERS: { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5 }
Lookups and Joins with ksqlDB
ITEMS: { "id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9 }
ORDERS: { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5 }
Lookups and Joins with ksqlDB
ITEMS: { "id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9 }
ORDERS: { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5 }
CREATE STREAM ORDERS_ENRICHED AS
  SELECT O.*, I.*, O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE
    FROM ORDERS O INNER JOIN ITEMS I ON O.ITEMID = I.ID;
Lookups and Joins with ksqlDB
ITEMS: { "id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9 }
ORDERS: { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5 }
CREATE STREAM ORDERS_ENRICHED AS
  SELECT O.*, I.*, O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE
    FROM ORDERS O INNER JOIN ITEMS I ON O.ITEMID = I.ID;
ORDERS_ENRICHED: { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9, "total_order_value": 99.5 }
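The join enriches each order with the current `ITEMS` row whose `ID` matches the order's `ITEMID`. A plain-Java sketch of that stream-table inner join, assuming the ITEMS table is already available as an in-memory map keyed by ID (all class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class OrderEnrichment {

    // Hypothetical value of an ORDERS event.
    static final class Order {
        final long orderTime;
        final int orderId;
        final String itemId;
        final int orderUnits;

        Order(long orderTime, int orderId, String itemId, int orderUnits) {
            this.orderTime = orderTime;
            this.orderId = orderId;
            this.itemId = itemId;
            this.orderUnits = orderUnits;
        }
    }

    // Hypothetical value of an ITEMS table row, keyed by id.
    static final class Item {
        final String id;
        final String make;
        final String model;
        final double unitCost;

        Item(String id, String make, String model, double unitCost) {
            this.id = id;
            this.make = make;
            this.model = model;
            this.unitCost = unitCost;
        }
    }

    // One ORDERS_ENRICHED row: both sides plus the derived column.
    static final class EnrichedOrder {
        final Order order;
        final Item item;
        final double totalOrderValue;

        EnrichedOrder(Order order, Item item, double totalOrderValue) {
            this.order = order;
            this.item = item;
            this.totalOrderValue = totalOrderValue;
        }
    }

    // Stream-table INNER JOIN: each order looks up the current ITEMS row by key.
    static List<EnrichedOrder> enrich(List<Order> orders, Map<String, Item> itemsById) {
        List<EnrichedOrder> out = new ArrayList<>();
        for (Order o : orders) {
            Item i = itemsById.get(o.itemId);
            if (i == null) {
                continue; // INNER JOIN: no matching item, nothing emitted
            }
            out.add(new EnrichedOrder(o, i, o.orderUnits * i.unitCost));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Item> items = Map.of("Item_9", new Item("Item_9", "Boyle-McDermott", "Apiaceae", 19.9));
        List<Order> orders = List.of(new Order(1560070133853L, 67, "Item_9", 5));
        for (EnrichedOrder e : enrich(orders, items)) {
            System.out.println(e.order.orderId + " " + e.item.make + " " + e.totalOrderValue);
        }
    }
}
```

Because `ITEMS` is a table, a later change to an item's `unit_cost` affects only orders processed after the change; orders that were already enriched are not rewritten.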
Connecting ksqlDB to other systems (Photo by Mak on Unsplash)
Connecting ksqlDB to other systems: syslog, Google BigQuery, Amazon S3
Connecting ksqlDB to other systems (source: syslog, databases)
CREATE SOURCE CONNECTOR SOURCE_MYSQL_01 WITH (
  'connector.class'   = 'MySqlConnector',
  'database.hostname' = 'mysql',
  'table.whitelist'   = 'demo.customers');
Connecting ksqlDB to other systems (sinks: Google BigQuery, Amazon S3, Elasticsearch)
CREATE SINK CONNECTOR SINK_ELASTIC_01 WITH (
  'connector.class' = 'ElasticsearchSinkConnector',
  'connection.url'  = 'http://elasticsearch:9200',
  'topics'          = 'orders');
Streams & Tables
Streams and Tables
Kafka topic (k/v bytes):
{ "event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds" }
{ "event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London" }
{ "event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield" }
{ "event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds" }
ksqlDB Stream (Stream: Topic + Schema):
+--------------------+-------+----------+
|EVENT_TS            |PERSON |LOCATION  |
+--------------------+-------+----------+
|2020-02-17 15:22:00 |robin  |Leeds     |
|2020-02-17 17:23:00 |robin  |London    |
|2020-02-17 22:23:00 |robin  |Wakefield |
|2020-02-18 09:00:00 |robin  |Leeds     |
+--------------------+-------+----------+
ksqlDB Table (Table: state for given key, Topic + Schema):
+-------+----------+
|PERSON |LOCATION  |
+-------+----------+
|robin  |Leeds     |
+-------+----------+
(the value for robin is updated in place as events arrive: Leeds → London → Wakefield → Leeds)
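The same four events can be read two ways. This sketch replays them in offset order and builds both views: the stream keeps every event, while the table keeps only the latest `location` per `person` key. The `Movement` class is a hypothetical stand-in for the deserialised message value.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class StreamVersusTable {

    // Hypothetical stand-in for one event on the topic.
    static final class Movement {
        final String eventTs;
        final String person;
        final String location;

        Movement(String eventTs, String person, String location) {
            this.eventTs = eventTs;
            this.person = person;
            this.location = location;
        }
    }

    // The four events from the slide, in offset order.
    static List<Movement> sampleTopic() {
        return List.of(
                new Movement("2020-02-17T15:22:00Z", "robin", "Leeds"),
                new Movement("2020-02-17T17:23:00Z", "robin", "London"),
                new Movement("2020-02-17T22:23:00Z", "robin", "Wakefield"),
                new Movement("2020-02-18T09:00:00Z", "robin", "Leeds"));
    }

    // Stream: the full, append-only sequence of events; nothing is overwritten.
    static List<Movement> asStream(List<Movement> topic) {
        return new ArrayList<>(topic);
    }

    // Table: replay the events in order and upsert by key (PERSON),
    // so only the latest LOCATION for each person survives.
    static Map<String, String> asTable(List<Movement> topic) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Movement m : topic) {
            table.put(m.person, m.location);
        }
        return table;
    }

    public static void main(String[] args) {
        List<Movement> topic = sampleTopic();
        System.out.println(asStream(topic).size() + " events in the stream"); // 4 events in the stream
        System.out.println(asTable(topic)); // {robin=Leeds}
    }
}
```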
Stateful aggregations in ksqlDB
Kafka topic: robin's four movement events (Leeds, London, Wakefield, Leeds)
SELECT PERSON, COUNT(*) AS LOCATION_CHANGES FROM MOVEMENTS GROUP BY PERSON EMIT CHANGES;
+-------+------------------+
|PERSON |LOCATION_CHANGES  |
+-------+------------------+
|robin  |4                 |
+-------+------------------+
(the count is updated 1 → 2 → 3 → 4 as events arrive)
SELECT PERSON, COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS FROM MOVEMENTS GROUP BY PERSON EMIT CHANGES;
+-------+------------------+
|PERSON |UNIQUE_LOCATIONS  |
+-------+------------------+
|robin  |3                 |
+-------+------------------+
(the count is updated 1 → 2 → 3; the second visit to Leeds does not increase it)
Aggregations can be computed across the entire input, or windowed (TUMBLING, HOPPING, SESSION).
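The counts above, and a windowed variant, can be sketched over the same four events. Assumptions: windowing uses `event_ts` as the event time (the slides do not show how the stream's timestamp is set), tumbling windows are aligned to the Unix epoch, and `COUNT_DISTINCT` is computed exactly with a set, whereas ksqlDB's built-in may use an approximate algorithm for large cardinalities, so check its documentation.

```java
import java.time.Instant;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class MovementAggregates {

    // Hypothetical stand-in for a MOVEMENTS event; event_ts is assumed to be the event time.
    static final class Movement {
        final Instant eventTs;
        final String person;
        final String location;

        Movement(String eventTs, String person, String location) {
            this.eventTs = Instant.parse(eventTs);
            this.person = person;
            this.location = location;
        }
    }

    static List<Movement> sample() {
        return List.of(
                new Movement("2020-02-17T15:22:00Z", "robin", "Leeds"),
                new Movement("2020-02-17T17:23:00Z", "robin", "London"),
                new Movement("2020-02-17T22:23:00Z", "robin", "Wakefield"),
                new Movement("2020-02-18T09:00:00Z", "robin", "Leeds"));
    }

    // COUNT(*) ... GROUP BY PERSON, over the entire input.
    static Map<String, Long> countPerPerson(List<Movement> events) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (Movement m : events) {
            counts.merge(m.person, 1L, Long::sum);
        }
        return counts;
    }

    // COUNT_DISTINCT(LOCATION) ... GROUP BY PERSON, computed exactly with a set.
    static Map<String, Integer> distinctLocationsPerPerson(List<Movement> events) {
        Map<String, Set<String>> seen = new LinkedHashMap<>();
        for (Movement m : events) {
            seen.computeIfAbsent(m.person, k -> new HashSet<>()).add(m.location);
        }
        Map<String, Integer> result = new LinkedHashMap<>();
        seen.forEach((person, locations) -> result.put(person, locations.size()));
        return result;
    }

    // COUNT(*) per PERSON per tumbling window: fixed-size, non-overlapping,
    // epoch-aligned windows, so each event falls into exactly one window.
    static Map<String, Long> countPerTumblingWindow(List<Movement> events, long windowSizeMillis) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (Movement m : events) {
            long ts = m.eventTs.toEpochMilli();
            long windowStart = ts - Math.floorMod(ts, windowSizeMillis);
            counts.merge(m.person + "@" + Instant.ofEpochMilli(windowStart), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Movement> events = sample();
        System.out.println(countPerPerson(events));             // {robin=4}
        System.out.println(distinctLocationsPerPerson(events)); // {robin=3}
        long oneDay = 24L * 60 * 60 * 1000;
        System.out.println(countPerTumblingWindow(events, oneDay));
        // {robin@2020-02-17T00:00:00Z=3, robin@2020-02-18T00:00:00Z=1}
    }
}
```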
Stateful aggregations in ksqlDB
CREATE TABLE PERSON_MOVEMENTS AS
  SELECT PERSON,
         COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS,
         COUNT(*) AS LOCATION_CHANGES
    FROM MOVEMENTS
   GROUP BY PERSON;
Kafka topic (robin's movement events) → PERSON_MOVEMENTS (internal ksqlDB state store)
Pull and Push queries in ksqlDB
CREATE TABLE PERSON_MOVEMENTS AS SELECT PERSON, COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS, COUNT(*) AS LOCATION_CHANGES FROM MOVEMENTS GROUP BY PERSON;
(Kafka topic → PERSON_MOVEMENTS, internal ksqlDB state store)

Pull query:
ksql> SELECT LOCATION_CHANGES, UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='robin';
+-----------------+-----------------+
|LOCATION_CHANGES |UNIQUE_LOCATIONS |
+-----------------+-----------------+
|3                |3                |
Query terminated
ksql>

Push query:
ksql> SELECT LOCATION_CHANGES, UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='robin' EMIT CHANGES;
+-----------------+-----------------+
|LOCATION_CHANGES |UNIQUE_LOCATIONS |
+-----------------+-----------------+
|1                |1                |
|2                |2                |
|3                |3                |
|4                |3                |
Press CTRL-C to interrupt
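The difference between the two query types can be modelled with a small in-memory table: a pull query reads the current value for one key and returns, while a push query registers interest and receives every subsequent change. In this sketch (class and method names are hypothetical) the subscriber registers before any events arrive, which is why it sees all four updates; in ksqlDB, where a push query starts reading depends on settings such as `auto.offset.reset`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

final class PersonMovementsTable {

    private final Map<String, Integer> locationChanges = new HashMap<>();
    private final Map<String, Set<String>> locationsSeen = new HashMap<>();
    private final List<Consumer<String>> pushSubscribers = new ArrayList<>();

    // Incremental aggregation: update the state for this key, then push the
    // new row to every subscriber (what EMIT CHANGES delivers).
    void onEvent(String person, String location) {
        int changes = locationChanges.merge(person, 1, Integer::sum);
        Set<String> seen = locationsSeen.computeIfAbsent(person, k -> new HashSet<>());
        seen.add(location);
        String row = person + "|" + changes + "|" + seen.size();
        for (Consumer<String> subscriber : pushSubscribers) {
            subscriber.accept(row);
        }
    }

    // Pull query: read the current value for one key and return immediately.
    // Returns {LOCATION_CHANGES, UNIQUE_LOCATIONS}, or null if the key is absent.
    int[] pull(String person) {
        Integer changes = locationChanges.get(person);
        if (changes == null) {
            return null;
        }
        return new int[] {changes, locationsSeen.get(person).size()};
    }

    // Push query: receive every subsequent change.
    void subscribe(Consumer<String> subscriber) {
        pushSubscribers.add(subscriber);
    }

    public static void main(String[] args) {
        PersonMovementsTable table = new PersonMovementsTable();
        table.subscribe(row -> System.out.println("push: " + row));
        for (String loc : List.of("Leeds", "London", "Wakefield", "Leeds")) {
            table.onEvent("robin", loc);
        }
        // push: robin|1|1, robin|2|2, robin|3|3, robin|4|3
        int[] now = table.pull("robin");
        System.out.println("pull: " + now[0] + "|" + now[1]); // pull: 4|3
    }
}
```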
ksqlDB REST API
(Kafka topic → PERSON_MOVEMENTS table, backed by an internal ksqlDB state store)
curl -s -X "POST" "http://localhost:8088/query" \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -d '{"ksql":"SELECT UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='\''robin'\'';"}'
{"value":"3"}
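The same pull query can be sent from Java. This sketch uses only the JDK's `java.net.http` client (Java 11+); the `/query` endpoint, the `application/vnd.ksql.v1+json` content type and the `{"ksql": ...}` body shape come from the curl example, while the class and helper names are hypothetical. Building the JSON in code sidesteps the shell quoting needed to embed `'robin'` inside a single-quoted curl argument.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class KsqlRestClient {

    // Encode a Java string as a JSON string literal (quotes, backslashes, control chars).
    static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // Request body in the {"ksql": "..."} shape used by the curl example.
    static String requestBody(String ksql) {
        return "{\"ksql\":" + jsonString(ksql) + "}";
    }

    // POST to the /query endpoint with the ksqlDB v1 JSON content type.
    static HttpRequest buildRequest(String baseUrl, String ksql) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/query"))
                .header("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8")
                .POST(HttpRequest.BodyPublishers.ofString(requestBody(ksql)))
                .build();
    }

    public static void main(String[] args) throws Exception {
        String ksql = "SELECT UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='robin';";
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest("http://localhost:8088", ksql), HttpResponse.BodyHandlers.ofString());
        // The body format depends on the ksqlDB version, so it is printed as-is.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Running `main` needs a ksqlDB server on `localhost:8088`; the request-building helpers work without one.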
Pull and Push queries in ksqlDB
              Pull query            Push query
Tells you:    Point in time value   All value changes
Exits:        Immediately           Never
Under the covers of ksqlDB (Photo by Vinicius de Moraes on Unsplash)
ksqlDB consumes from and produces to the Kafka cluster
Inside ksqlDB: a JVM process running Kafka Streams, with RocksDB for local state; it consumes from and produces to the Kafka cluster
Kafka & KSQL: Fully Managed as a Service
Running ksqlDB - self-managed
DEB, RPM, ZIP, TAR downloads: http://confluent.io/download
Docker images: confluentinc/ksqldb-server
ksqlDB Server (JVM process)
…and many more…
Scaling ksqlDB: Kafka cluster ↔ a single ksqlDB server
Scaling ksqlDB: Kafka cluster ↔ ksqlDB cluster (several ksqlDB servers); work is split by partition
Think Applications, not database instances: separate ksqlDB clusters for Inventory, Fraud and Orders, all working against one Kafka cluster
Kafka Clusters: Kafka cluster A and Kafka cluster B, each with its own ksqlDB cluster, with Replicator copying data between them
ksqlDB or Kafka Streams? (Photo by Ramiz Dedaković on Unsplash)
ksqlDB builds on Kafka Streams, which in turn builds on the Kafka Consumer and Producer
ksqlDB supports user-defined functions: UDF (scalar), UDAF (aggregate) and UDTF (table)
ksqlDB code lifecycle
"Hmm, let me try out this idea…": Web UI and CLI for development and testing, on a single ksqlDB node.
Desired ksqlDB queries have been identified: REST API to deploy code for Production, with ksqlDB clustered for scale and availability.
curl -s -X "POST" "http://localhost:8088/ksql" \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -d '{ "ksql": "CREATE STREAM LONDON AS SELECT * FROM MOVEMENTS WHERE LOCATION='\''London'\'';",
           "streamsProperties": { "ksql.streams.auto.offset.reset": "earliest" } }'
Monitoring ksqlDB: Confluent Control Center, JMX
https://www.confluent.io/blog/troubleshooting-ksql-part-2
Want to learn more? CTAs, not CATs (sorry, not sorry) (Photo by Tucker Good on Unsplash)
Learn Kafka. Start building with Apache Kafka at Confluent Developer. developer.confluent.io
Confluent Community Slack group: cnfl.io/slack
Free Books! https://rmoff.dev/ljcjug
More ksqlDB examples Photo by Tengyart on Unsplash
Filtering with ksqlDB: ORDERS
Filtering with ksqlDB: ORDERS → ksqlDB
CREATE STREAM ORDERS_NY AS SELECT * FROM ORDERS WHERE ADDRESS->STATE='New York';
Filtering with ksqlDB: ORDERS → ksqlDB → ORDERS_NY
CREATE STREAM ORDERS_NY AS SELECT * FROM ORDERS WHERE ADDRESS->STATE='New York';
Transform data with ksqlDB - merge streams
ORDERS (US orders), ORDERS_UK (UK orders)
Transform data with ksqlDB - merge streams
ORDERS (US orders):
INSERT INTO ORDERS_COMBINED SELECT 'US' AS SOURCE, ORDERTIME, ITEMID, ORDERUNITS, ADDRESS FROM ORDERS;
ORDERS_UK (UK orders):
INSERT INTO ORDERS_COMBINED SELECT 'UK' AS SOURCE, ORDERTIME, ITEMID, ORDERUNITS, ADDRESS FROM ORDERS_UK;
Transform data with ksqlDB - merge streams
ORDERS (US orders):
INSERT INTO ORDERS_COMBINED SELECT 'US' AS SOURCE, ORDERTIME, ITEMID, ORDERUNITS, ADDRESS FROM ORDERS;
ORDERS_UK (UK orders):
INSERT INTO ORDERS_COMBINED SELECT 'UK' AS SOURCE, ORDERTIME, ITEMID, ORDERUNITS, ADDRESS FROM ORDERS_UK;
→ ORDERS_COMBINED (US and UK orders)
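Each `INSERT INTO` statement is a separate continuous query that copies rows from one input into `ORDERS_COMBINED`, adding a constant `SOURCE` column. A sketch of that behaviour with hypothetical, trimmed-down order objects (`ADDRESS` is left out for brevity). The sketch runs the two copies one after the other; in ksqlDB both queries run concurrently, so US and UK rows interleave in the combined stream.

```java
import java.util.ArrayList;
import java.util.List;

final class MergeOrders {

    // Hypothetical, trimmed-down order (ADDRESS omitted).
    static final class Order {
        final long orderTime;
        final String itemId;
        final int orderUnits;

        Order(long orderTime, String itemId, int orderUnits) {
            this.orderTime = orderTime;
            this.itemId = itemId;
            this.orderUnits = orderUnits;
        }
    }

    // A row of ORDERS_COMBINED: the constant SOURCE column plus the order fields.
    static final class CombinedOrder {
        final String source;
        final Order order;

        CombinedOrder(String source, Order order) {
            this.source = source;
            this.order = order;
        }
    }

    // Sketch of one: INSERT INTO ORDERS_COMBINED SELECT '<source>' AS SOURCE, ... FROM <input>;
    static void insertInto(List<CombinedOrder> combined, String source, List<Order> input) {
        for (Order o : input) {
            combined.add(new CombinedOrder(source, o));
        }
    }

    public static void main(String[] args) {
        List<Order> ordersUs = List.of(new Order(1L, "Item_9", 5), new Order(2L, "Item_3", 1));
        List<Order> ordersUk = List.of(new Order(3L, "Item_7", 2));
        List<CombinedOrder> combined = new ArrayList<>();
        insertInto(combined, "US", ordersUs);
        insertInto(combined, "UK", ordersUk);
        for (CombinedOrder c : combined) {
            System.out.println(c.source + " " + c.order.itemId);
        }
    }
}
```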
Transform data with ksqlDB - split streams
ORDERS_COMBINED (US and UK orders)
Transform data with ksqlDB - split streams
ORDERS_COMBINED (US and UK orders):
CREATE STREAM ORDERS_US AS SELECT * FROM ORDERS_COMBINED WHERE SOURCE='US';
CREATE STREAM ORDERS_UK AS SELECT * FROM ORDERS_COMBINED WHERE SOURCE='UK';
Transform data with ksqlDB - split streams
ORDERS_COMBINED (US and UK orders):
CREATE STREAM ORDERS_US AS SELECT * FROM ORDERS_COMBINED WHERE SOURCE='US';   → ORDERS_US (US orders)
CREATE STREAM ORDERS_UK AS SELECT * FROM ORDERS_COMBINED WHERE SOURCE='UK';   → ORDERS_UK (UK orders)
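Splitting is the reverse: route each row of `ORDERS_COMBINED` by its `SOURCE` value. The sketch uses a single-pass `Collectors.groupingBy`, which is a different technique from the slide's two independent filtering queries. One consequence: a row with any other `SOURCE` value gets its own group here, whereas with the two `CREATE STREAM ... WHERE` queries it would appear in neither output. The `CombinedOrder` class is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class SplitOrders {

    // Hypothetical, trimmed-down row of ORDERS_COMBINED.
    static final class CombinedOrder {
        final String source;
        final String itemId;

        CombinedOrder(String source, String itemId) {
            this.source = source;
            this.itemId = itemId;
        }
    }

    // Group rows by SOURCE in one pass; each group keeps arrival order,
    // and groups appear in the order their SOURCE value was first seen.
    static Map<String, List<CombinedOrder>> splitBySource(List<CombinedOrder> combined) {
        return combined.stream()
                .collect(Collectors.groupingBy(o -> o.source, LinkedHashMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CombinedOrder> combined = List.of(
                new CombinedOrder("US", "Item_9"),
                new CombinedOrder("UK", "Item_7"),
                new CombinedOrder("US", "Item_3"));
        splitBySource(combined).forEach((source, orders) ->
                System.out.println(source + " -> " + orders.size() + " order(s)"));
        // US -> 2 order(s)
        // UK -> 1 order(s)
    }
}
```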
Message transformation with ksqlDB
ORDERS:
{ "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address": { "street": "243 Utah Way", "city": "Orange", "state": "California" } }
Convert this timestamp (ordertime) to a human-readable format. Drop these address fields.
Message transformation with ksqlDB
ORDERS:
{ "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address": { "street": "243 Utah Way", "city": "Orange", "state": "California" } }
CREATE STREAM ORDERS_NO_ADDRESS_DATA AS
  SELECT ORDERTIME, ORDERID, ITEMID, ORDERUNITS
    FROM ORDERS;
Message transformation with ksqlDB
ORDERS:
{ "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address": { "street": "243 Utah Way", "city": "Orange", "state": "California" } }
CREATE STREAM ORDERS_NO_ADDRESS_DATA AS
  SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS ORDER_TIMESTAMP,
         ORDERID, ITEMID, ORDERUNITS
    FROM ORDERS;
ORDERS_NO_ADDRESS_DATA:
{ "order_timestamp": "2020-02-14 15:10:58", "orderid": 67, "itemid": "Item_9", "orderunits": 5 }
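`TIMESTAMPTOSTRING` turns a BIGINT epoch-millisecond value into text. A sketch of the equivalent conversion in Java, assuming the pattern letters follow `java.time.format.DateTimeFormatter` conventions and that the result is rendered in UTC. The query formats `ROWTIME`, the Kafka message timestamp, which is why its output shows 2020-02-14; formatting the sample's `ordertime` value instead gives a date in June 2019.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class TimestampToString {

    // Same pattern as the query, rendered in UTC (an assumption of this sketch).
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    // Epoch milliseconds -> "yyyy-MM-dd HH:mm:ss"; the milliseconds are dropped by the pattern.
    static String format(long epochMillis) {
        return FORMAT.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // The ordertime value from the sample ORDERS message.
        System.out.println(format(1560070133853L)); // 2019-06-09 08:48:53
    }
}
```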
Schema manipulation - flatten records
ORDERS (nested fields):
{ "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address": { "street": "243 Utah Way", "city": "Orange", "state": "California" } }
Schema manipulation with ksqlDB
ORDERS:
{ "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address": { "street": "243 Utah Way", "city": "Orange", "state": "California" } }
CREATE STREAM ORDERS_FLAT AS
  SELECT […]
         ADDRESS->STREET AS ADDRESS_STREET,
         ADDRESS->CITY AS ADDRESS_CITY,
         ADDRESS->STATE AS ADDRESS_STATE
    FROM ORDERS;
Schema manipulation with ksqlDB
ORDERS:
{ "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address": { "street": "243 Utah Way", "city": "Orange", "state": "California" } }
CREATE STREAM ORDERS_FLAT AS
  SELECT […]
         ADDRESS->STREET AS ADDRESS_STREET,
         ADDRESS->CITY AS ADDRESS_CITY,
         ADDRESS->STATE AS ADDRESS_STATE
    FROM ORDERS;
ORDERS_FLAT:
{ "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address_street": "243 Utah Way", "address_city": "Orange", "address_state": "California" }
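The query names each nested field explicitly. To illustrate the same reshaping, the sketch flattens any nested map generically, joining names with `_` so that `address.street` becomes `address_street`, mirroring the `ADDRESS_STREET` alias. It assumes records are represented as `Map<String, Object>`, a simplification of ksqlDB's typed STRUCT columns; the helper names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class FlattenRecord {

    // Hypothetical map form of the sample ORDERS message (LinkedHashMap keeps field order).
    static Map<String, Object> sampleOrder() {
        Map<String, Object> address = new LinkedHashMap<>();
        address.put("street", "243 Utah Way");
        address.put("city", "Orange");
        address.put("state", "California");
        Map<String, Object> order = new LinkedHashMap<>();
        order.put("ordertime", 1560070133853L);
        order.put("orderid", 67);
        order.put("itemid", "Item_9");
        order.put("orderunits", 5);
        order.put("address", address);
        return order;
    }

    // Return a single-level copy of the record; nested maps are expanded recursively.
    static Map<String, Object> flatten(Map<String, ?> record) {
        Map<String, Object> flat = new LinkedHashMap<>();
        flattenInto("", record, flat);
        return flat;
    }

    private static void flattenInto(String prefix, Map<String, ?> record, Map<String, Object> out) {
        for (Map.Entry<String, ?> e : record.entrySet()) {
            String name = prefix.isEmpty() ? e.getKey() : prefix + "_" + e.getKey();
            if (e.getValue() instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, ?> nested = (Map<String, ?>) e.getValue();
                flattenInto(name, nested, out);
            } else {
                out.put(name, e.getValue());
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(flatten(sampleOrder()));
        // {ordertime=1560070133853, orderid=67, itemid=Item_9, orderunits=5,
        //  address_street=243 Utah Way, address_city=Orange, address_state=California}
    }
}
```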
Reserialising data with ksqlDB
ORDERS (Avro):
{ "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address_street": "243 Utah Way", "address_city": "Orange", "address_state": "California" }
Reserialising data with ksqlDB
ORDERS (Avro):
{ "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address_street": "243 Utah Way", "address_city": "Orange", "address_state": "California" }
CREATE STREAM ORDERS_CSV WITH (VALUE_FORMAT='DELIMITED') AS SELECT * FROM ORDERS_FLAT;
Reserialising data with ksqlDB (Avro → CSV)
ORDERS (Avro):
{ "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "address_street": "243 Utah Way", "address_city": "Orange", "address_state": "California" }
CREATE STREAM ORDERS_CSV WITH (VALUE_FORMAT='DELIMITED') AS SELECT * FROM ORDERS_FLAT;
ORDERS_CSV:
1560045914101,24644,Item_0,1,43078 De
1560047305664,24643,Item_29,3,209 Mon
1560057079799,24642,Item_38,18,3 Autu
1560088652051,24647,Item_6,6,82893 Ar
1560105559145,24648,Item_0,12,45896 W
1560108336441,24646,Item_33,4,272 Hef
1560123862235,24641,Item_15,16,0 Dort
1560124799053,24645,Item_12,1,71 Knut
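Reserialising to `DELIMITED` writes each row's values as one comma-separated line. A minimal sketch of that step, assuming RFC 4180-style quoting (fields containing commas, quotes or line breaks are wrapped in double quotes, with embedded quotes doubled). ksqlDB's own DELIMITED serializer may differ in details, so treat this as illustrative; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

final class DelimitedWriter {

    // Render one value: null becomes an empty field; values containing a comma,
    // double quote or line break are quoted, with embedded quotes doubled.
    static String field(Object value) {
        if (value == null) {
            return "";
        }
        String s = value.toString();
        if (s.contains(",") || s.contains("\"") || s.contains("\n") || s.contains("\r")) {
            return "\"" + s.replace("\"", "\"\"") + "\"";
        }
        return s;
    }

    // One output line for one row, fields in column order.
    static String toLine(List<?> values) {
        return values.stream().map(DelimitedWriter::field).collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // Column order as in ORDERS_FLAT: ordertime, orderid, itemid, orderunits, address_*.
        List<Object> row = List.of(1560070133853L, 67, "Item_9", 5, "243 Utah Way", "Orange", "California");
        System.out.println(toLine(row));
        // 1560070133853,67,Item_9,5,243 Utah Way,Orange,California
    }
}
```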