Contents

ZooKeeper-less Kafka

Krzysztof Atłasik

24 Jul 2023.9 minutes read

ZooKeeper-less Kafka webp image

For many years Apache Kafka and Apache ZooKeeper were an inseparable couple. Kafka’s role was to provide reliable message delivery behind pub/sub API, and ZooKeeper worked as a distributed store of cluster’s metadata. ZooKeeper stored information on which replica is the leader of which partition, how partitions are placed on brokers, consumer offsets, and many more. When Kafka was created the decision to utilize the ZooKeeper took a lot of burdens from Kafka developers - they could rely on battle-tested component to ensure the consistency of their messaging system.

But this solution also has drawbacks. It leads to operation difficulties: two distinct infrastructure elements to tune, secure and monitor. It also gave Kafka a bad reputation for being hard to maintain and infrastructure-heavy. And since Kafka is a distributed system as well, it was a very tempting idea to gradually migrate all ZooKeeper’s duties to Kafka itself.

In this article, I will get through how the new quorum-based Kafka setup works and how it’s different from the previous one, but let’s start with the story of how everything started.

A glimpse of history

The journey started officially around 2019 with the creation of KIP-500 (KIP stands for Kafka Improvement Proposal). KIP-500 worked as an umbrella task for several follow-on KIPs, all aimed at creating a new internal replacement for Zookeeper. In 2021, release 2.8 introduced a beta version of the new quorum controller mode using a consensus mechanism KRaft. With version 3.0 it was released as a preview. In 2022 Kafka 3.3 marked KRaft as production ready (but only for new clusters). The recent release of 3.5 provided early access to the feature to migrate (with no downtime) clusters to the new ZK-less configuration. It also marked Zookeeper support as deprecated.

Foundations for new, easier, and better Kafka are already there, however, the quest is still not over. Only with Kafka 3.6, planned for the second half of 2023, the migration to KRaft will leave early access status. The final version that will abandon ZK completely is Kafka 4.0 scheduled for 2024.
timeline

Migration from ZK to KRaft is still in early access in 3.5. Hence, you shouldn't use it in production. KRaft is for now ready only if you start from scratch. You can find more details in KIP-866 / Migration overview or in this tutorial.

The architecture of Kafka with Zookeeper

The Kafka cluster contains multiple nodes called brokers. The minimal number of brokers is just one, but that setup is only viable for testing and development purposes since it offers no redundancy. Normally, production clusters have at least a few brokers. Nevertheless, some clusters reach immense sizes - a testament to Kafka’s scalability.

Brokers communicate with each other using binary TCP-based protocol. They also exchange messages with producer and consumer applications that are writing and reading messages from topics.

ZooKeeper

Finally, brokers are connected to a cluster of ZooKeeper instances - a so‑called ensemble.

Zookeeper stores data in data registers called znodes. ZNodes are arranged in a filesystem-like structure, the name of the znode resembles a file path. The broker config property zk.connect tells the broker which ensemble to connect and on what path. A single ZK ensemble can support multiple Kafka clusters, they just have to reference different paths. When the broker joins the cluster, it creates a distinct znode determined by broker.id property. The address and path in zk.connect have to be the same for all brokers in the cluster and the broker.id has to be unique.

Znodes created by joining brokers have a special type: ephemeral. An ephemeral node is automatically deleted from the registry when the connection between the ZK and broker ceases. It can happen when the broker gracefully leaves the cluster, but ZK will also terminate the session when the Kafka instance dies or is unresponsive (for example because of an extended GC pause). To prove they are alive, brokers need to send heartbeat signals to ZK. If ZK doesn’t detect signs of life from the broker in time configured by zookeeper.session.timeout.ms it assumes the node has failed. ZK emits the notification whenever an ephemeral node is created or deleted. Other Kafka components subscribe to those notifications to have a current view of the state of the cluster.

Zookeeper is a source of truth for cluster configuration and metadata. It stores information about topics, brokers, and ACLs. Zookeeper is also responsible for designating a controller broker (more about this in the next section).

Historically ZK performed even more tasks. Console-based applications (like console-producer.sh or console-consumer.sh) used ZK as a bootstrapping point to the cluster (now we just need to pass an address of the broker via --bootstrap-server param).

Moreover, in early versions of Kafka (pre 0.9) ZK stored information about committed offsets. Currently, that information is stored in internal topic __consumer_offsets.

Controller

One of the brokers in the cluster wields a special role - it is designated as a controller. It performs many administrative tasks, like initiating partition leader elections or partition reassignments between brokers. It also manages cluster metadata. It keeps track of information about topics, partitions, replicas, and their assignments to brokers. It also makes sure that updates of the metadata are persisted in ZK and that they are pushed to other brokers.

There must be only one controller present in the cluster at the time. To ensure that, Kafka utilizes properties of ZK. When brokers join the cluster they race to create an ephemeral znode with a subpath /controller. Since only one znode with a certain path can exist, only a single broker succeeds, and it becomes a new controller. Whenever the controller exits the cluster, ZK notifies all brokers that they must again compete to elect the new controller.
schema 1
Sometimes the broker might have its controller status rescinded (for instance, when it failed to respond with heartbeats because of a long GC pause) but be not aware that it has happened. This way it becomes a so‑called zombie controller. After a new controller is elected, it could lead to a situation where we’ve got two controllers: a regular one and a zombie. To prevent this, every newly elected controller gets a sequential number increased by one - a controller epoch. Brokers can therefore just ignore controller requests sent with a lower epoch.

Before the new controller can start carrying out its duties, it needs to fetch the newest state of metadata from ZooKeeper and push it further to other brokers. It’s a quite fast process for small clusters. Nevertheless, for heftier systems, the fetching can slow down even to a few seconds. This is a giant drawback of the Kafka-ZK setup - the metadata is not stored locally in brokers but needs to be taken from ZK registers. Even a few seconds of delay can create huge problems in large, busy clusters.

The controller election process had gone through many performance refinements over the years, like the introduction of asynchronous transmission of data from ZK to the broker. Yet, it remains an obvious bottleneck, and it doesn’t scale well with a growing number of partitions. Kafka needed a newer and finer approach for managing the cluster’s metadata.

Kafka with KRaft

The core concept of the new quorum controller setup is the fact that Kafka is itself log-based. The changes in the metadata can be presented as messages stored in the log, which can be then streamed to subscribers.

In KRaft mode, we can designate multiple Kafka instances as controllers. The single node can be either working solely as a broker or controller or performing both roles at once (very handy in smaller clusters). This is different from the legacy setup, where we had only one controller. Still, even though we can have multiple controllers, only one is active at the particular moment, and all others work on standby. If the active controller fails, one will take over its tasks.

Only the active controller can make changes to the cluster’s metadata. It persists the updates in a special internal topic (with just one partition) called __cluster_metadata. Messages from that topic are then replicated by all other controllers. This way all of them have almost the newest version of the data in their local replicas. This is a big deal - a new controller no longer has to fetch all the data from Zookeeper. It has all the data in its local log, maybe it just needs to catch up on a few missed messages.
schema 2

Replication

The process of metadata replication is quite similar to data replication of regular Kafka topics. Replicas send the request to the sole partition leader (active controller) with the last offset they have seen and the leader responds with the newest messages. In a happy path scenario, they quickly catch up to the leader.

It has some key differences though. First of all, there’s no concept of in-sync replicas (ISR) that could take over when the leader is unavailable. The election of a new leader (and active controller at the same time) is always quorum based. As you could probably guess from the mode name (KRaft) the algorithm used for selecting the leader is Raft.

Leader election

The active controller needs to regularly send heartbeats to other controllers. If it fails to do so, they conclude it has crashed and start a new election. Every controller assumes itself as the candidate and gets the last leader epoch it knows. It increases it by one and sends it along with requests to other controllers to vote for it. When the controller receives a request for a vote, it checks if it hasn’t seen requests with a later epoch or has already responded to a request with the same epoch. If not, it acknowledges the request. Before asking for votes, controllers wait a random amount of time. This is a countermeasure to prevent all nodes from sending requests at the same time. The controller that gets the quorum of the votes becomes the new leader.

Offset committing

The offset committing mechanism for metadata is also different than in regular topics. Kafka needs to enforce fsync upon appending the new message to the metadata topic. This ensures messages are stored durably on the log, which is required to guarantee the Raft algorithm's correctness. Every record is tagged with the leader epoch number, which can be used to reconcile logs after the election of a new active controller.

How important is KRaft?

KRaft is a giant leap forward for Kafka. It improves the performance of crucial actions, like the recovery time in case of controller failure. It also addresses other limitations of Kafka. With ZK-based metadata management, it takes O(n) to create or delete the topic (where n is a number of already existing topics), because the controller must load the full list of all topic names from ZooKeeper. With KRaft, those actions take O(1).

Without ZooKeeper the setup of Kafka is much smoother. We no longer need to care about ZK performance and fine-tuning. An often overlooked advantage of KRaft is improved developer experience. When we wanted to start a small local cluster of a single Kafka node for testing, we still needed to bring up the Zookeeper. Thanks to KRaft, it’s no longer the case.

Look into the future

Getting rid of ZooKeeper dependency is one of the most awaited features in Kafka’s history. Version 4.0, scheduled for 2024, will drop the support for Zookeeper, and at this point, we’ll be able to say that the epic journey is finally over.

Reviewed by Paweł Stawicki

Blog Comments powered by Disqus.