
PubSub with Scala

Krzysztof Grajek

24 Jan 2023 · 8 minutes read

Image: @Benjamin_scott_florin, flickr.com

What is Pub/Sub

In the microservices world (and not only there), more often than not you need to introduce asynchronous communication between services using some kind of messaging solution. There are many options available, from RabbitMQ in a single-node setup all the way up to a full-blown multi-node Kafka cluster. Google Pub/Sub is Google's offering for asynchronous communication, available directly on GCP; it is not as advanced as Kafka, but at the same time it is easy to use and maintenance-free.

With Pub/Sub you create a system of producers and consumers for your events. Producers send events to the Pub/Sub service without knowing when or how those messages will be processed, and the Pub/Sub service tries to deliver all of those messages to the predefined subscribers.

Google Pub/Sub comes in two flavors: standard, simply called Google Pub/Sub, and Pub/Sub Lite, which is more cost-effective but offers lower availability and a limited feature set. For example, you can run Pub/Sub Lite in a single zone, which makes things cheaper but replicates your data only within that one zone, whereas standard Pub/Sub guarantees synchronous replication to at least two zones and best-effort replication to a third zone.

Google Pub/Sub configuration

Google Pub/Sub administration is available, together with all the other GCP services, in the GCP web console, which is very convenient.

Creating a topic

Typically, when creating a topic you have a limited set of options: apart from assigning it a name (of course), you can choose to have a default subscription created, specify a schema for your messages (created separately), pick a message encoding, and set the retention duration. For standard day-to-day operations, you just choose a name and you are good to go.

Quick Tip: When creating a topic for a real-world application, take five minutes with your team and decide on the naming conventions you will use across the whole Pub/Sub system, so that topics and subscriptions are named consistently (for example, orders-dev for a topic and orders-dev-sub for its subscription, as used later in this post). This is a huge advantage when looking up Pub/Sub resources in a project with hundreds of topics and subscriptions.

Creating a subscription

Once you have created the topic(s) you will send your messages to, it's time to create some subscriptions, which are responsible for keeping the data on Google's infrastructure until it is delivered to your services. Subscriptions offer a much bigger set of configuration options (you can even push messages directly to Google BigQuery); we won't go through all of them here, only the most important ones.

Some of the most important things to consider when creating a subscription are:

  • Message retention duration (default: 7 days) - how long do you expect your messages to wait for processing, or more importantly, how long can it take until they get acknowledged?
  • Acknowledgement deadline (default: 10 seconds) - how long will it take to process a message on the subscriber side? If Pub/Sub does not receive an ack or nack within that timeframe, the message is redelivered (possibly to a different instance of the subscriber).
  • Retry policy - should Pub/Sub resend messages immediately or with an exponential backoff delay? I would recommend the latter, so that the services acting as subscribers have some time to recover in case of failure.
  • Dead lettering - highly recommended for all your subscriptions. When a message cannot be processed (for example, it cannot be parsed on the subscriber's side), it is neither redelivered endlessly, eating up resources on your subscriber service, nor lost after the retention period passes; instead it ends up on a dedicated dead-letter topic (all of these options can also be set programmatically, as sketched below).
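For illustration, here is a minimal sketch of configuring these options programmatically, using the Java admin client from the google-cloud-pubsub artifact, which works fine from Scala. The project id, topic, and dead-letter topic names here are hypothetical; in the example project described below, the resources were created through the GCP console instead.

Create subscription (sketch)

import com.google.cloud.pubsub.v1.SubscriptionAdminClient
import com.google.protobuf.Duration
import com.google.pubsub.v1.{DeadLetterPolicy, RetryPolicy, Subscription, SubscriptionName, TopicName}

object CreateOrdersSubscription extends App {
  val projectId = "some-project-id" // hypothetical project id

  val client = SubscriptionAdminClient.create()
  try {
    val subscription = Subscription
      .newBuilder()
      .setName(SubscriptionName.of(projectId, "orders-dev-sub").toString)
      .setTopic(TopicName.of(projectId, "orders-dev").toString)
      // acknowledgement deadline: redeliver if not acked/nacked within 10 seconds
      .setAckDeadlineSeconds(10)
      // message retention duration: keep unacknowledged messages for 7 days
      .setMessageRetentionDuration(Duration.newBuilder().setSeconds(7L * 24 * 60 * 60))
      // retry policy: exponential backoff between redeliveries
      .setRetryPolicy(
        RetryPolicy
          .newBuilder()
          .setMinimumBackoff(Duration.newBuilder().setSeconds(10))
          .setMaximumBackoff(Duration.newBuilder().setSeconds(600))
      )
      // dead lettering: after 5 failed deliveries, move the message to a dead-letter topic
      .setDeadLetterPolicy(
        DeadLetterPolicy
          .newBuilder()
          .setDeadLetterTopic(TopicName.of(projectId, "orders-dev-dlq").toString)
          .setMaxDeliveryAttempts(5)
      )
      .build()
    client.createSubscription(subscription)
  } finally client.close()
}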

Quick Tip: When creating subscriptions, it's highly recommended to utilize another GCP offering: Alerting. You can set up alerts that fire whenever new messages appear on your dead-letter subscriptions, or when the messages on your subscriptions are older than a specific amount of time. This way you get notified when things are slow with your subscribers, or when something has gone wrong.


Useful tools

Besides the aforementioned alerting, you can monitor your subscriptions in the GCP console using a variety of available metrics. You can also retrieve a subset of the messages on a subscription, without acknowledging them, to inspect their content.

Using Google Pub/Sub with Scala

Although Google provides client libraries for all of its services (including Pub/Sub) as part of the Java SDK (https://cloud.google.com/pubsub/docs/reference/libraries), when you are using Scala and effect types there is a nice library available, developed by permutive-engineering, called fs2-google-pubsub, which allows you to use Pub/Sub with fs2 streams.

There are two ways you can use this library: either opt for the Google gRPC version or use HTTP with Http4s and the REST protocol. All the differences, pros, and cons of both are listed on the library's GitHub page, so I won't copy them over here. In our example project, we will use the Http4s solution, which is fully functional and stable.
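To follow along, you would add the HTTP flavor of the library to your build. The version below is only illustrative; check the library's GitHub page for the current one.

build.sbt

// the HTTP (Http4s) flavor used in this post; version is illustrative
libraryDependencies += "com.permutive" %% "fs2-google-pubsub-http" % "0.22.0"

// alternatively, the gRPC flavor:
// libraryDependencies += "com.permutive" %% "fs2-google-pubsub-grpc" % "0.22.0"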

Example Project

To demonstrate basic operations with Scala and Pub/Sub I have created a simple Http4s based project available on GitHub: https://github.com/softberries/pubsub-example-scala

The basic flow is as follows: an Order object is received via an HTTP POST request and sent over to our Google Pub/Sub topic, while another service, called OrderService, continuously receives any Order messages appearing on one of the order topic's subscriptions. This way, you can easily see how to handle both the publisher and the subscriber parts in your Scala projects.


To start off, the first thing we need to do is declare the topics and subscriptions created earlier in the GCP console, as well as the location of the key file, which needs to be created upfront so that the application can authenticate against your Google Pub/Sub instance.

application.conf

http {
    host = "localhost"
    host = ${?HOSTNAME}
    port = 8089
    port = ${?PORT}
}

pub-sub {
    project-id = "some-project-id"
    project-id = ${?PROJECT_ID}
    host = "pubsub.googleapis.com"
    port = "443"
    key-file-location = "/var/secrets/googlepubsub/key.json"
    key-file-location = ${?PUB_SUB_KEY_FILE_LOCATION}
    subscriptions {
        orders = "orders-dev-sub"
        orders = ${?ORDERS_SUBSCRIPTION}
    }
    topics {
        orders = "orders-dev"
        orders = ${?ORDERS_TOPIC}
    }
}
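The code later in this post references a PubSubConfiguration class that is not shown. A plausible shape for it, matching the HOCON above and loaded here with pureconfig (an assumption; the example project may define and load its configuration differently), could look like this:

PubSubConfiguration (sketch)

import pureconfig.ConfigSource
import pureconfig.generic.auto._

// Mirrors the pub-sub block above; by default pureconfig maps camelCase
// field names to kebab-case keys (project-id, key-file-location, ...).
final case class SubscriptionsConfig(orders: String)
final case class TopicsConfig(orders: String)
final case class PubSubConfiguration(
    projectId: String,
    host: String,
    port: Int,
    keyFileLocation: String,
    subscriptions: SubscriptionsConfig,
    topics: TopicsConfig
)

object PubSubConfigurationLoader {
  // loads the pub-sub section of application.conf, failing fast on errors
  def load: PubSubConfiguration =
    ConfigSource.default.at("pub-sub").loadOrThrow[PubSubConfiguration]
}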

The OrderEvent sent between our Scala services is a simple case class with a couple of fields, plus the implicit decoders and encoders:

OrderEvent

import com.permutive.pubsub.consumer.decoder.MessageDecoder
import com.permutive.pubsub.producer.encoder.MessageEncoder
import core.pubsub.CirceImplicits.{pubSubDecoder, pubSubEncoder}
import io.circe.{Decoder, Encoder}
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}

case class OrderEvent(id: String, name: String, price: Double)

object OrderEvent {
  implicit val orderDecoder: Decoder[OrderEvent] =
    deriveDecoder[OrderEvent]
  implicit val orderEncoder: Encoder[OrderEvent] =
    deriveEncoder[OrderEvent]
  implicit lazy val pubSubMsgEncoder: MessageEncoder[OrderEvent] = pubSubEncoder[OrderEvent]
  implicit lazy val pubSubMsgDecoder: MessageDecoder[OrderEvent] = pubSubDecoder[OrderEvent]
}
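The CirceImplicits object imported above is not shown in the post. A plausible sketch of it, assuming that fs2-google-pubsub's MessageEncoder and MessageDecoder are single-method types operating on raw byte arrays, is to serialize any type with circe codecs to and from UTF-8 JSON bytes:

CirceImplicits (sketch)

import java.nio.charset.StandardCharsets

import com.permutive.pubsub.consumer.decoder.MessageDecoder
import com.permutive.pubsub.producer.encoder.MessageEncoder
import io.circe.{Decoder, Encoder}
import io.circe.parser.decode
import io.circe.syntax._

object CirceImplicits {
  // Encode any A that has a circe Encoder as UTF-8 JSON bytes.
  implicit def pubSubEncoder[A: Encoder]: MessageEncoder[A] =
    (a: A) => Right(a.asJson.noSpaces.getBytes(StandardCharsets.UTF_8))

  // Decode UTF-8 JSON bytes back into A using its circe Decoder.
  implicit def pubSubDecoder[A: Decoder]: MessageDecoder[A] =
    (bytes: Array[Byte]) => decode[A](new String(bytes, StandardCharsets.UTF_8))
}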

To handle posting messages to Pub/Sub, producer creation is abstracted away into a factory, which you can use to create many different producers that differ only in their topic name.

import cats.effect.IO
import cats.effect.kernel.Resource
import cats.implicits.catsSyntaxOptionId
import com.permutive.pubsub.producer.{Model, PubsubProducer}
import com.permutive.pubsub.producer.encoder.MessageEncoder
import com.permutive.pubsub.producer.http.{HttpPubsubProducer, PubsubHttpProducerConfig}
import config.PubSubConfiguration
import org.http4s.client.Client
import org.typelevel.log4cats.Logger

private[pubsub] trait PubSubProducerFactory {
  def createProducer[T: MessageEncoder](topicName: String): Resource[IO, PubsubProducer[IO, T]]
}

private[pubsub] class PubSubProducerFactoryImpl(
    client: Client[IO],
    config: PubSubConfiguration
)(implicit logger: Logger[IO]) extends PubSubProducerFactory {

  def createProducer[T: MessageEncoder](topicName: String): Resource[IO, PubsubProducer[IO, T]] =
    HttpPubsubProducer.resource[IO, T](
      projectId = Model.ProjectId(config.projectId),
      topic = Model.Topic(topicName),
      googleServiceAccountPath = config.keyFileLocation.some,
      config = PubsubHttpProducerConfig[IO](
        host = config.host,
        port = config.port,
        isEmulator = false,
        onTokenRefreshError = e => logger.error(s"Producer $topicName error - Token refresh error: $e"),
        onTokenRetriesExhausted = e =>
          logger.error(
            s"Producer $topicName error -  Token retries exhausted error: $e"
          )
      ),
      client
    )
}

For the subscriber part, you can utilize two classes. The first one, called PubSubSubscriber, exposes a subscribe method that takes your subscription name:

Subscriber setup

def subscribe(subscriptionName: String): fs2.Stream[IO, ConsumerRecord[IO, T]] =
    PubsubHttpConsumer.subscribe[IO, T](
      projectId = ProjectId(config.projectId),
      subscription = Subscription(subscriptionName),
      serviceAccountPath = Some(config.keyFileLocation),
      config = PubsubHttpConsumerConfig[IO](
        host = config.host,
        port = config.port,
        isEmulator = false,
        onTokenRefreshError = e =>
          logger.error(
            s"Subscriber $subscriptionName error - Token refresh error: $e"
          ),
        onTokenRetriesExhausted = e =>
          logger.error(
            s"Subscriber $subscriptionName error - Token retries exhausted error: $e"
          )
      ),
      client,
      (msg, err, _, nack) =>
        logger.error(
          s"Got error: $err from subscription: $subscriptionName on message: ${msg.messageId}"
        ) *> nack
    )

Second, analogous to our producers, you have a consumer factory where you create your consumers using the aforementioned subscriber. Note that the error handler passed as the last argument to PubsubHttpConsumer.subscribe above nacks any message that fails to decode, so with a dead-letter policy in place such messages will eventually land on the dead-letter topic instead of being redelivered forever.

Consumer factory

import cats.effect.IO
import cats.implicits.catsSyntaxApply
import com.permutive.pubsub.consumer.decoder.MessageDecoder
import config.PubSubConfiguration
import org.http4s.client.Client
import org.typelevel.log4cats.Logger

class PubSubConsumer[T: MessageDecoder] private[pubsub] (
    pubSubSubscriber: PubSubSubscriber[T],
    subscriptionName: String
)(implicit logger: Logger[IO]) {
  def consumePubSubMessages(consumptionRecipe: T => IO[Boolean]): fs2.Stream[IO, Unit] = {
    fs2.Stream.eval(logger.info(s"Running $subscriptionName consumer")) *>
      pubSubSubscriber
        .subscribe(subscriptionName)
        .evalMap(
          msg =>
            consumptionRecipe(msg.value).flatMap(
              isSuccessful =>
                if (isSuccessful) msg.ack
                else msg.nack
            )
        )
  }
}

object PubSubConsumer {
  def create[T: MessageDecoder](client: Client[IO], subscriptionName: String, config: PubSubConfiguration)(
      implicit logger: Logger[IO]
  ): PubSubConsumer[T] =
    new PubSubConsumer[T](
      pubSubSubscriber = new PubSubSubscriberImpl[T](client = client, config = config),
      subscriptionName = subscriptionName
    )
}

To visualize and consume the incoming messages, I have created a very simple service that just logs them. Returning true from consumeOrder makes the consumer ack the message, while returning false makes it nack:

OrderService

class OrderService()(implicit logger: Logger[IO]) {
  def consumeOrder(msg: OrderEvent): IO[Boolean] = {
    logger.info(s"Consuming order: $msg") *> IO.pure(true)
  }
}

Usage

In order to create your producer, simply pass a reference to your HTTP client and the configuration:

Create producer

class PubSubModule(client: Client[IO], pubSubConfiguration: PubSubConfiguration)(implicit logger: Logger[IO]) {
  lazy val pubSubProducerFactory = new PubSubProducerFactoryImpl(client, pubSubConfiguration)

  lazy val orderProducer: Resource[IO, PubsubProducer[IO, OrderEvent]] =
    pubSubProducerFactory.createProducer[OrderEvent](pubSubConfiguration.topics.orders)
}
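A hypothetical piece of glue code (not part of the post) could then publish an incoming order through the producer's produce method:

Publish an order (sketch)

// Publishes an incoming OrderEvent to the orders topic.
def publishOrder(pubSubModule: PubSubModule, order: OrderEvent): IO[Unit] =
  pubSubModule.orderProducer.use { producer =>
    producer.produce(order).void // produce returns the Pub/Sub message id, ignored here
  }

Note that calling use per request re-acquires the producer (including its token handling) every time; in a real application you would typically allocate the Resource once at startup and share the resulting producer.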

For the consumer part, we glue it together in our Main method and specify the service which will handle the incoming messages:

Init consumers

override def run(args: List[String]): IO[ExitCode] = {
    //resource modules
    lazy val infrastructureResourceModule = new InfrastructureResourceModule(config)

    val programStream =
      for {
        //modules
        ....................
        //routes
        ....................
        //server initialization
        _ <- BlazeServerBuilder[IO]
              .bindHttp(config.http.port, config.http.host)
              .withHttpApp(
                (http4sInterpreter.toRoutes(endpoints.asInstanceOf[List[ServerEndpoint[Fs2Streams[IO], IO]]]) <+>
                  http4sInterpreter.toRoutes(swaggerEndpoints)).orNotFound
              )
              .serve
              .concurrently {
                //consumers initialization
                coreModule.orderMessageConsumer
                  .consumePubSubMessages(
                    coreModule.orderService.consumeOrder
                  )
                  .void
              }
      } yield ()
    programStream.compile.drain.as(ExitCode.Success)
  }

Example

Make sure all the settings in application.conf are in place, run the project with a standard sbt run, open up the automatically generated Swagger docs at http://localhost:8089/docs/, and send a message to the /api/create endpoint.
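Assuming the endpoint accepts an OrderEvent directly (the exact request shape depends on the endpoint definition in the project), the request body could look like:

Example request body

{
  "id": "order-1",
  "name": "Sample order",
  "price": 9.99
}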

Swagger usage:


The message should be received in OrderApi and passed on to the producer, which will send it over to Google Pub/Sub. Pub/Sub will then redistribute that message to all subscribers defined for this topic (in our case, only one). Once the message is available on our subscription, it should get picked up by our subscriber and handled by the OrderService.

Messages in GCP Pub/Sub console:


Running output:


Summary

Google Pub/Sub can be a very useful tool in your toolbox when you are architecting a microservice-based solution with asynchronous messaging at its base, and the additional burden of taking care of a RabbitMQ cluster or Kafka would be too big for your needs. It is easy to get started with, easy to monitor, and easy to get notified about when something goes awry. It is also growing in popularity, with more libraries and tools added every year, and should at least be considered when you are planning your next project, especially if you already utilize other GCP services.
