
Constructing effective data processing workflows using Elixir and Broadway


Today I will explore what Broadway - the Elixir way of building data ingestion and data processing pipelines - has to offer.

Broadway is a tool that enables developers to create and manage data pipelines by building complex topologies based on GenStage. Basically, Broadway is an additional layer of abstraction over GenStage which hugely simplifies complex GenStage topologies (especially back-pressure, batching and graceful shutdown). It supports concurrent and multi-stage processing of data from various sources, such as message queues, streaming platforms and cloud services. Broadway handles the orchestration, concurrency, fault tolerance and back-pressure of the pipeline automatically. It is designed to be scalable, reliable and easy to use for data ingestion and data processing tasks.

The main built-in features of Broadway include:

  • Back-pressure
  • Automatic acknowledgment
  • Fault tolerance
  • Graceful shutdown
  • Built-in testing
  • Custom failure handling
  • Static and dynamic batching
  • Ordering and partitioning
  • Rate limiting
  • Metrics

We will explore some of Broadway’s features by building a simple Phoenix application that consumes messages from a Google Pub/Sub subscription and processes them through our pipeline, adding steps like batch message pre-processing, single-message handling, static batch processing, custom failure handling and partitioning. At the end we will integrate Broadway Dashboard, which will help us visualise our pipeline and let us see, in real time, possible bottlenecks and message throughput.

Basic Setup

GCP Credentials and PubSub Subscription

To get started, if you want to follow along with the code samples (or simply clone the repo - link at the end of the post), you need to set up a Google Service Account with Google IAM. The service account is needed to create a subscription to the publicly available New York City taxi events topic. I will not go into details on how to set it up in the Google console as I have already covered it in another blog post of mine; just follow the instructions about setting up a subscription.

The most important thing, if you are already familiar with GCP, is to create a subscription from an already existing topic: projects/pubsub-public-data/topics/taxirides-realtime.

Please note that the number of messages being sent to your subscription is huge, so I highly recommend deleting all the resources created for this tutorial once you are done playing around with it.

Once you have the name of the subscription and .json file with your service account credentials you are ready to roll.

Phoenix Application

We start by creating a basic Phoenix application. You don’t need Phoenix to run Broadway pipelines, but since we want to use the Phoenix LiveDashboard (and the Broadway Dashboard on top of it), it is much easier to set things up this way.

create_app.sh
For this tutorial we won’t be using any HTML views or databases, hence no Ecto.
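A minimal sketch of the generator command - the application name broadway_pub_sub_simple is my assumption, inferred from the module names used later in the post:

```sh
# Generate the skeleton app without Ecto and without HTML views
mix phx.new broadway_pub_sub_simple --no-ecto --no-html
cd broadway_pub_sub_simple
```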

Once the skeleton app has been created, we need to add the necessary dependencies. In your mix.exs file add the following:

deps.ex
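Something along these lines - the version constraints are only a suggestion and may need adjusting to the latest releases (Jason, used later for JSON parsing, already ships with a generated Phoenix app):

```elixir
defp deps do
  [
    # ...keep the dependencies generated by phx.new...
    {:broadway, "~> 1.0"},
    # Official Google Cloud Pub/Sub connector for Broadway
    {:broadway_cloud_pub_sub, "~> 0.8"},
    # Fetches Google OAuth2 tokens for the Pub/Sub connection
    {:goth, "~> 1.2"},
    # LiveDashboard page for visualising Broadway pipelines
    {:broadway_dashboard, "~> 0.2"}
  ]
end
```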
Then run mix deps.get to download the new dependencies.

Basic Data Ingestion Pipeline

A classic implementation of the Broadway behaviour (also included in the Broadway documentation on GitHub) looks like the following:

basic_broadway.exs
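A minimal version of that example, with a dummy producer standing in for a real connector:

```elixir
defmodule MyBroadway do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        # Broadway.DummyProducer is only useful for tests - swap in a real
        # connector such as BroadwayCloudPubSub.Producer.
        module: {Broadway.DummyProducer, []},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 1]
      ],
      batchers: [
        default: [concurrency: 1, batch_size: 10, batch_timeout: 2_000]
      ]
    )
  end

  @impl true
  def handle_message(_processor, %Message{} = message, _context) do
    # Per-message processing happens here.
    message
  end

  @impl true
  def handle_batch(_batcher, messages, _batch_info, _context) do
    # Batch-level processing happens here; returned messages are acknowledged.
    messages
  end
end
```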
start_link is the place where you define configuration for your producer as well as the whole topology of your processing pipeline. You can create your own custom producer or use one of the available connectors:

  • Amazon SQS
  • Apache Kafka
  • Google Pub/Sub
  • RabbitMQ

In this tutorial we will use the official Broadway connector for Google Pub/Sub.

Suppose you use the default producer and a single default batcher, all with concurrency set to 1.

broadway_default.exs
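Inside Broadway.start_link/2 the relevant options could look roughly like this (a sketch - the subscription name is a placeholder):

```elixir
# Excerpt from the options passed to Broadway.start_link/2
producer: [
  module:
    {BroadwayCloudPubSub.Producer,
     subscription: "projects/<your-project>/subscriptions/<your-subscription>"},
  concurrency: 1
],
processors: [
  default: [concurrency: 1]
],
batchers: [
  default: [concurrency: 1, batch_size: 10]
]
```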
Your pipeline topology for the above configuration will look like this:
[Diagram: pipeline topology with a single producer, processor and batcher]
With such a simple supervision tree you basically have a single processor and a single batcher processing your pipeline. Whatever logic you write in handle_message is going to be executed in your processor process, and whatever you do in handle_batch will be handled further down the supervision tree by your batcher. Every message returned by the batcher will be automatically acknowledged, although there are ways to override this behaviour - more on that later.

Of course, with a topology like the one above we are not making use of the best thing we have at hand: the concurrency given to us by Elixir and GenStage. The beauty of Broadway is that you can define more complex topologies just by modifying the configuration in start_link. Here is an example of the topology we are going to create for the purposes of this tutorial:
[Diagram: the topology we are going to build in this tutorial]
Our custom error handling and partitioning, which we will also add to the project, are not depicted in this diagram for clarity. Nevertheless, let's start by explaining what we are going to build.

The Data

For starters, just for clarity and reference, here is what a single sample message received from the Google subscription looks like:

nyc-msg.exs 
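The payload arrives as a JSON string, roughly like the one below. The values are made up for illustration, but the field names follow the schema of the public taxirides-realtime topic (the ride_id is the one we will look at again at the end of the post):

```json
{
  "ride_id": "732c74e0-d54c-439c-b950-340f30c201c3",
  "point_idx": 160,
  "latitude": 40.74163,
  "longitude": -73.95635,
  "timestamp": "2023-03-14T10:54:37.26756-04:00",
  "meter_reading": 5.746,
  "meter_increment": 0.03591,
  "ride_status": "enroute",
  "passenger_count": 2
}
```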
What we will do in our pipeline is:

  1. Implement sharding at the processor level by ride_id, so that messages for a single ride_id always end up on the same processor instance.
  2. Pre-process messages with a special handler function prepare_messages(messages, _context), which in our case will parse the JSON string into an Elixir map.
  3. Route every message to the appropriate batcher based on its ride_status.
  4. Handle custom errors with handle_failed(messages, _context) when the ride_status is unknown.
  5. Print out the results in each batcher and acknowledge the messages.

Our Topology Explained

As you can see from the previous image, we have increased the number of processors handling our messages to 6, and every message is then routed to a specific batcher depending on its content. We receive 3 kinds of messages from Google Pub/Sub:

  • Pickup event
  • Enroute event
  • Dropoff event

We process batches of each kind of message in a different batcher.

What is important is that batching allows us to further increase throughput by processing multiple pieces of data in one operation, e.g. by inserting multiple messages into the database at once after they have been processed and prepared by the processor in handle_message. For the purposes of this tutorial we just print out the content of the processed messages and acknowledge them.

We know that most of the messages in the incoming stream from Google are enroute events, which means we don’t need that many batch processors for pickup and dropoff as they do just a fraction of the work; hence we set concurrency to 1 for the pickup and dropoff batchers, while the enroute batcher gets 4 separate processors to do its job.

topology.ex
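A sketch of the corresponding start_link (the subscription name is a placeholder and the batch sizes are arbitrary; the partition/1 function referenced here is shown in the Partitioning section below):

```elixir
def start_link(_opts) do
  Broadway.start_link(__MODULE__,
    name: __MODULE__,
    producer: [
      module:
        {BroadwayCloudPubSub.Producer,
         subscription: "projects/<your-project>/subscriptions/<your-subscription>"},
      concurrency: 1
    ],
    processors: [
      # 6 processors, sharded by ride_id (see the Partitioning section)
      default: [concurrency: 6, partition_by: &__MODULE__.partition/1]
    ],
    batchers: [
      # Most of the traffic is enroute events, so that batcher gets 4 processors
      pickup: [concurrency: 1, batch_size: 10],
      enroute: [concurrency: 4, batch_size: 10],
      dropoff: [concurrency: 1, batch_size: 10]
    ]
  )
end
```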
The configuration itself is really straightforward: as I have already mentioned, we have 6 processors receiving data from the Pub/Sub producer and 3 separate batchers, each with a different number of batch processors. You can easily tweak the numbers here and watch the pipeline in real time in the Broadway Dashboard to quickly detect possible bottlenecks and adjust your topology to suit your needs.

The Code Explained

Although our pipeline is quite complex, we had to write just a few Elixir functions to implement our logic. Before diving into the pipeline code itself, let’s see how we configure the connection to our Google Pub/Sub subscription:

config.ex
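A sketch of the credentials part, assuming the classic Goth configuration and that the GOOGLE_APPLICATION_CREDENTIALS environment variable points at the service-account JSON file. Newer versions of goth and broadway_cloud_pub_sub configure authentication differently (by starting Goth under your supervision tree and/or passing a :token_generator to the producer), so check the docs of the versions you are using:

```elixir
# config/runtime.exs
import Config

if config_env() != :test do
  config :goth,
    # Read the service-account credentials downloaded from GCP
    json: System.fetch_env!("GOOGLE_APPLICATION_CREDENTIALS") |> File.read!()
end
```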
Pre-processing

Broadway allows you to pre-process messages in batches with the prepare_messages callback. This is the place where you can parse your messages or enrich them with additional data before passing them further down the processing tree to the processors - for example, appending user data fetched from the database.

msg_preprocess.exs
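In our case the callback can be as small as this - we only decode the JSON payload and keep the {:ok, _}/{:error, _} result as the message data:

```elixir
@impl true
def prepare_messages(messages, _context) do
  # Runs on a whole batch of messages before they reach handle_message/3.
  # Jason.decode/1 returns {:ok, map} or {:error, reason}; we store that
  # tuple as the new message data and pattern match on it later.
  Enum.map(messages, fn message ->
    Broadway.Message.update_data(message, fn json -> Jason.decode(json) end)
  end)
end
```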
What is important in prepare_messages is that we operate on a batch of messages instead of a single one; this can increase throughput if we are able to fetch multiple records from the database in a single call. Another line of code to be aware of is Broadway.Message.update_data, which updates the data part of the Broadway message struct. The messages returned from prepare_messages are passed over to the processors, where handle_message gets executed for each of them.

Jason.decode can return two kinds of values (an :ok or an :error tuple), which are handled by different clauses of handle_message using Elixir’s powerful pattern matching.

Handle message

handle_message_1.ex
handle_msg_default.ex
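Both clauses could look roughly like this - the first one handles successfully decoded messages with a known ride_status, the catch-all clause marks everything else as failed:

```elixir
@impl true
def handle_message(
      _processor,
      %Broadway.Message{data: {:ok, %{"ride_status" => status} = ride}} = message,
      _context
    )
    when status in ["pickup", "enroute", "dropoff"] do
  message
  # Unwrap the {:ok, map} tuple so the batchers work with the plain map
  |> Broadway.Message.update_data(fn _ -> ride end)
  # Route to the :pickup, :enroute or :dropoff batcher (these atoms already
  # exist thanks to the batchers configuration in start_link)
  |> Broadway.Message.put_batcher(String.to_existing_atom(status))
end

def handle_message(_processor, message, _context) do
  # Failed JSON decode or unknown ride_status - hand over to handle_failed/2
  Broadway.Message.failed(message, "invalid-data")
end
```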
We actually have 2 clauses of handle_message; which one gets executed depends on the result of the prepare_messages logic. If the JSON parsing was successful, we route the message to the appropriate batcher; otherwise we use Broadway’s custom error handling mechanism and mark the message as failed. Failed messages are handled by an additional callback described below.

Handle failed

handle_failed.ex
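A minimal sketch - we just log the messages we marked as "invalid-data" and return them (how they are ultimately acknowledged at the Pub/Sub level can be tuned with the producer's :on_success and :on_failure options):

```elixir
@impl true
def handle_failed(messages, _context) do
  # Receives every message that was marked as failed along the pipeline.
  Enum.each(messages, fn
    %Broadway.Message{status: {:failed, "invalid-data"}} = message ->
      IO.inspect(message.data, label: "Invalid ride message")

    _other ->
      :ok
  end)

  messages
end
```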
In our implementation, we simply match failed messages marked as ‘invalid-data’, print them out and acknowledge them. You can build a custom error-handling solution this way and decide what you want to do for different kinds of errors - e.g. acknowledge, resend, or store them for further investigation.

Handle batch

handle_batch.ex
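One clause per batcher, each just printing what it received - in a real pipeline this is where you would, for example, write the whole batch to a database in a single call:

```elixir
@impl true
def handle_batch(:enroute, messages, _batch_info, _context) do
  IO.puts("Enroute batcher #{inspect(self())} handling #{length(messages)} messages")
  messages
end

def handle_batch(:pickup, messages, _batch_info, _context) do
  IO.puts("Pickup batcher #{inspect(self())} handling #{length(messages)} messages")
  messages
end

def handle_batch(:dropoff, messages, _batch_info, _context) do
  IO.puts("Dropoff batcher #{inspect(self())} handling #{length(messages)} messages")
  messages
end
```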
Once messages get processed in handle_message, they are again grouped into batches and processed further down the tree in the batchers. We use a static batching mechanism here: we specify the names of the batchers in the configuration and create a separate function clause for each batcher. Messages returned from batchers are automatically acknowledged.

Partitioning

partition.ex
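A sketch of the function plugged into the processors' partition_by option shown earlier. In this sketch I assume the message data is still the raw JSON string at this point (partitioning happens before prepare_messages), so we decode just enough to get the ride_id and hash it:

```elixir
def partition(%Broadway.Message{data: json}) do
  case Jason.decode(json) do
    # Same ride_id -> same hash -> same processor
    {:ok, %{"ride_id" => ride_id}} -> :erlang.phash2(ride_id)
    # Anything we cannot decode goes to a single partition
    _error -> 0
  end
end
```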
To make things a little more interesting we added some form of partitioning. You can specify partitioning for processors and batchers: either as two separate partition functions, one for each, or as a single partition function defined in your config after the processors and batchers configuration, which will then be used by both. In the latter case you need to be aware that the message handled by the partition function will first be the one coming into the processor and, later, the one coming into the batcher, so be careful when modifying messages on the fly in handle_message.

The partition function needs to return a non-negative integer. If your message body doesn’t contain one you could use, you can help yourself by hashing a string with the Erlang built-in function :erlang.phash2.

Broadway Integration

After implementing all the callbacks in our module, we are ready to integrate it into our Phoenix application. You need to make a couple of modifications to the existing generated Phoenix files, starting with application.ex:

integration.ex
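The relevant part of application.ex might look like this - the web module names come from the generated app and are assumptions based on the application name used in this tutorial:

```elixir
def start(_type, _args) do
  children = [
    BroadwayPubSubSimpleWeb.Telemetry,
    {Phoenix.PubSub, name: BroadwayPubSubSimple.PubSub},
    BroadwayPubSubSimpleWeb.Endpoint,
    # Our Broadway pipeline - supervised like any other child
    BroadwayPubSubSimple
  ]

  opts = [strategy: :one_for_one, name: BroadwayPubSubSimple.Supervisor]
  Supervisor.start_link(children, opts)
end
```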
We need to add our custom Broadway module BroadwayPubSubSimple as a child to the Phoenix application’s list of children.

Broadway Dashboard

As mentioned at the beginning, we are adding Broadway Dashboard to help us visualize our pipeline topology. Assuming you have added the dependency as described earlier, all you need to do is modify router.ex and add additional_pages to the Phoenix built-in live dashboard. This will, in effect, add an additional tab to your standard Phoenix dashboard where you can observe your pipelines.

dashboard.ex
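The relevant part of router.ex - module names are again assumptions based on the app name:

```elixir
import Phoenix.LiveDashboard.Router

scope "/" do
  pipe_through :browser

  live_dashboard "/dashboard",
    metrics: BroadwayPubSubSimpleWeb.Telemetry,
    # Adds the "Broadway pipelines" tab to LiveDashboard
    additional_pages: [broadway: BroadwayDashboard]
end
```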
[Screenshot: the Phoenix dashboard with the Broadway pipelines tab]

Running the application

With our Broadway module implemented and added to Phoenix, together with the additional configuration for Broadway Dashboard, you can start the application and watch messages being processed by our pipeline in real time.

First, make the Google Pub/Sub credentials available as an environment variable by exporting it in your shell, then start up the Phoenix app:

phoenix_start.sh
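A sketch, assuming the GOOGLE_APPLICATION_CREDENTIALS variable used in the configuration above - point it at your own service-account file:

```sh
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
mix phx.server
```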
The messages from our subscription start being processed straight away. Switch to your Phoenix dashboard and navigate to the Broadway pipelines tab to see our topology in action.

To see partitioning in action, I have selected logs for a single ride_id (in this case "732c74e0-d54c-439c-b950-340f30c201c3"):

partition_logs.log
With the PID printed in each log line, we can see that messages for a single ride_id are always handled by the same processor. Because we have partitioning set only at the processor level, once the messages are passed down the tree to the batchers layer we no longer care about partitions, and the same ride_id may be handled by different batch processors.

Final notes

Broadway solves the problem of how to structure supervision trees in an elegant way, bringing you concurrency, fault tolerance and the other aforementioned features out of the box. Although we have built a rather simple pipeline showcasing basic Broadway functionality, we haven’t touched many important features like dynamic batching, graceful shutdown and rate limiting, to name just a few. Those will have to wait for the next blog posts in this exciting Broadway endeavour of mine.

An example project with the full source code is available in the GitHub repository.
