Contents

Hands-on Kafka Streams in Scala webp image

Kafka is getting more and more popular and I think that as a software developer, it’s good to have some hands-on experience with it. The best opportunity to learn it is probably in a real-life project but as always, playing around in a local environment is a good starting point.

In the following blog post, I will show you how you can combine data from multiple topics using Kafka Streams as well as produce and consume data with Kafka. I will also show you how to set up schemas for your data using Avro. Our example domain will describe car metrics like speed and location and our Kafka Streams application will transform them into some driver notifications. The code is written in Scala, so we will keep our types as safe as possible! Let's start.

If you are not familiar with Kafka and stream processing concepts in general, then, as always, the SoftwareMill blog has something for you. Please refer to this article about Kafka use cases by Maria. If you are looking for some verified resources for learning, check out this listing proposed by Michał.

Use case

The domain is fairly simple, so we can focus on Kafka-related code. It defines a car id along with speed, engine and location metrics, as well as location data and driver notifications that our Kafka Streams application will produce. Probably that kind of data needs to be collected by car sensors and processed in order to provide drivers with real-time notifications and advice. Our goal is to inform the driver when the speed limit is exceeded, there's traffic on the road ahead, the fuel level is low, and when the engine rpm is too high.

package object domain {

  case class CarId(value: Int)

  case class CarSpeed(value: Int)
  case class CarEngine(rpm: Int, fuelLevel: Double)
  case class CarLocation(locationId: LocationId)

  case class DriverNotification(msg: String)

  case class LocationId(city: String, street: String)
  case class LocationData(speedLimit: Int, trafficVolume: TrafficVolume.Value, gasStationNearby: Boolean)

  object TrafficVolume extends Enumeration {
    val Low, Medium, High = Value
  }
}

Let's create our topics.

kafka-topics --create --bootstrap-server kafka:9092 --partitions 2 --topic car-speed
kafka-topics --create --bootstrap-server kafka:9092 --partitions 2 --topic car-engine
kafka-topics --create --bootstrap-server kafka:9092 --partitions 2 --topic car-location
kafka-topics --create --bootstrap-server kafka:9092 --partitions 1 --topic location-data --config "cleanup.policy=compact"
kafka-topics --create --bootstrap-server kafka:9092 --partitions 2 --topic driver-notification

I will explain the cleanup.policy=compact when we get to our streaming application.

Schema

Let's define the schema of our data that is understandable for Kafka. Our docker-compose.yml defines a Confluent Schema Registry service which exposes a REST Api for managing schemas of our data, either in Avro,
Protobuf or Json Schema format. I chose to use Avro combined with the Avro4s library as it lets us easily generate schemas from case classes as well as serialize/deserialize them.

Each key/value pair that we use as a topic data needs to have a Schema and RecordFormat instance. Avro instances of Schema, Encoder, and Decoder are derived from case classes at compile time. Additionally, we are differentiating key and value RecordSchema by tagging them, we will need that later on.

package object avro {
  type KeyRFTag
  type KeyRecordFormat[K] = RecordFormat[K] @@ KeyRFTag

  type ValueRFTag
  type ValueRecordFormat[V] = RecordFormat[V] @@ ValueRFTag

  val carIdSchema: Schema = AvroSchema[CarId]
  val carSpeedSchema: Schema = AvroSchema[CarSpeed]

  implicit val carIdRF: KeyRecordFormat[CarId] = RecordFormat[CarId].taggedWith[KeyRFTag]
  implicit val carSpeedRF: ValueRecordFormat[CarSpeed] = RecordFormat[CarSpeed].taggedWith[ValueRFTag]

  // same for other types ...
}

The defined Schema's has to be registered in the Schema Registry service. It falls down to serializing them into JSON format (done by their toString() method) and sending them to the subjects/{subject}/versions endpoint. One way of doing this, which I’ve chosen, is to write a simple app and run it after the Schema Registry service starts up.

Producing

Since we have no "real" source of car metrics data, we need to generate random data and send it to Kafka ourselves. We configure the Kafka Producer as follows:

  private val props: Map[String, Object] = Map(
    CLIENT_ID_CONFIG -> "car-metrics-producer",
    BOOTSTRAP_SERVERS_CONFIG -> "kafka:9092",
    KEY_SERIALIZER_CLASS_CONFIG -> classOf[KafkaAvroSerializer],
    VALUE_SERIALIZER_CLASS_CONFIG -> classOf[KafkaAvroSerializer],
    SCHEMA_REGISTRY_URL_CONFIG -> "http://schema-registry:8081"
  )

We point it to the Kafka broker, as well as the Schema Registry instance, specifying KafkaAvroSerializer from the Kafka library for key/value serialization. That serializer will look up schemas for the given topic in the Schema Registry service and serialize our data accordingly so that consumers using Avro deserializers can read it.

Let's define a generic method that will let us send key/value pairs as long as we provide implicit RecordFormat instances for them (which we have in our avro object). We are using a single KafkaProducer bound to the IndexedRecord type from the Avro library and the RecordSchema.to() method for case class to IndexedRecord conversion.

  private def send[K, V](
      producer: KafkaProducer[IndexedRecord, IndexedRecord]
  )(topic: String, records: Seq[(K, V)])(implicit krf: RecordFormat[K], vrf: RecordFormat[V]): IO[Unit] =
    records.traverse {
      case (k, v) =>
        val p = Promise[Unit]()
        producer.send(
          new ProducerRecord[IndexedRecord, IndexedRecord](topic, krf.to(k), vrf.to(v)),
          new Callback {
            override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
              Option(exception).map(p.failure).getOrElse(p.success(()))
          }
        )
        IO.fromFuture(IO(p.future)) *> IO(println(s"produced data to [$topic]")) *> IO.sleep(2.seconds)
    }.void

We are converting java.util.concurrent.Future to Scala using the KafkaProducer.send method with a callback where we complete the defined Promise. This allows us to convert to scala.concurrent.Future and next into IO. Having that, we can produce our data in an infinite loop.

IO(RandomData.carSpeed).flatMap(send(producer)("car-speed", _)).foreverM

We do the same for other topics.

Streaming

It's time to aggregate our data and produce driver notifications. It's common in stream processes to distinct three elements - source, flow, and sink, which, in short terms, means how we collect, transform, and store resulting data. In terms of Kafka Streams, these steps are defined as stream processors and are connected in a graph forming a topology.

Serdes

A Serde type comes from the Kafka Streams library and defines how a key or value can be serialized and deserialized, therefore how it can be written or read from a topic. We are using Avro, so let's look how we can implement Serde integrated with the Schema Registry service. Again, we take advantage of the RecordFormat instances defined earlier.

object AvroSerdes {

  private val props = Map("schema.registry.url" -> "http://schema-registry:8081")

  implicit def keySerde[K >: Null](implicit krf: KeyRecordFormat[K]): Serde[K] = {
    val avroKeySerde = new GenericAvroSerde
    avroKeySerde.configure(props.asJava, true)
    avroKeySerde.forCaseClass[K]
  }

  implicit def valueSerde[V >: Null](implicit vrf: ValueRecordFormat[V]): Serde[V] = {
    val avroValueSerde = new GenericAvroSerde
    avroValueSerde.configure(props.asJava, false)
    avroValueSerde.forCaseClass[V]
  }

  implicit class CaseClassSerde(inner: Serde[GenericRecord]) {
    def forCaseClass[T >: Null](implicit rf: RecordFormat[T]): Serde[T] = {
      Serdes.fromFn(
        (topic, data) => inner.serializer().serialize(topic, rf.to(data)),
        (topic, bytes) => Option(rf.from(inner.deserializer().deserialize(topic, bytes)))
      )
    }
  }
}

The reason we need to differentiate the KeyRecordFormat and ValueRecordFormat is the GenericAvroSerde.configure method that needs to know whether it is handling a key or a value.

public void configure(final Map<String, ?> serdeConfig, final boolean isSerdeForRecordKeys)

Data collection

Having a configuration that points to a Kafka broker and StreamsBuilder instance to create our topology, let's define our source.

  val props = new Properties()
  props.put(APPLICATION_ID_CONFIG, "driver-notifier")
  props.put(BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")

  val builder: StreamsBuilder = new StreamsBuilder

Car data topics are by default partitioned by key, CarId in that case. Each car-related topic has two partitions and the same key schema that matches the definition of topics being co-partitioned. Why is that important from the perspective of Kafka Streams application? It makes the Kafka broker assign the same partition number to the same application instance. Therefore, we can be certain that records for CarId(123) will be processed in one place.

  val carSpeed: KGroupedStream[CarId, CarSpeed] = builder.stream[CarId, CarSpeed]("car-speed").groupByKey
  val carEngine: KGroupedStream[CarId, CarEngine] = builder.stream[CarId, CarEngine]("car-engine").groupByKey
  val carLocation: KGroupedStream[CarId, CarLocation] = builder.stream[CarId, CarLocation]("car-location").groupByKey
  val locationData: KTable[LocationId, LocationData] = builder.table[LocationId, LocationData]("location-data")

We use KGroupedStream which represents a stream of messages from a given topic. For location data, we use KTable, which is a representation of a compacted topic. A compacted topic stores only the latest value for a given key. When a new record is ingested, then the corresponding record value is updated or a new one is added in case of a new key. Grouping by key and creating a table required Grouped and Consumed instances that come from ImplicitConversions from the kafka-streams-scala library - a Scala wrapper for Java API. We take advantage of our implicit Serde instances here.

  implicit def consumedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Consumed[K, V] = Consumed.`with`[K, V]

Next, let's define an aggregate for car data and perform aggregaetion of car related topics.

  case class CarData(speed: Option[CarSpeed], engine: Option[CarEngine], location: Option[CarLocation])

  object CarData {
    val empty: CarData = CarData(None, None, None)
  }
  val carData: KTable[CarId, CarData] = carSpeed
    .cogroup[CarData]({ case (_, speed, agg) => agg.copy(speed = speed.some) })
    .cogroup[CarEngine](carEngine, { case (_, engine, agg) => agg.copy(engine = engine.some) })
    .cogroup[CarLocation](carLocation, { case (_, location, agg) => agg.copy(location = location.some) })
    .aggregate(CarData.empty)

Aggregation is a key- based stateful operation, meaning it needs some kind of persistent storage. Kafka Streams terminology refers to that as state store. By default it uses RocksDB.

The code above with the usage of cogroup method will create one state store instance. Actually, before Kafka Streams v2.5.0, we would have to perform separate joins resulting in creation of three state stores in total. The approach here requires us to define an aggregate that gets updated every time a new input comes to a topic. Generally, if we have a set of streams that all together logically form a larger object and are sharing the same key then cogroup is the way to go. In a real-life scenario, we will most likely also use windowing for our aggregation, but that seemed like a topic for another article for me, so I skipped it for now. Here you can read more about the motivations behind adding the cogroup feature.

Having aggregated CarData, we need one more step to collect the input needed for producing driver notifications which is a join with location data.

  val carAndLocationData: KTable[CarId, CarAndLocationData] = carData
    .filter({ case (_, carData) => carData.location.isDefined })
    .join[CarAndLocationData, LocationId, LocationData](
      locationData,
      keyExtractor = (carData: CarData) => carData.location.get.locationId,
      joiner = (carData: CarData, locationData: LocationData) => CarAndLocationData(carData, locationData),
      materialized = implicitly[Materialized[CarId, CarAndLocationData, KeyValueStore[Bytes, Array[Byte]]]]
    )
  )

We filter out the aggregates that do not contain the location data in it in order to safely join based on LocationId. We call join specifying the KTable to join with, in our case, locationData, keyExtractor which takes location from our car aggregate, joiner creating CarAndLocationData as well as Materialized coming from ImplicitConversions.

  implicit def materializedFromSerde[K, V, S <: StateStore](implicit keySerde: Serde[K], valueSerde: Serde[V]): Materialized[K, V, S]

Data transformations

Now we are ready to turn car and location data into notifications using stateless flatMapValues transformation and store that in our sink.

carAndLocationData.toStream.flatMapValues(DriverNotifications(_)).to("driver-notification")

We do our checks by using PartialFunctions since they are only defined for some specific contents of the aggregate.

object DriverNotifications {

  def apply(data: CarAndLocationData): List[DriverNotification] =
    List(checkSpeed, checkTrafficVolume, checkEngineRPM, checkFuelLevel).flatten(_.lift(data))

  private val checkSpeed: PartialFunction[CarAndLocationData, DriverNotification] = {
    case CarAndLocationData(CarData(Some(speed), _, _), LocationData(speedLimit, _, _)) if speed.value > speedLimit =>
      DriverNotification(s"Slow down, speed limit $speedLimit")
  }

  private val checkTrafficVolume: PartialFunction[CarAndLocationData, DriverNotification] = {
    case CarAndLocationData(CarData(_, _, Some(location)), LocationData(_, TrafficVolume.High, _)) =>
      DriverNotification(s"High traffic ahead on ${location.locationId.street} street")
  }

  ...
}

Last but not least, we need to build our topology and start the KafkaStreams application.

  val topology = builder.build()
  val streams = new KafkaStreams(topology, props)

  streams.start()

  Runtime.getRuntime.addShutdownHook(new Thread(() => streams.close()))

Consuming

We will consume from the driver-notification topic in order to see what our streams application has produced. Similarly to the producer code, we need some configuration, nothing new here, the important thing is that we're using KafkaAvroDeserializer for deserialization based on Avro schemas from the Schema Registry service.

  val props: Map[String, Object] = Map(
    GROUP_ID_CONFIG -> "car-metrics-consumer",
    BOOTSTRAP_SERVERS_CONFIG -> "kafka:9092",
    KEY_DESERIALIZER_CLASS_CONFIG -> classOf[KafkaAvroDeserializer],
    VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[KafkaAvroDeserializer],
    SCHEMA_REGISTRY_URL_CONFIG -> "http://schema-registry:8081"
  )

Next we need to instantiate KafkaConsumer and poll for some records. Let's do that in a generic manner assuming that each of our consumers will poll from a single topic and print out the records. We create a Resource from our consumer and subscribe to the given topic. In a loop, we poll for 5 seconds, deserialize records, and print them to the console.

  private def pollForever[K, V](topic: String)(implicit krf: RecordFormat[K], vrf: RecordFormat[V]): IO[Nothing] =
    Resource
      .make(IO {
        val consumer = new KafkaConsumer[IndexedRecord, IndexedRecord](CarDataConsumer.props.asJava)
        consumer.subscribe(Seq(topic).asJava)
        consumer
      })(c => IO(println(s"[$topic] closing consumer...")) *> IO(c.close()))
      .use { consumer =>
        val consume: IO[Unit] = for {
          records <- IO(consumer.poll(Duration.ofSeconds(5)).asScala.toSeq)
          keyValue = records.map { r => (krf.from(r.key()), vrf.from(r.value())) }
          _ <- keyValue.traverse { case (k, v) => IO(println(s"[$topic] $k => $v")) }
        } yield ()
        consume.foreverM
      }

Finally, let's look at some notifications from the console output.

pollForever[CarId, DriverNotification]("driver-notification").as(ExitCode.Success)
[driver-notification] CarId(1) => DriverNotification(Shift up a gear)
[driver-notification] CarId(1) => DriverNotification(Low fuel level, navigate to nearest gas station?)
[driver-notification] CarId(2) => DriverNotification(Slow down, speed limit 50)
[driver-notification] CarId(2) => DriverNotification(High traffic ahead on Sezamowa street)

Summary

We got our hands dirty with some Kafka code, introducing basic concepts like producing and consuming data from topics as well as implementing Kafka Streams application in order to do some real time data processing. We also learned how to integrate Kafka components with Schema Registry service as well as handle schema generation, serialization and deserialization of our case classes with Avro. I encourage you to take a look at the whole code available on github and run the example yourself. You can also dive deeper into Kafka and try to experiment with that example as a starting point.

Note that my intention was to mainly get you familiar with some Kafka code. Intentionally, I didn't dive into more complex stuff, configuration, and internals.

If any questions arise, feel free to email me and I will be happy to answer ;) Stay safe!

Looking for quality content around stream processing, big data, or machine learning? Subscribe to Data Times for a montlhy dose of fresh tech content curated by SoftwareMill's engineers!

Blog Comments powered by Disqus.