Part 2 of a three-part post in which we explain how to write event-dependent unit tests for business logic using the RxTest framework.

  1. Introduction to reactive LightSwitch app [Part 1]
    Writing state-dependent unit tests for business logic
    XCTest and RxBlocking
  2. Writing event-dependent unit tests for business logic [Github]
    XCTest and RxTest
  3. Automating writing mocks and lenses [Part 3]
    Making unit tests more readable

RxBlocking vs RxTest

RxBlocking is a great tool that you will use 80% of the time, but it just falls slightly short for the remaining 20%.

Two main problems we encountered when using RxBlocking:

  • You have no control over when events are sent and in what order. All values that you send have to be sent at the moment of subscription.
  • You can only assert against what you received, but not when you received it.

We sometimes want to test how our output changes when we get different events at different times.

To clarify, let’s go through two simple examples:

Example 1

There is a function foo() that somewhere in its body calls .onNext(1) on some PublishSubject. You are asked to test that .onNext(1) is indeed called whenever foo() runs. Seems rather simple.
You turn to your old friend RxBlocking and write something like this:

func testOnNextCalled() throws {
    foo()

    XCTAssertEqual(try publishSubject.toBlocking().first(), 1)
}

Can you figure out why this test won’t work?

We call foo() (and with it onNext(1)) before calling toBlocking().first(). A PublishSubject does not replay past events to new subscribers, so the assertion waits for an onNext(1) call that has already happened, and the test never finishes.

Then you might think, why not just call toBlocking().first() before calling the function:

func testOnNextCalled() throws {
    let result = try publishSubject.toBlocking().first()

    foo()

    XCTAssertEqual(result, 1)
}

But then you are forgetting that toBlocking().first() blocks the current thread and foo() will never get called. We again end up with a test that never finishes.

We have to find a way to start listening for incoming events and not block the current thread.
Later I’ll show you how to solve this problem with RxTest.

Example 2

One area where RxTest really shines is the precise control of input events. We specify exactly when we want events to be emitted and how many times. We can even simulate time differences between events. This is great when you know your code contains time- or order-sensitive Rx chains, such as those using zip, combineLatest, timer, interval, flatMapLatest, etc.

The first function we tested in Part 1 was queryLightsWithState.

func queryLightsWithState() -> Observable<[LightWithState]>

We made sure it works correctly when we get both LightModel and LightStateModel from the repository at the same time. Now we’ll show how to write tests where one of those observables emits much later than the other.

Intro to RxTest

Schedulers are an Rx way of making tasks run on specific queues. You probably used .observeOn(MainScheduler.instance) quite a lot when you wanted to update the UI on the main thread. Another common scheduler is ConcurrentDispatchQueueScheduler for when you need to perform background work.

RxTest comes with a new scheduler that will be the workhorse for all our tests – TestScheduler.

It allows us to

  • schedule tasks to be run at specific virtual times and convert regular time into virtual times.
  • create Cold or Hot TestableObservable which emits events at specific virtual times.
  • create a TestableObserver that records an element and virtual time of each event.

Cold vs Hot Observables

Cold observables start emitting values only after some observer subscribes to them. They usually have some extra work tied to them that gets done with each subscription. One example is an API call or even a simple Observable.of(1).

Hot observables emit values even if there are no observers subscribed to them. An example would be an Observable that emits values on each tap of the button.
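
The difference becomes visible once you subscribe late. The following is a minimal sketch, assuming RxSwift and RxTest are linked into the test target; the class and test names are made up for illustration. It uses the TestScheduler introduced above to create one cold and one hot observable from the same two events and subscribes to both at virtual time 15.

```swift
import XCTest
import RxSwift
import RxTest

// Hypothetical test case, written only to illustrate cold vs hot behaviour.
final class ColdVsHotTests: XCTestCase {

    func testColdAndHotObservables() {
        let scheduler = TestScheduler(initialClock: 0)
        let disposeBag = DisposeBag()

        // Both observables are described with the same two events.
        let cold = scheduler.createColdObservable([.next(10, "a"), .next(20, "b")])
        let hot = scheduler.createHotObservable([.next(10, "a"), .next(20, "b")])

        let coldObserver = scheduler.createObserver(String.self)
        let hotObserver = scheduler.createObserver(String.self)

        // Subscribe late, at virtual time 15.
        scheduler.scheduleAt(15) {
            cold.subscribe(coldObserver).disposed(by: disposeBag)
            hot.subscribe(hotObserver).disposed(by: disposeBag)
        }

        scheduler.start()

        // Cold: event times are relative to the subscription, so nothing is missed.
        XCTAssertEqual(coldObserver.events, [.next(25, "a"), .next(35, "b")])
        // Hot: event times are absolute, so "a" at 10 was emitted before anyone listened.
        XCTAssertEqual(hotObserver.events, [.next(20, "b")])
    }
}
```

The cold observable replays its whole sequence for the late subscriber, shifted by the subscription time, while the hot one only delivers what happens after the subscription.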

Virtual time

Instead of using regular time, RxTest uses VirtualTimeUnit that represents ticks of a virtual clock.

Using virtual time enables you to test timers, intervals, and long times between events without actually waiting for that time to pass.

To do this, TestScheduler has to have complete control over the Rx environment, which often means injecting it as the scheduler your time-based operators run on.

At the end, I’ll show a couple of common scenarios where injecting TestScheduler is necessary.

Writing event-dependent unit tests for business logic with XCTest and RxTest

We are ready to start writing some tests. Let’s go through those two examples from the beginning.

Example 1 Solution

To recap, .onNext(1) is called on PublishSubject each time function foo() is called. RxBlocking couldn’t help us here, but luckily we have RxTest.

func testOnNextCalled() throws {

    // 1
    let scheduler = TestScheduler(initialClock: 0, resolution: 1)
    let disposeBag = DisposeBag()

    // 2
    scheduler.scheduleAt(10) {
        foo()
    }

    // 3
    let observer = scheduler
        .record(publishSubject,
                disposeBag: disposeBag)

    // 4
    scheduler.start()

    // 5
    XCTAssertEqual(observer.events, [.next(10, 1)])
}

  1. We initialized TestScheduler with the initial clock set to 0. This is the initial virtual time for our scheduler. We’ll explain the resolution in the next step.
  2. The scheduler will call foo() at exactly 10 virtual time units. Remember, these are not seconds, milliseconds, or any real-time value. It’s completely arbitrary.

    You might be asking the same questions I was asking when I tried this out. What happens if I put 10000 instead of 10? Will the test take longer? When does it matter to put a higher amount instead of lower?

    If you set 10000 instead of 10 your test would take the same amount of time to finish. The scheduler will just say it was run at virtual time 10000.
    But if your code had a timer that disposes of each subscription after 60 seconds, the first thing you would need to do is make the timer run on TestScheduler. If we inject the TestScheduler into the timer, 60 seconds become 60 virtual time units.
    Now if you set some number higher than 60 in your scheduleAt(), the test would fail because the subscription was disposed of before the observer received the event.
    That is the power of TestScheduler: it made our 60-second timer instantaneous.

    Then again you might ask: if 60 seconds is 60 virtual time units, what is 60 milliseconds? The answer may surprise you: it’s 1 virtual time unit (by default).
    The formula for transforming seconds into a virtual time unit is ceil(seconds / resolution).
    In our case where we set resolution = 1, anything lower than 1 second will round up to 1 virtual time unit.
    To fix this you would need to set the resolution to 0.01 which would transform 60ms to 6 virtual time units.

    Conclusion: You only need to worry about resolution and specific numbers if you are using regular time somewhere in the code you are testing. In all the other cases, put your favourite number, it doesn’t matter.
  3. record() is a generic helper function that creates a TestableObserver, uses it to subscribe to some Observable, and then returns the TestableObserver. A reminder that TestableObserver records all received elements with their virtual time.
extension TestScheduler {

    /// Creates a `TestableObserver` instance which immediately subscribes to the `source`
    func record<O: ObservableConvertibleType>(
        _ source: O,
        disposeBag: DisposeBag
    ) -> TestableObserver<O.Element> {
        let observer = self.createObserver(O.Element.self)
        source
            .asObservable()
            .bind(to: observer)
            .disposed(by: disposeBag)
        return observer
    }
}

  4. The scheduler will start doing scheduled tasks.
  5. To access elements and their virtual times from a TestableObserver, you use .events. There is only one event that we are expecting, and it should contain integer 1 at 10 virtual time units.
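
The conversion rule from step 2 can be tried out in isolation. The sketch below only restates the formula quoted there, ceil(seconds / resolution). virtualTimeUnits is a hypothetical helper written for illustration, not an RxTest API, and RxTest's own converter may round differently in edge cases.

```swift
import Foundation

/// Hypothetical helper that mirrors the formula from the text:
/// ceil(seconds / resolution). Not part of RxTest.
func virtualTimeUnits(seconds: Double, resolution: Double) -> Int {
    Int((seconds / resolution).rounded(.up))
}

// With the default resolution of 1, whole seconds map one-to-one.
print(virtualTimeUnits(seconds: 60, resolution: 1))      // 60

// Anything below one second rounds up to a single unit.
print(virtualTimeUnits(seconds: 0.06, resolution: 1))    // 1

// A finer resolution keeps sub-second intervals apart.
print(virtualTimeUnits(seconds: 0.06, resolution: 0.01)) // 6
```

With the coarse resolution, 60 ms and 900 ms would both collapse into 1 unit, which is why the resolution matters as soon as the code under test mixes sub-second intervals.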

Example 2 Solution

The solution for this example is one that we will use in our LightSwitch app, so this time let’s set up the entire test suite first.

Setup for testing with RxTest looks a little different than with RxBlocking. To separate RxBlocking tests from the RxTest tests, we’ll create a new file and class LightsUseCaseEventsTests.

import XCTest
import RxSwift
import RxTest
import RxCocoa
@testable import LightSwitch

class LightsUseCaseEventsTests: XCTestCase {

   private var lightsUseCase: LightsUseCaseProtocol!
   private var lightsRepository: LightsRepositoryProtocolMock!
   private var lightModelSubject: PublishSubject<[LightModel]>!
   private var lightStateModelSubject: PublishSubject<[LightStateModel]>!
   private var scheduler: TestScheduler!
   private var disposeBag: DisposeBag!

   override func setUpWithError() throws {
       try super.setUpWithError()
       lightsRepository = LightsRepositoryProtocolMock()
       lightsUseCase = LightsUseCase(lightsRepository: lightsRepository)
       lightModelSubject = PublishSubject<[LightModel]>()
       lightStateModelSubject = PublishSubject<[LightStateModel]>()
       scheduler = TestScheduler(initialClock: 0)
       disposeBag = DisposeBag()
   }
}

Notice we imported RxTest and RxCocoa. We then added two PublishSubjects and a TestScheduler. The two PublishSubjects will help us deliver our events after our TestableObserver subscribes to them. Initialization of TestScheduler is the same as before, but now we are placing it in our setUpWithError() function.

Setup is finished, let’s test queryLightsWithState using RxTest and explain each line. The purpose of our test is to show how to use cold observables to emit LightModel before LightStateModel, hence the name testQueryLightsWithState_When_Model_Before_State.

func testQueryLightsWithState_When_Model_Before_State() throws {
   // 1
   lightsRepository
       .queryAllLightsReturnValue = lightModelSubject
   lightsRepository
       .queryAllLightStatesReturnValue = lightStateModelSubject

   // 2
   let lightModels = [LightModel].stub(withCount: 3)
   let lightStateModels = [LightStateModel].stub(withCount: 3)

   // 3
   let numberOfLightsWithStates = self.lightsUseCase
       .queryLightsWithState()
       .map { $0.count }

   // 4
   scheduler
       .createColdObservable([.next(10, lightModels)])
       .bind(to: lightModelSubject)
       .disposed(by: disposeBag)

   scheduler
       .createColdObservable([.next(20, lightStateModels)])
       .bind(to: lightStateModelSubject)
       .disposed(by: disposeBag)

   // 5
   let result = scheduler
       .record(numberOfLightsWithStates,
               disposeBag: self.disposeBag)

   // 6
   scheduler.start()

   // 7
   XCTAssertEqual(result.events, [.next(20, 3)])
}

  1. Instead of returning .just() like when we were creating state-dependent tests, we are going to assign our PublishSubjects as the return values. This way we keep the subscription alive to send our data whenever we want.
  2. Creating our models with our handy extensions (check Part 1 for those).
  3. Assigning our testable Rx chain to a constant. This is just to clean up our test.
  4. Creating a cold observable with [.next(10, lightModels)] parameter. 10 represents virtual time unit, and lightModels is the value we are emitting.
    We then bind emitted values to lightModelSubject, which is also the reason why we needed to import RxCocoa.
    To sum up: Emit .onNext(lightModels) on lightModelSubject, but fake it like we sent it 10 units of virtual time after subscription. Then emit .onNext(lightStateModels) on lightStateModelSubject with time of 20 units.
  5. As before, creating a TestableObserver and subscribing it to numberOfLightsWithStates.
  6. We tell the scheduler to start doing scheduled tasks.
  7. The output we are interested in is contained in result.events. We know that if we sent three LightModel objects at 10 and three LightStateModel objects at 20, we should have three LightWithState objects at 20.

Similarly, you would test different scenarios by just playing with events and timings.

Dependency injection of TestScheduler

I promised to give examples of when you need to inject your TestScheduler, and I already hinted at one scenario with timers, but let’s put it all in one place.

Timer operators

Inject TestScheduler whenever the Rx chain you are testing contains interval, timer, or timeout operators. You don’t want your test to depend on the actual seconds or milliseconds those operators wait. TestScheduler will happily convert them to virtual time units, and your tests will finish with lightning speed.
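
As a rough sketch of what such an injection can look like, assuming RxSwift and RxTest are available: delayedValue(scheduler:) is a hypothetical function under test (it is not part of the LightSwitch app) that takes its scheduler as a parameter instead of hard-coding MainScheduler.instance.

```swift
import XCTest
import RxSwift
import RxTest

// Hypothetical test case, written only to illustrate scheduler injection.
final class TimerInjectionTests: XCTestCase {

    // Hypothetical function under test: emits 0 after 60 seconds, then completes.
    // Production code would pass MainScheduler.instance here.
    func delayedValue(scheduler: SchedulerType) -> Observable<Int> {
        Observable<Int>.timer(.seconds(60), scheduler: scheduler)
    }

    func testTimerRunsOnVirtualTime() {
        // resolution: 1 means one second equals one virtual time unit.
        let scheduler = TestScheduler(initialClock: 0, resolution: 1)
        let disposeBag = DisposeBag()
        let observer = scheduler.createObserver(Int.self)

        // Subscribing at virtual time 0 schedules the timer for virtual time 60.
        delayedValue(scheduler: scheduler)
            .subscribe(observer)
            .disposed(by: disposeBag)

        // Runs all scheduled work immediately; no real waiting happens.
        scheduler.start()

        XCTAssertEqual(observer.events, [.next(60, 0), .completed(60)])
    }
}
```

Taking the scheduler as a parameter keeps the production call site almost unchanged while letting the test swap in virtual time.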

SubscribeOn and ObserveOn

If you move parts of your code to specific schedulers with the subscribeOn and observeOn operators, you will have to make them all run on TestScheduler. Asynchronous parts of your code can cause real headaches during testing, so it’s better to keep everything synchronous.
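
One possible way to arrange this is to pass the schedulers in through an initializer. The sketch below assumes RxSwift and RxTest; LightsPresenter, its properties, and lightCount(from:) are hypothetical names invented for this illustration, and the operator spelling follows the subscribeOn/observeOn names used in this post (newer RxSwift versions may report them as renamed).

```swift
import XCTest
import RxSwift
import RxTest

// Hypothetical type: both schedulers are injected instead of hard-coded.
struct LightsPresenter {
    let backgroundScheduler: SchedulerType
    let mainScheduler: SchedulerType

    func lightCount(from source: Observable<[String]>) -> Observable<Int> {
        source
            .subscribeOn(backgroundScheduler) // production: a background scheduler
            .map { $0.count }
            .observeOn(mainScheduler)         // production: MainScheduler.instance
    }
}

final class SchedulerInjectionTests: XCTestCase {

    func testLightCountRunsOnTestScheduler() {
        let scheduler = TestScheduler(initialClock: 0)
        let disposeBag = DisposeBag()
        let observer = scheduler.createObserver(Int.self)

        // The same TestScheduler stands in for both schedulers.
        let presenter = LightsPresenter(backgroundScheduler: scheduler,
                                        mainScheduler: scheduler)

        presenter.lightCount(from: .just(["kitchen", "hall"]))
            .subscribe(observer)
            .disposed(by: disposeBag)

        // Nothing is delivered until the scheduler runs its queued work.
        scheduler.start()

        // Only the elements are checked; each scheduler hop can shift the virtual times.
        XCTAssertEqual(observer.events.compactMap { $0.value.element }, [2])
    }
}
```

Because every hop runs on the TestScheduler, the whole chain completes inside scheduler.start() and the assertion can follow immediately, with no expectations or waiting.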


Here you can find a finished implementation of what we have done so far.

We learned how and when to write tests with the popular RxTest framework. This is actually the end of us writing tests, but there’s so much more we can do to improve upon them in Part 3. 

If you have any questions regarding writing unit tests with RxTest, feel free to leave them in the comments.

Part 3 will include automating writing mocks and lenses with Sourcery and making unit tests more readable with Quick/Nimble.