Many applications have one or more large data jobs. Maybe you need to download a fresh set of reports every day. Maybe you need to update a cache every hour. Whatever you need to do, you’ve likely turned to a staple of web development: the job queue.
You create a queue that describes work to be done. You start a pool of workers that will pull a job from the queue, complete the work it describes, and repeat until the queue is empty. A job queue is perfect for simple tasks like sending emails or generating thumbnails from uploaded images.
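In its simplest form, each worker is just a loop. A rough sketch, where the JobQueue and Job types are hypothetical and exist only to illustrate the pattern:

def worker(queue: JobQueue): Unit =
  while (queue.nonEmpty) {
    val job: Job = queue.dequeue() // take the next job off the queue
    job.run()                      // do the work it describes
    queue.acknowledge(job)         // mark it complete so it isn’t redelivered
  }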
However, this approach has some pitfalls when working with large jobs. If a large job is interrupted in the middle, it has to start over from the beginning. Worse, it may leave behind a lot of orphaned data. It’s difficult to scale a large job without breaking it up, but breaking it up creates hidden dependencies between the resulting jobs and makes a unified job queue much harder to understand.
We can overcome these difficulties by flattening a large job into a stream of messages. Let’s demonstrate this by working through an example job. Our example will be written using Scala, but if you’re not familiar with Scala you can still follow along, and the principles apply to other languages as well.
The Job: Fetching CSVs
Our sample job takes a URL for a CSV to download. The CSV looks like this:
| accountId | transactionsUrl |
| --- | --- |
| 1 | https://example.com/123 |
| 2 | https://example.com/234 |
We’ll take each line and use it to fetch another CSV, this time containing transactions for the given account:
| id | amount |
| --- | --- |
| 1 | 5.24 |
| 2 | 6.71 |
We’ll take each line from each of those CSVs and save the transactions to the account.
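The examples that follow rely on a few supporting definitions that are left out of the snippets. One possible shape for them is sketched below; the names match the code, but the exact signatures are assumptions, and the later streaming versions would use stream-based variants of the same interfaces:

final case class Url(value: String)
final case class Transaction(id: Long, amount: BigDecimal)

// blocking collaborators, as used by the first, synchronous version of the job
trait HttpClient {
  def download(url: Url): Array[Byte]
}

trait CsvParser {
  // how each row is decoded into an A is elided here
  def parse[A](csv: Array[Byte]): List[A]
}

trait TransactionDatabase {
  def save(accountId: Long, transaction: Transaction): Unit
}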
Our job is written using synchronous, blocking IO:
def run(accountsUrl: Url): Unit = {
val accountsCsv: Array[Byte] = httpClient.download(accountsUrl)
val accountUrls: List[(Long, Url)] = csvParser.parse(accountsCsv)
accountUrls.foreach {
case (accountId, transactionsUrl) =>
val transactionsCsv: Array[Byte] = httpClient.download(transactionsUrl)
val transactions: List[Transaction] = csvParser.parse(transactionsCsv)
transactions.foreach { transaction =>
transactionDatabase.save(accountId, transaction)
}
}
}
This initial version blocks at each step and pulls all the data it needs into memory. That prevents us from executing independent steps concurrently, and loading every dataset up front can consume a large amount of memory.
Our job currently runs as one large, nested, sequential unit of work. We want to flatten it as much as possible, so that each step becomes a small message which can be independently processed.
Moving Towards Streams
Our first improvement will be to rewrite it using a streaming data type:
def run(accountsUrl: Url): IO[Unit] = {
val accountsCsv: Stream[IO, Byte] = httpClient.download(accountsUrl)
val accountUrls: Stream[IO, (Long, Url)] = csvParser.parse(accountsCsv)
accountUrls
.evalMap {
case (accountId, transactionsUrl) =>
val transactionsCsv: Stream[IO, Byte] =
httpClient.download(transactionsUrl)
val transactions: Stream[IO, Transaction] =
csvParser.parse(transactionsCsv)
transactions
.evalMap { transaction =>
transactionDatabase.save(accountId, transaction)
}
.compile
.drain
}
.compile
.drain
}
This new version looks similar, but takes advantage of the Stream type from fs2. A Stream is like a lazy List with side effects. It can process a large or infinite list without loading the entire dataset into memory, and it can track side effects like downloading files and writing to databases.
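As a quick, self-contained illustration of that laziness (this toy example is separate from our job):

import cats.effect.IO
import fs2.Stream

// nothing is computed when the stream is defined
val numbers = Stream.range(1, 1000000)

// only the three elements we ask for are ever evaluated
val firstThree: List[Int] = numbers.take(3).toList // List(1, 2, 3)

// side effects can be woven in, and they only run once the stream is compiled and executed
val printed: IO[Unit] =
  Stream
    .range(1, 4)
    .evalMap(n => IO(println(n)))
    .compile
    .drain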
By using a stream, the work will now be performed in a streaming fashion: whenever a chunk of bytes is ready from the first CSV, it will be run through the CSV parser. Whenever a chunk of CSV lines is ready, it will start the next CSV download. That download will produce chunks of transactions, which will be written to the database. It operates as a stream: a chunk is pulled from the previous step whenever a step finishes processing its current chunk.
There’s one interesting thing about the shape of this stream: one step of the stream produces a new stream. Each time we download a CSV for an account, we get a new stream of transactions for that account. We’re currently processing each inner stream fully before starting the next.
Flattening the Stream
We can make the steps of our stream more independent by flattening all of the nested streams into one, using flatten:
def run(accountsUrl: Url): IO[Unit] =
httpClient
.download(accountsUrl)
.through(csvParser.parse[(Long, Url)])
.map {
case (accountId, transactionsUrl) =>
httpClient
.download(transactionsUrl)
.through(csvParser.parse[Transaction])
.map(transaction => (accountId, transaction))
}
.flatten
.evalMap {
case (accountId, transaction) =>
transactionDatabase.save(accountId, transaction)
}
.compile
.drain
This informs fs2 that one step produces new streams. The output of each produced stream will be flattened into the source stream, producing a single, continuous stream of transactions. Now that each step of our stream is structurally independent, we can perform independent steps concurrently:
def run(accountsUrl: Url): IO[Unit] =
httpClient
.download(accountsUrl)
.through(csvParser.parse[(Long, Url)])
.prefetchN(128)
.map {
case (accountId, transactionsUrl) =>
httpClient
.download(transactionsUrl)
.through(csvParser.parse[Transaction])
.map(transaction => (accountId, transaction))
}
.parJoin(8)
.prefetchN(128)
.evalMap {
case (accountId, transaction) =>
transactionDatabase.save(accountId, transaction)
}
.compile
.drain
This latest version includes three improvements:
- Parsing lines of the original CSV will happen concurrently while the resulting lines are being processed. Up to 128 lines will be buffered before parsing pauses.
- Up to 8 transaction CSVs will be downloaded concurrently.
- Database writes will happen concurrently while CSVs are being downloaded and processed. Up to 128 transactions will be buffered before CSV processing pauses.
Distributed, Durable Processing
So far, we’ve made the steps of our stream structurally independent and started running them concurrently. However, we still have no way of recovering from a job that was interrupted in the middle. We also have no way of scaling this process beyond a single worker node, as each import must be fully handled in a single session. We can move beyond a single node by transforming this job into a series of messages. We’ll do this using Apache Kafka and fs2.
We can model this job as three distinct messages, each with its own consumer:
- A message indicating that a new import URL is available for processing
- A message indicating that a new transaction URL is available for processing
- A message indicating that a transaction is ready to write to the database
In general, a Kafka consumer reads from one log of messages, called a topic. Each consumed message may result in one or more messages being produced to a new topic. This allows us to chain multiple consumers together until we’re ready to write our results to a database. You can read more about consumers in the Kafka documentation.
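Concretely, our pipeline chains three topics. Summarizing the key and value types used by the consumers below:

// topic "imports":         key: String                    value: Url (the accounts CSV to download)
// topic "account-updates": key: Long (the account id)     value: Url (that account's transactions CSV)
// topic "transactions":    key: Long (the transaction id) value: (Long, Transaction) (account id plus the transaction)

The snippets below also construct ConsumerSettings and ProducerSettings inline and lean on implicit (de)serializers for Url and Transaction being in scope. A fuller configuration might look roughly like this; the bootstrap servers, group id, and offset reset policy are assumptions:

// assumes implicit RecordDeserializer/RecordSerializer instances for Url are available
val importConsumerSettings: ConsumerSettings[IO, String, Url] =
  ConsumerSettings[IO, String, Url]
    .withBootstrapServers("localhost:9092")
    .withGroupId("import-processor")
    .withAutoOffsetReset(AutoOffsetReset.Earliest)

val accountUpdateProducerSettings: ProducerSettings[IO, Long, Url] =
  ProducerSettings[IO, Long, Url]
    .withBootstrapServers("localhost:9092")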
We’ll start with a consumer for import URLs:
def produceAccountUpdates: IO[Unit] =
kafka
.consumerStream[IO]
.using(ConsumerSettings[IO, String, Url])
.evalTap(_.subscribeTo("imports"))
.flatMap(_.stream)
.flatMap { committable =>
httpClient
.download(committable.record.value)
.through(csvParser.parse[(Long, Url)])
.map {
case (accountId, url) =>
ProducerRecords.one(
ProducerRecord("account-updates", accountId, url),
committable.offset
)
}
}
.through(kafka.produce(ProducerSettings[IO, Long, Url]))
.evalMap(_.passthrough.commit)
.compile
.drain
In this example, we consume from a topic called “imports” that contains URLs we should download. We download each file and, for each line, produce a new message containing the account ID and the URL from which to fetch that account’s transactions.
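For completeness, a job enters the pipeline when something produces a URL to the “imports” topic. A minimal sketch, reusing the same kafka.produce pipe as above (startImport and its importId parameter are hypothetical):

def startImport(importId: String, accountsUrl: Url): IO[Unit] =
  Stream
    .emit(ProducerRecords.one(ProducerRecord("imports", importId, accountsUrl)))
    .covary[IO]
    .through(kafka.produce(ProducerSettings[IO, String, Url]))
    .compile
    .drain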
We can fetch the transactions in another consumer:
def fetchAccountUpdates: IO[Unit] =
kafka
.consumerStream[IO]
.using(ConsumerSettings[IO, Long, Url])
.evalTap(_.subscribeTo("account-updates"))
.flatMap(_.stream)
.flatMap { committable =>
httpClient
.download(committable.record.value)
.through(csvParser.parse[Transaction])
.map { transaction =>
ProducerRecords.one(
ProducerRecord(
"transactions",
transaction.id,
(committable.record.key, transaction)
),
committable.offset
)
}
}
.through(kafka.produce(ProducerSettings[IO, Long, (Long, Transaction)]))
.evalMap(_.passthrough.commit)
.compile
.drain
This consumes the topic containing URLs for transactions and downloads them. Each line parsed from the transaction CSV will be produced to a new topic.
Lastly, we need to write the transactions to our database:
def saveAccountUpdates: IO[Unit] =
kafka
.consumerStream[IO]
.using(ConsumerSettings[IO, Long, (Long, Transaction)])
.evalTap(_.subscribeTo("transactions"))
.flatMap(_.stream)
.evalMap { committable =>
val (accountId, transaction) = committable.record.value
transactionDatabase
.save(accountId, transaction)
.flatMap { _ =>
committable.offset.commit
}
}
.compile
.drain
This consumes the transactions topic and writes each transaction to the database.
These Kafka consumers run on independent threads. At smaller scale, you could run all of the consumers in a single process, but you can also run each consumer in one or more independent processes, allowing massive horizontal scale.
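For example, running everything in one process can be as simple as launching the three consumer streams in parallel. A minimal sketch, assuming a cats-effect IOApp and the parSequence_ syntax from cats (this wiring is not part of the consumers above):

import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.all._

object ImportPipeline extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    List(
      produceAccountUpdates, // imports -> account-updates
      fetchAccountUpdates,   // account-updates -> transactions
      saveAccountUpdates     // transactions -> database
    ).parSequence_.map(_ => ExitCode.Success)
}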
Conclusion
By breaking up our job into several messages, we’ve acquired some new abilities:
- A job interrupted in the middle won’t need to reprocess steps that have already completed. We can further improve the delivery guarantees of this approach by taking advantage of Kafka’s transactional commits.
- We can scale each step of the job horizontally and independently.
- If we want to reuse the output of a step, we can create a new consumer that uses the same topic. For example, we could also write the transactions to a different database (see the sketch after this list).
- If we need to aggregate the incoming data, we can do so using Kafka Streams.
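Here is that reuse sketched, following the same shape as saveAccountUpdates; because it uses its own consumer group, it receives every message on the topic independently of the database writer (archiveDatabase and the group id are hypothetical):

def archiveTransactions: IO[Unit] =
  kafka
    .consumerStream[IO]
    .using(
      ConsumerSettings[IO, Long, (Long, Transaction)]
        .withGroupId("transaction-archiver") // a separate group id, so it sees every message
    )
    .evalTap(_.subscribeTo("transactions"))
    .flatMap(_.stream)
    .evalMap { committable =>
      val (accountId, transaction) = committable.record.value
      archiveDatabase // hypothetical second datastore
        .save(accountId, transaction)
        .flatMap(_ => committable.offset.commit)
    }
    .compile
    .drain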
Using Kafka also has some advantages over queue-based message brokers like RabbitMQ:
- With consistent partitioning, we are guaranteed that transactions for a given account are processed in order.
- By using transactional commits and managing offsets for the final database write, we can simulate exactly-once delivery.
- If we need to aggregate data, we can probably aggregate it using Kafka Streams from existing topics rather than using a separate system like Spark.
Learning to think in terms of message streams instead of job queues is an important step in building data pipelines. Using small, independent messages allows you to process data more efficiently and opens up opportunities for higher concurrency and better consistency guarantees.