
Getting started with ZIO, part 1


Welcome! This post opens a series focused on ZIO, a powerful Scala library for building asynchronous and concurrent applications with ease. Throughout the series, I will guide you through using the various building blocks provided by ZIO to build a fully functional application, so that you come away with a good understanding of ZIO's capabilities. We will use ZIO 2 throughout the series.

Using this application as an example, I will show you how to write ZIO services, create efficient streaming workflows, run an HTTP server with web sockets, solve concurrency issues using Software Transactional Memory (STM) and appropriate data structures, configure a ZIO app, and, of course, test it.

What are we building?

Context

You have probably heard of Kafka, right? If you haven't, I encourage you to read a little about it before you dive into this series. In short, it's a streaming platform that lets you publish (write), store, and subscribe to (read) events.

In application architectures involving Kafka, it is common for Kafka to be an internal component, residing within a VPC and not exposed to the outside world. This is, of course, a desirable setup: it prevents unauthorized access to the Kafka brokers and safeguards our data against malicious or simply mistaken operations. However, it is easy to imagine a scenario where we want external clients to read certain data.

Let's imagine we are building a platform where clients (think of external systems) upload data. Within the platform, we perform various asynchronous computations on this data, generating notifications that are stored on Kafka topics internal to the platform. Assume we need to share those notifications with our clients, since they need that information to start or continue their own workflows. We therefore need to forward the notifications stored on topics locked within our infrastructure, and we want to do it in real time: any new notification should be delivered to the subscribed clients immediately.

We need a middleware component to do that, and that's what we will build. Our goal is to stream events from internal topics and forward them to the external clients who have subscribed. Bear in mind that the app includes many simplifications, as it is intended for educational purposes; apart from those, the code is production grade.

Overview

Before we delve into the system overview, let's clarify our requirements. We want to allow clients to "tail" certain Kafka topics. In other words, if a client unsubscribes and subscribes again 10 minutes later, they will not receive the messages published during their absence. This makes the implementation easier and keeps the code fairly simple and concise. We will call our service k-tail. Now, let's visualize what we are aiming for in a simple diagram.

Figure: k-tail overview

Our application has two core processes: the first consumes messages from Kafka, and the second forwards those messages to subscribed clients. Let's look at the components involved, starting from the right side of the diagram.

First, we need a consumer instance that continuously reads new messages from a set of preconfigured topics. These messages are then temporarily stored in a buffer. The buffer decouples message consumption from message forwarding, so each side can proceed at its own pace and handle errors independently. Finally, we need a component dedicated to forwarding messages to clients. Let's call this component the broadcast.

Together, these components will form a web socket server, allowing clients to connect and receive new data in real time.

Plan

The Getting started with ZIO series consists of three parts, in which we explore the concepts of ZIO while implementing our application.

Part one (this post) will focus on implementing the buffer and consumer components, where the main goal is to get familiar with ZIO types and understand how to create services and manage dependencies.

In part two, we are going to implement the broadcast and server components. We will solve some concurrency challenges using ZIO building blocks and see how to run a web socket server.

Part three will focus on testing. We will use the ZIO testing library along with Testcontainers.

Effects

If you have experience writing Scala code, you have likely come across the term effects. Effects are commonly presented as a solution for managing parallel computations, concurrency, or I/O in our applications. An effect type is typically parameterized by its output type, as in Future[String] or IO[Int], and an effect may fail or depend on resources or services to execute.

In many cases, understanding these characteristics requires examining the implementation of the methods or values that have such types. With properly written ZIO code, however, the type signature alone often tells us a lot about a given effect.

In ZIO, an effect is defined as ZIO[-R, +E, +A]. Its three type parameters may be confusing at first, but after a second or two it becomes quite simple, as the type reads as ZIO[Requirements, Error, Output]. It describes a computation that requires things of type R and either fails with an error of type E or succeeds with an output of type A.

If an effect has no requirements, its type is ZIO[Any, _, _], meaning it can run under "any" circumstances. Similarly, an effect that is guaranteed not to fail has the type ZIO[_, Nothing, _].
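To make the three type parameters concrete, here is a small sketch with a few standalone values annotated with their full types. The Greeter service is hypothetical and exists only to show how a requirement appears in R.

```scala
import zio._

object EffectTypesExample {
  // Hypothetical service, used only to illustrate the R type parameter.
  trait Greeter {
    def greet(name: String): UIO[String]
  }

  // Requires nothing, cannot fail, succeeds with an Int.
  val answer: ZIO[Any, Nothing, Int] = ZIO.succeed(42)

  // Requires nothing, fails with a String, never succeeds (A = Nothing).
  val failed: ZIO[Any, String, Nothing] = ZIO.fail("boom")

  // Requires a Greeter in the environment, cannot fail, succeeds with a String.
  val greeting: ZIO[Greeter, Nothing, String] =
    ZIO.serviceWithZIO[Greeter](_.greet("ZIO"))
}
```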

Effects in ZIO are evaluated lazily, which means that creating a value of type ZIO does not immediately execute the underlying code. To run the effects and execute the program, we need to bootstrap our components and configure the runtime environment. ZIO offers strong compile-time guarantees that help us properly structure and organize dependencies. We will see examples of such features later.
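A quick way to see laziness in action is a counter held in a Ref. In this sketch, binding increment only creates a description; the counter changes only when that description is executed, and the same description can be executed more than once.

```scala
import zio._

object LazinessExample {
  val program: UIO[(Int, Int)] =
    for {
      counter  <- Ref.make(0)
      increment = counter.update(_ + 1) // only a description: nothing runs here
      before   <- counter.get            // still 0
      _        <- increment *> increment // the same description, executed twice
      after    <- counter.get            // now 2
    } yield (before, after)
}
```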

The library defines many type aliases for the ZIO type. For example, an effect with no requirements that cannot fail is aliased as UIO[+A], which takes only the type parameter for the successful output. We will use type aliases in our application code where possible, so you can get used to them.
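A few of the most common aliases, each with its full ZIO type in a comment, are shown in the sketch below. AppConfig is a hypothetical service used only to illustrate the aliases that carry a requirement. Because aliases are not new types, the compiler can prove that each alias equals its expanded form.

```scala
import zio._

object AliasesExample {
  // Hypothetical service, used only to show aliases with a requirement.
  final case class AppConfig(port: Int)

  val u: UIO[Int]         = ZIO.succeed(1)         // ZIO[Any, Nothing, Int]
  val t: Task[Int]        = ZIO.attempt("2".toInt) // ZIO[Any, Throwable, Int]
  val io: IO[String, Int] = ZIO.fail("no port")    // ZIO[Any, String, Int]

  val port: URIO[AppConfig, Int] =                 // ZIO[AppConfig, Nothing, Int]
    ZIO.serviceWith[AppConfig](_.port)

  val next: RIO[AppConfig, Int] =                  // ZIO[AppConfig, Throwable, Int]
    ZIO.serviceWithZIO[AppConfig](c => ZIO.attempt(c.port + 1))

  // Compile-time evidence that aliases are the same types as the full ZIO types.
  val uioIsZio = implicitly[UIO[Int] =:= ZIO[Any, Nothing, Int]]
  val rioIsZio = implicitly[RIO[AppConfig, Int] =:= ZIO[AppConfig, Throwable, Int]]
}
```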

Services

Now that we have introduced the ZIO type, let's write our first service, a Buffer. When writing services, there are a few essential steps and good practices to follow; I will describe them below. I will also include the full types as comments in the code fragments throughout the posts, for clarity.

Buffer definition

The first step is to create a service definition. A good practice, or even a "law", is to always use traits for this purpose and never express the service's own dependencies as environment requirements in the definition itself. What does that mean? If our Buffer implementation requires some other service, that requirement should not appear in the service interface. Let's go ahead and create a service definition for our Buffer.

trait Buffer {
  def offer(message: Message): UIO[Unit] // ZIO[Any, Nothing, Unit] 
  def poll(): UIO[Option[Message]] // ZIO[Any, Nothing, Option[Message]]
}

Neither offer nor poll requires anything, and both are guaranteed not to fail, so we don't use the R (no dependencies in the definition) and E type parameters, and we use type aliases to simplify the notation.

It's a common practice to create a companion object for the service with accessor methods. We use variations of ZIO.service[T] to access an instance of a specific service from the environment. Without accessors, buffer clients would need to summon the service with ZIO.service[Buffer] and then invoke, for example, offer on that instance. Accessor methods simplify this as follows.

object Buffer {
  def offer(message: Message): URIO[Buffer, Unit] =
    ZIO.serviceWithZIO[Buffer](_.offer(message))

  def poll(): URIO[Buffer, Option[Message]] =
    ZIO.serviceWithZIO[Buffer](_.poll())
}

Now clients can use the simpler syntax Buffer.offer(...), and the resulting effect has Buffer as its environment requirement.

Buffer implementation

The next step is implementation. Let's look at the Buffer implementation and discuss the second good practice: always pass service dependencies through the class constructor.

final case class BufferImpl(queue: Queue[Message]) extends Buffer {
  override def offer(message: Message): UIO[Unit] =
    queue.offer(message).unit *> 
       ZIO.logInfo(s"buffer message ${message.id}")

  override def poll(): UIO[Option[Message]] = queue.poll
}

This buffer implementation has a single dependency, expressed in the class constructor. There is not much to explain here; it follows the familiar, classic OOP style.

Our single dependency is Queue from the ZIO library. It's an asynchronous, in-memory data structure with non-blocking operations for offering and polling elements. We use an unbounded Queue, a choice that becomes visible where the Queue is instantiated. The Queue acts as a buffer for processing messages in a controlled manner, and it preserves the order of messages (FIFO), which is typically important when working with Kafka.
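The FIFO guarantee and the behavior of the queue variants can be checked in isolation. The sketch below uses Queue[Int] instead of Queue[Message] to stay self-contained; the bounded queue is included only for contrast and is not what the Buffer uses.

```scala
import zio._

object QueueExample {
  // Elements come out in the order they went in (FIFO); poll returns None
  // instead of waiting when the queue is empty.
  val fifo: UIO[List[Option[Int]]] =
    for {
      queue  <- Queue.unbounded[Int]
      _      <- queue.offer(1) *> queue.offer(2) *> queue.offer(3)
      polled <- ZIO.foreach((1 to 4).toList)(_ => queue.poll)
    } yield polled

  // For contrast: a bounded queue (back-pressure strategy) suspends the
  // offering fiber while it is full, so the second offer times out here.
  val secondOfferTimedOut: UIO[Boolean] =
    for {
      queue  <- Queue.bounded[Int](1)
      _      <- queue.offer(1)
      second <- queue.offer(2).timeout(100.millis)
    } yield second.isEmpty
}
```

With an unbounded queue, offer never suspends, which is why the Buffer's offer can be typed as an effect that simply completes; the trade-off is that memory use grows if forwarding falls behind consumption.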

After putting the message on the queue, we sequence another effect, using the convenient ZIO.logInfo for logging. To combine these two effects, we use the *> operator (an alias for zipRight), a variant of flatMap that discards the result of the effect on its left and returns the result of the effect on its right.
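A tiny comparison of three equivalent ways to sequence two effects, plus the mirror operator <*, which keeps the left result instead:

```scala
import zio._

object ZipRightExample {
  val first: UIO[Int]     = ZIO.succeed(1)
  val second: UIO[String] = ZIO.succeed("two")

  // All three run `first`, discard its Int, then run `second` and return "two".
  val viaOperator: UIO[String] = first *> second
  val viaZipRight: UIO[String] = first.zipRight(second)
  val viaFlatMap: UIO[String]  = first.flatMap(_ => second)

  // <* runs both effects in the same order but keeps the left result: 1.
  val keepLeft: UIO[Int] = first <* second
}
```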

Buffer constructor

With the service definition and implementation in place, let's move on to the constructor, the last step in creating a ZIO service. But wait, BufferImpl is just a case class, so it already has a class constructor. That's true, but since we want to leverage the features of the ZIO environment, we are going to create so-called layers for our services. Here's how we can do it.

object BufferImpl {
  // ZLayer[Any, Nothing, Buffer]
  val live: ULayer[Buffer] = 
     ZLayer(Queue.unbounded[Message].map(BufferImpl(_)))
}

The layer is usually defined in the companion object of the implementation class, lifting the construction of the service into a ZLayer. Again, a type alias is used, this time for a layer that has no requirements (BufferImpl doesn't use any other service) and is guaranteed not to fail on creation. The ZLayer.apply() method takes a ZIO effect as its argument, which makes it an effectful constructor. This allows us to handle scenarios where the service requires resources, such as a database connection, that may take time to acquire.

By using layers, we can manage such initialization processes effectively, which cannot be easily done with class constructors only. Even the Queue.unbounded constructor is an effect, as it creates a queue within ZIO runtime and returns UIO[Queue[Message]]. By mapping over it, we get an instance of BufferImpl.
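Putting the Buffer pieces together, here is a self-contained sketch that builds the layer and exercises the service through the accessors. Message is a hypothetical one-field stand-in for the series' real Message type; the rest mirrors the snippets above.

```scala
import zio._

object BufferWiringExample {
  // Hypothetical Message: the real type used in the series has more fields.
  final case class Message(id: String)

  trait Buffer {
    def offer(message: Message): UIO[Unit]
    def poll(): UIO[Option[Message]]
  }

  object Buffer {
    // Accessors, as in the companion object shown earlier.
    def offer(message: Message): URIO[Buffer, Unit] =
      ZIO.serviceWithZIO[Buffer](_.offer(message))
    def poll(): URIO[Buffer, Option[Message]] =
      ZIO.serviceWithZIO[Buffer](_.poll())
  }

  final case class BufferImpl(queue: Queue[Message]) extends Buffer {
    override def offer(message: Message): UIO[Unit] =
      queue.offer(message).unit *> ZIO.logInfo(s"buffer message ${message.id}")
    override def poll(): UIO[Option[Message]] = queue.poll
  }

  object BufferImpl {
    // The effect that creates the queue runs only when the layer is built.
    val live: ULayer[Buffer] =
      ZLayer(Queue.unbounded[Message].map(BufferImpl(_)))
  }

  // R = Buffer: the requirement stays visible until a layer is provided.
  val roundTrip: URIO[Buffer, List[Option[Message]]] =
    for {
      _     <- Buffer.offer(Message("a"))
      _     <- Buffer.offer(Message("b"))
      polls <- ZIO.foreach(List(1, 2, 3))(_ => Buffer.poll())
    } yield polls

  // Providing the layer eliminates the requirement: R becomes Any.
  val program: UIO[List[Option[Message]]] = roundTrip.provide(BufferImpl.live)
}
```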

Consumer

Similarly, step one is to create a service definition for our Kafka consumer component.

trait KTailConsumer {
  // ZStream[Any, Throwable, Message]
  val consume: Stream[Throwable, Message]
}
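The Main program later in this post calls KTailConsumer.consume, an accessor that the snippets here do not show. Below is a minimal sketch of what such an accessor could look like, following the same pattern as the Buffer companion object. The Message fields are an assumption inferred from the values KTailConsumerImpl passes to its constructor.

```scala
import zio._
import zio.stream._

object KTailConsumerAccessorSketch {
  // Assumed shape of Message, mirroring the arguments used in KTailConsumerImpl.
  final case class Message(
      topic: String,
      partition: Int,
      offset: Long,
      key: Array[Byte],
      value: Array[Byte]
  )

  trait KTailConsumer {
    val consume: Stream[Throwable, Message]
  }

  object KTailConsumer {
    // Accessor: summons the service from the environment and returns its
    // stream description. Nothing is consumed until the stream is run.
    val consume: URIO[KTailConsumer, Stream[Throwable, Message]] =
      ZIO.serviceWith[KTailConsumer](_.consume)
  }
}
```

Because serviceWith only reads a field, the accessor itself cannot fail; any Throwable surfaces only when the returned stream is run.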

Here, we introduce ZStream, another core concept from the library.

Think of it as a pipeline that reads data from some source, transforms it, and produces some output. Streams can be combined, filtered, mapped, or reduced to perform various tasks. They are very declarative, so once you understand that your operation has a "streaming" nature, it becomes easy to model with ZStream semantics.

Consuming messages from Kafka is a very good example of such a use case since it describes a process that takes input elements, processes them, produces some results, and repeats forever.

Like ZIO and ZLayer, ZStream has three type parameters with analogous meanings: requirements, error, and output. In our consumer definition, we specify that the service must provide a stream that can fail with a Throwable and produces values of type Message. The consume value is merely a description of a streaming pipeline; creating it doesn't start any processing.
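A minimal, Kafka-free sketch of these ideas: building a stream only produces a description, and one of the run* methods turns it into a ZIO effect. The numbers are arbitrary illustration values.

```scala
import zio._
import zio.stream._

object StreamBasicsExample {
  // A description of a pipeline: nothing runs until a run* method is called.
  val pipeline: ZStream[Any, Nothing, Int] =
    ZStream
      .fromIterable(1 to 10)
      .filter(_ % 2 == 0) // keep even numbers
      .map(_ * 10)        // transform each element

  // Two different ways of running the same description.
  val collected: UIO[Chunk[Int]] = pipeline.runCollect
  val summed: UIO[Int]           = pipeline.runFold(0)(_ + _)

  // Infinite sources are fine as long as we only take what we need.
  val firstThree: UIO[Chunk[Int]] = ZStream.iterate(1)(_ + 1).take(3).runCollect
}
```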

In the implementation, we use zio-kafka, a library from the ZIO ecosystem that simplifies integration between ZIO and Kafka by providing ready-to-use stream definitions and common operators. It's worth mentioning that ZIO offers a large set of libraries for common application concerns, such as database access, metrics, and JSON support; the full list of official ecosystem extensions and utilities is available in the ZIO documentation.

final case class KTailConsumerImpl(topics: Set[String], consumer: Consumer)
    extends KTailConsumer {
  override val consume: Stream[Throwable, Message] =
    consumer
      .plainStream[Any, Array[Byte], Array[Byte]](
        Topics(topics),
        Serde.byteArray,
        Serde.byteArray
      )
      .map(record =>
        Message(
          record.offset.topicPartition.topic(),
          record.partition,
          record.offset.offset,
          record.key,
          record.value
        )
      )
}

Unlike the buffer, the consumer's layer depends on another service: KTailConfig, the configuration service of our application, which will show up in the layer's type. We are going to create it in the next parts of the series, where we will also see how to easily map application configuration files with ZIO.

The other dependency, passed through the constructor, is a Consumer instance from the zio-kafka library, which offers a method called plainStream. This method returns a ZStream describing an infinite stream of Kafka records from the subscribed topics. Record keys and values are deserialized using the specified Serde instances; here we keep them as raw byte arrays.

To process the messages, we apply a transformation with map, creating domain Message objects from Kafka records. The final step is to define a layer for our consumer.

object KTailConsumerImpl {
  // ZLayer[KTailConfig, Throwable, KTailConsumer]
  val live: RLayer[KTailConfig, KTailConsumer] =
    ZLayer.scoped {
      for {
        config <- ZIO.service[KTailConfig]
        topics = config.topics.toSet
        consumer <- Consumer.make(
          ConsumerSettings(config.bootstrapServers)
            .withGroupId(config.groupId)
        )
      } yield KTailConsumerImpl(topics, consumer)
    }
}

When we want to access a specific service from the ZIO environment, we use one of the ZIO.service[T] methods. Once we do, that service becomes a requirement in the R type parameter of our effects and layers. Here we access the config service and use its values to create the actual consumer instance.
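KTailConfig is only defined in a later part of the series. If you want to experiment before then, a hypothetical stand-in like the one below is enough to satisfy the layer above. The field names and types (bootstrapServers as List[String], groupId, topics) are assumptions inferred from how KTailConsumerImpl.live uses them, and the hard-coded values are placeholders.

```scala
import zio._

object KTailConfigSketch {
  // Hypothetical stand-in for the series' KTailConfig; the real one is
  // loaded from configuration files in a later part.
  final case class KTailConfig(
      bootstrapServers: List[String],
      groupId: String,
      topics: List[String]
  )

  object KTailConfig {
    // Placeholder values for local experiments only.
    val live: ULayer[KTailConfig] = ZLayer.succeed(
      KTailConfig(
        bootstrapServers = List("localhost:9092"),
        groupId = "k-tail",
        topics = List("notifications")
      )
    )
  }
}
```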

Scope

Did you notice the different constructor, ZLayer.scoped, in the code snippet above? When you see a Scope type in an effect's requirements, it means the effect works with a resource that must be properly cleaned up afterward. In other words, Scope represents the lifecycle of a resource.

In our case, the KTailConsumerImpl layer creates a Consumer instance with Consumer.make, which requires a Scope; the scope is responsible for shutting the consumer down. Delving into the zio-kafka code reveals a place with the following code (simplified).

ZIO.acquireRelease {
    ZIO.attempt(new KafkaConsumer[Array[Byte], Array[Byte]](...))
} { consumer => ZIO.succeed(consumer.close()) }

The ZIO.acquireRelease method creates a scoped resource and requires us to provide a release function, where we perform the cleanup. The resulting effect then carries Scope as a requirement.

In the layer we created for the k-tail consumer, the for-comprehension has the type ZIO[Scope & KTailConfig, Throwable, KTailConsumerImpl]. However, Scope is not listed in the layer's requirements. That is because we use ZLayer.scoped.

ZLayer.scoped removes the Scope requirement and binds the lifetime of the acquired resource to the lifetime of the layer itself. As a result, the resource is released as soon as the workflow the layer is provided to completes its execution. In our example, this happens when the application shuts down.
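To see acquireRelease and scoping outside of Kafka, here is a sketch with a hypothetical Connection resource whose acquisition and release are recorded in a Ref. ZIO.scoped plays the same role for an effect that ZLayer.scoped plays for a layer: it provides the Scope and closes it when the wrapped effect completes.

```scala
import zio._

object ScopeExample {
  // Hypothetical resource standing in for something like a Kafka consumer.
  final case class Connection(name: String)

  // acquireRelease adds Scope to the requirements of the returned effect.
  def connection(name: String, log: Ref[List[String]]): ZIO[Scope, Nothing, Connection] =
    ZIO.acquireRelease(
      log.update(_ :+ s"open $name").as(Connection(name))
    )(conn => log.update(_ :+ s"close ${conn.name}"))

  val program: UIO[List[String]] =
    for {
      log <- Ref.make(List.empty[String])
      // ZIO.scoped eliminates the Scope requirement and closes the scope
      // as soon as the inner effect completes.
      _ <- ZIO.scoped {
             for {
               a <- connection("a", log)
               b <- connection("b", log)
               _ <- log.update(_ :+ s"use ${a.name} and ${b.name}")
             } yield ()
           }
      entries <- log.get
    } yield entries
}
```

Finalizers run in reverse order of acquisition, so the log ends with "close b" followed by "close a".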

Other ZIO types also come with scoped constructors. Take ZStream.scoped, for example, and imagine we are creating a pipeline that reads entries from a database and performs some computations. We would need a scoped database connection pool, but once we create a stream with it, the lifecycle of the pool becomes bound to the stream itself. That arrangement makes sense, because the connection pool should stay active as long as we are processing records from the database.
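A sketch of that connection-pool scenario, with the pool replaced by a hypothetical resource that only logs its lifecycle: ZStream.scoped turns the scoped effect into a single-element stream, and the release runs once the whole stream has finished.

```scala
import zio._
import zio.stream._

object ScopedStreamExample {
  val program: UIO[List[String]] =
    for {
      log <- Ref.make(List.empty[String])
      // Hypothetical "pool": acquisition and release only write to the log.
      pool = ZIO.acquireRelease(log.update(_ :+ "pool opened").as("pool"))(_ =>
               log.update(_ :+ "pool closed")
             )
      _ <- ZStream
             .scoped(pool)
             .flatMap(p =>
               ZStream
                 .fromIterable(1 to 3)
                 .mapZIO(i => log.update(_ :+ s"read row $i via $p"))
             )
             .runDrain
      entries <- log.get
    } yield entries
}
```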

This mechanism is clear and simple from the user's perspective. All the heavy lifting is done by the ZIO runtime, which guarantees that the resource will be gracefully closed whether our program succeeds, fails, or is interrupted.
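The release guarantee also holds when the workflow fails. In the sketch below (illustrative names only), the scoped effect fails right after acquiring a resource, and the release step still runs before the failure is observed.

```scala
import zio._

object ReleaseOnFailureExample {
  val program: UIO[(Either[String, Unit], List[String])] =
    for {
      log <- Ref.make(List.empty[String])
      resource = ZIO.acquireRelease(log.update(_ :+ "acquired"))(_ =>
                   log.update(_ :+ "released")
                 )
      // The scoped effect fails, but the scope is still closed and the
      // finalizer still runs; .either turns the failure into a value.
      result  <- ZIO.scoped(resource *> ZIO.fail("boom")).either
      entries <- log.get
    } yield (result, entries)
}
```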

Program

Let's assemble our first ZIO program, which consumes and buffers messages.

object Main extends ZIOAppDefault {

  private val program: RIO[Buffer & KTailConsumer, Unit] = for {
    consume <- KTailConsumer.consume
    _       <- consume.mapZIO(Buffer.offer).run(ZSink.drain)
  } yield ()

  override def run = program
}

Our application's entry point extends ZIOAppDefault, which provides a default runtime configuration. We need to override the run method and provide our program logic. In the first line of the program, we obtain the consumer stream from the KTailConsumer service. Then, using mapZIO, we put each message from the stream into the buffer. To turn the stream description into a runnable effect, we use one of the run variants of the ZStream type.
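The same mapZIO-then-run shape can be tried without Kafka. This sketch uses a plain ZIO Queue in place of the Buffer service, runs a finite stream into it with ZSink.drain as the sink, and then reads everything back.

```scala
import zio._
import zio.stream._

object DrainExample {
  val program: UIO[Chunk[Int]] =
    for {
      queue <- Queue.unbounded[Int]
      // mapZIO runs one effect per element, in order; ZSink.drain discards
      // the outputs (the Booleans returned by offer). runDrain is equivalent.
      _ <- ZStream
             .fromIterable(1 to 5)
             .mapZIO(i => queue.offer(i))
             .run(ZSink.drain)
      buffered <- queue.takeAll
    } yield buffered
}
```

Because this stream is finite, run completes once all five elements are offered; the Kafka stream in the program above is infinite, so its run keeps going until the application is interrupted.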

We also need to specify a sink, which is the final destination for the stream's outputs. Since we don't need to operate on the results here, we can use ZSink.drain, which ignores the outputs and drains the stream until it completes (for an infinite Kafka stream, that means it keeps running). However, the code won't compile just yet: note the Buffer & KTailConsumer requirement in our program's effect type. If we compile the code now, we will see an error like this:

Your effect requires services that are not in the environment.
Please provide layers for the following 2 types:

    1. ktail.Buffer
    2. ktail.KTailConsumer

If all compile-time errors were so meaningful, our lives would be much easier, don’t you think?

Let's do what the compiler says and provide the dependencies to our program.

override def run = program
  .provide(
    KTailConfig.live,
    BufferImpl.live,
    KTailConsumerImpl.live
  )

As you would expect, we use the provide method of the ZIO type to supply all the required services as layers. ZIO wires the layers together at compile time and reports any missing ones. Our application is ready to start.
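provide resolves the whole dependency graph, not just a flat list. In this sketch, the hypothetical Greeting service depends on a hypothetical Config service, much like KTailConsumer depends on KTailConfig, and the layers can be passed in any order.

```scala
import zio._

object ProvideExample {
  // Hypothetical services: Greeting depends on Config.
  final case class Config(name: String)

  trait Greeting {
    def hello: UIO[String]
  }

  final case class GreetingLive(config: Config) extends Greeting {
    def hello: UIO[String] = ZIO.succeed(s"Hello, ${config.name}!")
  }

  object GreetingLive {
    // The Config requirement shows up in the layer's input type.
    val live: URLayer[Config, Greeting] =
      ZLayer(ZIO.service[Config].map(GreetingLive(_)))
  }

  val program: URIO[Greeting, String] = ZIO.serviceWithZIO[Greeting](_.hello)

  // The dependent layer is listed first; provide still wires Config into it.
  val runnable: UIO[String] =
    program.provide(GreetingLive.live, ZLayer.succeed(Config("k-tail")))
}
```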

Summary

In this first post of the series, we covered some fundamental concepts of the ZIO library: the meaning of the type parameters in ZIO's types, how to write services, and how to compose programs and provide their dependencies.

The code for this series is available on GitHub and will be updated as subsequent parts are published; you will also find instructions there on how to run it. I encourage you to clone and run the code and follow along with the upcoming posts.

Our blog also offers more valuable resources on ZIO. If you're interested in learning more about layers, I recommend Adam's in-depth explanation in his post on the topic.

If you have any questions, feel free to ask. Thank you!

Check more from the series:
Getting started with ZIO, part 2
Getting started with ZIO, part 3

Review by: Bartłomiej Żyliński
