Kafka pitfalls - Q&A with a Kafka Architect

Jarosław Kijanowski

22 Oct 2020, 12 minutes read

The idea of the tl;db (too long; didn't bother) blog series is to present blog posts, podcasts and conference talks in a distilled form.

This session, available here on YouTube, was held with one of SoftwareMill's architects, Andrzej Ludwikowski, and was about failures and problems encountered while working with Apache Kafka for the last 5 years.

Kafka pitfalls with a Software Architect

Why did you include Kafka in your architecture in the first place?

We required an architecture that was able to react to events in real time, in a continuous manner. Basically, we wanted streaming capabilities, which are hard to simulate with most databases. That's why we decided to introduce Apache Kafka into our ecosystem. After 2 years in production, I can say that it was a good move.

What is the first problem you want to cover?

It has to do with misconfiguration of the replication factor. Don't judge me too early - we understood the concept, but at that time it seemed logical to set it up in that precise way. I believe you need a bit of background on how the project was designed.

Let's take a look at the first diagram.

ES with CQRS based architecture

The architecture is based on Event Sourcing and CQRS.

The domain part, responsible for handling commands, persists state in the form of events in an event store - which by the way is Apache Cassandra.

Events are read by projectors, which update various read models. These read models are then queried by many separate services.

Thanks to such an approach we completely separate the write-handling side of the system from the read-handling part. This gives us maximum flexibility when it comes to scaling those parts, because we can scale them completely independently. An almost classic CQRS architecture.

Since events are not only read by projectors, but also by other micro-services, we didn't want to expose the event store globally. We decided to introduce an additional layer, an event bus in the form of Kafka. Every micro-service is able to read messages from the event bus, since their schemas are public in our ecosystem.

Have you used the Schema Registry with Avro?

No, we used Google's Protocol Buffers instead, and to be honest we are pretty satisfied with that choice, but that's a whole different story for another conversation.

Back to your previous question. We use Apache Cassandra for the event store to persist events in an append-only fashion. This means that Cassandra is the source of truth. We believed that in such a case we don't have to care much about data being safely stored in Kafka, because it's only an additional buffer. Regardless of any failures, we can always go back to Cassandra, read all events and restore the state on Kafka. Moreover, we had some constraints on disk space for the Kafka cluster. Therefore we decided to go with replication factor 2, acks=all and min.insync.replicas=1.

min in-sync replicas, RF, ack
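
For illustration only, such a topic setup could be created with the Java AdminClient roughly like this; the topic name, partition count and broker address are placeholders, not the project's actual values:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class RiskyTopicSetup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address

        try (AdminClient admin = AdminClient.create(props)) {
            // Replication factor 2 with min.insync.replicas=1: even with acks=all
            // on the producer, a write is acknowledged as soon as a single in-sync
            // replica has it, so losing that one replica can lose the event.
            NewTopic events = new NewTopic("events", 50, (short) 2) // hypothetical topic name and partition count
                    .configs(Map.of("min.insync.replicas", "1"));
            admin.createTopics(Collections.singletonList(events)).all().get();
        }
    }
}
```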

When something can fail, it will definitely fail

The same was true for our Kafka cluster. Eventually one node went down and we lost a few events. Not that we were surprised. With this configuration, it's possible that Kafka may lose some data. Check out the link in the resource section below for more details.

Our issue here was that we believed we could easily restore all the messages from the source of truth. Although it's possible, it turned out to take a lot of time and some additional manual steps. It's definitely not what you want to do when your system is down and your users are waiting. Additionally, some of the projectors had been written to stop processing events if those events would introduce consistency issues - even if subsequent messages could make the state consistent again.

In consequence, we ended up with a system where users were issuing requests/commands, but the read models were not updated accordingly. It's actually a good thing that not the whole system was down, but just a part of it - one of the principles of reactive micro-services was met. Still, the part that was not working was a pretty important one.

What's your key takeaway from this?

To achieve consistency between Cassandra and Kafka - or really between any database and Kafka - it's way cheaper to pay for more disk space than to recover from the source of truth. Therefore we ended up with the following, correct configuration: replication factor 3, acks=all and min.insync.replicas=2.

proper min in-sync replicas, RF, ack
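
The corrected topic definition, sketched the same way as above (same placeholder names), differs only in the replication factor and min.insync.replicas:

```java
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Map;

public class SafeTopicConfig {
    // Replication factor 3 with min.insync.replicas=2: with acks=all on the
    // producer, every write must reach at least two replicas before it is
    // acknowledged, so a single broker failure no longer loses events.
    public static NewTopic eventsTopic() {
        return new NewTopic("events", 50, (short) 3) // same hypothetical topic as in the previous sketch
                .configs(Map.of("min.insync.replicas", "2"));
    }
}
```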

What other issues did you encounter?

It's connected with the infamous update of the Kafka client library in version 2.1.0. It was described many times in blog posts, among others by our colleague. I don't want to go into too much detail, but in a nutshell: after this release, a Kafka producer running with default properties may no longer guarantee ordering at the partition level. And when you rely heavily on the correct order of messages, this is a huge risk for you.

It's been mentioned in the release notes, but only buried deep in the KIP-92 wiki page under Migration.

This is basically a non-backward-compatible change, in my opinion. Anyway, as far as I remember, we did read the release notes.

The problem is that we don't use the standard Java client producer to send messages to Kafka, but an elastic wrapper called the Alpakka Kafka connector, also known as Reactive Kafka. While upgrading Alpakka we checked carefully what had changed, but somehow we overlooked the update of the Kafka client itself.

Once again one of the nodes in our Kafka cluster died. But the outcome was even worse than the previous one. This time we didn't lose any events, but their order got mixed up.

Fortunately, some of the projectors were written in a way not to pick up any messages that were not in the right order. Otherwise some of our read models would have got messed up very badly. Without this we probably wouldn't even have realized that we had a problem.

After the first failure we learned our lesson and developed a mechanism that was able to drop and recreate the read models from the latest snapshot.
After understanding the core issue with the change in the Kafka producer, we configured it correctly to maintain ordering again.

Producer settings
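
The talk doesn't spell out the exact properties, but a minimal producer configuration that keeps per-partition ordering despite retries could look roughly like this; the broker address and serializers are assumptions for the sketch:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class OrderingSafeProducer {
    public static KafkaProducer<String, String> create() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // With the defaults that ship with client 2.1.0+, a retried batch can
        // overtake a later one when several requests are in flight per connection.
        // Either of the settings below restores per-partition ordering:
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        // ...or, more conservatively:
        // props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");

        return new KafkaProducer<>(props);
    }
}
```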

The lesson learnt here, however, is to read not only the release notes of the libraries you use, but also the release notes of their dependencies.

Can you expect something worse?

Unfortunately yes. One day there was a power failure and our whole data center went down. All Kafka nodes were hit at the same time. Such things should never happen, but here we are.

The consequences of the power loss were dramatic. After the data center was operational again, one node in one of our Kafka clusters couldn't start and join it, because its internal files were corrupted. We were very lucky this time, since we could afford to recreate the cluster from scratch - the data on it was only for temporary usage.

Another Kafka cluster, the more important one, started successfully, but it was missing some events - nothing new. However, those events had actually been processed by the projectors, which creates a whole new problematic situation.

Here's why.

DC blackout

Some of the projectors are not committing offsets at all. They store them in a database together with the outcome of the processed message - so-called manual offset management. Doing this in a transactional manner allows us to achieve effectively-once processing.
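
As a rough sketch of the idea - assuming a plain JDBC database and a hypothetical projector_offsets table, not the project's actual code - it could look like this:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ManualOffsetProjector {

    // consumerProps is assumed to have enable.auto.commit=false - offsets are
    // tracked in the read-model database instead of in Kafka.
    public static void run(Properties consumerProps, String jdbcUrl) throws Exception {
        TopicPartition tp = new TopicPartition("events", 0); // hypothetical topic and partition

        try (Connection db = DriverManager.getConnection(jdbcUrl);
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {

            db.setAutoCommit(false);
            consumer.assign(Collections.singletonList(tp));
            // Resume right after the last offset recorded in the database.
            consumer.seek(tp, lastProcessedOffset(db, tp) + 1);

            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    updateReadModel(db, record);              // projector-specific logic (placeholder)
                    storeOffset(db, tp, record.offset());
                    db.commit();                              // read model update + offset stored atomically
                }
            }
        }
    }

    private static long lastProcessedOffset(Connection db, TopicPartition tp) throws Exception {
        try (PreparedStatement st = db.prepareStatement(
                "SELECT last_offset FROM projector_offsets WHERE topic_name = ? AND partition_id = ?")) {
            st.setString(1, tp.topic());
            st.setInt(2, tp.partition());
            try (ResultSet rs = st.executeQuery()) {
                return rs.next() ? rs.getLong(1) : -1L; // -1 means "start from the beginning"
            }
        }
    }

    private static void storeOffset(Connection db, TopicPartition tp, long offset) throws Exception {
        try (PreparedStatement st = db.prepareStatement( // assumes the row already exists (simplified)
                "UPDATE projector_offsets SET last_offset = ? WHERE topic_name = ? AND partition_id = ?")) {
            st.setLong(1, offset);
            st.setString(2, tp.topic());
            st.setInt(3, tp.partition());
            st.executeUpdate();
        }
    }

    private static void updateReadModel(Connection db, ConsumerRecord<String, String> record) {
        // Apply the event to the read model within the same transaction.
    }
}
```

Because the read-model update and the offset row share one database transaction, a crash between processing and committing simply replays the message - that is the "effectively-once" part.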

During the failure, several projectors performed some actions and persisted the offset in the database, let's say value 123. But the last message that was flushed to disk on the Kafka broker side had offset 122. Not a single node happened to persist the event with offset 123, as they all went down at the exact same time. When the cluster started again, the nodes agreed on the latest persisted offset - the value 122.

DC after recovery

This creates a bizarre situation, where your Kafka cluster is missing data, but this data was actually consumed by the projectors.

Let's explain why acknowledged data did not make it to the physical hard drive.

When a broker acknowledges a producer's write, it stores the data only in the operating system's page cache. It is then the responsibility of the operating system to flush the data to disk.

It's perfectly fine if a broker blows up before it physically persists the data, because once it joins the cluster again, it will first synchronize with the other replicas and fetch the missing data.

Now, the issue described above is that all nodes went down at the same time. There was no healthy replica to synchronize from.
You can change this behavior and flush, for example, after a given period of time or a given number of messages, but then you lose write performance.
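
For example, flushing can be forced per topic via the flush.messages and flush.ms settings; here is a sketch with the Java AdminClient, where the topic name and values are purely illustrative:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ForceFsync {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "events"); // hypothetical topic
            // Force an fsync every 10000 messages or every second, whichever comes
            // first, trading write throughput for single-node durability.
            admin.incrementalAlterConfigs(Map.of(topic, List.of(
                    new AlterConfigOp(new ConfigEntry("flush.messages", "10000"), AlterConfigOp.OpType.SET),
                    new AlterConfigOp(new ConfigEntry("flush.ms", "1000"), AlterConfigOp.OpType.SET)
            ))).all().get();
        }
    }
}
```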

Moreover, you're still not absolutely safe: even after the message is flushed from the page cache, there is another devil waiting, called the disk cache.

This is also true for Cassandra. When a write is acknowledged, it goes into the page cache first and is flushed asynchronously.
Distributed systems are durable (let's say) only when some nodes survive the crash and have a copy of the data. Even then it's not so simple, but let's go back to the projector.

DC after recovery

When the projector started and asked for the message with offset 124, because that was the next expected offset, the broker replied that it did not have this message, and the projector started to read from the very beginning. That's the configured behavior of our projectors in case of a missing offset.

Consumer (projector) setting
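
A minimal sketch of such a consumer configuration; the broker address and group id are made up, and the setting that matters here is auto.offset.reset:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class ProjectorConsumer {
    public static KafkaConsumer<String, String> create() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "projector-1");             // hypothetical group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // offsets live in the database

        // When the requested offset no longer exists on the broker, "earliest"
        // silently rewinds to the beginning of the partition - the behavior
        // described above. "none" would instead fail fast and force an
        // explicit decision.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return new KafkaConsumer<>(props);
    }
}
```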

In consequence, we started reading millions and millions of messages. That put the Kafka and projector nodes under heavy and prolonged load.
We couldn't wait that long, so we had to manually adjust the offsets - not a very pleasant thing to do in production when dealing with 50 partitions.

What is your advice when the whole data center loses power?

I will repeat myself once again: distributed systems like Kafka and Cassandra need to be set up on proper infrastructure. When deployed in the cloud, each node should reside in a different availability zone. Nodes should never go down all together, at the same time. However, if your cluster lives on on-premise machines in a not-so-well-operated data center and you are forced to use it, things get trickier. In that case, the only thing you can do is prepare for a data center outage, which is pretty challenging, no doubt.

Are there any failures you've been able to prevent because they were noticed on staging?

The issues discussed so far would be difficult to recreate in our testing environment. But there are a few interesting problems I'd like to mention which were noticed before we went live.

The first issue is related to misinterpreting Kafka metrics which, in my opinion, are poorly explained in the documentation.

We had to move around 500 GB of data from our SQL database to Kafka. With the replication factor set to 3, this resulted in quite high disk space requirements. Fortunately, Kafka allows you to compress data before sending it to the brokers. For this to happen, you have to choose between different compression algorithms and tune settings like linger.ms and batch.size.

Compression settings and metrics
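
As a sketch, the relevant producer settings could look like this; the algorithm and values are illustrative, not necessarily the ones chosen in the project:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.util.Properties;

public class CompressingProducer {
    public static KafkaProducer<byte[], byte[]> create() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        // Compression works per batch, so bigger batches usually compress better.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); // example algorithm
        props.put(ProducerConfig.LINGER_MS_CONFIG, "50");          // wait up to 50 ms to fill a batch (illustrative)
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "262144");     // target batch size in bytes (illustrative)

        // When checking the effect, remember that the producer's
        // compression-rate-avg metric is compressed/uncompressed size:
        // 1.0 means no compression, and lower is better.
        return new KafkaProducer<>(props);
    }
}
```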

We aimed for maximum compression, taking into account that consumers and producers would be under extra CPU load. Therefore we observed the producer's average compression rate metric. It turned out we misunderstood its value. It doesn't mean that the higher the value, the better the compression - it's the other way round. A value of 1 means no compression at all, and the lower the rate, the smaller the size of the payload.

A very similar and counterintuitive scenario happened to us with the producer's average batch size metric. This metric shows the size of the batch after compression has been performed. No wonder we were not able to reach the value configured by the batch.size parameter.

Have you considered Kafka Connect, the Schema Registry, Kafka Streams and ksql to stream data from SQL to Kafka?

We had high hopes for Kafka Connect. Although we like the concept very much, limitations of the connectors showed up very quickly.

Kafka Connect

For example, the JDBC source connector did not work well with older versions of MariaDB. It turned out that there was a bug on the database side: our version of MariaDB didn't handle timestamps properly. That prevented us from using the connector, or even from re-configuring it appropriately.

Next, the BigQuery sink connector is not able to deduplicate messages, or rather it does not allow customizing the default deduplication mechanism, which is currently based only on consumer offsets. We would need deduplication at the level of a selected message field, otherwise we would end up with a lot of duplicate messages in BigQuery. Another blocker for us.

Finally, the source connector for Cassandra has some limitations as well. It allows reading data only from tables with a specific schema - a schema that permits running queries for a given range. That obviously makes sense when it comes to performing queries on Cassandra and avoiding full table scans. But if you are not so familiar with Cassandra, you might be fooled into thinking that this connector will solve all your migration problems.

Kafka Connect is a great framework to build and run connectors upon. What I suggest is that you first create a proof of concept with a particular connector to see if it fits your specific needs, because, as usual with such generic frameworks, the devil is in the details.

Do you have any further advice on using Kafka in a real-world project?

It has already been mentioned, but let me stress it again: when operating a live Kafka cluster, monitoring is key. This means collecting and visualizing the complete set of producer, consumer and broker metrics.

Don't worry about the amount of data. You don't have to understand all of the metrics from the beginning - I didn't - but at least start gathering them. This will allow you to catch anomalies, and with time you'll get familiar with them: you will learn which are the most important ones and which should be checked first in case of problems. In our case we used Prometheus and Grafana for collecting metrics and sending alerts, which are also a pretty important thing to have in production.

There are also monitoring as a service tools available, like Datadog.

Last but not least, before you introduce Kafka into your architecture, I think it's worth speaking to someone who is already experienced with that - nomen omen - topic. Fortunately, I work for a company with many highly experienced Kafka developers, SoftwareMill - check out our portfolio or our blog.

Resources
