Spark custom receivers with Google PubSub and NYC Taxi Tycoon data stream

Krzysztof Grajek

01 Sep 2021. 4 minutes read

Today we will tackle one of the coolest features of Spark and Spark Streaming, which is the ability to provide custom receivers so we can stream data from anywhere into our Spark cluster.

For the purposes of this tutorial, we are going to feed Spark a stream of data from the New York City Taxi Tycoon source. This data is published through Google PubSub, so our custom receiver will be a Google PubSub one! Spark does not ship with a Google PubSub receiver, but as you will see shortly, it’s pretty easy to set up your own if you need to.

Felix Morgner @ Flickr.com

GCP Setup

One of the first things to set up when working with GCP services is a service account we are going to use to connect. To set up one for our needs, simply go to the IAM & Admin section of your GCP console and add a new service account.

After filling in the name, click Create and Continue.

On the second step of the creation wizard, add the Pub/Sub Admin role and finish by clicking Done. On the page with the list of your service accounts, click the three dots on the right and select Manage keys.

Create a new key in JSON format and download it to your machine.

Now we can continue by setting up a new subscription on the NYC Taxi topic. Open up Google Pub/Sub and add a new subscription:

Enter the topic name: projects/pubsub-public-data/topics/taxirides-realtime.

!IMPORTANT! Please remember to clean up any resources on GCP after playing around with them.

Custom Receiver Implementation

Now we can start implementing our PubSub receiver that can be used with Spark Streaming. Documentation for custom receivers is available in the Spark Streaming Custom Receivers guide. Basically, you need to extend the Receiver class and implement two methods: onStart and onStop. The onStart method must be non-blocking, so the data has to be received on a separate thread, which calls store(..) to hand each item over to Spark.

Within our onStart method, we spawn a new Thread and do the rest of the work in a separate helper method, where we initialise a connection, receive the messages (as String) and pass them on to Spark:

  def onStart(): Unit = {
    // Start the thread that receives data over a connection
    new Thread("PubSub Receiver") {
      override def run(): Unit = { receive() }
    }.start()
  }
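
To put the snippet above in context, here is a minimal sketch of what the surrounding class could look like. It only relies on Spark's Receiver API (the storage level passed to the constructor, onStart and onStop). The class name PubSubReceiverSketch and the choice of StorageLevel.MEMORY_AND_DISK_2 are assumptions made for illustration; the constructor parameters simply mirror the values passed to the receiver later in this post.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical skeleton: the class name and the storage level are assumptions.
class PubSubReceiverSketch(
    projectId: String,
    subscriptionName: String,
    host: String,
    port: Int,
    credentialsPath: String
) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // Must return quickly, so the receiving loop runs on its own thread.
  override def onStart(): Unit = {
    val worker = new Thread("PubSub Receiver") {
      override def run(): Unit = receive()
    }
    worker.setDaemon(true)
    worker.start()
  }

  // Nothing to release in this sketch; a real receiver would close
  // its connection or signal the worker thread to finish here.
  override def onStop(): Unit = ()

  private def receive(): Unit = {
    // Placeholder for the connection logic: pull messages and call
    // store(message) for each of them.
  }
}
```

Because a receiver is serialised and shipped to an executor, it is probably safest to keep only plain values such as strings and numbers in the constructor and to open connections inside the receiving thread.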

Our receive method uses the fs2-google-pubsub library with Http4s to establish the connection and receive messages from PubSub.

private def receive(): Unit = {

    val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
    implicit val cs: ContextShift[IO] = IO.contextShift(ec)
    implicit val timer: Timer[IO] = IO.timer(ec)
    implicit def unsafeLogger: SelfAwareStructuredLogger[IO] = Slf4jLogger.getLogger[IO]

    implicit val decoder: MessageDecoder[ValueHolder] = (bytes: Array[Byte]) => {
      Try(ValueHolder(new String(bytes))).toEither
    }
    val client = AsyncHttpClient.resource[IO]()
    val mkConsumer = PubsubHttpConsumer.subscribe[IO, ValueHolder](
      Model.ProjectId(projectId),
      Model.Subscription(subscriptionName),
      credentialsPath,
      PubsubHttpConsumerConfig(
        host = host,
        port = port,
        isEmulator = false,
      ),
      _: Client[IO],
      (msg, err, ack, _) => IO(println(s"Msg $msg got error $err")) >> ack,
    )

    fs2.Stream.resource(client)
      .flatMap(mkConsumer)
      .evalTap(t => t.ack >> IO(store(t.value.msg)))
      .compile
      .lastOrError
      .handleErrorWith { e => IO(restart(s"Restarting due to error: ${e.getMessage}"))}
      .unsafeRunSync()

    restart("Trying to connect again")
  }

First, we need to set up some implicit values needed by the fs2-google-pubsub library and a decoder function that converts the received bytes into their String representation. We use those implicit vals to set up a new PubsubHttpConsumer, to which we also pass projectId, subscriptionName, credentialsPath, host, and port as the values needed to establish a connection. All of those additional values are injected through the receiver's constructor arguments.
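
The decoder is the easiest part to try out in isolation, since it is just a function from bytes to an Either. Below is a small sketch using only the Scala standard library. The definition of ValueHolder is an assumption (the post only shows that it wraps a String in a field called msg), and the explicit UTF-8 charset is my addition so that the result does not depend on the JVM default.

```scala
import java.nio.charset.StandardCharsets
import scala.util.Try

object DecoderSketch {
  // Assumed shape of the wrapper type: a single String field named msg.
  final case class ValueHolder(msg: String)

  // Bytes in, Either out: Left carries the exception, Right the decoded value.
  def decode(bytes: Array[Byte]): Either[Throwable, ValueHolder] =
    Try(ValueHolder(new String(bytes, StandardCharsets.UTF_8))).toEither
}
```

Note that new String does not fail on malformed byte sequences (it substitutes replacement characters), so in practice the Left branch is only reached for something like a null payload.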

Lastly, we start receiving the messages with the FS2 Stream and pass them on for further processing by Spark with the store method:

.evalTap(t => t.ack >> IO(store(t.value.msg)))

Using custom PubSub receiver

Now it’s time to use it! You can pass your custom receiver as a parameter to receiverStream on the StreamingContext object. Our custom receiver is initialised with all the parameters needed for the PubSub connection.

val spark = SparkSession
      .builder
      .appName("StructuredStreaming")
      .master("local[*]")
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
    ssc.checkpoint("data/checkpoint/")

    val messagesRaw = ssc.receiverStream(new PubSubCustomReceiver("your-project-name",
      "nyc-typhoon-sub", "pubsub.googleapis.com", 443,
      "data/service-key.json"))

Once we obtain a message stream, we can start working with the data. Each received message represents a TaxiRide, which looks like this:

case class TaxiRide(
    ride_id: String,
    point_idx: Long,
    latitude: Double,
    longitude: Double,
    timestamp: String,
    meter_reading: Double,
    meter_increment: Double,
    ride_status: String,
    passenger_count: Int
)

I didn’t bother to properly deserialise timestamps or enums for the purposes of this blog post. Each message is parsed with a simple Play JSON implementation:

def parseTR(str: String): TaxiRide = {
    implicit val taxiRideFormat: OFormat[TaxiRide] = Json.format[TaxiRide]
    Json.parse(str).as[TaxiRide]
  }
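
One thing to keep in mind is that Json.parse throws on malformed JSON and as[TaxiRide] throws when a message does not match the case class. If you would rather skip such messages, a more defensive variant could look like the sketch below. It assumes Play JSON is on the classpath; SafeParseSketch and parseTRSafe are hypothetical names, not part of the original code.

```scala
import play.api.libs.json.{Json, OFormat}
import scala.util.Try

object SafeParseSketch {
  // Same fields as the TaxiRide case class shown in this post.
  final case class TaxiRide(
      ride_id: String,
      point_idx: Long,
      latitude: Double,
      longitude: Double,
      timestamp: String,
      meter_reading: Double,
      meter_increment: Double,
      ride_status: String,
      passenger_count: Int
  )

  // Built once instead of on every call.
  implicit val taxiRideFormat: OFormat[TaxiRide] = Json.format[TaxiRide]

  // None for malformed JSON as well as for JSON that is not a TaxiRide.
  def parseTRSafe(str: String): Option[TaxiRide] =
    Try(Json.parse(str)).toOption.flatMap(_.asOpt[TaxiRide])
}
```

With a function like this, something along the lines of messagesRaw.flatMap(v => parseTRSafe(v)) should drop unparseable messages instead of failing the batch.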

After parsing, we process the messages by counting the occurrences of each ride_status using a moving window mechanism (stats are collected for the last 30 seconds with a 5-second interval).

val messages = messagesRaw.map(v => parseTR(v))
    val statusKeyValues = messages.map(msg => (msg.ride_status, 1))

    // Now count them up over a 30-second window sliding every 5 seconds
    val statusCounts = statusKeyValues.reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(5))

    // Sort the results by the count values
    val sortedResults = statusCounts.transform(rdd => rdd.sortBy(x => x._2, ascending = false))

    // Print the top 10
    sortedResults.print
    ssc.start()
    ssc.awaitTermination()
    // Stop the session
    spark.stop()
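
If the two functions passed to reduceByKeyAndWindow look puzzling: the first one adds the batch entering the window, and the second (the inverse) subtracts the batch that has just left it, so Spark does not need to recompute the whole window on every slide. Spark's documentation notes that this variant requires checkpointing, which is why the checkpoint directory is set above. The idea can be simulated in plain Scala without Spark; everything in the sketch below (WindowSketch, slidingCounts and so on) is a hypothetical name used for illustration, with one map of counts per 5-second batch.

```scala
object WindowSketch {
  type Counts = Map[String, Int]

  // Counterpart of `_ + _`: merge the batch that enters the window.
  def add(a: Counts, b: Counts): Counts =
    b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) + v) }

  // Counterpart of `_ - _`: remove the batch that leaves the window.
  def subtract(a: Counts, b: Counts): Counts =
    b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) - v) }

  // windowBatches = window length / slide interval, e.g. 30s / 5s = 6.
  def slidingCounts(batches: Vector[Counts], windowBatches: Int): Vector[Counts] = {
    val start = (Map.empty[String, Int], Vector.empty[Counts])
    val (_, results) = batches.zipWithIndex.foldLeft(start) {
      case ((window, out), (batch, i)) =>
        val entered = add(window, batch)
        val current =
          if (i >= windowBatches) subtract(entered, batches(i - windowBatches))
          else entered
        (current, out :+ current)
    }
    results
  }
}
```

A side effect visible in this simulation is that a status which has dropped out of the window stays in the map with a count of 0. As far as I can tell, Spark behaves the same way unless you use the overload of reduceByKeyAndWindow that also takes a filter function.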

The results are printed to the console:

In other words, if I understand the incoming data properly, the New York Tycoon taxi cabs are picking up a new person (or more) more or less every second. Amazing! :)
