Stateful & Reactive Stream Processing Applications without a Database Apache Kafka Streams Spring Boot 2.0 ❤ @hpgrahsl | #VDT18 #VoxxedDays Ticino, 20th October 2018, Switzerland

$ whoami
• Hans-Peter Grahsl
• working & living in Graz
• technical trainer at
• independent consultant & engineer
• associate lecturer
• irregular conference speaker

challenges in today's data architectures
• rising number of apps producing + consuming data
• need to integrate ever more data sources
• heterogeneous environments all over the place
• traditional technologies may struggle to cope with this

challenges may lead to a GIANT MESS

Apache Kafka

STREAMING PLATFORM

much more than messaging
• Apache Kafka offers 3 key capabilities
• publish / subscribe to streams of records
• (permanently) store streams of records
• process streams of records in near real-time
fault-tolerance & horizontal scalability
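To picture the first two capabilities (publish / subscribe and storing records), here is a toy, in-memory sketch in plain Java. It is not the Kafka client API; `ToyLog`, `append` and `readFrom` are hypothetical names invented for this illustration, and the sketch ignores partitions, replication and persistence.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy stand-in for a single, append-only stream of records (NOT Kafka itself).
final class ToyLog {
    private final List<String> records = new ArrayList<>();

    // "Publish": append a record and return the offset it was stored at.
    long append(String record) {
        records.add(record);
        return records.size() - 1;
    }

    // "Subscribe": read all records starting at the given offset.
    // Records are never removed here, so a reader can re-read from any earlier offset.
    List<String> readFrom(long offset) {
        int from = (int) Math.min(Math.max(offset, 0L), (long) records.size());
        return Collections.unmodifiableList(new ArrayList<>(records.subList(from, records.size())));
    }
}
```

Because records stay in the log after being read, several independent readers can consume the same stream at their own pace, which is the property the slide contrasts with plain messaging.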

Producer API

Consumer API

Connect API

Streams API

Kafka Streams API
• stream processing with a library-only approach
• lightweight applications
• build however & deploy wherever you like
• NO(!) additional clusters or frameworks
• Processor API & Streams DSL
• configurable delivery guarantees

writing applications NOT (!) managing clusters

Meet KSQL for skyrocketing productivity

KSQL
• SQL streaming engine for Kafka
• concise & expressive
• SQL-like language and semantics
• NO(!) coding required
• extremely low entry barrier
• joins, aggregations, windowing
• UD(A)Fs - UDTFs pending...
• built on top of the Kafka Streams API


central nervous system

example ? hmmmm...

example: near real-time Emoji Tracking

HOW TO build this?

emoji tracking | step 1: ingest & store
• subset of public live tweets from Twitter

emoji tracking | step 2: process
• extract emojis
• group & count them
• maintain top N

emoji tracking | step 3: query
• single emoji count
• all emoji counts
• top N emojis

emoji tracking | step 4: notify
• consumable near real-time change streams of updates

Let's do it!

example: step 1 ingest tweets
• using Kafka Connect
• e.g. this community connector https://github.com/jcustenborder/kafka-connect-twitter
• configure the connector (JSON)
• manage connector via REST-like API: create | pause | resume | delete | status

{
  "name": "tweets-twitter-source",
  "config": {
    "connector.class": "c.g.j.k.c.t.TwitterSourceConnector",
    "twitter.oauth.accessToken": "...",
    "twitter.oauth.consumerSecret": "...",
    "twitter.oauth.consumerKey": "...",
    "twitter.oauth.accessTokenSecret": "...",
    "kafka.status.topic": "tweets",
    "process.deletes": false,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "filter.keywords": "..."
  }
}
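The connector lifecycle operations named in this step (create | pause | resume | delete | status) map onto a handful of Kafka Connect REST endpoints. The sketch below only builds the method and URI for each call; it sends nothing. The base URL `http://localhost:8083` (the usual default Connect REST port) is an assumption about the deployment, and `ConnectCall` / `ConnectCalls` are hypothetical helper names introduced for this illustration.

```java
import java.net.URI;

// One REST call described as HTTP method plus target URI (hypothetical helper type).
final class ConnectCall {
    final String method;
    final URI uri;

    ConnectCall(String method, URI uri) {
        this.method = method;
        this.uri = uri;
    }
}

// Builds the Kafka Connect REST calls for the connector lifecycle.
final class ConnectCalls {
    private final String baseUrl; // assumption: e.g. "http://localhost:8083"

    ConnectCalls(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    // create: the JSON document with "name" and "config" is sent as the request body
    ConnectCall create()            { return call("POST",   "/connectors"); }
    ConnectCall pause(String name)  { return call("PUT",    "/connectors/" + name + "/pause"); }
    ConnectCall resume(String name) { return call("PUT",    "/connectors/" + name + "/resume"); }
    ConnectCall delete(String name) { return call("DELETE", "/connectors/" + name); }
    ConnectCall status(String name) { return call("GET",    "/connectors/" + name + "/status"); }

    private ConnectCall call(String method, String path) {
        return new ConnectCall(method, URI.create(baseUrl + path));
    }
}
```

With the configuration shown on this slide, `new ConnectCalls("http://localhost:8083").status("tweets-twitter-source")` describes the request that asks the Connect worker for the state of the Twitter source connector.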

NO CODE!

example: step 2 process tweets
• using Kafka Streams high-level DSL
• grouping and counting emojis
• updating top N emoji counts
• map tweets to emoji occurrences
• only a few lines of Java
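The "top N emoji counts" bullet can be pictured with a small plain-Java sketch. The slides do not show how the application maintains its top N, so this is only an illustration under stated assumptions: it ranks a snapshot of counts in one pass, whereas a stream processor would update the ranking incrementally. `TopN` is a hypothetical helper name.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class TopN {
    // Returns the n entries with the highest counts.
    // Ties are broken by key so the result order is deterministic.
    static List<Map.Entry<String, Long>> of(Map<String, Long> counts, int n) {
        Comparator<Map.Entry<String, Long>> byCountDesc =
                Map.Entry.<String, Long>comparingByValue().reversed();
        Comparator<Map.Entry<String, Long>> order =
                byCountDesc.thenComparing(Map.Entry.comparingByKey());

        return counts.entrySet().stream()
                .sorted(order)
                .limit(n)
                .collect(Collectors.toList());
    }
}
```

Sorting the whole snapshot is fine for a sketch; with many distinct emojis and frequent updates, keeping a bounded sorted structure that is adjusted per update would avoid re-sorting everything.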

calculate emoji counts
• It all starts with tweets like this...
!this is a twitter! status ⛰ text with ## five emojis

calculate emoji counts
                     Key   Value
raw input            ID    !this is a twitter! status ⛰ text with ## five emojis
extract emoji list   ID    [!,!,⛰,#,#]
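The "extract emoji list" step can be sketched in plain Java. The talk's own code uses an `EmojiUtils` helper whose details are elided, so the version below is a stand-in and not that helper: `EmojiSketch` is a hypothetical name, and the code-point ranges are a rough heuristic (an assumption) that covers common pictographs and symbols but not every emoji, and ignores modifiers and multi-code-point sequences.

```java
import java.util.ArrayList;
import java.util.List;

final class EmojiSketch {
    // Rough heuristic, NOT complete Unicode emoji detection:
    // accept code points from two commonly used blocks.
    static boolean looksLikeEmoji(int codePoint) {
        return (codePoint >= 0x1F300 && codePoint <= 0x1FAFF)  // pictographs, emoticons, transport, ...
            || (codePoint >= 0x2600 && codePoint <= 0x27BF);   // miscellaneous symbols, dingbats
    }

    // Walks the text by code point (not by char) so surrogate pairs stay intact,
    // and returns every matching code point in order of appearance, duplicates included.
    static List<String> extractEmojis(String text) {
        List<String> emojis = new ArrayList<>();
        text.codePoints()
            .filter(EmojiSketch::looksLikeEmoji)
            .forEach(cp -> emojis.add(new String(Character.toChars(cp))));
        return emojis;
    }
}
```

Keeping duplicates matters here: the later grouping step counts occurrences, so a tweet that contains the same emoji twice has to contribute two list entries.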

calculate emoji counts
flatten the list
Key   Value
ID    !
ID    !
ID    ⛰
ID    #
ID    #

calculate emoji counts
set keys to values
Key   Value
!     ""
!     ""
⛰     ""
#     ""
#     ""
finally group & count by key

result: continuously updated KTable with emoji counts
Key   Value
!     2
⛰     1
#     2
...   ...
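The flatten, re-key and count steps can be mirrored on an in-memory collection with plain Java streams. This is only an analogy under stated assumptions: it computes the counts once over a fixed input, whereas a KTable is updated continuously as new tweets arrive. `EmojiCountSketch` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class EmojiCountSketch {
    // Input: one emoji list per tweet (the output of the extraction step).
    // Output: emoji -> number of occurrences across all tweets.
    static Map<String, Long> count(List<List<String>> emojisPerTweet) {
        return emojisPerTweet.stream()
                .flatMap(List::stream)                   // flatten the per-tweet lists
                .collect(Collectors.groupingBy(
                        Function.identity(),             // the emoji itself becomes the key
                        LinkedHashMap::new,              // keep first-seen order for readability
                        Collectors.counting()));         // count occurrences per key
    }
}
```

For a single tweet whose extracted list contains one emoji twice, a second emoji once and a third emoji twice, the resulting map holds the counts 2, 1 and 2, matching the table on the result slide.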

1:1 mapping to KStreams API

KTable<String, Long> emojiCounts = tweets
    // extract the list of emojis from each tweet
    .map((id, tweet) -> KeyValue.pair(id, EmojiUtils...))
    // flatten: one record per emoji occurrence
    .flatMapValues(emojis -> emojis)
    // set keys to values: the emoji becomes the key
    .map((id, emoji) -> KeyValue.pair(emoji, ""))
    // group & count by key
    .groupByKey(...).count(...);

example: step 3 query results
• access to state stores with interactive queries
• Kafka Streams offers all needed metadata
• RPC integration left for developers

Reactive WebAPI powered by Spring Boot 2.0

REST controller

@RestController
@RequestMapping("interactive/queries/")
@CrossOrigin(origins = "*")
public class StateStoreController {

    private final StateStoreService service;

    [...]
}

REST controller methods

@GetMapping("emojis/{code}")
public Mono<ResponseEntity<EmojiCount>> getEmoji(@PathVariable String code) {
    return service.querySingleEmojiCount(code);
}

@GetMapping("emojis")
public Flux<EmojiCount> getEmojis() {
    return service.queryAllEmojiCounts();
}

state store access in service

StreamsMetadata metadata = kafkaStreams.metadataForKey(
    "your-store-name", emoji, Serializer...
);

state store access in service

// this instance hosts the key: answer from the local state store
if (itsMe.equals(metadata.hostInfo())) {
    ReadOnlyKeyValueStore<String, Long> kvStoreEmojiCounts =
        kafkaStreams.store("your-store-name", QueryableStoreTypes.keyValueStore());
    Long count = kvStoreEmojiCounts.get(emoji);
    return Mono.just(
        new ResponseEntity<>(new EmojiCount(...), HttpStatus.OK)
    );
}

state store access in service

// another instance hosts the key: redirect the client to it
String location = String.format("http://%s:%d/.../%s",
    metadata.host(), metadata.port(), emoji);
return Mono.just(
    ResponseEntity.status(HttpStatus.FOUND)
        .location(URI.create(location)).build()
);
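The "answer locally or redirect" decision from the service code can be isolated in a framework-free sketch. Everything here is a simplified stand-in: `Endpoint` replaces the Kafka Streams host metadata, a plain `Map` replaces the state store, the result is a status string instead of a `ResponseEntity`, and `pathPrefix` is a hypothetical parameter because the real path is elided in the slides. Treating a missing key as a count of 0 is also an assumption of this sketch.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a host/port pair.
final class Endpoint {
    final String host;
    final int port;

    Endpoint(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Endpoint)) return false;
        Endpoint other = (Endpoint) o;
        return port == other.port && host.equals(other.host);
    }

    @Override
    public int hashCode() {
        return Objects.hash(host, port);
    }
}

final class QueryRouting {
    // Returns "200 <count>" if this instance owns the key,
    // otherwise "302 <location>" pointing at the owning instance.
    static String route(Endpoint self, Endpoint owner, String key,
                        Map<String, Long> localStore, String pathPrefix) {
        if (self.equals(owner)) {
            Long count = localStore.get(key);
            return "200 " + (count == null ? 0L : count); // assumption: unknown key counts as 0
        }
        return "302 " + String.format("http://%s:%d%s/%s",
                owner.host, owner.port, pathPrefix, key);
    }
}
```

A redirect keeps each instance simple, at the cost of an extra round trip for the client; proxying the request to the owning instance on the server side would be the alternative design.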

example: step 4 real-time notifications
• reactively consume from changelog topics
• stream any changes to clients using SSE
Project Reactor's reactor-kafka

notifications via SSE

@GetMapping(path = "emojis/updates/notify",
            produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<EmojiCount> getEmojiCountsStream() {
    return service.consumeEmojiCountsStream();
}
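With `text/event-stream`, each emitted element reaches the client as a small text frame. The sketch below hand-builds one such frame to show the wire format; in the application the web framework does this serialization. The JSON field names `emoji` and `count` are assumptions (the fields of `EmojiCount` are not shown in the slides), `SseFrameSketch` is a hypothetical name, and the sketch skips JSON escaping, so it assumes the emoji string contains no quotes or backslashes.

```java
final class SseFrameSketch {
    // Formats one update as a Server-Sent Events frame:
    // a single "data:" line carrying the payload, terminated by a blank line.
    static String frame(String emoji, long count) {
        String json = "{\"emoji\":\"" + emoji + "\",\"count\":" + count + "}";
        return "data: " + json + "\n\n";
    }
}
```

A browser client subscribed through `EventSource` receives each frame as one message event, so every count change can be applied to a live view as it arrives.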

LIVE DASHBOARD

mission accomplished

try it yourself!
source https://github.com/hpgrahsl/voxxed-days-ticino-2018
slides https://speakerdeck.com/hpgrahsl/stateful-and-reactive-streamprocessing-applications-without-a-database-atvoxxedticino-2018

THANK YOU Q&A ?