Go Big with Apache Kafka

A presentation at Conf42: Golang 2021 in June 2021 by Lorna Jane Mitchell

Slide 1

Go Big With Apache Kafka Lorna Mitchell, Aiven

Slide 2

Apache Kafka

“Apache Kafka is an open-source distributed event streaming platform” - https://kafka.apache.org

• Designed for data streaming
• Real-time data for finance and industry
• Very scalable to handle large datasets

@lornajane

Slide 3

Modern, scalable, and fast … remind you of anything?

Slide 4

Kafka is a Log

We know about logs:
• append-only
• immutable

Producers send data. Consumers fetch it.

Slide 5

Topics and Partitions

• Topic is sharded across partitions
• Partition defined by key (usually)
• One consumer per consumer group per topic partition
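The "partition defined by key" idea can be sketched in a few lines: hash the record key, take the modulus over the partition count, and the same key always lands on the same partition. This is an illustration only; Kafka's default partitioner actually uses murmur2, and the `partitionFor` helper below is hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor sketches key-based partition selection: hash the key,
// then take it modulo the partition count. Any stable hash shows the
// principle: one key always maps to one partition, so readings from
// the same sensor stay in order.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32()) % numPartitions
}

func main() {
	fmt.Println(partitionFor("oven_temp", 3))
	fmt.Println(partitionFor("oven_temp", 3)) // same key, same partition
}
```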

Slide 6

Consumer Groups

• Different apps can read the same data
• Consumers belong to a “consumer group”
• One consumer per consumer group per topic partition

Slide 7

Replication Factors

• A topic has a “replication factor”, which is how many copies of it will exist.
• Replication works at the partition level.
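The replication factor is set when a topic is created; with Kafka's bundled shell scripts that might look like the sketch below (the topic name, partition count and broker address are illustrative placeholders, not from the talk):

```shell
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic factorysensor \
  --partitions 3 \
  --replication-factor 2
```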

Slide 8

Let’s Go

https://github.com/aiven/thingum-industries/

Slide 9

Let’s Go

Three libraries in today’s examples:
• confluentinc/confluent-kafka-go
• riferrei/srclient
• actgardner/gogen-avro

Slide 10

Example: Factory Sensors

Imaginary IoT application “Thingum Industries”

Which machine? What reading?

{
  "machine": "MagicMaker3000",
  "sensor": "oven_temp",
  "value": 231,
  "units": "C"
}

Slide 11

Example: Producer

Connect to the broker:

p, err := kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers":        os.Getenv("BROKER_URI"),
    "security.protocol":        "SSL",
    "ssl.ca.location":          "../ca.pem",
    "ssl.certificate.location": "../service.cert",
    "ssl.key.location":         "../service.key",
})
if err != nil {
    panic(err)
}
defer p.Close()

Slide 12

Example: Producer

Send a record to a topic:

mySensorReading := avro.MachineSensor{
    Machine: "MagicMaker4000",
    Sensor:  "oven_temp",
    Value:   (100*rand.Float32() + 150),
    Units:   "C",
}
reading, _ := json.Marshal(mySensorReading)

p.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{
        Topic: &topic, Partition: kafka.PartitionAny},
    Value: reading}, nil)

Slide 13

Example: Consumer

Read from a topic:

c, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": os.Getenv("BROKER_URI"),
    "group.id":          "CG1",
    "auto.offset.reset": "earliest",
})
c.SubscribeTopics([]string{topic}, nil)

for {
    msg, _ := c.ReadMessage(-1)
    fmt.Printf("Message on %s: %s\n",
        msg.TopicPartition, string(msg.Value))
}
c.Close()

Slide 14

Kafka Tooling

Kafka itself ships with some useful shell scripts.

Slide 15

Kafkacat: CLI Tool

https://github.com/edenhill/kafkacat

Slide 16

Kafdrop: Web UI

https://github.com/obsidiandynamics/kafdrop

Slide 17

Cloud Hosted Tools

Slide 18

Kafka and Schemas

Slide 19

Schemas

Schemas are great!
• specify and enforce data format
• required by binary serialisation formats, e.g. Protobuf, Avro

Our favourite strongly-typed language really likes schemas

Slide 20

Avro Schemas

• Avro format requires a schema
• message has schema version information
• used to look up field names and reconstruct payload
• Schema Registry holds the schema versions for each topic
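The "schema version information" carried with each message is small enough to sketch: in the Confluent wire format a record is one zero "magic" byte, a 4-byte big-endian schema ID, then the Avro-encoded payload. A minimal, assumption-laden illustration of reading that frame (the `splitRecord` helper is hypothetical, not from the talk's repo):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// splitRecord sketches unpacking the Confluent schema-registry wire
// format: a zero magic byte, a 4-byte big-endian schema ID, then the
// Avro payload. A consumer would use the ID to fetch the matching
// schema version from the registry before decoding the payload.
func splitRecord(record []byte) (schemaID uint32, payload []byte, err error) {
	if len(record) < 5 || record[0] != 0 {
		return 0, nil, fmt.Errorf("not a schema-registry framed record")
	}
	return binary.BigEndian.Uint32(record[1:5]), record[5:], nil
}

func main() {
	// Frame a dummy payload with schema ID 42, then split it apart again.
	record := append([]byte{0, 0, 0, 0, 42}, []byte("avro-bytes-here")...)
	id, payload, _ := splitRecord(record)
	fmt.Println(id, string(payload)) // → 42 avro-bytes-here
}
```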

Slide 21

Evolving Schemas

• Aim for backwards-compatible changes
  • to rename: add the new field, keep the old one
  • safe to add optional fields
• Each change is a new version
• Avro supports aliases and default values
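As an illustrative (hypothetical) second version of the sensor schema, an alias handles a rename and a default makes a new field optional, so older readers and writers keep working:

```json
{
  "namespace": "io.aiven.example",
  "type": "record",
  "name": "MachineSensor",
  "fields": [
    {"name": "machine_id", "aliases": ["machine"], "type": "string", "doc": "The machine whose sensor this is"},
    {"name": "sensor", "type": "string", "doc": "Which sensor was read"},
    {"name": "value", "type": "float", "doc": "Sensor reading"},
    {"name": "units", "type": "string", "doc": "Measurement units"},
    {"name": "location", "type": "string", "default": "unknown", "doc": "New optional field with a default"}
  ]
}
```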

Slide 22

Example: Avro Schema

Avro schema example for sensor data:

{
  "namespace": "io.aiven.example",
  "type": "record",
  "name": "MachineSensor",
  "fields": [
    {"name": "machine", "type": "string", "doc": "The machine whose sensor this is"},
    {"name": "sensor", "type": "string", "doc": "Which sensor was read"},
    {"name": "value", "type": "float", "doc": "Sensor reading"},
    {"name": "units", "type": "string", "doc": "Measurement units"}
  ]
}

Slide 23

GoGen makes Avro Structs

gogen-avro avro machine_sensor.avsc

type MachineSensor struct {
    // The machine whose sensor this is
    Machine string `json:"machine"`
    // Which sensor was read
    Sensor string `json:"sensor"`
    // Sensor reading
    Value float32 `json:"value"`
    // Measurement units
    Units string `json:"units"`
}

Slide 24

Example: Avro Producer

Add awareness of the schema registry, and turn the struct into bytes to add into the payload…

schemaRegistryClient := srclient.CreateSchemaRegistryClient(uri)
schema, err := schemaRegistryClient.GetLatestSchema(topic, false)

mySensorReading := avro.MachineSensor{
    Machine: "MagicMaker4000",
    Sensor:  "oven_temp",
    Value:   (100*rand.Float32() + 150),
    Units:   "C",
}

var valueBytesBuffer bytes.Buffer
mySensorReading.Serialize(&valueBytesBuffer)
valueBytes := valueBytesBuffer.Bytes()

Slide 25

Example: Avro Producer

… then add the schema info, assemble and send.

schemaIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID()))

var recordValue []byte
recordValue = append(recordValue, byte(0))
recordValue = append(recordValue, schemaIDBytes...)
recordValue = append(recordValue, valueBytes...)

p.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{
        Topic: &topic, Partition: kafka.PartitionAny},
    Value: recordValue}, nil)

Slide 26

Describing Payloads

Slide 27

AsyncAPI for Apache Kafka

AsyncAPI describes event-driven architectures: https://www.asyncapi.com

We can describe the:
• brokers and auth
• topics
• payloads

Slide 28

Describing Payloads

The channels section of the AsyncAPI document:

factorysensor:
  subscribe:
    operationId: MachineSensor
    summary: Data from the in-machine sensors
    bindings:
      kafka:
        clientId:
          type: string
    message:
      name: sensor-reading
      title: Sensor Reading
      schemaFormat: "application/vnd.apache.avro;version=1.9.0"
      payload:
        $ref: machine_sensor.avsc

Slide 29

Documenting Payloads

Slide 30

Go with Apache Kafka

Slide 31

Resources

• Repo: https://github.com/aiven/thingum-industries
• Aiven: https://aiven.io (free trial!)
• Karapace: https://karapace.io
• AsyncAPI: https://asyncapi.com
• Me: https://lornajane.net