After getting started with Combine, making a few network calls, and perhaps trying out the Timer publisher or KVO, eventually you’ll reach a point where you reach for a custom Publisher
to solve your problem. You might have used ReactiveSwift or RxSwift and found yourself missing one of the many useful operators they provide. Maybe you’d even like to try your hand at contributing one to CombineExt!
However, the Publisher
protocol seems deceptively simple, having just a single method:
public protocol Publisher {
associatedtype Output
associatedtype Failure: Error
func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure
}
At the time of writing there’s no official guide to writing a conformance, so let’s work through the process and see what’s involved.
The simplest useful publisher
Let’s pick a (hopefully) simple example, inspired by a few of Combine’s built-in utility publishers:
Just
, which emits a single value that you provide and immediately finishes;Empty
, which doesn’t emit any values and immediately completes1;Fail
, which doesn’t emit any values and immediately fails with a provided error.
The publisher we’ll build is called Always
. It’s similar to Just
, but instead of completing, it repeatedly emits the same value as long as there is demand.2
How is it useful? Here’s one way3 we could use it to compute a sequence of squared numbers:
import Foundation
let numbers = [1, 2, 3, 4].publisher
let squares = numbers.zip(Always(2))
.map { pow($0, $1) } // [1, 4, 9, 16]
Always
is an infinite sequence, but Publisher.zip(_:)
will complete the first time either publisher emits a completion event. The effect is to transform an infinite sequence into a finite one. Zipping a finite sequence together with an infinite one is a surprisingly useful technique!
Step 0: Publisher boilerplate
This part is a significant roadblock in getting to the actual interesting part of writing our publisher, and could be where many decide to move on to a different approach. I’ll outline a template you can follow that will make it much easier to get started.
First, let’s quickly cover the parts to making a custom publisher:
- The
Publisher
type itself. This is the easiest part, especially when Xcode can auto-complete the declarations for you. The problem is that publishers don’t really do anything: their job is to construct and connect the other types that do the real work. - Every publisher is backed by a
Subscription
. This will be a class, and it’s where the magic happens. This isn’t obvious at first, because a subscription is not part of the publisher’s public API! - The
Subscriber
. Publishers must be generic over any kind of subscriber, which complicates the type signature of thePublisher.receive(subscriber:)
method. The benefit is type safety: the subscription class will also be generic over the subscriber, allowing any kind of subscriber to be attached as long as it has matchingInput
andFailure
associated types.
That was a lot of words, so let me show you the template code we’ll begin with:
/// `Always` is a publisher that continuously emits the
/// same output as long as there is demand.
public struct Always<Output>: Publisher {
public typealias Failure = Never
public let output: Output
public init(_ output: Output) {
self.output = output
}
public func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure {
let subscription = Subscription(output: output, subscriber: subscriber)
subscriber.receive(subscription: subscription)
}
}
private extension Always {
final class Subscription<S: Subscriber> where S.Input == Output, S.Failure == Failure {
private let output: Output
private var subscriber: S?
init(output: Output, subscriber: S) {
self.output = output
self.subscriber = subscriber
}
}
}
This seems to be everything we need to get started, but the type error we get leads us into the interesting part.
Step 1: The interesting part
If you paste the code above into a source file and try to compile it, you’ll be met with the following error:
Sources/Always/Always.swift:14:38: error: argument type 'Always<Output>.Subscription<S>' does not conform to expected type 'Subscription'
subscriber.receive(subscription: subscription)
^
as! Subscription
Our Subscription
type needs to conform to the Combine.Subscription
protocol:
extension Always.Subscription: Subscription {
func request(_ demand: Subscribers.Demand) {
// TODO: Respond to demand by emitting values
}
}
We now have all the pieces we need: a Subscription
, a Subscriber
, and demand that needs to be satisfied. So what can we do with a Subscribers.Demand
value?
Demand is like a special integer, and we can compare it just like one. Here’s how we check if the demand is positive:
if demand > 0 { // true if demand == .unlimited or .max(>=1) }
We can modify demand to keep track of how much is remaining, like an index or counter:
// Keep track whenever we emit a value to the subscriber demand -= 1
With those two tips, we have all we need to implement Subscription.request(_:)
. First, we unwrap subscriber
, and then store the demand
in a variable that we can mutate to keep track of how many times we’ve sent output
to our subscriber:
guard let subscriber = subscriber else { return }
var demand = demand
Now, as long as the demand is positive, repeatedly send output
, one value at a time:
while demand > 0 {
demand -= 1
let newDemand = subscriber.receive(output)
demand += newDemand
}
That’s not a lot of code! Most of it is boilerplate, and once you find the real guts of the publisher, there’s not a whole lot to it. Just one more method to write and we’ll have a functioning publisher.
Step 2: Cancellation, and a hidden surprise
The Subscription
protocol inherits from another Combine protocol named Cancellable
:
/// A protocol indicating that an activity or action may
/// be cancelled.
///
/// Calling `cancel()` frees up any allocated resources.
/// It also stops side effects such as timers, network
/// access, or disk I/O.
public protocol Cancellable {
/// Cancel the activity.
func cancel()
}
So what does cancellation mean for Always
? Well, it should stop sending values to its subscriber
, regardless of pending demand. Now you can see why we declared subscriber
as optional — because our implementation practically writes itself:
extension Always.Subscription: Cancellable {
func cancel() {
subscriber = nil
}
}
Let’s test our implementation, shall we?
final class AlwaysTests: XCTestCase {
func testItEmitsASingleValue() {
var output: [Int] = []
_ = Always(1).first().sink { output.append($0) }
XCTAssertEqual(output, [1])
}
}
What happens when we run this code? It gets into an infinite loop! Let’s use the print()
operator to try and get some insight into what’s going on here:
_ = Always(1).first().print("First").sink { /* ... */ }
// First: receive subscription: (First)
// First: request unlimited
// First: receive value: (1)
// First: receive finished
This is curious: because first()
only ever emits a single value, wouldn’t it make sense for it to request max(1)
from our subscription? Why is it requesting unlimited
? This could be why our code goes into an infinite loop, because unlimited
demand never reaches zero!4
This was a real head-scratcher for me. I wondered whether I’d found a bug in the first()
operator, so here’s how I tested it purely out of publishers Combine already provides:
// An infinite sequence of ones
let ones = sequence(first: 1) { $0 }
// Convert it to a publisher, then ask for the first value:
_ = ones.publisher.first().sink { print($0) }
Unfortunately, this works precisely as expected: it prints a single “1” and then completes. What gives?
Jasdev provided the insight that helped solve this issue on the Swift forums, again through clever use of the print()
operator to understand what first()
is up to:
_ = ones.publisher.print("Ones").first.sink { /* ... */ }
// Ones: receive subscription: (Sequence)
// Ones: request unlimited
// Ones: receive value: (1)
// Ones: receive cancel
Here’s what’s happening:
- The
sink()
requestsunlimited
demand. first()
then passes this request upstream, even though it only needsmax(1)
elements.- It receives its first value from upstream, then immediately calls
cancel()
to prevent further values from being delivered.
And this is the problem with the current implementation of Always
: it doesn’t properly handle cancellation. Here’s how we can fix it:
// We used to check if `subscriber` is `nil` here...
var demand = demand
// ...but now we check it here, instead.
while let subscriber = subscriber, demand > 0 {
demand -= 1
demand += subscriber.receive(output)
}
Instead of checking whether subscriber
is nil
up front, we instead need to check on every iteration of the loop. The reason is subtle: subscriber.receive(output)
can result in cancel()
being called recursively, so that by the time it returns we need to bail out as quickly as possible instead of sending more values.
Conclusion
We’ve seen that the core implementation of a simple, synchronous publisher isn’t much code at all: Subscription.request(_:)
is just four lines. There are a few tricky gotchas when it comes to handling back pressure and cancellation, but now we’re armed with the knowledge of how to tackle them in more complicated publishers. What’s more, we have a handy template of 20 or so lines of code that will get us started next time.
You can find the implementation in its entirety on GitHub. Special thanks to Mike Burns and Jasdev Singh for reviewing early drafts of this post. Happy publishing!
-
Empty
’s initialiser accepts a parametercompleteImmediately
, whose default value istrue
. Passingfalse
results in a publisher that never completes. There’s an obvious name for such a publisher, but it turns outNever
was already taken. ↩︎ -
In brief, demand — represented in Combine by the
Subscribers.Demand
type — gives subscribers a way to communicate how many values they’re ready to receive. Apple provides a good introduction to the concepts of demand and back pressure in their guide Processing Published Elements with Subscribers. ↩︎ -
There are ways to solve this problem without
Always
, including inlining the exponent:pow($0, 2)
. But those are less fun! ↩︎ -
This is why I described
Subscribers.Demand
as being “like a special integer.” A regular integer will always get smaller when you subtract from it, butSubscribers.Demand.unlimited
will always remainunlimited
, regardless of what you add to or subtract from it. ↩︎