Let’s start with a riddle. Consider this situation: You are developing a News app which consists of a list of the hottest news and a refresh button with which the user can pull the latest news from the network.

Here is a small code snippet that looks pretty simple and should work, but it doesn’t. Take a look at it and try to guess what will actually happen when the user presses the refreshButton.


class MainActivity : AppCompatActivity() {

   private val refreshProcessor = PublishProcessor.create()

   override fun onCreate(savedInstanceState: Bundle?) {

       refreshButton.setOnClickListener {

             refreshProcessor.onNext(Unit)

       }

       refreshProcessor

               .flatMapSingle(networkRequest::fetchData)

               .subscribeOn(backgroundScheduler)

               .observeOn(mainScheduler)

               .subscribe(view::render)

   }

}

If you guessed a NetworkOnMainThreadException would occur, then you are correct.

Even though we added .subscribeOn(backgroundThread) that is not enough. SubscribeOn operator only switches the subscribing process (we will talk about it later) to the desired thread, but that doesn’t mean the items will be emitted on that thread.

Your RxChain will execute on the thread on which onNext was called, and in our case, that is the main thread because onClickListeners get notified on the main thread.

The worst thing about this is that in most cases your app won’t break like in our example, but the domain logic will execute on the main thread which may lead to dropping frames on the UI and bad user experience.

 

How to tackle this?

There is no universal solution to this issue. The developer needs to evaluate every one of these issues and find the best solution for the specific problem. We were aware of this but didn’t want to debug every possible RxChain in our app to find out which ones are the problematic ones.

We wanted our app to “complain” if something like this ever happens and when I say “complain” I mean for it to break in DEBUG builds and to log it in RELEASE builds.

After giving it some thought on how to achieve this, we concluded that if we want to switch to the main thread that means that we already don’t want to be on the main thread.

This is easily achievable with the .compose() operator with which we would check that the current thread is not the main thread before switching to it. Even though this would work, we didn’t want to have any extra overhead of operators. Also, we would need to manually switch all current .observeOn(mainScheduler) operators in our codebase with the composed one and to remember to use it in the future. That is a tedious process, and we don’t want to think about it all the time. We wanted a solution that works with a minimum impact on the current project.

How to tackle it better?

Because we inject our schedulers with dependency injection, we thought about creating our mainScheduler which will check that the current thread is not the main thread before switching to the main thread.

It was too much work, so we wrapped the existing one 🙂


class OnRescheduleNotifyMainScheduler : Scheduler() {

   private val mainScheduler = AndroidSchedulers.mainThread()

   override fun createWorker() = object : Worker() {

       private val worker = mainScheduler.createWorker()

       override fun schedule(run: Runnable, delay: Long, unit: TimeUnit): Disposable {

           if (Looper.myLooper() == Looper.getMainLooper()) {

               logOrError()

           }

           return worker.schedule(run, delay, unit)

       }

       override fun dispose() = worker.dispose()

       override fun isDisposed() = worker.isDisposed

   }

}

We provided our OnRescheduleNotifyMainScheduler with dependency injection instead of the original mainScheduler, and it worked as it should… in most cases. For some odd reason, it reported that we are on the main thread even though we weren’t. It didn’t make sense, and it looked like it was an implementation detail of RxJava that is bothering us.

How does the subscribing process look like under the RxJava hood?

We started debugging RxJava with some simple Flowable flows to understand how it works internally. Here is an example of one:


       publishProcessor

               .filter(this::isValid)

               .map(this::toViewModel)

               .doOnNext(this::log)

   .observeOn(mainScheduler)

               .subscribe(view::render)

And this is what we’ve learned: When you call .subscribe() then the subscription process is started which consists of 3 steps:

  • subscribing to the upstream
  • notifying onSubscribed() the downstream
  • requesting for items from the upstream

It looks like this:

We noticed that when the .observeOn() operator requests for items, it schedules a task on the provided scheduler (in our case it is the OnRescheduleNotifyMainScheduler) but from the current thread, which is also the main thread because we don’t have any .subscribeOn(backgroundScheduler) in our RxChain.

What if we add .subscribeOn() to the flow?

When we added it, it worked as it should. Makes sense, but why do some of our RxChains work fine and others are reported as issues even though they also have .subscribeOn(backgroundScheduler)? What’s the difference?



We added the .subscribeOn() operator to the middle of our chain and started debugging again.

publishProcessor

               .filter(this::isValid)

               .map(this::toViewModel)

   .subscribeOn(backgroundScheduler)

               .doOnNext(this::log)

   .observeOn(mainScheduler)

               .subscribe(view::render)

One thing we noticed that the position of the .subscribeOn() dictates if our custom scheduler will work or not. It the .subscribeOn() was before .observeOn() then the scheduler would not work ok, and if it were below, then everything would be fine.

We dug deeper and came to this conclusion. SubscribeOn operator only switches the subscribing process to the desired thread, but that doesn’t mean the items will be emitted on that thread. We already said that the subscribing process consists of three steps, and the .subscribeOn() operator switches that three steps to the provided thread. It acts as if it is the last in the chain and when someone subscribes to it, it immediately calls onSubscribed() on the downstream. It subscribes to the upstream when the downstream requests items from it, and that subscribe() method call is on the provided thread.

What did we learn?

It actually makes sense to put your .subscribeOn() as close to the .subscribe() as you can. Ideally just above it, because then the whole subscribing process is done on the provided thread and not on the main thread. Also, then our OnRescheduleNotifyMainScheduler works fine and “complains” if we are doing some extra work on the main thread which we don’t want to.

0 comments