Crossing the Streams: Rethinking Stream Processing with Kafka Streams and KSQL DEVNEXUS 2019 | @tlberglund | @gamussa

@gamussa | @tlberglund | #DEVnexus

https://cnfl.io/streams-movie-demo

Raffle, yeah! 🚀 Follow @gamussa and @tlberglund 📸🖼👬 Tag @gamussa and @tlberglund with #devnexus

Preface

Streaming is the toolset for dealing with events as they move!

Diagram: a high-throughput, continuous streaming platform; Java apps with Kafka Streams or KSQL doing the computation, with API-based clustering; and a serving layer (microservices, Elastic, etc.).

Apache Kafka Event Streaming Platform 101

Event Streaming Platform Architecture (diagram): applications use the native client library, Kafka Streams, KSQL, or the REST Proxy (behind a load balancer); Schema Registry and Kafka Connect sit alongside the Kafka brokers, which run with ZooKeeper nodes.

The log is a simple idea: messages are added at the end of the log (old → new).


Consumers have a position all of their own: Ricardo, Robin, and Viktor are each at a different point in the log, and each scans forward from there (old → new).



Only sequential access: read to an offset and scan forward (old → new).
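The log ideas on these slides (append at the end, one position per consumer, sequential scans from an offset) can be sketched as a toy in-memory model, assuming string records and a single partition. `ToyLog`, `append`, `seek`, `poll`, and `position` are hypothetical names chosen for illustration; they echo the spirit of the consumer API but are not Kafka's classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of a single-partition log, for illustration only.
// Records are only ever appended at the end; each consumer keeps its own offset.
class ToyLog {
    private final List<String> records = new ArrayList<>();
    // consumer name -> offset of the next record that consumer will read
    private final Map<String, Integer> positions = new HashMap<>();

    // Append at the end of the log and return the new record's offset.
    int append(String record) {
        records.add(record);
        return records.size() - 1;
    }

    // "Read to offset": move one consumer's position without affecting the others.
    void seek(String consumer, int offset) {
        positions.put(consumer, offset);
    }

    // Sequentially scan up to maxRecords from the consumer's position, then advance it.
    List<String> poll(String consumer, int maxRecords) {
        int from = Math.min(positions.getOrDefault(consumer, 0), records.size());
        int to = Math.min(records.size(), from + maxRecords);
        positions.put(consumer, to);
        return new ArrayList<>(records.subList(from, to));
    }

    int position(String consumer) {
        return positions.getOrDefault(consumer, 0);
    }

    public static void main(String[] args) {
        ToyLog log = new ToyLog();
        for (String r : new String[] {"a", "b", "c", "d", "e"}) {
            log.append(r);
        }
        System.out.println(log.poll("Ricardo", 2)); // [a, b]
        log.seek("Robin", 3);
        System.out.println(log.poll("Robin", 10));  // [d, e]
        System.out.println(log.poll("Viktor", 1));  // [a]
    }
}
```

Because a consumer is nothing more than an offset into the log here, adding another reader does not disturb anyone else's position.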

Shard data to get scalability: producers (1), (2), and (3) send messages to different partitions, and the partitions live on different machines in the cluster.
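One common way to decide which partition a message goes to is to hash its key, so that every message with the same key lands on the same partition. The sketch below assumes `String.hashCode()` as the hash. Kafka's default partitioner hashes the serialized key bytes with murmur2 instead, so the actual partition numbers differ. `ToyPartitioner` and `partitionFor` are hypothetical names.

```java
// Hypothetical key-based partitioner, for illustration only.
// Assumption: String.hashCode() stands in for Kafka's murmur2 hash of the key bytes.
class ToyPartitioner {
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3;
        for (String key : new String[] {"alice", "bob", "carol", "alice"}) {
            // "alice" prints the same partition both times
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

Deterministic key hashing preserves per-key ordering, because all of a key's messages go to one partition, while still spreading different keys across machines.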

Consumers, consumer group, consumer group coordinator (diagram)

Linearly Scalable Architecture: a single topic with
  • many producer machines
  • many consumer machines
  • many broker machines
No bottleneck!
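Consumers scale with the topic because the group splits its partitions among its members, and each partition is read by exactly one consumer in the group. The sketch below assumes a simple round-robin over sorted member ids. Kafka ships real assignors (for example range, round-robin, and sticky), and membership is tracked by the group coordinator. `ToyAssignor` and `assign` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical round-robin assignment of one topic's partitions to a consumer group.
// Each partition goes to exactly one member; extra members beyond the partition count stay idle.
class ToyAssignor {
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new TreeMap<>();
        if (consumers.isEmpty()) {
            return assignment; // nobody to assign to
        }
        List<String> members = new ArrayList<>(consumers);
        members.sort(null); // natural order, so the result is deterministic
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(members.get(p % members.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("c1", "c2"), 6));       // {c1=[0, 2, 4], c2=[1, 3, 5]}
        System.out.println(assign(List.of("c1", "c2", "c3"), 6)); // {c1=[0, 3], c2=[1, 4], c3=[2, 5]}
    }
}
```

Adding a third consumer in the usage above cuts each member's share from three partitions to two. This is the "many consumer machines" point of the slide, bounded by the partition count.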

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// in-memory store, not persistent: all counts are lost if the process dies
Map<String, Integer> groupByCounts = new HashMap<>();
// hypothetical values (not shown on the slides): publish counts every 10 polls
int counter = 0;
int sendInterval = 10;

// consumerProperties() and producerProperties() are helpers not shown on the slides
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties());
     KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProperties())) {
  consumer.subscribe(Arrays.asList("A", "B"));
  while (true) { // consumer poll loop
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
    for (ConsumerRecord<String, String> record : records) {
      String key = record.key();
      Integer count = groupByCounts.get(key);
      if (count == null) {
        count = 0;
      }
      count += 1; // actually doing something useful
      groupByCounts.put(key, count);
    }
    if (counter++ % sendInterval == 0) {
      // publish the current counts, then commit the consumed offsets
      for (Map.Entry<String, Integer> groupedEntry : groupByCounts.entrySet()) {
        ProducerRecord<String, Integer> producerRecord =
            new ProducerRecord<>("group-by-counts", groupedEntry.getKey(), groupedEntry.getValue());
        producer.send(producerRecord);
      }
      consumer.commitSync();
    }
  }
}
```

https://twitter.com/monitoring_king/status/1048264580743479296

LET’S TALK ABOUT THIS FRAMEWORK OF YOURS. I THINK IT’S GOOD, EXCEPT IT SUCKS

SO LET ME SHOW KAFKA STREAMS. THAT WAY IT MIGHT BE REALLY GOOD

Talk is cheap! Show me code!

Every framework wants to be, when it grows up: scalable, elastic, stateful, fault-tolerant, distributed.



the KAFKA STREAMS API is a JAVA API to BUILD REAL-TIME APPLICATIONS

Your app embeds the Streams API. It is not running inside the brokers!

Same app, many instances: each instance embeds the Streams API. Brokers? Nope!

Before (diagram): your job runs on a processing cluster, and a shared database serves the dashboard.

After (diagram): the dashboard is served by your app, which embeds the Streams API.

this means you can DEPLOY your app ANYWHERE using WHATEVER TECHNOLOGY YOU WANT

So many places to run your app! …and many more…

Things Kafka Streams Does
  • Open source, with enterprise support
  • Powerful processing, incl. filters, transforms, joins, aggregations, windowing
  • Runs everywhere
  • Supports streams and tables
  • Elastic, scalable, fault-tolerant
  • Exactly-once processing
  • Kafka security integration
  • Event-time processing

Talk is cheap! Show me code!


Do you think that’s a table you are querying?

The Stream-Table Duality: each payment in the stream (payments) updates the table (balance), over time:
  • Alice + €50 → Alice €50
  • Bob + €18 → Alice €50, Bob €18
  • Alice + €25 → Alice €75, Bob €18
  • Alice - €60 → Alice €15, Bob €18
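The duality can be sketched in plain Java, assuming whole-euro amounts stored as `int` and the four payments from the slide. Folding the stream by key produces the table, and recording every table update as a `(key, new balance)` entry turns the table back into a stream (a changelog). `StreamTableDuality`, `Payment`, and `toTable` are hypothetical names; this is not the Kafka Streams `KTable` API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java illustration of the stream-table duality (not Kafka Streams).
class StreamTableDuality {
    static final class Payment {
        final String account;
        final int amountEur; // assumption: whole euros; negative means money out
        Payment(String account, int amountEur) {
            this.account = account;
            this.amountEur = amountEur;
        }
    }

    // Stream -> table: sum payments per account.
    // Table -> stream: append each updated balance to the changelog.
    static Map<String, Integer> toTable(List<Payment> payments, List<String> changelog) {
        Map<String, Integer> balances = new LinkedHashMap<>();
        for (Payment p : payments) {
            int updated = balances.merge(p.account, p.amountEur, Integer::sum);
            changelog.add(p.account + "=" + updated);
        }
        return balances;
    }

    public static void main(String[] args) {
        List<Payment> payments = List.of(
            new Payment("Alice", 50), new Payment("Bob", 18),
            new Payment("Alice", 25), new Payment("Alice", -60));
        List<String> changelog = new ArrayList<>();
        System.out.println(toTable(payments, changelog)); // {Alice=15, Bob=18}
        System.out.println(changelog); // [Alice=50, Bob=18, Alice=75, Alice=15]
    }
}
```

Replaying the changelog from the start rebuilds the same table, which is why a table can be stored as a stream and recovered from it.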

Talk is cheap! Show me code!

What’s next?

https://twitter.com/IDispose/status/1048602857191170054

Lower the bar to enter the world of streaming (chart: coding sophistication against user population):
  • Core developers who use Java/Scala streams
  • Core developers who don’t use Java/Scala
  • Data engineers, architects, DevOps/SRE
  • BI analysts

KSQL #FTW: four ways to use it: 1. UI, 2. CLI (ksql>), 3. REST (POST /query), 4. Headless

Interaction with Kafka: KSQL (processing) and a JVM application with Kafka Streams (processing) both work with Kafka (data), and neither runs on the Kafka brokers.

Standing on the shoulders of streaming giants: KSQL (ease of use) is powered by Kafka Streams, which is powered by the Producer and Consumer APIs (flexibility).

One last thing…

https://kafka-summit.org Gamov30

Thanks! @gamussa (viktor@confluent.io) and @tlberglund (tim@confluent.io). We are hiring! https://www.confluent.io/careers/