Deploying Kafka Streams Applications with Docker and Kubernetes

A presentation at Portland JUG in March 2019 in Portland, OR, USA by Viktor Gamov

Slide 1

Deploying Kafka Streams Applications with Docker and Kubernetes
March 2019 / Portland, OR
@gamussa | @pjug | @ConfluentINc

Slide 3

Raffle, yeah 🚀
Follow @gamussa
📸🖼🏋 Tag @gamussa with @pjug @confluentinc

Slide 4

Special thanks! @gwenshap @MatthiasJSax

Slide 5

Agenda
• Kafka Streams 101
• How do Kafka Streams applications scale?
• Kubernetes 101
• Recommendations for Kafka Streams

Slide 6

https://gamov.dev/ks-k8s-stocks

Slide 7

Kafka Streams – 101
[Diagram labels: Other Systems, Kafka Connect, Your App with Kafka Streams, Kafka Connect, Other Systems]

Slide 8

Stock Trade Stats Example

// Read the stream of trades from the input topic
KStream<String, Trade> source = builder.stream(STOCK_TOPIC);

// 5-second windows advancing every second, aggregated into a state store
// (<~> abbreviates the generic type arguments on the slide)
KStream<Windowed<String>, TradeStats> stats = source
    .groupByKey()
    .windowedBy(TimeWindows.of(5000).advanceBy(1000))
    .aggregate(TradeStats::new,
        (k, v, tradestats) -> tradestats.add(v),
        Materialized.<~>as("trade-aggregates")
            .withValueSerde(new TradeStatsSerde()))
    .toStream()
    .mapValues(TradeStats::computeAvgPrice);

// Write the windowed stats to the output topic
stats.to(STATS_OUT_TOPIC,
    Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class)));
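
To make the windowing in this example concrete: `TimeWindows.of(5000).advanceBy(1000)` defines 5-second windows that advance every second, so a single trade can fall into up to five overlapping windows. The following is a minimal plain-Java sketch of that arithmetic, not Kafka Streams code. The class and method names are hypothetical, and it assumes windows are aligned to the epoch with an inclusive start and an exclusive end.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the Kafka Streams API.
public class HoppingWindows {

    // Returns the start times (ms) of every window [start, start + sizeMs)
    // that contains the given record timestamp.
    public static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned window start that still contains the timestamp.
        long start = Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A trade at t = 7300 ms with size = 5000 ms and advance = 1000 ms.
        System.out.println(windowStartsFor(7300L, 5000L, 1000L));
        // prints [3000, 4000, 5000, 6000, 7000]
    }
}
```

Each of those five windows gets its own TradeStats aggregate for the same key, which is why the state store grows with the ratio of window size to advance.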

Slide 12

Topologies

Processor Topology:
• Source Node: builder.stream
• Processor Node: source.groupByKey .windowedBy(…) .aggregate(…), backed by state stores
• Processor Node: mapValues()
• Sink Node: to(…)
[Diagram: the nodes are connected by streams]

Slide 13

How Do Kafka Streams Applications Scale?

Slide 14

Partitions, Tasks, and Consumer Groups
• 4 input topic partitions => 4 tasks
• Each task executes the processor topology
• One consumer group: can be executed with 1-4 threads on 1-4 machines
[Diagram labels: input topic, tasks, result topic]
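
The relationship between partitions, tasks, and threads can be sketched in a few lines of plain Java. This is a deliberately simplified round-robin model with hypothetical class and method names; the real Kafka Streams assignor is more elaborate (for example, it takes existing state into account). It only illustrates why 4 partitions cap useful parallelism at 4 threads.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration; not the actual Kafka Streams assignment logic.
public class TaskAssignmentSketch {

    // Creates one task per input partition and spreads the tasks
    // round-robin across the available stream threads.
    public static List<List<String>> assign(int partitions, int threads) {
        List<List<String>> perThread = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            perThread.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            // Task ids follow the "<subtopology>_<partition>" pattern, e.g. 0_2.
            perThread.get(p % threads).add("0_" + p);
        }
        return perThread;
    }

    public static void main(String[] args) {
        System.out.println(assign(4, 3)); // prints [[0_0, 0_3], [0_1], [0_2]]
        System.out.println(assign(4, 6)); // prints [[0_0], [0_1], [0_2], [0_3], [], []]
    }
}
```

In the second call, the fifth and sixth threads receive no tasks: adding threads or machines beyond the number of input partitions leaves them idle.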

Slide 15

Scaling with State
“no state”
[Diagram: Trade Stats App, Instance 1]

Slide 16

Scaling with State
“no state”
[Diagram: Trade Stats App, Instances 1 and 2]

Slide 17

Scaling with State
“no state”
[Diagram: Trade Stats App, Instances 1, 2, and 3]

Slide 18

Scaling and Fault-Tolerance: Two Sides of the Same Coin

Slide 19

Fault-Tolerance
[Diagram: Trade Stats App, Instances 1, 2, and 3]

Slide 20

Fault-Tolerant State
[Diagram labels: Input Topic, State Updates, Changelog Topic, Result Topic]

Slide 21

Migrate State
[Diagram labels: Trade Stats App Instance 1, Trade Stats App Instance 2, restore, Changelog Topic]

Slide 22

Recovery Time
• Changelog topics are log compacted
• The size of a changelog topic is linear in the size of the state
• Large state implies high recovery times

Slide 23

Recovery Overhead
Changelog topic, state size: 20 GB (per shard)
• Segments (default size 1 GB) plus an active segment
• Topic size can grow larger if not compacted
• After compaction, min topic size: 21 GB (per shard)
• Recovery overhead about 5%

Slide 24

Recovery Overhead
Changelog topic, state size: 100 MB (per shard)
• Segments (default size 1 GB) plus an active segment
• After compaction, min topic size: 1.1 GB
• Compacted segment: only 100 MB
• Each key is stored up to 11 times…
• Recovery overhead about 1000%
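
The numbers on these two slides follow from simple arithmetic: the active segment is never compacted, so the smallest possible changelog is the compacted state plus one full segment. A minimal sketch of that calculation, using hypothetical names and 1 GB = 1000 MB as the slides do:

```java
// Illustrative arithmetic only; class and method names are hypothetical.
public class RecoveryOverhead {

    // Smallest changelog size after compaction:
    // the compacted state plus one full, uncompacted active segment.
    public static long minTopicSizeMb(long stateMb, long segmentMb) {
        return stateMb + segmentMb;
    }

    // Extra data read during restore, as a percentage of the state size.
    public static long overheadPercent(long stateMb, long segmentMb) {
        return 100 * segmentMb / stateMb;
    }

    public static void main(String[] args) {
        long segmentMb = 1_000; // default 1 GB segments

        // 20 GB of state per shard
        System.out.println(minTopicSizeMb(20_000, segmentMb) + " MB, "
                + overheadPercent(20_000, segmentMb) + "%"); // prints 21000 MB, 5%

        // 100 MB of state per shard
        System.out.println(minTopicSizeMb(100, segmentMb) + " MB, "
                + overheadPercent(100, segmentMb) + "%"); // prints 1100 MB, 1000%
    }
}
```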

Slide 25

Recovery Overhead
• Recovery overhead is proportional to segment-size / state-size
• Segment-size is smaller than state-size => reduced overhead
• Update changelog topic segment size accordingly
  ○ topic config: segment.bytes (the broker-level default is log.segment.bytes)
  ○ log cleaner interval important, too
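
One way to apply the segment-size recommendation from the application side is to pass a topic-level config through the Streams properties. The sketch below uses plain `java.util.Properties` and the literal key `topic.segment.bytes`, which is what `StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)` resolves to. The class name and the 100 MB value are illustrative assumptions, not recommendations from the talk, and the setting likely only takes effect when Kafka Streams creates the internal topic; existing changelog topics would need to be altered separately.

```java
import java.util.Properties;

// Hypothetical configuration helper for illustration.
public class ChangelogSegmentConfig {

    public static Properties build() {
        Properties props = new Properties();
        // "topic." is the prefix Kafka Streams uses to forward configs
        // to the internal (changelog and repartition) topics it creates.
        // 100 MB is an assumed example value; size it relative to the state.
        props.setProperty("topic.segment.bytes", String.valueOf(100L * 1024 * 1024));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("topic.segment.bytes")); // prints 104857600
    }
}
```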

Slide 26

Kubernetes Fundamentals

Slide 27

https://twitter.com/sahrizv/status/1018184792611827712

Slide 28

Orchestration
● Compute
● Networking
● Storage
● Service Discovery

Slide 29

Kubernetes
● Schedules and allocates resources
● Networking between Pods
● Storage
● Service Discovery

Slide 30

Refresher: Kubernetes Architecture
[Architecture diagram, including kubectl. Source: https://thenewstack.io/kubernetes-an-overview/]

Slide 31

Pod
• Basic unit of deployment in Kubernetes
• A collection of containers sharing:
  • Namespace
  • Network
  • Volumes

Slide 32

Storage
• Persistent Volume (PV) & Persistent Volume Claim (PVC)
• Both PV and PVC are “resources”

Slide 33

Storage
• Persistent Volume (PV) & Persistent Volume Claim (PVC)
• PV is a piece of storage that is provisioned dynamically or statically, with a lifecycle independent of any individual pod that uses the PV

Slide 34

Storage
• Persistent Volume (PV) & Persistent Volume Claim (PVC)
• PVC is a request for storage by a user

Slide 35

Storage
• Persistent Volume (PV) & Persistent Volume Claim (PVC)
• PVCs consume PVs

Slide 36

Stateful Workloads

Slide 37

StatefulSet
● Rely on a Headless Service to provide network identity
● Ideal for highly available stateful workloads
[Diagram: a Headless Service with Pod-0, Pod-1, and Pod-2, each with its own Containers and Volumes]

Slide 38

Recommendations for Kafka Streams

Slide 39

[Diagram: Stock Stats App with Kafka Streams, Instances 1, 2, and 3]

Slide 40

[Diagram: WordCount App with Kafka Streams, Instances 1, 2, and 3]

Slide 41

StatefulSets are new and complicated. We don’t need them.

Slide 42

Recovering state takes time. Stateful is faster.

Slide 43

But I’ll want to scale out and back anyway.

Slide 45

I don’t really trust my storage admin anyway

Slide 46

Recommendations:
● Keep changelog shards small
● If you trust your storage: Use StatefulSets
● Use anti-affinity when possible
● Use “parallel” pod management

Slide 47

# Abbreviated for the slide: a complete apps/v1 Deployment also needs
# spec.selector and matching labels on the pod template.
apiVersion: apps/v1
kind: "Deployment"
metadata:
  name: "streams-stock-stats"
spec:
  replicas: 1
  template:
    spec:
      affinity:
        # Never schedule two pods labeled app=streams-stock-stats on the same node
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - "streams-stock-stats"
              topologyKey: "kubernetes.io/hostname"
      containers:
        - name: kafka-streams-stockstat
          image: gamussa/kafka-streams-stockstat:latest

Slide 50

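# Abbreviated for the slide: a complete apps/v1 StatefulSet also needs
# metadata.name, spec.selector, and matching labels on the pod template.
apiVersion: apps/v1
kind: StatefulSet
spec:
  serviceName: "streams-stock-stats"
  replicas: 2
  # Start and stop pods in parallel instead of one at a time
  podManagementPolicy: "Parallel"
  template:
    spec:
      containers:
        - name: kafka-streams-stockstat
          image: kafka-streams-stockstat:latest
          volumeMounts:
            # Local state stores (RocksDB) live on the persistent volume
            - name: rocksdb
              mountPath: /var/lib/kafka-streams
  # One PersistentVolumeClaim per pod
  volumeClaimTemplates:
    - metadata:
        name: rocksdb
      spec:
        accessModes: [ "ReadWriteOnce" ]
        resources:
          requests:
            storage: 1Gi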
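
For the persistent volume to be of any use, the application has to keep its local state under the mounted path. In Kafka Streams that is controlled by the `state.dir` setting. The sketch below shows one way the application side might be configured to match the manifest above; the class name, the application id, and the bootstrap address are assumptions for illustration, and only the `/var/lib/kafka-streams` path comes from the slide.

```java
import java.util.Properties;

// Hypothetical configuration helper for illustration.
public class StateDirConfig {

    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("application.id", "streams-stock-stats"); // assumed id
        props.setProperty("bootstrap.servers", "kafka:9092");       // assumed broker address
        // Must match the mountPath of the rocksdb volume in the StatefulSet,
        // so that state survives pod restarts and rescheduling.
        props.setProperty("state.dir", "/var/lib/kafka-streams");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("state.dir")); // prints /var/lib/kafka-streams
    }
}
```

If `state.dir` is left at its default, which is a temporary directory, the state would sit outside the mounted volume and a restarted pod would have to restore everything from the changelog topic.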

Slide 53

🛑 Stop! Demo time!

Slide 54

Summary
• Kafka Streams has recoverable state, which gives streams apps easy elasticity and high availability
• Kubernetes makes it easy to scale applications
• It also has StatefulSets for applications with state

Slide 55

Summary
Now you know how to deploy Kafka Streams on Kubernetes and take advantage of all the scalability and high-availability capabilities

Slide 56

But what about Kafka itself?

Slide 57

Confluent Operator
• Automate provisioning
• Scale your Kafkas and CP clusters elastically
• Monitor SLAs through Confluent Control Center or Prometheus
• Operate at scale with enterprise support from Confluent

Slide 58

Resources and Next Steps
• https://cnfl.io/helm_video
• https://cnfl.io/cp-helm
• https://cnfl.io/k8s
• https://slackpass.io/confluentcommunity #kubernetes

Slide 59

One more thing…

Slide 60

https://kafka-summit.org
Gamov30

Slide 61

Thanks!
@gamussa
viktor@confluent.io
We are hiring! https://www.confluent.io/careers/
