Getting started with ZIO, part 3
Welcome to the last part of the "Getting started with ZIO" series, where we will focus on writing tests for our streaming application. If you haven't seen part 1 and part 2, I recommend you start there.
Recap
Let's look at the system overview again to remind ourselves what we need to test.
Our server, on one end, consumes Kafka messages from a list of preconfigured topics. While on the other end, it allows clients to subscribe to a topic and receive its messages in real-time.
Since our system provides a single functionality to the clients, we will not test components individually and rather use a black box approach, meaning we will just start the server and test its API.
Full code from the series is available on Github.
Tests
We are going to use the zio-test
framework. I hope you will be convinced (after reading this post) that zio-test
is a top-notch pick when it comes to testing ZIO effects. One of the reasons for this is that tests are just ordinary ZIO values.
It means that it leverages environment requirements, resource management, and features like retries or timeout handling that come with the ZIO
type. Testing effectful code to many Scala developers associates with something like unsafeRunSync()
all over the place. We will see that with zio-test, it's not the case.
Resource management
Our app has one external dependency, Kafka, and we want to use a real one in our tests. Let's see how we can combine testcontainers with ZLayer
to create a Kafka resource for our tests.
import com.dimafeng.testcontainers.KafkaContainer
import zio.*
object TestKafkaContainer {
val live: ULayer[KafkaContainer] =
ZLayer.scoped {
ZIO.acquireRelease(
ZIO.attempt(KafkaContainer.Def().start()).orDie
)(container =>
ZIO.attempt(container.stop()).ignoreLogged
)
}
}
To create managed resources, we use Scope
together with ZIO.acquireRelease
(explained in part 1), which can be later bound to a single test or whole spec with several tests in it. The runtime will spin up and tear down the Kafka container for us at the right moment.
The second resource that we need is the k-tail app itself. Since our main program is just a ZIO
effect, we can run it along with our tests. Again, we are going to use a scoped layer.
object KTail {
val live: RLayer[KTailConfig & SttpClient, Unit] =
ZLayer.scoped {
ZIO
.acquireRelease(
for {
config <- ZIO.service[KTailConfig]
sttp <- ZIO.service[SttpClient]
program <-
Main.program(config = ZLayer.succeed(config)).forkScoped.interruptible
_ <-
sttp
.send(
basicRequest
.get(uri"http://localhost:${config.port}/k-tail/health")
.response(asString)
)
.catchAll(_ => ZIO.succeed(ServiceUnavailable))
.repeatUntil(_.code.isSuccess)
_ <- ZIO.logInfo(s"k-tail started on port ${config.port}")
} yield program
)(program => program.interrupt *> ZIO.logInfo("k-tail stopped"))
.unit
}
private val ServiceUnavailable: Response[Either[String, String]] =
Response(Left("k-tail unavailable"), StatusCode.ServiceUnavailable)
}
Let's explain this code. We are creating a scoped layer with a managed resource using ZIO.acquireRelease
. The resource here is the main program that we want to run in the background. First, we extract the config and sttp client from the environment (those will be provided in the test class code).
Then, we run our program by using the effect from our Main
object and configuration from our test environment. This part is very important since if we do not fork the program, our tests will not be executed since this layer will keep on running the server without proceeding till the end of its own creation. We use forkScoped
, which runs the fiber bound to the Scope
, which in our case will be a spec. We also make the fiber interruptible
, and we need to do it explicitly since the acquire effect of ZIO.acquireRelease
is by default not interruptible, and so are the fibers forked inside it. Without this small adjustment, our tests will run and then "hang" forever since our server will still be running.
Next, we wait until the server is up and healthy, handling initial connection errors with a fallback to a failed response. We yield a reference to the program
fiber that runs the server. In the release handler, we simply interrupt it.
Dynamic configuration
Our service configuration defines the http port on which the server should run as well as Kafka bootstrap, and both of these values are dynamic in tests. We start the server on a random port, and the test container for Kafka also uses a random one. Therefore we cannot use the config layer from our main code, which reads values from application.conf.
Here's how we create our test configuration.
object TestKTailConfig {
val Topic1: String = "test-topic-1"
val Topic2: String = "test-topic-2"
val Topic3: String = "test-topic-3"
val live: URLayer[KafkaContainer, KTailConfig] =
ZLayer {
for {
bootstrapServers <-
ZIO.serviceWith[KafkaContainer](_.bootstrapServers.split(',').toList)
random <- ZIO.random
port <-
random.nextIntBetween(9000, 9999)
config = KTailConfig(
port = port,
bootstrapServers = bootstrapServers,
groupId = "k-tail-server-test",
topics = List(Topic1, Topic2, Topic3)
)
} yield config
}
}
This layer depends on the KafkaContainer
, which has to be created first so we can access thebootstrapServers
property. Then, we choose a random port for our server and define three test topics that will be used in tests.
Spec
Let's define the specifications for our tests. To do that, we extend ZIOSpecDefault
and override the spec
method. Here's the sketch of our test cases.
object KTailSpec extends ZIOSpecDefault {
override def spec =
suite("k-tail socket")(
test("pong on ping") {
...
},
test("broadcast kafka messages, single client") {
...
},
test("broadcast kafka messages, multiple clients") {
...
}
).provideSomeShared[Scope](
KTail.live,
TestKTailConfig.live,
TestKafkaContainer.live,
TestProducer.live,
HttpClientZioBackend.layer()
) @@ TestAspect.timeout(1.minute)
}
We have one suite with three test cases. Also, we provide shared resources to our suite using provideSomeShared[Scope]
, which binds them to the "lifecycle" of our suite. Finally, we use a so-called TestAspect
to set the timeout.
Think of test aspects as operators or extensions that can be attached to tests. For example, we can use them to repeat selected tests or ignore them based on some environment value. We can define timeouts or specify that certain tests should be only run on JVM (in the case of ScalaJS cross-compiled projects) or even only on selected OS.
They can also be used as before and after hooks. Since we do not have a use case to use them extensively, you can read more in the docs.
Utils
We will need several utility methods to use throughout the tests. The first one, subscribe
, will connect to the k-tail socket and subscribe to a specific topic, sending a ping frame to make sure it is connected using the second utility method, ping
, whose code is skipped in the listings.
private def subscribe(topic: String): RIO[KTailConfig & SttpClient, WebSocket[Task]] =
for {
config <- ZIO.service[KTailConfig]
sttp <- ZIO.service[SttpClient]
ws <-
sttp
.send(
basicRequest
.get(uri"ws://localhost:${config.port}/k-tail/$topic")
.response(asWebSocketAlwaysUnsafe[Task])
)
.map(_.body)
_ <- ping(ws) *> ws.receive()
} yield ws
We also need a method to produce messages to test topics. By using zio-kafka
we can do it in just a few lines of code.
private def produce(topic: String, numberOfMessages: Int): RIO[Producer, Unit] =
for {
producer <- ZIO.service[Producer]
_ <-
producer
.produce(topic, key = "1", value = "msg", Serde.string, Serde.string)
.repeatN(numberOfMessages - 1)
} yield ()
Last but not least, we need a method to receive frames in sockets established by subscribe
.
private def receive(
socket: WebSocket[Task],
numberOfMessages: Int
): Task[List[Message]] =
for {
received <- Queue.unbounded[WebSocketFrame]
_ <-
socket
.receive()
.flatMap(received.offer)
.repeatN(numberOfMessages - 1)
_ <- socket.close()
frames <- received.takeAll
messages <- decode(frames)
} yield messages
Here, we read frames sent from the server, and because ordering is relevant for us and we want to verify it in tests, we place them on a queue. Then we close the socket and decode all messages. The code for the decode
method is in the repo, so I will skip it here.
Tests
In the first test, we want to check if the server correctly handles ping messages and replays with pong containing the same bytes as the ping frame.
test("pong on ping") {
for {
socket <- subscribe(Topic1)
ping <- ping(socket)
pong <- socket.receive()
_ <- socket.close()
} yield assertTrue {
pong match
case WebSocketFrame.Pong(payload) => payload sameElements ping.payload
case _ => false
}
}
Next, we test the broadcasting part. We subscribe to a topic and send some messages to Kafka. Then, we verify what was received by the client and what was the order of messages - by looking at their offsets.
import zio.test.Assertion.{ assertion, equalTo, forall }
test("broadcast kafka messages, single client") {
for {
socket <- subscribe(Topic1)
_ <- produce(Topic1, numberOfMessages = 10)
messages <- receive(socket, numberOfMessages = 10)
} yield assert(messages.map(_.topic))(forall(equalTo(Topic1))) &&
assertTrue(messages.map(_.offset) == (0L to 9))
}
We are using Assertion
from zio-test
to perform some checks following the syntax of assert(a: A)(assertion: Assertion[A])
. ZIO Assertion
has some built-in assertions for simple types as well as iterable like forall
, which takes another common assertion for equality check equalTo
. It takes a few moments to get used to them, but it is worth it. They give great error messages if some assertion was not met, logging all the values in a very readable and user-friendly format.
One client is easy to handle, but having all the utility methods in place, we can easily test how our server behaves with, let's say, 200 clients.
test("broadcast kafka messages, multiple clients") {
for {
socketsTopic2 <- ZIO.foreachPar(1 to 100)(_ => subscribe(Topic2))
socketsTopic3 <- ZIO.foreachPar(1 to 100)(_ => subscribe(Topic3))
_ <- produce(Topic2, numberOfMessages = 200)
_ <- produce(Topic3, numberOfMessages = 200)
messagesTopic2 <-
ZIO.foreachPar(socketsTopic2)(receive(_, numberOfMessages = 200))
messagesTopic3 <-
ZIO.foreachPar(socketsTopic3)(receive(_, numberOfMessages = 200))
} yield matchTopic(messagesTopic2, Topic2) &&
matchOffsets(messagesTopic2, 0L to 199) &&
matchTopic(messagesTopic3, Topic3) &&
matchOffsets(messagesTopic3, 0L to 199)
}
Using ZIO.foreachPar
, we can subscribe as well as receive messages over sockets in parallel. A matchTopic
and matchOffsets
are also utility methods added to the class for readability.
Summary
To sum it up, having the same effect and mechanisms like layers both in application code and in tests really speeds things up in terms of development. What is more, it gives us great flexibility while writing our test code.
Remember to:
- use
ZLayer
's to run resourceful things in the background during your tests - take advantage of ZIO
Assertion
for composition and great error feedback - try to keep your tests short
For me personally, it reads best when tests look just like some instructions, and one can understand what the API is doing just by glancing at them. Finally, when writing a spec that should run only under given circumstances or that can be unstable, maybe see what TestAspect
has to offer, I am sure you will find a solution there.
That's the last post of my "Getting started with ZIO" series. I've shown You how to use various building blocks to build a fully functional and non-trivial application. I hope that You have learned something from it, but what is even more important, I hope that my series gave you curiosity for ZIO ecosystem and development using its tools.
What's next?
SoftwareMill has developed and maintains a realword project of a CRUD application built using ZIO. It contains things like authentication and database access, so it’s definitely something to learn or get inspired from. If you are looking for another great production-grade starter for your next ZIO microservice, then you should definitely check this repository.
For further learning resources, I highly recommend reading through ZIO documentation and, as always, trying it yourself.