Running Kafka Connect Cluster as a Native Kubernetes Application

Kubernetes has become the de facto standard for container orchestration. There are good reasons for this: it creates an abstraction layer between the system we want to run and the underlying hardware or cloud resources.

In this post, I’ll describe how to run a Kafka Connect cluster on Kubernetes. But not only run it: I’ll try to make Kafka Connect a native Kubernetes application.

Features of a native Kubernetes application

Let’s try to enumerate the native Kubernetes application features:

  1. The application is specifically designed to run on Kubernetes.
  2. The application is entirely managed by Kubernetes APIs, so one can use any tool that “talks” to the API — the first and probably most important example is kubectl.
  3. To fulfil the above requirement, the application provides a Custom Resource Definition (CRD) and a controller that “understands” the logic behind this CRD. The CRDs and the controller together make up what is called a Kubernetes Operator.
  4. The Operator can create and manage the application from Kubernetes manifests.
  5. The Operator manages the application declaratively, so the GitOps model can be used.

Let’s try to deploy the Kafka Connect cluster and check if it fulfils those requirements.

Requirements

Imagine you are a DevOps engineer at some company. The company uses a Kubernetes cluster to run its services. There is also a MySQL database as a part of the system.

One day, the CTO comes to you and asks you to get all records from the MySQL database into a Kafka cluster so the company can run analytics software that consumes Kafka messages and produces statistics. As an additional requirement, the developers want to use Schema Registry and protobuf serialization. This sounds like a perfect job for Kafka Connect!

Prerequisites

I’m assuming we already have a k8s cluster, and that both the Kafka cluster and MySQL run on it. We also use the GitOps model to deploy applications on the Kubernetes cluster.

In this example, I use FluxCD as the continuous delivery tool that supports GitOps and the Strimzi Kafka Operator to deploy the Kafka cluster, but one can use other tools, for example ArgoCD and MSK (the AWS managed Kafka service). I use the MySQL image from the Debezium tutorial.

You can find the GitOps repository that describes the whole system here. There are a lot of simplifications there but this article is not supposed to answer The Ultimate Question of Life, the Universe, and Everything ;)

The Plan

It turns out the Strimzi Kafka Operator provides several CRDs. The first one, obviously, is kafka: the object that describes the Kafka cluster as a k8s manifest. But besides Kafka itself, we can also manage objects such as… kafkaconnects or kafkaconnectors!

The latter is especially interesting: normally, when we run a Kafka Connect cluster in a distributed mode, we have to configure the connectors by performing some REST API calls. It spoils our all-is-defined-declaratively world. But the kafkaconnector CRD solves this issue. Stay tuned!
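
To see what the CRD saves us from, here is a rough sketch of the imperative route: a single POST to the Kafka Connect REST API. The service address and the trimmed-down config are my assumptions for illustration (a real Debezium config needs the database settings too), so treat it as a sketch rather than something to copy.

```shell
#!/bin/sh
# Assumed in-cluster address of the Connect REST API (illustrative only).
CONNECT_URL="http://connect-cluster-connect-api.kafka:8083"

# Trimmed payload: a connector name plus its config map.
# A real Debezium config needs the database settings as well.
PAYLOAD='{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1"}}'

# Register the connector imperatively with a single POST request.
create_connector_via_rest() {
  curl -s -X POST \
    -H "Content-Type: application/json" \
    --data "$PAYLOAD" \
    "$CONNECT_URL/connectors"
}

# Usage (needs network access to the Connect cluster):
#   create_connector_via_rest
```

Nothing in Git records that this call ever happened, which is exactly the problem for a GitOps setup.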

We are on Kubernetes, so everything is inside containers. We will also need a Docker image with all connectors and libraries we need inside.

So the plan is:

  1. Create a proper docker image.
  2. Deploy the Schema Registry.
  3. Create a kafkaconnect manifest.
  4. Create a kafkaconnector manifest.
  5. Make coffee and watch how everything is working.

The Docker image

The goal is to read records from the MySQL database. We’ll use the Debezium connector, which reads data from MySQL using a technique called Change Data Capture (CDC). Technically, the connector joins the MySQL replication setup as one of the followers (replicas) and reads the binary log.

The procedure is described in the Strimzi documentation, so one can use it as a reference. Basically we want to add all plugins and libs to the already existing base image provided by Strimzi.

Let’s create a Dockerfile:

FROM quay.io/strimzi/kafka:latest-kafka-3.0.0
USER root:root
COPY ./plugins/ /opt/kafka/plugins/
USER 1001

The plugins directory contains all connectors and converters we need. In our case the requirement was to read from mysql and use the Schema Registry with protobuf serialisation, so we need the debezium mysql plugin and the protobuf converter.
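
Kafka Connect picks up each plugin from its own subdirectory of the plugin path, so the plugins directory could be laid out roughly like this. The two subdirectory names and the source paths in the comments are placeholders I made up, not the names used in the original image.

```shell
#!/bin/sh
# Directory that the Dockerfile copies into /opt/kafka/plugins/.
PLUGINS_DIR="./plugins"

# One subdirectory per plugin; both names are illustrative.
mkdir -p "$PLUGINS_DIR/debezium-connector-mysql" \
         "$PLUGINS_DIR/protobuf-converter"

# Copy the jars of each plugin into its own directory, for example:
#   cp /path/to/debezium-connector-mysql/*.jar "$PLUGINS_DIR/debezium-connector-mysql/"
#   cp /path/to/protobuf-converter/*.jar       "$PLUGINS_DIR/protobuf-converter/"

# Show the resulting layout.
ls -1 "$PLUGINS_DIR"
```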

Let’s build the image:

docker build -t gkocur/kafka-connect-debezium-protobuf:1 .

and push it to the docker hub:

docker push gkocur/kafka-connect-debezium-protobuf:1

Schema Registry

I’ll use the helm chart provided as a part of Confluent Platform. Only a binary helm package with the whole platform is published, but I use flux, so I’m able to install the helm chart directly from the source.

Let’s create the GitRepository source:

apiVersion: source.toolkit.fluxcd.io/v1beta1
kind: GitRepository
metadata:
  name: confluent
  namespace: flux-system
spec:
  interval: 1m0s
  ref:
    branch: master
  url: https://github.com/confluentinc/cp-helm-charts.git

And the HelmRelease which is installed from this source:

apiVersion: helm.toolkit.fluxcd.io/v2beta1
kind: HelmRelease
metadata:
  name: schema-registry
  namespace: kafka
spec:
  interval: 1m
  chart:
    spec:
      chart: ./charts/cp-schema-registry
      sourceRef:
        kind: GitRepository
        name: confluent
        namespace: flux-system
  releaseName: schema-registry
  values:
    kafka:
      enabled: false
      bootstrapServers: kafka-cluster-kafka-bootstrap:9092
      fullnameOverride: sr

The GitRepository and HelmRelease kinds are CRDs provided by FluxCD. If you are not familiar with it, try it: it’s a great piece of software!

Create Kafkaconnect

Let’s prepare the kafkaconnect object which uses the custom docker image.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: connect-cluster
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: gkocur/kafka-connect-debezium-protobuf:1
  replicas: 1
  bootstrapServers: kafka-cluster-kafka-bootstrap:9092
  config:
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: connector-config
        secret:
          secretName: mysql-credentials

A few comments about it:

  1. Note the annotation strimzi.io/use-connector-resources: "true". It tells the operator to manage only the connectors defined as kafkaconnector manifests! If you create a connector through the REST API while this annotation is present and set to true, that connector will be deleted.
  2. The config.providers, config.providers.file.class, and externalConfiguration settings allow us to provide credentials in the native k8s way: as a secret mounted inside the Connect pods, so we can reference it in the connector configuration. It is a common and secure way to pass secrets.
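
The mysql-credentials secret itself has to exist before the Connect pods start. A minimal sketch of creating it outside the gitops repository could look like this; the user name and password are placeholders, while the secret name, the file name and the two property keys are the ones the manifests in this post refer to.

```shell
#!/bin/sh
# Placeholder credentials: replace with the real MySQL user and password.
MYSQL_USER="debezium"
MYSQL_PASSWORD="change-me"

# The file name becomes the key inside the secret, and therefore the file
# name under /opt/kafka/external-configuration/connector-config/.
WORKDIR="$(mktemp -d)"
PROPS="$WORKDIR/debezium-mysql-credentials.properties"
printf 'mysql_username=%s\nmysql_password=%s\n' "$MYSQL_USER" "$MYSQL_PASSWORD" > "$PROPS"

# Create the secret referenced by secretName in the KafkaConnect manifest.
create_secret() {
  kubectl -n kafka create secret generic mysql-credentials --from-file="$PROPS"
}

# Usage (needs access to the cluster), then remove the local copy:
#   create_secret && rm -rf "$WORKDIR"
```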

After adding this manifest to the gitops repository we have the kafka connect cluster up and running:

$ kubectl get kafkaconnect -n kafka
NAME              DESIRED REPLICAS   READY
connect-cluster   1                  True

$ kubectl -n kafka get pods -l strimzi.io/kind=KafkaConnect
NAME                                      READY   STATUS    RESTARTS   AGE
connect-cluster-connect-66cb786d6-l7t7j   1/1     Running   0          3m4s

But a Connect cluster without a connector does… nothing. We have to add a connector and configure it.

Creating Kafka Connector

Fortunately — we can create a connector as a kubernetes manifest:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: "inventory-connector"
  namespace: kafka
  labels:
    strimzi.io/cluster: connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: mysql.mysql
    database.port: "3306"
    database.user: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_username}"
    database.password: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_password}"
    database.server.id: "184054"
    database.server.name: "dbserver1"
    database.whitelist: "inventory"
    database.history.kafka.bootstrap.servers: "kafka-cluster-kafka-bootstrap:9092"
    database.history.kafka.topic: "schema-changes.inventory"
    include.schema.changes: "true"
    key.converter: io.confluent.connect.protobuf.ProtobufConverter
    key.converter.schema.registry.url: http://schema-registry-cp-schema-registry:8081
    value.converter: io.confluent.connect.protobuf.ProtobufConverter
    value.converter.schema.registry.url: http://schema-registry-cp-schema-registry:8081

Let’s analyse it. The most important part is the config map: it is the connector configuration in YAML format. Its content depends on the plugin we use. In our case, the Debezium MySQL connector, it’s documented here. The interesting part is the credentials: they are read from the properties file mounted from the secret, so we don’t need to put credentials in the gitops repository, which would always be a very bad practice.

Please note the strimzi.io/cluster label: it tells the operator on which Connect cluster this connector should run (yes, we can have multiple!)

Let’s commit this manifest to gitops repo and wait a while.
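
Instead of waiting blindly, we can ask Kubernetes about the connector. The sketch below wraps two plain kubectl calls in small helper functions; the function names are mine, and it assumes the operator reports a Ready condition in the resource status.

```shell
#!/bin/sh
# Names taken from the KafkaConnector manifest in this post.
NAMESPACE=kafka
CONNECTOR=inventory-connector

# Dump the resource, including the status section written by the operator.
connector_status() {
  kubectl -n "$NAMESPACE" get kafkaconnector "$CONNECTOR" -o yaml
}

# Block until the connector reports Ready; the timeout defaults to 120s.
wait_for_connector() {
  kubectl -n "$NAMESPACE" wait "kafkaconnector/$CONNECTOR" \
    --for=condition=Ready --timeout="${1:-120s}"
}

# Usage (needs access to the cluster):
#   wait_for_connector 300s || connector_status
```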

If everything went well we should see the new kafka topics:

$ bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap.kafka:9092 --list
__consumer_offsets
__strimzi-topic-operator-kstreams-topic-store-changelog
__strimzi_store_topic
_schemas
connect-cluster-configs
connect-cluster-offsets
connect-cluster-status
dbserver1
dbserver1.inventory.addresses
dbserver1.inventory.customers
dbserver1.inventory.geom
dbserver1.inventory.orders
dbserver1.inventory.products
dbserver1.inventory.products_on_hand
schema-changes.inventory

Let’s check the content of one of the topics using the protobuf console consumer (please note: you will need Confluent’s Kafka distribution installed):

$ bin/kafka-protobuf-console-consumer  --bootstrap-server kafka-cluster-kafka-bootstrap.kafka:9092 --topic dbserver1.inventory.customers --from-beginning --property schema.registry.url=http://schema-registry-cp-schema-registry.kafka:8081
{"after":{"id":1001,"firstName":"Sally","lastName":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"1.7.0-SNAPSHOT","connector":"mysql","name":"dbserver1","tsMs":"1635579054543","snapshot":"true","db":"inventory","sequence":"","table":"customers","serverId":"0","gtid":"","file":"mysql-bin.000003","pos":"156","row":0,"thread":"0","query":""},"op":"r","tsMs":"1635579054544"}
{"after":{"id":1002,"firstName":"George","lastName":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"1.7.0-SNAPSHOT","connector":"mysql","name":"dbserver1","tsMs":"1635579054544","snapshot":"true","db":"inventory","sequence":"","table":"customers","serverId":"0","gtid":"","file":"mysql-bin.000003","pos":"156","row":0,"thread":"0","query":""},"op":"r","tsMs":"1635579054544"}
{"after":{"id":1003,"firstName":"Edward","lastName":"Walker","email":"ed@walker.com"},"source":{"version":"1.7.0-SNAPSHOT","connector":"mysql","name":"dbserver1","tsMs":"1635579054544","snapshot":"true","db":"inventory","sequence":"","table":"customers","serverId":"0","gtid":"","file":"mysql-bin.000003","pos":"156","row":0,"thread":"0","query":""},"op":"r","tsMs":"1635579054544"}
{"after":{"id":1004,"firstName":"Anne","lastName":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"1.7.0-SNAPSHOT","connector":"mysql","name":"dbserver1","tsMs":"1635579054544","snapshot":"true","db":"inventory","sequence":"","table":"customers","serverId":"0","gtid":"","file":"mysql-bin.000003","pos":"156","row":0,"thread":"0","query":""},"op":"r","tsMs":"1635579054544"}

Troubleshooting

If something goes wrong, one can restart a single connector task or the whole connector by adding an annotation to the connector resource.

To restart the connector add:

strimzi.io/restart=true

To restart the task with id=0 add:

strimzi.io/restart-task=0
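
With kubectl, both annotations can be applied roughly as follows. The helper function names are mine; the resource names come from the manifests above. As far as I know, the operator removes the annotation again once the restart has been triggered, so it can be applied repeatedly.

```shell
#!/bin/sh
# Names taken from the KafkaConnector manifest in this post.
NAMESPACE=kafka
CONNECTOR=inventory-connector

# Restart the whole connector.
restart_connector() {
  kubectl -n "$NAMESPACE" annotate kafkaconnector "$CONNECTOR" \
    strimzi.io/restart=true
}

# Restart a single task; the task id is the first argument (defaults to 0).
restart_task() {
  kubectl -n "$NAMESPACE" annotate kafkaconnector "$CONNECTOR" \
    "strimzi.io/restart-task=${1:-0}"
}

# Usage (needs access to the cluster):
#   restart_connector
#   restart_task 0
```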

That’s all!

Using the Strimzi Kafka Operator, we can easily run not only the Kafka cluster but also the Kafka Connect cluster. The whole Connect cluster is managed using k8s manifests, so we can describe it as code and store it in the gitops repository.
BTW — if you are interested in introducing kafka, kafka connect, gitops model, everything-as-a-code approach or any other technology mentioned in this article — please contact us! We’ll be more than happy to help your organization.

Appendix

Actually, you don’t have to build the connector image manually: the Strimzi operator can do it for you! It’s described in the excellent Strimzi documentation and I’ll leave it as an exercise for the reader :)
