Kafka as a Platform: the Ecosystem from the Ground Up Robin Moffatt | #GOTOpia | @rmoff
A presentation at GOTOpia in September 2020 by Robin Moffatt
Kafka as a Platform: the Ecosystem from the Ground Up Robin Moffatt | #GOTOpia | @rmoff
$ whoami • Robin Moffatt (@rmoff) • Senior Developer Advocate at Confluent (Apache Kafka, not wikis!) • Working in data & analytics since 2001 • Oracle ACE Director (Alumnus) http://rmoff.dev/talks · http://rmoff.dev/blog · http://rmoff.dev/youtube @rmoff | #GOTOpia | @confluentinc
EVENTS @rmoff | #GOTOpia | @confluentinc
EVENTS • Something happened • What happened
Human-generated events: A Sale, A Stock movement @rmoff | #GOTOpia | @confluentinc
Machine-generated events: IoT, Networking, Applications @rmoff | #GOTOpia | @confluentinc
EVENTS are EVERYWHERE @rmoff | #GOTOpia | @confluentinc
EVENTS are very POWERFUL @rmoff | #GOTOpia | @confluentinc
K V
LOG @rmoff | #GOTOpia | @confluentinc
Immutable Event Log Old New Events are added at the end of the log @rmoff | #GOTOpia | @confluentinc
TOPICS @rmoff | #GOTOpia | @confluentinc
Topics Clicks Orders Customers Topics are similar in concept to tables in a database @rmoff | #GOTOpia | @confluentinc
PARTITIONS @rmoff | #GOTOpia | @confluentinc
Partitions Clicks P0 P1 P2 Messages are guaranteed to be strictly ordered within a partition @rmoff | #GOTOpia | @confluentinc
PUB / SUB @rmoff | #GOTOpia | @confluentinc
Producing data Old New Messages are added at the end of the log @rmoff | #GOTOpia | @confluentinc
partition 0 … partition 1 producer … partition 2 … Partitioned Topic
try (KafkaProducer<String, Payment> producer = new KafkaProducer<String, Payment>(props)) {
    for (long i = 0; i < 10; i++) {
        final String orderId = "id" + Long.toString(i);
        final Payment payment = new Payment(orderId, 1000.00d);
        final ProducerRecord<String, Payment> record =
            new ProducerRecord<String, Payment>("transactions", payment.getId().toString(), payment);
        producer.send(record);
    }
} catch (final Exception e) {
    e.printStackTrace();
}
package main

import (
	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	topic := "test_topic"
	p, _ := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092"})
	defer p.Close()

	p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0},
		Value:          []byte("Hello world")}, nil)
}
Producing to Kafka - No Key: messages are produced across the partitions (Partition 1-4) in a round-robin fashion @rmoff | #GOTOpia | @confluentinc
Producing to Kafka - With Key: hash(key) % numPartitions = N, so each key (A, B, C, D) always lands on the same partition (Partition 1-4) @rmoff | #GOTOpia | @confluentinc
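To make the keyed routing concrete, here is a minimal sketch (topic name, key scheme, and the pre-built props are illustrative; imports omitted as in the other snippets in this deck) showing that records sharing a key always land on the same partition:

// Sketch: the default partitioner routes a keyed record with hash(key) % numPartitions,
// so every record with the same key ends up on the same partition.
// Assumes `props` holds bootstrap.servers plus String serializers (hypothetical here).
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    for (int i = 0; i < 10; i++) {
        final String key = "user-" + (i % 4);   // four distinct keys
        final ProducerRecord<String, String> record =
            new ProducerRecord<>("clicks", key, "click #" + i);
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                // Same key => same partition number every time
                System.out.printf("key=%s -> partition %d%n", record.key(), metadata.partition());
            }
        });
    }
    producer.flush();
}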
Producers • A client application • Puts messages into topics • Handles partitioning, network protocol • Java, Go, .NET, C/C++, Python • Also every other language (plus REST proxy if not)
PUB / SUB @rmoff | #GOTOpia | @confluentinc
Consuming data - access is only sequential Read to offset & scan Old New @rmoff | #GOTOpia | @confluentinc
Consumers have a position of their own Old New Sally is here (scan) @rmoff | #GOTOpia | @confluentinc
Consumers have a position of their own Old New Fred is here (scan) Sally is here (scan) @rmoff | #GOTOpia | @confluentinc
Consumers have a position of their own Old New Rick is here (scan) Fred is here (scan) Sally is here (scan) @rmoff | #GOTOpia | @confluentinc
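A consumer's position is just an offset that it owns, so it can also be set explicitly before scanning forward. A minimal sketch (topic, partition, and offset are illustrative; imports omitted as elsewhere in this deck):

// Sketch: manually assign a partition, jump to a chosen offset, and scan from there.
TopicPartition tp = new TopicPartition("clicks", 0);
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.assign(Collections.singletonList(tp));  // manual assignment: no consumer group rebalance
    consumer.seek(tp, 42L);                          // set this consumer's own position
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    records.forEach(r ->
        System.out.printf("offset=%d value=%s%n", r.offset(), r.value()));
}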
try (final KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList(TOPIC));
    while (true) {
        ConsumerRecords<String, Payment> records = consumer.poll(100);
        for (ConsumerRecord<String, Payment> record : records) {
            String key = record.key();
            Payment value = record.value();
            System.out.printf("key = %s, value = %s%n", key, value);
        }
    }
}
c, _ := kafka.NewConsumer(&cm)
defer c.Close()
c.Subscribe(topic, nil)

for {
	select {
	case ev := <-c.Events():
		switch ev.(type) {
		case *kafka.Message:
			km := ev.(*kafka.Message)
			fmt.Printf("✅ Message '%v' received from topic '%v'\n",
				string(km.Value), string(*km.TopicPartition.Topic))
		}
	}
}
Consuming From Kafka - Single Consumer Partition 1 Partition 2 C Partition 3 Partition 4 @rmoff | #GOTOpia | @confluentinc
Consuming From Kafka - Multiple Consumers Partition 1 C1 Partition 2 Partition 3 C2 Partition 4 @rmoff | #GOTOpia | @confluentinc
Consuming From Kafka - Multiple Consumers C1 Partition 1 Partition 2 Partition 3 C2 Partition 4 @rmoff | #GOTOpia | @confluentinc
Consuming From Kafka - Grouped Consumers C1 C1 Partition 1 Partition 2 Partition 3 C2 Partition 4 @rmoff | #GOTOpia | @confluentinc
CONSUMERS CONSUMER GROUP COORDINATOR CONSUMER GROUP
Consuming From Kafka - Grouped Consumers Partition 1 Partition 2 Partition 3 C1 C2 C3 C4 Partition 4 @rmoff | #GOTOpia | @confluentinc
Consuming From Kafka - Grouped Consumers Partition 1 Partition 2 Partition 3 C1 C2 C3 Partition 4 @rmoff | #GOTOpia | @confluentinc
Consuming From Kafka - Grouped Consumers Partition 1 C1 Partition 2 Partition 3 C2 C3 Partition 4 @rmoff | #GOTOpia | @confluentinc
Consumers • A client application • Reads messages from topics • Horizontally, elastically scalable (if stateless) • Java, Go, .NET, C/C++, Python, everything else (plus REST proxy if not)
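The only thing that turns individual consumers into a group is a shared group.id; a sketch of the relevant configuration (broker address, group name, and topic are illustrative):

// Sketch: consumers sharing a group.id divide the topic's partitions between them.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "clicks-app");  // same group.id => instances share the partitions
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Run one instance and it reads every partition; run more (up to the partition count)
// and the group coordinator rebalances partitions across them.
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("clicks"));
    while (true) {
        consumer.poll(Duration.ofMillis(500)).forEach(r ->
            System.out.printf("partition=%d offset=%d%n", r.partition(), r.offset()));
    }
}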
BROKERS and REPLICATION @rmoff | #GOTOpia | @confluentinc
Partition Leadership and Replication (Leader Partition / Follower) Partition 1 Partition 2 Partition 3 Partition 4 Broker 1 Broker 2 Broker 3 @rmoff | #GOTOpia | @confluentinc
Partition Leadership and Replication (Leader Partition / Follower) Partition 1 Partition 1 Partition 1 Partition 2 Partition 2 Partition 2 Partition 3 Partition 3 Partition 3 Partition 4 Partition 4 Partition 4 Broker 1 Broker 2 Broker 3 @rmoff | #GOTOpia | @confluentinc
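Replication is configured per topic when it is created; a minimal sketch using the Java AdminClient (topic name and sizing are illustrative):

// Sketch: create a topic with 4 partitions, each replicated to 3 brokers
// (one leader plus two followers per partition).
public static void createReplicatedTopic() throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (Admin admin = Admin.create(props)) {
        admin.createTopics(Collections.singleton(new NewTopic("orders", 4, (short) 3)))
             .all().get();
    }
}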
Photo by Raoul Droog on Unsplash DEMO @rmoff | #GOTOpia | @confluentinc
So far, this is Pretty good @rmoff | #GOTOpia | @confluentinc
So far, this is Pretty good but I’ve not finished yet… @rmoff | #GOTOpia | @confluentinc
Streaming Pipelines Amazon S3 RDBMS HDFS @rmoff | #GOTOpia | @confluentinc
Evolve processing from old systems to new Existing New App <x> App RDBMS @rmoff | #GOTOpia | @confluentinc
Streaming Integration with Kafka Connect syslog Sources Kafka Connect Kafka Brokers @rmoff | #GOTOpia | @confluentinc
Streaming Integration with Kafka Connect Amazon Sinks Google Kafka Connect Kafka Brokers @rmoff | #GOTOpia | @confluentinc
Streaming Integration with Kafka Connect Amazon syslog Google Kafka Connect Kafka Brokers @rmoff | #GOTOpia | @confluentinc
Look Ma, No Code!
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://asgard:3306/demo",
  "table.whitelist": "sales,orders,customers"
}
@rmoff | #GOTOpia | @confluentinc
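A config like this is typically submitted to a Kafka Connect worker's REST API. A hedged sketch using Java's built-in HTTP client (the worker URL and connector name are assumptions; Java 15+ text block used for readability):

// Sketch: POST a connector config to a distributed Connect worker's REST API
// (worker assumed at localhost:8083; connector name is illustrative).
public static void submitConnector() throws Exception {
    String body = """
        {
          "name": "jdbc-source-demo",
          "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "connection.url": "jdbc:mysql://asgard:3306/demo",
            "table.whitelist": "sales,orders,customers"
          }
        }""";

    HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8083/connectors"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();

    HttpResponse<String> response =
            HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + " " + response.body());
}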
Extensible: Connector → Transform(s) → Converter @rmoff | #GOTOpia | @confluentinc
hub.confluent.io @rmoff | #GOTOpia | @confluentinc
Fault-tolerant? Nope. Kafka Connect Standalone Worker S3 Task #1 JDBC Task #1 JDBC Task #2 Offsets Worker @rmoff | #GOTOpia | @confluentinc
Fault-tolerant? Yeah! Kafka Connect Distributed Worker S3 Task #1 JDBC Task #1 JDBC Task #2 Kafka Connect cluster Worker Offsets Config Status @rmoff | #GOTOpia | @confluentinc
Fault-tolerant? Yeah! Scaling the Distributed Worker S3 Task #1 JDBC Task #1 Kafka Connect cluster JDBC Task #2 Worker Worker Offsets Config Status @rmoff | #GOTOpia | @confluentinc
Schema Registry @rmoff | #GOTOpia | @confluentinc
K V
K V Wait… what's this?
How do you serialise your data? JSON • Avro • Protobuf • JSON Schema • CSV @rmoff | #GOTOpia | @confluentinc
APIs are contracts between services {user_id: 53, address: “2 Elm st.”} Profile service Quote service {user_id: 53, quote: 580} @rmoff | #GOTOpia | @confluentinc
But not all services {user_id: 53, address: “2 Elm st.”} Profile service Quote service {user_id: 53, quote: 580} @rmoff | #GOTOpia | @confluentinc
And naturally… {user_id: 53, address: “2 Elm st.”} Profile service Quote service Profile database @rmoff Stream processing | #GOTOpia | @confluentinc
Schemas are about how teams work together {user_id: 53, timestamp: 1497842472} new Date(timestamp) Profile service Quote service Profile database create table ( user_id number, timestamp number) @rmoff Stream processing | #GOTOpia | @confluentinc
Things change… {user_id: 53, timestamp: “June 28, 2017 4:00pm”} Profile service Quote service Profile database @rmoff Stream processing | #GOTOpia | @confluentinc
Moving fast and breaking things {user_id: 53, timestamp: “June 28, 2017 4:00pm”} Profile service Quote service Profile database @rmoff Stream processing | #GOTOpia | @confluentinc
Lack of schemas – Coupling teams and services 2001 2001 Citrus Heights-Sunrise Blvd Citrus_Hghts 60670001 3400293 34 SAC Sacramento SV Sacramento Valley SAC Sacramento County APCD SMA8 Sacramento Metropolitan Area CA 6920 Sacramento 28 6920 13588 7400 Sunrise Blvd 95610 38 41 56 38.6988889 121 16 15.98999977 -121.271111 10 4284781 650345 52 @rmoff | #GOTOpia | @confluentinc
Serialisation & Schemas JSON • Avro • Protobuf • JSON Schema • CSV @rmoff | #GOTOpia | @confluentinc
https://rmoff.dev/qcon-schemas @rmoff | #GOTOpia | @confluentinc
It isn’t just about the services Software Teams Engineering & Culture Data & Metadata @rmoff | #GOTOpia | @confluentinc
Schemas Schema Registry Topic producer … consumer
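Wiring a producer to the Schema Registry is mostly configuration; a sketch using the Confluent Avro serializer and the Payment class from earlier (URLs are illustrative; imports omitted as elsewhere in this deck):

// Sketch: producer configured to serialise values as Avro via the Schema Registry.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");

// The serializer registers (or validates against) the Payment schema in the registry
// and writes just a schema ID plus Avro-encoded bytes to the topic.
try (KafkaProducer<String, Payment> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("transactions", "id0", new Payment("id0", 1000.00d)));
}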
partition 0 consumer A … consumer A partition 1 … consumer A partition 2 … consumer B Partitioned Topic @rmoff | #GOTOpia | @confluentinc
consumer A consumer A consumer A @rmoff | #GOTOpia | @confluentinc
{
  "reading_ts": "2020-02-14T12:19:27Z",
  "sensor_id": "aa-101",
  "production_line": "w01",
  "widget_type": "acme94",
  "temp_celcius": 23,
  "widget_weight_g": 100
}
Photo by Franck V. on Unsplash
@rmoff | #GOTOpia | @confluentinc
Streams of events Time @rmoff | #GOTOpia | @confluentinc
Stream Processing Stream: widgets Stream: widgets_red @rmoff | #GOTOpia | @confluentinc
Stream Processing with Kafka Streams
Stream: widgets
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("widgets", Consumed.with(stringSerde, widgetsSerde))
       .filter((key, widget) -> widget.getColour().equals("RED"))
       .to("widgets_red", Produced.with(stringSerde, widgetsSerde));
Stream: widgets_red
@rmoff | #GOTOpia | @confluentinc
consumer A consumer A consumer A @rmoff | #GOTOpia | @confluentinc
Streams Application Streams Application Streams Application @rmoff | #GOTOpia | @confluentinc
Properties streamsConfiguration = getProperties(SCHEMA_REGISTRY_URL);
final Map<String, String> serdeConfig =
    Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);

final SpecificAvroSerde<Movie> movieSerde = getMovieAvroSerde(serdeConfig);
final SpecificAvroSerde<Rating> ratingSerde = getRatingAvroSerde(serdeConfig);
final SpecificAvroSerde<RatedMovie> ratedMovieSerde = new SpecificAvroSerde<>();
ratedMovieSerde.configure(serdeConfig, false);

StreamsBuilder builder = new StreamsBuilder();
KTable<Long, Double> ratingAverage = getRatingAverageTable(builder);
getRatedMoviesTable(builder, ratingAverage, movieSerde);

Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();
}

private static SpecificAvroSerde<Rating> getRatingAvroSerde(Map<String, String> serdeConfig) {
    final SpecificAvroSerde<Rating> ratingSerde = new SpecificAvroSerde<>();
    ratingSerde.configure(serdeConfig, false);
    return ratingSerde;
    final SpecificAvroSerde<Movie> movieSerde = new SpecificAvroSerde<>();
    movieSerde.configure(serdeConfig, false);
    return movieSerde;
}

public static KTable<Long, String> getRatedMoviesTable(StreamsBuilder builder,
                                                       KTable<Long, Double> ratingAverage,
                                                       SpecificAvroSerde<Movie> movieSerde) {
    builder.stream("raw-movies", Consumed.with(Serdes.Long(), Serdes.String()))
           .mapValues(Parser::parseMovie)
           .map((key, movie) -> new KeyValue<>(movie.getMovieId(), movie))
           .to("movies", Produced.with(Serdes.Long(), movieSerde));

    KTable<Long, Movie> movies = builder.table("movies",
        Materialized.<Long, Movie, KeyValueStore<Bytes, byte[]>>as("movies-store")
                    .withValueSerde(movieSerde)
                    .withKeySerde(Serdes.Long()));

    KTable<Long, String> ratedMovies = ratingAverage
        .join(movies, (avg, movie) -> movie.getTitle() + "=" + avg);

    ratedMovies.toStream().to("rated-movies", Produced.with(Serdes.Long(), Serdes.String()));
    return ratedMovies;
}
        .join(movies, (avg, movie) -> movie.getTitle() + "=" + avg);
    ratedMovies.toStream().to("rated-movies", Produced.with(Serdes.Long(), Serdes.String()));
    return ratedMovies;
}

public static KTable<Long, Double> getRatingAverageTable(StreamsBuilder builder) {
    KStream<Long, String> rawRatings = builder.stream("raw-ratings", Consumed.with(Serdes.Long(), Serdes.String()));
    KStream<Long, Rating> ratings = rawRatings.mapValues(Parser::parseRating)
        .map((key, rating) -> new KeyValue<>(rating.getMovieId(), rating));

    KStream<Long, Double> numericRatings = ratings.mapValues(Rating::getRating);
    KGroupedStream<Long, Double> ratingsById = numericRatings.groupByKey();

    KTable<Long, Long> ratingCounts = ratingsById.count();
    KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);

    KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,
        (sum, count) -> sum / count.doubleValue(),
        Materialized.as("average-ratings"));

    ratingAverage.toStream()
        /*.peek((key, value) -> { // debug only
            System.out.println("key = " + key + ", value = " + value);
        })*/
        .to("average-ratings");
    return ratingAverage;
KTable<Long, Movie> movies = builder.table("movies",
    Materialized.<Long, Movie, KeyValueStore<Bytes, byte[]>>as("movies-store")
                .withValueSerde(movieSerde)
                .withKeySerde(Serdes.Long()));
• Java API
• Filter, join, aggregate, etc.
• Co-locates stream processing with your application
• Scales like a Consumer Group (but better!)
Stream Processing with ksqlDB
Stream: widgets
ksqlDB
CREATE STREAM widgets_red AS
  SELECT * FROM widgets WHERE colour='RED';
Stream: widgets_red
@rmoff | #GOTOpia | @confluentinc
{
  "reading_ts": "2020-02-14T12:19:27Z",
  "sensor_id": "aa-101",
  "production_line": "w01",
  "widget_type": "acme94",
  "temp_celcius": 23,
  "widget_weight_g": 100
}
Photo by Franck V. on Unsplash
@rmoff | #GOTOpia | @confluentinc
{
  "reading_ts": "2020-02-14T12:19:27Z",
  "sensor_id": "aa-101",
  "production_line": "w01",
  "widget_type": "acme94",
  "temp_celcius": 23,
  "widget_weight_g": 100
}
SELECT * FROM WIDGETS WHERE WEIGHT_G > 120
SELECT COUNT(*) FROM WIDGETS GROUP BY PRODUCTION_LINE
SELECT AVG(TEMP_CELCIUS) AS TEMP FROM WIDGETS GROUP BY SENSOR_ID HAVING TEMP>20
CREATE SINK CONNECTOR dw WITH (
  'connector.class' = 'S3Connector',
  'topics' = 'widgets'
  …);   (Object store, data warehouse, RDBMS)
Photo by Franck V. on Unsplash
@rmoff | #GOTOpia | @confluentinc
ksqlDB The event streaming database purpose-built for stream processing applications. @rmoff | #GOTOpia | @confluentinc
Stream Processing with ksqlDB Source stream @rmoff | #GOTOpia | @confluentinc
Stream Processing with ksqlDB Source stream Analytics @rmoff | #GOTOpia | @confluentinc
Stream Processing with ksqlDB Source stream Applications / Microservices @rmoff | #GOTOpia | @confluentinc
Stream Processing with ksqlDB …SUM(TXN_AMT) GROUP BY AC_ID (AC_ID=42 → BALANCE=94.00) Source stream Applications / Microservices @rmoff | #GOTOpia | @confluentinc
Lookups and Joins with ksqlDB
ORDERS
{"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5}
@rmoff | #GOTOpia | @confluentinc
Lookups and Joins with ksqlDB
ITEMS
{"id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9}
ORDERS
{"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5}
@rmoff | #GOTOpia | @confluentinc
Lookups and Joins with ksqlDB
ITEMS
{"id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9}
ORDERS
{"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5}
ksqlDB
CREATE STREAM ORDERS_ENRICHED AS
  SELECT O.*, I.*,
         O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE
  FROM ORDERS O
       INNER JOIN ITEMS I
       ON O.ITEMID = I.ID;
@rmoff | #GOTOpia | @confluentinc
Lookups and Joins with ksqlDB
ITEMS
{"id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9}
ORDERS
{"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5}
ksqlDB
CREATE STREAM ORDERS_ENRICHED AS
  SELECT O.*, I.*,
         O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE
  FROM ORDERS O
       INNER JOIN ITEMS I
       ON O.ITEMID = I.ID;
ORDERS_ENRICHED
{"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9, "total_order_value": 99.5}
@rmoff | #GOTOpia | @confluentinc
Streams & Tables @rmoff | #GOTOpia | @confluentinc
Streams and Tables
Kafka topic (k/v bytes)
{ "event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds" }
{ "event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London" }
{ "event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield" }
{ "event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds" }
ksqlDB Stream (Stream: Topic + Schema)
+---------------------+--------+-----------+
|EVENT_TS             |PERSON  |LOCATION   |
+---------------------+--------+-----------+
|2020-02-17 15:22:00  |robin   |Leeds      |
|2020-02-17 17:23:00  |robin   |London     |
|2020-02-17 22:23:00  |robin   |Wakefield  |
|2020-02-18 09:00:00  |robin   |Leeds      |
ksqlDB Table (Table: state for given key, Topic + Schema)
+--------+-------------------------------+
|PERSON  |LOCATION                       |
+--------+-------------------------------+
|robin   |Leeds → London → Wakefield → … |
@rmoff | #GOTOpia | @confluentinc
Stateful aggregations in ksqlDB
Kafka topic
{ "event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds" }
{ "event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London" }
{ "event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield" }
{ "event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds" }
SELECT PERSON, COUNT(*) FROM MOVEMENTS GROUP BY PERSON;
+--------+------------------+
|PERSON  |LOCATION_CHANGES  |
+--------+------------------+
|robin   |1 → 2 → 3 → 4     |
SELECT PERSON, COUNT_DISTINCT(LOCATION) FROM MOVEMENTS GROUP BY PERSON;
+--------+------------------+
|PERSON  |UNIQUE_LOCATIONS  |
+--------+------------------+
|robin   |1 → 2 → 3         |
Aggregations can be across the entire input, or windowed (TUMBLING, HOPPING, SESSION)
@rmoff | #GOTOpia | @confluentinc
Stateful aggregations in ksqlDB
Kafka topic
{ "event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds" }
{ "event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London" }
{ "event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield" }
{ "event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds" }
CREATE TABLE PERSON_MOVEMENTS AS
  SELECT PERSON,
         COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS,
         COUNT(*) AS LOCATION_CHANGES
  FROM MOVEMENTS
  GROUP BY PERSON;
PERSON_MOVEMENTS (internal ksqlDB state store)
@rmoff | #GOTOpia | @confluentinc
Pull and Push queries in ksqlDB
Kafka topic
{ "event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds" }
{ "event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London" }
{ "event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield" }
{ "event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds" }
CREATE TABLE PERSON_MOVEMENTS AS
  SELECT PERSON,
         COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS,
         COUNT(*) AS LOCATION_CHANGES
  FROM MOVEMENTS
  GROUP BY PERSON;
PERSON_MOVEMENTS (internal ksqlDB state store)
Pull query:
ksql> SELECT LOCATION_CHANGES, UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='robin';
+------------------+------------------+
|LOCATION_CHANGES  |UNIQUE_LOCATIONS  |
+------------------+------------------+
|3                 |3                 |
Query terminated
Push query:
ksql> SELECT LOCATION_CHANGES, UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='robin' EMIT CHANGES;
+------------------+------------------+
|LOCATION_CHANGES  |UNIQUE_LOCATIONS  |
+------------------+------------------+
|1                 |1                 |
|2                 |2                 |
|3                 |3                 |
|4                 |3                 |
Press CTRL-C to interrupt
@rmoff | #GOTOpia | @confluentinc
ksqlDB REST API
Kafka topic
{ "event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds" }
{ "event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London" }
{ "event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield" }
{ "event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds" }
CREATE TABLE PERSON_MOVEMENTS AS
  SELECT PERSON,
         COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS,
         COUNT(*) AS LOCATION_CHANGES
  FROM MOVEMENTS
  GROUP BY PERSON;
PERSON_MOVEMENTS (internal ksqlDB state store)
curl -s -X "POST" "http://localhost:8088/query" \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -d '{"ksql":"SELECT UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='\''robin'\'';"}'
{"value":"3"}
@rmoff | #GOTOpia | @confluentinc
Pull and Push queries in ksqlDB
            Pull query             Push query
Tells you:  Point-in-time value    All value changes
Exits:      Immediately            Never
@rmoff | #GOTOpia | @confluentinc
ksqlDB or Kafka Streams? @rmoff | #GOTOpia | @confluentinc Photo by Ramiz Dedaković on Unsplash
Standing on the Shoulders of Streaming Giants
Ease of use (top) / Flexibility (bottom):
  ksqlDB UDFs
  ksqlDB (powered by Kafka Streams)
  Kafka Streams (powered by Producer, Consumer APIs)
  Producer, Consumer APIs
@rmoff | #GOTOpia | @confluentinc
Photo by Raoul Droog on Unsplash DEMO @rmoff | #GOTOpia | @confluentinc
Summary @rmoff | #GOTOpia | @confluentinc
@rmoff | #GOTOpia | @confluentinc
K V @rmoff | #GOTOpia | @confluentinc
The Log @rmoff | #GOTOpia | @confluentinc
Producer Consumer The Log @rmoff | #GOTOpia | @confluentinc
Producer Consumer Connectors The Log @rmoff | #GOTOpia | @confluentinc
Producer Consumer Connectors The Log Streaming Engine @rmoff | #GOTOpia | @confluentinc
Apache Kafka Producer Consumer Connectors The Log Streaming Engine @rmoff | #GOTOpia | @confluentinc
Confluent Platform ksqlDB Producer Consumer Connectors The Log Schema Registry Streaming Engine @rmoff | #GOTOpia | @confluentinc
EVENTS are EVERYWHERE @rmoff | #GOTOpia | @confluentinc
EVENTS are very POWERFUL @rmoff | #GOTOpia | @confluentinc
Standby for resource links… @rmoff | #GOTOpia | @confluentinc
Free Books! https://rmoff.dev/gotopia @rmoff | #GOTOpia | @confluentinc
Fully Managed Kafka as a Service • https://rmoff.dev/ccloud • $50 USD off your bill each calendar month for the first three months when you sign up • Free money! Promo code 60DEVADV gives an additional $60 towards your bill • * T&C: https://www.confluent.io/confluent-cloud-promo-disclaimer
Learn Kafka. Start building with Apache Kafka at Confluent Developer. developer.confluent.io
Confluent Community Slack group cnfl.io/slack @rmoff | #GOTOpia | @confluentinc
#EOF @rmoff rmoff.dev/talks youtube.com/rmoff