Previously, I wrote about actors in Actors part 1 and Actors part 2. This time I’m going to cover how to write a Combine operator that proxies a Combine publisher into an async sequence, which you can then iterate over with a simple for loop.

Note: The current version of Xcode 13 is beta 2, and I’m using the latest swift.org toolchain nightly snapshot in order to make use of AsyncStream. If you’re using beta 3 or later, AsyncStream is likely supported out of the box.

AsyncSequence and AsyncStream

AsyncSequence is a new API introduced in iOS 15/macOS 12 that lets you use the new async/await Swift syntax to create and iterate over asynchronous sequences.

The existing Swift standard library Sequence protocol provides a mechanism for working with finite or infinite sequences of elements.

AsyncSequence, simply put, is the same sequence protocol with one difference: getting the next element is asynchronous, i.e. the consumer may have to wait, without blocking, for the next element to become available.

Let’s look at a practical example: Foundation offers a new API on the URL type called URL.lines. It is an asynchronous sequence that delivers the text lines of the file at the given URL as elements, as soon as each line is available. In other words, if you’re reading a remote file, you can iterate over URL.lines in a simple for loop, and the body of the loop runs for each text line as it is received from the server, without blocking while it waits.

The mechanics of making an async sequence are the same as for sync sequences. The async sequence protocol requires a single method that makes an iterator:

// Source abbreviated
protocol AsyncSequence {
  func makeAsyncIterator() -> Self.AsyncIterator
}

The iterator protocol is also pretty flexible, with only a single requirement: a method that asynchronously returns the next element, or nil to terminate the sequence:

// Source abbreviated
protocol AsyncIteratorProtocol {
  mutating func next() async throws -> Self.Element?
}

The neat thing then is that you can use your custom async sequence in a normal for loop without the need to pass around closures or fiddle with cancellables or other rituals:

for try await element in myAsyncSequence {
  print(element)
}
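To make the two requirements concrete, here is a minimal sketch of a custom async sequence. The names Countdown and collectCountdown are made up for this illustration, and the Task.yield() call merely stands in for real asynchronous work:

```swift
// Hypothetical example type: counts down from `start` to 1.
struct Countdown: AsyncSequence {
  typealias Element = Int
  let start: Int

  struct AsyncIterator: AsyncIteratorProtocol {
    var current: Int

    // Returns the next value, or nil to end the sequence.
    mutating func next() async -> Int? {
      guard current > 0 else { return nil }
      defer { current -= 1 }
      // Suspension point standing in for real asynchronous work.
      await Task.yield()
      return current
    }
  }

  func makeAsyncIterator() -> AsyncIterator {
    AsyncIterator(current: start)
  }
}

// Hypothetical helper that drives the sequence with a plain for loop.
func collectCountdown(from start: Int) async -> [Int] {
  var values: [Int] = []
  for await value in Countdown(start: start) {
    values.append(value)
  }
  return values
}
```

Because this iterator's next() doesn't throw, the loop needs only `for await` rather than `for try await`.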

Making your own async sequences is quite easy, but for most simple cases the ready-made AsyncStream implementation is sufficient. AsyncStream.init(...) takes a single closure that receives a continuation, through which you can yield elements and finish the stream; its sibling type AsyncThrowingStream can additionally finish with an error. The stream takes care of the iteration bookkeeping, so there is no need for you to implement an iterator yourself.
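As a rough sketch of what that looks like, the following builds a stream from a closure. The function names and the sample words are invented for the example, and it relies on the stream's default behavior of buffering yielded values until the consumer asks for them:

```swift
// Hypothetical helper: a stream that yields three words and then finishes.
func makeGreetingStream() -> AsyncStream<String> {
  AsyncStream<String> { continuation in
    for word in ["hello", "async", "world"] {
      continuation.yield(word)   // emit one element
    }
    continuation.finish()        // end the sequence so the for loop exits
  }
}

// Hypothetical helper that consumes the stream with a plain for loop.
func collectGreetings() async -> [String] {
  var words: [String] = []
  for await word in makeGreetingStream() {
    words.append(word)
  }
  return words
}
```

Without the call to finish(), the consuming loop would keep waiting for more elements.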

Combine and AsyncStream

In this blog series we’re going to put AsyncStream and Combine side by side, see where the similarities are, and make a simple plan for an operator that proxies Combine’s publishers to async sequences.

More specifically, we’re going to compare Combine’s Subscriber and AsyncStream types. Let’s have a look at:

  1. Element type - Combine subscribers are typed and receive a single type of element (Input); async streams also yield only a single value type (Element).
  2. Emitting an element - subscribers have a method called receive(_:) that delivers a single element; an async stream has a continuation with a method called yield(_:) that emits a single value.
  3. Erroring - subscribers have a method receive(completion:) to which you can pass a failure with an error; the continuation of a throwing stream (AsyncThrowingStream) has a finish(throwing:) method that takes an error too.
  4. Completing successfully - as above, subscribers have receive(completion:), this time with the successful enum case; the stream’s continuation accepts nil as the error in finish(throwing:) to signal successful completion.
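The stream side of this comparison can be sketched without Combine at all. In the snippet below, PublisherEvent is a hypothetical stand-in for the events a subscriber would receive (it is not a Combine type), and the stream is an AsyncThrowingStream because that is the variant whose continuation offers finish(throwing:):

```swift
// Hypothetical stand-in for the events a Combine subscriber receives.
enum PublisherEvent {
  case value(Int)
  case failure(Error)
  case finished
}

// Hypothetical error type used only for this illustration.
struct SampleError: Error {}

// Replays the given events into a throwing stream's continuation.
func makeStream(replaying events: [PublisherEvent]) -> AsyncThrowingStream<Int, Error> {
  AsyncThrowingStream<Int, Error> { continuation in
    for event in events {
      switch event {
      case .value(let value):
        continuation.yield(value)             // counterpart of receive(_:)
      case .failure(let error):
        continuation.finish(throwing: error)  // counterpart of a failure completion
      case .finished:
        continuation.finish()                 // counterpart of a successful completion
      }
    }
  }
}

// Hypothetical helper: collects values and reports whether the loop ended with an error.
func drain(_ stream: AsyncThrowingStream<Int, Error>) async -> (values: [Int], failed: Bool) {
  var values: [Int] = []
  do {
    for try await value in stream {
      values.append(value)
    }
    return (values, false)
  } catch {
    return (values, true)
  }
}
```

Values yielded before the terminating event still reach the loop; only then does it end, either normally or by throwing.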

The only remaining open question is publisher cancellation. Some publishers implement specific behavior when they are cancelled (usually resource cleanup). Cancelling simply means that the consumer is no longer interested in the publisher’s output.

This is a bit of a head scratcher.

Generally, when you’re iterating over a sequence in a for loop, there is no easy way to cancel the iterator. Your sequence might have a built-in mechanism to complete or error, but not necessarily one for being cancelled. Then again, it might well have a cancel mechanism, since it’s a good idea for async code in general to have a way to be cancelled.

So for the purpose of building a proxy from Combine to async sequence (this blog series' ultimate goal) I think the async sequence should complete successfully if the publisher is cancelled. Erroring out seems inappropriate, and doing nothing will leave a for loop hanging forever.
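Here is a small sketch of that idea, using only a stream and its continuation. The helper names are invented for the illustration and the "cancellation" is simulated by hand; it is not the bridging code itself:

```swift
// Hypothetical helper: creates a stream and hands back its continuation
// so that events can be sent from outside the build closure.
func makeStreamAndContinuation() -> (AsyncStream<Int>, AsyncStream<Int>.Continuation) {
  var captured: AsyncStream<Int>.Continuation!
  let stream = AsyncStream<Int> { continuation in
    captured = continuation
  }
  return (stream, captured)
}

// Simulates an upstream publisher that emits two values and is then cancelled.
func simulateCancelledPublisher() async -> [Int] {
  let (stream, continuation) = makeStreamAndContinuation()
  continuation.yield(1)
  continuation.yield(2)
  // Assumption for this sketch: on cancellation the bridge finishes the stream
  // normally, rather than throwing an error or staying silent.
  continuation.finish()

  var received: [Int] = []
  for await value in stream {
    received.append(value)
  }
  // This line is reached only because the stream was finished.
  return received
}
```

If the finish() call were left out, the loop would consume both values and then wait indefinitely, which is exactly the hanging for loop described above.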

It looks like, then, we can easily map all events of a Combine publisher to calling the respective methods on an async stream’s continuation!

We’ll do that in the next post: Bridge from Combine to AsyncSequence - the code (part 2)


Where to go from here?

All of the concurrency features in Swift are still a work in progress. I’m using an Xcode beta version plus a swift.org toolchain right now to get all of the above working, but it’s still interesting to play with all those new APIs and get a feeling for the future of Swift code.

Hit me up with your ideas, replies, and feedback on Twitter at https://twitter.com/icanzilb.


To learn all about Combine, check out Combine: Asynchronous programming with Swift - this is where you can see all updates, discuss in the website forums, and more.