Yesterday I wrote about AsyncSequence, AsyncStream and a simple plan how to bridge (or proxy) Combine publishers to the new asynchronous sequences APIs - Bridge from Combine to AsyncSequence - the plan (part 1).

Today let’s put together the code as planned and see if that’s going to work out so we can easily use publishers as async sequences.

CombineAsyncStream setup

I’ll create a new type called CombineAsyncStream and make it conform to AsyncSequence. As soon as I typed this, I got some errors as AsyncSequence requires me to defne its output type and its iterator type first:

1
2
3
4
5
class CombineAsyncStream<Upstream: Publisher>: AsyncSequence {
  typealias Element = Upstream.Output
  typealias AsyncIterator = CombineAsyncStream<Upstream>
  
}

Since I’d like to use this sequence type to proxy any kind of publisher I’m making it generic over its input publisher.

I’m required to define the iterator type (AsyncIterator) for the sequence but I don’t need a separate type for this so my sequence is going to be my iterator as well.

To satisfy AsyncSequence’s only requirement I need to add a method to get the sequence iterator, which in my case is just self:

1
2
3
func makeAsyncIterator() -> Self {
  return self
}

Next, the compiler complains that my type isn’t in fact an async iterator. Let’s see how to fix this the easiest way possible.

Note: That’s not what exactly the error is Xcode says but trust me the issue is the AsyncIterator conformance.

Since I don’t want to overcomplicate my code, I will not write my own custom async iterator. I’ll use the provided AsyncStream to yield values asynchronously and just wrap it in my own CombineAsyncStream!

To do this I’ll need to add two things to CombineAsyncStream - a stream instance and its iterator:

1
2
private let stream: AsyncThrowingStream<Upstream.Output, Error>
private lazy var iterator = stream.makeAsyncIterator()

I’ll let AsyncStream create the iterator and I will just proxy to it from my own type. I’ll add an extension so I can conform CombineAsyncStream to AsyncIteratorProtocol and have it be my iterator too:

1
2
3
4
5
extension CombineAsyncStream: AsyncIteratorProtocol {
  public func next() async throws -> Upstream.Output? {
    return try await iterator.next()
  }
}

OK, that was a lot of scaffolding but now I’m finally ready to add the “meat” to CombineAsyncStream - subscribing the upstream publisher and proxying its elements as an async sequence.

Subscribing the upstream

I’ll add an initializer that takes a publisher and wires up my underlaying async stream. I’ve broken down this into steps - first, let’s add the basics:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
private var cancellable: AnyCancellable?

public init(_ upstream: Upstream) {
  var subscription: AnyCancellable? = nil
  
  stream = AsyncThrowingStream<Upstream.Output, Error>
    (Upstream.Output.self) { continuation in

  }
  
  cancellable = subscription
}

The sequence subscribes the publisher and keeps the subscription alive via storing the token in cancellable. In init(_:) I create an AsyncThrowingStream and, for the moment, leave its closure empty but this is where I’ll subscribe the publisher and emit values.

Note how AsyncThrowingStream is generic over Upstream.Output to set what types of values it’ll emit. Currently you cannot specify the error type of AsyncThrowingStream (maybe something that’ll change later?) so you always need to set Error as the generic error type.

Let’s add some code in the closure:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
subscription = upstream
.handleEvents(
  receiveCancel: {
    continuation.finish(throwing: nil)
  }
)
.sink(receiveCompletion: { completion in
  switch completion {
    case .failure(let error): 
      continuation.finish(throwing: error)
    case .finished: continuation.finish(throwing: nil)
  }
}, receiveValue: { value in
  continuation.yield(value)
})

I’m subscribing the upstream publisher and proxy all events by using the async stream’s continuation:

  • a publisher cancellation maps to iterator completion.
  • a publisher error maps to the iterator throwing.
  • a publisher completion maps to iterator completion.
  • a publisher emitting maps to the iterator yielding a value.

At this point I don’t have any errors in Xcode so my async sequence is ready to use!

The asyncStream() operator

To give CombineAsyncStream a try I’ll create a simple operator that’ll help me easily proxy publishers to streams:

1
2
3
4
5
extension Publisher {
  func asyncStream() -> CombineAsyncStream<Self> {
    return CombineAsyncStream(self)
  }
}

Alright! Let’s do it!

I’m going to try some of the code in a simple iOS project. For fun let’s first run this:

1
2
3
4
5
6
for try await tick in Timer
      .publish(every: 1.0, on: .main, in: .common)
      .autoconnect()
      .asyncStream() {
  print(tick)
}

Notice you need to use the new await keyword in the for loop to let the compiler know you’re iterating over an asynchronous sequence. Additionally, since CombineAsyncStream could throw, you need to use try as well.

Note: In other words you need to wrap this code in do {} catch {} or make the container method throwing. I opted for the latter to minimize the code samples in here.

I have to say - this code really, really looks similar to what I’d usually write with Combine, if you consider the loop body to be the subscription’s sink.

However, looking at this fairly simple example we can see (at least) three big wins:

  1. You don’t need to use a closure like with sink(...) - therefore there’s no need to capture self or other values.
  2. You don’t need to keep a cancellable around - the sequence (and the publisher) are automatically released when the for loop has finished.
  3. And, finally, you don’t need to write your own error handling like with Combine. You can catch errors as usual, let the errors traverse the call hierarchy if your method is throwing, or even just crash by using a try!.

OH… MY… GOD!

All that is pretty big to me. It’s insanely good. In fact.

Note: OK, at this point I think I need to write a bit more briefly because I feel like I have way too much to say about all these new features.

Let’s run the app in the Simulator - we get our ticks printed out in the console!

Ticks printed in the Xcode console

Let’s do another one! How about getting the screen orientation via NotificationCenter?

1
2
3
4
5
6
let orientationStream = NotificationCenter
   .default
   .publisher(for: UIDevice.orientationDidChangeNotification)
   .map { _ in UIDevice.current.orientation.isLandscape }
   .removeDuplicates()
   .asyncStream()

I get a publisher out of the UIDevice.orientationDidChangeNotification notification, remove duplicates, and map the values to a Bool which shows if I’m in landscape orientation or not.

Next, I can iterate over that stream like a bauss:

1
2
3
for try await orientation in orientationStream {
  print("Landscape: \(orientation)")
}

Rotating the iPhone simulator prints series of true and false values like so:

Series of true and false values printed in Xcode’s console

Cancelling the async iterator

One cool new feature this year is that you can use the new task() view modifier in SwiftUI to run async code when your view is being added on screen. Therefore to continuously display the screen orientation (from above) you can simply do:

1
2
3
4
5
6
Text(myText)
  .task {
    for try await orientation in orientationStream {
      myText = orientation
    }
  }

And best of all, when the view is removed from screen - that will also automatically cancel the for loop so you don’t need to do anything extra yourself to cancel the iterator!

Good job SwiftUI! But how about if you need to cancel the sequence when using UIKit or in your view model?

Let’s add cancellability to CombineAsyncStream!

I am (anyways) keeping around the publisher cancellable - why not add a method that calls cancel() on it? Additionally, I’ll add a debug log in my deinit() to make sure I’m indeed cleanly exiting my iterator:

1
2
3
4
5
6
7
8
func cancel() {
  cancellable?.cancel()
  cancellable = nil
}

deinit {
  print("[Stream Deinit]")
}

Ta-da! Now I have a way to cancel the iterator which will effectively exit my for loop.

To try this last piece of code, I’ll iterate over a timer and cancel the stream few seconds later:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// Create a Timer async stream.
let stream = Timer
  .publish(every: 1.0, tolerance: 1.0, on: .main, in: .common)
  .autoconnect()
  .asyncStream()

// Cancel stream after 5 seconds.
Task.detached {
  await Task.sleep(5_000_000_000)
  stream.cancel()
}

// Print timer ticks to the console.
for try await tick in stream {
  print("\(tick.formatted(date: .omitted, time: .standard))")
}

// Print when the for loop completes
print("Completed.")

I’m running the app one last time and the output in the console is:

Five entries printed to the console, followed by a completion text.

  • The for loop prints five times to the console.
  • The line after the loop prints “Completed”.
  • Finally, the sequence is released and deinit also prints.

The complete code

Getting here was somewhat convoluted but the final code isn’t all that complex:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class CombineAsyncStream<Upstream: Publisher>: AsyncSequence {
  typealias Element = Upstream.Output
  typealias AsyncIterator = CombineAsyncStream<Upstream>
  
  func makeAsyncIterator() -> Self {
    return self
  }

  private let stream: 
    AsyncThrowingStream<Upstream.Output, Error>

  private lazy var iterator = stream.makeAsyncIterator()
  
  private var cancellable: AnyCancellable?
  public init(_ upstream: Upstream) {
    var subscription: AnyCancellable? = nil
    
    stream = AsyncThrowingStream<Upstream.Output, Error>
    (Upstream.Output.self) { continuation in
      subscription = upstream
        .handleEvents(
          receiveCancel: {
            continuation.finish(throwing: nil)
          }
        )
        .sink(receiveCompletion: { completion in
          switch completion {
            case .failure(let error): 
              continuation.finish(throwing: error)
            case .finished: continuation.finish(throwing: nil)
          }
        }, receiveValue: { value in
          continuation.yield(value)
        })
    }
    
    cancellable = subscription
  }
  
  func cancel() {
    cancellable?.cancel()
    cancellable = nil
  }
}

extension CombineAsyncStream: AsyncIteratorProtocol {
  public func next() async throws -> Upstream.Output? {
    return try await iterator.next()
  }
}

extension Publisher {
  func asyncStream() -> CombineAsyncStream<Self> {
    return CombineAsyncStream(self)
  }
}

Where to go from here?

Is this the ultimate Combine to AsyncSequence converter? Hardly. There are publishers that buffer, others meant to be shared, further others that replay etc, etc. I didn’t have a look into all publishers but it looks like getting a universal proxy together should be possible.

The question on my mind is - what else could we do in the field of Combine - AsyncSequence coop? Converting from AsyncSequence to Combine sounds easy enough. Also, it seems that creating operator alternatives on AsyncSequence is likely to be fairly easy too.

I’m really excited to look more into these new Swift features. I hope you are too.

Hit me up with your ideas, replies, and feedback on Twitter at https://twitter.com/icanzilb.