
Evaluating persistent, replicated message queues

Introduction

Message queues are useful in a number of situations; any time we want to execute a task asynchronously, we put the task on a queue and some executor (could be another thread/process/machine) eventually runs the task. Depending on the use case, the queues can give various guarantees on message persistence and delivery. For some use-cases, it is enough to have an in-memory, volatile message queue. For others, we want to be sure that once the message send completes, it is persistently enqueued and will be eventually delivered, despite node or system crashes.

In the following tests we will be looking at queueing systems on the 'safe' side of this spectrum, which try to make sure that messages are not lost by:

  • persisting messages to disk
  • replicating messages across the network

Version history

4 May 2015 updated with new versions, added ActiveMQ; new site
1 Jul 2014 original at Adam Warski’s blog

Tested queues

There are a number of open-source messaging projects available, but only a handful support both persistence and replication. We’ll evaluate the performance and characteristics of 6 message queues:

[Logos: MongoDB, SQS, RabbitMQ, HornetQ, ActiveMQ, Kafka]

While SQS isn’t an open-source messaging system, it matches the requirements and it can be interesting to compare self-hosted solutions with an as-a-service one.

MongoDB isn’t a queue of course, but a document-based NoSQL database. However, using some of its mechanisms, it is very easy to implement a message queue on top of it.

As we have only evaluated queues with JVM clients, EventStore isn’t included in the comparison, but we hope to change that in the future.

If you know of any other messaging systems which provide durable, replicated queues, let us know!

Queue characteristics

On the sender side, we want to have a guarantee that if a message send call completes successfully, the message will be eventually processed. Of course, we will never get a 100% "guarantee", so we have to accept some scenarios in which messages will be lost, such as a catastrophic failure destroying all of our geographically distributed servers. Still, we want to minimise message loss. That’s why:

  • messages should survive a restart of the server, that is messages should be persisted to a durable store (hard disk). However, we accept losing messages due to unflushed disk buffers (we do not require fsyncs to be done for each message).
  • messages should survive a permanent failure of a server, that is messages should be replicated to other servers. We accept losing messages which have been accepted, but haven’t yet been replicated (asynchronous replication). Some systems offer synchronous replication (message send call completes only after data is replicated), which of course is better, and additionally protects from message loss due to hard disk buffers not being flushed. We’ll make it clear later which systems offer what kind of replication.

On the receiver side, we want to be able to receive a message and then acknowledge that the message was processed successfully. Note that receiving alone should not remove the message from the queue, as the receiver may crash at any time (including right after receiving, before processing). But that could also lead to messages being processed twice (e.g. if the receiver crashes after processing, before acknowledging); hence our queue should support at-least-once delivery.

With at-least-once delivery, message processing should be idempotent, that is processing a message twice shouldn’t cause any problems. Once we assume that characteristic, a lot of things are simplified; message acknowledgments can be done asynchronously, as no harm is done if an ack is lost and the message re-delivered. We also don’t have to worry about distributed transactions. In fact, almost no system provides exactly-once delivery (even if it claims to - always read the fine print); it’s always a choice between at-most-once and at-least-once. However, the cost here is shifted to writing the message processing code appropriately.
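To illustrate why idempotent processing makes re-delivery harmless, here is a minimal Python sketch. The names (`IdempotentProcessor`, `handle`) are hypothetical and not part of the tested code, and the set of processed ids is kept in memory, whereas a real system would have to persist it.

```python
class IdempotentProcessor:
    """Applies the effect of each message at most once, keyed by message id."""

    def __init__(self):
        self.processed_ids = set()   # assumption: every message has a unique id
        self.total = 0               # the side effect we want applied only once

    def handle(self, message_id, amount):
        # A re-delivered message is detected and skipped, so
        # receiving it twice causes no harm.
        if message_id in self.processed_ids:
            return False
        self.total += amount
        self.processed_ids.add(message_id)
        return True


processor = IdempotentProcessor()
# The ack for "m1" is lost, so the queue delivers it a second time.
deliveries = [("m1", 10), ("m2", 5), ("m1", 10)]
results = [processor.handle(mid, amount) for mid, amount in deliveries]
```

The second delivery of "m1" is ignored, so the total reflects each message exactly once even though delivery was at-least-once.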

Testing methodology

We’ll be looking at how fast (in terms of throughput, messages/second) we can send and receive messages using a single message queue cluster. Ideally, we want to have 3 identical, replicated nodes running the message queue server.

All sources for the tests are available on GitHub.

Each test run is parametrised by the type of the message queue tested, optional message queue parameters, number of client nodes, number of threads on each client node and message count. A client node is either sending or receiving messages; in the tests we used from 1 to 8 client nodes of each type (there’s always the same number of sender and receiver nodes), each running from 1 to 25 threads.

Each Sender thread tries to send the given number of messages as fast as possible, in batches of random size between 1 and 10 messages. For some queues, we’ll also evaluate larger batches, up to 100 or 1000 messages. After sending all messages, the sender reports the number of messages sent per second.

The Receiver tries to receive messages (also in batches), and after receiving them, acknowledges their delivery (which should cause the message to be removed from the queue). When no messages are received for a minute, the receiver thread reports the number of messages received per second.
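The sender side of this methodology can be sketched roughly as follows. This is a simplified Python illustration, not the actual test code from GitHub; `run_sender` and its parameters are made-up names, and the queue client is replaced by a plain callable.

```python
import random
import time


def run_sender(send_batch, message_count, max_batch=10, rng=None):
    """Send message_count messages in random-size batches; return msgs/s."""
    rng = rng or random.Random()
    sent = 0
    start = time.perf_counter()
    while sent < message_count:
        # Batch size is random between 1 and max_batch, capped by what is left.
        size = min(rng.randint(1, max_batch), message_count - sent)
        send_batch([f"message-{sent + i}" for i in range(size)])
        sent += size
    elapsed = time.perf_counter() - start
    return sent / elapsed if elapsed > 0 else float("inf")


# Stand-in for a real queue client: just collect the batches in a list.
batches = []
throughput = run_sender(batches.append, message_count=1000, rng=random.Random(42))
```

A receiver would mirror this loop: receive a batch, acknowledge it, and report the rate once no messages have arrived for a minute.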

Test setup

The queues have to implement the Mq interface. The methods should have the following characteristics:

  • send should be synchronous, that is when it completes, we want to be sure (what sure means exactly may vary) that the messages are sent
  • receive should receive messages from the queue and block them; if the node crashes, the messages should be returned to the queue and re-delivered
  • ack should acknowledge delivery and processing of the messages. Acknowledgments can be asynchronous, that is we don’t have to be sure that the messages really got deleted.
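The real Mq interface lives in the GitHub sources; the sketch below is only a hypothetical Python rendering of the three methods described above, together with an in-memory stand-in (`InMemoryMq`, `requeue_unacked` are invented names) that shows the intended receive/ack semantics.

```python
from abc import ABC, abstractmethod
from collections import deque
import itertools


class Mq(ABC):
    @abstractmethod
    def send(self, messages): ...

    @abstractmethod
    def receive(self, max_count): ...

    @abstractmethod
    def ack(self, ids): ...


class InMemoryMq(Mq):
    def __init__(self):
        self._ids = itertools.count(1)
        self._ready = deque()     # (id, body) pairs waiting for delivery
        self._in_flight = {}      # id -> body, received but not yet acked

    def send(self, messages):
        for body in messages:
            self._ready.append((next(self._ids), body))

    def receive(self, max_count):
        # Received messages are blocked (moved to in-flight), not deleted.
        batch = []
        while self._ready and len(batch) < max_count:
            msg_id, body = self._ready.popleft()
            self._in_flight[msg_id] = body
            batch.append((msg_id, body))
        return batch

    def ack(self, ids):
        for msg_id in ids:
            self._in_flight.pop(msg_id, None)

    def requeue_unacked(self):
        """Simulates a receiver crash: un-acked messages return to the queue."""
        for msg_id, body in sorted(self._in_flight.items()):
            self._ready.append((msg_id, body))
        self._in_flight.clear()


mq = InMemoryMq()
mq.send(["a", "b", "c"])
first = mq.receive(2)
mq.ack([first[0][0]])     # only "a" is acknowledged
mq.requeue_unacked()      # the receiver "crashes" before acking "b"
second = mq.receive(10)   # "b" is delivered again, after "c"
```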

Server setup

Both the clients and the messaging servers used m3.large EC2 instances; each such instance has 2 virtual CPUs, 7.5GB of RAM and a 32GB SSD. All instances were started in a single availability zone (eu-west-1a). For production deployments it is certainly better to have the replicas distributed across different locations (in EC2 terminology - different availability zones). However, as the aim of the test was to measure performance, a single availability zone was used to minimise the effects of network latency as much as possible.

The servers were provisioned automatically using Chef through Amazon OpsWorks; when possible, each server was running a single Docker container, which made it possible, for example, to quickly deploy a new version of the clients to multiple machines. For a detailed description of that approach, see here.

Mongo

Version: server 3.0.1, Java driver 2.13.0
Replication: configurable, asynchronous & synchronous
Last tested: 4 May 2015

Mongo has two main features which make it possible to easily implement a durable, replicated message queue on top of it: very simple replication setup (we’ll be using a 3-node replica set), and various document-level atomic operations, like find-and-modify. The implementation is just a handful of lines of code; take a look at MongoMq.
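The claim-by-atomic-update idea can be illustrated without a database. The Python sketch below is an in-memory simulation, not the MongoMq code and not the Mongo driver API: `Collection`, `find_and_modify`, `receive_one` and the `next_delivery` field are all assumptions made for illustration. A lock stands in for the document-level atomicity that find-and-modify provides.

```python
import threading


class Collection:
    """In-memory stand-in for a Mongo collection (an assumption, not the driver)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._docs = []

    def insert(self, doc):
        with self._lock:
            self._docs.append(dict(doc))

    def find_and_modify(self, predicate, update):
        # Atomically find the first matching document and update it,
        # mimicking the single-document atomicity of find-and-modify.
        with self._lock:
            for doc in self._docs:
                if predicate(doc):
                    doc.update(update)
                    return dict(doc)
            return None


def receive_one(collection, now, visibility_timeout=60):
    """Claim one message by pushing its next_delivery time into the future."""
    return collection.find_and_modify(
        lambda d: d["next_delivery"] <= now,
        {"next_delivery": now + visibility_timeout},
    )


messages = Collection()
messages.insert({"_id": 1, "body": "hello", "next_delivery": 0})
claimed = receive_one(messages, now=100)
claimed_again = receive_one(messages, now=110)   # still hidden from receivers
redelivered = receive_one(messages, now=200)     # no ack arrived, so it reappears
```

In this model an ack would simply delete the document; a message whose receiver crashed becomes visible again once its `next_delivery` time has passed.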

We are also able to control the guarantees which send gives us by using an appropriate write concern when writing new messages:

  • WriteConcern.ACKNOWLEDGED (previously SAFE) ensures that once a send completes, the messages have been written to disk (but the buffers may not yet be flushed, so it’s not a 100% guarantee); this corresponds to asynchronous replication
  • WriteConcern.REPLICA_ACKNOWLEDGED ensures that a message is written to at least two nodes, which is a majority in our 3-node cluster; this corresponds to synchronous replication

The main downsides of the Mongo-based queue are that:

  • messages can’t be received in bulk – the find-and-modify operation only works on a single document at a time
  • when there are a lot of connections trying to receive messages, the collection will encounter a lot of contention, and all operations are serialised.

And this shows in the results: sends are faster than receives. But overall the performance is quite good!

As Mongo recently got a major update (to version 3), it is also very interesting to compare the old storage engine (mmap, officially MMAPv1) and the new one (wired tiger, officially WiredTiger), which promised document-level locking, efficient disk space usage and improved performance. So we have two additional test variables: storage engine and synchronous/asynchronous replication.

A single-thread, single-node, asynchronous replication, mmap setup achieves 7 250 msgs/s sent and 1 600 msgs/s received. The maximum send throughput with multiple threads/nodes that I was able to achieve is about 10 900 msgs/s, while the maximum receive rate is 2 760 msgs/s.

Interestingly, when using wired tiger, the performance is worse, in most configurations by about a factor of 2! It seems that in that usage pattern (short-lived documents, high volume of find-and-modify operations) the new storage engine doesn’t work that well. Furthermore, with wired tiger receive performance decreases with the number of threads: the more concurrency, the lower the overall throughput.

Results in detail when using asynchronous replication:

Engine       Threads  Nodes  Send msgs/s  Receive msgs/s
mmap         1        1      7 242        1 601
mmap         5        1      10 687       2 761
mmap         1        2      8 928        2 426
mmap         5        2      10 963       2 673
wired tiger  1        1      4 409        1 145
wired tiger  5        1      5 293        832
wired tiger  1        2      3 769        907
wired tiger  5        2      5 937        630

If we use synchronous replication (wait for a replica to acknowledge the writes, instead of just one node), the maximum send throughput falls to about 8 000 msgs/s, while the maximum receive rate stays at about 2 800 msgs/s. As before, results when using wired tiger are worse than when using mmap:

Engine       Threads  Nodes  Send msgs/s  Receive msgs/s
mmap         1        1      1 952        1 630
mmap         25       1      8 006        2 819
mmap         1        2      2 649        2 463
mmap         5        2      7 191        2 618
wired tiger  1        1      1 531        1 239
wired tiger  5        1      2 866        884
wired tiger  25       1      3 777        422
wired tiger  1        2      2 039        933
wired tiger  5        2      3 004        530

Overall in my opinion, not bad for a very straightforward queue implementation on top of Mongo.

SQS

Version: Amazon Java SDK 1.9.25
Replication: ?
Last tested: 4 May 2015

SQS, Simple Queue Service, is a message-queue-as-a-service offering from Amazon Web Services. It supports only a handful of messaging operations, far from the complexity of e.g. AMQP, but thanks to the easy to understand interfaces, and the as-a-service nature, it is very useful in a number of situations.

SQS provides at-least-once delivery. It also guarantees that if a send completes, the message is replicated to multiple nodes; quoting from the website:

"Amazon SQS runs within Amazon’s high-availability data centers, so queues will be available whenever applications need them. To prevent messages from being lost or becoming unavailable, all messages are stored redundantly across multiple servers and data centers."

We don’t really know how SQS is implemented, but it most probably spreads the load across many servers, so including it here is a bit of an unfair competition: the other systems use a single replicated cluster, while SQS can employ multiple replicated clusters and route/balance the messages between them. But since we have the results, let’s see how it compares.

A single thread on a single node achieves 405 msgs/s sent and the same number of msgs received.

These results are not impressive, but SQS scales nicely both when increasing the number of threads, and the number of nodes. On a single node, with 50 threads, we can send up to 14 930 msgs/s, and receive up to 4 800 msgs/s.

On an 8-node cluster, these numbers go up to 109 460 msgs/s sent, and 41 000 msgs/s received!

[Chart: SQS throughput]

RabbitMQ

Version: 3.5.0-1, Java AMQP client 3.5.0
Replication: synchronous
Last tested: 4 May 2015

RabbitMQ is one of the leading open-source messaging systems. It is written in Erlang, implements AMQP and is a very popular choice when messaging is involved. It supports both message persistence and replication, with well documented behaviour in case of e.g. partitions.

We’ll be testing a 3-node Rabbit cluster. To be sure that sends complete successfully, we’ll be using publisher confirms, a Rabbit extension to AMQP, instead of transactions:

"Using standard AMQP 0-9-1, the only way to guarantee that a message isn't lost is by using transactions -- make the channel transactional, publish the message, commit. In this case, transactions are unnecessarily heavyweight and decrease throughput by a factor of 250. To remedy this, a confirmation mechanism was introduced."

The confirmations are cluster-wide, so this gives us pretty strong guarantees: that messages will be both written to disk, and replicated to the cluster (see the docs):

"When will messages be confirmed? For routable messages, the basic.ack is sent when a message has been accepted by all the queues. For persistent messages routed to durable queues, this means persisting to disk. For mirrored queues, this means that all mirrors have accepted the message."

Such strong guarantees are probably one of the reasons for mediocre performance. A single-thread, single-node setup gives us about 1 000 msgs/s sent and received:

Nodes  Threads  Send msgs/s  Receive msgs/s
1      1        1 025        1 029
1      5        2 527        2 509
1      25       3 488        3 488

This scales nicely as we add threads/nodes, up to 3 600 msgs/s, which seems to be the maximum that Rabbit can achieve:

[Chart: RabbitMQ throughput]

The RabbitMq implementation of the Mq interface is again pretty straightforward. We are using the mentioned publisher confirms, and setting the quality-of-service when receiving so that at most 100 messages are delivered without being acknowledged.
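The effect of such a quality-of-service (prefetch) limit can be modelled in a few lines. The Python sketch below is a toy simulation of the idea, not the RabbitMQ client API; `PrefetchingConsumer` and its methods are hypothetical names.

```python
from collections import deque


class PrefetchingConsumer:
    """Toy model of a broker that limits unacknowledged deliveries."""

    def __init__(self, queue, prefetch):
        self.queue = deque(queue)
        self.prefetch = prefetch
        self.unacked = set()

    def deliver(self):
        """Deliver messages until the prefetch window is full."""
        delivered = []
        while self.queue and len(self.unacked) < self.prefetch:
            msg = self.queue.popleft()
            self.unacked.add(msg)
            delivered.append(msg)
        return delivered

    def ack(self, msg):
        # Acknowledging frees a slot in the window.
        self.unacked.discard(msg)


consumer = PrefetchingConsumer(range(250), prefetch=100)
first = consumer.deliver()        # fills the window
stalled = consumer.deliver()      # nothing more until some acks arrive
for msg in first[:10]:
    consumer.ack(msg)
after_acks = consumer.deliver()   # exactly as many as were acknowledged
```

The window bounds how many messages would have to be re-delivered if the receiver crashed, while still letting the broker push messages ahead of processing.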

Interestingly, sending the messages in larger batches doesn’t improve overall throughput: it stays at around 3 500 msgs/s, falling to 2 900 msgs/s when we batch up to 1000 messages.

Nodes  Threads  Send msgs/s  Receive msgs/s  Notes
1      25       3 488        3 488           max batch 10
2      25       3 663        3 628           max batch 10
4      25       3 551        3 528           max batch 10
4      5        3 610        3 587           max batch 100
4      5        2 897        2 695           max batch 1000

Another side-note: RabbitMQ has a great web-based console, available with almost no setup.

HornetQ

Version: 2.4.0, Java driver 2.4.5
Replication: synchronous
Last tested: 1 Jul 2014

HornetQ, written by JBoss and part of JBossAS (where it implements JMS), is a strong contender. For some time now it has supported over-the-network replication using live-backup pairs. I tried setting up a 3-node cluster, but it seems that data is replicated only to one node. Hence here we will be using a two-node cluster.

This raises a question on how partitions are handled; if a node dies, the fact is detected automatically, but then we can end up with two live servers (unless we have more nodes in the cluster), and that raises the question of what happens with the data on both primaries when the connection is re-established. Overall, the replication support and documentation are worse than for Mongo and Rabbit.

Although it is not clearly stated in the documentation (see send guarantees), replication is synchronous, hence when the transaction commits, we can be sure that messages are written to journals on both nodes. That is similar to Rabbit, and corresponds to Mongo’s replica-safe write concern.

The HornetMq implementation uses the core Hornet API. For sends, we are using transactions, for receives we rely on the internal receive buffers and turn off blocking confirmations (making them asynchronous). Interestingly, we can only receive one message at a time before acknowledging it, otherwise we get exceptions on the server. But this doesn’t seem to impact performance.

Speaking of performance, it is very good! A single-node, single-thread setup achieves 1 100 msgs/s. With 25 threads, we are up to 12 800 msgs/s! And finally, with 25 threads and 4 nodes, we can achieve 17 000 msgs/s.

Nodes  Threads  Send msgs/s  Receive msgs/s
1      1        1 108        1 106
1      5        4 333        4 318
1      25       12 791       12 802
2      1        2 095        2 029
2      5        7 855        7 759
2      25       14 200       13 761
4      1        3 768        3 627
4      5        11 572       10 708
4      25       17 402       17 160

One final note: when trying to send messages using 25 threads in batches of up to 1000, I once got into a situation where the backup considered the primary dead even though it was working, and another time the sending failed because the “address was blocked” (in other words, the queue was full and couldn’t fit in memory), even though the receivers worked all the time. Maybe that’s due to GC? Or just the very high load?

ActiveMQ

Version: 5.11.1
Replication: configurable, asynchronous & synchronous
Last tested: 4 May 2015

ActiveMQ is one of the most popular message brokers. In many cases it’s "the" messaging server when using JMS. However, it gained replication features only recently. Until version 5.9.0, it was possible to have a master-slave setup using a shared file system (e.g. SAN) or a shared database; these solutions require either specialised hardware, or are constrained by a relational database (which would have to be clustered separately).

However, it is now possible to use the Replicated LevelDB storage, which uses Zookeeper for cluster coordination (like Kafka).

Replication can be both synchronous and asynchronous; in fact, there’s a lot of flexibility. By setting the sync configuration option of the storage, we can control how many nodes have to receive the message and whether it should be written to disk or not before considering a request complete:

  • quorum_mem corresponds to synchronous replication, where a message has to be received by a majority of servers and stored in memory
  • quorum_disk is even stronger, requires the message to be written to disk
  • local_mem is asynchronous replication, where a message has to be stored in memory only. Even if disk buffers are flushed, this doesn’t guarantee message delivery in case of a server restart
  • local_disk is asynchronous replication where a message has to be written to disk on one server
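The four options above combine two independent choices: how many nodes must hold the message, and whether it must reach disk. A small Python sketch of that reading follows; `sync_requirements` is a hypothetical helper for illustration, not part of ActiveMQ.

```python
def sync_requirements(sync, replicas):
    """Map a sync mode to (nodes that must hold the message, must be on disk)."""
    scope, medium = sync.split("_")
    if scope == "quorum":
        nodes = replicas // 2 + 1     # a majority of the cluster
    elif scope == "local":
        nodes = 1                     # only the node that accepted the send
    else:
        raise ValueError(f"unknown sync scope: {scope}")
    if medium not in ("mem", "disk"):
        raise ValueError(f"unknown sync medium: {medium}")
    return nodes, medium == "disk"


modes = ("quorum_mem", "quorum_disk", "local_mem", "local_disk")
requirements = {mode: sync_requirements(mode, replicas=3) for mode in modes}
```

For the 3-node cluster used here, the quorum modes require 2 nodes and the local modes require 1.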

In the tests we’ll be mainly using quorum_mem, with a cluster of 3 nodes. As we require a quorum of nodes, this setup should be partition tolerant. However, there’s no documentation on how partitions are handled and how ActiveMQ behaves in such situations.

The ActiveMq implementation uses standard JMS calls using the OpenWire protocol to connect to the ActiveMQ server. For sends, we create a producer with delivery mode set to PERSISTENT, and for receives we create a consumer with CLIENT_ACKNOWLEDGE, as we manually acknowledge message delivery.

Performance-wise, ActiveMQ does slightly better than RabbitMQ, achieving at most 3 900 msgs/s with synchronous replication (quorum_mem), which seems to be the maximum and is achieved with 1 node and 25 threads. With asynchronous replication it reaches 5 450 msgs/s using local_disk and about 6 950 msgs/s using local_mem. Results for quorum_mem:

Nodes  Threads  Send msgs/s  Receive msgs/s
1      1        657          657
1      5        1 863        1 863
1      25       3 907        3 891

Adding more nodes doesn’t improve the results; in fact, they are slightly worse. Interestingly, using the stronger quorum_disk guarantee has no big effect on performance, with the broker topping out at about 3 700 msgs/s:

Nodes  Threads  Send msgs/s  Receive msgs/s  Notes
1      25       3 907        3 891           quorum_mem
2      25       3 482        3 383           quorum_mem
4      25       3 778        3 726           quorum_mem
4      5        3 688        3 648           quorum_disk
4      5        6 951        6 875           local_mem
4      5        5 455        5 424           local_disk

[Chart: ActiveMQ throughput]

Kafka

Version: 0.8.2.1
Replication: configurable, asynchronous & synchronous
Last tested: 4 May 2015

Kafka takes a different approach to messaging. The server itself is a streaming publish-subscribe system. Each Kafka topic can have multiple partitions; by using more partitions, the consumers of the messages (and the throughput) may be scaled and concurrency of processing increased.

On top of publish-subscribe with partitions, a point-to-point messaging system is built, by putting a significant amount of logic into the consumers (in the other messaging systems we’ve looked at, it was the server that contained most of the message-consumed-by-one-consumer logic; here it’s the consumer).

Each consumer in a consumer group reads messages from a number of dedicated partitions; hence it doesn’t make sense to have more consumer threads than partitions. Messages aren’t acknowledged on the server (which is a very important design difference!); instead, the offsets of messages processed by consumers are written to Zookeeper, either automatically in the background, or manually. This allows Kafka to achieve much better performance.

Such a design has a couple of consequences:

  • messages from each partition are processed in-order. A custom partition-routing strategy can be defined
  • all consumers should consume messages at the same speed. Messages from a slow consumer won’t be "taken over" by a fast consumer
  • messages are acknowledged “up to” an offset. That is messages can’t be selectively acknowledged.
  • no "advanced" messaging options are available, such as routing, delaying messages, re-delivering messages, etc.
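To see why extra consumer threads beyond the partition count stay idle, consider a simple round-robin assignment of partitions to consumers. This Python sketch is only an illustration of the idea; it is not Kafka's actual assignment algorithm, and `assign_partitions` is a made-up name.

```python
def assign_partitions(partitions, consumers):
    """Give each partition to exactly one consumer, round-robin."""
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Four partitions shared by two consumers: each gets two.
balanced = assign_partitions([0, 1, 2, 3], ["c1", "c2"])

# Three partitions but four consumers: one consumer has nothing to read.
too_many = assign_partitions([0, 1, 2], ["c1", "c2", "c3", "c4"])
idle = [consumer for consumer, parts in too_many.items() if not parts]
```

Because a partition is never split between consumers, ordering within a partition is preserved, but a slow consumer's partitions cannot be drained by a faster one.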

You can read more about the design of the consumer in Kafka’s docs.

To achieve guaranteed sends and at-least-once delivery, I used the following configuration (see the KafkaMq class):

  • topic is created with a replication-factor of 3
  • for the sender, the request.required.acks option is set to 1 (asynchronous replication - a send request blocks until it is accepted by the partition leader) or -1 (synchronous replication; in conjunction with min.insync.replicas topic config set to 2 a send request blocks until it is accepted by at least 2 replicas - a quorum when we have 3 nodes in total)
  • consumer offsets are committed every 10 seconds manually; during that time, message receiving is blocked (a read-write lock is used to assure that). That way we can achieve at-least-once delivery (only committing when messages have been "observed").
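How committing offsets only after messages have been observed yields at-least-once delivery can be shown with a toy model of a single partition. The Python sketch below is not the KafkaMq class and does not use the Kafka client; `PartitionConsumer` is a hypothetical name, and the 10-second timer and the read-write lock are left out for brevity.

```python
class PartitionConsumer:
    """Toy consumer of one partition, tracking a committed offset."""

    def __init__(self, log):
        self.log = log
        self.committed = 0   # offset stored durably (in Zookeeper, in the article)
        self.position = 0    # in-memory read position, lost on a crash

    def poll(self, max_count):
        batch = self.log[self.position:self.position + max_count]
        self.position += len(batch)
        return batch

    def commit(self):
        # Acknowledges everything "up to" the current position at once.
        self.committed = self.position

    def restart(self):
        # After a crash, reading resumes from the last committed offset.
        self.position = self.committed


log = [f"m{i}" for i in range(6)]
consumer = PartitionConsumer(log)
seen = []
seen += consumer.poll(2)   # m0, m1
consumer.commit()
seen += consumer.poll(2)   # m2, m3 are processed but not yet committed
consumer.restart()         # crash before the next commit
seen += consumer.poll(4)   # m2 and m3 are delivered again, then m4, m5
```

No message is lost, but m2 and m3 are seen twice, which is exactly the at-least-once behaviour that idempotent processing has to tolerate.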

Now, to the results. Kafka’s performance is great. With asynchronous replication, a single-node, single-thread setup achieves about 2 550 msgs/s, and the best result was about 30 000 msgs/s with 25 sending and receiving threads and 4 nodes:

Nodes  Threads  Send msgs/s  Receive msgs/s
1      1        2 582        2 586
1      5        10 943       9 503
1      25       24 113       21 733
2      1        4 799        4 573
2      5        13 516       11 691
2      25       26 892       24 310
4      1        8 103        7 222
4      5        17 950       15 969
4      25       31 859       28 205

When using synchronous replication, the numbers fall to about 1 100 msgs/s and 11 000 msgs/s respectively. This makes sense: with synchronous replication there are 3 network hops instead of 1, so we would expect up to a 3x slow-down, and the measured slow-down is between 2x and 3x:

Nodes  Threads  Send msgs/s  Receive msgs/s
1      1        1 179        1 079
1      5        3 470        3 427
1      25       9 321        9 113
2      1        1 793        1 768
2      5        4 706        4 571
2      25       10 645       10 267
4      1        2 905        2 851
4      5        6 520        6 327
4      25       11 308       10 951
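The slow-down caused by synchronous replication can be checked directly against the two Kafka tables. The short Python calculation below uses only send rates copied from those tables; the dictionary keys are (nodes, threads).

```python
# Send msgs/s taken from the asynchronous and synchronous Kafka tables.
async_send = {(1, 1): 2582, (1, 25): 24113, (4, 25): 31859}
sync_send = {(1, 1): 1179, (1, 25): 9321, (4, 25): 11308}

# Ratio of asynchronous to synchronous throughput for each configuration.
slowdown = {
    config: round(async_send[config] / sync_send[config], 2)
    for config in async_send
}

for (nodes, threads), factor in slowdown.items():
    print(f"{nodes} node(s), {threads} thread(s): {factor}x slower")
```

For these configurations the factor lies between 2 and 3, somewhat below what the 3-hops-versus-1 argument alone would predict.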

Kafka has a big scalability potential, by adding nodes and increasing the number of partitions; however how it scales exactly is another topic, and would have to be tested.

Summary

As always, which message queue you choose depends on specific project requirements. All of the above solutions have some good sides:

  • SQS is a service, so especially if you are using the AWS cloud, it’s an easy choice: good performance and no setup required
  • if you are using Mongo, it is easy to build a replicated message queue on top of it, without the need to create and maintain a separate messaging cluster
  • if you want to have high persistence guarantees, RabbitMQ ensures replication across the cluster and on disk on message send. It’s also a very popular choice and used in many projects
  • ActiveMQ is a popular and widely used messaging broker with good performance and wide protocol support
  • HornetQ has great performance with a very rich messaging interface and routing options
  • Kafka offers the best performance and scalability

When looking only at the throughput, Kafka is a clear winner (unless we include SQS with multiple nodes, but as mentioned, that would be unfair):

[Chart: Summary 1, throughput comparison]

It is also interesting to see how sending more messages in a batch improves the throughput. Rabbit achieves the same performance as before, HornetQ gains a 1.2x speedup and Kafka a 3x speedup, achieving about 90 000 msgs/s!

[Chart: Summary 2, throughput with larger batches]

There are of course many other aspects besides performance, which should be taken into account when choosing a message queue, such as administration overhead, partition tolerance, feature set regarding routing, etc.