@rmoff #VoxxedMicroservices From Zero to Hero with Kafka Connect a.k.a. A practical guide to becoming l33t with Kafka Connect
A presentation at Voxxed Microservices in October 2019 in Paris, France by Robin Moffatt
What is Kafka Connect?
Streaming Integration with Kafka Connect: Sources (syslog) → Kafka Connect → Kafka Brokers
Streaming Integration with Kafka Connect: Kafka Brokers → Kafka Connect → Sinks (Amazon S3, Google BigQuery)
Streaming Integration with Kafka Connect: syslog → Kafka Connect → Kafka Brokers → Kafka Connect → Amazon S3, Google BigQuery
Look Ma, No Code!
```json
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://asgard:3306/demo",
  "table.whitelist": "sales,orders,customers"
}
```
https://docs.confluent.io/current/connect/
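Behind "no code" is a REST call: the JSON config is submitted to a Kafka Connect worker's REST API (port 8083 by default). As a minimal standard-library sketch, assuming a hypothetical connector name jdbc-source-demo, this builds the body for POST /connectors, which wraps the config map together with the connector's name:

```python
import json


def create_connector_request(name: str, config: dict) -> tuple:
    """Build (method, path, body) for creating a connector via POST /connectors."""
    # POST /connectors expects the connector name alongside its config map.
    payload = {"name": name, "config": config}
    return "POST", "/connectors", json.dumps(payload, indent=2).encode("utf-8")


# The config mirrors the slide; "jdbc-source-demo" is a hypothetical name.
jdbc_source = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://asgard:3306/demo",
    "table.whitelist": "sales,orders,customers",
}

method, path, body = create_connector_request("jdbc-source-demo", jdbc_source)
print(method, path)  # POST /connectors
print(body.decode("utf-8"))
```

The printed body could be saved to a file (say body.json) and sent with curl -X POST -H "Content-Type: application/json" --data @body.json http://localhost:8083/connectors. The pipeline itself remains pure configuration.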
Streaming Pipelines: RDBMS → Kafka Connect → Kafka → Kafka Connect → Amazon S3, HDFS
Writing to data stores from Kafka: App → Kafka → Kafka Connect → Data Store
Evolve processing from old systems to new (diagram: Existing App, RDBMS, Kafka Connect, New App <x>)
Demo: http://rmoff.dev/kafka-connect-code
Configuring Kafka Connect: Inside the API - connectors, transforms, converters
Kafka Connect basics: Source → Kafka Connect → Kafka
Connectors: Source → Connector (running in Kafka Connect) → Kafka
Connectors:
```json
"config": {
  […]
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:postgresql://postgres:5432/",
  "topics": "asgard.demo.orders"
}
```
Connectors: Source (native data) → Connector → Connect Record → Kafka Connect → Kafka
Converters: Source (native data) → Connector → Connect Record → Converter → bytes[] → Kafka
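The split of responsibilities matters: the connector only produces or consumes Connect records, and the converter alone decides which bytes reach Kafka. In Java that role is the org.apache.kafka.connect.storage.Converter interface (fromConnectData / toConnectData). The Python toy below is not that interface; it is a sketch that round-trips a record through a JSON envelope with "schema" and "payload" fields, loosely resembling what JsonConverter emits with schemas.enable=true. The record and schema are made up for illustration.

```python
import json


def to_bytes(schema: dict, value: dict) -> bytes:
    """Serialise a record into a JSON envelope carrying its schema and payload."""
    return json.dumps({"schema": schema, "payload": value}).encode("utf-8")


def from_bytes(data: bytes) -> tuple:
    """Reverse of to_bytes: recover (schema, payload) from the raw bytes."""
    envelope = json.loads(data.decode("utf-8"))
    return envelope["schema"], envelope["payload"]


# Hypothetical record, as a connector might build it from native (e.g. JDBC) data.
schema = {
    "type": "struct",
    "name": "orders",
    "optional": False,
    "fields": [
        {"field": "id", "type": "int32", "optional": False},
        {"field": "product", "type": "string", "optional": True},
    ],
}
record = {"id": 42, "product": "widget"}

raw = to_bytes(schema, record)  # what would actually be stored in the Kafka topic
print(from_bytes(raw)[1])       # {'id': 42, 'product': 'widget'}
```

Swapping the converter (say, to Avro) changes only these bytes; the connector code is untouched.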
Serialisation & Schemas: Avro (with the Confluent Schema Registry), Protobuf, JSON, CSV. https://qconnewyork.com/system/files/presentation-slides/qcon_17_-_schemas_and_apis.pdf
The Confluent Schema Registry: Source → Kafka Connect → Avro Message → Kafka Connect → Target, with the Avro Schema held in the Schema Registry
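With the Schema Registry, each message carries only a reference to its schema, not the schema itself. The Confluent wire format prefixes the Avro payload with a magic byte (0) and a 4-byte big-endian schema ID. The sketch below frames and unframes a message in that layout; the payload bytes and schema ID are placeholders, and no real Avro encoding is performed.

```python
import struct

MAGIC_BYTE = 0
HEADER = struct.Struct(">bI")  # 1-byte magic + 4-byte big-endian schema ID


def frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix an already-encoded Avro payload with the magic byte and schema ID."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + avro_payload


def unframe(message: bytes) -> tuple:
    """Split a framed message into (schema_id, avro_payload)."""
    if len(message) < HEADER.size or message[0] != MAGIC_BYTE:
        # The same condition that makes a deserialiser report "Unknown magic byte!"
        raise ValueError("Unknown magic byte!")
    _, schema_id = HEADER.unpack_from(message)
    return schema_id, message[HEADER.size:]


# Placeholder payload: these bytes stand in for real Avro-encoded data.
msg = frame(7, b"\x02\x54")
schema_id, payload = unframe(msg)
print(schema_id, len(msg))  # 7 7
```

A consumer uses the schema ID to fetch the schema from the registry (and cache it) before decoding the payload.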
Converters:
```properties
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
```
Set as a global default per worker; optionally can be overridden per connector.
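As we understand it, the override works per converter: if a connector sets value.converter, that converter is configured only from the connector's own value.converter.* properties and does not inherit the worker's (so a setting such as schema.registry.url has to be repeated at connector level). The sketch below models that rule; resolve_converter is a hypothetical helper, not part of Kafka Connect, and the connector config is illustrative.

```python
def resolve_converter(prefix: str, worker: dict, connector: dict) -> dict:
    """Settings used for one converter, "key.converter" or "value.converter".

    If the connector names its own converter class, every setting for that
    converter comes from the connector config; otherwise the worker's apply.
    """
    source = connector if prefix in connector else worker
    return {k: v for k, v in source.items()
            if k == prefix or k.startswith(prefix + ".")}


AVRO_CONVERTER = "io.confluent.connect.avro.AvroConverter"
JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"

worker = {  # worker-level defaults, as on the slide
    "key.converter": AVRO_CONVERTER,
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": AVRO_CONVERTER,
    "value.converter.schema.registry.url": "http://localhost:8081",
}
connector = {  # hypothetical connector whose values are schemaless JSON
    "value.converter": JSON_CONVERTER,
    "value.converter.schemas.enable": "false",
}

print(resolve_converter("key.converter", worker, connector))    # worker's Avro settings
print(resolve_converter("value.converter", worker, connector))  # connector's JSON settings
```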
What about internal converters?
```properties
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
key.internal.value.converter=org.apache.kafka.connect.json.JsonConverter
value.internal.value.converter=org.apache.kafka.connect.json.JsonConverter
key.internal.value.converter.bork.bork.bork=org.apache.kafka.connect.json.JsonConverter
key.internal.value.please.just.work.converter=org.apache.kafka.connect.json.JsonConverter
```
Single Message Transforms: Source → Connector → Transform(s) → Converter → Kafka
Single Message Transforms:
```json
"config": {
  […]
  "transforms": "addDateToTopic,labelFooBar",
  "transforms.addDateToTopic.type": "org.apache.kafka.connect.transforms.TimestampRouter",
  "transforms.addDateToTopic.topic.format": "${topic}-${timestamp}",
  "transforms.addDateToTopic.timestamp.format": "yyyyMM",
  "transforms.labelFooBar.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.labelFooBar.renames": "delivery_address:shipping_address"
}
```
The transforms property lists the transforms to apply, in order ("do these transforms"); each one is then configured with its own transforms.<name>.* properties. timestamp.format is a java.text.SimpleDateFormat pattern, so the calendar year is yyyy (YYYY would give the week-based year).
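To see what the two transforms do to a record, here is a Python simulation, not the Kafka Connect implementation: it builds the new topic name the way TimestampRouter's topic.format does and renames fields the way ReplaceField's renames option does, applying them in the order listed in transforms. Python's %Y%m stands in for the calendar-year Java pattern yyyyMM (in SimpleDateFormat, YYYY is the week-based year), and the timestamp is interpreted in UTC, an assumption of this sketch. The topic name and record are hypothetical.

```python
from datetime import datetime, timezone


def timestamp_router(topic: str, ts_ms: int,
                     topic_format: str = "${topic}-${timestamp}",
                     timestamp_format: str = "%Y%m") -> str:
    """Build a topic name from the record timestamp, like TimestampRouter."""
    stamp = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).strftime(timestamp_format)
    return topic_format.replace("${topic}", topic).replace("${timestamp}", stamp)


def rename_fields(value: dict, renames: str) -> dict:
    """Rename fields using an "old:new,old2:new2" spec, like ReplaceField$Value."""
    mapping = dict(pair.split(":", 1) for pair in renames.split(","))
    return {mapping.get(k, k): v for k, v in value.items()}


# Hypothetical record, timestamped 15 October 2019 (UTC).
ts_ms = int(datetime(2019, 10, 15, tzinfo=timezone.utc).timestamp() * 1000)
record = {"order_id": 1, "delivery_address": "Paris"}

# Transforms run in the order given: addDateToTopic, then labelFooBar.
new_topic = timestamp_router("orders", ts_ms)
new_value = rename_fields(record, "delivery_address:shipping_address")
print(new_topic)  # orders-201910
print(new_value)  # {'order_id': 1, 'shipping_address': 'Paris'}
```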
Extensible: Connector, Transform(s), Converter
Confluent Hub: hub.confluent.io
Deploying Kafka Connect: Connectors, Tasks, and Workers
Connectors and Tasks: the JDBC Source connector runs as JDBC Task #1 and JDBC Task #2; the S3 Sink connector runs as S3 Task #1
Tasks and Workers: JDBC Task #1, JDBC Task #2 and S3 Task #1 run in a Worker
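How many tasks a connector gets is bounded by its tasks.max setting (the connector may create fewer), and it is the tasks, not the connectors, that get placed on workers. The sketch below models that placement with a simple round-robin. Kafka Connect's real assignment comes from its group rebalance protocol, so treat this as a simplification; the task names are hypothetical.

```python
from itertools import cycle


def assign_round_robin(tasks: list, workers: list) -> dict:
    """Place tasks on workers in turn (a simplified model of task placement)."""
    if not workers:
        raise ValueError("need at least one worker")
    assignment = {w: [] for w in workers}
    for task, worker in zip(tasks, cycle(workers)):
        assignment[worker].append(task)
    return assignment


# Hypothetical task names: two JDBC source tasks and one S3 sink task.
tasks = ["jdbc-source-0", "jdbc-source-1", "s3-sink-0"]
print(assign_round_robin(tasks, ["worker-1"]))              # all three on one worker
print(assign_round_robin(tasks, ["worker-1", "worker-2"]))  # spread across two
```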
Kafka Connect Standalone Worker: a single Worker runs S3 Task #1, JDBC Task #1 and JDBC Task #2 and keeps its own Offsets
“Scaling” the Standalone Worker: two separate Workers, each with its own Offsets, share JDBC Task #1, S3 Task #1 and JDBC Task #2 between them. Fault-tolerant? Nope.
Kafka Connect Distributed Worker: a Worker in a Kafka Connect cluster runs S3 Task #1, JDBC Task #1 and JDBC Task #2, with Offsets, Config and Status stored in Kafka. Fault-tolerant? Yeah!
Scaling the Distributed Worker: adding a second Worker to the Kafka Connect cluster spreads the tasks (S3 Task #1, JDBC Task #1, JDBC Task #2) across both, sharing the same Offsets, Config and Status. Fault-tolerant? Yeah!
Distributed Worker - fault tolerance: a Kafka Connect cluster of two Workers running S3 Task #1 and JDBC Task #1, with shared Offsets, Config and Status
Distributed Worker - fault tolerance: with one Worker remaining, S3 Task #1, JDBC Task #1 and JDBC Task #2 all run on it, using the same Offsets, Config and Status
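Failover works because offsets, config and status live in Kafka rather than on any single worker, so a surviving worker can pick up a lost task where it left off. The sketch below models only the reassignment step: tasks on healthy workers stay put, and each of the failed worker's tasks moves to the least-loaded survivor. This is a simplification; as we understand it, incremental cooperative rebalancing (Kafka 2.3 and later) also waits up to scheduled.rebalance.max.delay.ms for a departed worker to return before reassigning its tasks. Worker and task names are hypothetical.

```python
def reassign_after_failure(assignment: dict, failed: str) -> dict:
    """Move a failed worker's tasks to the surviving workers, least-loaded first.

    Tasks already running on healthy workers stay where they are.
    """
    orphans = assignment.get(failed, [])
    survivors = {w: list(ts) for w, ts in assignment.items() if w != failed}
    if not survivors:
        raise RuntimeError("no surviving workers to take over the tasks")
    for task in orphans:
        target = min(survivors, key=lambda w: len(survivors[w]))
        survivors[target].append(task)
    return survivors


# Hypothetical layout before the failure: two workers sharing three tasks.
before = {"worker-1": ["s3-sink-0", "jdbc-source-0"], "worker-2": ["jdbc-source-1"]}
after = reassign_after_failure(before, "worker-2")
print(after)  # {'worker-1': ['s3-sink-0', 'jdbc-source-0', 'jdbc-source-1']}
```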
Multiple Distributed Clusters: Kafka Connect cluster #1 (S3 Task #1, JDBC Task #1) and Kafka Connect cluster #2 (JDBC Task #2), each with its own Offsets, Config and Status
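Workers that share a group.id form one distributed cluster, and each cluster keeps its offsets, config and status in its own internal topics (config.storage.topic, offset.storage.topic, status.storage.topic). A second cluster against the same brokers therefore needs a different group.id and different topic names. The hypothetical helper below generates these core settings (not a complete worker config); the naming scheme and the broker address localhost:9092 are assumptions.

```python
def distributed_worker_props(cluster: str, bootstrap: str) -> dict:
    """Core settings for a worker joining the named Connect cluster (hypothetical naming)."""
    return {
        "bootstrap.servers": bootstrap,
        # Workers with the same group.id form one cluster...
        "group.id": cluster,
        # ...and each cluster stores its state in its own internal topics.
        "config.storage.topic": f"_{cluster}-configs",
        "offset.storage.topic": f"_{cluster}-offsets",
        "status.storage.topic": f"_{cluster}-status",
    }


def to_properties(props: dict) -> str:
    """Render settings in Java .properties format."""
    return "\n".join(f"{k}={v}" for k, v in props.items())


c1 = distributed_worker_props("connect-cluster-1", "localhost:9092")
c2 = distributed_worker_props("connect-cluster-2", "localhost:9092")

# Apart from the brokers, no setting may be shared between the two clusters.
clashes = {k for k in c1 if k != "bootstrap.servers" and c1[k] == c2[k]}
print(to_properties(c1))
print("clashing settings:", clashes or "none")
```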
Containers
Kafka Connect images on Docker Hub: confluentinc/cp-kafka-connect-base, and confluentinc/cp-kafka-connect, which adds connectors such as kafka-connect-elasticsearch, kafka-connect-jdbc, kafka-connect-hdfs […]
Adding connectors to a container: install connector JARs from Confluent Hub on top of confluentinc/cp-kafka-connect-base
At runtime:
```yaml
kafka-connect:
  image: confluentinc/cp-kafka-connect:5.2.1
  environment:
    CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components'
  command:
    - bash
    - -c
    - |
      # Install the connector from Confluent Hub, then start the worker as usual
      confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.0
      /etc/confluent/docker/run
```
http://rmoff.dev/ksln19-connect-docker
Build a new image:
```dockerfile
FROM confluentinc/cp-kafka-connect:5.2.1
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
# Install the connector at build time rather than on every container start
RUN confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.0
```
Automating connector creation
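A common pattern is a startup script that waits for the worker's REST API to respond and then creates each connector. The sketch below assumes a worker at http://localhost:8083 and uses PUT /connectors/<name>/config, which creates the connector if it does not exist and updates it otherwise, so the script can safely be re-run. Function names, timeouts and the connector name are illustrative.

```python
import json
import time
import urllib.request


def wait_for_worker(base_url: str, timeout_s: float = 120, interval_s: float = 5) -> None:
    """Poll the worker's REST root until it answers, or give up after timeout_s."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            with urllib.request.urlopen(base_url + "/", timeout=interval_s) as resp:
                if resp.status == 200:
                    return
        except OSError:
            pass  # URLError, refused connections and socket timeouts are all OSError
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Kafka Connect at {base_url} not ready after {timeout_s}s")
        time.sleep(interval_s)


def build_put_request(base_url: str, name: str, config: dict) -> urllib.request.Request:
    """Create-or-update request for one connector's config."""
    return urllib.request.Request(
        f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def apply_connector(base_url: str, name: str, config: dict) -> int:
    """Send the PUT and return the HTTP status (2xx on success)."""
    with urllib.request.urlopen(build_put_request(base_url, name, config)) as resp:
        return resp.status


# Example usage (requires a running worker):
#   wait_for_worker("http://localhost:8083")
#   apply_connector("http://localhost:8083", "jdbc-source-demo", {
#       "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
#       "connection.url": "jdbc:mysql://asgard:3306/demo",
#       "table.whitelist": "sales,orders,customers",
#   })
```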
Troubleshooting Kafka Connect
Troubleshooting Kafka Connect: the Connector is RUNNING but its Task has FAILED
```shell
$ curl -s "http://localhost:8083/connectors/source-debezium-orders/status" | \
    jq '.connector.state'
"RUNNING"
$ curl -s "http://localhost:8083/connectors/source-debezium-orders/status" | \
    jq '.tasks[0].state'
"FAILED"
```
http://go.rmoff.net/connector-status
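The same check can be scripted. The status endpoint returns the connector's state plus a list of tasks, each with an id, a state and, once it has failed, a trace. The sketch below picks out the failed tasks from a status document; the sample response is abridged and illustrative (the worker_id value in particular is made up).

```python
import json


def failed_tasks(status: dict) -> list:
    """Return (task id, first line of trace) for each FAILED task in a status response."""
    result = []
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            trace = task.get("trace") or ""
            result.append((task["id"], trace.splitlines()[0] if trace else ""))
    return result


# Abridged, illustrative response shaped like GET /connectors/<name>/status.
status = json.loads("""
{
  "name": "source-debezium-orders",
  "connector": {"state": "RUNNING", "worker_id": "kafka-connect:8083"},
  "tasks": [
    {"id": 0, "state": "FAILED", "worker_id": "kafka-connect:8083",
     "trace": "org.apache.kafka.connect.errors.ConnectException\\n\\tat ..."}
  ],
  "type": "source"
}
""")

print(status["connector"]["state"])  # RUNNING
print(failed_tasks(status))          # [(0, 'org.apache.kafka.connect.errors.ConnectException')]
```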
Troubleshooting Kafka Connect:
```shell
$ curl -s "http://localhost:8083/connectors/source-debezium-orders-00/status" | jq '.tasks[0].trace'
"org.apache.kafka.connect.errors.ConnectException\n\tat io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)\n\tat io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:197)\n\tat io.debezium.connector.mysql.BinlogReader$ReaderThreadLifecycleListener.onCommunicationFailure(BinlogReader.java:1018)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:950)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:580)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:825)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.io.EOFException\n\tat com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:190)\n\tat com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:46)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:35)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:27)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:212)\n\tat io.debezium.connector.mysql.BinlogReader$1.nextEvent(BinlogReader.java:224)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:922)\n\t... 3 more\n"
```
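In a long trace like this, the most useful line is usually the innermost "Caused by:", here java.io.EOFException, which together with the onCommunicationFailure frame suggests the connection to the MySQL binlog was lost. Running jq -r '.tasks[0].trace' prints the trace with real line breaks instead of \n and \t escapes. A small hypothetical helper can pull out the root cause automatically:

```python
PREFIX = "Caused by: "


def root_cause(trace: str) -> str:
    """Return the innermost 'Caused by:' exception of a Java stack trace.

    Falls back to the first line when the trace has no 'Caused by:' section.
    """
    lines = [line.strip() for line in trace.splitlines()]
    causes = [line[len(PREFIX):] for line in lines if line.startswith(PREFIX)]
    if causes:
        return causes[-1]
    return lines[0] if lines else ""


# Abridged from the trace above, with the escapes turned into real newlines and tabs.
trace = (
    "org.apache.kafka.connect.errors.ConnectException\n"
    "\tat io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)\n"
    "Caused by: java.io.EOFException\n"
    "\tat com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:190)\n"
)
print(root_cause(trace))  # java.io.EOFException
```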
The log is the source of truth:
```shell
$ confluent log connect
$ docker-compose logs kafka-connect
$ cat /var/log/kafka/connect.log
```
Error Handling and Dead Letter Queues
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Mismatched converters: org.apache.kafka.common.errors.SerializationException: Unknown magic byte! The messages are not Avro, but the connector uses "value.converter": "AvroConverter". ⓘ Use the correct Converter for the source data.
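When "Unknown magic byte!" appears, it helps to look at the raw bytes of a few messages. Data written via the Schema Registry serialisers starts with a zero magic byte followed by a 4-byte schema ID, whereas JSON is plain UTF-8 text. The heuristic below is a rough sketch for classifying message values, not a definitive test: JSON text can never start with a zero byte, but other binary formats might.

```python
import json


def guess_format(value: bytes) -> str:
    """Rough guess at how a Kafka message value was serialised."""
    if len(value) >= 5 and value[0] == 0:
        return "schema-registry"  # magic byte 0 + 4-byte schema ID, e.g. Avro
    try:
        json.loads(value.decode("utf-8"))
        return "json"
    except ValueError:  # covers UnicodeDecodeError and json.JSONDecodeError
        return "unknown"


samples = [
    b"\x00\x00\x00\x00\x07\x02\x54",  # framed like Schema Registry Avro
    b'{"order_id": 1}',               # plain JSON
    b"\xff\xfe\x00",                  # neither
]
for s in samples:
    print(guess_format(s))
```

If the result is "json" throughout, the fix is the correct converter (JsonConverter); a mix of results points to error handling instead.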
Mixed serialisation methods: org.apache.kafka.common.errors.SerializationException: Unknown magic byte! Some messages are not Avro, yet the connector uses "value.converter": "AvroConverter". ⓘ Use error handling to deal with bad messages.
Error Handling and DLQ (https://cnfl.io/connect-dlq):
Handled:
- Convert: read/write from Kafka, [de]-serialisation
- Transform
Not handled:
- Start: connections to a data store
- Poll / Put: read/write from/to data store*
* can be retried by Connect
Fail Fast (the default, errors.tolerance=none): source topic messages → Kafka Connect → sink messages; the first bad message stops the task. https://cnfl.io/connect-dlq
YOLO ¯\_(ツ)_/¯: with errors.tolerance=all, bad messages are skipped. Source topic messages → Kafka Connect → sink messages. https://cnfl.io/connect-dlq
Dead Letter Queue: with errors.tolerance=all and errors.deadletterqueue.topic.name=my_dlq, bad messages are written to the dead letter queue topic instead. Source topic messages → Kafka Connect → sink messages and dead letter queue. https://cnfl.io/connect-dlq
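The three behaviours can be summarised in a small Python simulation of a sink's convert step. This is not Kafka Connect code, and note that the dead letter queue applies only to sink connectors: with errors.tolerance=none the first failure stops the task, with errors.tolerance=all bad messages are skipped, and adding errors.deadletterqueue.topic.name routes the original bytes of each bad message to that topic. The JSON converter and sample messages are assumptions for the example.

```python
import json
from typing import Callable, List, Optional, Tuple


def run_sink(messages: List[bytes], convert: Callable[[bytes], dict],
             tolerance: str = "none", dlq_topic: Optional[str] = None
             ) -> Tuple[List[dict], List[Tuple[str, bytes]]]:
    """Simulate errors.tolerance / dead letter queue handling for a sink."""
    written, dead_letters = [], []
    for msg in messages:
        try:
            written.append(convert(msg))
        except ValueError:
            if tolerance == "none":
                raise  # fail fast: the task stops on the first bad message
            if dlq_topic is not None:
                dead_letters.append((dlq_topic, msg))  # keep the original bytes
            # tolerance=all without a DLQ: the message is silently dropped
    return written, dead_letters


def json_convert(msg: bytes) -> dict:
    return json.loads(msg.decode("utf-8"))


msgs = [b'{"id": 1}', b"\x00\x00\x00\x00\x07\x02", b'{"id": 2}']
print(run_sink(msgs, json_convert, "all"))            # YOLO
print(run_sink(msgs, json_convert, "all", "my_dlq"))  # dead letter queue
```

In the real worker, errors.deadletterqueue.context.headers.enable=true additionally records the failure reason in headers on the dead letter queue messages.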
Re-processing the Dead Letter Queue: source topic messages → Kafka Connect (Avro sink) → sink; messages it cannot deserialise go to the dead letter queue, from which a second Kafka Connect (JSON sink) writes them to the sink. https://cnfl.io/connect-dlq
Metrics and Monitoring
REST API: http://go.rmoff.net/connector-status
Confluent Control Center
Consumer lag
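A sink connector consumes through an ordinary consumer group, named connect-<connector name> by default, so its lag can be read with standard tools such as kafka-consumer-groups --bootstrap-server <broker> --describe --group connect-<connector name>. Lag per partition is the log-end offset minus the group's committed offset. The sketch below computes it from offsets obtained that way (the numbers are made up), reporting None where nothing has been committed yet.

```python
from typing import Dict, Optional


def consumer_lag(end_offsets: Dict[int, int],
                 committed: Dict[int, int]) -> Dict[int, Optional[int]]:
    """Per-partition lag = log-end offset - committed offset (None if never committed)."""
    return {
        p: (end - committed[p]) if p in committed else None
        for p, end in end_offsets.items()
    }


# Made-up offsets for a three-partition topic.
end_offsets = {0: 1500, 1: 1480, 2: 1510}
committed = {0: 1500, 1: 1200}

lag = consumer_lag(end_offsets, committed)
print(lag)                                            # {0: 0, 1: 280, 2: None}
print(sum(v for v in lag.values() if v is not None))  # 280
```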
JMX
Fully Managed Kafka as a Service
Free Books! http://cnfl.io/book-bundle
#EOF 💬 Join the Confluent Community Slack group at http://cnfl.io/slack https://talks.rmoff.net