Integrating Java APIs with Cats Effect
Cats Effect is an excellent and performant framework for doing asynchronous computations in a functional manner. It provides the IO monadic datatype for controlling the actions of your application. Effects wrapped in instances of the IO monad are lazily evaluated. They can be conveniently composed with the flatMap function into bigger programs, which are then evaluated in a resource-safe runtime.
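To make the laziness concrete, here is a minimal sketch. The counter and value names are illustrative, and the program is run with unsafeRunSync and the global runtime from cats.effect.unsafe.implicits only for demonstration (real applications usually run IO through IOApp):

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global

var counter = 0

// Building an IO only describes the effect; nothing runs yet.
val increment: IO[Int] = IO { counter += 1; counter }

// flatMap and map compose two increments into a bigger program (still not run).
val program: IO[Int] = increment.flatMap(a => increment.map(b => a + b))

val before = counter                 // still 0: no effect has been executed
val result = program.unsafeRunSync() // runs both increments: 1 + 2
```

Only the final unsafeRunSync call executes the side effects, which is what makes IO values safe to pass around and compose.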
Cats Effect is not only a very popular library harnessing the power of functional programming; it is also a whole ecosystem of related libraries allowing for seamless integration of many routine tasks, like communicating with databases, creating HTTP servers or clients, parsing JSON and so on. Some libraries are community-managed and some are developed under the guidance of Typelevel, the organisation behind Cats Effect’s maintenance.
Moreover, since Scala compiles to JVM bytecode (there are also Scala.js and Scala Native, but I will focus on the JVM in this article), our programs can take advantage of any Java library. The challenge is that Java libraries are not “cats-aware”: very often they are synchronous and blocking, and sometimes they expose clunky interfaces.
In this article, I will integrate the basic functionalities of Google’s PubSub client library for Java with Cats Effect 3. There are already many projects implementing interop with various GCP resources, and in most cases we should prefer well-maintained libraries to our own solution. The point of this article is not to create a full-fledged, cats-based PubSub client, but rather to explore the tools provided by Cats Effect.
All code examples are written in Scala 3.
Blocking the future
First, I will integrate the publishing of messages. An impure implementation of the publish function using the PubSub Publisher might look like this:
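import com.google.cloud.pubsub.v1.Publisher
import com.google.protobuf.ByteString
import com.google.pubsub.v1.PubsubMessage
val publisher = Publisher.newBuilder("my-topic-name").build()
def publish(message: String): String = {
  val pubsubMessage = PubsubMessage.newBuilder()
    .setData(ByteString.copyFromUtf8(message))
    .build()
  val future = publisher.publish(pubsubMessage)
  future.get() // blocks the calling thread until PubSub responds
}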
As a first step, we can simply wrap all effectful function calls in the IO datatype with IO.apply and compose them with a for-comprehension:
import cats.effect.IO
def publish(message: String): IO[String] = for {
  pubsubMessage <- IO(
    PubsubMessage.newBuilder()
      .setData(ByteString.copyFromUtf8(message))
      .build()
  )
  future <- IO(publisher.publish(pubsubMessage)) // publishing is effectful too
  messageId <- IO.blocking(future.get()) // get() runs on the blocking pool
} yield messageId
The publish method returns ApiFuture, which is just an extension of the plain old Future interface from java.util.concurrent. If the API returned a CompletableFuture, we could simply call IO.fromCompletableFuture to create an IO, but there’s no equivalent for the older Future. Therefore, to obtain the outcome of the publishing, we need to invoke get, which blocks the current thread until a response arrives. This is wasteful on the JVM because it blocks a thread that could otherwise be reused for other computations. It is especially dangerous if we’re blocking a compute-oriented thread pool with a small, fixed number of threads.
Looking ahead, this operation might not be as wasteful once virtual threads arrive on the JVM, but for now, Project Loom is still not production-ready.
The usual approach for dealing with blocking actions is to run them on a separate thread pool, which is usually unbounded and caching (it reuses threads). To hint to the Cats Effect runtime that an action should run on the blocking pool, we use the special combinator IO.blocking.
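A minimal sketch of this approach, assuming a plain java.util.concurrent.Future as a stand-in for ApiFuture. Here it is produced by CompletableFuture.supplyAsync, but it is typed as Future so that, as with ApiFuture, only the blocking get is used:

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import java.util.concurrent.{CompletableFuture, Future}

// Stand-in for the ApiFuture returned by publish (an assumption for this sketch).
val javaFuture: Future[String] =
  CompletableFuture.supplyAsync(() => {
    Thread.sleep(100) // simulate waiting for a server response
    "message-id"
  })

// IO.blocking shifts get() to the runtime's blocking pool,
// so compute threads stay free while it waits.
val messageId: IO[String] = IO.blocking(javaFuture.get())

val result = messageId.unsafeRunSync()
```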
IO.blocking does not react to the cancellation of its fiber (a green thread implemented in the Cats Effect runtime): if the fiber is cancelled while the blocking call is running, the cancellation waits until the call returns. If we want cancellation to interrupt the underlying thread, we can use IO.interruptible instead, which interrupts the thread once, or IO.interruptibleMany, which keeps repeating the interrupt until the action terminates. Interrupting a thread blocked in get makes get throw an InterruptedException, but it doesn't stop the operation behind the Future; that can only be done with Future’s cancel method. So even though nobody will be waiting for the result of the operation, it will still be pending. Conversely, if we want to protect a region of code from cancellation altogether, we can use IO.uncancelable.
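The sketch below illustrates these semantics under a few assumptions: a CompletableFuture that nobody ever completes stands in for a hanging publish call, timeout is used to trigger cancellation, and Cats Effect 3.3 or later is on the classpath (the version that provides IO.interruptible without a boolean parameter). With IO.interruptible, the interrupt releases the thread blocked in get, so the timeout can take effect; the operation behind the future, however, stays pending.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import java.util.concurrent.{CompletableFuture, TimeoutException}
import scala.concurrent.duration.*

// A future that is never completed, standing in for a publish call that hangs.
val pending = new CompletableFuture[String]()

// On cancellation, IO.interruptible interrupts the thread blocked in get(),
// so the timeout can complete. With IO.blocking, cancellation would wait
// for get() to return, which never happens here.
val outcome: Either[Throwable, String] =
  IO.interruptible(pending.get())
    .timeout(200.millis)
    .attempt
    .unsafeRunSync()

// We stopped waiting, but the operation behind the future was not cancelled.
val stillPending: Boolean = !pending.isDone
```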
Integrating asynchronous callbacks
The publish method from Google’s library returns an instance of ApiFuture, an extension of the Future interface that allows us to attach a callback that will be evaluated when the pending action is over. With the helper method com.google.api.core.ApiFutures.addCallback, we can set up two callback functions: onSuccess and onFailure. Cats Effect has a special tool for integrating callback-based APIs: the IO.async and IO.async_ methods from the Async typeclass.
First, I’ll discuss IO.async_ (with an underscore at the end). It takes one argument: a function with the signature (Either[Throwable, A] => Unit) => Unit. It looks scary, but only at first glance. Either[Throwable, A] => Unit is just a function that accepts an Either and returns Unit. We need to invoke it in the callback: if the action succeeded, we pass a Right with the resulting value, and if it failed, we pass a Left with the Throwable.
import cats.syntax.all.* // asLeft, asRight
import com.google.api.core.{ApiFutureCallback, ApiFutures}
val result: IO[String] = IO.async_[String] { (cb: Either[Throwable, String] => Unit) =>
  ApiFutures.addCallback(
    future,
    new ApiFutureCallback[String]() {
      override def onFailure(error: Throwable): Unit = cb(error.asLeft)
      override def onSuccess(messageId: String): Unit =
        cb(messageId.asRight)
    }
  )
}
In the example above, when the action fails and the onFailure callback is called, we call cb, passing a Left containing the exception. This produces a failed IO with that error.
When onSuccess is called, we invoke cb, passing a Right containing the id of the message that was just sent to PubSub. Only the first call of cb has any effect; any further calls are silently ignored. That is fine in our case, since we attach a new callback to every created Future.
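Since the PubSub client needs a real topic, here is a self-contained sketch of the same pattern with an assumption swapped in: CompletableFuture.whenComplete plays the role of ApiFutures.addCallback, and fromCallback is a hypothetical helper name. The last value shows that only the first call of cb counts:

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import java.util.concurrent.CompletableFuture

// Hypothetical helper: bridges a callback-based Java API into IO with IO.async_.
def fromCallback[A](future: CompletableFuture[A]): IO[A] =
  IO.async_[A] { cb =>
    future.whenComplete { (value: A, error: Throwable) =>
      if error == null then cb(Right(value)) else cb(Left(error))
    }
    () // we only register the callback; the result arrives through cb
  }

val ok  = fromCallback(CompletableFuture.completedFuture("message-id")).attempt.unsafeRunSync()
val err = fromCallback(CompletableFuture.failedFuture[String](new RuntimeException("boom"))).attempt.unsafeRunSync()

// Only the first invocation of cb has an effect; the second one is ignored.
val firstWins = IO.async_[String] { cb =>
  cb(Right("first"))
  cb(Right("second"))
}.unsafeRunSync()
```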
There’s also IO.async (without the underscore). This time it takes a function with an even more convoluted signature: (Either[Throwable, A] => Unit) => IO[Option[IO[Unit]]]. The difference is that this variant allows us to optionally return an IO that will be executed when the fiber invoking IO.async is cancelled (hence Option[IO[Unit]]).
We can use that finalizer to pass an action invoking cancel on the Future. The cancel method takes a boolean parameter, mayInterruptIfRunning: a task that hasn’t started yet won’t run at all after cancel, and for a task that is already running, the parameter determines whether the thread executing it should also be interrupted. I will use the cancel(false) variant, since Java’s interrupt logic is not very reliable.
The other difference is that IO.async expects us to suspend (wrap in IO) the action that registers the callback.
IO.async[String] { (callback: Either[Throwable, String] => Unit) =>
  IO {
    ApiFutures.addCallback(
      future,
      new ApiFutureCallback[String]() {
        override def onFailure(error: Throwable): Unit =
          callback(error.asLeft)
        override def onSuccess(messageId: String): Unit =
          callback(messageId.asRight)
      }
    )
    Some(IO(future.cancel(false)).void) // finalizer, run on cancellation
  }
}
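To see the finalizer at work without a PubSub topic, here is a sketch that again assumes CompletableFuture.whenComplete as a stand-in for ApiFutures.addCallback (cancelable is a hypothetical helper name). The future is never completed, so timeout cancels the waiting fiber, and the finalizer cancels the future:

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import java.util.concurrent.{CompletableFuture, TimeoutException}
import scala.concurrent.duration.*

// Hypothetical helper: IO.async with a cancellation finalizer.
def cancelable[A](future: CompletableFuture[A]): IO[A] =
  IO.async[A] { cb =>
    IO {
      future.whenComplete { (value: A, error: Throwable) =>
        if error == null then cb(Right(value)) else cb(Left(error))
      }
      // Runs only if the fiber is cancelled while waiting for cb.
      Some(IO(future.cancel(false)).void)
    }
  }

val slowPublish = new CompletableFuture[String]() // never completed by anyone

// timeout cancels the waiting fiber, which triggers the finalizer.
val outcome = cancelable(slowPublish).timeout(200.millis).attempt.unsafeRunSync()
```

Unlike the interruptible variant, no thread is parked while waiting: the fiber is simply suspended until cb is called or it is cancelled.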
I will use the last approach, based on IO.async, for my final implementation. Since Publisher from the PubSub API allocates some resources, I will wrap its creation and shutdown in a Resource from Cats Effect. I will also create another resource for the executor that will be passed as the third argument to addCallback.
import com.google.cloud.pubsub.v1.{Publisher => GooglePublisher}
import com.google.pubsub.v1.PubsubMessage
import cats.effect.IO
import cats.effect.Resource
import java.util.concurrent.Executors
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import com.google.api.core.ApiFutures
import com.google.api.core.ApiFutureCallback
import com.google.protobuf.ByteString
import cats.syntax.all.*
final class Publisher private (
  internal: GooglePublisher,
  executor: ExecutorService
):
  def publish(message: String): IO[String] = for {
    pubSubMessage <- IO(
      PubsubMessage
        .newBuilder()
        .setData(ByteString.copyFromUtf8(message))
        .build()
    )
    future <- IO(internal.publish(pubSubMessage))
    messageId <- IO.async[String] {
      (callback: Either[Throwable, String] => Unit) =>
        IO {
          ApiFutures.addCallback(
            future,
            new ApiFutureCallback[String]() {
              override def onFailure(error: Throwable): Unit =
                callback(error.asLeft)
              override def onSuccess(messageId: String): Unit =
                callback(messageId.asRight)
            },
            executor // callbacks run on our own executor
          )
          // finalizer: cancel the pending publish if the fiber is cancelled
          Some(IO(future.cancel(false)).void)
        }
    }
  } yield messageId
object Publisher:
  def create(topicName: String): Resource[IO, Publisher] = for {
    executor <- Resource.make(
      IO(Executors.newCachedThreadPool())
    )(ec =>
      IO(ec.shutdown()) *> IO.blocking(ec.awaitTermination(10, TimeUnit.SECONDS)).void
    )
    googlePublisher <- Resource.make(
      IO(GooglePublisher.newBuilder(topicName).build())
    )(p =>
      IO(p.shutdown()) *> IO.interruptible(
        p.awaitTermination(10, TimeUnit.SECONDS)
      ).void
    )
  } yield Publisher(googlePublisher, executor)
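The order of the two resources in create matters: resources composed in a for-comprehension are released in reverse order of acquisition, so the Google publisher is shut down before the executor its callbacks run on. A small sketch makes the order visible; tracked is a hypothetical helper that only records what happens:

```scala
import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global
import scala.collection.mutable.ListBuffer

val log = ListBuffer.empty[String]

// Hypothetical helper that records acquisition and release of a named resource.
def tracked(name: String): Resource[IO, String] =
  Resource.make(IO { log += s"acquire $name"; name })(n => IO { log += s"release $n"; () })

// Same shape as Publisher.create: the executor first, then the publisher.
val app: Resource[IO, (String, String)] = for
  executor  <- tracked("executor")
  publisher <- tracked("publisher")
yield (executor, publisher)

val run: Unit = app.use(_ => IO { log += "use"; () }).unsafeRunSync()
```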
Executing unsafe actions with Dispatcher
In the previous section, we implemented an integration for the publishing API; now let’s try to do the same with the consumer API.
Creating an impure subscriber is dead easy:
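import com.google.cloud.pubsub.v1.{AckReplyConsumer, Subscriber}
import com.google.pubsub.v1.PubsubMessage
Subscriber
  .newBuilder(
    "my-subscription-name",
    (message: PubsubMessage, consumer: AckReplyConsumer) => {
      // do something with the message
      consumer.ack()
    }
  )
  .build()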
The builder expects two arguments: the name of the subscription and an instance of MessageReceiver, which is just a functional interface of type (PubsubMessage message, AckReplyConsumer consumer) -> void working as the callback. Since it’s again a callback-based API, we could simply use IO.async like last time, right?
The problem is that the callback passed to the subscriber will be invoked every time a new message arrives, and a limitation of async is that only the first call has any effect.
Fortunately, we can use another approach. Cats Effect has a Queue datatype that we can use as a buffer for incoming messages. We can insert messages into the queue with offer(a: A): F[Unit]. Later, we can check whether the queue holds any messages using tryTake: F[Option[A]].
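A minimal sketch of these two operations on an unbounded Queue from cats.effect.std, using plain strings instead of PubSub messages:

```scala
import cats.effect.IO
import cats.effect.std.Queue
import cats.effect.unsafe.implicits.global

val program: IO[(Option[String], Option[String], Option[String])] =
  for
    queue  <- Queue.unbounded[IO, String]
    first  <- queue.tryTake       // empty queue: None, returned without waiting
    _      <- queue.offer("hello")
    second <- queue.tryTake       // the buffered element
    third  <- queue.tryTake       // empty again
  yield (first, second, third)

val result = program.unsafeRunSync()
```

With an unbounded queue, offer never has to wait for free space, which suits a callback that should return quickly.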
I will go on with that approach. First, I need to create a case class to keep the payload of the message and an IO wrapping the effect of acknowledging the receipt of the message.
final case class PubSubMessage(id: String, payload: String, ack: IO[Unit])
My first naive implementation of the callback looks like this:
def createReceiver(queue: Queue[IO, PubSubMessage]): MessageReceiver = (message, consumer) => {
  val processMessage: IO[Unit] = for {
    payload <- IO(message.getData().toStringUtf8())
    _ <- queue.offer(
      PubSubMessage(message.getMessageId(), payload, IO(consumer.ack()))
    )
  } yield ()
  ??? // how should I call processMessage?
}
The issue is that processMessage is an IO[Unit], which means it is just a lazy description of the computation. I’d need to explicitly run it to put the message into the queue. One way to achieve this would be calling unsafeRunSync on the IO, but then I would break the rule that user-land code should avoid calling this method.
Instead, we can use another tool tailor-made for this situation. Dispatcher can help us evaluate effects wrapped in IO from the impure callback. Dispatcher offers several methods for evaluating IO, but for our use case, I will use unsafeRunSync(io: IO[A]).
def createReceiver(dispatcher: Dispatcher[IO], queue: Queue[IO, PubSubMessage]): MessageReceiver = (message, consumer) => {
  val processMessage: IO[Unit] = for {
    payload <- IO(message.getData().toStringUtf8())
    _ <- queue.offer(
      PubSubMessage(message.getMessageId(), payload, IO(consumer.ack()))
    )
  } yield ()
  dispatcher.unsafeRunSync(processMessage) // waits until the message is enqueued
}
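Here is a self-contained sketch of the same idea without PubSub. It assumes Cats Effect 3.4 or later, where Dispatcher.parallel is the recommended way to create a dispatcher, and startLegacyProducer is a hypothetical impure API that calls its listener from its own thread, much like the subscriber calls MessageReceiver:

```scala
import cats.effect.IO
import cats.effect.std.{Dispatcher, Queue}
import cats.effect.unsafe.implicits.global
import cats.syntax.all.*

// Hypothetical impure API (not from any library): invokes `listener`
// for every item on a plain Java thread.
def startLegacyProducer(items: List[String], listener: String => Unit): Unit =
  new Thread(() => items.foreach(listener)).start()

val program: IO[List[String]] =
  Dispatcher.parallel[IO].use { dispatcher =>
    for
      queue <- Queue.unbounded[IO, String]
      // The listener is side-effecting code, so it runs each offer through
      // the dispatcher instead of calling unsafeRunSync on the IO itself.
      _ <- IO(startLegacyProducer(List("a", "b", "c"), msg => dispatcher.unsafeRunSync(queue.offer(msg))))
      items <- queue.take.replicateA(3) // semantically waits for all three
    yield items
  }

val received = program.unsafeRunSync()
```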
Dispatcher is based on Supervisor, which is a resource responsible for managing the fiber lifecycle. When the Dispatcher is finalized, all underlying fibers are cancelled.
Another resource we need to take care of is Subscriber from the PubSub API. We’ll wrap it in a Resource to ensure it’s properly terminated during the application’s shutdown.
I also need a way to expose messages buffered in the queue. I will create two kinds of methods: the first one will return IO[Option[PubSubMessage]], yielding Some only if a message is available, and the second one will return Stream[IO, PubSubMessage], an fs2 Stream of incoming messages.
Now I can finalize the implementation of my consumer:
import cats.effect.{IO, Resource}
import cats.effect.std.{Dispatcher, Queue}
import com.google.cloud.pubsub.v1.{MessageReceiver, Subscriber}
import fs2.Stream
final class Consumer private (queue: Queue[IO, PubSubMessage]):
  def poll: IO[Option[PubSubMessage]] = queue.tryTake
  val stream: Stream[IO, PubSubMessage] = Stream.fromQueueUnterminated(queue)
object Consumer:
  private def receiver(
    dispatcher: Dispatcher[IO],
    queue: Queue[IO, PubSubMessage]
  ): MessageReceiver = (message, consumer) => {
    val p = for {
      payload <- IO(message.getData().toStringUtf8())
      _ <- queue.offer(
        PubSubMessage(message.getMessageId(), payload, IO(consumer.ack()))
      )
    } yield ()
    dispatcher.unsafeRunSync(p)
  }
  def create(subscriptionName: String): Resource[IO, Consumer] = for {
    queue <- Resource.eval(Queue.unbounded[IO, PubSubMessage])
    dispatcher <- Dispatcher[IO] // Dispatcher.parallel[IO] on Cats Effect 3.4+
    subscriber <- Resource.eval(
      IO(
        Subscriber
          .newBuilder(subscriptionName, receiver(dispatcher, queue))
          .build()
      )
    )
    _ <- Resource.make(
      IO.blocking(subscriber.startAsync().awaitRunning())
    )(_ => IO.blocking(subscriber.stopAsync().awaitTerminated()))
  } yield Consumer(queue)
Wrapping up
Hopefully, this article will be helpful if you had any doubts about how to integrate impure APIs with Cats Effect. You can check the complete code of my cats-friendly PubSub API, with usage examples, on GitHub. Take care!