flatMap Your Job Queue

Joe Ferris

Many applications have one or more large data jobs. Maybe you need to download a fresh set of reports every day. Maybe you need to update a cache every hour. Whatever you need to do, you’ve likely turned to a staple of web development: the job queue.

You create a queue that describes work to be done. You start a pool of workers that will pull a job from the queue, complete the work it describes, and repeat until the queue is empty. A job queue is perfect for simple tasks like sending emails or generating thumbnails from uploaded images.
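
In sketch form, a worker loop is just a few lines. The Job and Queue types here are hypothetical stand-ins for whatever your job library provides:

trait Job { def perform(): Unit }
trait Queue { def dequeue(): Option[Job] }

// Pull a job, do the work it describes, and repeat until the queue is empty.
@annotation.tailrec
def workerLoop(queue: Queue): Unit =
  queue.dequeue() match {
    case Some(job) =>
      job.perform()
      workerLoop(queue)
    case None =>
      () // the queue is empty; this worker is done
  }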

However, this approach has some pitfalls when working with large jobs. If a large job is interrupted in the middle, it will need to start over from the beginning. Worse, it may create a lot of orphaned data. It’s difficult to scale a large job without breaking it up, but doing so creates hidden dependencies between jobs and makes a unified job queue very difficult to understand.

We can overcome these difficulties by flattening a large job into a stream of messages. Let’s demonstrate this by working through an example job. Our example will be written using Scala, but if you’re not familiar with Scala you can still follow along, and the principles apply to other languages as well.

The Job: Fetching CSVs

Our sample job takes a URL for a CSV to download. The CSV looks like this:

accountId,transactionsUrl
1,https://example.com/123
2,https://example.com/234

We’ll take each line and use it to fetch another CSV, this time containing transactions for the given account:

id,amount
1,5.24
2,6.71

We’ll take each line from each of those CSVs and save the transactions to the account.
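
The snippets below lean on a couple of small domain types. They aren’t defined in the article’s code, but you can picture them roughly like this (the exact shapes are assumptions):

import java.net.URI

// Hypothetical models for the article's Url and Transaction types.
final case class Url(value: URI)
final case class Transaction(id: Long, amount: BigDecimal)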

Our job is written using synchronous, blocking IO:

def run(accountsUrl: Url): Unit = {
  val accountsCsv: Array[Byte] = httpClient.download(accountsUrl)
  val accountUrls: List[(Long, Url)] = csvParser.parse(accountsCsv)

  accountUrls.foreach {
    case (accountId, transactionsUrl) =>
      val transactionsCsv: Array[Byte] = httpClient.download(transactionsUrl)
      val transactions: List[Transaction] = csvParser.parse(transactionsCsv)

      transactions.foreach { transaction =>
        transactionDatabase.save(accountId, transaction)
      }
  }
}

The initial version blocks at each step and pulls all the data it needs into memory. This prevents us from executing independent steps concurrently, and a large import could consume a large amount of memory.
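
For reference, the three collaborators the job relies on — httpClient, csvParser, and transactionDatabase — might have blocking interfaces roughly like these (a sketch; row decoding and error handling are elided):

trait HttpClient {
  // Downloads the entire response body into memory.
  def download(url: Url): Array[Byte]
}

trait CsvParser {
  // Parses raw CSV bytes into typed rows, e.g. (Long, Url) or Transaction.
  def parse[A](bytes: Array[Byte]): List[A]
}

trait TransactionDatabase {
  def save(accountId: Long, transaction: Transaction): Unit
}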

Our job currently looks like this:

[Figure: Before]

We want to flatten the job as much as possible so that each step is a small message which can be independently processed. It will end up looking more like this:

[Figure: After]

Moving Towards Streams

Our first improvement will be to rewrite it using a streaming data type:

def run(accountsUrl: Url): IO[Unit] = {
  val accountsCsv: Stream[IO, Byte] = httpClient.download(accountsUrl)
  val accountUrls: Stream[IO, (Long, Url)] = csvParser.parse(accountsCsv)

  accountUrls
    .evalMap {
      case (accountId, transactionsUrl) =>
        val transactionsCsv: Stream[IO, Byte] =
          httpClient.download(transactionsUrl)
        val transactions: Stream[IO, Transaction] =
          csvParser.parse(transactionsCsv)

        transactions
          .evalMap { transaction =>
            transactionDatabase.save(accountId, transaction)
          }
          .compile
          .drain
    }
    .compile
    .drain
}

This new version looks similar, but takes advantage of the Stream type from fs2. A Stream is like a lazy List with side effects. It can process a large or infinite list without loading the entire dataset into memory, and it can track side effects like downloading files and writing to databases.
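
As a tiny illustration of that laziness, defining a stream performs no work; effects only run when the compiled IO is executed:

import cats.effect.IO
import fs2.Stream

// Defining `numbers` does nothing; the prints happen only when the
// resulting IO is run, one element at a time.
val numbers: Stream[IO, Int] =
  Stream.range(1, 4).evalTap(n => IO.println(s"emitting $n"))

val program: IO[List[Int]] = numbers.compile.toList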

The work is now performed in a streaming fashion: whenever a chunk of bytes arrives from the first CSV, it is run through the CSV parser. Whenever a chunk of parsed lines is ready, the next CSV download starts. That download produces chunks of transactions, which are written to the database. Each step pulls a chunk from the previous step as soon as it finishes processing its current chunk.
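
The streaming version implies new signatures for our collaborators. Judging from how they’re used above, they would look roughly like this (an assumption, not the API of any particular library):

import cats.effect.IO
import fs2.Stream

trait HttpClient {
  // Emits the response body as chunks of bytes as they arrive.
  def download(url: Url): Stream[IO, Byte]
}

trait CsvParser {
  // Incrementally decodes a byte stream into typed rows; also usable as a
  // Pipe via `.through(csvParser.parse)` in the later examples.
  def parse[A](bytes: Stream[IO, Byte]): Stream[IO, A]
}

trait TransactionDatabase {
  def save(accountId: Long, transaction: Transaction): IO[Unit]
}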

There’s one interesting thing about the shape of this stream: one step of the stream produces a new stream. Each time we download a CSV for an account, we get a new stream of transactions for that account. We’re currently processing each inner stream fully before starting the next.

Flattening the Stream

We can make the steps of our stream more independent by flattening all the streams into one, using flatten:

def run(accountsUrl: Url): IO[Unit] =
  httpClient
    .download(accountsUrl)
    .through(csvParser.parse[(Long, Url)])
    .map {
      case (accountId, transactionsUrl) =>
        httpClient
          .download(transactionsUrl)
          .through(csvParser.parse[Transaction])
          .map(transaction => (accountId, transaction))
    }
    .flatten
    .evalMap {
      case (accountId, transaction) =>
        transactionDatabase.save(accountId, transaction)
    }
    .compile
    .drain

This informs fs2 that one step produces new streams. The output of each produced stream will be flattened into the source stream, producing a single, continuous stream of transactions. Now that each step of our stream is structurally independent, we can perform independent steps concurrently:

def run(accountsUrl: Url): IO[Unit] =
  httpClient
    .download(accountsUrl)
    .through(csvParser.parse[(Long, Url)])
    .prefetchN(128)
    .map {
      case (accountId, transactionsUrl) =>
        httpClient
          .download(transactionsUrl)
          .through(csvParser.parse[Transaction])
          .map(transaction => (accountId, transaction))
    }
    .parJoin(8)
    .prefetchN(128)
    .evalMap {
      case (accountId, transaction) =>
        transactionDatabase.save(accountId, transaction)
    }
    .compile
    .drain

This latest version includes three improvements:

  • Parsing lines of the original CSV will happen concurrently while the resulting lines are being processed. Up to 128 lines will be buffered before parsing pauses.
  • Up to 8 transaction CSVs will be downloaded concurrently.
  • Database writes will happen concurrently while CSVs are being downloaded and processed. Up to 128 transactions will be buffered before CSV processing pauses.

Distributed, Durable Processing

So far, we’ve made the steps of our stream structurally independent and started running them concurrently. However, we still have no way of recovering from a job that was interrupted in the middle. We also have no way of scaling this process beyond a single worker node, as each import must be fully handled in a single session. We can move beyond a single node by transforming this job into a series of messages. We’ll do this using Apache Kafka and fs2.

We can model this job as three distinct messages, each with its own consumer:

  • A message indicating that a new import URL is available for processing
  • A message indicating that a new transaction URL is available for processing
  • A message indicating that a transaction is ready to write to the database

In general, a Kafka consumer reads from one log of messages, called a topic. Each consumed message may result in one or more messages being produced to a new topic. This allows us to chain multiple consumers together until we’re ready to write our results to a database. You can read more about consumers in the Kafka documentation.
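
The snippets below abbreviate the consumer and producer settings. In practice each one carries connection details; the import consumer’s settings, for instance, might look roughly like this (the broker address and group id are placeholders, and a deserializer for Url is assumed to be in implicit scope):

import cats.effect.IO
import fs2.kafka._

// Placeholder connection details for the "imports" consumer.
val importConsumerSettings: ConsumerSettings[IO, String, Url] =
  ConsumerSettings[IO, String, Url]
    .withBootstrapServers("localhost:9092")
    .withGroupId("csv-import")
    .withAutoOffsetReset(AutoOffsetReset.Earliest)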

We’ll start with a consumer for import URLs:

def produceAccountUpdates: IO[Unit] =
  kafka
    .consumerStream[IO]
    .using(ConsumerSettings[IO, String, Url])
    .evalTap(_.subscribeTo("imports"))
    .flatMap(_.stream)
    .flatMap { committable =>
      httpClient
        .download(committable.record.value)
        .through(csvParser.parse[(Long, Url)])
        .map {
          case (accountId, url) =>
            ProducerRecords.one(
              ProducerRecord("account-updates", accountId, url),
              committable.offset
            )
        }
    }
    .through(kafka.produce(ProducerSettings[IO, Long, Url]))
    .evalMap(_.passthrough.commit)
    .compile
    .drain

In this example, we consume from a topic called “imports” that contains URLs we should download. We download each file and produce new messages containing the account ID and the URL from which to fetch that account’s transactions.

We can fetch the transactions in another consumer:

def fetchAccountUpdates: IO[Unit] =
  kafka
    .consumerStream[IO]
    .using(ConsumerSettings[IO, Long, Url])
    .evalTap(_.subscribeTo("account-updates"))
    .flatMap(_.stream)
    .flatMap { committable =>
      httpClient
        .download(committable.record.value)
        .through(csvParser.parse[Transaction])
        .map { transaction =>
          ProducerRecords.one(
            ProducerRecord(
              "transactions",
              // Key by account ID (the consumed record's key) so a given
              // account's transactions land on one partition, in order.
              committable.record.key,
              (committable.record.key, transaction)
            ),
            committable.offset
          )
        }
    }
    .through(kafka.produce(ProducerSettings[IO, Long, (Long, Transaction)]))
    .evalMap(_.passthrough.commit)
    .compile
    .drain

This consumes the topic containing URLs for transactions and downloads them. Each line parsed from the transaction CSV will be produced to a new topic.

Lastly, we need to write the transactions to our database:

def saveAccountUpdates: IO[Unit] =
  kafka
    .consumerStream[IO]
    .using(ConsumerSettings[IO, Long, (Long, Transaction)])
    .evalTap(_.subscribeTo("transactions"))
    .flatMap(_.stream)
    .evalMap { committable =>
      val (accountId, transaction) = committable.record.value
      transactionDatabase
        .save(accountId, transaction)
        .flatMap { _ =>
          committable.offset.commit
        }
    }
    .compile
    .drain

This consumes the transactions topic and writes each transaction to the database.

These Kafka consumers run independently of one another. At smaller scale, you could run all of them in a single process. You can also run each consumer in one or more independent processes, allowing massive horizontal scale.
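
As a minimal sketch, a single process could start all three consumers and run them concurrently, each on its own fiber (assuming the three defs above are in scope):

import cats.effect.{IO, IOApp}
import cats.syntax.all._

object ImportPipeline extends IOApp.Simple {
  // Runs the three consumer loops concurrently; if one of them fails,
  // the others are canceled and the process exits with an error.
  def run: IO[Unit] =
    (produceAccountUpdates, fetchAccountUpdates, saveAccountUpdates).parTupled.void
}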

Conclusion

By breaking up our job into several messages, we’ve acquired some new abilities:

  • A job interrupted in the middle won’t need to reprocess steps that have already completed. We can further improve the delivery guarantees of this approach by taking advantage of Kafka’s transactional commits.
  • We can scale each step of the job horizontally and independently.
  • If we want to reuse the output of a step, we can create a new consumer that uses the same topic. For example, we could also write the transactions to a different database.
  • If we need to aggregate the incoming data, we can do so using Kafka Streams.

Using Kafka also has some advantages over queue-based message brokers like RabbitMQ:

  • Using consistent partitioning, we are guaranteed transactions for a given account are processed in order.
  • By using transactional commits and managing offsets for the final database write, we can simulate exactly-once delivery.
  • If we need to aggregate data, we can likely aggregate it using Kafka Streams over the existing topics rather than using a separate system like Spark.

Learning to think in terms of message streams instead of job queues is an important step in building data pipelines. Using small, independent messages allows you to process data more efficiently and opens up opportunities for higher concurrency and better consistency guarantees.