Getting started with ZIO, part 3

Jakub Cichy

07 Nov 2023 · 9 minutes read

Welcome to the last part of the "Getting started with ZIO" series, where we will focus on writing tests for our streaming application. If you haven't seen part 1 and part 2, I recommend you start there.

Recap

Let's look at the system overview again to remind ourselves what we need to test.

[Diagram: k-tail system overview]

On one end, our server consumes Kafka messages from a list of preconfigured topics; on the other, it allows clients to subscribe to a topic and receive its messages in real time.

Since our system exposes a single piece of functionality to clients, we will not test components individually; instead, we will take a black-box approach, meaning we will just start the server and test its API.

The full code from the series is available on GitHub.

Tests

We are going to use the zio-test framework. I hope that after reading this post you will be convinced that zio-test is a top-notch pick when it comes to testing ZIO effects. One of the reasons for this is that tests are just ordinary ZIO values.

This means tests can leverage environment requirements, resource management, and features like retries or timeout handling that come with the ZIO type. For many Scala developers, testing effectful code is associated with something like unsafeRunSync() scattered all over the place; we will see that with zio-test, this is not the case.
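
As a tiny illustration, here is what such a test might look like (a hypothetical, standalone example, not part of k-tail): the test body is an ordinary ZIO value, so aspects such as timeouts attach to it directly.

import zio.*
import zio.test.*

// A minimal sketch: the test body is just a ZIO value, so ZIO operators and
// aspects (timeouts, retries, ...) apply to it directly.
object ExampleSpec extends ZIOSpecDefault {

  override def spec =
    test("random port is in range") {
      for {
        port <- Random.nextIntBetween(9000, 9999)
      } yield assertTrue(port >= 9000 && port < 9999)
    } @@ TestAspect.timeout(10.seconds)
}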

Resource management

Our app has one external dependency, Kafka, and we want to use a real one in our tests. Let's see how we can combine testcontainers with ZLayer to create a Kafka resource for our tests.

import com.dimafeng.testcontainers.KafkaContainer
import zio.*

object TestKafkaContainer {

  val live: ULayer[KafkaContainer] =
    ZLayer.scoped {
      ZIO.acquireRelease(
        ZIO.attempt(KafkaContainer.Def().start()).orDie
      )(container =>
        ZIO.attempt(container.stop()).ignoreLogged
      )
    }
}

To create managed resources, we use Scope together with ZIO.acquireRelease (explained in part 1); the resulting layer can later be bound to a single test or to a whole spec with several tests in it. The runtime will spin up and tear down the Kafka container for us at the right moments.
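
As a hypothetical sketch (using the layer from the listing above and standard zio-test methods), the same scoped layer can be bound to a single test or shared across a whole suite:

// Bound to a single test: the container starts and stops around this test only.
test("uses its own Kafka container") {
  ZIO.serviceWith[KafkaContainer](c => assertTrue(c.bootstrapServers.nonEmpty))
}.provideSome[Scope](TestKafkaContainer.live)

// Shared across a suite: the container starts and stops once for all tests in it.
suite("shares one Kafka container")(
  test("first test") { ZIO.succeed(assertTrue(true)) },
  test("second test") { ZIO.succeed(assertTrue(true)) }
).provideSomeShared[Scope](TestKafkaContainer.live)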

The second resource that we need is the k-tail app itself. Since our main program is just a ZIO effect, we can run it along with our tests. Again, we are going to use a scoped layer.

object KTail {

  val live: RLayer[KTailConfig & SttpClient, Unit] =
    ZLayer.scoped {
      ZIO
        .acquireRelease(
          for {
            config <- ZIO.service[KTailConfig]
            sttp   <- ZIO.service[SttpClient]
            program <-
              Main.program(config = ZLayer.succeed(config)).forkScoped.interruptible
            _ <-
              sttp
                .send(
                  basicRequest
                    .get(uri"http://localhost:${config.port}/k-tail/health")
                    .response(asString)
                )
                .catchAll(_ => ZIO.succeed(ServiceUnavailable))
                .repeatUntil(_.code.isSuccess)
            _ <- ZIO.logInfo(s"k-tail started on port ${config.port}")
          } yield program
        )(program => program.interrupt *> ZIO.logInfo("k-tail stopped"))
        .unit
    }

  private val ServiceUnavailable: Response[Either[String, String]] =
    Response(Left("k-tail unavailable"), StatusCode.ServiceUnavailable)
}

Let's explain this code. We are creating a scoped layer with a managed resource using ZIO.acquireRelease. The resource here is the main program that we want to run in the background. First, we extract the config and sttp client from the environment (those will be provided in the test class code).

Then, we run our program using the effect from our Main object and the configuration from our test environment. Forking is crucial here: if we did not fork the program, the layer would keep running the server and never finish its own construction, so our tests would never execute. We use forkScoped, which binds the fiber to the surrounding Scope, which in our case is the spec. We also make the fiber interruptible, and we need to do so explicitly because the acquire effect of ZIO.acquireRelease is uninterruptible by default, and so are fibers forked inside it. Without this small adjustment, our tests would run and then "hang" forever, since the server would still be running.

Next, we wait until the server is up and healthy, handling initial connection errors with a fallback to a failed response. We yield a reference to the program fiber that runs the server. In the release handler, we simply interrupt it.

Dynamic configuration

Our service configuration defines the HTTP port on which the server should run as well as the Kafka bootstrap servers, and both of these values are dynamic in tests: we start the server on a random port, and the Kafka test container also exposes a random one. Therefore, we cannot use the config layer from our main code, which reads values from application.conf.

Here's how we create our test configuration.

object TestKTailConfig {

  val Topic1: String = "test-topic-1"
  val Topic2: String = "test-topic-2"
  val Topic3: String = "test-topic-3"

  val live: URLayer[KafkaContainer, KTailConfig] =
    ZLayer {
      for {
        bootstrapServers <-
          ZIO.serviceWith[KafkaContainer](_.bootstrapServers.split(',').toList)
        random <- ZIO.random
        port <-
          random.nextIntBetween(9000, 9999)
        config = KTailConfig(
          port = port,
          bootstrapServers = bootstrapServers,
          groupId = "k-tail-server-test",
          topics = List(Topic1, Topic2, Topic3)
        )
      } yield config
    }
}

This layer depends on the KafkaContainer, which has to be created first so we can access its bootstrapServers property. Then, we choose a random port for our server and define the three topics used in the tests.
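
If you prefer explicit wiring over the automatic composition shown later in the spec, the dependency reads naturally with >>> (a sketch, not code from the repo):

// The container layer feeds the config layer: the container is started first,
// and its bootstrap servers end up in KTailConfig.
val testConfig: ULayer[KTailConfig] =
  TestKafkaContainer.live >>> TestKTailConfig.live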

Spec

Let's define the specifications for our tests. To do that, we extend ZIOSpecDefault and override the spec method. Here's the sketch of our test cases.

object KTailSpec extends ZIOSpecDefault {

  override def spec =
    suite("k-tail socket")(
      test("pong on ping") {
        ...
      },
      test("broadcast kafka messages, single client") {
        ...
      },
      test("broadcast kafka messages, multiple clients") {
        ...
      }
    ).provideSomeShared[Scope](
      KTail.live,
      TestKTailConfig.live,
      TestKafkaContainer.live,
      TestProducer.live,
      HttpClientZioBackend.layer()
    ) @@ TestAspect.timeout(1.minute)
}

We have one suite with three test cases. Also, we provide shared resources to our suite using provideSomeShared[Scope], which binds them to the "lifecycle" of our suite. Finally, we use a so-called TestAspect to set the timeout.

Think of test aspects as operators or extensions that can be attached to tests. For example, we can use them to repeat selected tests or to ignore them based on some environment value. We can define timeouts or specify that certain tests should only run on the JVM (in the case of Scala.js cross-compiled projects) or even only on a selected OS.

They can also be used as before and after hooks. We won't need them extensively here, so if you want to dig deeper, you can read more in the docs.
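
A few standard aspects, shown purely as an illustration of the kinds of things they can do:

// These would be attached to tests inside a spec.
test("an example test") {
  ZIO.succeed(assertTrue(true))
} @@ TestAspect.jvmOnly                         // skip when cross-compiled to Scala.js
  @@ TestAspect.flaky                           // retry an unstable test until it passes
  @@ TestAspect.before(ZIO.logInfo("starting")) // run an effect before the test
  @@ TestAspect.timeout(30.seconds)             // fail the test if it runs too long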

Utils

We will need several utility methods throughout the tests. The first one, subscribe, connects to the k-tail socket and subscribes to a given topic. To make sure the connection is established, it sends a ping frame using the second utility method, ping, whose code is skipped in the listings (a possible shape of it is sketched below, after the subscribe listing).

private def subscribe(topic: String): RIO[KTailConfig & SttpClient, WebSocket[Task]] =
  for {
    config <- ZIO.service[KTailConfig]
    sttp   <- ZIO.service[SttpClient]
    ws <-
      sttp
        .send(
          basicRequest
            .get(uri"ws://localhost:${config.port}/k-tail/$topic")
            .response(asWebSocketAlwaysUnsafe[Task])
        )
        .map(_.body)
    _ <- ping(ws) *> ws.receive()
  } yield ws
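
The ping helper itself is not listed here; as a rough sketch (the actual implementation is in the repo), it could send a ping frame with a known payload and return it, so tests can later compare it with the pong:

// Hypothetical sketch of the ping helper (the real code lives in the repo):
// send a ping frame and return it for later comparison with the pong.
private def ping(socket: WebSocket[Task]): Task[WebSocketFrame.Ping] = {
  val frame = WebSocketFrame.Ping("ping".getBytes)
  socket.send(frame).as(frame)
}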

We also need a method to produce messages to test topics. By using zio-kafka we can do it in just a few lines of code.

private def produce(topic: String, numberOfMessages: Int): RIO[Producer, Unit] =
  for {
    producer <- ZIO.service[Producer]
    _ <-
      producer
        .produce(topic, key = "1", value = "msg", Serde.string, Serde.string)
        .repeatN(numberOfMessages - 1)
  } yield ()

Last but not least, we need a method to receive frames from the sockets established by subscribe.

private def receive(
    socket: WebSocket[Task],
    numberOfMessages: Int
): Task[List[Message]] =
  for {
    received <- Queue.unbounded[WebSocketFrame]
    _ <-
      socket
        .receive()
        .flatMap(received.offer)
        .repeatN(numberOfMessages - 1)
    _        <- socket.close()
    frames   <- received.takeAll
    messages <- decode(frames)
  } yield messages

Here, we read frames sent from the server, and because ordering is relevant for us and we want to verify it in tests, we place them on a queue. Then we close the socket and decode all messages. The code for the decode method is in the repo, so I will skip it here.

Tests

In the first test, we want to check whether the server correctly handles ping messages and replies with a pong containing the same bytes as the ping frame.

test("pong on ping") {
  for {
    socket <- subscribe(Topic1)
    ping   <- ping(socket)
    pong   <- socket.receive()
    _      <- socket.close()
  } yield assertTrue {
    pong match
      case WebSocketFrame.Pong(payload) => payload sameElements ping.payload
      case _                            => false
  }
}

Next, we test the broadcasting part. We subscribe to a topic and send some messages to Kafka. Then, we verify what the client received and, by looking at the offsets, in what order the messages arrived.

import zio.test.Assertion.{ assertion, equalTo, forall }

test("broadcast kafka messages, single client") {
  for {
    socket   <- subscribe(Topic1)
    _        <- produce(Topic1, numberOfMessages = 10)
    messages <- receive(socket, numberOfMessages = 10)
  } yield assert(messages.map(_.topic))(forall(equalTo(Topic1))) &&
    assertTrue(messages.map(_.offset) == (0L to 9))
}

We are using Assertion from zio-test to perform checks following the syntax assert(a: A)(assertion: Assertion[A]). ZIO Assertion ships with built-in assertions for simple types as well as for iterables, such as forall, which takes another assertion, here the equality check equalTo. It takes a few moments to get used to them, but it is worth it: when an assertion fails, they produce great error messages, printing all the values in a very readable, user-friendly format.
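
For illustration (standard zio-test assertions, not code from the repo), assertions compose with && and || and nest for collections:

import zio.test.Assertion.{ equalTo, forall, hasSize, isGreaterThanEqualTo }

// The list has exactly three elements, all of them non-negative.
val offsets = List(0L, 1L, 2L)
val result  = assert(offsets)(hasSize(equalTo(3)) && forall(isGreaterThanEqualTo(0L)))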

One client is easy to handle, but with all the utility methods in place, we can just as easily test how our server behaves with, say, 200 clients.

test("broadcast kafka messages, multiple clients") {
  for {
    socketsTopic2  <- ZIO.foreachPar(1 to 100)(_ => subscribe(Topic2))
    socketsTopic3  <- ZIO.foreachPar(1 to 100)(_ => subscribe(Topic3))
    _              <- produce(Topic2, numberOfMessages = 200)
    _              <- produce(Topic3, numberOfMessages = 200)
    messagesTopic2 <- 
      ZIO.foreachPar(socketsTopic2)(receive(_, numberOfMessages = 200))
    messagesTopic3 <- 
      ZIO.foreachPar(socketsTopic3)(receive(_, numberOfMessages = 200))
  } yield matchTopic(messagesTopic2, Topic2) &&
    matchOffsets(messagesTopic2, 0L to 199) &&
    matchTopic(messagesTopic3, Topic3) &&
    matchOffsets(messagesTopic3, 0L to 199)
}

Using ZIO.foreachPar, we can subscribe and receive messages over the sockets in parallel. matchTopic and matchOffsets are further utility methods added to the spec for readability.
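
Their exact code is in the repo; a hypothetical sketch is that they fold the per-client checks into a single TestResult:

// Hypothetical sketch of the helpers (the actual implementations are in the repo):
// every client should have seen only the given topic, with offsets in the expected order.
private def matchTopic(messages: Seq[List[Message]], topic: String): TestResult =
  assert(messages.flatten.map(_.topic))(forall(equalTo(topic)))

private def matchOffsets(messages: Seq[List[Message]], offsets: Seq[Long]): TestResult =
  messages.map(received => assertTrue(received.map(_.offset) == offsets)).reduce(_ && _)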

Summary

To sum up, having the same effect type and mechanisms like layers both in application code and in tests really speeds up development. What is more, it gives us great flexibility when writing test code.

Remember to:

  • use ZLayer to run resourceful things in the background during your tests
  • take advantage of ZIO Assertion for composition and great error feedback
  • try to keep your tests short

Personally, I find tests read best when they look like a simple set of instructions, so that one can understand what the API does just by glancing at them. Finally, when writing a spec that should run only under certain circumstances, or one that can be unstable, see what TestAspect has to offer; I am sure you will find a solution there.

That's the last post of my "Getting started with ZIO" series. I've shown you how to use various building blocks to create a fully functional, non-trivial application. I hope you have learned something from it and, even more importantly, that the series made you curious about the ZIO ecosystem and development with its tools.

What's next?

SoftwareMill has developed and maintains a RealWorld project, a CRUD application built using ZIO. It includes things like authentication and database access, so it's definitely something to learn from or get inspired by. If you are looking for another great production-grade starter for your next ZIO microservice, then you should definitely check out this repository.

For further learning resources, I highly recommend reading through ZIO documentation and, as always, trying it yourself.

Review by: Bartłomiej Żyliński
