Go Big With Apache Kafka Lorna Mitchell, Aiven
A presentation at Conf42: Golang 2021 in June 2021 in by Lorna Jane Mitchell
Go Big With Apache Kafka Lorna Mitchell, Aiven
Apache Kafka “Apache Kafka is an open-source distributed event streaming platform” - https://kafka.apache.org • Designed for data streaming • Real-time data for finance and industry • Very scalable to handle large datasets @lornajane
Modern, scalable, and fast …. remind you of anything? @lornajane
Kafka is a Log We know about logs • append-only • immutable Producers send data. Consumers fetch it. @lornajane
Topics and Partitions • Topic is sharded across partitions • Partition defined by key (usually) • One consumer per consumer group per topic partition @lornajane
Consumer Groups • Different apps can read the same data • Consumers belong to a “consumer group” • One consumer per consumer group per topic partition @lornajane
Replication Factors • A topic has a “replication factor”, this is how many copies of it will exist. • Replication works at the partition level. @lornajane
Let’s Go https://github.com/aiven/thingum-industries/
Let’s Go Three libraries in today’s examples: • confluentinc/confluent-kafka-go • riferrei/srclient • actgardner/gogen-avro @lornajane
Example: Factory Sensors Imaginary IoT application “Thingum Industries” Which machine? What reading? { “machine”: “MagicMaker3000”, “sensor”: “oven_temp”, “value”: 231, “units”: “C” } @lornajane
Example: Producer Connect to the broker 1 2 3 4 5 6 7 8 9 10 11 p, err := kafka.NewProducer(&kafka.ConfigMap{ “bootstrap.servers”: os.Getenv(“BROKER_URI”), “security.protocol”: “SSL”, “ssl.ca.location”: “../ca.pem”, “ssl.certificate.location”: “../service.cert”, “ssl.key.location”: “../service.key”, }) if err != nil { panic(err) } defer p.Close() @lornajane
Example: Producer Send a record to a topic 1 2 3 4 5 6 7 8 9 10 11 12 mySensorReading := avro.MachineSensor{ Machine: “MagicMaker4000”, Sensor: “oven_temp”, Value: (100*rand.Float32() + 150), Units: “C”, } reading, _ := json.Marshal(mySensorReading) p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{ Topic: &topic, Partition: kafka.PartitionAny}, Value: reading}, nil) @lornajane
Example: Consumer Read from a topic 1 2 3 4 5 6 7 8 9 10 11 12 13 c, err := kafka.NewConsumer(&kafka.ConfigMap{ “bootstrap.servers”: os.Getenv(“BROKER_URI”), “group.id”: “CG1”, “auto.offset.reset”: “earliest”, }) c.SubscribeTopics([]string{topic}, nil) for { msg, _ := c.ReadMessage(-1) fmt.Printf(“Message on %s: %s\n”, msg.TopicPartition, string(msg.Value)) } c.Close() @lornajane
Kafka Tooling Kafka itself ships with some useful shell scripts
Kafkacat: CLI Tool https://github.com/edenhill/kafkacat @lornajane
Kafdrop: Web UI https://github.com/obsidiandynamics/kafdrop @lornajane
Cloud Hosted Tools @lornajane
Kafka and Schemas
Schemas Schemas are great! • specify and enforce data format • required by compression formats, e.g. Protobuf, Avro Our favourite strongly-typed language really likes schemas @lornajane
Avro Schemas • Avro format requires a schema • message has schema version information • used to look up fieldnames and reconstruct payload • Schema Registry holds the schema versions for each topic @lornajane
Evolving Schemas • Aim for backwards-compatible changes • to rename: add the new field, keep the old one • safe to add optional fields • Each change is a new version • Avro supports aliases and default values @lornajane
Example: Avro Schema Avro schema example for sensor data { “namespace”: “io.aiven.example”, “type”: “record”, “name”: “MachineSensor”, “fields”: [ {“name”: “machine”, “type”: “string”, “doc”: “The machine whose sensor this is”}, {“name”: “sensor”, “type”: “string”, “doc”: “Which sensor was read”}, {“name”: “value”, “type”: “float”, “doc”: “Sensor reading”}, {“name”: “units”, “type”: “string”, “doc”: “Measurement units”} ] } @lornajane
GoGen makes Avro Structs gogen-avro avro machine_sensor.avsc type MachineSensor struct { // The machine whose sensor this is Machine string json:"machine"
// Which sensor was read Sensor string json:"sensor"
// Sensor reading Value float32 json:"value"
// Measurement units Units string json:"units"
}
@lornajane
Example: Avro Producer Add awareness of the schema registry, and turn the struct into bytes to add into the payload… 1 2 3 4 5 6 7 8 9 10 11 12 schemaRegistryClient := srclient.CreateSchemaRegistryClient(uri) schema, err := schemaRegistryClient.GetLatestSchema(topic, false) mySensorReading := avro.MachineSensor{ Machine: “MagicMaker4000”, Sensor: “oven_temp”, Value: (100*rand.Float32() + 150), Units: “C”,} var valueBytesBuffer bytes.Buffer mySensorReading.Serialize(&valueBytesBuffer) valueBytes := valueBytesBuffer.Bytes() @lornajane
Example: Avro Producer … then add the schema info, assemble and send. 1 2 3 4 5 6 7 8 9 10 11 12 schemaIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID())) var recordValue []byte recordValue = append(recordValue, byte(0)) recordValue = append(recordValue, schemaIDBytes…) recordValue = append(recordValue, valueBytes…) p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{ Topic: &topic, Partition: kafka.PartitionAny}, Value: recordValue}, nil) @lornajane
Describing Payloads
AsyncAPI for Apache Kafka AsyncAPI describes event-driven architectures https://www.asyncapi.com We can describe the: • brokers and auth • topics • payloads @lornajane
Describing Payloads The channels section of the AsyncAPI document factorysensor: subscribe: operationId: MachineSensor summary: Data from the in-machine sensors bindings: kafka: clientId: type: string message: name: sensor-reading title: Sensor Reading schemaFormat: “application/vnd.apache.avro;version=1.9.0” payload: $ref: machine_sensor.avsc @lornajane
Documenting Payloads @lornajane
Go with Apache Kafka
Resources • Repo: https://github.com/aiven/thingum-industries • Aiven: https://aiven.io (free trial!) • Karapace: https://karapace.io • AsyncAPI: https://asyncapi.com • Me: https://lornajane.net @lornajane