Kafka queues: now and in the future
Kafka is the leading messaging & streaming platform and the de-facto standard for transporting large amounts of data. The primary abstraction in Kafka is a topic, divided into partitions. Because Kafka topics are partitioned, they have excellent scaling characteristics, limited only by the partition count.
Topics can be consumed in a publish-subscribe fashion, that is, every client receives every message published on the topic. Such broadcast scenarios are useful; however, most often we'd like a set of clients to consume messages cooperatively, so that every subscribed client gets a unique subset of messages. In other words, using JMS terminology, we want to use Kafka as a queue, not a topic.
And that is possible thanks to consumer groups: within a group, each consumer receives a set of partitions from which it can consume messages. The consumer group coordinator ensures that every partition is assigned to some client and that no two clients are assigned the same partition.
This approach has proven to work well over the years. However, it doesn't cover all use cases. That's why there's an ongoing KIP (Kafka Improvement Proposal) to introduce more queueing capabilities to Kafka: KIP-932: Queues for Kafka.
First, we'll take a look at the characteristics and limitations of consumer groups. Then, we'll examine the new features proposed by KIP-932. Finally, we'll discuss how some of the queueing features can be implemented in Kafka today, using the KMQ pattern.
If you don't know how consumer groups work, do not worry! Our Kafka Visualization tool might help you to get up to speed. On the other hand, if you know consumer groups inside out, you can safely skip the next section and proceed to share groups.
Consumer groups
Consumer groups are a well-studied and well-documented Kafka feature, so we'll only briefly go over their characteristics and limitations.
First, it's worth noting that there might be multiple consumer groups for any topic, consuming data independently. That's true of all of the approaches described in this article, and for simplicity, we'll always focus on how a single consumer/share group interacts with a single topic (which usually has many partitions).
Partition assignment
As mentioned above, each consumer that is a member of a consumer group receives a unique set of partitions, from which to consume data. Such a member has exclusive access to the assigned partitions.
Here, the first limitation comes to light: the number of partitions limits the number of members in a consumer group. It makes no sense to have more members than partitions, as some members would receive no partitions from which to consume data. And while it's possible to increase the number of partitions of an existing topic, it's a disruptive process that affects how data is partitioned. Hence it's best to plan the partition count upfront, though that is always hard.
When a new member joins a consumer group, a group rebalance occurs. The assignment algorithm is, by default, designed so that each member receives roughly an equal number of partitions. However, a rebalance is disruptive, even though in recent Kafka versions it's done incrementally. If a partition previously assigned to member A should now be consumed by the new member B, we must first make sure that A stops consuming data from that partition and completes its processing; only then can B start consuming. That process is called fencing.
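To make the fencing step more tangible, here's a minimal sketch using the standard Java client: a `ConsumerRebalanceListener` lets a member finish its in-flight work and commit offsets before a revoked partition is handed over to another member. The broker address, group id, and topic name are placeholders.

```java
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceAwareConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "my-consumer-group");        // placeholder consumer group name
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of("my-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Called before the revoked partitions are reassigned to another member:
                // commit what has been processed so far, so the new owner doesn't reprocess it.
                consumer.commitSync();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Called once this member's new assignment is in effect.
            }
        });
        // ... poll loop (see the next section)
    }
}
```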
Message processing
As for processing itself, each member fetches messages from its assigned partitions in the order they appear in each partition's log. Hence, consumption is partially ordered: if message M1 is on the same partition as message M2 and appears earlier in the partition's log, then M1 will be received by the member before M2. However, there are no ordering guarantees across partitions.
When messages are processed, their offset can be committed to Kafka so that if the consumption process restarts (either due to a crash or due to a rebalance), it starts delivering messages that are yet unprocessed (not committed). Committing amounts to writing to a special `__consumer_offsets` topic, which itself is partitioned across Kafka brokers.
However, in a consumer group, messages can only be committed up to an offset: that is, we can only say that all messages with an offset smaller than a given one should be considered committed. It's a simple and fast solution, but it has its limitations: messages can't be acknowledged individually, only in batches, up to an offset.
Of course, one could process messages one-by-one and commit each offset before processing the next message. However, this would have far-reaching negative performance implications. Hence, this approach is very rarely (if at all) used in combination with Kafka. It simply goes against the design of consumer groups and the offset commit mechanism.
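A rough sketch of the typical flow, assuming a `KafkaConsumer` configured as in the earlier sketch (auto-commit disabled) and a hypothetical `process` method standing in for the application's business logic, might look like this:

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class AtLeastOnceLoop {
    static void process(ConsumerRecord<String, String> record) { /* hypothetical business logic */ }

    static void run(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                process(record);
            }
            // A single commit covers the whole batch: for each partition, Kafka records
            // "everything up to this offset is processed" in the __consumer_offsets topic.
            consumer.commitSync();
        }
    }
}
```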
Other characteristics
It's also not possible to request redelivery of a single message or to mark a message as unprocessable. The consumption pattern supported by consumer groups might also be problematic if message processing times vary significantly. This problem is known as head-of-line blocking: a single message whose processing takes much longer than others might block the processing of subsequent messages from its partition. In a typical consumer flow, we first fetch a batch of messages, then process them (possibly concurrently), and only commit their offsets once all are processed.
If we follow the receive-process-commit offsets algorithm, we get at-least-once delivery semantics: our process might crash between processing and committing the offsets. In such a case, the messages with uncommitted offsets will be delivered and processed again when the partition is reassigned.
We can also get at-most-once delivery by flipping the steps in the flow above: receive-commit offsets-process.
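In terms of the sketch above, at-most-once is the same loop with the commit moved before processing:

```java
static void runAtMostOnce(KafkaConsumer<String, String> consumer) {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        // Committing before processing means a crash mid-processing loses the
        // remaining messages instead of redelivering them: at-most-once semantics.
        consumer.commitSync();
        for (ConsumerRecord<String, String> record : records) {
            process(record);
        }
    }
}
```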
When it comes to performance, consumer groups are hard to beat: messages are fetched in batches, and offsets are committed in batches as well, possibly in the background.
Consumer Groups summary
- in a consumer group, each member receives an exclusive set of partitions from which to consume data
- the number of consumers, and by extension the parallelism of data processing, is limited by the number of partitions
- messages received by consumers are ordered within each assigned partition, but not across partitions
- messages can only be committed "all up to an offset", in batches
- there's no possibility of individual acknowledgment or message redelivery
- if the time to process messages is uneven, the head-of-line blocking problem might impact the latency of processing messages within a partition
- messages are processed using at-least-once or at-most-once semantics
- very low communication overhead: we only need to commit the offsets periodically
- very high performance
Queues for Kafka: share groups
To lift some of the limitations of consumer groups described above, KIP-932: Queues For Kafka proposes the addition of a share group abstraction to the Kafka system. Like consumer groups, share groups allow for cooperative consumption of messages from a Kafka topic. However, there are a number of important differences.
Partition assignment
When a member joins a share group, it is assigned a number of share-partitions, from which it might fetch messages to process. Unlike with consumer groups, however, this assignment is not exclusive: many members might be assigned the same partition, from which they all consume data. This lifts the limitation of the number of group members being bounded by the number of partitions. However, it also introduces a much harder problem to solve: ensuring that each member receives distinct messages to process.
But before diving into that, let's look a bit closer at the process of joining a share group. When a new member joins, it sends a heartbeat to the group's coordinator. The coordinator is located on one of the brokers: the leader of the partition of the `__consumer_offsets` topic to which the group maps. The partition-assignment algorithm is executed on that broker, and in the reply to the heartbeat, the member receives the share-partitions to consume. The assignment might change over time, and members learn about partitions being added to or revoked from them on their subsequent heartbeats.
No fencing is required, as multiple members might consume messages from a single partition. So, while for a short period more members might consume a partition than the assignment algorithm intended, it doesn't cause any harm. This simplifies the rebalancing process; there are no "stop the world" fencing events.
Message processing
That's where the role of the group coordinator ends. Next, the members contact each share-partition that's been assigned to them with a request to fetch messages. Each share-partition stores the state of messages that it contains; each message can be:
- available for consumption
- acquired, but not yet acknowledged (that is, being currently processed)
- acknowledged
- rejected
When a new fetch-messages request is received from a member, the share-partition consults its internal state and determines which messages should be delivered. The share-partition attempts to deliver messages in batches, if possible, so that any acknowledgments might also happen in batches (that is, by passing offset ranges instead of individual offsets). This is, of course, important for performance.
From the messages that are available for consumption, the share-partition selects a batch and marks them as acquired. The messages are then returned to the member for processing. They remain in the acquired state for a specific period of time; if they are not marked as processed before that time runs out, they transition back to the available state. This model is similar to, e.g., SQS or GCP's Pub/Sub, where you have to acknowledge the processing of messages within a specific time window, otherwise they will be delivered again.
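To give a feel for the client side, here's a sketch based on the `KafkaShareConsumer` API proposed in KIP-932. Since the KIP is not final, class, method, and configuration names may still change; the broker address, group name, topic, and `process` method are placeholders.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;

public class ShareGroupConsumerSketch {
    static void process(ConsumerRecord<String, String> record) { /* hypothetical business logic */ }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "my-share-group");           // placeholder share group name
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
        consumer.subscribe(List.of("my-topic"));
        while (true) {
            // Records returned here are in the "acquired" state on their share-partitions.
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                try {
                    process(record);
                    consumer.acknowledge(record, AcknowledgeType.ACCEPT);  // mark as acknowledged
                } catch (Exception e) {
                    consumer.acknowledge(record, AcknowledgeType.RELEASE); // make available again
                }
            }
            consumer.commitSync(); // sends the accumulated acknowledgments in one batch
        }
    }
}
```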
Other characteristics
Each message (or range of messages) is associated with a delivery counter. That counter is incremented whenever message acquisition times out, or when a message is explicitly released by the member (a signal that the message should be redelivered). When the number of delivery attempts exceeds a given threshold, a message becomes rejected, and its delivery is not attempted again. There's no DLQ, at least not yet, though it is mentioned as a possible future addition. Moreover, the delivery counter should only be treated as an approximation, as due to broker failures it might not reflect the true number of delivery and processing attempts.
The implementation of share groups in Kafka bounds both the amount of time it might take to process a message (as described above) and the number of in-flight messages, that is, messages that have been made available for delivery but not yet acknowledged or rejected. This is controlled by the share-partition start and end offsets (SPSO and SPEO). In addition to the per-message state (acknowledged / rejected / delivery count), the SPSO is part of the state of a share-partition.
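For reference, the KIP proposes group-level configuration properties for these bounds. The names and defaults below are taken from the proposal as we understand it at the time of writing and may well change before release:

```properties
# Proposed share group configuration (KIP-932); names and defaults may change before release.
# Delivery attempts before a message is rejected:
group.share.delivery.count.limit=5
# How long a message stays acquired before it becomes available again:
group.share.record.lock.duration.ms=30000
```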
As multiple consumers can receive messages from any partition and messages can be individually acknowledged, there are no ordering guarantees of any kind. When a fetch request arrives, any of the available messages can be returned to the consumer. There might be holes due to some messages being redelivered or already acknowledged; also, different consumers receive different batches of available messages.
Coordinators & persistence
The state of a share-partition must be persisted so that it survives broker crashes. This is done on a dedicated topic and is controlled using a share coordinator. Just like the responsibility of group coordinators is distributed across the brokers, the responsibilities of a share coordinator are distributed as well. The share coordinator persists the message states, as well as group epochs, in a dedicated, highly partitioned topic. The share coordinator is co-located with the leader of the partition of the `__share_group_state` topic on which data for a given share-partition is stored. Which broker that is can be determined using a Kafka protocol for finding coordinators (the same mechanism is used for the new implementation of consumer groups, group coordinators, and share coordinators). More information on that is available in KIP-848: Next Generation of Consumer Rebalance Protocol.
The state that's persisted in the `__share_group_state` topic is carefully chosen and minimalistic. First of all, message acquisition is not persisted; only when messages are acknowledged or rejected does this become part of the persisted state. Thanks to that, both the inter-broker traffic and the amount of data to persist are lower. However, when a broker crashes, some messages might be delivered again, even if the acquisition time window hasn't yet passed. The same might happen with consumer groups: messages become "acquired" when they get delivered to a consumer, and the offsets are only persisted when messages are acknowledged. While unlikely, two consumers might end up processing the same messages concurrently.
Share group assignments are also not persisted, only the epochs, which are used to protect from zombie group coordinators and for members to find out if the assignments they receive are up-to-date. These optimizations aim to reduce the data stored in the `__share_group_state` topic.
Each share coordinator maintains an in-memory view of the state of share-partitions for which it stores data and periodically stores snapshots to the underlying storage. When snapshots for all share-partitions are persisted, any state updates before that can be deleted; this compaction is also part of the share coordinator's logic.
Share groups summary
Summing up, the Queues for Kafka KIP introduces a new concept of share groups, which allow messages to be consumed cooperatively by an arbitrary number of consumers. Three new stateful components run distributed across Kafka brokers: the group coordinator, the share coordinator, and share-partitions. Together, they maintain in-memory views of the state of each share-partition (which messages are available for delivery, which are acknowledged, etc.), as well as the state of member-to-partition assignment. This state is persisted on a new `__share_group_state` topic and the existing `__consumer_offsets` one.
We've discussed the head-of-line blocking problem for consumer groups. Share groups offer a possible solution to this, if more than one consumer consumes each partition. To achieve that, we should have many more consumers than partitions: by default, the broker-side partition assignor will assign the smallest possible number of partitions to each consumer. However, custom assignors might be provided so that, e.g., each partition is guaranteed to be consumed by multiple consumers. Then, if messages delivered within a batch are processed in parallel, not sequentially, head-of-line blocking might be solved.
Performance is still unknown (the design is not yet final; currently, the feature is planned for inclusion in Kafka 4.0), or at least we're not aware of benchmarks. However, we shouldn't expect any problems there: fetching messages is done in batches directly from brokers. Acknowledgement can (and should!) be done in batches as well. One detail that might influence latency is that acknowledging messages takes up to 3 network hops (consumer -> share-partition -> share coordinator -> replicas), compared to 2 network hops in consumer groups (consumer -> consumer offsets -> replicas). However, provided that this is done in batches, this extra cost should be easily amortised.
Let's revisit the characteristics from the summary of consumer groups:
- in a share group, each member receives a non-exclusive set of partitions, from which to consume data
- parallelism of data processing is not limited by the number of partitions
- messages received are not ordered
- messages can be individually acknowledged, rejected or released for re-delivery
- behind the scenes, Kafka attempts to batch both message delivery and acknowledgment
- possibility to solve the head-of-line blocking problem
- messages can be processed using at-least-once, or at-most-once semantics
- low communication overhead: messages need to be acknowledged (possibly in batches), share-partition state needs to be persisted
- performance is unknown, but should be high
KMQ
KMQ is a pattern for implementing individual message acknowledgements using the functionality provided by Kafka's consumer groups. Hence, it can be used with any Kafka version. You might notice some similarities between KMQ's and share groups' designs. However, KMQ is significantly simpler to implement and deploy, which also means it addresses only some of the shortcomings of consumer groups. KMQ is entirely a consumer-side mechanism, as opposed to share groups, where significant logic runs on the brokers.
Just as share groups rely on additional components (group and share coordinators, share-partitions) and additional topics to persist meta-data (`__consumer_offsets` and `__share_group_state`), KMQ uses an additional component, the redelivery tracker, and an additional topic for meta-data, the `markers` topic.
Message processing
Receiving messages from a topic requires some extra steps on the consumer side. This is similar to using the dedicated `KafkaShareConsumer` in share groups and is available as part of the published KMQ package. However, as far as we know, many people use KMQ as a pattern and simply copy the receive-message logic to their code base, adjusting it as needed.
Consumers of a KMQ topic should form a consumer group. Hence, the parallelism of processing is bounded by the number of partitions of the topic that holds the messages, just as with consumer groups. Upon receiving a batch of messages to process from the assigned partitions, the consumer should write `start` markers for each received message (similar to the `acquired` state in share groups). Then comes the crucial difference compared to consumer groups: once the start markers are written, the consumer should commit the offsets of the consumed topic and only then process the messages. Once the messages are processed, the consumer should write `end` markers; that's the equivalent of acknowledging the messages in share groups.
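To make this flow concrete, here's a simplified sketch using plain Kafka clients rather than the actual KMQ library API; the `markers` topic name and the marker encoding (a string key of topic/partition/offset plus a START/END value) are illustrative only, and `process` stands in for the business logic.

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

class KmqConsumerLoop {
    static final String MARKERS_TOPIC = "markers"; // illustrative markers topic name

    static String markerKey(ConsumerRecord<String, String> r) {
        return r.topic() + "/" + r.partition() + "/" + r.offset();
    }

    static void process(ConsumerRecord<String, String> record) { /* hypothetical business logic */ }

    static void run(KafkaConsumer<String, String> consumer, KafkaProducer<String, String> markers) {
        while (true) {
            ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(500));

            // 1. Write a start marker for every received message and wait until they are durable.
            for (ConsumerRecord<String, String> record : batch) {
                markers.send(new ProducerRecord<>(MARKERS_TOPIC, markerKey(record), "START"));
            }
            markers.flush();

            // 2. Only now commit the offsets of the data topic: the markers topic has taken
            //    over the responsibility of tracking which messages are still unprocessed.
            consumer.commitSync();

            // 3. Process the messages.
            for (ConsumerRecord<String, String> record : batch) {
                process(record);
            }

            // 4. Write end markers: KMQ's equivalent of acknowledging the messages.
            for (ConsumerRecord<String, String> record : batch) {
                markers.send(new ProducerRecord<>(MARKERS_TOPIC, markerKey(record), "END"));
            }
        }
    }
}
```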
Message redelivery
An additional component, the redelivery tracker, is needed to redeliver messages properly. It should be deployed like any Kafka application, cooperatively consuming the `markers` topic as part of a consumer group (a different one than the group consuming the data topic). Just as the responsibilities of the share coordinator are distributed across broker nodes in share groups, here the responsibilities of the redelivery tracker are distributed across consumer nodes. If a redelivery tracker node fails, the rebalancing of the consumer group assigns its partitions to other working redelivery tracker instances.
The logic of the redelivery tracker is to consume the start & end markers from the `markers` topic, updating its internal state, which reflects the state of message delivery. KMQ uses a similar timeout mechanism, redelivering messages if an `end` marker is not written within the specified message timeout. Redelivering a message amounts to writing the message to the message's topic once again. The delivery counter in the message metadata is updated at the same time. If the delivery counter exceeds the configured threshold, the message is written to a DLQ instead.
The redelivery tracker commits offsets in the `markers` topic, using the highest offset. All `start` markers are paired with `end` markers (which are also written in case a message times out). That way, the `markers` topic can be configured with a retention period, e.g., twice that of the message timeout, giving the redelivery tracker enough time to redeliver any timed-out messages.
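A very condensed sketch of the tracker's core bookkeeping could look as follows; the real redelivery tracker also re-publishes timed-out messages to the data topic (or the DLQ), writes end markers for them, and commits its own offsets, all of which is elided here. The timeout value and marker encoding match the illustrative format used above.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class RedeliveryTrackerSketch {
    static final Duration MESSAGE_TIMEOUT = Duration.ofSeconds(60); // illustrative timeout

    // Messages with a start marker but no end marker yet, keyed by topic/partition/offset,
    // together with the time their start marker was observed.
    private final Map<String, Instant> inFlight = new HashMap<>();

    void run(KafkaConsumer<String, String> markersConsumer) {
        while (true) {
            for (ConsumerRecord<String, String> marker : markersConsumer.poll(Duration.ofMillis(500))) {
                if ("START".equals(marker.value())) {
                    inFlight.put(marker.key(), Instant.now());
                } else { // END marker: processing finished (or the message was redelivered)
                    inFlight.remove(marker.key());
                }
            }
            redeliverTimedOut();
        }
    }

    private void redeliverTimedOut() {
        Instant deadline = Instant.now().minus(MESSAGE_TIMEOUT);
        Iterator<Map.Entry<String, Instant>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Instant> entry = it.next();
            if (entry.getValue().isBefore(deadline)) {
                // Here the tracker would re-read the original message (its coordinates are
                // encoded in the marker key), bump its delivery counter, and write it back
                // to the data topic, or to the DLQ if the counter exceeds the threshold.
                it.remove();
            }
        }
    }
}
```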
Lastly, as for performance, writing the start/end markers does impose an overhead; as with the other solutions, however, this can also be done in batches, amortising the cost so that it becomes quite low per-message. As with consumer groups, "acknowledging" messages, that is writing the end markers, takes two network hops (consumer -> markers topic -> replicas). Some time ago we benchmarked KMQ and consumer groups, achieving similar throughput results.
KMQ summary
- each consumer receives an exclusive set of partitions from which to consume data
- parallelism of data processing is limited by the number of partitions
- messages received are not ordered
- messages can be individually acknowledged; while not implemented, adding rejection and release functionalities would be trivial
- batching of writing markers is done by the usual Kafka write batching mechanisms
- head-of-line blocking is a potential problem, just as with consumer groups
- messages are processed in an at-least-once fashion
- medium communication overhead: for each message, a start and end marker needs to be written; offsets need to be periodically committed
- high performance
In comparison
We've examined in detail three approaches to cooperative consumption of messages from a Kafka topic by a number of client consumers. Let's summarise their features:
| | Consumer groups | Share groups | KMQ pattern |
|---|---|---|---|
| Availability | Built-in to Kafka | Possibly coming up in future Kafka versions | Pattern to implement on top of any Kafka version |
| Acknowledgments | Only all messages up to a certain offset | Individual message acknowledgments | Individual message acknowledgments |
| Number of consumers (processing parallelism) | Bounded by the number of partitions | Unbounded | Bounded by the number of partitions |
| Ordering | Messages ordered per-partition | Unordered | Unordered |
| Head-of-line blocking | Possible | Can be avoided | Possible |
| Delivery semantics | At-least-once / at-most-once | At-least-once / at-most-once | At-least-once |
| Communication overhead | Very low | Low | Medium |
| Performance | Very high | Probably high | High |
Given its simple foundations, Kafka continues to deliver new functionalities and meet the demands of various use cases. At its core, Kafka provides data durability by replicating topics across several brokers and scalability thanks to topics being divided into a number of partitions. While this design does have its limitations, they can often be avoided by proper application design.
Message queueing is one such example: very often, consumer groups are all that's needed. For the more demanding use cases, patterns such as KMQ might be the answer. To fully solve the queueing problem in Kafka, there's a proposal to enhance its functionality using share groups.
We hope the above overview was informative and will help you choose the right approach to message consumption in Kafka. SoftwareMill offers Kafka architecture consulting, development, deployment, and operation services in various environments (from bare metal through K8S to the cloud). If you would like to use Kafka, are using Kafka already, but would like to scale your deployment, or if you're wondering how to best architect the data flow in your system, it would be great to have you as our client—write us!