Hands-on Kafka Streams in Scala
Kafka is getting more and more popular, and I think that as a software developer, it's good to have some hands-on experience with it. The best opportunity to learn it is probably a real-life project, but as always, playing around in a local environment is a good starting point.
In the following blog post, I will show you how you can combine data from multiple topics using Kafka Streams as well as produce and consume data with Kafka. I will also show you how to set up schemas for your data using Avro. Our example domain will describe car metrics like speed and location and our Kafka Streams application will transform them into some driver notifications. The code is written in Scala, so we will keep our types as safe as possible! Let's start.
If you are not familiar with Kafka and stream processing concepts in general, then, as always, the SoftwareMill blog has something for you. Please refer to this article about Kafka use cases by Maria. If you are looking for some verified resources for learning, check out this listing proposed by Michał.
Use case
The domain is fairly simple, so we can focus on Kafka-related code. It defines a car id along with speed, engine, and location metrics, as well as location data and the driver notifications that our Kafka Streams application will produce. In a real system, this kind of data would be collected by car sensors and processed in order to provide drivers with real-time notifications and advice. Our goal is to inform the driver when the speed limit is exceeded, there's traffic on the road ahead, the fuel level is low, or the engine rpm is too high.
package object domain {
  case class CarId(value: Int)
  case class CarSpeed(value: Int)
  case class CarEngine(rpm: Int, fuelLevel: Double)
  case class CarLocation(locationId: LocationId)
  case class DriverNotification(msg: String)

  case class LocationId(city: String, street: String)
  case class LocationData(speedLimit: Int, trafficVolume: TrafficVolume.Value, gasStationNearby: Boolean)

  object TrafficVolume extends Enumeration {
    val Low, Medium, High = Value
  }
}
Let's create our topics.
kafka-topics --create --bootstrap-server kafka:9092 --partitions 2 --topic car-speed
kafka-topics --create --bootstrap-server kafka:9092 --partitions 2 --topic car-engine
kafka-topics --create --bootstrap-server kafka:9092 --partitions 2 --topic car-location
kafka-topics --create --bootstrap-server kafka:9092 --partitions 1 --topic location-data --config "cleanup.policy=compact"
kafka-topics --create --bootstrap-server kafka:9092 --partitions 2 --topic driver-notification
I will explain the cleanup.policy=compact when we get to our streaming application.
Schema
Let's define the schema of our data in a form that Kafka can understand. Our docker-compose.yml defines a Confluent Schema Registry service, which exposes a REST API for managing data schemas in Avro, Protobuf, or JSON Schema format. I chose Avro combined with the Avro4s library, as it lets us easily generate schemas from case classes as well as serialize/deserialize them.
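To get a feel for what Avro4s derives, we can print a generated schema. For CarSpeed it looks roughly like this (the exact namespace depends on the package the case class lives in):

import com.sksamuel.avro4s.AvroSchema

// prints something along the lines of:
// {"type":"record","name":"CarSpeed","namespace":"domain","fields":[{"name":"value","type":"int"}]}
println(AvroSchema[CarSpeed].toString)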
Each key/value pair that we use as topic data needs to have a Schema and a RecordFormat instance. Avro instances of Schema, Encoder, and Decoder are derived from case classes at compile time. Additionally, we differentiate key and value RecordFormats by tagging them; we will need that later on.
package object avro {
  type KeyRFTag
  type KeyRecordFormat[K] = RecordFormat[K] @@ KeyRFTag

  type ValueRFTag
  type ValueRecordFormat[V] = RecordFormat[V] @@ ValueRFTag

  val carIdSchema: Schema = AvroSchema[CarId]
  val carSpeedSchema: Schema = AvroSchema[CarSpeed]

  implicit val carIdRF: KeyRecordFormat[CarId] = RecordFormat[CarId].taggedWith[KeyRFTag]
  implicit val carSpeedRF: ValueRecordFormat[CarSpeed] = RecordFormat[CarSpeed].taggedWith[ValueRFTag]

  // same for other types ...
}
The defined Schemas have to be registered in the Schema Registry service. This boils down to serializing them into JSON format (done by their toString() method) and sending them to the subjects/{subject}/versions endpoint. One way of doing this, which I've chosen, is to write a simple app and run it after the Schema Registry service starts up.
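Such an app boils down to a few HTTP calls. Here is a minimal sketch using JDK 11's HttpClient; the object name is made up, and the subjects follow the default <topic>-key / <topic>-value naming strategy:

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import org.apache.avro.Schema

object RegisterSchemas extends App {
  private val registryUrl = "http://schema-registry:8081"
  private val client = HttpClient.newHttpClient()

  private def register(subject: String, schema: Schema): Unit = {
    // the registry expects {"schema": "<escaped schema JSON>"}; escaping quotes
    // is enough for these simple, single-line schemas
    val escaped = schema.toString.replace("\"", "\\\"")
    val body = s"""{"schema": "$escaped"}"""
    val request = HttpRequest
      .newBuilder(URI.create(s"$registryUrl/subjects/$subject/versions"))
      .header("Content-Type", "application/vnd.schemaregistry.v1+json")
      .POST(HttpRequest.BodyPublishers.ofString(body))
      .build()
    val response = client.send(request, HttpResponse.BodyHandlers.ofString())
    println(s"registered $subject: ${response.body()}")
  }

  register("car-speed-key", carIdSchema) // schemas from the avro package object above
  register("car-speed-value", carSpeedSchema)
  // ... and the same for the remaining topics
}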
Producing
Since we have no "real" source of car metrics data, we need to generate random data and send it to Kafka ourselves. We configure the Kafka Producer as follows:
private val props: Map[String, Object] = Map(
  CLIENT_ID_CONFIG -> "car-metrics-producer",
  BOOTSTRAP_SERVERS_CONFIG -> "kafka:9092",
  KEY_SERIALIZER_CLASS_CONFIG -> classOf[KafkaAvroSerializer],
  VALUE_SERIALIZER_CLASS_CONFIG -> classOf[KafkaAvroSerializer],
  SCHEMA_REGISTRY_URL_CONFIG -> "http://schema-registry:8081"
)
We point it to the Kafka broker as well as the Schema Registry instance, specifying KafkaAvroSerializer from the Confluent serializer library for key/value serialization. That serializer will look up schemas for the given topic in the Schema Registry service and serialize our data accordingly, so that consumers using Avro deserializers can read it.
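For completeness, the producer itself can be created from these properties. Since both keys and values are Avro records, a single instance typed with IndexedRecord can serve all topics (a sketch; asJava comes from scala.jdk.CollectionConverters):

import org.apache.avro.generic.IndexedRecord
import org.apache.kafka.clients.producer.KafkaProducer
import scala.jdk.CollectionConverters._

// one producer for all topics - keys and values are generic Avro records
val producer = new KafkaProducer[IndexedRecord, IndexedRecord](props.asJava)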
Let's define a generic method that will let us send key/value pairs as long as we provide implicit RecordFormat instances for them (which we have in our avro object). We are using a single KafkaProducer bound to the IndexedRecord type from the Avro library, and the RecordFormat.to() method for case class to IndexedRecord conversion.
private def send[K, V](
    producer: KafkaProducer[IndexedRecord, IndexedRecord]
)(topic: String, records: Seq[(K, V)])(implicit krf: RecordFormat[K], vrf: RecordFormat[V]): IO[Unit] =
  records.traverse {
    case (k, v) =>
      val p = Promise[Unit]()
      producer.send(
        new ProducerRecord[IndexedRecord, IndexedRecord](topic, krf.to(k), vrf.to(v)),
        new Callback {
          override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
            Option(exception).map(p.failure).getOrElse(p.success(()))
        }
      )
      IO.fromFuture(IO(p.future)) *> IO(println(s"produced data to [$topic]")) *> IO.sleep(2.seconds)
  }.void
We are converting java.util.concurrent.Future to Scala by calling the KafkaProducer.send method with a callback in which we complete the defined Promise. This allows us to convert to scala.concurrent.Future and then into IO. Having that, we can produce our data in an infinite loop.
IO(RandomData.carSpeed).flatMap(send(producer)("car-speed", _)).foreverM
We do the same for other topics.
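The RandomData generator itself lives in the repository; a minimal sketch of its possible shape, with made-up car ids and value ranges (assuming Scala 2.13's Random.between), could look like this:

import scala.util.Random

object RandomData {
  private val carIds = (1 to 5).map(CarId(_))

  // a batch of random speed readings, one per car
  def carSpeed: Seq[(CarId, CarSpeed)] =
    carIds.map(id => id -> CarSpeed(Random.between(20, 160)))

  // a batch of random engine readings, one per car
  def carEngine: Seq[(CarId, CarEngine)] =
    carIds.map(id => id -> CarEngine(rpm = Random.between(800, 7000), fuelLevel = Random.between(0.0, 1.0)))
}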
Streaming
It's time to aggregate our data and produce driver notifications. In stream processing, it is common to distinguish three elements - source, flow, and sink - which, in short, describe how we collect, transform, and store the resulting data. In Kafka Streams, these steps are defined as stream processors and are connected in a graph forming a topology.
Serdes
The Serde type comes from the Kafka Streams library and defines how a key or value is serialized and deserialized, and therefore how it can be written to or read from a topic. We are using Avro, so let's look at how we can implement a Serde integrated with the Schema Registry service. Again, we take advantage of the RecordFormat instances defined earlier.
object AvroSerdes {

  private val props = Map("schema.registry.url" -> "http://schema-registry:8081")

  implicit def keySerde[K >: Null](implicit krf: KeyRecordFormat[K]): Serde[K] = {
    val avroKeySerde = new GenericAvroSerde
    avroKeySerde.configure(props.asJava, true)
    avroKeySerde.forCaseClass[K]
  }

  implicit def valueSerde[V >: Null](implicit vrf: ValueRecordFormat[V]): Serde[V] = {
    val avroValueSerde = new GenericAvroSerde
    avroValueSerde.configure(props.asJava, false)
    avroValueSerde.forCaseClass[V]
  }

  implicit class CaseClassSerde(inner: Serde[GenericRecord]) {
    def forCaseClass[T >: Null](implicit rf: RecordFormat[T]): Serde[T] = {
      Serdes.fromFn(
        (topic, data) => inner.serializer().serialize(topic, rf.to(data)),
        (topic, bytes) => Option(rf.from(inner.deserializer().deserialize(topic, bytes)))
      )
    }
  }
}
The reason we need to differentiate the KeyRecordFormat and ValueRecordFormat is the GenericAvroSerde.configure method that needs to know whether it is handling a key or a value.
public void configure(final Map<String, ?> serdeConfig, final boolean isSerdeForRecordKeys)
Data collection
Having a configuration that points to a Kafka broker and a StreamsBuilder instance to create our topology, let's define our source.
val props = new Properties()
props.put(APPLICATION_ID_CONFIG, "driver-notifier")
props.put(BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")
val builder: StreamsBuilder = new StreamsBuilder
Car data topics are by default partitioned by key, CarId in this case. Each car-related topic has two partitions and the same key schema, which matches the definition of co-partitioned topics. Why is that important from the perspective of a Kafka Streams application? It makes Kafka assign the same partition numbers of all these topics to the same application instance. Therefore, we can be certain that records for CarId(123) will be processed in one place.
val carSpeed: KGroupedStream[CarId, CarSpeed] = builder.stream[CarId, CarSpeed]("car-speed").groupByKey
val carEngine: KGroupedStream[CarId, CarEngine] = builder.stream[CarId, CarEngine]("car-engine").groupByKey
val carLocation: KGroupedStream[CarId, CarLocation] = builder.stream[CarId, CarLocation]("car-location").groupByKey
val locationData: KTable[LocationId, LocationData] = builder.table[LocationId, LocationData]("location-data")
We use KGroupedStream, which represents a stream of messages from a given topic. For location data, we use KTable, which is a representation of a compacted topic. A compacted topic stores only the latest value for a given key: when a new record is ingested, the corresponding value is updated, or a new entry is added in case of a new key. Grouping by key and creating a table require Grouped and Consumed instances that come from ImplicitConversions in the kafka-streams-scala library - a Scala wrapper for the Java API. We take advantage of our implicit Serde instances here.
implicit def consumedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Consumed[K, V] = Consumed.`with`[K, V]
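The conversion used for grouping looks analogous:

implicit def groupedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Grouped[K, V]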
Next, let's define an aggregate for car data and perform the aggregation of the car-related topics.
case class CarData(speed: Option[CarSpeed], engine: Option[CarEngine], location: Option[CarLocation])

object CarData {
  val empty: CarData = CarData(None, None, None)
}

val carData: KTable[CarId, CarData] = carSpeed
  .cogroup[CarData]({ case (_, speed, agg) => agg.copy(speed = speed.some) })
  .cogroup[CarEngine](carEngine, { case (_, engine, agg) => agg.copy(engine = engine.some) })
  .cogroup[CarLocation](carLocation, { case (_, location, agg) => agg.copy(location = location.some) })
  .aggregate(CarData.empty)
Aggregation is a key-based stateful operation, meaning it needs some kind of persistent storage. Kafka Streams terminology refers to that as a state store. By default, it uses RocksDB.
The code above, thanks to the cogroup method, will create a single state store instance. Before Kafka Streams v2.5.0, we would have had to perform separate joins, resulting in the creation of three state stores in total. This approach requires us to define an aggregate that gets updated every time a new input arrives on any of the topics. Generally, if we have a set of streams that together logically form a larger object and share the same key, then cogroup is the way to go. In a real-life scenario, we would most likely also use windowing for our aggregation, but that seemed like a topic for another article, so I skipped it for now. Here you can read more about the motivations behind adding the cogroup feature.
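Just to illustrate the idea (this is not part of the example application), a windowed variant of a per-car aggregation could look roughly like this - the maximum speed per car in one-minute tumbling windows, reusing the implicit serdes defined earlier:

import java.time.Duration
import org.apache.kafka.streams.kstream.TimeWindows

// hypothetical: max speed per car in 1-minute tumbling windows
// (newer Kafka versions use TimeWindows.ofSizeWithNoGrace instead of TimeWindows.of)
val maxSpeedPerMinute = carSpeed
  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
  .aggregate(CarSpeed(0)) { case (_, speed, max) => if (speed.value > max.value) speed else max }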
Having aggregated CarData, we need one more step to collect the input needed for producing driver notifications, which is a join with location data.
val carAndLocationData: KTable[CarId, CarAndLocationData] = carData
  .filter({ case (_, carData) => carData.location.isDefined })
  .join[CarAndLocationData, LocationId, LocationData](
    locationData,
    keyExtractor = (carData: CarData) => carData.location.get.locationId,
    joiner = (carData: CarData, locationData: LocationData) => CarAndLocationData(carData, locationData),
    materialized = implicitly[Materialized[CarId, CarAndLocationData, KeyValueStore[Bytes, Array[Byte]]]]
  )
We filter out the aggregates that do not contain location data in order to safely join based on LocationId. We call join, specifying the KTable to join with (in our case, locationData), a keyExtractor which takes the location from our car aggregate, a joiner creating CarAndLocationData, as well as a Materialized instance coming from ImplicitConversions.
implicit def materializedFromSerde[K, V, S <: StateStore](implicit keySerde: Serde[K], valueSerde: Serde[V]): Materialized[K, V, S]
Data transformations
Now we are ready to turn car and location data into notifications using the stateless flatMapValues transformation and store the result in our sink.
carAndLocationData.toStream.flatMapValues(DriverNotifications(_)).to("driver-notification")
We do our checks by using PartialFunctions since they are only defined for some specific contents of the aggregate.
object DriverNotifications {

  def apply(data: CarAndLocationData): List[DriverNotification] =
    List(checkSpeed, checkTrafficVolume, checkEngineRPM, checkFuelLevel).flatten(_.lift(data))

  private val checkSpeed: PartialFunction[CarAndLocationData, DriverNotification] = {
    case CarAndLocationData(CarData(Some(speed), _, _), LocationData(speedLimit, _, _)) if speed.value > speedLimit =>
      DriverNotification(s"Slow down, speed limit $speedLimit")
  }

  private val checkTrafficVolume: PartialFunction[CarAndLocationData, DriverNotification] = {
    case CarAndLocationData(CarData(_, _, Some(location)), LocationData(_, TrafficVolume.High, _)) =>
      DriverNotification(s"High traffic ahead on ${location.locationId.street} street")
  }

  // ...
}
Last but not least, we need to build our topology and start the KafkaStreams application.
val topology = builder.build()
val streams = new KafkaStreams(topology, props)
streams.start()
Runtime.getRuntime.addShutdownHook(new Thread(() => streams.close()))
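If you are curious what the resulting processor graph looks like, printing the topology description before starting the application is a quick way to inspect it:

// lists the sources, processors, state stores, and sinks of the topology
println(topology.describe())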
Consuming
We will consume from the driver-notification topic in order to see what our streams application has produced. Similarly to the producer, we need some configuration. Nothing new here; the important thing is that we're using KafkaAvroDeserializer for deserialization based on Avro schemas from the Schema Registry service.
val props: Map[String, Object] = Map(
  GROUP_ID_CONFIG -> "car-metrics-consumer",
  BOOTSTRAP_SERVERS_CONFIG -> "kafka:9092",
  KEY_DESERIALIZER_CLASS_CONFIG -> classOf[KafkaAvroDeserializer],
  VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[KafkaAvroDeserializer],
  SCHEMA_REGISTRY_URL_CONFIG -> "http://schema-registry:8081"
)
Next, we need to instantiate a KafkaConsumer and poll for some records. Let's do that in a generic manner, assuming that each of our consumers will poll from a single topic and print out the records. We create a Resource from our consumer and subscribe to the given topic. In a loop, we poll with a 5-second timeout, deserialize the records, and print them to the console.
private def pollForever[K, V](topic: String)(implicit krf: RecordFormat[K], vrf: RecordFormat[V]): IO[Nothing] =
  Resource
    .make(IO {
      val consumer = new KafkaConsumer[IndexedRecord, IndexedRecord](CarDataConsumer.props.asJava)
      consumer.subscribe(Seq(topic).asJava)
      consumer
    })(c => IO(println(s"[$topic] closing consumer...")) *> IO(c.close()))
    .use { consumer =>
      val consume: IO[Unit] = for {
        records <- IO(consumer.poll(Duration.ofSeconds(5)).asScala.toSeq)
        keyValue = records.map { r => (krf.from(r.key()), vrf.from(r.value())) }
        _ <- keyValue.traverse { case (k, v) => IO(println(s"[$topic] $k => $v")) }
      } yield ()
      consume.foreverM
    }
Finally, let's look at some notifications from the console output.
pollForever[CarId, DriverNotification]("driver-notification").as(ExitCode.Success)
[driver-notification] CarId(1) => DriverNotification(Shift up a gear)
[driver-notification] CarId(1) => DriverNotification(Low fuel level, navigate to nearest gas station?)
[driver-notification] CarId(2) => DriverNotification(Slow down, speed limit 50)
[driver-notification] CarId(2) => DriverNotification(High traffic ahead on Sezamowa street)
Summary
We got our hands dirty with some Kafka code, introducing basic concepts like producing and consuming data from topics, as well as implementing a Kafka Streams application to do some real-time data processing. We also learned how to integrate Kafka components with the Schema Registry service and how to handle schema generation, serialization, and deserialization of our case classes with Avro. I encourage you to take a look at the whole code available on GitHub and run the example yourself. You can also dive deeper into Kafka and experiment, using this example as a starting point.
Note that my intention was mainly to get you familiar with some Kafka code. I intentionally didn't dive into more complex topics, configuration, and internals.
If any questions arise, feel free to email me and I will be happy to answer ;) Stay safe!