A presentation at Scale By the Bay by Viktor Gamov
Event Streaming with Kafka Streams Scale By The Bay 2020 / Online @gAmUssA | #bythebay | @confluentinc
Preface
Stream Processing is the toolset for dealing with events as they move!
[Diagram: the event streaming platform. Java apps with Kafka Streams or KSQL feed a serving layer (microservices, Elastic, etc.); the platform provides continuous computation over event streams through an API-based cluster.]
Apache Kafka® Event Streaming Platform 101
[Diagram: event streaming platform architecture. Applications talk to the Kafka brokers through native client libraries, Kafka Streams, KSQL, or a load balancer in front of the REST Proxy; Schema Registry, Kafka Connect, and ZooKeeper nodes sit alongside the brokers.]
The log is a simple idea: messages are added at the end of the log, with new records at one end and old records at the other.
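To make "added at the end of the log" concrete, here is a minimal producer sketch; the topic name "events", the broker address, and the record contents are illustrative assumptions, not from the slides.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // every send() appends the record at the "new" end of the topic's log
    producer.send(new ProducerRecord<>("events", "user-1", "clicked"));
}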
Consumers have a position all of their own: Ricardo, Robin, and Viktor each scan the log from their own spot between old and new messages.
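As a hedged sketch of what a consumer's own position looks like in the Java API (the topic, group.id, and printed output are illustrative assumptions): each consumer group tracks its own offset, so Ricardo, Robin, and Viktor can scan the same log independently.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "viktor"); // Ricardo and Robin would each use their own group.id
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("events"));
    consumer.poll(Duration.ofSeconds(1)); // join the group and fetch some records
    // position() reports where this consumer will resume scanning each partition
    for (TopicPartition tp : consumer.assignment()) {
        System.out.println(tp + " -> " + consumer.position(tp));
    }
}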
Shard data to get scalability: producers send messages to different partitions, and the partitions live on different machines in the cluster.
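A short sketch of how keys drive the sharding, reusing the producer from the earlier sketch (key and topic names are assumptions): the default partitioner hashes the record key, so records with the same key always land on the same partition.

producer.send(new ProducerRecord<>("events", "user-1", "login"));  // hash("user-1") % numPartitions
producer.send(new ProducerRecord<>("events", "user-2", "login"));  // may go to a different partition
producer.send(new ProducerRecord<>("events", "user-1", "logout")); // same partition as the first send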
First, the "bare" consumer/producer version of a simple group-by count:

// in-memory store, not persistent
Map<String, Integer> groupByCounts = new HashMap<>();
int counter = 0;
final int sendInterval = 100; // declarations not shown on the slides; the value is an assumption

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties());
     KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProperties())) {
    consumer.subscribe(Arrays.asList("A", "B"));
    while (true) { // consumer poll loop
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
        for (ConsumerRecord<String, String> record : records) {
            String key = record.key();
            Integer count = groupByCounts.get(key);
            if (count == null) {
                count = 0;
            }
            count += 1;
            // actually doing something useful
            groupByCounts.put(key, count);
        }
        // periodically publish the running counts and commit the consumer's position
        if (counter++ % sendInterval == 0) {
            for (Map.Entry<String, Integer> groupedEntry : groupByCounts.entrySet()) {
                ProducerRecord<String, Integer> producerRecord =
                    new ProducerRecord<>("group-by-counts", groupedEntry.getKey(), groupedEntry.getValue());
                producer.send(producerRecord);
            }
            consumer.commitSync();
        }
    }
}
https://twitter.com/monitoring_king/status/1048264580743479296
LET'S TALK ABOUT THIS FRAMEWORK OF YOURS. I THINK IT'S GOOD, EXCEPT IT SUCKS
SO LET ME SHOW KAFKA STREAMS. THAT WAY IT MIGHT BE REALLY GOOD
Talk is cheap! Show me code!
final StreamsBuilder streamsBuilder = new StreamsBuilder();
final KStream<String, Long> stream = streamsBuilder.stream(Arrays.asList("A", "B"));

// actual work
stream.groupByKey()
    .count()
    .toStream()
    .to("group-by-counts", Produced.with(Serdes.String(), Serdes.Long()));

final Topology topology = streamsBuilder.build();
final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsProperties());
kafkaStreams.start();
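The slide code calls a streamsProperties() helper that is not shown; here is a minimal sketch of what it could return (the application id and broker address are assumptions). Instances started with the same application.id form one logical app and split the input partitions between themselves, which is how the "same app, many instances" scaling shown later works.

static Properties streamsProperties() {
    Properties props = new Properties();
    // all instances sharing this id divide the partitions among themselves
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "group-by-counts-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
    return props;
}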
Every framework wants to be scalable, elastic, stateful, fault-tolerant, and distributed when it grows up.
Where do I put my compute?
Where do I put my state?
The actual question is: where is my code?
the KAFKA STREAMS API is a JAVA API to BUILD REAL-TIME APPLICATIONS
App with the Streams API embedded. Not running inside brokers!
Same app, many instances, each embedding the Streams API. Brokers? Nope!
[Diagram, before: your job runs on a dedicated processing cluster, reads and writes a shared database, and feeds a dashboard.]
[Diagram, after: your app, with the Streams API embedded, feeds the dashboard directly.]
this means you can DEPLOY your app ANYWHERE using WHATEVER TECHNOLOGY YOU WANT
So many places to run your app! …and many more…
Things Kafka Streams Does
• Powerful processing, incl. filters, transforms, joins, aggregations, windowing
• Supports streams and tables
• Event-time processing
• Exactly-once processing
• Elastic, scalable, fault-tolerant
• Runs everywhere
• Kafka security integration
• Open source, with enterprise support
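As a taste of the windowing item above, a hedged sketch that counts events per key in five-minute tumbling windows (topic names are assumptions; this is not code from the talk):

final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("events")
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5))) // tumbling, event-time windows
    .count()                                           // stateful, fault-tolerant count per key per window
    .toStream()
    .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count))
    .to("windowed-counts", Produced.with(Serdes.String(), Serdes.Long()));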
Want to learn more?
Learn Kafka. Start building with Apache Kafka at Confluent Developer: developer.confluent.io
Watch the full version: https://gamov.dev/developer
All things change constantly! And dealing with constantly changing data at low latency is pretty hard. It doesn’t need to be that way. Kafka has become a de-facto standard for ingesting event-based data and is considered the central nervous system for data in many organizations. Kafka Streams is a stream processing library built on top of Kafka. It allows you to transform streams, perform analysis, and run stateful operations on the incoming data. In this presentation, Viktor will cover:
• A quick intro into Kafka 101.
• What Kafka Streams is.
• How Kafka Streams enables/simplifies event-based processing.
By the end of this presentation, you will understand the basics of Kafka Streams and how you can start using it to implement event streaming applications!
Here’s what was said about this presentation on social media.
wanna learn what @kafkastreams is about? Join me today on "#EventStreaming with @KafkaStreams at @ScaleByTheBay 2020 https://t.co/0WON81Khdb #bythebay
— Viktor Gamov @ 🛋 🏡 🗽 (@gAmUssA) November 12, 2020