---
title: Let’s Build a Custom Publisher in Combine
teaser: 'Once you know the boilerplate, it''s not hard to build a custom publisher
  in Combine.

  '
tags: combine,reactive programming,swift,ios
author: Adam Sharp
published_on: 2020-06-05
---

After [getting started with Combine][combine-intro], [making a few network calls][combine-urlsession], and perhaps trying out the [Timer publisher][combine-timers] or [KVO][combine-kvo], eventually you’ll reach a point where you need a custom `Publisher` to solve your problem. You might have used [ReactiveSwift][] or [RxSwift][] and found yourself missing one of the many useful operators they provide. Maybe you’d even like to try your hand at contributing one to [CombineExt][]!

However, the `Publisher` protocol is deceptively simple, having just a single required method:

```swift
public protocol Publisher {
  associatedtype Output
  associatedtype Failure: Error

  func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure
}
```

At the time of writing there’s no official guide to writing a conformance, so let’s work through the process and see what’s involved.

[combine-intro]: https://developer.apple.com/documentation/combine/receiving_and_handling_events_with_combine
[combine-urlsession]: https://developer.apple.com/documentation/foundation/urlsession/processing_url_session_data_task_results_with_combine
[combine-timers]: https://developer.apple.com/documentation/combine/replacing_foundation_timers_with_timer_publishers
[combine-kvo]: https://developer.apple.com/documentation/combine/performing_key-value_observing_with_combine
[ReactiveSwift]: https://github.com/ReactiveCocoa/ReactiveSwift
[RxSwift]: https://github.com/ReactiveX/RxSwift
[CombineExt]: https://github.com/CombineCommunity/CombineExt

## The simplest useful publisher

Let’s pick a (hopefully) simple example, inspired by a few of Combine’s built-in utility publishers:

- `Just`, which emits a single value that you provide and immediately finishes;
- `Empty`, which doesn’t emit any values and immediately completes<sup><a href="#fn1-17337" id="fnr1-17337" title="see footnote" class="footnote">1</a></sup>;
- `Fail`, which doesn’t emit any values and immediately fails with a provided error.

The publisher we’ll build is called `Always`. It’s similar to `Just`, but instead of completing, it repeatedly emits the same value as long as there is demand.<sup><a href="#fn2-17337" id="fnr2-17337" title="see footnote" class="footnote">2</a></sup>

How is it useful? Here’s one way<sup><a href="#fn3-17337" id="fnr3-17337" title="see footnote" class="footnote">3</a></sup> we could use it to compute a sequence of squared numbers:

```swift
import Combine
import Foundation

// Use `Double` values so that `pow(_:_:)` type-checks.
let numbers = [1.0, 2.0, 3.0, 4.0].publisher

let squares = numbers.zip(Always(2.0))
  .map { pow($0, $1) } // [1, 4, 9, 16]
```

`Always` is an infinite sequence, but `Publisher.zip(_:)` will complete the first time either publisher emits a completion event. The effect is to transform an infinite sequence into a finite one. Zipping a finite sequence together with an infinite one is a surprisingly useful technique!
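The same finite-meets-infinite idea can be tried out with plain standard library sequences, before any publishers are involved. The following is only an analogy, not Combine code: the names `numbers`, `twos`, and `squares` are made up for this sketch, and `twos` plays the role that `Always(2)` plays above.

```swift
import Foundation

let numbers = [1.0, 2.0, 3.0, 4.0]

// An infinite sequence that repeats 2.0 forever.
let twos = sequence(first: 2.0) { $0 }

// `zip` stops as soon as the shorter (finite) sequence runs out,
// so the result is finite even though `twos` never ends.
let squares = zip(numbers, twos).map { pow($0, $1) }

print(squares) // [1.0, 4.0, 9.0, 16.0]
```

Because the standard library’s `zip` pulls elements lazily, the infinite sequence is only asked for as many elements as the finite one can pair with.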

## Step 0: Publisher boilerplate

This boilerplate is a significant roadblock on the way to the actually interesting part of writing our publisher, and it could be where many people decide to move on to a different approach. I’ll outline a template you can follow that will make it much easier to get started.
<aside>
(If you’d like to jump to the end result, or follow along with the implementation, you can find [the finished implementation of `Always` in a GitHub gist][gist].)
</aside>

First, let’s quickly cover the parts involved in making a custom publisher:

1. The `Publisher` type itself. This is the easiest part, especially when Xcode can auto-complete the declarations for you. The problem is that publishers don’t really _do_ anything: their job is to construct and connect the other types that do the real work.
2. Every publisher is backed by a `Subscription`. This will be a class, and it’s where the magic happens. This isn’t obvious at first, because a subscription is not part of the publisher’s public API!
3. The `Subscriber`. Publishers must be generic over any kind of subscriber, which complicates the type signature of the `Publisher.receive(subscriber:)` method. The benefit is type safety: the subscription class will also be generic over the subscriber, allowing any kind of subscriber to be attached as long as it has matching `Input` and `Failure` associated types.

That was a lot of words, so let me show you the template code we’ll begin with:

```swift
/// `Always` is a publisher that continuously emits the
/// same output as long as there is demand.
public struct Always<Output>: Publisher {
  public typealias Failure = Never

  public let output: Output

  public init(_ output: Output) {
    self.output = output
  }

  public func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure {
    let subscription = Subscription(output: output, subscriber: subscriber)
    subscriber.receive(subscription: subscription)
  }
}

private extension Always {
  final class Subscription<S: Subscriber> where S.Input == Output, S.Failure == Failure {
    private let output: Output
    private var subscriber: S?

    init(output: Output, subscriber: S) {
      self.output = output
      self.subscriber = subscriber
    }
  }
}
```

This seems to be everything we need to get started, but the type error we get leads us into the interesting part.

## Step 1: The interesting part

If you paste the code above into a source file and try to compile it, you’ll be met with the following error:

```
Sources/Always/Always.swift:14:38: error: argument type 'Always<Output>.Subscription<S>' does not conform to expected type 'Subscription'
    subscriber.receive(subscription: subscription)
                                     ^
                                                  as! Subscription
```

Our `Subscription` type needs to conform to the `Combine.Subscription` protocol:

```swift
extension Always.Subscription: Subscription {
  func request(_ demand: Subscribers.Demand) {
    // TODO: Respond to demand by emitting values
  }
}
```

We now have all the pieces we need: a `Subscription`, a `Subscriber`, and *demand* that needs to be satisfied. So what can we do with a `Subscribers.Demand` value?

1. Demand is like a special integer, and we can compare it just like one. Here’s how we check if the demand is positive:

    ```swift
    if demand > 0 {
      // true if demand == .unlimited or .max(>=1)
    }
    ```

2. We can modify demand to keep track of how much is remaining, like an index or counter:

    ```swift
    // Keep track whenever we emit a value to the subscriber
    demand -= 1
    ```

With those two tips, we have all we need to implement `Subscription.request(_:)`. First, we unwrap `subscriber`, and then store the `demand` in a variable that we can mutate to keep track of how many times we’ve sent `output` to our subscriber:

```swift
guard let subscriber = subscriber else { return }
var demand = demand
```

Now, as long as the demand is positive, repeatedly send `output`, one value at a time:

```swift
while demand > 0 {
  demand -= 1
  let newDemand = subscriber.receive(output)
  demand += newDemand
}
```

That’s not a lot of code! Most of it is boilerplate, and once you find the real guts of the publisher, there’s not a whole lot to it. Just one more method to write and we’ll have a functioning publisher.
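To see where the extra demand returned by `subscriber.receive(_:)` comes from, it can help to look at the other side of the conversation. The sketch below uses a hypothetical `CountingSubscriber` (not part of Combine or of this post’s implementation) that asks for two values up front and then asks for one more when the first value arrives. It is attached to one of Combine’s built-in sequence publishers rather than to `Always`.

```swift
import Combine

/// A hypothetical subscriber used only to illustrate demand.
final class CountingSubscriber: Subscriber {
  typealias Input = Int
  typealias Failure = Never

  private(set) var received: [Int] = []

  func receive(subscription: Subscription) {
    // Initial demand: at most two values.
    subscription.request(.max(2))
  }

  func receive(_ input: Int) -> Subscribers.Demand {
    received.append(input)
    // The return value is *additional* demand: ask for one more
    // value the first time, and nothing extra after that.
    return received.count == 1 ? .max(1) : .none
  }

  func receive(completion: Subscribers.Completion<Never>) {}
}

let subscriber = CountingSubscriber()
(1...10).publisher.subscribe(subscriber)

print(subscriber.received) // [1, 2, 3]
```

The publisher stops after three values: two from the initial request, plus the one extra that was requested from inside `receive(_:)`. That returned value is what the `demand += newDemand` line accounts for.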

## Step 2: Cancellation, and a hidden surprise

The `Subscription` protocol inherits from another Combine protocol named `Cancellable`:

```swift
/// A protocol indicating that an activity or action may
/// be cancelled.
///
/// Calling `cancel()` frees up any allocated resources.
/// It also stops side effects such as timers, network
/// access, or disk I/O.
public protocol Cancellable {
    /// Cancel the activity.
    func cancel()
}
```

So what does cancellation mean for `Always`? Well, it should stop sending values to its `subscriber`, regardless of pending demand. Now you can see why we declared `subscriber` as optional — because our implementation practically writes itself:

```swift
extension Always.Subscription: Cancellable {
  func cancel() {
    subscriber = nil
  }
}
```

Let’s test our implementation, shall we?

```swift
import Combine
import XCTest

final class AlwaysTests: XCTestCase {
  func testItEmitsASingleValue() {
    var output: [Int] = []
    _ = Always(1).first().sink { output.append($0) }
    XCTAssertEqual(output, [1])
  }
}
```

What happens when we run this code? It gets into an infinite loop! Let’s use the `print()` operator to try and get some insight into what’s going on here:

```swift
_ = Always(1).first().print("First").sink { /* ... */ }
// First: receive subscription: (First)
// First: request unlimited
// First: receive value: (1)
// First: receive finished
```

This is curious: because `first()` only ever emits a single value, wouldn’t it make sense for it to request `max(1)` from our subscription? Why is it requesting `unlimited`? This could be why our code goes into an infinite loop, because `unlimited` demand *never* reaches zero!<sup><a href="#fn4-17337" id="fnr4-17337" title="see footnote" class="footnote">4</a></sup>
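A quick way to convince yourself of this is to do some arithmetic on `Subscribers.Demand` values directly. The snippet below is only an illustration, and the variable names are made up for the example:

```swift
import Combine

// Finite demand counts down like an ordinary integer.
var limited = Subscribers.Demand.max(2)
limited -= 1
limited -= 1

// Unlimited demand is unaffected by subtraction.
var unlimited = Subscribers.Demand.unlimited
unlimited -= 1_000

print(limited == Subscribers.Demand.none)      // true
print(limited > 0)                             // false
print(unlimited == Subscribers.Demand.unlimited) // true
print(unlimited > 0)                           // true
```

A loop of the form `while demand > 0` therefore ends on its own for finite demand, but needs some other exit condition once the demand is `unlimited`.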

This was a real head-scratcher for me. I wondered whether I’d found a bug in the `first()` operator, so here’s how I tested it using only publishers that Combine already provides:

```swift
// An infinite sequence of ones
let ones = sequence(first: 1) { $0 }

// Convert it to a publisher, then ask for the first value:
_ = ones.publisher.first().sink { print($0) }
```

Unfortunately, this works precisely as expected: it prints a single “1” and then completes. What gives?

[Jasdev][] provided [the insight that helped solve this issue][forums] on the Swift forums, again through clever use of the `print()` operator to understand what `first()` is up to:

```swift
_ = ones.publisher.print("Ones").first().sink { /* ... */ }
// Ones: receive subscription: (Sequence)
// Ones: request unlimited
// Ones: receive value: (1)
// Ones: receive cancel
```

Here’s what’s happening:

1. The `sink()` requests `unlimited` demand.
2. `first()` then passes this request upstream, even though it only needs `max(1)` elements.
3. It receives its first value from upstream, then immediately calls `cancel()` to prevent further values from being delivered.
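The three steps above can be imitated with a small hand-written subscriber. This is a sketch of the observed behaviour only, not how `first()` is actually implemented: `FirstValueSubscriber` is a hypothetical type invented for this example, and it is attached to a built-in (finite) sequence publisher.

```swift
import Combine

/// A hypothetical subscriber that mimics the observed behaviour:
/// request unlimited demand, then cancel after the first value.
final class FirstValueSubscriber: Subscriber {
  typealias Input = Int
  typealias Failure = Never

  private var subscription: Subscription?
  private(set) var received: [Int] = []

  func receive(subscription: Subscription) {
    self.subscription = subscription
    subscription.request(.unlimited)
  }

  func receive(_ input: Int) -> Subscribers.Demand {
    received.append(input)
    // Cancel from *inside* `receive(_:)`, while the upstream
    // subscription is still in the middle of delivering values.
    subscription?.cancel()
    subscription = nil
    return .none
  }

  func receive(completion: Subscribers.Completion<Never>) {
    subscription = nil
  }
}

let subscriber = FirstValueSubscriber()
(1...5).publisher.subscribe(subscriber)

print(subscriber.received) // [1]
```

The built-in publisher stops after one value even though unlimited demand was requested, which suggests it notices the cancellation between values.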

And this is the problem with the current implementation of `Always`: it doesn’t properly handle cancellation. Here’s how we can fix it:

```swift
// We used to check if `subscriber` is `nil` here...
var demand = demand

// ...but now we check it here, instead.
while let subscriber = subscriber, demand > 0 {
  demand -= 1
  demand += subscriber.receive(output)
}
```

Instead of checking whether `subscriber` is `nil` up front, we need to check on every iteration of the loop. The reason is subtle: `subscriber.receive(output)` can result in `cancel()` being called recursively, so by the time it returns we need to bail out as quickly as possible instead of sending more values.
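Putting the fragments from this post together, the whole publisher might look like the sketch below. The type and member names follow the template from Step 0. Spelling the protocol as `Combine.Subscription` (to keep it visibly distinct from the nested class) and the usage lines at the end are additions for illustration. The code also makes no attempt at thread safety.

```swift
import Combine

/// `Always` continuously emits the same output as long as there is demand.
public struct Always<Output>: Publisher {
  public typealias Failure = Never

  public let output: Output

  public init(_ output: Output) {
    self.output = output
  }

  public func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure {
    let subscription = Subscription(output: output, subscriber: subscriber)
    subscriber.receive(subscription: subscription)
  }
}

private extension Always {
  final class Subscription<S: Subscriber> where S.Input == Output, S.Failure == Failure {
    private let output: Output
    private var subscriber: S?

    init(output: Output, subscriber: S) {
      self.output = output
      self.subscriber = subscriber
    }
  }
}

extension Always.Subscription: Cancellable {
  func cancel() {
    // Dropping the subscriber is what stops the loop in `request(_:)`.
    subscriber = nil
  }
}

extension Always.Subscription: Combine.Subscription {
  func request(_ demand: Subscribers.Demand) {
    var demand = demand

    // Re-check `subscriber` on every pass: `receive(_:)` may have
    // triggered `cancel()` before it returned.
    while let subscriber = subscriber, demand > 0 {
      demand -= 1
      demand += subscriber.receive(output)
    }
  }
}

// Usage: `first()` requests unlimited demand, then cancels after one value.
var received: [Int] = []
_ = Always(1).first().sink { received.append($0) }
print(received) // [1]
```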

[forums]: https://forums.swift.org/t/synchronous-publishers-how-do-they-work/36098/3

## Conclusion

We’ve seen that the core implementation of a simple, synchronous publisher isn’t much code at all: `Subscription.request(_:)` is just four lines. There are a few tricky gotchas when it comes to handling back pressure and cancellation, but now we’re armed with the knowledge of how to tackle them in more complicated publishers. What’s more, we have a handy template of 20 or so lines of code that will get us started next time.

You can find [the implementation in its entirety][gist] on GitHub. Special thanks to [Mike Burns][] and [Jasdev Singh][Jasdev] for reviewing early drafts of this post. Happy publishing!

[gist]: https://gist.github.com/sharplet/ddf2debb7eccff40d307027f4c6d9f0c
[Mike Burns]: https://thoughtbot.com/blog/authors/mike-burns
[Jasdev]: https://twitter.com/jasdev

<div class="footnotes">
<hr />
<ol>

<li id="fn1-17337">
<p><code>Empty</code>’s initialiser accepts a parameter <code>completeImmediately</code>, whose default value is <code>true</code>. Passing <code>false</code> results in a publisher that <em>never</em> completes. There’s an obvious name for such a publisher, but it turns out <a href="https://developer.apple.com/documentation/swift/never"><code>Never</code> was already taken</a>. <a href="#fnr1-17337" title="return to article" class="reversefootnote">&#8617;&#xFE0E;</a></p>
</li>

<li id="fn2-17337">
<p>In brief, <em>demand</em> — represented in Combine by the <a href="https://developer.apple.com/documentation/combine/subscribers/demand"><code>Subscribers.Demand</code> type</a> — gives subscribers a way to communicate how many values they’re ready to receive. Apple provides a good introduction to the concepts of <em>demand</em> and <em>back pressure</em> in their guide <a href="https://developer.apple.com/documentation/combine/processing_published_elements_with_subscribers">Processing Published Elements with Subscribers</a>. <a href="#fnr2-17337" title="return to article" class="reversefootnote">&#8617;&#xFE0E;</a></p>
</li>

<li id="fn3-17337">
<p>There are ways to solve this problem without <code>Always</code>, including inlining the exponent: <code>pow($0, 2)</code>. But those are less fun! <a href="#fnr3-17337" title="return to article" class="reversefootnote">&#8617;&#xFE0E;</a></p>
</li>

<li id="fn4-17337">
<p>This is why I described <code>Subscribers.Demand</code> as being “like a special integer.” A regular integer will always get smaller when you subtract from it, but <code>Subscribers.Demand.unlimited</code> will always remain <code>unlimited</code>, regardless of what you add to or subtract from it. <a href="#fnr4-17337" title="return to article" class="reversefootnote">&#8617;&#xFE0E;</a></p>
</li>

</ol>
</div>
