How to think about Subjects in RxJava (Part 1)

Kaushik Gopal
tech-at-instacart
Published in
6 min readDec 18, 2015

--

When I first started using RxJava and heard of Subjects I envisioned them as mystical trinkets. When used correctly they seemed to magically do the impossible. When used incorrectly they turned my code to a steaming pile of U+1F4A9. A friend warming up to RxJava echoed a similar sentiment: “Subjects are like a (colorful adjective) black box to me. I’m not really sure when I should ever be using them. I run into a corner with RxJava, snoop around the web, copy-paste some code which uses Subjects from StackOverflow, get my code working and hope to never look back at that code again”.

We know Observables and Subscribers to be the workhorse constructs. Coupled with an operator (or two) you should be on your merry Rx way. But adding Subjects to the mix opens a whole new channel of communication between these constructs. You just have to start thinking about them differently.

The goal of this post is to help you warm up to that line of thinking.

Let’s start off by looking at the textbook definition:

A Subject is a sort of bridge or proxy that acts both as a Subscriber and as an Observable. Because it is a Subscriber, it can subscribe to one or more Observables, and because it is an Observable, it can pass through the items it observes by reemitting them, and it can also emit new items.

you know nothing JS

A Subscriber and an Observable?

What demigoddery is this?! You have a producer (Observable) on the one end sending down a stream of events. On the other end, you have a consumer (Subscriber) swallowing up those events. When would you possibly need something that does both?

Think pipe connectors

Let’s take a specific example. The world of Android development has this common requirement: “Resuming or continuing the work done by a long-running operation like a network call after a screen rotation or configuration change”. I took a stab at the solution using retained fragments from the Android APIs, coupled with some RxJava.

pipe connectors

You have a UI based fragment (A) which acts as the master. When you want to start your network call you spawn a worker fragment (B) which makes the call. Meanwhile, if (A) is rotated, recreated, destroyed etc. it wouldn’t matter. After (A) finally connects or syncs back with (B), it would only receive subsequent events from (B) without restarting the whole process. Let’s model this example with some code. Say I have a long running Observable:

Observable.interval(1, TimeUnit.SECONDS)
.map(new Func1<Long, Integer>() {
@Override
public Integer call(Long aLong) {
return aLong.intValue();
}
})
.take(20);

In the real world, this Observable would be your network call or long-running operation. This is executed immediately in the onCreate method of the worker fragment (B) as shown below:

/**
* This method will only be called once when the retained Fragment is first created.
*/
@Override
public void onCreate(Bundle savedInstanceState) {
Observable.interval(1, TimeUnit.SECONDS)
.map(new Func1<Long, Integer>() {
@Override
public Integer call(Long aLong) {
return aLong.intValue();
}
})
.take(20)
.subscribe(); // Observable kicked off
}
By virtue of the way Android works, only at a later point in (B) (like the onResume call) can you actually talk back to any connecting fragment such as the UI based fragment (A) :@Override
public void onResume() {
super.onResume();
// channel open for communication with master fragment
// send results (A)
}
Realization
So we started the call in onCreate... how do we pipe the results down to onResume? What if we had this magical trinket that acted as a Subscriber in onCreate consuming all those events, but as an Observable in onResume sending down the events to anyone listening?With Subjects we can do the following:// consume events
(...).subscribe(mSubject);
// produce events
mSubject.subscribe(...);
slightly more flushed out:private Subject<Integer, Integer> mSubject = PublishSubject.create();@Override
public void onCreate(Bundle savedInstanceState) {
Observable.interval(1, TimeUnit.SECONDS)
.map(new Func1<Long, Integer>() {
@Override
public Integer call(Long aLong) {
return aLong.intValue();
}
})
.take(20)
.subscribe(mSubject);
}
@Override
public void onResume() {
super.onResume();
// channel open for communication with master fragment
// send results to master fragment
mfragA.sendResultsBack(mSubject.asObservable);
}
There you have it! A pipe connector for all your Rx streams. A complete working example can be found here.Some considerations:
  • Dispose your subscriptions responsibly!
In the snippets above, I've omitted the parts where we responsibly dispose the subscriptions, for brevity and clarity. Have a look at the solution to see how this should be done.
  • Uhh….why not just create the observable in onResume?
Firstly, we want to execute the call as soon as possible. onCreate is the earliest point we can do this. onResume on the other hand is the point where (A) and (B) can actually talk to each other.Secondly and more importantly if you created the observable in onResume, every time (A) connects back to (B) you would simply be restarting the sequence... which defeats the purpose.
  • This works because Subjects are by default “hot”:
Most observables are "cold". For a fantastic explanation of Hot/Cold observables check out this egghead video. Essentially "cold" observables are like playing your videos from the start every time while "hot" observables are like live-streaming videos. mObservable here (despite the use of an interval operator) is "cold", so every time you subscribe to the stream it will restart the sequence. Unlike cold observables, Subjects are "hot" by default.
  • Which Subject to use?
There are many kinds of subjects. For this specific requirement, a PublishSubject works well because we wish to continue the sequence from where it left off. So assuming events 1,2,3 were emitted in (B), after (A) connects back we only want to see 4, 5, 6. If we used a ReplaySubject we would see [1, 2, 3], 4, 5, 6; or if we used a BehaviorSubject we would see 3, 4, 5, 6 etc.
A previous solution used a very similar technique. However, this is where we enter the lands of "multicasting". Multicasting is not simple: you have to consider thread-safety or come to grips with concepts like .refCount etc. Subjects are far more simpler. As the amazing Karnok point out in this excellent series, when using Subjects you can handle complex conditions like backpressure very easily with simple operators like .onBackPressurexxx.Another wise Jedi Rxer also mentioned once that using multicasting techniques are dangerous and indications of code smells. Almost always, there's a ready-made construct or operator that does the job for you far more easily.Stay tuned for more in this series.[greenhouse url_token="instacart" department_filter="Engineering"]If you fancy talking about such topics all day, you should check our jobs page out.My thanks to Donn Felker, Maksim and Michael for proofreading this article.Images used in this post:

--

--