subscribe(on:) vs receive(on:)
In this post, instead of looking into the basics of multi-threading with Combine, we are going to look specifically at the difference between subscribe(on:) and receive(on:).
We’re going to look at a typical subscription chain starting with a root publisher, a couple of operators, and a subscriber at the end.
We’ll look at some diagrams and example code showing how subscribe(on:) and receive(on:) affect subscriptions.
subscribe(on:)
subscribe(on:) sets the scheduler on which you’d like the current subscription to be “managed”. This operator sets the scheduler to use for creating the subscription, cancelling it, and requesting input.
In other words, subscribe(on:) sets the scheduler on which the upstream is subscribed.
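To make “managed” a little more concrete, here is a minimal sketch. It assumes a macOS command-line program with Combine available and Swift 5 language mode. Just stands in for the root publisher, and handleEvents(receiveSubscription:receiveRequest:) records where the root publisher hands out its subscription and where it receives demand. ThreadLog and waitUntil are hypothetical helpers written only for this example. The map(_:) in between shows that subscribe(on:) reaches past other operators, all the way up the chain.

```swift
import Combine
import Foundation

// Hypothetical helper: thread-safe record of whether each event ran on the main thread.
final class ThreadLog {
    private let lock = NSLock()
    private var entries: [String: Bool] = [:]

    func record(_ event: String) {
        let onMain = Thread.isMainThread
        lock.lock()
        entries[event] = onMain
        lock.unlock()
    }

    func onMain(_ event: String) -> Bool? {
        lock.lock()
        defer { lock.unlock() }
        return entries[event]
    }
}

// Hypothetical helper: spins the main run loop so work scheduled on
// DispatchQueue.main can run in a command-line program.
func waitUntil(timeout: TimeInterval = 2, _ condition: () -> Bool) {
    let deadline = Date().addingTimeInterval(timeout)
    while !condition() && Date() < deadline {
        RunLoop.main.run(until: Date().addingTimeInterval(0.01))
    }
}

let events = ThreadLog()

let cancellable = Just("test")
    // Observes the root publisher's subscription and the demand it receives.
    .handleEvents(
        receiveSubscription: { _ in events.record("subscription") },
        receiveRequest: { _ in events.record("request") }
    )
    .map { $0 }
    .subscribe(on: DispatchQueue.global())
    .sink { _ in events.record("sink") }

// Keep the subscription alive until the value has arrived.
withExtendedLifetime(cancellable) {
    waitUntil { events.onMain("sink") != nil }
}

print("Subscription on main thread:",
      events.onMain("subscription").map { "\($0)" } ?? "not recorded") // false
print("Request on main thread:",
      events.onMain("request").map { "\($0)" } ?? "not recorded")      // false
```

Both the subscription and the request happen on a global queue. The sink subscribed on the main thread, yet neither event ran there.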
A side effect of subscribing on the given scheduler is that subscribe(on:) can also change the scheduler on which its downstream receives output.
Note: This doesn’t apply if you explicitly set the downstream receiving scheduler via receive(on:). We’ll look at that case further down.
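The side effect, and the way receive(on:) overrides it, can be sketched with Just, which emits its value synchronously as soon as it receives demand. This sketch assumes a macOS command-line program, Swift 5 language mode, and the hypothetical ThreadLog and waitUntil helpers defined in the block.

The first chain uses only subscribe(on:), so the demand reaches Just on a global queue and the value is delivered there. The second chain adds receive(on: DispatchQueue.main), so its sink runs on the main thread.

```swift
import Combine
import Foundation

// Hypothetical helper: thread-safe record of whether each event ran on the main thread.
final class ThreadLog {
    private let lock = NSLock()
    private var entries: [String: Bool] = [:]

    func record(_ event: String) {
        let onMain = Thread.isMainThread
        lock.lock()
        entries[event] = onMain
        lock.unlock()
    }

    func onMain(_ event: String) -> Bool? {
        lock.lock()
        defer { lock.unlock() }
        return entries[event]
    }
}

// Hypothetical helper: spins the main run loop so main-queue work can run.
func waitUntil(timeout: TimeInterval = 2, _ condition: () -> Bool) {
    let deadline = Date().addingTimeInterval(timeout)
    while !condition() && Date() < deadline {
        RunLoop.main.run(until: Date().addingTimeInterval(0.01))
    }
}

let events = ThreadLog()

// subscribe(on:) only: Just emits when demand reaches it, and with
// subscribe(on:) that demand arrives on the global queue.
let withoutReceiveOn = Just("test")
    .subscribe(on: DispatchQueue.global())
    .sink { _ in events.record("sink without receive(on:)") }

// subscribe(on:) followed by receive(on:): delivery moves to the main queue.
let withReceiveOn = Just("test")
    .subscribe(on: DispatchQueue.global())
    .receive(on: DispatchQueue.main)
    .sink { _ in events.record("sink with receive(on:)") }

// Keep both subscriptions alive while the main run loop delivers values.
withExtendedLifetime((withoutReceiveOn, withReceiveOn)) {
    waitUntil {
        events.onMain("sink without receive(on:)") != nil
            && events.onMain("sink with receive(on:)") != nil
    }
}

print("Without receive(on:), sink on main thread:",
      events.onMain("sink without receive(on:)").map { "\($0)" } ?? "not recorded") // false
print("With receive(on:), sink on main thread:",
      events.onMain("sink with receive(on:)").map { "\($0)" } ?? "not recorded")    // true
```

How far the side effect reaches depends on where the upstream publisher actually emits its values. That is one more reason to state the downstream scheduler explicitly with receive(on:).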
For the purpose of this post, I’ll use a custom publisher that, when subscribed to, prints whether it’s being subscribed on the main thread:
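import Combine
import Foundation

extension Publishers {
    // A debug-only publisher that reports the thread it is subscribed on.
    struct IsMainThread: Publisher {
        typealias Output = String
        typealias Failure = Never

        func receive<S>(subscriber: S) where S: Subscriber,
            Never == S.Failure, String == S.Input {
            // Runs on the thread where the subscription is made.
            print("IsMainThread: \(Thread.isMainThread)")
            subscriber.receive(subscription: Subscriptions.empty)
            // Always emits its single value asynchronously on the main queue.
            DispatchQueue.main.async {
                _ = subscriber.receive("test")
            }
        }
    }
}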
Note: This is an oversimplified publisher for debugging purposes that doesn’t create its own subscription. If you want to learn how to create your own publishers correctly, have a look at Combine: Asynchronous programming with Swift.
The IsMainThread publisher pushes an empty subscription to the subscriber and asynchronously emits a single test output value. The publisher always emits its only value on the main queue.
Additionally, when it receives a new subscriber, it prints whether the code runs on the main thread.
Let’s subscribe to that publisher on the main thread and see the output:
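override func viewDidLoad() {
    super.viewDidLoad()
    // `subscriptions` is assumed to be a Set<AnyCancellable> property of the view controller.
    Publishers.IsMainThread()
        .sink { _ in
            print("Sink: \(Thread.isMainThread)")
        }
        .store(in: &subscriptions)
}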
This outputs:
IsMainThread: true
Sink: true
So both the publisher and the sink code are executed on the main thread, which is the current thread in a viewDidLoad() method.
Now let’s try inserting a subscribe(on:) directly after the publisher:
override func viewDidLoad() {
    super.viewDidLoad()
    Publishers.IsMainThread()
        .subscribe(on: DispatchQueue.global())
        .sink { _ in
            print("Sink: \(Thread.isMainThread)")
        }
        .store(in: &subscriptions)
}
This time the output is:
IsMainThread: false
Sink: false
This time, subscribe(on:) subscribes its upstream on the given global queue, and that’s why the code inside Publishers.IsMainThread outputs false. As a side effect, subscribing the publisher on a global queue also results in the output being delivered downstream on the same queue, so sink prints false as well.
Now let’s insert an operator (which means another publisher) between Publishers.IsMainThread and subscribe(on:):
Publishers.IsMainThread()
    .map { $0 }
    .subscribe(on: DispatchQueue.global())
    .sink { _ in
        print("Sink: \(Thread.isMainThread)")
    }
    .store(in: &subscriptions)
The output remains the same:
IsMainThread: false
Sink: false
Subscribing to map(...) on a global queue also leads to subscribing Publishers.IsMainThread on that queue. So subscribe(on:) sets the subscribing scheduler all the way up the stream.
Let’s see what happens if we use two or more subscribe(on:) operators in the same chain:
Publishers.IsMainThread()
    .subscribe(on: DispatchQueue.main)
    .map { $0 }
    .subscribe(on: DispatchQueue.global())
    .sink { _ in
        print("Sink: \(Thread.isMainThread)")
    }
    .store(in: &subscriptions)
The output this time is:
IsMainThread: true
Sink: true
Subscribing works correctly up the stream: the uppermost subscribe(on:), the one closest to the publisher, creates the Publishers.IsMainThread subscription on the main queue.
However, the value is also delivered to the sink subscriber on the main queue, even though the subscribe(on:) directly above the sink uses a global queue.
This example shows that changing the downstream scheduler with subscribe(on:) is merely a side effect that you should not rely on. subscribe(on:) is for controlling the upstream operations: subscribing, cancelling, and requesting input.
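The same stacking can be sketched with Just in place of Publishers.IsMainThread. This minimal sketch assumes a macOS command-line program, Swift 5 language mode, and the hypothetical ThreadLog and waitUntil helpers defined in the block.

The subscribe(on: DispatchQueue.main) closest to Just decides where Just is subscribed. Just emits as soon as demand reaches it, and that demand is also forwarded on the main queue, so in this sketch the sink ends up on the main thread too.

```swift
import Combine
import Foundation

// Hypothetical helper: thread-safe record of whether each event ran on the main thread.
final class ThreadLog {
    private let lock = NSLock()
    private var entries: [String: Bool] = [:]

    func record(_ event: String) {
        let onMain = Thread.isMainThread
        lock.lock()
        entries[event] = onMain
        lock.unlock()
    }

    func onMain(_ event: String) -> Bool? {
        lock.lock()
        defer { lock.unlock() }
        return entries[event]
    }
}

// Hypothetical helper: spins the main run loop so main-queue work can run.
func waitUntil(timeout: TimeInterval = 2, _ condition: () -> Bool) {
    let deadline = Date().addingTimeInterval(timeout)
    while !condition() && Date() < deadline {
        RunLoop.main.run(until: Date().addingTimeInterval(0.01))
    }
}

let events = ThreadLog()

let cancellable = Just("test")
    // Records the thread on which Just hands out its subscription.
    .handleEvents(receiveSubscription: { _ in events.record("subscription") })
    .subscribe(on: DispatchQueue.main)      // closest to Just: decides where Just is subscribed
    .map { $0 }
    .subscribe(on: DispatchQueue.global())  // further down: map is subscribed here, Just is not
    .sink { _ in events.record("sink") }

withExtendedLifetime(cancellable) {
    waitUntil { events.onMain("sink") != nil }
}

print("Just subscribed on main thread:",
      events.onMain("subscription").map { "\($0)" } ?? "not recorded") // true
print("Sink on main thread:",
      events.onMain("sink").map { "\($0)" } ?? "not recorded")         // true
```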
receive(on:)
The correct operator for changing the scheduler on which downstream output is delivered is receive(on:).
In other words, receive(on:) sets the scheduler on which the downstream receives output.
You can use receive(on:) similarly to subscribe(on:). You can also use multiple receive(on:) operators in the same chain, and each one changes the scheduler for everything downstream of it.
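Here is a sketch of chaining two receive(on:) operators. It assumes a macOS command-line program, Swift 5 language mode, and the hypothetical ThreadLog and waitUntil helpers defined in the block.

The subscription to Just is still made on the current (main) thread. The output is observed on a global queue after the first receive(on:). After the second receive(on:), both the value and the completion event reach the sink on the main queue.

```swift
import Combine
import Foundation

// Hypothetical helper: thread-safe record of whether each event ran on the main thread.
final class ThreadLog {
    private let lock = NSLock()
    private var entries: [String: Bool] = [:]

    func record(_ event: String) {
        let onMain = Thread.isMainThread
        lock.lock()
        entries[event] = onMain
        lock.unlock()
    }

    func onMain(_ event: String) -> Bool? {
        lock.lock()
        defer { lock.unlock() }
        return entries[event]
    }
}

// Hypothetical helper: spins the main run loop so main-queue work can run.
func waitUntil(timeout: TimeInterval = 2, _ condition: () -> Bool) {
    let deadline = Date().addingTimeInterval(timeout)
    while !condition() && Date() < deadline {
        RunLoop.main.run(until: Date().addingTimeInterval(0.01))
    }
}

let events = ThreadLog()

let cancellable = Just("test")
    // receive(on:) does not move the subscription, so this runs on the current thread.
    .handleEvents(receiveSubscription: { _ in events.record("subscription") })
    .receive(on: DispatchQueue.global())
    // Downstream of the first receive(on:): runs on the global queue.
    .handleEvents(receiveOutput: { _ in events.record("output after first receive(on:)") })
    .receive(on: DispatchQueue.main)
    // Downstream of the second receive(on:): value and completion arrive on main.
    .sink(
        receiveCompletion: { _ in events.record("completion") },
        receiveValue: { _ in events.record("value") }
    )

withExtendedLifetime(cancellable) {
    waitUntil { events.onMain("completion") != nil }
}

for event in ["subscription", "output after first receive(on:)", "value", "completion"] {
    print(event, "on main thread:", events.onMain(event).map { "\($0)" } ?? "not recorded")
}
```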
Let’s try a few examples. First, let’s revisit our first piece of code and try receive(on:):
Publishers.IsMainThread()
    .receive(on: DispatchQueue.global())
    .sink { _ in
        print("Sink: \(Thread.isMainThread)")
    }
    .store(in: &subscriptions)
The output this time:
IsMainThread: true
Sink: false
So, unlike the same attempt with subscribe(on:), receive(on:) changes the queue only for its downstream. That’s why Publishers.IsMainThread is subscribed on the current thread this time (in my case, the main one), while the sink code runs on a global queue.
We can try multiple receive(on:) operators by inserting an extra map(...) in between:
Publishers.IsMainThread()
    .receive(on: DispatchQueue.global())
    .map { value -> String in
        print("Map: \(Thread.isMainThread)")
        return value
    }
    .receive(on: DispatchQueue.main)
    .sink { _ in
        print("Sink: \(Thread.isMainThread)")
    }
    .store(in: &subscriptions)
Which prints:
IsMainThread: true
Map: false
Sink: true
This time around, here’s what happens:
- The subscription is created on the current queue, which is the main one.
- I switch execution to a global queue.
- map() executes.
- I switch to the main queue.
- sink executes.
Note: Publishers.IsMainThread always outputs its value on the main queue (you can revisit the publisher code at the beginning of the post). receive(on:) overrides that and switches to your desired scheduler, regardless of which queue the publisher uses internally.
This is very helpful when, for example, you’re adding code in a view controller callback like viewDidLoad() or in other code running on the main queue. Your subscription can switch to a global queue to process data fetched from the network or disk, and finally switch back to the main queue to update your app’s UI.
Mix and Match
In a multi-threaded environment, you are likely to want to control both the scheduler you subscribe on and the one you receive output on. Here’s what a more complex chain looks like when it gets data from a resource-intensive publisher (one that reads data from disk, for example), processes the data on a global queue, and finally updates the UI:
[Resource Intensive Publisher]
.subscribe(on: DispatchQueue.global())
.receive(on: DispatchQueue.global())
[Resource Intensive Processing of Output]
.receive(on: DispatchQueue.main)
[Update App UI on the Main Queue]
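The schematic chain above can be turned into a runnable sketch. Everything in it is an assumption for illustration:
- A Deferred-wrapped Future stands in for the resource-intensive publisher, and its closure pretends to read data from disk.
- Summing an array stands in for the expensive processing.
- Storing the result in displayedTotal stands in for a UI update.
- ThreadLog and waitUntil are hypothetical helpers.

The code assumes a macOS command-line program in Swift 5 language mode.

```swift
import Combine
import Foundation

// Hypothetical helper: thread-safe record of whether each event ran on the main thread.
final class ThreadLog {
    private let lock = NSLock()
    private var entries: [String: Bool] = [:]

    func record(_ event: String) {
        let onMain = Thread.isMainThread
        lock.lock()
        entries[event] = onMain
        lock.unlock()
    }

    func onMain(_ event: String) -> Bool? {
        lock.lock()
        defer { lock.unlock() }
        return entries[event]
    }
}

// Hypothetical helper: spins the main run loop so main-queue work can run.
func waitUntil(timeout: TimeInterval = 2, _ condition: () -> Bool) {
    let deadline = Date().addingTimeInterval(timeout)
    while !condition() && Date() < deadline {
        RunLoop.main.run(until: Date().addingTimeInterval(0.01))
    }
}

let events = ThreadLog()
var displayedTotal: Int?  // stand-in for a label on screen

let cancellable = Deferred {
        // Deferred creates the Future only when subscribed, so this "disk read"
        // runs on the subscribe(on:) scheduler.
        Future<[Int], Never> { promise in
            events.record("load")
            promise(.success(Array(1...1_000)))
        }
    }
    .subscribe(on: DispatchQueue.global())   // subscribe (and load) off the main queue
    .receive(on: DispatchQueue.global())     // process output on a global queue
    .map { numbers -> Int in
        events.record("process")
        return numbers.reduce(0, +)          // stand-in for expensive processing
    }
    .receive(on: DispatchQueue.main)         // hop back to the main queue
    .sink { total in
        events.record("update UI")
        displayedTotal = total
    }

withExtendedLifetime(cancellable) {
    waitUntil { displayedTotal != nil }
}

print("Total:", displayedTotal.map { "\($0)" } ?? "not delivered") // Total: 500500
```

Wrapping the Future in Deferred matters here. A bare Future runs its closure as soon as it is created, on whichever thread builds the chain, so subscribe(on:) would have no effect on the simulated disk read.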
Hopefully that clears up any confusion about subscribe(on:) vs receive(on:), and you can start enjoying multi-threading, Combine style :)
Where to go from here?
To learn about multi-threading and working with schedulers in depth, check out Combine: Asynchronous programming with Swift. This is where you can see all updates, join the discussion in the website forums, and more.