Apache Kafka is a very popular publish/subscribe system, which can be used to reliably process a stream of data. The central concept in Kafka is a topic, which can be replicated across a cluster providing safe data storage. By committing processed message offsets back to Kafka, it is relatively straightforward to implement guaranteed “at-least-once” processing. However, there is one important limitation: you can only commit - or, in other words, acknowledge processing of - all messages up to a given offset. It is not possible to acknowledge individual messages.
That’s because of the streaming nature of a topic: you can only signal Kafka that all messages up to a given point have been processed, so that in case of failure stream processing restarts from the last committed point. That works great for many scenarios, but every now and then there are situations when you’d like to have selective message acknowledgment and, more importantly, redelivery of individual unacknowledged messages.
In this post I’d like to outline a possible implementation of a Kafka-based message queue. Proof-of-concept-quality code is available on GitHub, with both embedded and stand-alone examples which should make experimentation quite straightforward.
But why not ...?
Before diving into technical details: if Kafka is a publish-subscribe system, why would we want to implement a message queue on top of it, instead of using a "proper" MQ, such as RabbitMQ or ActiveMQ?
First of all, Kafka has reliable and proven clustering and data replication mechanisms; if you want to make sure your data is safe, and that each message is processed at least once - even in case of losing a number of servers - Kafka should be a good candidate as a base for your message processing system. Other systems don’t put so much emphasis on clustering and replication - e.g. RabbitMQ’s documentation explicitly states that the system doesn’t cope well with network partitions.
Secondly, although I haven’t done any benchmarks yet: while the message redelivery tracking does add a significant amount of overhead, the queue should still be pretty fast, taking advantage of the very good performance of the underlying Kafka topics. But that’s still to be verified.
Finally, convenience - if you already have a working Kafka cluster and you just need a simple message queue (without advanced routing capabilities, for example), it might not make sense to set up yet another cluster and maintain it later. Each additional component adds to the overall complexity of your system.
If you are familiar with how Amazon SQS works, the message queue described below follows a similar philosophy.
The goal is to create an acknowledgment mechanism for individual messages. If a message is not acknowledged for a given period of time, it will be redelivered.
In the implementation we’ll need two topics:
- the `queue` topic, which will hold the messages to process
- the `markers` topic, to which start and end markers will be written for each message (and which will be used to track which messages should be redelivered)
The messages should be processed by a number of queue-client components, according to the flow described below (without any batching for now). As usual in Kafka, we start by creating a regular consumer (probably providing a consumer group id to leverage automatic partition assignment), and then start reading messages from the latest committed offset:
- First, we read a message to process from the `queue` topic
- We send a `start` marker containing the offset of the message to the `markers` topic and wait until the send is acknowledged by Kafka
- Now we can commit the offset of the message read from the `queue` topic back to Kafka
- When the marker is sent and the offset committed, the message can be processed
- When (and if) the processing ends successfully, we send an `end` marker to the `markers` topic, again containing the offset of the message. There’s no need to wait until this send is acknowledged.
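The five steps above can be sketched as a small in-memory simulation. To be clear, this is not kmq’s actual API: the lists stand in for the two topics, all names (`KmqFlowSketch`, the event log) are illustrative, and the point is only the ordering of the operations - start marker, then commit, then processing, then end marker.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory sketch of the 5-step client flow; lists stand in for Kafka topics.
public class KmqFlowSketch {
    public final List<String> queueTopic = new ArrayList<>();   // stands in for the queue topic
    public final List<String> markersTopic = new ArrayList<>(); // stands in for the markers topic
    public long committedOffset = -1;                           // last offset "committed to Kafka"
    public final List<String> log = new ArrayList<>();          // records the order of operations

    public void run() {
        for (long offset = 0; offset < queueTopic.size(); offset++) {
            String msg = queueTopic.get((int) offset);  // 1. read a message from the queue topic
            markersTopic.add("start:" + offset);        // 2. send a start marker, wait for the ack
            log.add("start-marker:" + offset);
            committedOffset = offset;                   // 3. commit the offset back to Kafka
            log.add("commit:" + offset);
            log.add("process:" + msg);                  // 4. process the message
            markersTopic.add("end:" + offset);          // 5. send an end marker, fire-and-forget
            log.add("end-marker:" + offset);
        }
    }
}
```

The ordering is what makes the guarantee work: because the start marker is acknowledged before the offset is committed, a crash at any point leaves the message either before the committed offset (Kafka redelivers it) or covered by a start marker (the tracker redelivers it).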
We will also need to start a number of `RedeliveryTracker` components, which will consume the `markers` topic and redeliver messages when appropriate. The redelivery trackers should be started in multiple copies across the cluster for fail-over (similar to the queue client components). They will use the standard Kafka auto-partition-assignment mechanism, so just starting a number of copies is all that needs to be done; no additional clustering work is required.
The `RedeliveryTracker` is a Kafka application which reads data from the `markers` topic. It maintains a set of messages which haven’t yet been processed: an unprocessed message is one for which a `start` marker has been received, but no `end` marker yet. Moreover, there’s an in-memory priority queue of message offsets, sorted by timestamps, which is checked regularly to see if there are any messages to re-deliver.
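The tracker’s bookkeeping can be sketched roughly like this, assuming a fixed redelivery timeout; the class and method names are illustrative, not kmq’s actual API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Sketch of the RedeliveryTracker's in-memory state: a set of unprocessed
// messages plus a priority queue of start markers ordered by timestamp.
public class MarkerTracker {
    private static final class StartedMarker {
        final long offset;
        final long timestamp;
        StartedMarker(long offset, long timestamp) { this.offset = offset; this.timestamp = timestamp; }
    }

    // offsets for which a start marker was seen, but no end marker yet
    private final Map<Long, StartedMarker> unprocessed = new HashMap<>();
    // start markers ordered by timestamp, checked periodically for redelivery
    private final PriorityQueue<StartedMarker> byTimestamp =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));

    public void startMarker(long offset, long timestamp) {
        StartedMarker m = new StartedMarker(offset, timestamp);
        unprocessed.put(offset, m);
        byTimestamp.add(m);
    }

    public void endMarker(long offset) {
        // the stale entry remains in the priority queue and is skipped when it surfaces
        unprocessed.remove(offset);
    }

    // returns offsets whose start marker is at least timeoutMs old and which still
    // have no end marker - these are the messages that should be redelivered
    public List<Long> redeliverOlderThan(long now, long timeoutMs) {
        List<Long> toRedeliver = new ArrayList<>();
        while (!byTimestamp.isEmpty() && now - byTimestamp.peek().timestamp >= timeoutMs) {
            StartedMarker m = byTimestamp.poll();
            if (unprocessed.get(m.offset) == m) { // still unprocessed, not a stale entry
                unprocessed.remove(m.offset);
                toRedeliver.add(m.offset);
            }
        }
        return toRedeliver;
    }
}
```

One design choice worth noting: an `end` marker doesn’t eagerly remove the entry from the priority queue; the stale entry is simply skipped when it reaches the head, which keeps both operations cheap.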
When the `RedeliveryTracker` detects that a message should be re-delivered, it does a `seek` in the source topic (note that this might be an expensive operation, in terms of I/O) and sends the same message again to the `queue` topic. Hence this will only perform well if the number of messages to redeliver isn’t very large; but more on this later as well.
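Reduced to its essence, the redelivery step looks roughly like the sketch below; in the real implementation the list would be a Kafka topic, the random access a consumer `seek`, and the append a producer send (the class name is illustrative):

```java
import java.util.List;

// In-memory sketch of redelivery: re-read each message at the given offset
// from the source "topic" and append a copy of it at the end.
public class RedeliverySketch {
    public static int redeliver(List<String> queueTopic, List<Long> offsetsToRedeliver) {
        int redelivered = 0;
        for (long offset : offsetsToRedeliver) {
            if (offset >= 0 && offset < queueTopic.size()) {
                String msg = queueTopic.get((int) offset); // the "seek": random access by offset
                queueTopic.add(msg);                       // "send" the same message again
                redelivered++;
            }
        }
        return redelivered;
    }
}
```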
Does this guarantee at-least-once processing?
The 5-step message queue client process can fail in a number of places. But notice that if it fails at any time before step 3 (committing the offset back to Kafka), the message will be re-processed after a restart using standard Kafka mechanisms, as we start reading messages from the latest committed offset. It is possible that the message will also be re-delivered by the `RedeliveryTracker`, but it will be processed for sure.
If there’s a failure after step 3, we are certain that the `start` marker has been sent to the `markers` topic. Once the `start` marker has been sent, as long as there’s at least one copy of the `RedeliveryTracker` running, if there’s no `end` marker, at some point the message will become the oldest in the queue and will be redelivered.
Obviously, the flow described above adds overhead to message processing. For each message from the `queue` topic, we need to send and process two additional ones (the markers).
However, the above describes the current implementation; there are a couple places where optimizations can be explored.
One optimization that is already in place is that the `markers` topic should have the same number of partitions as the `queue` topic. Markers for messages coming from a given `queue` partition are then sent to the same `markers` partition. Each tracker of the `markers` topic can then create a consumer for each assigned partition (which is used for performing `seek`s), and hence redelivers messages from a single `queue` partition.
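The partition-alignment rule itself is simple enough to state in code. In the sketch below the identity partition mapping follows the text above, while the marker key format is purely an illustrative assumption, not kmq’s actual encoding:

```java
// Sketch of the partition-alignment optimization: markers for a message from
// queue partition p are always sent to markers partition p, so the tracker
// assigned a markers partition only ever seeks within one queue partition.
public class MarkerRouting {
    public static int markerPartition(int queuePartition, int markerPartitionCount) {
        if (queuePartition >= markerPartitionCount) {
            throw new IllegalArgumentException(
                    "the markers topic needs at least as many partitions as the queue topic");
        }
        return queuePartition; // identity mapping: same partition number
    }

    // an illustrative marker key identifying the source message, so that a
    // tracker knows exactly where to seek when redelivering
    public static String markerKey(String queueTopic, int partition, long offset) {
        return queueTopic + "/" + partition + "/" + offset;
    }
}
```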
Update 22/06/2017: some of the optimizations described above are now in place; performance benchmarks can be found in this blog post.
If there’s an error during message processing, this often isn’t a one-time event but a larger problem (caused, for example, by the failure of a downstream external component), which makes many subsequent messages fail as well. A flood of errors then causes a flood of message redeliveries, which (assuming the problem isn’t fixed) fail again, causing more redeliveries, and so on, leading to an ever-increasing message load and eventually grinding the whole system to a halt.
To prevent such situations, the message processing pipeline should contain some form of back-pressure. When errors occur, instead of processing messages at the normal rate, the rate should be decreased, or processing even almost stopped, until a “test” message verifies that the problem is fixed. You could implement this e.g. using a circuit-breaker component, by using reactive streams, or via other design options. This is not currently handled by the Kafka-based MQ client, and would have to be added by hand.
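For illustration, here is a minimal circuit-breaker sketch of the kind of back-pressure component mentioned above. The thresholds and the half-open “test message” policy are assumptions for the sketch; none of this is part of kmq:

```java
// Minimal circuit breaker: after a number of consecutive failures, processing
// is paused; after a timeout, "test" messages are let through to probe whether
// the downstream problem is fixed.
public class ProcessingCircuitBreaker {
    public enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold; // consecutive failures before opening
    private final long openMillis;      // how long to pause processing when open
    private State state = State.CLOSED;
    private int consecutiveFailures = 0;
    private long openedAt = 0;

    public ProcessingCircuitBreaker(int failureThreshold, long openMillis) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
    }

    // should the client attempt to process the next message now?
    public boolean allowProcessing(long now) {
        if (state == State.OPEN && now - openedAt >= openMillis) {
            state = State.HALF_OPEN; // let test messages through to probe the downstream
        }
        return state != State.OPEN;
    }

    public void onSuccess() {
        consecutiveFailures = 0;
        state = State.CLOSED;
    }

    public void onFailure(long now) {
        consecutiveFailures++;
        if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
            state = State.OPEN; // stop processing until openMillis elapses
            openedAt = now;
        }
    }
}
```

The queue client loop would consult `allowProcessing` before reading the next message (step 1) and report each processing outcome via `onSuccess`/`onFailure`, so that a persistent downstream failure throttles consumption instead of flooding the redelivery path.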
Note that this by no means is a problem of the Kafka-based message queue implementation alone, but a more general remark on how message processing pipelines should work.
Running the example code
You can try out how this works in practice using the code from the kmq repository, available on GitHub. The main part of the implementation resides in the `core` module, which contains the following classes:
- `KmqClient` - provides an iterator-style interface for reading messages from a `queue` topic and sending the start markers. After messages are processed, each should be acknowledged using the `processed()` method. In other words, it handles the 5-step message processing flow described above.
- `RedeliveryTracker` - sets up a Kafka+Akka application and a daemon service which tracks which messages should be redelivered, and performs the redelivery when needed.
- `MarkerValue` - represents the start & end markers, which are sent to the `markers` topic.
- `KmqConfig` - a configuration class containing the names of the `queue` and `markers` topics.
The `example-java` module contains two variants of a message processing setup: one using an embedded Kafka instance, and one using a stand-alone Kafka instance running in the background. You can either run the single `EmbeddedExample` class, or the three separate standalone classes for experimentation:
- `StandaloneSender`, which sends 100 test messages to the `queue` topic
- a standalone processor class, which uses a `KmqClient` to process the messages, dropping (failing to process) 1 out of 10 messages at random
- a standalone tracker class, which starts the `RedeliveryTracker` and performs the redeliveries
There’s also a standalone Scala example in the `example-scala` module, which contains a reimplementation of the queue client using reactive-kafka and Akka Streams: `StandaloneReactiveClient`. Note that in this solution you automatically get back-pressure support.
Please keep in mind that this is PoC-quality code - which is all the more reason why your feedback would be appreciated. Do you think such a solution would be useful for you? Are you maybe using something similar in your projects? Or are you taking another approach to implementing reliable, clustered message queues?
- 2017-03-06: Updated to reflect recent changes; Kafka Streams and persistent stores are no longer used, and have been replaced by an Akka application