Need help with combine-schedulers?
Click the “chat” button below for chat support from the developer who created it, or find similar developers for support.

About the developer

pointfreeco
355 Stars 46 Forks MIT License 39 Commits 0 Opened issues

Description

⏰ A few schedulers that make working with Combine more testable and more versatile.

Services available

!
?

Need anything else?

Contributors list

⏰ Combine Schedulers

CI

A few schedulers that make working with Combine more testable and more versatile.

Motivation

The Combine framework provides the

Scheduler
protocol, which is a powerful abstraction for describing how and when units of work are executed. It unifies many disparate ways of executing work, such as
DispatchQueue
,
RunLoop
and
OperationQueue
.

However, the moment you use any of these schedulers in your reactive code you instantly make the publisher asynchronous and therefore much harder to test, forcing you to use expectations and waits for time to pass as your publisher executes.

This library provides new schedulers that allow you to turn any asynchronous publisher into a synchronous one for ease of testing and debugging.

Learn More

This library was designed over the course of many episodes on Point-Free, a video series exploring functional programming and Swift hosted by Brandon Williams and Stephen Celis.

You can watch all of the episodes here.

video poster image

AnyScheduler

The

AnyScheduler
provides a type-erasing wrapper for the
Scheduler
protocol, which can be useful for being generic over many types of schedulers without needing to actually introduce a generic to your code. The Combine framework ships with many type-erasing wrappers, such as
AnySubscriber
,
AnyPublisher
and
AnyCancellable
, yet for some reason does not ship with
AnyScheduler
.

This type is useful for times that you want to be able to customize the scheduler used in some code from the outside, but you don't want to introduce a generic to make it customizable. For example, suppose you have an

ObservableObject
view model that performs an API request when a method is called:
class EpisodeViewModel: ObservableObject {
  @Published var episode: Episode?

let apiClient: ApiClient

init(apiClient: ApiClient) { self.apiClient = apiClient }

func reloadButtonTapped() { self.apiClient.fetchEpisode() .receive(on: DispatchQueue.main) .assign(to: &self.$episode) } }

Notice that we are using

DispatchQueue.main
in the
reloadButtonTapped
method because the
fetchEpisode
endpoint most likely delivers its output on a background thread (as is the case with
URLSession
).

This code seems innocent enough, but the presence of

.receive(on: DispatchQueue.main)
makes this code harder to test since you have to use
XCTest
expectations to explicitly wait a small amount of time for the queue to execute. This can lead to flakiness in tests and make test suites take longer to execute than necessary.

One way to fix this testing problem is to use an "immediate" scheduler instead of

DispatchQueue.main
, which will cause
fetchEpisode
to deliver its output as soon as possible with no thread hops. In order to allow for this we would need to inject a scheduler into our view model so that we can control it from the outside:
class EpisodeViewModel: ObservableObject {
  @Published var episode: Episode?

let apiClient: ApiClient let scheduler: S

init(apiClient: ApiClient, scheduler: S) { self.apiClient = apiClient self.scheduler = scheduler }

func reloadButtonTapped() { self.apiClient.fetchEpisode() .receive(on: self.scheduler) .assign(to: &self.$episode) } }

Now we can initialize this view model in production by using

DispatchQueue.main
and we can initialize it in tests using
DispatchQueue.immediate
. Sounds like a win!

However, introducing this generic to our view model is quite heavyweight as it is loudly announcing to the outside world that this type uses a scheduler, and worse it will end up infecting any code that touches this view model that also wants to be testable. For example, any view that uses this view model will need to introduce a generic if it wants to also be able to control the scheduler, which would be useful if we wanted to write snapshot tests.

Instead of introducing a generic to allow for substituting in different schedulers we can use

AnyScheduler
. It allows us to be somewhat generic in the scheduler, but without actually introducing a generic.

Instead of holding a generic scheduler in our view model we can say that we only want a scheduler whose associated types match that of

DispatchQueue
:
class EpisodeViewModel: ObservableObject {
  @Published var episode: Episode?

let apiClient: ApiClient let scheduler: AnySchedulerOf

init(apiClient: ApiClient, scheduler: AnySchedulerOf) { self.apiClient = apiClient self.scheduler = scheduler }

func reloadButtonTapped() { self.apiClient.fetchEpisode() .receive(on: self.scheduler) .assign(to: &self.$episode) } }

Then, in production we can create a view model that uses a live

DispatchQueue
, but we just have to first erase its type:
let viewModel = EpisodeViewModel(
  apiClient: ...,
  scheduler: DispatchQueue.main.eraseToAnyScheduler()
)

For common schedulers, like

DispatchQueue
,
OperationQueue
, and
RunLoop
, there is even a static helper on
AnyScheduler
that further simplifys this:
let viewModel = EpisodeViewModel(
  apiClient: ...,
  scheduler: .main
)

Then in tests we can use an immediate scheduler:

let viewModel = EpisodeViewModel(
  apiClient: ...,
  scheduler: .immediate
)

So, in general,

AnyScheduler
is great for allowing one to control what scheduler is used in classes, functions, etc. without needing to introduce a generic, which can help simplify the code and reduce implementation details from leaking out.

TestScheduler

A scheduler whose current time and execution can be controlled in a deterministic manner. This scheduler is useful for testing how the flow of time effects publishers that use asynchronous operators, such as

debounce
,
throttle
,
delay
,
timeout
,
receive(on:)
,
subscribe(on:)
and more.

For example, consider the following

race
operator that runs two futures in parallel, but only emits the first one that completes:
func race(
  _ first: Future,
  _ second: Future
) -> AnyPublisher {
  first
    .merge(with: second)
    .prefix(1)
    .eraseToAnyPublisher()
}

Although this publisher is quite simple we may still want to write some tests for it.

To do this we can create a test scheduler and create two futures, one that emits after a second and one that emits after two seconds:

let scheduler = DispatchQueue.test

let first = Future { callback in scheduler.schedule(after: scheduler.now.advanced(by: 1)) { callback(.success(1)) } } let second = Future { callback in scheduler.schedule(after: scheduler.now.advanced(by: 2)) { callback(.success(2)) } }

And then we can race these futures and collect their emissions into an array:

var output: [Int] = []
let cancellable = race(first, second).sink { output.append($0) }

And then we can deterministically move time forward in the scheduler to see how the publisher emits. We can start by moving time forward by one second:

scheduler.advance(by: 1)
XCTAssertEqual(output, [1])

This proves that we get the first emission from the publisher since one second of time has passed. If we further advance by one more second we can prove that we do not get anymore emissions:

scheduler.advance(by: 1)
XCTAssertEqual(output, [1])

This is a very simple example of how to control the flow of time with the test scheduler, but this technique can be used to test any publisher that involves Combine's asynchronous operations.

ImmediateScheduler

The Combine framework comes with an

ImmediateScheduler
type of its own, but it defines all new types for the associated types of
SchedulerTimeType
and
SchedulerOptions
. This means you cannot easily swap between a live
DispatchQueue
and an "immediate"
DispatchQueue
that executes work synchronously. The only way to do that would be to introduce generics to any code making use of that scheduler, which can become unwieldy.

So, instead, this library's

ImmediateScheduler
uses the same associated types as an existing scheduler, which means you can use
DispatchQueue.immediate
to have a scheduler that looks like a dispatch queue but executes its work immediately. Similarly you can construct
RunLoop.immediate
and
OperationQueue.immediate
.

This scheduler is useful for writing tests against publishers that use asynchrony operators, such as

receive(on:)
,
subscribe(on:)
and others, because it forces the publisher to emit immediately rather than needing to wait for thread hops or delays using
XCTestExpectation
.

This scheduler is different from

TestScheduler
in that you cannot explicitly control how time flows through your publisher, but rather you are instantly collapsing time into a single point.

As a basic example, suppose you have a view model that loads some data after waiting for 10 seconds from when a button is tapped:

class HomeViewModel: ObservableObject {
  @Published var episodes: [Episode]?

let apiClient: ApiClient

init(apiClient: ApiClient) { self.apiClient = apiClient }

func reloadButtonTapped() { Just(()) .delay(for: .seconds(10), scheduler: DispachQueue.main) .flatMap { apiClient.fetchEpisodes() } .assign(to: &self.$episodes) } }

In order to test this code you would literally need to wait 10 seconds for the publisher to emit:

func testViewModel() {
  let viewModel = HomeViewModel(apiClient: .mock)

viewModel.reloadButtonTapped()

_ = XCTWaiter.wait(for: [XCTestExpectation()], timeout: 10)

XCTAssert(viewModel.episodes, [Episode(id: 42)]) }

Alternatively, we can explicitly pass a scheduler into the view model initializer so that it can be controller from the outside:

class HomeViewModel: ObservableObject {
  @Published var episodes: [Episode]?

let apiClient: ApiClient let scheduler: AnySchedulerOf

init(apiClient: ApiClient, scheduler: AnySchedulerOf) { self.apiClient = apiClient self.scheduler = scheduler }

func reloadButtonTapped() { Just(()) .delay(for: .seconds(10), scheduler: self.scheduler) .flatMap { self.apiClient.fetchEpisodes() } .assign(to: &self.$episodes) } }

And then in tests use an immediate scheduler:

func testViewModel() {
  let viewModel = HomeViewModel(
    apiClient: .mock,
    scheduler: .immediate
  )

viewModel.reloadButtonTapped()

// No more waiting...

XCTAssert(viewModel.episodes, [Episode(id: 42)]) }

Animated schedulers

CombineSchedulers comes with helpers that aid in asynchronous animations in both SwiftUI and UIKit.

If a SwiftUI state mutation should be animated, you can invoke the

animation
and
transaction
methods to transform an existing scheduler into one that schedules its actions with an animation or in a transaction. These APIs mirror SwiftUI's
withAnimation
and
withTransaction
functions, which are invoked by the animated scheduler.

For example, to animate an API response in your view model, you can specify that the scheduler that receives this state should be animated:

self.apiClient.fetchEpisode()
  .receive(on: self.scheduler.animation())
  .assign(to: &self.$episode)

If you are powering a UIKit feature with Combine, you can use the

.animate
method, which mirrors
UIView.animate
:
self.apiClient.fetchEpisode()
  .receive(on: self.scheduler.animate(withDuration: 0.3))
  .assign(to: &self.$episode)

FailingScheduler

A scheduler that causes a test to fail if it is used.

This scheduler can provide an additional layer of certainty that a tested code path does not require the use of a scheduler.

As a view model becomes more complex, only some of its logic may require a scheduler. When writing unit tests for any logic that does not require a scheduler, one should provide a failing scheduler, instead. This documents, directly in the test, that the feature does not use a scheduler. If it did, or ever does in the future, the test will fail.

For example, the following view model has a couple responsibilities:

class EpisodeViewModel: ObservableObject {
  @Published var episode: Episode?

let apiClient: ApiClient let mainQueue: AnySchedulerOf

init(apiClient: ApiClient, mainQueue: AnySchedulerOf) { self.apiClient = apiClient self.mainQueue = mainQueue }

func reloadButtonTapped() { self.apiClient.fetchEpisode() .receive(on: self.mainQueue) .assign(to: &self.$episode) }

func favoriteButtonTapped() { self.episode?.isFavorite.toggle() } }

  • It lets the user tap a button to refresh some episode data
  • It lets the user toggle if the episode is one of their favorites

The API client delivers the episode on a background queue, so the view model must receive it on its main queue before mutating its state.

Tapping the favorite button, however, involves no scheduling. This means that a test can be written with a failing scheduler:

func testFavoriteButton() {
  let viewModel = EpisodeViewModel(
    apiClient: .mock,
    mainQueue: .failing
  )
  viewModel.episode = .mock

viewModel.favoriteButtonTapped() XCTAssert(viewModel.episode?.isFavorite == true)

viewModel.favoriteButtonTapped() XCTAssert(viewModel.episode?.isFavorite == false) }

With

.failing
, this test strongly declares that favoriting an episode does not need a scheduler to do the job, which means it is reasonable to assume that the feature is simple and does not involve any asynchrony.

In the future, should favoriting an episode fire off an API request that involves a scheduler, this test will begin to fail, which is a good thing! This will force us to address the complexity that was introduced. Had we used any other scheduler, it would quietly receive this additional work and the test would continue to pass.

UIScheduler

A scheduler that executes its work on the main queue as soon as possible. This scheduler is inspired by the equivalent scheduler in the ReactiveSwift project.

If

UIScheduler.shared.schedule
is invoked from the main thread then the unit of work will be performed immediately. This is in contrast to
DispatchQueue.main.schedule
, which will incur a thread hop before executing since it uses
DispatchQueue.main.async
under the hood.

This scheduler can be useful for situations where you need work executed as quickly as possible on the main thread, and for which a thread hop would be problematic, such as when performing animations.

Publishers.Timer

A publisher that emits a scheduler's current time on a repeating interval.

This publisher is an alternative to Foundation's

Timer.publisher
, with its primary difference being that it allows you to use any scheduler for the timer, not just
RunLoop
. This is useful because the
RunLoop
scheduler is not testable in the sense that if you want to write tests against a publisher that makes use of
Timer.publisher
you must explicitly wait for time to pass in order to get emissions. This is likely to lead to fragile tests and greatly bloat the time your tests take to execute.

It can be used much like Foundation's timer, except you specify a scheduler rather than a run loop:

Publishers.Timer(every: .seconds(1), scheduler: DispatchQueue.main)
  .autoconnect()
  .sink { print("Timer", $0) }

Alternatively you can call the

timerPublisher
method on a scheduler in order to derive a repeating timer on that scheduler:
DispatchQueue.main.timerPublisher(every: .seconds(1))
  .autoconnect()
  .sink { print("Timer", $0) }

But the best part of this timer is that you can use it with

TestScheduler
so that any Combine code you write involving timers becomes more testable. This shows how we can easily simulate the idea of moving time forward 1,000 seconds in a timer:
let scheduler = DispatchQueue.test
var output: [Int] = []

Publishers.Timer(every: 1, scheduler: scheduler) .autoconnect() .sink { _ in output.append(output.count) } .store(in: &self.cancellables)

XCTAssertEqual(output, [])

scheduler.advance(by: 1) XCTAssertEqual(output, [0])

scheduler.advance(by: 1) XCTAssertEqual(output, [0, 1])

scheduler.advance(by: 1_000) XCTAssertEqual(output, Array(0...1_001))

Compatibility

This library is compatible with iOS 13.2 and higher. Please note that there are bugs in the Combine framework and iOS 13.1 and lower that will cause crashes when trying to compare

DispatchQueue.SchedulerTimeType
values, which is an operation that the
TestScheduler
depends on.

Installation

You can add CombineSchedulers to an Xcode project by adding it as a package dependency.

  1. From the File menu, select Swift Packages › Add Package Dependency…
  2. Enter "https://github.com/pointfreeco/combine-schedulers" into the package repository URL text field
  3. Depending on how your project is structured:
    • If you have a single application target that needs access to the library, then add CombineSchedulers directly to your application.
    • If you want to use this library from multiple targets you must create a shared framework that depends on CombineSchedulers, and then depend on that framework from your other targets.

Documentation

The latest documentation for Combine Schedulers' APIs is available here.

Other Libraries

License

This library is released under the MIT license. See LICENSE for details.

We use cookies. If you continue to browse the site, you agree to the use of cookies. For more information on our use of cookies please see our Privacy Policy.