So you want to react-ify your Android app? Excellent! Welcome to the club! If
you’re like me, you’ll start by using several of the RxBindings libraries to
turn those ugly boilerplate Android UI callbacks into sexy new Observables. And
if you’re even more like me, after a while you’ll notice that RxBindings is
missing a few callbacks that you really, really need.
So you decide to write one yourself! Mazel Tov! But, wait, you have no idea how
to even go about doing that… Let’s walk through some code to see how you can
create your own Observables from existing callbacks and some “gotchas” to look
out for.
Listener vs. RxJava
Let’s start with everyone’s favorite callback, View.OnClickListener.
Here is the original non-Rx version:
view.setOnClickListener(new OnClickListener() {
    @Override
    public void onClick(View v) {
        doSomething();
    }
});
And here it is using RxJava & RxBindings:
RxView.clicks(view).subscribe(new Action1<Void>() {
    @Override
    public void call(Void aVoid) {
        doSomething();
    }
});
RxView.clicks: The What and the How
Let’s walk through the RxView.clicks() method to see what’s going on under the
hood.
@CheckResult @NonNull
public static Observable<Void> clicks(@NonNull View view) {
    checkNotNull(view, "view == null");
    return Observable.create(new ViewClickOnSubscribe(view));
}
First, we check that the View we passed in isn’t null. Even RxJava can’t spare
us from having to guard against null pointer exceptions.
Next, we create a new Observable via the create method. Unlike the
Observable.just() or Observable.from() methods, which allow you to create an
Observable from just about any object, the create method takes in an object
that implements the OnSubscribe interface.
Tell me more about this OnSubscribe
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>>
The documentation for OnSubscribe is short and to the point. It reads:
Invoked when Observable.subscribe is called.
Seems simple enough. To create an Observable using Observable.create(), you
need an object that implements the OnSubscribe interface. With this object, you
can write whatever sort of magic you want to be executed when subscribe is
called on your Observable. While that is all technically true, it’s not that
easy.
If you look at the documentation for create(), you will see that the authors
had a good deal to say about how to implement the OnSubscribe interface.
Write the function you pass to create so that it behaves as an Observable.
Which means that the function you create should accept a Subscriber<T> and
invoke the Subscriber<T>’s onNext, onError, and onCompleted methods
appropriately.
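
To make that contract concrete, here is a minimal plain-Java sketch. MiniSubscriber, MiniOnSubscribe, ListOnSubscribe and ContractSketch are hypothetical stand-ins that only mirror the shape of the RxJava types so the example runs without the library. The part that carries over is the ordering: zero or more onNext calls, then at most one terminal event.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins that mirror the shape of RxJava 1's Subscriber and
// OnSubscribe. They are NOT the library types.
interface MiniSubscriber<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onCompleted();
}

interface MiniOnSubscribe<T> {
    void call(MiniSubscriber<? super T> subscriber);
}

// Emits every item, then exactly one terminal event: onCompleted on success,
// onError if producing or delivering an item throws.
final class ListOnSubscribe<T> implements MiniOnSubscribe<T> {
    private final List<T> items;

    ListOnSubscribe(List<T> items) {
        this.items = items;
    }

    @Override
    public void call(MiniSubscriber<? super T> subscriber) {
        try {
            for (T item : items) {
                if (item == null) {
                    throw new NullPointerException("item == null");
                }
                subscriber.onNext(item);
            }
        } catch (Throwable t) {
            subscriber.onError(t);
            return; // never follow onError with onCompleted
        }
        subscriber.onCompleted();
    }
}

final class ContractSketch {
    // Subscribes a recording subscriber and returns the events it saw, in order.
    static List<String> record(List<String> items) {
        final List<String> events = new ArrayList<>();
        new ListOnSubscribe<String>(items).call(new MiniSubscriber<String>() {
            @Override public void onNext(String value) { events.add("next:" + value); }
            @Override public void onError(Throwable error) { events.add("error"); }
            @Override public void onCompleted() { events.add("completed"); }
        });
        return events;
    }
}
```

A click stream like the one in this post never reaches the terminal branch, because a View can always be clicked again. It only ever calls onNext.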
Now that we have the requirements for implementing OnSubscribe, let’s go back
to our View.OnClickListener example and take a look at how RxBindings
implements it in the ViewClickOnSubscribe class.
ViewClickOnSubscribe: An OnSubscribe Implementation
final class ViewClickOnSubscribe implements Observable.OnSubscribe<Void> {
    final View view;

    ViewClickOnSubscribe(View view) {
        this.view = view;
    }

    @Override
    public void call(final Subscriber<? super Void> subscriber) {
        checkUiThread();

        View.OnClickListener listener = new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(null);
                }
            }
        };
        view.setOnClickListener(listener);

        subscriber.add(new MainThreadSubscription() {
            @Override
            protected void onUnsubscribe() {
                view.setOnClickListener(null);
            }
        });
    }
}
See anything you recognize? Our old friend View.OnClickListener! Here is where
we finally see the transformation from listener (callback) to Observable. We
can also see all the requirements for OnSubscribe we learned about earlier in
action!
But there are a few other things going on here. You might notice that this
class holds a strong reference to the View that is being observed. So for
memory management it is important to unsubscribe from the Observable to free
the reference.
There’s also a good deal of talk about the main thread.
Threading And Observables
Observables can be “observed on” and “subscribed on” particular threads using
Schedulers. Sometimes you can manually set these threads via a Scheduler and
sometimes it’s set for you. There is a metric ton of material out there
explaining all about threading and Observables in much greater detail than I
will go into. For now, let’s focus on how threads are used in
ViewClickOnSubscribe.
First up, let’s look into the checkUiThread() method. As stated in the View
class:
The entire view tree is single threaded. You must always be on the UI thread when calling any method on any view.
So we need to ensure the Observable we create is subscribed to from the main
thread. Just as it is important to call subscribe() on the Observable on the
main thread, it is equally important that the cleanup triggered by
unsubscribe() runs on the main thread, because it touches the View too.
At the bottom of the class we see that ViewClickOnSubscribe takes care of this
for us via a MainThreadSubscription. This Subscription ensures that its
onUnsubscribe() work is executed on the main thread.
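
Here is a rough plain-Java sketch of that idea, assuming the subscription works by checking which thread it is on and posting the cleanup to the main thread otherwise. HomeThread, HomeThreadSubscription and UnsubscribeSketch are hypothetical names, and a single-thread executor plays the role of the Android main thread. This is not the library’s source.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the Android main thread.
final class HomeThread {
    private volatile Thread thread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "home-thread");
        t.setDaemon(true);
        thread = t;
        return t;
    });

    boolean isCurrent() { return Thread.currentThread() == thread; }
    void post(Runnable task) { executor.execute(task); }
}

// Hypothetical stand-in for MainThreadSubscription.
abstract class HomeThreadSubscription {
    private final HomeThread home;
    private final AtomicBoolean unsubscribed = new AtomicBoolean();

    HomeThreadSubscription(HomeThread home) { this.home = home; }

    final boolean isUnsubscribed() { return unsubscribed.get(); }

    // Safe to call from any thread; the cleanup runs once, on the home thread.
    final void unsubscribe() {
        if (unsubscribed.compareAndSet(false, true)) {
            if (home.isCurrent()) {
                onUnsubscribe();
            } else {
                home.post(this::onUnsubscribe);
            }
        }
    }

    protected abstract void onUnsubscribe();
}

final class UnsubscribeSketch {
    // Unsubscribes from the calling thread and reports which thread ran the cleanup.
    static String cleanupThreadName() {
        final AtomicReference<String> name = new AtomicReference<>();
        final CountDownLatch done = new CountDownLatch(1);
        HomeThreadSubscription subscription = new HomeThreadSubscription(new HomeThread()) {
            @Override protected void onUnsubscribe() {
                name.set(Thread.currentThread().getName());
                done.countDown();
            }
        };
        subscription.unsubscribe();
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name.get();
    }
}
```

The compareAndSet guard means that calling unsubscribe() twice still runs the cleanup only once.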
Lather, Rinse, Repeat
Voila! We have now gone from ugly callback to sexy Observable! Before we go
forward, let’s recap the basic steps towards creating an Observable from a
listener.
- Create a class that implements Observable.OnSubscribe<T>, where T is the type
  of object that will be passed in the subscriber.onNext() call.
- In your implementation of call(final Subscriber<? super T> subscriber), create
  an instance of the listener you are converting.
- Make sure to use the Subscriber to call onNext, onError or onCompleted in the
  call() method so your function “behaves like an Observable.”
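
The steps above can be sketched in plain Java. Everything here is hypothetical: FakeButton is an invented callback API, SketchSubscriber is a cut-down stand-in for RxJava’s Subscriber, and ButtonClickOnSubscribe plays the OnSubscribe role. With the real library you would implement Observable.OnSubscribe<T> and pass it to Observable.create(), as ViewClickOnSubscribe does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback-style API that we want to wrap.
final class FakeButton {
    interface ClickListener { void onClick(); }

    private ClickListener listener;

    void setClickListener(ClickListener listener) { this.listener = listener; }
    void click() { if (listener != null) listener.onClick(); }
}

// Hypothetical stand-in for RxJava's Subscriber: receives values and carries
// cleanup actions to run on unsubscribe.
abstract class SketchSubscriber<T> {
    private final List<Runnable> cleanups = new ArrayList<>();
    private boolean unsubscribed;

    abstract void onNext(T value);

    final boolean isUnsubscribed() { return unsubscribed; }
    final void add(Runnable cleanup) { cleanups.add(cleanup); }

    final void unsubscribe() {
        if (!unsubscribed) {
            unsubscribed = true;
            for (Runnable cleanup : cleanups) cleanup.run();
        }
    }
}

// Step 1: a class in the OnSubscribe role; here T is Integer (the click count).
final class ButtonClickOnSubscribe {
    private final FakeButton button;

    ButtonClickOnSubscribe(FakeButton button) { this.button = button; }

    void call(final SketchSubscriber<? super Integer> subscriber) {
        final int[] count = {0};
        // Step 2: create an instance of the listener being converted.
        FakeButton.ClickListener listener = new FakeButton.ClickListener() {
            @Override public void onClick() {
                // Step 3: forward the callback to the subscriber.
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(Integer.valueOf(++count[0]));
                }
            }
        };
        button.setClickListener(listener);
        // Drop the listener on unsubscribe so the button can be released.
        subscriber.add(() -> button.setClickListener(null));
    }
}

final class RecapSketch {
    static List<Integer> run() {
        final List<Integer> seen = new ArrayList<>();
        FakeButton button = new FakeButton();
        SketchSubscriber<Integer> subscriber = new SketchSubscriber<Integer>() {
            @Override void onNext(Integer value) { seen.add(value); }
        };
        new ButtonClickOnSubscribe(button).call(subscriber);
        button.click();
        button.click();
        subscriber.unsubscribe();
        button.click(); // not delivered: the listener was removed
        return seen;
    }
}
```

The cleanup registered through add() matters as much as the three steps. Without it the listener, and everything it references, stays attached after the subscriber has gone away.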
Easy peasy! What can go wrong?
As we saw in the ViewClickOnSubscribe class, when you’re dealing with
UI-related callbacks, we know for sure that all our work will happen on the
main thread. But what about when you’re not working with UI callbacks?
In the ViewClickOnSubscribe class, when we set our internal listener to listen
for click events from the View, there is something happening in the background
that we need to pay close attention to. What thread is the callback being
triggered from? In this case, it is the main thread. But that might not always
be so obvious.
The thread that the callback is triggered from is very important. It affects how
our OnSubscribe implementation will work. The Subscriber passed to our call()
method will receive the Observable’s events (i.e. onNext(), onError(), and
onCompleted()) on the thread from which the callback was fired.
The object implementing the OnSubscribe interface emits events on the thread
from which the callback is fired.
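
A small plain-Java sketch shows the effect. BackgroundSource is a hypothetical callback API that fires its listener from its own worker thread, and EmissionThreadSketch is likewise invented for illustration. The listener is registered on the calling thread, yet the code standing in for onNext runs on the worker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical callback API that fires its listener from its own worker
// thread, as many non-UI callbacks do.
final class BackgroundSource {
    interface Listener { void onEvent(String event); }

    void start(final Listener listener) {
        Thread worker = new Thread(() -> listener.onEvent("ready"), "callback-thread");
        worker.setDaemon(true);
        worker.start();
    }
}

final class EmissionThreadSketch {
    // Registers the listener on the calling thread and returns the name of the
    // thread on which the event was actually delivered.
    static String emissionThreadName() {
        final AtomicReference<String> name = new AtomicReference<>();
        final CountDownLatch done = new CountDownLatch(1);
        new BackgroundSource().start(event -> {
            // In a real OnSubscribe, subscriber.onNext(event) would go here.
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name.get();
    }
}
```

Whichever thread calls emissionThreadName(), the recorded name is the worker’s, because the source decides where the callback fires.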
So, why should I care?
I know I said I wouldn’t get into threading too much, but a little background is necessary to understand why you need to care about which thread callbacks are triggered from.
subscribeOn and observeOn
As stated in Michael Parker’s Effective-RxJava:
By default, Observable instances specify an execution policy of “immediate.”
While in most cases that may be fine, it’s not so great for larger, more
intensive operations that may block the main thread. Luckily, RxJava has solved
this problem by giving us subscribeOn(Scheduler scheduler) and
observeOn(Scheduler scheduler).
subscribeOn(Scheduler scheduler) applies to upstream Observable instances, and
its scheduler specifies the thread on which the upstream subscribe method is
invoked. In English, everything above the subscribeOn(Scheduler scheduler) call
in the chain is subscribed to on the thread specified by subscribeOn().
Conversely, the observeOn(Scheduler scheduler) method applies to downstream
Observable instances. Its Scheduler parameter specifies the thread on which
events, such as the next emitted value or the stream terminating normally or
with an error, are observed downstream.
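
One way to picture observeOn is as a wrapper that hands each event to another thread before passing it on. The sketch below models that with a plain java.util.concurrent Executor. ObserveOnSketch and its observeOn helper are hypothetical and much simpler than the real operator, which also queues events and handles backpressure.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class ObserveOnSketch {
    // Wraps a downstream consumer so that every event is handed to the given
    // executor instead of being delivered on the producer's thread.
    static <T> Consumer<T> observeOn(final Executor executor, final Consumer<T> downstream) {
        return value -> executor.execute(() -> downstream.accept(value));
    }

    // Emits one event from the calling thread and returns the name of the
    // thread on which the downstream consumer saw it.
    static String deliveryThreadName() {
        ExecutorService observer = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "observer-thread");
            t.setDaemon(true);
            return t;
        });
        final AtomicReference<String> name = new AtomicReference<>();
        final CountDownLatch done = new CountDownLatch(1);
        Consumer<String> downstream = observeOn(observer, (String value) -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        downstream.accept("event"); // the producer emits on the calling thread
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        observer.shutdown();
        return name.get();
    }
}
```

The producer never needs to know about the hand-off, which is why the thread switch only affects what sits downstream of the wrapper.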
Okay, but seriously, why should I care?
Using observeOn and subscribeOn means you can reasonably assume what is being
done on what thread. However, if you are using those methods in conjunction
with an Observable that has an internal listener that is being triggered from
a callback on a different thread, you lose that guarantee, because now your
custom Observable is internally switching threads.
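That means a subscribeOn applied to your custom Observable does not decide
where its events are emitted. It only picks the thread on which call() runs and
the listener is registered, and the events themselves arrive on whatever thread
fires the callback, without you realizing. An observeOn placed earlier in the
chain is overridden in the same way. If the code downstream needs a particular
thread, say so explicitly with an observeOn placed after the custom Observable.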
To quote the great Porky Pig, “That’s All Folks!”