---
title: flatMap Your Job Queue
teaser: 'Make your job queue more performant, robust, and understandable by breaking
  it up into a pipeline of messages.

  '
tags: data
author: Joe Ferris
published_on: 2019-08-16
---

Many applications have one or more large data jobs. Maybe you need to download a
fresh set of reports every day. Maybe you need to update a cache every hour.
Whatever you need to do, you've likely turned to a staple of web development:
the **job queue**.

You create a queue that describes work to be done. You start a pool of workers
that will pull a job from the queue, complete the work it describes, and repeat
until the queue is empty. A job queue is perfect for simple tasks like sending
emails or generating thumbnails from uploaded images.
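
A minimal, stdlib-only sketch of that classic setup: a shared queue drained by a
fixed pool of worker threads. `SendEmail` and `runAll` are hypothetical names
for illustration, not part of any library.

``` scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object JobQueueSketch {
  // Hypothetical job type: a description of one small unit of work.
  final case class SendEmail(to: String)

  /** Starts `workers` threads that each pull a job, perform it, and repeat
    * until the queue is empty. Returns how many jobs were completed. */
  def runAll(jobs: Seq[SendEmail], workers: Int)(perform: SendEmail => Unit): Int = {
    val queue = new ConcurrentLinkedQueue[SendEmail]()
    jobs.foreach(job => queue.add(job))
    val completed = new AtomicInteger(0)
    val pool = Executors.newFixedThreadPool(workers)

    (1 to workers).foreach { _ =>
      pool.execute(new Runnable {
        def run(): Unit = {
          // poll() returns null once the queue is empty, which ends this worker.
          var job = queue.poll()
          while (job != null) {
            perform(job)
            completed.incrementAndGet()
            job = queue.poll()
          }
        }
      })
    }

    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.MINUTES)
    completed.get()
  }
}
```

Each job here is small and self-contained, which is exactly the case where a
plain job queue works well.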

However, this approach has some pitfalls when working with large jobs. If a
large job is interrupted in the middle, it will need to start over from the
beginning. Worse, it may leave behind a lot of orphaned data. It's difficult to
scale a large job without breaking it up, but doing so creates hidden
dependencies between the resulting jobs and makes a unified job queue very
difficult to understand.

We can overcome these difficulties by flattening a large job into a stream of
messages. Let's demonstrate this by working through an example job. Our example
is written in [Scala], but you can follow along even if you're not familiar with
Scala, and the principles apply to other languages as well.

## The Job: Fetching CSVs

Our sample job takes a URL for a CSV to download. The CSV looks like this:

| accountId | transactionsUrl         |
| --------- | ----------------------- |
| 1         | https://example.com/123 |
| 2         | https://example.com/234 |

We'll take each line and use it to fetch another CSV, this time containing
transactions for the given account:

| id | amount |
| -- | ------ |
| 1  | 5.24   |
| 2  | 6.71   |

Finally, we'll take each line from each of those CSVs and save it to the
database as a transaction for that account.
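
The article doesn't show its `csvParser`. As an illustration only, a naive
parser for these two shapes might look like the following. It assumes
comma-separated fields with no quoting, represents URLs as plain `String`s, and
uses a hypothetical `Transaction` case class.

``` scala
object CsvSketch {
  // Hypothetical model of one row of a transactions CSV.
  final case class Transaction(id: Long, amount: BigDecimal)

  // Drops the header row and splits each remaining line on commas.
  // Assumes no quoted fields; a real parser should handle RFC 4180 quoting.
  private def rows(csv: String): List[Array[String]] =
    csv.linesIterator
      .drop(1)
      .filter(_.trim.nonEmpty)
      .map(_.split(",").map(_.trim))
      .toList

  // accountId,transactionsUrl
  def parseAccounts(csv: String): List[(Long, String)] =
    rows(csv).map(cols => (cols(0).toLong, cols(1)))

  // id,amount
  def parseTransactions(csv: String): List[Transaction] =
    rows(csv).map(cols => Transaction(cols(0).toLong, BigDecimal(cols(1))))
}
```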

Our job is written using synchronous, blocking IO:

``` scala
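// httpClient, csvParser, and transactionDatabase are the application's own
// services (not shown).
def run(accountsUrl: Url): Unit = {
  // Download the whole accounts CSV into memory, then parse every row.
  val accountsCsv: Array[Byte] = httpClient.download(accountsUrl)
  val accountUrls: List[(Long, Url)] =
    csvParser.parse[(Long, Url)](accountsCsv)

  accountUrls.foreach {
    case (accountId, transactionsUrl) =>
      // Each account's transactions CSV is also fully downloaded first.
      val transactionsCsv: Array[Byte] = httpClient.download(transactionsUrl)
      val transactions: List[Transaction] =
        csvParser.parse[Transaction](transactionsCsv)

      // Writes happen one at a time, blocking until each save completes.
      transactions.foreach { transaction =>
        transactionDatabase.save(accountId, transaction)
      }
  }
}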
```

This initial version blocks at each step and pulls all the data it needs into
memory. That prevents us from executing independent steps concurrently, and it
can use a lot of memory when the CSVs are large.

Our job currently looks like this:

![Before](https://images.thoughtbot.com/flatmap-your-job-queue/JktnK6epR9OHyeSpHhf9_before.svg)

We want to flatten the job as much as possible so that each step is a small
message which can be independently processed. It will end up looking more like
this:

![After](https://images.thoughtbot.com/flatmap-your-job-queue/pK9zM2EpR16ozTaWaV5o_after.svg)

## Moving Towards Streams

Our first improvement is to rewrite the job using a streaming data type:

``` scala
import cats.effect.IO
import fs2.Stream

def run(accountsUrl: Url): IO[Unit] = {
  // Bytes arrive in chunks; nothing is downloaded until the stream is run.
  val accountsCsv: Stream[IO, Byte] = httpClient.download(accountsUrl)
  val accountUrls: Stream[IO, (Long, Url)] =
    csvParser.parse[(Long, Url)](accountsCsv)

  accountUrls
    .evalMap {
      case (accountId, transactionsUrl) =>
        val transactionsCsv: Stream[IO, Byte] =
          httpClient.download(transactionsUrl)
        val transactions: Stream[IO, Transaction] =
          csvParser.parse[Transaction](transactionsCsv)

        // Run this account's inner stream to completion before the next one.
        transactions
          .evalMap { transaction =>
            transactionDatabase.save(accountId, transaction)
          }
          .compile
          .drain
    }
    .compile
    .drain
}
```

This new version looks similar, but takes advantage of the `Stream` type from
[fs2]. A `Stream` is like a lazy `List` with side effects. It can process a
large or infinite list without loading the entire dataset into memory, and it
can track side effects like downloading files and writing to databases.
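
The pull-based laziness resembles a plain Scala `Iterator`, which the
stdlib-only sketch below uses as an analogy (it is not fs2 and has no effect
tracking). The hypothetical `download` logs each chunk it actually produces, so
you can see that taking two chunks from a million-chunk source only fetches two.

``` scala
import scala.collection.mutable.ListBuffer

object LazySketch {
  // Records every chunk that is actually "downloaded".
  val log = ListBuffer.empty[String]

  // Hypothetical download of `n` chunks: nothing runs until a chunk is pulled.
  def download(n: Int): Iterator[String] =
    Iterator.range(0, n).map { i =>
      log += s"fetched chunk $i"
      s"chunk $i"
    }
}
```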

Because we're using a stream, the work is now performed incrementally: whenever
a chunk of bytes from the first CSV is ready, it is run through the CSV parser.
Whenever a chunk of CSV lines is ready, the next CSV download starts. That
download produces chunks of transactions, which are written to the database.
Processing is pull-based: each step pulls a new chunk from the previous step
whenever it finishes processing its current chunk.

There's one interesting thing about the shape of this stream: one step of the
stream produces a new stream. Each time we download a CSV for an account, we get
a new stream of transactions for that account. We're currently processing each
inner stream fully before starting the next.
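
The same nested shape shows up with plain Scala iterators. In this stdlib-only
sketch, `transactionsFor` is a hypothetical stand-in for downloading one
account's CSV, and `flatMap` splices each account's inner iterator into one lazy
sequence of `(accountId, transaction)` pairs, pulling inner elements only on
demand.

``` scala
import scala.collection.mutable.ListBuffer

object FlattenSketch {
  final case class Transaction(id: Long, amount: BigDecimal)

  // Hypothetical stand-in for downloading and parsing one account's CSV.
  // Logs each row as it is produced so we can observe laziness.
  def transactionsFor(accountId: Long, log: ListBuffer[String]): Iterator[Transaction] =
    Iterator.range(1, 3).map { i =>
      log += s"account $accountId row $i"
      Transaction(accountId * 100 + i, BigDecimal(i))
    }

  // Each outer element (an account) produces an inner iterator; flatMap
  // splices them into a single sequence without materializing any of them.
  def allTransactions(
      accountIds: Iterator[Long],
      log: ListBuffer[String]
  ): Iterator[(Long, Transaction)] =
    accountIds.flatMap(id => transactionsFor(id, log).map(tx => (id, tx)))
}
```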

## Flattening the Stream

We can make the steps of our stream more independent by flattening all the
streams into one, using `flatten`:

``` scala
def run(accountsUrl: Url): IO[Unit] =
  httpClient
    .download(accountsUrl)
    .through(csvParser.parse[(Long, Url)])
    .map {
      case (accountId, transactionsUrl) =>
        httpClient
          .download(transactionsUrl)
          .through(csvParser.parse[Transaction])
          .map(transaction => (accountId, transaction))
    }
    .flatten
    .evalMap {
      case (accountId, transaction) =>
        transactionDatabase.save(accountId, transaction)
    }
    .compile
    .drain
```

This informs fs2 that one step produces new streams. The output of each inner
stream will be flattened into the outer stream, producing a single, continuous
stream of transactions. Now that each step of our stream is structurally
independent, we can perform independent steps concurrently:

``` scala
def run(accountsUrl: Url): IO[Unit] =
  httpClient
    .download(accountsUrl)
    .through(csvParser.parse[(Long, Url)])
    .prefetchN(128)
    .map {
      case (accountId, transactionsUrl) =>
        httpClient
          .download(transactionsUrl)
          .through(csvParser.parse[Transaction])
          .map(transaction => (accountId, transaction))
    }
    .parJoin(8)
    .prefetchN(128)
    .evalMap {
      case (accountId, transaction) =>
        transactionDatabase.save(accountId, transaction)
    }
    .compile
    .drain
```

This latest version includes three improvements:

* Parsing the original CSV will continue concurrently while the resulting lines
  are being processed. Up to 128 chunks of parsed lines will be buffered before
  parsing pauses.
* Up to 8 transaction CSVs will be downloaded concurrently. `parJoin`
  interleaves their output, so transactions from different accounts may arrive
  in any order.
* Database writes will happen concurrently while CSVs are being downloaded and
  processed. Up to 128 chunks of transactions will be buffered before CSV
  processing pauses.
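
`parJoin(8)` caps how many inner streams are open at once. To illustrate the
bounding idea without fs2, the sketch below runs each hypothetical `inner` task
as a `Future` on a fixed pool of `maxOpen` threads and records the peak
concurrency. Unlike `parJoin`, it waits for every task and returns results in
input order, so treat it as an analogy rather than an equivalent.

``` scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BoundedConcurrencySketch {
  /** Runs `inner(a)` for every input with at most `maxOpen` running at once.
    * Returns all outputs (in input order) and the peak concurrency observed. */
  def joinBounded[A, B](inputs: List[A], maxOpen: Int)(inner: A => List[B]): (List[B], Int) = {
    val pool = Executors.newFixedThreadPool(maxOpen)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val running = new AtomicInteger(0)
    val peak = new AtomicInteger(0)
    try {
      val futures = inputs.map { a =>
        Future {
          // At most `maxOpen` threads exist, so at most `maxOpen` bodies overlap.
          val now = running.incrementAndGet()
          peak.accumulateAndGet(now, (x: Int, y: Int) => math.max(x, y))
          try inner(a) finally running.decrementAndGet()
        }
      }
      val results = Await.result(Future.sequence(futures), 1.minute)
      (results.flatten, peak.get())
    } finally pool.shutdown()
  }
}
```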

## Distributed, Durable Processing

So far, we've made the steps of our stream structurally independent and started
running them concurrently. However, we still have no way of recovering from a
job that was interrupted in the middle. We also have no way of scaling this
process beyond a single worker node, as each import must be fully handled in a
single session. We can move beyond a single node by transforming this job into a
series of messages. We'll do this using [Apache Kafka] and fs2.

We can model this job as three kinds of messages, each with its own consumer:

* A message indicating that a new import URL is available for processing
* A message indicating that a new transaction URL is available for processing
* A message indicating that a transaction is ready to write to the database

Kafka stores messages in named, append-only logs, each called a
<dfn>topic</dfn>. Each consumer in our pipeline reads from one topic, and each
consumed message may result in one or more messages being produced to another
topic. This allows us to chain multiple consumers together until we're ready to
write our results to a database. You can read more about consumers in the
[Kafka documentation].
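
To make the chaining concrete, here is a small in-memory model that assumes
nothing about Kafka's real client API: a `Topic` is an append-only log whose
offsets are indices, and a `Consumer` reads from its committed offset, produces
zero or more records per input to the next topic, and then advances its offset.
All names are hypothetical.

``` scala
import scala.collection.mutable.ArrayBuffer

object TopicChainSketch {
  // Hypothetical in-memory topic: an append-only log where offset = index.
  final class Topic[A] {
    private val log = ArrayBuffer.empty[A]
    def append(a: A): Unit = synchronized { log += a; () }
    def readFrom(offset: Int): Vector[A] = synchronized { log.drop(offset).toVector }
    def size: Int = synchronized { log.size }
  }

  // Hypothetical consumer: reads `in` from its committed offset, produces zero
  // or more records to `out` per input, then commits past what it handled.
  final class Consumer[A, B](in: Topic[A], out: Topic[B])(f: A => Seq[B]) {
    private var committed = 0
    def poll(): Unit = {
      val batch = in.readFrom(committed)
      batch.foreach(a => f(a).foreach(out.append))
      committed += batch.size // commit only after producing
    }
  }
}
```

Chaining two of these consumers turns one import URL into one record per
transaction, and polling again adds nothing, because each consumer's committed
offset has moved past the records it already handled.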

We'll start with a consumer for import URLs:

``` scala
def produceAccountUpdates: IO[Unit] =
  kafka
    .consumerStream[IO]
    .using(ConsumerSettings[IO, String, Url])
    .evalTap(_.subscribeTo("imports"))
    .flatMap(_.stream)
    .flatMap { committable =>
      httpClient
        .download(committable.record.value)
        .through(csvParser.parse[(Long, Url)])
        .map {
          case (accountId, url) =>
            ProducerRecords.one(
              ProducerRecord("account-updates", accountId, url),
              committable.offset
            )
        }
    }
    .through(kafka.produce(ProducerSettings[IO, Long, Url]))
    .evalMap(_.passthrough.commit)
    .compile
    .drain
```

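In this example, we consume from a topic called "imports" that contains URLs we
should download. We download each file and produce one new message per line,
keyed by account ID, containing the URL of that account's transactions CSV.
Each produced message carries the offset of the import message it came from,
and that offset is committed once the produced message has been written to
Kafka.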

We can fetch the transactions in another consumer:

``` scala
def fetchAccountUpdates: IO[Unit] =
  kafka
    .consumerStream[IO]
    .using(ConsumerSettings[IO, Long, Url])
    .evalTap(_.subscribeTo("account-updates"))
    .flatMap(_.stream)
    .flatMap { committable =>
      httpClient
        .download(committable.record.value)
        .through(csvParser.parse[Transaction])
        .map { transaction =>
          ProducerRecords.one(
            ProducerRecord(
              "transactions",
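              // Key by account ID so all of an account's transactions share a
              // partition and are consumed in order.
              committable.record.key,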
              (committable.record.key, transaction)
            ),
            committable.offset
          )
        }
    }
    .through(kafka.produce(ProducerSettings[IO, Long, (Long, Transaction)]))
    .evalMap(_.passthrough.commit)
    .compile
    .drain
```

This consumes the "account-updates" topic and downloads each transactions CSV
it references. Each line parsed from a transactions CSV is produced to the
"transactions" topic, along with its account ID.
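
Kafka only orders records within a partition, and by default it picks a keyed
record's partition by hashing the key. The stdlib-only sketch below models that
with a simplified `partitionFor` (using `Long.hashCode` in place of Kafka's
murmur2 hash of the serialized key) to show that records sharing a key, such as
an account ID, always land on one partition in arrival order.

``` scala
object PartitionSketch {
  // Simplified stand-in for Kafka's default partitioner, which hashes the
  // serialized key with murmur2. Any deterministic hash gives the same property.
  def partitionFor(key: Long, numPartitions: Int): Int =
    Math.floorMod(java.lang.Long.hashCode(key), numPartitions)

  /** Routes (key, value) records to partitions, preserving arrival order
    * within each partition, as a topic's log does. */
  def route[V](records: Seq[(Long, V)], numPartitions: Int): Map[Int, Vector[(Long, V)]] =
    records.foldLeft(Map.empty[Int, Vector[(Long, V)]]) { case (acc, rec @ (key, _)) =>
      val p = partitionFor(key, numPartitions)
      acc.updated(p, acc.getOrElse(p, Vector.empty) :+ rec)
    }
}
```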

Lastly, we need to write the transactions to our database:

``` scala
def saveAccountUpdates: IO[Unit] =
  kafka
    .consumerStream[IO]
    .using(ConsumerSettings[IO, Long, (Long, Transaction)])
    .evalTap(_.subscribeTo("transactions"))
    .flatMap(_.stream)
    .evalMap { committable =>
      val (accountId, transaction) = committable.record.value
      transactionDatabase
        .save(accountId, transaction)
        .flatMap { _ =>
          committable.offset.commit
        }
    }
    .compile
    .drain
```

This consumes the transactions topic and writes each transaction to the
database.
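
Because `saveAccountUpdates` commits an offset only after its `save` completes,
an interrupted run resumes from the last committed offset. A stdlib-only
simulation of that loop, with a hypothetical `run` that can stop before
committing, shows the consequence: the record being handled at the time of the
crash is delivered again, which gives at-least-once rather than exactly-once
processing.

``` scala
object ResumeSketch {
  /** Processes `log` from `committed`, committing offset + 1 only after
    * `handle` returns. If `crashAfter` is Some(n), stops after handling n
    * records but before committing the n-th, like a process killed mid-job.
    * Returns the committed offset the next run should start from. */
  def run[A](log: Vector[A], committed: Int, crashAfter: Option[Int])(handle: A => Unit): Int = {
    var offset = committed
    var handled = 0
    var crashed = false
    while (!crashed && offset < log.length) {
      handle(log(offset))
      handled += 1
      if (crashAfter.contains(handled)) crashed = true // killed before committing
      else offset += 1 // commit: the next run starts after this record
    }
    offset
  }
}
```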

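Each of these consumers is an independent stream. At smaller scale, you could
run all three concurrently in a single process. However, you can also run each
consumer in one or more independent processes: Kafka divides a topic's
partitions among the consumers in a consumer group, so each step can scale
horizontally up to the number of partitions in its input topic.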

## Conclusion

By breaking up our job into several messages, we've acquired some new abilities:

* A job interrupted in the middle won't need to **reprocess** steps that have
  already completed. We can further improve the delivery guarantees of this
  approach by taking advantage of Kafka's transactional commits.
* We can **scale** each step of the job horizontally and independently.
* If we want to **reuse** the output of a step, we can create a new consumer
  that uses the same topic. For example, we could also write the transactions to
  a different database.
* If we need to **aggregate** the incoming data, we can do so using Kafka
  Streams.

Using Kafka also has some advantages over queue-based message brokers like
RabbitMQ:

* Using consistent partitioning by account ID, we can guarantee that
  transactions for a given account are processed in order.
* By using transactional commits and managing offsets for the final database
  write, we can simulate exactly-once delivery.
* If we need to aggregate data, we can probably aggregate it using Kafka Streams
  from existing topics rather than using a separate system like [Spark].
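
One way to manage offsets for the final write, sketched here under stated
assumptions, is to store the last applied offset for each partition in the same
database transaction as the rows, and to skip any redelivered record at or below
it. The in-memory `Database` below is a hypothetical stand-in; with a real
database you would perform both writes in one transaction and, on startup, seek
the consumer to `resumeOffset`.

``` scala
object EffectivelyOnceSketch {
  final case class Transaction(id: Long, amount: BigDecimal)

  // Hypothetical database: rows and per-partition offsets change together,
  // standing in for a single database transaction.
  final class Database {
    private var rows = Vector.empty[(Long, Transaction)]
    private var appliedOffsets = Map.empty[Int, Long]

    def rowCount: Int = synchronized { rows.size }

    /** Saves the record unless its offset was already applied for that
      * partition. Assumes each partition's offsets arrive in order. */
    def saveOnce(partition: Int, offset: Long, accountId: Long, tx: Transaction): Boolean =
      synchronized {
        if (appliedOffsets.get(partition).exists(_ >= offset)) false // redelivery
        else {
          rows = rows :+ ((accountId, tx))
          appliedOffsets = appliedOffsets.updated(partition, offset)
          true
        }
      }

    /** Where a restarted consumer should resume reading this partition. */
    def resumeOffset(partition: Int): Long =
      synchronized { appliedOffsets.get(partition).map(_ + 1).getOrElse(0L) }
  }
}
```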

Learning to think in terms of message streams instead of job queues is an
important step in building data pipelines. Using small, independent messages
allows you to process data more efficiently and opens up opportunities for
higher concurrency and better consistency guarantees.

[Scala]: https://www.scala-lang.org/
[fs2]: https://fs2.io/
[Apache Kafka]: https://kafka.apache.org/
[Kafka documentation]: https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
[Spark]: https://spark.apache.org/
