Using Kafka as a message queue
Apache Kafka is a very popular publish/subscribe system, which can be used to reliably process a stream of data. The central concept in Kafka is a topic, which can be replicated across a cluster providing safe data storage. By committing processed message offsets back to Kafka, it is relatively straightforward to implement guaranteed “at-least-once” processing. However, there is one important limitation: you can only commit - or, in other words, acknowledge the processing of - all messages up to a given offset. It is not possible to acknowledge individual messages.
That’s because of the streaming nature of a topic: you can only signal Kafka that all messages up to a given point have been processed, so that in case of failure stream processing restarts from the last committed point. That works great for many scenarios, but every now and then there are situations where you’d like to have selective message acknowledgment and, more importantly, redelivery of individual unacknowledged messages.
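To make the limitation concrete, here’s a minimal sketch using the plain Kafka consumer API (the topic and group names are made up): committing after processing a record acknowledges everything in that partition up to the committed offset, not just that single record.

```java
// Minimal illustration (topic/group names are made up): committing an offset acknowledges
// *all* messages in that partition up to the offset - not one selected message.
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "some-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("some-topic"));
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                process(record);
                // This says "everything in this partition up to offset + 1 is done";
                // there is no way to acknowledge just this record and leave earlier ones pending.
                consumer.commitSync(Map.of(
                        new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1)));
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) { /* ... */ }
}
```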
In this post, I’d like to outline a possible implementation of a Kafka-based message queue. Proof-of-concept-quality code is available on GitHub, with both embedded and stand-alone examples which should make experimentation quite straightforward.
But why not ...?
Before diving into technical details: if Apache Kafka is a publish-subscribe system, why would we want to implement a message queue on top of it, instead of using a "proper" MQ, such as RabbitMQ or ActiveMQ?
First of all, Kafka has reliable and proven clustering and data replication mechanisms; if you want to make sure your data is safe, and that each message is processed at-least-once - even in case of losing a number of servers - Kafka should be a good candidate as a base for your message processing system. Other systems don’t put so much emphasis on clustering and replication - e.g. in RabbitMQ’s documentation, it is explicitly stated that the system doesn’t cope well with network partitions.
Secondly, although I haven’t done any benchmarks yet, and while the message redelivery tracking does add a significant amount of overhead, the queue should still be pretty fast, taking advantage of the very good performance of the underlying Apache Kafka topics. But that’s still to be verified.
Finally, convenience - if you already have a working Kafka cluster, and you just need a simple message queue (without the advanced routing capabilities, for example), it might not make sense to set up yet another cluster and maintain it later. Each additional component adds to the overall complexity of your system.
General architecture
If you are familiar with how Amazon SQS works, the message queue described below follows a similar philosophy.
The goal is to create an acknowledgment mechanism for individual messages. If a message is not acknowledged for a given period of time, it will be redelivered.
In the implementation we’ll need two topics:
- the queue topic, which will hold the messages to process
- the markers topic, to which start and end markers will be written for each message (and which will be used to track which messages should be redelivered)
The messages should be processed by a number of queue-client components, according to the flow described below (without any batching for now). As usual in Apache Kafka, we start by creating a regular consumer (probably providing a consumer group id to leverage automatic partition assignment), and then start reading messages from the latest committed offset (a code sketch follows the list):
1. First, we read a message to process from the queue topic.
2. We send a start marker containing the offset of the message to the markers topic and wait until the send is acknowledged by Kafka.
3. Now we can commit the offset of the message read from the queue topic back to Kafka.
4. When the marker is sent and the offset committed, the message can be processed.
5. When (and if) the processing ends successfully, we send an end marker to the markers topic, which again contains the offset of the message. No need to wait for send acknowledgment.
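Here’s a minimal sketch of this five-step flow using the plain Kafka consumer and producer APIs. The topic names, the string-encoded markers, and the QueueClientSketch class are simplified placeholders for illustration, not the actual kmq code:

```java
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

// Sketch of the 5-step queue-client flow. The "queue"/"markers" topic names and the
// string-encoded markers are simplified placeholders, not the actual kmq classes.
class QueueClientSketch {
    private final KafkaConsumer<String, String> queueConsumer;  // subscribed to "queue"
    private final KafkaProducer<String, String> markerProducer;

    QueueClientSketch(KafkaConsumer<String, String> queueConsumer,
                      KafkaProducer<String, String> markerProducer) {
        this.queueConsumer = queueConsumer;
        this.markerProducer = markerProducer;
    }

    void pollAndProcess() throws Exception {
        // Step 1: read messages to process from the queue topic
        ConsumerRecords<String, String> records = queueConsumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            String markerKey = record.partition() + ":" + record.offset();

            // Step 2: send a "start" marker with the message offset and wait for the broker ack
            markerProducer.send(
                    new ProducerRecord<>("markers", record.partition(), markerKey, "start")).get();

            // Step 3: commit the offset of the message read from the queue topic back to Kafka
            queueConsumer.commitSync(Map.of(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)));

            // Step 4: process the message
            process(record);

            // Step 5: send an "end" marker; no need to wait for the ack this time
            markerProducer.send(
                    new ProducerRecord<>("markers", record.partition(), markerKey, "end"));
        }
    }

    private void process(ConsumerRecord<String, String> record) { /* application logic */ }
}
```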
We will also need to start a number of RedeliveryTracker components, which will consume the markers topic and redeliver messages when appropriate. The redelivery trackers should be started in multiple copies across the cluster for fail-over (similar to the queue client components). They will use the standard Kafka auto-partition-assignment mechanism, so just starting a number of copies is all that needs to be done, with no additional clustering work.
The RedeliveryTracker is an Apache Kafka application that reads data from the markers topic. It maintains a set of messages which haven’t yet been processed: an unprocessed message is one for which a start marker has been received, but no end marker yet. Moreover, there’s an in-memory priority queue of message offsets, sorted by timestamps. That priority queue is checked regularly to see if there are any messages to re-deliver.
If the RedeliveryTracker detects that a message should be re-delivered, it does a seek in the source topic (note that this might be an expensive operation, in terms of I/O) and sends the same message again to the queue topic. Hence this will only perform well if the number of messages to redeliver isn’t very large - but more on this later as well.
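To illustrate the idea, below is a rough, simplified sketch of the tracking and redelivery logic for a single assigned partition. The real RedeliveryTracker is an Akka-based application, so the class and the names used here (Marker, redeliverAfter, and so on) are made up for the sketch:

```java
import java.time.Duration;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

// Illustration only: the real RedeliveryTracker is an Akka-based application. One instance
// of this sketch would handle a single assigned markers/queue partition; all names
// (Marker, redeliverAfter, ...) are made up.
class RedeliveryTrackerSketch {
    record Marker(long offset, long timestamp) {}

    private final Map<Long, Marker> unprocessed = new HashMap<>(); // start marker seen, no end yet
    private final PriorityQueue<Marker> byTimestamp =
            new PriorityQueue<>(Comparator.comparingLong(Marker::timestamp));
    private final Duration redeliverAfter = Duration.ofSeconds(30);

    private final int partition;                                   // the assigned queue partition
    private final KafkaConsumer<String, String> queueReader;       // used only for seek()s
    private final KafkaProducer<String, String> queueWriter;       // re-sends to the queue topic

    RedeliveryTrackerSketch(int partition,
                            KafkaConsumer<String, String> queueReader,
                            KafkaProducer<String, String> queueWriter) {
        this.partition = partition;
        this.queueReader = queueReader;
        this.queueWriter = queueWriter;
        this.queueReader.assign(List.of(new TopicPartition("queue", partition)));
    }

    void onStartMarker(Marker m) {
        unprocessed.put(m.offset(), m);
        byTimestamp.add(m);
    }

    void onEndMarker(long offset) {
        unprocessed.remove(offset);
    }

    // Called regularly: redeliver messages whose start marker is older than the timeout
    // and for which no end marker has been seen yet.
    void redeliverOverdue(long nowMillis) {
        while (!byTimestamp.isEmpty()
                && nowMillis - byTimestamp.peek().timestamp() > redeliverAfter.toMillis()) {
            Marker m = byTimestamp.poll();
            if (!unprocessed.containsKey(m.offset())) continue;    // already ended, skip lazily
            TopicPartition tp = new TopicPartition("queue", partition);
            queueReader.seek(tp, m.offset());                      // potentially expensive I/O
            for (ConsumerRecord<String, String> r : queueReader.poll(Duration.ofSeconds(1)).records(tp)) {
                if (r.offset() == m.offset()) {
                    // re-send the same message to the queue topic; the redelivered copy
                    // will produce fresh markers of its own
                    queueWriter.send(new ProducerRecord<>("queue", partition, r.key(), r.value()));
                    unprocessed.remove(m.offset());
                    break;
                }
            }
        }
    }
}
```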
Does this guarantee at least once processing?
The 5-step message queue client process can fail in a number of places. But notice that if it fails at any time before step 3 (committing the offset back to Kafka), the message will be re-processed after a restart using standard Apache Kafka mechanisms, as we start reading messages from the latest committed offset. It is possible that the message will also be re-delivered by the RedeliveryTracker, but it will be processed for sure.
If there’s a failure after step 3, we are certain that the start marker has been sent to the markers topic. Once the start marker has been sent, as long as there’s at least one copy of the RedeliveryTracker running and no end marker arrives, at some point the message will become the oldest in the queue and will be redelivered.
Overhead
Obviously, the flow described above adds overhead to message processing. For each message from the queue topic, we need to send and process two additional ones (the markers).
However, the above describes the current implementation; there are a couple of places where optimizations can be explored.
One optimization that is in place is that the markers topic should have the same number of partitions as the queue topic. Markers for messages coming from a given queue partition are sent to the same markers partition. Each tracker of the markers topic can then create a consumer for each assigned partition (which is used for performing seeks), and hence redelivers messages from a single queue partition.
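In code, this alignment simply means choosing the marker’s partition explicitly from the source record instead of relying on key-based partitioning, for example (reusing the placeholder names from the earlier sketch):

```java
// The marker goes to the same partition number in the markers topic as the message had in
// the queue topic - which is why both topics need the same number of partitions.
markerProducer.send(new ProducerRecord<>("markers",
        record.partition(),                           // explicit partition, not key-hash based
        record.partition() + ":" + record.offset(),   // marker key identifying the message
        "start"));
```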
Update 22/06/2017: some of the above-described optimizations are now in place; performance benchmarks can be found in this blog.
Error flooding
If there’s an error during message processing, this often isn’t a one-time event, but a symptom of a larger problem (caused, for example, by a failure of a downstream external component), so many subsequent messages won’t be processed successfully either. This can cause a flood of errors, causing a flood of message redeliveries, which will again fail to be processed (assuming the problem isn’t fixed), which will cause more redeliveries, etc., leading to an ever-increasing message load and the whole system eventually grinding to a halt.
To prevent such situations, the message processing pipeline should contain some form of back-pressure. When errors occur, instead of processing messages at the normal rate, the rate should be decreased, or processing even almost stopped, until a “test” message verifies that the problem is fixed. You could implement this e.g. using a circuit breaker component, by using reactive streams, or with other design options (a rough sketch follows). This is not currently handled by the Kafka-based MQ client and would have to be added by hand.
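As a very rough illustration of the idea (not part of kmq), consecutive failures could trigger an exponential back-off before further messages are consumed; the class and names below are made up:

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical back-off sketch - a crude stand-in for a real circuit breaker: after consecutive
// failures, consumption slows down instead of flooding the (possibly broken) downstream component.
class BackoffProcessor {
    private final KafkaConsumer<String, String> queueConsumer; // subscribed to the queue topic
    private int consecutiveFailures = 0;

    BackoffProcessor(KafkaConsumer<String, String> queueConsumer) {
        this.queueConsumer = queueConsumer;
    }

    void run() throws InterruptedException {
        while (true) {
            for (ConsumerRecord<String, String> record : queueConsumer.poll(Duration.ofMillis(100))) {
                try {
                    process(record);
                    consecutiveFailures = 0;
                } catch (Exception e) {
                    consecutiveFailures++;
                    // exponential back-off, capped at 30 seconds
                    long sleepMillis = Math.min(30_000L, 100L * (1L << Math.min(consecutiveFailures, 8)));
                    Thread.sleep(sleepMillis);
                }
            }
        }
    }

    private void process(ConsumerRecord<String, String> record) { /* application logic */ }
}
```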
Note that this by no means is a problem of the Kafka-based message queue implementation alone, but a more general remark on how message processing pipelines should work.
Running the example code
You can try out how this works in practice using the code from the kmq repository, available on GitHub (more about kmq here). The main part of the implementation resides in the core module, which contains the following classes:
- KmqClient - provides an iterator-style interface for reading messages from a queue topic and sending the start markers. After messages are processed, each should be acknowledged using the processed() method. In other words, it handles the 5-step message processing flow described above (a rough usage sketch follows the list).
- RedeliveryTracker - sets up a Kafka+Akka application and a daemon service which tracks which messages should be redelivered, and performs the redelivery when needed. Uses the RedeliveryProcessor and RedeliveryExecutor classes.
- MarkerKey, MarkerValue - classes representing start & end markers, which are sent to the markers topic.
- KmqConfig - configuration class containing the names of the queue/markers topics etc.
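For orientation, a rough usage sketch of the client loop is below. Only the processed() method is described above; the nextBatch() name, the generic type parameters, and the construction of the client are assumptions here, so check the repository for the actual API:

```java
// Rough usage sketch - nextBatch(), the generic parameters, and createClient()/handle() are
// assumptions; only processed() is described above. See the kmq repository for the real API.
KmqClient<String, String> client = createClient(); // construct with a KmqConfig (details omitted)
while (true) {
    for (ConsumerRecord<String, String> record : client.nextBatch()) {
        handle(record);            // application-level processing
        client.processed(record);  // acknowledge: the end marker is sent for this message
    }
}
```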
The example-java module contains two variants of a message processing setup, one using an embedded Kafka instance, and one using a stand-alone Kafka instance running in the background. You can either run the single EmbeddedExample class or the three separate standalone classes for experimentation:
- StandaloneSender sends 100 test messages to the queue topic
- StandaloneProcessor uses the KmqClient to process the messages, dropping (failing to process) at random 1 out of 10 messages
- StandaloneRedeliveryTracker starts the RedeliveryTracker and performs the redeliveries
There’s also a standalone Scala example in example-scala, which contains a reimplementation of KmqClient using reactive-kafka and Akka Streams: StandaloneReactiveClient. Note that with this solution you automatically get back-pressure support.
Summing up
Please keep in mind that this is PoC-quality code - which is all the more reason why your feedback would be appreciated. Do you think such a solution would be useful for you? Are you maybe using something similar in your projects? Or are you taking another approach for implementing reliable, clustered message queues?
Updates
- 2017-03-06: updated to reflect recent changes; Kafka Streams and persistent stores are no longer used, and have been replaced by an Akka application