What problems does Kafka solve in distributed systems?
Usually, the story of a successful startup begins with a centralized, monolithic application.
At first, the performance of the service is terrific. Every user’s request can be handled with low and steady latency. Resource load is acceptable and the application can run even on commodity hardware.
Some time passes and the company thrives. With the constant influx of new users, the number of concurrent requests to the service increases dramatically. The application becomes more and more sluggish. Resource utilization goes through the roof and response times become completely unpredictable, especially when the service is under heavy load.
The obvious solution to the problem is scaling up the machine hosting the application. Adding more RAM and a more capable CPU can buy the service some time, but it's not possible to scale vertically without limit. Very soon, the application becomes unresponsive even on expensive, high-end hardware.
Another means of scaling is possible by setting up additional instances of the application.
Truly stateless services can be scaled this way indefinitely. Without a shared state, each application’s process can take some part of input data (like user requests) and then calculate results in parallel.
In real life, such services are quite rare and there's usually some shared state. Sometimes the state is kept only in memory. In other cases, it needs to be saved in a durable store. Either way, all application instances need to cooperate so they don't corrupt the state during updates. They form a distributed cluster of independent computing nodes.
Centralized vs distributed applications
Developing centralized applications offers many advantages. There’s only a single running instance that contains the whole state. Ensuring the order of updates and overall data consistency is much easier if all state changes go through a single entry point.
There are some flaws as well. To name a few: if there's only a single component that can alter the data, it becomes a single point of failure. Additionally, when multiple concurrent agents contend to perform updates, they need to be coordinated by some kind of concurrency control mechanism. Whether it uses pessimistic or optimistic locking, it can result in poor overall performance.
A frequently used web architecture makes the relational database the single common component responsible for managing state. This way, all application instances can stay stateless and be easily scaled horizontally. Because all data transformations run as part of ACID transactions, no data is corrupted by simultaneous modifications. This kind of solution can still have all the limitations I mentioned above, like limited scalability or insufficient reliability.
There are many ways of increasing the performance and availability of a database. A secondary read-only replica can improve read performance and take over as the primary node in case of its failure. The data model can also be sharded: tables are split into shards, each holding only a fraction of the data. Each shard can then be stored on a different node and replicated to other nodes if necessary.
I’d like to mention other database systems designed to be distributed out-of-the-box like Apache Cassandra or CockroachDB. But considerations about databases are not the point of this blog post. You can read more on choosing the right DB for your project in this article:
What I’d like to focus on is another infrastructure component helpful in building complex distributed systems: Apache Kafka. According to the official site, Kafka is:
an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
Initially, it was an internal project at LinkedIn, but since 2012 it's been a project of the Apache Software Foundation. Later, engineers involved in creating Kafka at LinkedIn founded Confluent, a company built around open-source Apache Kafka.
Kafka architecture overview
The fundamental unit of data in Kafka is a message. It's represented by a pair of attributes: a key and a value. The value contains the payload (actual data) of the message. Messages are stored in topics, which are, in essence, append-only logs.
Messages are delivered to a topic by producers. A producer is just an application that uses the Producer API to create and send messages to the cluster. Once a message is processed and appended to the topic, it can't be modified. The message lives in the topic until its retention period elapses (by default, seven days). After that, it is deleted by a background log-cleanup process. (Alternatively, a topic can be configured for log compaction, which keeps only the latest message for each key.)
Kafka operates on multiple nodes called brokers. Each broker can hold multiple partitions of various topics. Partitions are just parts of a topic that store some portion of its messages. Kafka producers apply a hashing function (the default one or one provided by the user) to the message key to determine the target partition for storing that message. The hashing function must be consistent: it should yield the same value for equal keys. The key might represent the identity of some entity, like a bank account or an employee. Messages can then be understood as changes in the state of this entity. For some use cases, it's important to ensure that all messages with the same key end up in the same partition. Sometimes it doesn't matter; the key can then be null, and Kafka will spread such messages across partitions in a round-robin fashion.
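The routing logic can be sketched roughly like this (a simplified Python model; real producers hash the key bytes with murmur2, while Python's built-in hash stands in here):

```python
import itertools

# Shared counter for keyless messages (module-level for simplicity).
_round_robin = itertools.count()

def choose_partition(key, num_partitions):
    """Pick the target partition for a message, mimicking Kafka's strategy:
    hash the key if there is one, otherwise rotate over the partitions."""
    if key is not None:
        # Kafka hashes the key bytes with murmur2; hash() stands in here.
        # The result is consistent: equal keys always map to the same partition.
        return hash(key) % num_partitions
    # Keyless messages are spread round-robin across partitions.
    return next(_round_robin) % num_partitions
```

With this scheme, all state changes of one entity (say, one bank account) land in the same partition, while keyless messages are balanced evenly.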
Subsequent messages stored in a partition receive a monotonically increasing number: the offset, which uniquely identifies the message's position within that partition. The pair of values (partition number, offset) can serve as a global identifier of a message within the topic.
Additionally, the ordering of messages is guaranteed only in the scope of a single partition. That means that messages will be stored according to the order they were produced, but only within the partition. This is an important feature, very helpful in case it’s necessary to rebuild the current state of the entity from events.
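A partition with its offsets can be modeled as a minimal append-only log (an illustrative sketch, not Kafka's actual storage format):

```python
class Partition:
    """A toy model of a partition: an append-only list of messages,
    where each message's offset is simply its position in the log."""

    def __init__(self):
        self.log = []

    def append(self, key, value):
        offset = len(self.log)            # offsets grow monotonically
        self.log.append((offset, key, value))
        return offset

    def read_from(self, offset):
        # Consumers read messages in exactly the order they were appended.
        return self.log[offset:]
```

Replaying `read_from(0)` yields every state change in order, which is what makes rebuilding an entity's state from events possible.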
A single partition can be replicated and reside on many brokers. Duplicating partitions across nodes is necessary for ensuring data durability and high availability of the cluster.
Finally, messages stored in topics can be read by consumers: applications using the Consumer API. Each consumer subscribes to one or more topics and then periodically polls brokers for new messages.
Consumers are organized into consumer groups. Every consumer in the group is subscribed to an exclusive subset of partitions of the topic. If there’s a single consumer in the group, it will get all the partitions. In case there are two consumers, partitions will be split in half and so on. By dividing partition assignments, Kafka can parallelize the process of reading data by consuming applications.
There’s a catch. Kafka can only assign a single partition to at most one consumer (but one consumer can get many partitions). So in case there are more consumers in the group than partitions, some of them will stay idle.
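The assignment rule can be illustrated with a simplified model (Kafka's actual assignors, like the range or sticky assignor, are more sophisticated, but the one-consumer-per-partition invariant is the same):

```python
def assign_partitions(partitions, consumers):
    """Spread a topic's partitions over the consumers in a group.
    Each partition goes to exactly one consumer; if there are more
    consumers than partitions, the surplus consumers stay idle."""
    assignment = {c: [] for c in consumers}
    for i, partition in enumerate(partitions):
        # Deal partitions out like cards, one consumer at a time.
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment
```

Two consumers over six partitions get three each; a third consumer over two partitions would be left with an empty assignment.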
Adding partitions to existing topics is possible but can be burdensome. For that reason, it's good practice to take potential future upscaling of consumers or brokers into account and overprovision partitions when creating a new topic.
All low-level communication (broker to broker, consumer to broker, and producer to broker) takes place over a binary protocol on top of TCP.
Scalability and resiliency with Kafka
Distributed systems can’t truly scale without the parallelization of tasks. If there's a single component that is responsible for coordinating all operations, sooner or later, it will become a bottleneck. How does Kafka get around that problem?
In Kafka, the unit of parallelism is the partition. Partitioning is conceptually similar to database sharding. Each partition holds some of the topic's data independently of the others. Partitions are spread across brokers. If a topic has more partitions than there are brokers in the cluster, some brokers will hold more than one partition of that topic.
Each partition has its leader, which is a broker responsible for handling all reads and writes to the partition. Additionally, there can be one or more follower replicas whose responsibility is to replicate data appended to the primary, leader replica.
Sometimes a follower can't keep up with the leader. If the cluster notices that a replica is lagging behind with replication, it removes it from the ISR (in-sync replicas) list until it catches up.
If a broker crashes or becomes unreachable, all of its leader partitions also become unavailable. Kafka reacts quickly: one of the follower replicas takes over and is promoted to be the new leader. For this reason, no broker in the cluster is a single point of failure for topics with a replication factor greater than one. In Confluent Cloud, the minimum value for the replication factor is 3. In general, configuring a topic with 3 replicas is considered a good trade-off between data durability and performance.
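The failover rule can be sketched like this (a toy model of the election; the real controller logic is far more involved):

```python
def handle_broker_failure(failed_leader, isr):
    """When the leader's broker dies, promote one of the remaining
    in-sync replicas; with no ISR member left, the partition goes offline."""
    survivors = [replica for replica in isr if replica != failed_leader]
    if survivors:
        return survivors[0]   # new leader; no acked data is lost
    return None               # partition unavailable until a replica returns
```

Because the new leader comes from the ISR, it already holds every message that was acknowledged to producers.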
Kafka can be horizontally scaled very easily, simply by adding brokers to the cluster. Some known Kafka clusters consist of thousands of nodes. After a new node is added to the system, some of the partitions from the existing brokers can be moved to the new one. The more partitions a topic has, the more work can be done in parallel, thus allowing more flexible scaling. On the other hand, having too many partitions may also cause performance problems, like increased recovery time after a broker crash. So it's very important to find the sweet spot when choosing the number of partitions for a new topic.
Reliability of Kafka based systems
Another problem of distributed systems is related to finding a balance between reliability and availability/performance.
Sometimes the reliability of the messaging system plays a rather minor role. For example, for events produced by telemetry sensors measuring temperature, even if some of the readings are lost, it’s still possible to interpolate values from others.
By setting the producer's acks setting to 0, we configure Kafka to work in a fire-and-forget mode. Even if a message never reaches the broker, the delivery will never be retried and the message will effectively be lost.
We can make the system more robust by setting acks to 1. The producer will now retry sending a message until the partition leader acknowledges it. There's still a small chance that a message will be lost if the leader goes down after it acked the message but before the message was replicated to the follower replicas.
If the producer's acks setting is set to all, a message is acked only when it has been persisted by all in-sync replicas. With this setting, we can ensure very high reliability: apart from cases when a disaster destroys a whole datacenter, we can be sure that an acked message will not be lost. From the cluster's perspective, it's another duty for the brokers, one that can slightly increase latency and lower throughput.
Kafka takes only in-sync replicas into consideration when waiting for acks. Setting
min.insync.replicas to 2 ensures that the leader will ack only if there's at least one in-sync follower replica that has stored the message in its log. If there are no such replicas at the moment of producing a new message, the request will fail. As a consequence, we sacrifice some cluster availability and fault tolerance in exchange for guaranteed data durability.
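The leader's decision boils down to a simple check (an illustrative sketch; the real broker answers such a request with a NOT_ENOUGH_REPLICAS error):

```python
NOT_ENOUGH_REPLICAS = "NOT_ENOUGH_REPLICAS"

def ack_write(acks, isr_size, min_insync_replicas):
    """Decide how the leader treats a produce request (simplified):
    with acks=all the write is rejected outright when the ISR has
    shrunk below min.insync.replicas."""
    if acks == "all" and isr_size < min_insync_replicas:
        return NOT_ENOUGH_REPLICAS
    return "ACK"
```

With acks=1 the same shrunken ISR would still be accepted, which is exactly the durability gap the stricter setting closes.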
Furthermore, only replicas on the ISR list can be elected as a new leader of the partition. By setting
unclean.leader.election.enable=true, we can relax this requirement. This has a drawback: if an out-of-sync replica is chosen as the leader, some messages will be lost. On the other hand, allowing unclean elections greatly improves the fault tolerance and availability of the cluster.
When a message has been replicated by all in-sync replicas, Kafka advances an offset called the high watermark. Only messages below the high watermark are made available to consumers. This makes sure consumers won't get messages that could still be lost.
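The high watermark rule can be expressed as a small sketch (the per-replica log-end offsets below are hypothetical values):

```python
def high_watermark(log_end_offsets):
    """The high watermark is the smallest log-end offset among the
    in-sync replicas: every message below it exists on all of them."""
    return min(log_end_offsets.values())

def fetch_visible(log, hw):
    # Consumers are only served messages below the high watermark.
    return log[:hw]
```

If the leader has 10 messages but the slowest in-sync follower has only replicated 8, consumers see just the first 8.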
Kafka knows which messages were already seen by consumers in a consumer group by saving, per partition, the offset of the last message that was read. With default settings, these offsets are stored in the internal topic
__consumer_offsets. Alternatively, they can be stored in the local storage of the consumer application, for example as part of an ACID transaction.
In Kafka, messages are stored durably on the brokers (until their retention expires or they are compacted away). So if a consumer encounters a problem while processing the received data, it's enough to simply not commit the offset. The message will then be redelivered the next time the consumer polls the broker. This assures messages will eventually be processed by the consumers, even in case of failures.
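The resulting at-least-once behavior can be modeled with a toy consumer loop (a sketch only; real consumers commit offsets through the Consumer API, for example with commitSync):

```python
def consume(partition_log, committed_offset, process):
    """At-least-once loop: the offset is committed only after a message
    is processed successfully, so a failed message is re-delivered on
    the next poll."""
    offset = committed_offset
    for record in partition_log[offset:]:
        try:
            process(record)
        except Exception:
            break            # don't commit; retry from this offset later
        offset += 1          # "commit": next poll starts after this record
    return offset
```

A message that fails once is simply seen again on the next poll; the flip side is that the processing logic should tolerate duplicates.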
As you can see, Apache Kafka can be fine-tuned for better availability or for stronger data consistency. Unfortunately, we can't get everything at once. This is in accordance with the CAP theorem, which states that during a network partition a distributed system can favor either consistency (CP) or availability (AP).
Achieving great performance
Optimizing a distributed system for great performance while retaining availability and reliability can be challenging in many ways.
Kafka was designed with performance in mind. A well-tuned cluster is able to handle gigabytes of data per second and achieve latencies under 10 ms. Moreover, it remains stable even when working under high stress.
Furthermore, Kafka can be adjusted for lower latency or higher throughput. For example, producers can put messages intended for the same partition into batches. This is configured by the settings batch.size (the maximum batch size in bytes) and linger.ms (the upper bound in milliseconds on how long to wait for more messages to fill a batch). Batching is particularly beneficial for throughput, especially in combination with compression. On the other hand, if low latency is crucial, producers can be configured to send each message right away without attempting to batch. This will also increase the overall number of requests sent by the producer.
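The interplay of batch.size and linger.ms can be approximated with a toy batching planner (a rough model only; a real producer flushes batches on a background timer rather than on message arrival):

```python
def plan_batches(messages, batch_size, linger_ms):
    """Group (timestamp_ms, payload) pairs into batches the way a
    producer might: a batch is closed once adding the next message
    would exceed batch_size bytes, or once linger_ms has passed
    since the batch's first message."""
    batches, current, bytes_in, started = [], [], 0, None
    for ts, payload in messages:
        if current and (bytes_in + len(payload) > batch_size
                        or ts - started >= linger_ms):
            batches.append(current)          # flush the full/expired batch
            current, bytes_in = [], 0
        if not current:
            started = ts                     # batch timer starts at first message
        current.append(payload)
        bytes_in += len(payload)
    if current:
        batches.append(current)
    return batches
```

Small batch.size or linger.ms values push the model toward one message per request (low latency); large values produce fewer, bigger requests (high throughput).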
Another producer setting worth mentioning in terms of performance is
max.in.flight.requests.per.connection. Using any value greater than 1 allows the producer to send new requests without waiting for acks for the previous ones. This greatly improves performance, but it can cause trouble if some of the requests succeed while others fail and need to be retried: if later messages are acked before the earlier ones, the order of events gets mixed up. This can be mitigated by turning on yet another producer setting, enable.idempotence. With idempotence enabled and max.in.flight.requests.per.connection no greater than 5, the right ordering of messages is ensured. It also eliminates duplicates caused by retries, making producer-broker communication work with an exactly-once delivery guarantee.
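The deduplication that makes the producer idempotent (enabled with the enable.idempotence setting) can be sketched as follows: each producer gets an ID and numbers its messages with per-partition sequence numbers, which the broker uses to drop retried duplicates. A simplified model:

```python
class Broker:
    """Toy model of broker-side deduplication: for each producer ID,
    remember the last sequence number appended, and ignore retries
    that arrive with a sequence we have already stored."""

    def __init__(self):
        self.last_seq = {}   # producer_id -> last appended sequence
        self.log = []

    def append(self, producer_id, seq, message):
        last = self.last_seq.get(producer_id, -1)
        if seq <= last:
            return "DUPLICATE"   # retry of an already-appended message
        self.last_seq[producer_id] = seq
        self.log.append(message)
        return "ACK"
```

A retried request with an old sequence number is acked again but never appended twice, which is what turns at-least-once delivery into exactly-once on the producer-broker hop.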
For unencrypted traffic, Kafka uses the so-called zero-copy technique. That means it avoids copying data between kernel space and user space when reading messages from disk and sending them over the network: data is transferred directly from the OS page cache to the network socket.
Unfortunately, this optimization is lost if the broker needs to encrypt the data using TLS. For that reason, it might be worth considering a TLS termination and using unencrypted Kafka communication in internal networks, especially for applications with strict performance requirements.
In the previous paragraphs, I described how Kafka can be tweaked for superior latency and throughput at the expense of reliability. The general rule of thumb is: the more we loosen the requirements for consistency, the better performance we can get.
Reactive applications with Kafka
Kafka is a great tool for building reactive systems. The Reactive Manifesto defines the key characteristics of a reactive system: responsive, resilient, elastic, and message-driven.
This architecture style relies on non-blocking, asynchronous message-passing that ensures loose coupling between components. Async communication enables producers to create messages even when consumers are offline. Also, consumers can process messages independently from producers. Messages are pulled from brokers by consumers, which allows for fine-grained flow control and back-pressure.
The built-in resilience and scalability of Kafka help with creating truly reactive systems: elastic, fault-tolerant, and responsive.
Are there any downsides?
Kafka is a fantastic platform for creating distributed and reactive systems. On the other hand, from the operations perspective, it's just another piece of infrastructure that your team needs to maintain and monitor. The amazing set of features comes at the cost of a fairly complicated setup.
Before version 2.8, a Kafka cluster required an additional cluster of ZooKeeper nodes for storing some of its metadata. When KIP-500 was finally delivered and released in April 2021, Kafka became able to run without ZooKeeper (in the so-called KRaft mode). This simplified the architecture by consolidating all metadata management in Kafka itself.
Still, proper configuration of the cluster is not trivial. The multitude of available settings can sometimes be overwhelming, and misconfiguration of the cluster can cause data loss during an outage of one of the brokers. For example, the default value of the broker setting
default.replication.factor is 1. That means all topics created without explicitly passing a replication factor will not be replicated at all!
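To avoid relying on such defaults, the replication settings can be passed explicitly when creating a topic, for example with the kafka-topics.sh tool shipped with Kafka (the topic name and broker address below are placeholders):

```shell
# Create a topic with an explicit replication factor and durability floor
# instead of relying on broker defaults.
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic payments \
  --partitions 12 \
  --replication-factor 3 \
  --config min.insync.replicas=2
```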
Knowledge of how Kafka should be monitored, configured, and maintained is a must-have if you plan to run your own cluster. If you don't feel ready to handle your own Kafka installation, you can always choose a managed service from one of many providers (like Amazon MSK or Confluent Cloud).
Building distributed systems is not an easy job. Kafka makes the task easier by providing a framework we can build our systems around. It takes on the burden of handling the problems inherent to distributed computing: node failures, replication, and data integrity. This makes Kafka a great candidate for a fundamental piece of architecture: a central log that can serve as the source of truth for other services. These services can stay completely stateless or keep local copies of the data.
Kafka can be flexibly tweaked for better performance, availability, or durability, which makes it suitable for a wide variety of applications. Since its release in 2011, it has been adopted by many high-profile tech companies like LinkedIn, Netflix, or Uber. I dare say it has become the de facto default choice for a distributed pub/sub system (although it has capable competitors, like Apache Pulsar).
I hope this article gave you some insights into how Kafka can solve problems related to managing distributed systems. Good luck with building resilient and fault-tolerant applications!
If you want to compare the characteristics of Kafka and other message queues, read this article: Evaluating persistent, replicated message queues.