Chapter 4. Applying Reactive Programming to Existing Applications
Introducing a new library, technology, or paradigm to an application, be it a greenfield or legacy codebase, must be a careful decision. RxJava is no exception. In this chapter, we review some patterns and architectures found in ordinary Java applications and see how Rx can help. This process is not straightforward and requires a significant mindset shift; therefore, we will move carefully from imperative to functional and reactive style. Many libraries in Java projects these days simply add bloat without giving anything in return. However, you will see not only how RxJava simplifies traditional projects, but also what kinds of benefits it brings to legacy platforms.
I am pretty sure that you’re already very excited about RxJava. Built-in operators and simplicity make Rx an amazingly powerful tool for transforming streams of events. However, if you go back to your office tomorrow, you will realize that there are no streams, no real-time events from a stock exchange. You can hardly find any events in your applications; it’s just a mash-up of web requests, databases, and external APIs. You are so eager to try this new RxJava thing somewhere beyond Hello world, but it seems that there are simply no use cases in real life that justify using Rx. Yet RxJava can be a significant step forward in terms of architectural consistency and robustness. You do not need to commit to reactive style top-to-bottom; this is too risky and requires too much work in the beginning. But Rx can be introduced at any layer, without breaking an application as a whole.
We take you through some common application patterns and ways by which you can enhance them with RxJava in a noninvasive way, with the focus being on database querying, caching, error handling, and periodic tasks. The more RxJava you add in various places of your stack, the more consistent your architecture will become.
From Collections to Observables
Unless your platform was built recently in JVM frameworks like Play, Akka actors, or maybe Vert.x, you are probably on a stack with a servlet container on one hand, and JDBC or web services on the other. Between them, there is a varying number of layers implementing business logic, which we will not refactor all at once; instead, let’s begin with a simple example. The following class represents a trivial repository abstracting us from a database:
    class PersonDao {

        List<Person> listPeople() {
            return query("SELECT * FROM PEOPLE");
        }

        private List<Person> query(String sql) {
            //...
        }
    }
Implementation details aside, how is this related to Rx? So far we have been talking about asynchronous events pushed from upstream systems or, at best, when someone subscribes. How is this mundane Dao relevant here? Observable is not only a pipe pushing events downstream. You can treat Observable<T> as a data structure, dual to Iterable<T>. They both hold items of type T, but provide radically different interfaces. So, it shouldn’t come as a surprise that you can simply replace one with the other:
    Observable<Person> listPeople() {
        final List<Person> people = query("SELECT * FROM PEOPLE");
        return Observable.from(people);
    }
At this point, we made a breaking change to the existing API. Depending on how big your system is, such incompatibility might be a major concern. Thus, it is important to bring RxJava into your API as soon as possible. Obviously, we are working with an existing application so that can’t be the case.
BlockingObservable: Exiting the Reactive World
If you are combining RxJava with existing, blocking, imperative code, you might need to translate an Observable to a plain collection. This transformation is rather unpleasant: it requires blocking on an Observable and waiting for its completion. Until the Observable completes, we are not capable of creating a collection. BlockingObservable is a special type that makes it easier to work with Observable in a nonreactive environment. BlockingObservable should be your last choice when working with RxJava, but it is inevitable when combining blocking and nonblocking code.
In the previous section, we refactored the listPeople() method so that it returns Observable<Person> rather than List<Person>. Observable is not an Iterable in any sense, so our code no longer compiles. We want to take baby steps rather than attempt a massive refactoring, so let’s keep the scope of changes as small as possible. The client code could look like this:
    List<Person> people = personDao.listPeople();
    String json = marshal(people);
We can imagine the marshal() method pulling data from the people collection and serializing it to JSON. That’s no longer the case; we can’t simply pull items from an Observable whenever we want. An Observable is in charge of producing (pushing) items and notifying subscribers, if any. This radical change can be easily circumvented with BlockingObservable. This handy class is entirely independent of Observable and can be obtained via the Observable.toBlocking() method. The blocking variant of Observable has superficially similar methods like single() or subscribe(). However, BlockingObservable is much more convenient in blocking environments that are inherently unprepared for the asynchronous nature of Observable. Operators on BlockingObservable typically block (wait) until the underlying Observable is completed. This strongly contradicts the main concept of Observables, namely that everything is likely asynchronous, lazy, and processed on the fly. For example, Observable.forEach() will asynchronously receive events from Observable as they come in, whereas BlockingObservable.forEach() will block until all events are processed and the stream is completed. Also, exceptions are no longer propagated as values (events) but instead are rethrown in the calling thread.
In our case, we want to transform Observable<Person> back into List<Person> to limit the scope of refactoring:
    Observable<Person> peopleStream = personDao.listPeople();
    Observable<List<Person>> peopleList = peopleStream.toList();
    BlockingObservable<List<Person>> peopleBlocking = peopleList.toBlocking();
    List<Person> people = peopleBlocking.single();
I intentionally left all intermediate types explicit in order to explain what happens. After refactoring to Rx, our API returns Observable<Person> peopleStream. This stream can potentially be fully reactive, asynchronous, and event driven, which doesn’t match at all what we need: a static List. As the first step, we turn Observable<Person> into Observable<List<Person>>. This lazy operator will buffer all Person events and keep them in memory until the onCompleted() event is received. At this point, a single event of type List<Person> will be emitted, containing all seen events at once, as illustrated in the following marble diagram:
The resulting stream completes immediately after emitting a single List item. Again, this operator is asynchronous; it doesn’t wait for all events to arrive but instead lazily buffers all values. The awkward-looking Observable<List<Person>> peopleList is then converted to BlockingObservable<List<Person>> peopleBlocking. BlockingObservable is a good idea only when you must provide a blocking, static view of your otherwise asynchronous Observable. Whereas Observable.from(List<T>) converts a normal pull-based collection into an Observable, toBlocking() does quite the opposite. You might ask yourself why we need two abstractions for blocking and nonblocking operators. The authors of RxJava figured out that being explicit about the synchronous versus asynchronous nature of the underlying operator is too crucial to be left to JavaDoc. Having two unrelated types ensures that you always work with the appropriate data structure. Moreover, BlockingObservable is your weapon of last resort; normally, you should compose and chain plain Observables as long as possible. However, for the purpose of this exercise, let’s escape from Observable right away.
The last operator, single(), drops observables altogether and extracts the one, and only one, item we expect to receive from BlockingObservable<T>. A similar operator, first(), will return a value of T and discard whatever is left. single(), on the other hand, makes sure there are no more pending events in the underlying Observable before terminating. This means single() will block waiting for the onCompleted() callback. Here is the same code snippet as earlier, this time with all operators chained:
    List<Person> people = personDao
        .listPeople()
        .toList()
        .toBlocking()
        .single();
You might think that we went through all this hassle of wrapping and unwrapping Observable for no apparent reason. Remember, this was just the first step. The next transformation will introduce some laziness. Our code as it stands right now always executes query("...") and wraps it with Observable. As you know by now, Observables (especially cold ones) are lazy by definition. As long as no one subscribes, they just represent a stream that never had a chance to begin emitting values. Most of the time you can call methods returning Observable and as long as you don’t subscribe, no work will be done. Observable is like a Future because it promises a value to appear in the future. But as long as you don’t request it, a cold Observable will not even begin emitting. From that perspective, Observable is more similar to java.util.function.Supplier<T>, generating values of type T on demand. Hot Observables are different because they emit values whether you are listening or not, but we are not considering them right now. The mere existence of Observable does not indicate a background job or any side effect, unlike Future, which almost always suggests some operation running concurrently.
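The contrast between Supplier and Future can be observed with nothing but the JDK. The following is a minimal sketch, not RxJava code: expensiveQuery() and the queries counter are hypothetical stand-ins for a database call. It records how many times the query ran after a Supplier was merely created, after its value was requested, and after a CompletableFuture was created without anyone ever asking for its result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LazyVersusEager {

    // Counts how many times the (hypothetical) expensive operation really ran
    static final AtomicInteger queries = new AtomicInteger();

    static String expensiveQuery() {
        queries.incrementAndGet();
        return "result";
    }

    // Returns the query count observed at three points in time
    static int[] demo() {
        queries.set(0);
        int[] seen = new int[3];

        // Lazy: merely creating a Supplier runs nothing
        Supplier<String> lazy = LazyVersusEager::expensiveQuery;
        seen[0] = queries.get();

        // Work happens only when somebody asks for the value
        lazy.get();
        seen[1] = queries.get();

        // Eager: supplyAsync() schedules the work immediately,
        // even though nobody ever calls get() or join() on the future
        CountDownLatch ran = new CountDownLatch(1);
        CompletableFuture<String> eager = CompletableFuture.supplyAsync(() -> {
            String result = expensiveQuery();
            ran.countDown();
            return result;
        });
        try {
            ran.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        seen[2] = queries.get();
        return seen;
    }

    public static void main(String[] args) {
        int[] seen = demo();
        System.out.println("after creating Supplier:          " + seen[0]);
        System.out.println("after Supplier.get():             " + seen[1]);
        System.out.println("after creating CompletableFuture: " + seen[2]);
    }
}
```

A cold Observable behaves like the Supplier in this sketch: holding a reference to it costs nothing until someone subscribes.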
Embracing Laziness
So how do we make our Observable lazy? The simplest technique is to wrap an eager Observable with defer():
    public Observable<Person> listPeople() {
        return Observable.defer(() ->
            Observable.from(query("SELECT * FROM PEOPLE")));
    }
Observable.defer() takes a lambda expression (a factory) that can produce an Observable. The underlying Observable is eager, so we want to postpone its creation. defer() will wait until the last possible moment to actually create the Observable; that is, until someone actually subscribes to it. This has some interesting implications. Because Observable is lazy, calling listPeople() has no side effects and almost no performance footprint. No database is queried yet. You can treat Observable<Person> as a promise but without any background processing happening yet. Notice that there is no asynchronous behavior at the moment, just lazy evaluation. This is similar to how values in the Haskell programming language are evaluated lazily only when absolutely needed.
If you have never programmed in functional languages, you might be quite confused why laziness is so important and groundbreaking. It turns out that such behavior is quite useful and can improve the quality and freedom of your implementation quite a bit. For example, you no longer have to pay attention to which resources are fetched, when, and in what order. RxJava will load them only when they are absolutely needed.
As an example, take this trivial fallback mechanism that we have all seen so many times:
    void bestBookFor(Person person) {
        Book book;
        try {
            book = recommend(person);
        } catch (Exception e) {
            book = bestSeller();
        }
        display(book.getTitle());
    }

    void display(String title) {
        //...
    }
You probably think there is nothing wrong with such a construct. In this example, we try to recommend the best book for a given person, but in case of failures, we degrade gracefully and display the best seller. The assumption is that fetching a bestseller is faster and can be cached. But what if you could add error handling declaratively so that try-catch blocks aren’t obscuring real logic?
    void bestBookFor(Person person) {
        Observable<Book> recommended = recommend(person);
        Observable<Book> bestSeller = bestSeller();
        Observable<Book> book = recommended.onErrorResumeNext(bestSeller);
        Observable<String> title = book.map(Book::getTitle);
        title.subscribe(this::display);
    }
We are only exploring RxJava so far, thus I left all these intermediate values and types. In real life, bestBookFor() would look more like this:
    void bestBookFor(Person person) {
        recommend(person)
            .onErrorResumeNext(bestSeller())
            .map(Book::getTitle)
            .subscribe(this::display);
    }
This code is beautifully concise and readable. First find a recommendation for person. In case of error (onErrorResumeNext), proceed with a bestseller. No matter which one succeeded, map() extracts the title, which is then displayed. onErrorResumeNext() is a powerful operator that intercepts exceptions happening upstream, swallows them, and subscribes to the provided backup Observable. This is how Rx implements a try-catch clause. We will spend much more time on error handling later in this book (see “Declarative try-catch Replacement”). For the time being, notice how we can lazily call bestSeller() without worrying that fetching the best seller happens even when a real recommendation went fine.
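The key property here, that the fallback is not evaluated unless the primary source fails, can be modeled with plain JDK types. The sketch below is an analogy only and does not use RxJava: firstSuccessful() is a hypothetical helper playing the role of onErrorResumeNext(), books are reduced to title strings, and the bestSellerCalls counter exists only to show when the fallback really runs.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LazyFallback {

    // Tracks how often the fallback was actually evaluated
    static final AtomicInteger bestSellerCalls = new AtomicInteger();

    // Hypothetical recommendation engine that can be forced to fail
    static String recommend(boolean fail) {
        if (fail) {
            throw new IllegalStateException("recommendation engine is down");
        }
        return "Recommended title";
    }

    static String bestSeller() {
        bestSellerCalls.incrementAndGet();
        return "Bestseller title";
    }

    // Evaluates the fallback only if the primary supplier throws
    static String firstSuccessful(Supplier<String> primary, Supplier<String> fallback) {
        try {
            return primary.get();
        } catch (RuntimeException e) {
            return fallback.get();
        }
    }

    static String bestBookFor(boolean recommendationFails) {
        return firstSuccessful(
                () -> recommend(recommendationFails),
                LazyFallback::bestSeller);
    }

    public static void main(String[] args) {
        System.out.println(bestBookFor(false) + ", fallback calls: " + bestSellerCalls.get());
        System.out.println(bestBookFor(true) + ", fallback calls: " + bestSellerCalls.get());
    }
}
```

Passing bestSeller() as a lazy value rather than an already computed result is what makes it safe to mention the fallback up front.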
Composing Observables
SELECT * FROM PEOPLE is not really a state-of-the-art SQL query. First, you should not fetch all columns blindly, but fetching all rows is even more damaging. Our old API is not capable of paging results, that is, viewing just a subset of a table. A paged version might look like this, again in a traditional enterprise application:
    List<Person> listPeople(int page) {
        return query(
            "SELECT * FROM PEOPLE ORDER BY id LIMIT ? OFFSET ?",
            PAGE_SIZE,
            page * PAGE_SIZE
        );
    }
This is not a SQL book, so we’re going to set the implementation details aside. The author of this API was merciless: we don’t have the freedom to choose any range of records; we can only operate on 0-based page numbers. However, in RxJava, due to laziness, we can actually simulate reading an entire database starting from a given page:
    import static rx.Observable.defer;
    import static rx.Observable.from;

    Observable<Person> allPeople(int initialPage) {
        return defer(() -> from(listPeople(initialPage)))
            .concatWith(defer(() ->
                allPeople(initialPage + 1)));
    }
This code snippet lazily loads the initial page of database records, for example 10 items. If no one subscribes, even this first query is not invoked. If there is a subscriber that only consumes a few initial elements (e.g., allPeople(0).take(3)), RxJava will unsubscribe automatically from our stream and no more queries are executed. So what happens when we request, say, 11 items but the first listPeople() call returned only 10? Well, RxJava figures out that the initial Observable is exhausted but the consumer is still hungry. Luckily, it sees the concatWith() operator, which basically says: when the Observable on the left is completed, rather than propagating the completion notification to subscribers, subscribe to the Observable on the right and continue as if nothing happened, as depicted in the following marble diagram:
In other words, concatWith() can join together two Observables so that when the first one completes, the second one takes over. In a.concatWith(b).subscribe(...), the subscriber will first receive all events from a, followed by all events from b. In this case, the subscriber first receives an initial 10 items followed by a subsequent 10. However, look carefully: there is an apparent infinite recursion in our code! allPeople(initialPage) calls allPeople(initialPage + 1) without any stop condition. This is a recipe for StackOverflowError in most languages, but not here. Again, calling allPeople() is always lazy, therefore the moment you stop listening (unsubscribe), this recursion is over.
Technically, concatWith() can still produce StackOverflowError here. In “Honoring the Requested Amount of Data”, you will learn how to deal with the varying demand for incoming data.
The technique of lazily loading data chunk by chunk is quite useful because it allows you to concentrate on business logic, not on low-level plumbing. We already see some benefits of applying RxJava even on a small scale. Designing an API with Rx in mind doesn’t influence the entire architecture, because we can always fall back to BlockingObservable and Java collections. But it’s better to have a wide range of possibilities that we can further trim down if necessary.
Lazy paging and concatenation
There are more ways to implement lazy paging with RxJava. If you think about it, the simplest way of loading paged data is to load everything and then take whatever we need. It sounds silly, but thanks to laziness it is feasible. First we generate all possible page numbers and then we request loading each and every page individually:
    Observable<List<Person>> allPages = Observable
        .range(0, Integer.MAX_VALUE)
        .map(this::listPeople)
        .takeWhile(list -> !list.isEmpty());
If this were not RxJava, the preceding code would take an enormous amount of time and memory, basically loading the entire database into memory. But because Observable is lazy, no query to the database has been issued yet. Moreover, if we find an empty page, it means all further pages are empty as well (we reached the end of the table). Therefore, we use takeWhile() rather than filter().
To flatten allPages to Observable<Person> we can use concatMap() (see “Preserving Order Using concatMap()”):
    Observable<Person> people = allPages.concatMap(Observable::from);
concatMap() requires a transformation from List<Person> to Observable<Person>, executed for each page. Alternatively, we can try concatMapIterable(), which does the same thing, but the transformation should return an Iterable<Person> for each upstream value (which happens to be an Iterable<Person> already):
    Observable<Person> people = allPages.concatMapIterable(page -> page);
No matter which approach you choose, all transformations on Person objects are lazy. As long as you limit the number of records you want to process (for example with people.take(15)), the Observable<Person> will invoke listPeople() as late as possible.
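To convince yourself that only the necessary pages are fetched, it helps to count the page loads. The sketch below is an analogy built on java.util.stream (Java 9 or later, because of takeWhile()), not on RxJava, so it runs without any extra dependency. The in-memory table of 35 rows, the pagesLoaded counter, and the use of plain ids instead of Person objects are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyPagingSketch {

    static final int PAGE_SIZE = 10;
    static final int TOTAL_ROWS = 35; // hypothetical table size

    // Counts how many "queries" were actually executed
    static final AtomicInteger pagesLoaded = new AtomicInteger();

    // Stand-in for listPeople(int page): returns row ids instead of Person objects
    static List<Integer> listPeople(int page) {
        pagesLoaded.incrementAndGet();
        List<Integer> rows = new ArrayList<>();
        int from = page * PAGE_SIZE;
        int to = Math.min(from + PAGE_SIZE, TOTAL_ROWS);
        for (int id = from; id < to; id++) {
            rows.add(id);
        }
        return rows;
    }

    // Generate all page numbers, load each page lazily, stop at the first empty page
    static Stream<Integer> allPeople() {
        return Stream.iterate(0, page -> page + 1)
                .map(LazyPagingSketch::listPeople)
                .takeWhile(page -> !page.isEmpty())
                .flatMap(List::stream);
    }

    // Analogous to people.take(n)
    static List<Integer> firstPeople(int n) {
        return allPeople().limit(n).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> first = firstPeople(15);
        System.out.println(first.size() + " rows, pages loaded: " + pagesLoaded.get());
    }
}
```

Requesting 15 rows with a page size of 10 touches only the first two pages; reading everything stops after the first empty page. The RxJava pipeline above follows the same reasoning, with subscription playing the role of the terminal operation.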
Imperative Concurrency
I don’t often see explicit concurrency in enterprise applications. Most of the time a single request is handled by a single thread. The same thread does the following:
- Accepts TCP/IP connection
- Parses HTTP request
- Calls a controller or servlet
- Blocks on database call
- Processes results
- Encodes response (e.g., in JSON)
- Pushes raw bytes back to the client
This layered model affects user latency when the backend makes several independent requests, for instance to a database. They are performed sequentially, whereas one could easily parallelize them. Moreover, scalability is affected. For example, in Tomcat there are 200 threads by default in the executors that are responsible for handling requests. This means that we can’t handle more than 200 concurrent connections. In case of a sudden but short burst of traffic, incoming connections are queued and the server responds with higher latency. However, this situation can’t last forever, and Tomcat will eventually begin rejecting incoming traffic. We will devote large parts of the next chapter (see “Nonblocking HTTP Server with Netty and RxNetty”) to dealing with this rather embarrassing shortcoming. For the time being, let’s stay with the traditional architecture. Executing every step of request handling within a single thread has some benefits, for example improved cache locality and minimal synchronization overhead. Unfortunately, in classic applications, because overall latency is the sum of each layer’s latencies, one malfunctioning component can have a negative impact on total latency. Moreover, sometimes there are many steps that are independent of one another and can be executed concurrently. For example, we invoke multiple external APIs or execute several independent SQL queries.
The JDK has quite good support for concurrency, especially since Java 5 with ExecutorService and Java 8 with CompletableFuture. Nonetheless, it is not as widely used as it could be.
For example, let’s look at the following program with no concurrency whatsoever:
    Flight lookupFlight(String flightNo) {
        //...
    }

    Passenger findPassenger(long id) {
        //...
    }

    Ticket bookTicket(Flight flight, Passenger passenger) {
        //...
    }

    SmtpResponse sendEmail(Ticket ticket) {
        //...
    }
And on the client side:
    Flight flight = lookupFlight("LOT 783");
    Passenger passenger = findPassenger(42);
    Ticket ticket = bookTicket(flight, passenger);
    sendEmail(ticket);
Again, quite typical, classic blocking code, similar to what you can find in many applications. But if you look carefully from a latency perspective, the preceding code snippet has four steps; however, the first two are independent of each other. Only the third step (bookTicket()) needs results from lookupFlight() and findPassenger(). There is an obvious opportunity to take advantage of concurrency. Yet, very few developers will actually go down this path because it requires awkward thread pools, Futures, and callbacks. What if the API were already Rx-compatible, though? Remember, you can simply wrap blocking, legacy code in Observable, just like we did in the beginning of this chapter:
    Observable<Flight> rxLookupFlight(String flightNo) {
        return Observable.defer(() ->
            Observable.just(lookupFlight(flightNo)));
    }

    Observable<Passenger> rxFindPassenger(long id) {
        return Observable.defer(() ->
            Observable.just(findPassenger(id)));
    }
Semantically, the rx- methods do exactly the same thing and in the same way; that is, they are blocking by default. We didn’t gain anything yet, apart from a more verbose API from the client perspective:
    Observable<Flight> flight = rxLookupFlight("LOT 783");
    Observable<Passenger> passenger = rxFindPassenger(42);
    Observable<Ticket> ticket =
        flight.zipWith(passenger, (f, p) -> bookTicket(f, p));
    ticket.subscribe(this::sendEmail);
Both the traditional blocking program and the one with Observable work exactly the same way. The latter is lazier by default, but the order of operations is essentially the same. First, we create Observable<Flight>, which, as you already know, does nothing by default. Unless someone explicitly asks for a Flight, this Observable is just a lazy placeholder. We already learned that this is a valuable property of cold Observables. The same story goes for Observable<Passenger>; we have two placeholders of type Flight and Passenger, however no side effects have been performed yet: no database query or web-service call. If we decide to stop processing here, no superfluous work has been done.
To proceed with bookTicket(), we need concrete instances of both Flight and Passenger. It is tempting to just block on these two Observables by using the toBlocking() operator. However, we would like to avoid blocking as much as possible to reduce resource consumption (especially memory) and allow greater concurrency. Another poor solution is to .subscribe() on the flight and passenger Observables and somehow wait for both callbacks to finish. It’s fairly straightforward when Observable is blocking, but if callbacks appear asynchronously and you need to synchronize some global state waiting for both of them, this quickly becomes a nightmare. Also, a nested subscribe() is nonidiomatic, and typically you want a single subscription for one message flow (use case). The only reason why callbacks work somewhat decently in JavaScript is that there is just one thread. The idiomatic way of subscribing to multiple Observables at the same time is zip and zipWith. You might perceive zip as a way to join two independent streams of data pair-wise. But far more often, zip is simply used to join together two single-item Observables. ob1.zipWith(ob2, ...).subscribe(...) essentially means receiving an event when both ob1 and ob2 are done (each has emitted an event of its own). So whenever you see zip, it’s more likely that someone is simply making a join step on two or more Observables that had forked paths of execution. zip is a way to asynchronously wait for two or more values, no matter which one appears last.
So let’s get back to flight.zipWith(passenger, this::bookTicket) (a shorter syntax using a method reference instead of the explicit lambda used in the code sample). The reason I keep all of the type information rather than fluently joining expressions is that I want you to pay attention to return types. flight.zipWith(passenger, ...) doesn’t simply invoke a callback when both flight and passenger are done; it returns a new Observable, which you should immediately recognize as a lazy placeholder for data. Amazingly, at this point no computation has started yet, either. We simply wrapped a few data structures together, but no behavior was triggered. As long as no one subscribes to Observable<Ticket>, RxJava won’t run any backend code. This is what finally happens in the last statement: ticket.subscribe() explicitly asks for a Ticket.
Where to Subscribe?
Pay attention to where you see subscribe() in domain code. Often your business logic is just composing Observables all the way down and returning them to some sort of framework or scaffolding layer. The actual subscription happens behind the scenes in a web framework or some glue code. It is not a bad practice to call subscribe() yourself, but try to push it out as far as possible.
To understand the flow of execution, it’s useful to look bottom up. We subscribed to ticket, thus RxJava must subscribe transparently to both flight and passenger. At this point the real logic happens. Because both Observables are cold and no concurrency is yet involved, the first subscription to flight invokes the lookupFlight() blocking method right in the calling thread. When lookupFlight() is done, RxJava can subscribe to passenger. However, it already received a Flight instance from the synchronous flight. rxFindPassenger() calls findPassenger() in a blocking fashion and receives a Passenger instance. At this juncture, data flows back downstream. Instances of Flight and Passenger are combined using the provided lambda (bookTicket) and passed to ticket.subscribe().
This sounds like a lot of work considering it behaves and works essentially just like our blocking code in the beginning. But now we can declaratively apply concurrency without changing any logic. If our business methods returned Future<Flight> (or CompletableFuture<Flight>, it doesn’t really matter), two decisions would have been made for us:
- The underlying invocation of lookupFlight() already began and there is no place for laziness. We don’t block on such a method, but work has already started.
- We have no control over concurrency whatsoever; it is the method implementation that decides whether a Future task is invoked in a thread pool, a new thread per request, and so on.
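Both points can be seen in a small Future-based variant of the booking example. This is only a sketch under assumptions: the domain types are reduced to strings, and the pool size and the thread name dao-pool are invented for illustration. The results carry the name of the thread that produced them, which shows that the implementor’s pool was used, not one chosen by the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FutureBasedBooking {

    // The implementor, not the caller, picks this pool (hypothetical size and name)
    private static final ExecutorService POOL = Executors.newFixedThreadPool(2, runnable -> {
        Thread thread = new Thread(runnable, "dao-pool");
        thread.setDaemon(true); // do not keep the JVM alive
        return thread;
    });

    // Work is submitted the moment the method is called; there is no lazy variant
    static CompletableFuture<String> lookupFlight(String flightNo) {
        return CompletableFuture.supplyAsync(
                () -> "Flight " + flightNo + " on " + Thread.currentThread().getName(), POOL);
    }

    static CompletableFuture<String> findPassenger(long id) {
        return CompletableFuture.supplyAsync(
                () -> "Passenger " + id + " on " + Thread.currentThread().getName(), POOL);
    }

    static String bookTicket(String flight, String passenger) {
        return "Ticket[" + flight + ", " + passenger + "]";
    }

    static String book() {
        CompletableFuture<String> flight = lookupFlight("LOT 783"); // already running
        CompletableFuture<String> passenger = findPassenger(42);    // already running
        // thenCombine() joins both results, roughly what zipWith() does for Observables
        return flight.thenCombine(passenger, FutureBasedBooking::bookTicket).join();
    }

    public static void main(String[] args) {
        System.out.println(book());
    }
}
```

The caller gets concurrency for free, but cannot postpone the work or move it to a different pool without changing lookupFlight() and findPassenger() themselves.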
RxJava gives users more control. Just because Observable<Flight> wasn’t implemented with concurrency in mind, this does not mean that we cannot apply it later. Real-world Observables are typically asynchronous already, but in rare cases you must add concurrency to an existing Observable. The consumers of our API, not the implementors, are free to choose the threading mechanism in case of the synchronous Observable. All of this is achieved by using the subscribeOn() operator:
    Observable<Flight> flight =
        rxLookupFlight("LOT 783").subscribeOn(Schedulers.io());
    Observable<Passenger> passenger =
        rxFindPassenger(42).subscribeOn(Schedulers.io());
At any point before subscribing, we can inject the subscribeOn() operator and provide a so-called Scheduler instance. In this case, I used the Schedulers.io() factory method, but we can just as well use a custom ExecutorService and quickly wrap it with a Scheduler. When subscription occurs, the lambda expression passed to Observable.defer() (or to Observable.create(), had we used it) is executed within the supplied Scheduler rather than the client thread. We will examine schedulers in depth in the “What Is a Scheduler?” section; the details are not necessary yet. For the time being, treat a Scheduler like a thread pool.
How does Scheduler change the runtime behavior of our program? Remember that the zip() operator subscribes to two or more Observables and waits for pairs (or tuples). When subscription occurs asynchronously, all upstream Observables can call their underlying blocking code concurrently. If you now run your program, lookupFlight() and findPassenger() will begin execution immediately and concurrently when ticket.subscribe() is invoked. Then, bookTicket() will be applied as soon as the slower of the aforementioned Observables emits a value.
Talking about slowness, you can declaratively apply a timeout, as well, when a given Observable does not emit any value in the specified amount of time:
    rxLookupFlight("LOT 783")
        .subscribeOn(Schedulers.io())
        .timeout(100, TimeUnit.MILLISECONDS)
As always, errors are propagated downstream rather than thrown arbitrarily. So if the lookupFlight() method takes more than 100 milliseconds, every subscriber will receive a TimeoutException notification rather than an emitted value. The timeout() operator is exhaustively explained in “Timing Out When Events Do Not Occur”.
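The idea of a timeout travelling through the pipeline as a value, instead of being thrown at an arbitrary place, also exists in the JDK. The following sketch is an analogy using CompletableFuture.orTimeout() (Java 9 or later), not RxJava’s timeout(); the sleeping lookupFlight() and the returned strings are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

class TimeoutSketch {

    // Hypothetical blocking lookup that takes workMillis to finish
    static String lookupFlight(long workMillis) {
        try {
            Thread.sleep(workMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "LOT 783";
    }

    static String lookupWithTimeout(long workMillis, long timeoutMillis) {
        return CompletableFuture
                .supplyAsync(() -> lookupFlight(workMillis))
                // completes the future exceptionally if it is still pending after the timeout
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                // the error arrives here as a value, next to the regular result
                .handle((flight, error) -> {
                    if (error == null) {
                        return flight;
                    }
                    Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                            ? error.getCause()
                            : error;
                    return "failed with " + cause.getClass().getSimpleName();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(lookupWithTimeout(0, 2000));
        System.out.println(lookupWithTimeout(2000, 100));
    }
}
```

In both worlds the consumer decides how to react to the timeout at the end of the chain, not in the middle of the lookup code.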
We ended up with two methods running concurrently without much effort, assuming that your API is already Rx-driven. But we cheated a little bit with bookTicket() still returning Ticket, which definitely means it is blocking. Even if booking a ticket were extremely fast, it is still worth declaring it as returning Observable<Ticket>, just to make the API easier to evolve. The evolution might mean adding concurrency or using it in fully nonblocking environments (see Chapter 5). Remember that turning a nonblocking API into a blocking one is as easy as calling toBlocking(). The opposite is often challenging and requires lots of extra resources. Also, it is very difficult to predict the evolution of methods like rxBookTicket(). If they ever touch the network or filesystem, not to mention a database, it is worth wrapping them with an Observable, indicating possible latency at the type level:
    Observable<Ticket> rxBookTicket(Flight flight, Passenger passenger) {
        //...
    }
But now zipWith() returns an awkward Observable<Observable<Ticket>> and the code no longer compiles. A good rule of thumb is that whenever you see a double-wrapped type (for example Optional<Optional<...>>) there is a flatMap() invocation missing somewhere. That’s the case here, as well. zipWith() takes a pair (or more generally a tuple) of events, applies a function taking these events as arguments, and puts the result into the downstream Observable as-is. This is why we saw Observable<Ticket> first but now it’s Observable<Observable<Ticket>>, where Observable<Ticket> is the result of our supplied function. There are two ways to overcome this problem. One way is by using an intermediate pair returned from zipWith:
    import org.apache.commons.lang3.tuple.Pair;

    Observable<Ticket> ticket = flight
        .zipWith(passenger, (Flight f, Passenger p) -> Pair.of(f, p))
        .flatMap(pair -> rxBookTicket(pair.getLeft(), pair.getRight()));
As if using an explicit Pair from a third-party library did not obscure the flow enough, a method reference, Pair::of, would work here as well; but again, we decided that visible type information is more valuable than saving a few keystrokes. After all, we read code for much more time than we write it. An alternative to an intermediate pair is applying a flatMap with an identity function:
    Observable<Ticket> ticket = flight
        .zipWith(passenger, this::rxBookTicket)
        .flatMap(obs -> obs);
This obs -> obs
lambda expression is seemingly not doing anything, at
least if it were a map()
operator. But remember that flatMap()
applies
a function to each value inside Observable
, so this function takes
Observable<Ticket>
as an argument in our case. Later, the result is not placed directly in the resulting stream, like with map()
. Instead, the return value (of type Observable<T>
) is “flattened,” leading to an Observable<T>
rather than Observable<Observable<T>>
. When dealing with schedulers, the flatMap()
operator becomes even more powerful. You might perceive flatMap()
as merely a syntactic trick to avoid a nested Observable<Observable<...>>
problem, but it’s much more fundamental than this.
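The same double-wrapping rule can be observed with the JDK's own Optional, which makes for a small, self-contained illustration. The sketch below is plain Java rather than RxJava, and findFlight() and bookTicket() are hypothetical helpers invented for this example only. It shows map() producing a nested container, flatMap() avoiding it, and an identity flatMap() repairing it after the fact, analogous to flatMap(obs -> obs) on an Observable:

```java
import java.util.Optional;

class DoubleWrapDemo {

    // Hypothetical lookup: only the flight "LOT 783" exists.
    static Optional<String> findFlight(String number) {
        return "LOT 783".equals(number) ? Optional.of(number) : Optional.empty();
    }

    // Hypothetical booking that itself returns a container type.
    static Optional<String> bookTicket(String flight) {
        return Optional.of("Ticket for " + flight);
    }

    // map() places the function's result into the container as-is,
    // so the result ends up wrapped twice.
    static Optional<Optional<String>> nested(String number) {
        return findFlight(number).map(DoubleWrapDemo::bookTicket);
    }

    // flatMap() flattens the inner container into the outer one.
    static Optional<String> flat(String number) {
        return findFlight(number).flatMap(DoubleWrapDemo::bookTicket);
    }

    // Flattening an already nested value with an identity function.
    static Optional<String> flattenedLater(String number) {
        return nested(number).flatMap(opt -> opt);
    }
}
```

Both flat() and flattenedLater() yield the same single-wrapped value; the only difference is whether the nesting is avoided up front or removed afterward.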
Observable.subscribeOn() Use Cases
It is tempting to think that subscribeOn()
is the right tool for concurrency in RxJava.
This operator works, but you should not see subscribeOn() (or observeOn(), which is yet to be described) used often.
In real life, Observable
s come from asynchronous sources, so custom scheduling is not needed at all.
We use subscribeOn()
throughout this chapter to explicitly show how to upgrade existing applications to use reactive principles selectively.
But in practice, Scheduler
s and subscribeOn()
are weapons of last resort, not something seen commonly.
flatMap() as Asynchronous Chaining Operator
In our sample application, we must now send a list of Ticket
s via
e-mail. But we must keep in mind the following:
-
The list can be potentially quite long.
-
Sending an email might take several milliseconds or even seconds.
-
The application must keep running gracefully in case of failures, but report in the end which tickets failed to be delivered.
The last requirement quickly rules out simple
tickets.forEach(this::sendEmail)
because it eagerly throws an
exception and won’t continue with tickets that were not yet delivered.
Exceptions are actually a nasty back door to the type system and just
like callbacks are not very friendly when you want to manage them in a
more robust way.
That is why RxJava models them explicitly as special notifications, but be patient, we will get there.
In light of the error-handling requirement, our code looks more or less like this:
List<Ticket> failures = new ArrayList<>();
for (Ticket ticket : tickets) {
    try {
        sendEmail(ticket);
    } catch (Exception e) {
        log.warn("Failed to send {}", ticket, e);
        failures.add(ticket);
    }
}
However, the first two requirements or guidelines aren’t addressed. There is no reason why we keep sending emails from one thread sequentially.
Traditionally, we could use an ExecutorService pool
for that by submitting
each email as a separate task:
List<Pair<Ticket, Future<SmtpResponse>>> tasks = tickets
    .stream()
    .map(ticket -> Pair.of(ticket, sendEmailAsync(ticket)))
    .collect(toList());

List<Ticket> failures = tasks.stream()
    .flatMap(pair -> {
        try {
            Future<SmtpResponse> future = pair.getRight();
            future.get(1, TimeUnit.SECONDS);
            return Stream.empty();
        } catch (Exception e) {
            Ticket ticket = pair.getLeft();
            log.warn("Failed to send {}", ticket, e);
            return Stream.of(ticket);
        }
    })
    .collect(toList());

//------------------------------------

private Future<SmtpResponse> sendEmailAsync(Ticket ticket) {
    return pool.submit(() -> sendEmail(ticket));
}
That’s a fair amount of code that all Java programmers should be
familiar with. Yet it seems too verbose and accidentally complex. First,
we iterate over tickets
and submit them to a thread pool. To be
precise, we call the sendEmailAsync()
helper method that submits
sendEmail()
invocation wrapped in Callable<SmtpResponse>
to a thread
pool
.
To be even more precise, instances of Callable are first placed in an unbounded (by default) queue in front of a thread pool.
The lack of a mechanism for slowing down tasks that are submitted faster than they can be processed led to the reactive streams and backpressure effort (see “Backpressure”).
Because later we will need a Ticket
instance in case of
failure, we must keep track of which Future
was responsible for which
Ticket
, again in a Pair
. In real production code, you should consider a more meaningful and dedicated container like a TicketAsyncTask
value object. We collect all such pairs and proceed to the next iteration. At this point, the thread pool is already running multiple sendEmail()
invocations concurrently, which is precisely what we were aiming at. The second loop goes through all Future
s and tries to dereference them by blocking (get()) and awaiting completion. If get()
returns
successfully, we skip such a Ticket
. However, if there is an exception we return Ticket
instance that was associated with this task—we know it failed and we want to report it later. Stream.flatMap()
allows us to return zero or one elements (or actually any number), contrary to Stream.map()
, which always requires one.
You might be wondering why we need two loops instead of just one like this:
//WARNING: code is sequential despite utilizing thread pool
List<Ticket> failures = tickets
    .stream()
    .map(ticket -> Pair.of(ticket, sendEmailAsync(ticket)))
    .flatMap(pair -> {
        //...
    })
    .collect(toList());
This is an interesting bug that is really difficult to find if you don’t understand how Streams in Java 8 work. Because streams (just like Observables) are lazy, they evaluate the underlying collection one element at a time and only when a terminal operation is requested (e.g., collect(toList())). This means that the map() operation starting background tasks is not executed on all tickets immediately; rather, it is applied to one ticket at a time, alternating with the flatMap() operation. In effect, we start one Future, block waiting for it, start a second Future, block waiting on that, and so on. The intermediate collection is needed to force evaluation, not for clarity or readability. After all, the List<Pair<Ticket, Future<SmtpResponse>>> type is hardly more readable.
That’s plenty of work and the possibility of mistake is high, so it’s no
wonder that developers are reluctant to apply concurrent code on a daily
basis.
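The evaluation order behind this bug can be made visible with JDK classes alone. The following is a minimal sketch, not the book's code: tickets are plain strings, and sendAsync() and await() are hypothetical helpers that record each step in a list on the calling thread. The single-pipeline variant interleaves submitting and waiting, whereas the two-phase variant submits everything before waiting for anything:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

class LazyStreamDemo {

    // Records the submission on the calling thread, then hands work to the pool.
    static Future<String> sendAsync(ExecutorService pool, String ticket, List<String> events) {
        events.add("submit " + ticket);
        return pool.submit(() -> ticket);
    }

    // Blocks until the task is done and records that on the calling thread.
    static String await(Future<String> future, List<String> events) {
        try {
            String ticket = future.get();
            events.add("await " + ticket);
            return ticket;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // One lazy pipeline: every element is submitted and awaited before the next one.
    static List<String> singlePipeline(List<String> tickets) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> events = new ArrayList<>();
        try {
            tickets.stream()
                    .map(ticket -> sendAsync(pool, ticket, events))
                    .map(future -> await(future, events))
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
        return events;
    }

    // Intermediate collection: all tasks are submitted before the first wait.
    static List<String> twoPhases(List<String> tickets) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> events = new ArrayList<>();
        try {
            List<Future<String>> futures = tickets.stream()
                    .map(ticket -> sendAsync(pool, ticket, events))
                    .collect(Collectors.toList());
            futures.forEach(future -> await(future, events));
        } finally {
            pool.shutdown();
        }
        return events;
    }
}
```

For two tickets "a" and "b", singlePipeline() records submit a, await a, submit b, await b, while twoPhases() records submit a, submit b, await a, await b. Only the second ordering gives the thread pool a chance to work on several tickets at once.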
The little-known ExecutorCompletionService
from JDK is sometimes used when there is a pool of asynchronous tasks and we want to process them as they complete.
Moreover, Java 8 brings CompletableFuture
(see “CompletableFuture and Streams”) that is entirely reactive and nonblocking.
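For comparison, the same "collect the failures" requirement might be expressed with CompletableFuture roughly as follows. This is only a sketch under simplifying assumptions: tickets are reduced to e-mail address strings, and sendEmail() is a hypothetical stand-in that fails for addresses without an @ sign. The handle() callback turns each outcome into an Optional, so joining never throws:

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class FailedDeliveries {

    // Hypothetical stand-in for sendEmail(): rejects addresses without '@'.
    static String sendEmail(String address) {
        if (!address.contains("@")) {
            throw new IllegalArgumentException("Invalid address: " + address);
        }
        return "250 OK";
    }

    static List<String> failures(List<String> addresses) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Phase 1: start all tasks. handle() maps success to an empty
            // Optional and failure to the address that could not be reached.
            List<CompletableFuture<Optional<String>>> tasks = addresses.stream()
                    .map(address -> CompletableFuture
                            .supplyAsync(() -> sendEmail(address), pool)
                            .handle((response, error) ->
                                    error == null
                                            ? Optional.<String>empty()
                                            : Optional.of(address)))
                    .collect(Collectors.toList());
            // Phase 2: wait for every task; exceptions were already absorbed above.
            return tasks.stream()
                    .map(CompletableFuture::join)
                    .filter(Optional::isPresent)
                    .map(Optional::get)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that the intermediate list is still required here, for the same laziness reason that applied to the Future-based version.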
But how can RxJava help here?
First, assume that an API for sending an email is already retrofitted to use RxJava:
import static rx.Observable.fromCallable;

Observable<SmtpResponse> rxSendEmail(Ticket ticket) {
    //unusual synchronous Observable
    return fromCallable(() -> sendEmail(ticket));
}
There is no concurrency involved, just wrapping sendEmail()
inside an Observable
.
This is a rare Observable
; ordinarily you would use subscribeOn()
in the implementation so that the Observable
is asynchronous by default.
At this point, we can iterate over all tickets
as before:
List<Ticket> failures = Observable.from(tickets)
    .flatMap(ticket ->
        rxSendEmail(ticket)
            .flatMap(response -> Observable.<Ticket>empty())
            .doOnError(e -> log.warn("Failed to send {}", ticket, e))
            .onErrorReturn(err -> ticket))
    .toList()
    .toBlocking()
    .single();
Observable.ignoreElements()
It is easy to see that inner flatMap()
in our example ignores response
and returns an empty stream.
In such cases, flatMap()
is overkill; the ignoreElements()
operator is far more efficient.
ignoreElements()
simply ignores all emitted values and forwards onCompleted()
or onError()
notifications.
Because we are ignoring the actual response and just deal with errors, ignoreElements()
works great here.
All we are interested in lies inside the outer flatMap()
. If it were just
flatMap(this::rxSendEmail)
, code would work; however, any failure
emitted from rxSendEmail
would terminate the entire stream.
But we want to “catch” all emitted errors and collect them for later consumption.
We
use a similar trick to Stream.flatMap()
: if response
was
successfully emitted, we transform it to an empty Observable
. This
basically means that we discard successful tickets. However, in case of
failures, we return a ticket
that raised an exception. An extra
doOnError()
callback allows us to log exception—of course we can
just as well add logging to onErrorReturn()
operator, but I found this
separation of concerns more functional.
To remain compatible with previous implementations, we transform Observable<Ticket> into Observable<List<Ticket>> (toList()), then into BlockingObservable<List<Ticket>> (toBlocking()), and finally into List<Ticket> (single()). Interestingly, even BlockingObservable
remains lazy. A toBlocking()
operator on its own doesn’t force evaluation by subscribing to the underlying stream and it doesn’t even block. Subscription and thus iteration and sending emails is postponed until single()
is invoked.
Note that if we replace the outer flatMap()
with concatMap()
(see “Ways of Combining Streams: concat(), merge(), and switchOnNext()” and “Preserving Order Using concatMap()”), we will encounter a bug similar to the one mentioned with JDK’s Stream.
As opposed to flatMap()
(or merge
) that subscribe immediately to all inner streams, concatMap
(or concat
) subscribes one inner Observable
after another.
And as long as no one subscribed to Observable
, no work even began.
So far, a simple for loop with a try-catch was replaced with a less readable and more complex Observable. However, to turn our sequential code into a multithreaded computation we merely need to add one extra operator:
Observable
    .from(tickets)
    .flatMap(ticket ->
        rxSendEmail(ticket)
            .ignoreElements()
            .doOnError(e -> log.warn("Failed to send {}", ticket, e))
            .onErrorReturn(err -> ticket)
            .subscribeOn(Schedulers.io()))
It is so noninvasive that you might find it hard to spot. One extra
subscribeOn()
operator causes each individual rxSendEmail()
to be
executed on a specified Scheduler
(io()
, in this case). This is one
of the strengths of RxJava; it is not opinionated about threading,
defaulting to synchronous execution but allowing seamless and almost
transparent multithreading. Of course, this doesn’t mean that you can safely inject schedulers in arbitrary places. But at least the API is less verbose and higher level. We will explore schedulers in much more detail later in “Multithreading in RxJava”. For the time being remember that Observable
s are
synchronous by default; however, we can easily change that and apply
concurrency in places where it was least expected. This is especially
valuable in existing legacy applications, which you can optimize without
much hassle.
Wrapping up, if you are implementing Observable
s from scratch, making them asynchronous by default is more idiomatic.
That means placing subscribeOn()
directly inside rxSendEmail()
rather than externally.
Otherwise, you risk wrapping already asynchronous streams with yet another layer of schedulers.
Of course, if the producer behind Observable
is already asynchronous, it is even better because your stream does not bind to any particular thread.
Additionally, you should postpone subscribing to an Observable
until as late as possible, typically close to the web framework or the outside world.
This changes your mindset significantly.
Your entire business logic is lazy until someone actually wants to see the results.3
Replacing Callbacks with Streams
Traditional APIs are blocking most of the time, meaning they force you to wait synchronously for the results. This approach works relatively well, at least before you heard about RxJava. But a blocking API is particularly problematic when data needs to be pushed from the API producer to consumers; this is an area where RxJava really shines. There are numerous examples of such cases and various approaches are taken by API designers. Typically, we need to provide some sort of callback that the API invokes, often called an event listener. One of the most common scenarios like that is Java Message Service (JMS). Consuming JMS typically involves implementing a class that the application server or container notifies on every incoming message. We can replace such listeners with relative ease with a composable Observable, which is much more robust and versatile. The traditional listener looks similar to this class, here using JMS support in the Spring framework, but our solution is technology-agnostic:
@Component
class JmsConsumer {

    @JmsListener(destination = "orders")
    public void newOrder(Message message) {
        //...
    }
}
When a JMS message
is received, the JmsConsumer
class must decide what
to do with it.
Typically, some business logic is invoked inside a message consumer.
When a new component wants to be notified about
such messages, it must modify JmsConsumer
appropriately. Conversely, imagine Observable<Message>
that can be subscribed to by anyone.
Moreover, an entire universe of RxJava operators is available, allowing
mapping, filtering, and combining capabilities. The easiest way to
convert from a push, callback-based API to Observable
is to use
Subject
s. Every time a new JMS message is delivered, we push that
message to a PublishSubject
that looks like an ordinary hot
Observable
from the outside:
private final PublishSubject<Message> subject = PublishSubject.create();

@JmsListener(destination = "orders", concurrency = "1")
public void newOrder(Message msg) {
    subject.onNext(msg);
}

Observable<Message> observe() {
    return subject;
}
Keep in mind that Observable<Message>
is hot; it begins emitting JMS
messages as soon as they are consumed. If no one is subscribed at the
moment, messages are simply lost. ReplaySubject
is an alternative, but
because it caches all events since the application startup, it’s not
suitable for long-running processes. In case you have a subscriber that
absolutely must receive all messages, ensure that it subscribes before the JMS message listener is initialized. Additionally, our message listener has a concurrency="1"
parameter to ensure that Subject
is not invoked from multiple threads. As an alternative, you can use Subject.toSerialized()
.
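The "hot" behavior described above does not depend on RxJava at all, which a deliberately tiny plain-Java analogy can show. The HotSource class below is hypothetical and is not how PublishSubject is implemented; it only mimics two of the properties discussed: events pushed before anyone subscribes are lost, and onNext() calls are serialized so that listeners never run concurrently:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class HotSource<T> {

    private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<T> listener) {
        listeners.add(listener);
    }

    // Called by the producer, for example from a message listener.
    // synchronized serializes calls arriving from multiple threads.
    synchronized void onNext(T event) {
        listeners.forEach(listener -> listener.accept(event));
    }

    static List<String> demo() {
        HotSource<String> orders = new HotSource<>();
        List<String> received = new ArrayList<>();
        orders.onNext("order-1");        // nobody is listening yet: lost
        orders.subscribe(received::add);
        orders.onNext("order-2");
        orders.onNext("order-3");
        return received;
    }
}
```

The subscriber in demo() sees only order-2 and order-3, which is why a consumer that must not miss anything has to subscribe before the producer starts.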
As a side note, Subject
s are easier to get started with but are known to become problematic after a while.
In this particular case, we can easily replace Subject
with the more idiomatic RxJava Observable
that uses create()
directly:
public Observable<Message> observe(
        ConnectionFactory connectionFactory, Topic topic) {
    return Observable.create(subscriber -> {
        try {
            subscribeThrowing(subscriber, connectionFactory, topic);
        } catch (JMSException e) {
            subscriber.onError(e);
        }
    });
}

private void subscribeThrowing(
        Subscriber<? super Message> subscriber,
        ConnectionFactory connectionFactory,
        Topic orders) throws JMSException {
    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(true, AUTO_ACKNOWLEDGE);
    MessageConsumer consumer = session.createConsumer(orders);
    consumer.setMessageListener(subscriber::onNext);
    subscriber.add(onUnsubscribe(connection));
    connection.start();
}

private Subscription onUnsubscribe(Connection connection) {
    return Subscriptions.create(() -> {
        try {
            connection.close();
        } catch (Exception e) {
            log.error("Can't close", e);
        }
    });
}
The JMS API provides two ways of receiving messages from a broker: synchronous via blocking receive()
method, and nonblocking, using MessageListener
. The nonblocking API is beneficial for many reasons; for example, it holds fewer resources like threads and stack memory. It also aligns beautifully with the Rx style of programming. Rather than creating a MessageListener
instance and calling our subscriber from within it, we can use this terse syntax with method reference:
consumer.setMessageListener(subscriber::onNext)
Also, we must take care of resource cleanup and proper error handling. This tiny transformation layer allows us to easily consume JMS messages without worrying about API internals. Here is an example using the popular ActiveMQ messaging broker running locally:
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;

ConnectionFactory connectionFactory =
    new ActiveMQConnectionFactory("tcp://localhost:61616");
Observable<String> txtMessages =
    observe(connectionFactory, new ActiveMQTopic("orders"))
        .cast(TextMessage.class)
        .flatMap(m -> {
            try {
                return Observable.just(m.getText());
            } catch (JMSException e) {
                return Observable.error(e);
            }
        });
JMS, just like JDBC, has a reputation of heavily using checked JMSException
, even when calling getText()
on a TextMessage
.
To properly handle errors (see “Error Handling” for more details) we use flatMap()
and wrap exceptions.
From that point, you can treat JMS messages flowing in like any other asynchronous and nonblocking stream.
And by the way, we used the cast()
operator that optimistically casts upstream events to a given type, failing with onError()
, otherwise.
cast()
is basically a specialized map()
operator that behaves like map(x -> (TextMessage)x)
.
Polling Periodically for Changes
The worst blocking API that you can work with requires polling for changes.
It provides no mechanism to push changes right at you, even with
callbacks or by blocking indefinitely. The only mechanism this API gives
is asking for the current state, and it is up to you to figure out if it
differs from the previous state or not. RxJava has a few really powerful
operators that you can apply to retrofit a given API to Rx style.
The first case I want you to consider is a simple method that delivers a
single value that represents state, for example
long getOrderBookLength()
. To track changes we must call this
method frequently enough and capture differences. You can achieve this
in RxJava with a very basic operator composition:
Observable
    .interval(10, TimeUnit.MILLISECONDS)
    .map(x -> getOrderBookLength())
    .distinctUntilChanged()
First we produce a synthetic long
value every 10 milliseconds which
serves as a basic ticking counter. For each such value (that is every
10 milliseconds), we call getOrderBookLength()
. However, the value returned by this
method doesn’t change that often, and we don’t want to flood our
subscribers with lots of irrelevant state changes. Luckily we can simply
say distinctUntilChanged()
and RxJava will transparently skip long
values returned by getOrderBookLength()
that have not changed since
last invocation, as demonstrated in the following marble diagram:
We can apply this pattern even further. Imagine that you are watching for filesystem or database table changes. The only mechanism at your
disposal is taking a current snapshot of files or database records. You
are building an API that will notify clients about every new item.
Obviously, you can use java.nio.file.WatchService
or database triggers, but take this as an educational example. This time, again, we begin by periodically taking a snapshot of current state:
Observable<Item> observeNewItems() {
    return Observable
        .interval(1, TimeUnit.SECONDS)
        .flatMapIterable(x -> query())
        .distinct();
}

List<Item> query() {
    //take snapshot of file system directory
    //or database table
}
The distinct()
operator keeps a record of all items that passed through
it (see also “Dropping Duplicates Using distinct() and distinctUntilChanged()”). If the same item appears for the
second time, it is simply ignored. That is why we can push the same list
of Item
s every second. The first time they are pushed downstream to all
subscribers. However, when the exact same list appears one second later,
all items were already seen and are therefore discarded. If at some
point in time the list returned from query()
contains one extra
Item
, distinct()
will let it through once but discard it the next time.
This simple pattern allows us to replace a bunch of Thread.sleep()
invocations and manual caching with periodic polling.
It is applicable in
many areas, like File Transfer Protocol (FTP) polling, web scraping, and so on.
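The difference between the two filtering operators used in this section is easy to confuse, so here is a plain-Java sketch of their semantics applied to a list of already polled values. These two static methods are illustrative stand-ins written for this example, not RxJava's implementation: the first suppresses only consecutive repeats, the second remembers everything it has ever seen:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

class PollingFilters {

    // Passes a value only when it differs from the immediately preceding one.
    static <T> List<T> distinctUntilChanged(List<T> polled) {
        List<T> out = new ArrayList<>();
        boolean first = true;
        T previous = null;
        for (T value : polled) {
            if (first || !Objects.equals(previous, value)) {
                out.add(value);
            }
            previous = value;
            first = false;
        }
        return out;
    }

    // Passes a value only the first time it is ever seen.
    static <T> List<T> distinct(List<T> polled) {
        Set<T> seen = new HashSet<>();
        List<T> out = new ArrayList<>();
        for (T value : polled) {
            if (seen.add(value)) {   // add() returns false for known values
                out.add(value);
            }
        }
        return out;
    }
}
```

Polling an order book length of 3, 3, 4, 4, 3 through distinctUntilChanged() leaves 3, 4, 3, because the value may legitimately return to an earlier state. Item snapshots a, b, a, b, c through distinct() leave a, b, c. The set of seen items grows for as long as polling continues, which is worth keeping in mind for long-running processes.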
Multithreading in RxJava
There are third-party APIs that are blocking and there is simply nothing we can do about it. We might not have the source code, or rewriting might carry too much risk. In that case, we must learn how to deal with blocking code rather than fighting it.
One of the hallmarks of RxJava is declarative concurrency, as opposed to imperative concurrency. Manually creating and managing threads is a thing of the past (compare with “Thread Pool of Connections”); most of us already use managed thread pools (e.g., with ExecutorService
). But RxJava goes one step further: Observable
can be nonblocking just like CompletableFuture
in Java 8 (see “CompletableFuture and Streams”), but unlike the latter, it is also lazy. Unless you subscribe, a well-behaving Observable
will not perform any action. But the power of Observable
goes even beyond that.
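The eager nature of CompletableFuture can be contrasted with laziness using JDK classes only. In the sketch below, load() is a hypothetical piece of work that counts its own invocations. Calling eager() starts that work immediately, whether or not anyone ever asks for the result. Wrapping creation in a Supplier, which is an assumption of this example and loosely comparable in spirit to deferring an Observable until subscription, postpones the work until get() is called:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class EagerVersusLazy {

    static final AtomicInteger invocations = new AtomicInteger();

    // Hypothetical unit of work that counts how often it was started.
    static String load() {
        invocations.incrementAndGet();
        return "result";
    }

    // Eager: the task is handed to a thread pool as soon as this method runs.
    static CompletableFuture<String> eager() {
        return CompletableFuture.supplyAsync(EagerVersusLazy::load);
    }

    // Lazy: nothing is started until somebody calls get() on the Supplier.
    static Supplier<CompletableFuture<String>> lazy() {
        return () -> CompletableFuture.supplyAsync(EagerVersusLazy::load);
    }
}
```

Merely obtaining the Supplier from lazy() leaves the invocation counter untouched; the counter moves only once get() is called on it.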
An asynchronous Observable
is one that calls your Subscriber’s callback methods (like onNext()) from a different thread.
Recall “Mastering Observable.create()”, in which we explored when subscribe() is blocking, waiting until all notifications arrive.
In real life, most Observable
s come from sources that are asynchronous by their nature. Chapter 5 is entirely devoted to such Observable
s. Even our simple JMS example from “Replacing Callbacks with Streams” is asynchronous, because it uses a built-in, nonblocking API from the JMS specification (the MessageListener
interface). This is not enforced or suggested by the type system, but many Observable
s are asynchronous from the very beginning, and you should assume that. A blocking subscribe()
method happens very rarely, when a lambda within Observable.create()
is not backed by any asynchronous process or stream. However, by default (with create()
) everything happens in the client thread (the one that subscribed). If you just poke onNext()
directly within your create()
callback, no multithreading and concurrency is involved whatsoever.
Encountering such an unusual Observable
, we can declaratively select the so-called Scheduler
that will be used to emit values.
In case of CompletableFuture
, we have no control over the underlying threads: the API made the decision and, in the worst case, it is impossible to override it. RxJava rarely makes such decisions alone and chooses a safe default: the client thread, with no multithreading involved.
For the purposes of this chapter, we will use a really simple logging “library,”4 which will print a message along with the current thread and number of milliseconds since the start of the program using System.currentTimeMillis()
:
void log(Object label) {
    System.out.println(
        System.currentTimeMillis() - start + "\t| " +
        Thread.currentThread().getName() + "\t| " +
        label);
}
What Is a Scheduler?
RxJava is concurrency-agnostic, and as a matter of fact it does not introduce concurrency on its own. However, some abstractions to deal with threads are exposed to the end user. Also, certain operators cannot work properly without concurrency; see “Other Uses for Schedulers” for some of them. Luckily, the Scheduler
class, the only one you must pay attention to, is fairly simple. In principle it works similarly to ScheduledExecutorService
from java.util.concurrent
—it executes arbitrary blocks of code, possibly in the future. However, to meet Rx contract, it offers some more fine-grained abstractions, which you can see more of in the advanced section “Scheduler implementation details overview”.
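As a reminder of the JDK counterpart mentioned here, the following sketch schedules a block of code to run in the future with ScheduledExecutorService. It is plain java.util.concurrent code and does not involve any RxJava API; the class name and the 50-millisecond delay are arbitrary choices made for this illustration:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class DelayedTaskDemo {

    // Schedules a task 50 ms into the future and waits for its result.
    static String runLater() {
        ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<String> future = executor.schedule(
                    () -> "ran on " + Thread.currentThread().getName(),
                    50, TimeUnit.MILLISECONDS);
            return future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

The returned text names the executor's own thread rather than the caller's, which is the kind of decoupling between submitting and executing code that a Scheduler provides as well.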
Schedulers are used together with subscribeOn()
and observeOn()
operators as well as when creating certain types of Observable
s.
A scheduler only creates instances of Worker
s that are responsible for scheduling and running code.
When RxJava needs to schedule some code it first asks Scheduler
to provide a Worker
and uses the latter to schedule subsequent tasks.
You will find examples of this API later on, but first familiarize yourself with available built-in schedulers:
Schedulers.newThread()
-
This scheduler simply starts a new thread every time it is requested via subscribeOn() or observeOn(). newThread() is hardly ever a good choice, not only because of the latency involved when starting a thread, but also because this thread is not reused. Stack space must be allocated up front (typically around one megabyte, as controlled by the -Xss parameter of the JVM) and the operating system must start a new native thread. When the Worker is done, the thread simply terminates. This scheduler can be useful only when tasks are coarse-grained: each takes a lot of time to complete but there are very few of them, so that threads are unlikely to be reused at all. See also: “Thread per Connection”. In practice, the following Schedulers.io() is almost always a better choice.
Schedulers.io()
-
This scheduler is similar to newThread(), but already started threads are recycled and can possibly handle future requests. This implementation works similarly to ThreadPoolExecutor from java.util.concurrent with an unbounded pool of threads. Every time a new Worker is requested, either a new thread is started (and later kept idle for some time) or an idle one is reused.
The name io() is not a coincidence. Consider using this scheduler for I/O-bound tasks, which require very little CPU resources. However, they tend to take quite some time, waiting for network or disk. Thus, it is a good idea to have a relatively big pool of threads. Still, be careful with unbounded resources of any kind: in case of slow or unresponsive external dependencies like web services, the io() scheduler might start an enormous number of threads, leading to your very own application becoming unresponsive, as well. See “Managing Failures with Hystrix” for more details on how to tackle this problem.
Schedulers.computation()
-
You should use a computation scheduler when tasks are entirely CPU-bound; that is, they require computational power and have no blocking code (reading from disk, network, sleeping, waiting for lock, etc.). Because each task executed on this scheduler is supposed to fully utilize one CPU core, executing more such tasks in parallel than there are available cores would not bring much value. Therefore, the computation() scheduler by default limits the number of threads running in parallel to the value of availableProcessors(), as found in the Runtime.getRuntime() utility class.
If for some reason you need a different number of threads than the default, you can always use the rx.scheduler.max-computation-threads system property. By taking fewer threads you ensure that there is always one or more CPU cores idle, and even under heavy load, the computation() thread pool does not saturate your server. It is not possible to have more computation threads than cores.
The computation() scheduler uses an unbounded queue in front of every thread, so if a task is scheduled but all cores are occupied, it is queued. In case of a load peak, this scheduler will keep the number of threads limited. However, the queue just before each thread will keep growing.
Luckily, built-in operators, especially observeOn() that we are about to discover in “Declarative Concurrency with observeOn()”, ensure that this Scheduler is not overloaded.
Schedulers.from(Executor executor)
-
Schedulers are internally more complex than Executors from java.util.concurrent, so a separate abstraction was needed. But because they are conceptually quite similar, unsurprisingly there is a wrapper that can turn an Executor into a Scheduler using the from() factory method:
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import rx.Scheduler;
import rx.schedulers.Schedulers;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

//...

ThreadFactory threadFactory = new ThreadFactoryBuilder()
    .setNameFormat("MyPool-%d")
    .build();
Executor executor = new ThreadPoolExecutor(
    10,  //corePoolSize
    10,  //maximumPoolSize
    0L, TimeUnit.MILLISECONDS,  //keepAliveTime, unit
    new LinkedBlockingQueue<>(1000),  //workQueue
    threadFactory
);
Scheduler scheduler = Schedulers.from(executor);
I am intentionally using this verbose syntax for creating ExecutorService
rather than the simpler version:
import java.util.concurrent.Executors;

//...

ExecutorService executor = Executors.newFixedThreadPool(10);
Although tempting, the Executors
factory class hardcodes several defaults that are impractical or even dangerous in enterprise applications. For example, it uses an unbounded LinkedBlockingQueue
that can grow infinitely, resulting in OutOfMemoryError
for cases in which there is a large number of outstanding tasks. Also, the default ThreadFactory
uses meaningless thread names like pool-5-thread-3
. Naming threads properly is an invaluable tool when profiling or analyzing thread dumps. Implementing ThreadFactory
from scratch is a bit cumbersome, so we used ThreadFactoryBuilder from Guava. If you are interested in tuning and properly utilizing thread pools even further, see “Thread Pool of Connections” and “Managing Failures with Hystrix”. Creating schedulers from Executor
that we consciously configured is advised for projects dealing with high load. However, because RxJava has no control over independently created threads in an Executor
, it cannot pin threads (that is, try to keep work of the same task on the same thread to improve cache locality). This Scheduler
merely makes sure that a single Scheduler.Worker
(see “Scheduler implementation details overview”) processes events sequentially.
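The effect of a bounded work queue and of named threads can be observed without Guava or RxJava. The sketch below is a simplified illustration using JDK classes only: named() is a hypothetical hand-rolled replacement for ThreadFactoryBuilder, and the pool is deliberately tiny (one thread, one queue slot) so that saturation is easy to trigger. Every task blocks on a latch, so the first one occupies the thread, the second one occupies the queue, and the rest are rejected instead of piling up in memory:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedPoolDemo {

    // Hand-rolled ThreadFactory producing names like "MyPool-0", "MyPool-1", ...
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }

    static ThreadPoolExecutor boundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                named("MyPool"));
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Submits `total` blocked tasks to a 1-thread, 1-slot pool and
    // returns how many of them were rejected.
    static int rejectedOutOf(int total) {
        ThreadPoolExecutor pool = boundedPool(1, 1);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < total; i++) {
                try {
                    pool.execute(() -> awaitQuietly(release));
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            release.countDown();   // let the accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }
}
```

With the default AbortPolicy, the caller learns about overload immediately through RejectedExecutionException. Whether rejecting, blocking, or running the task in the caller's thread is the right reaction depends on the application.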
Schedulers.immediate()
-
Schedulers.immediate() is a special scheduler that invokes a task within the client thread in a blocking fashion, rather than asynchronously. Using it is pointless unless some part of your API requires providing a scheduler, whereas you are absolutely fine with the default behavior of Observable, not involving any threading at all. In fact, subscribing to an Observable (more on that in a second) via the immediate() Scheduler typically has the same effect as not subscribing with any particular scheduler at all. In general, avoid this scheduler; it blocks the calling thread and is of limited use.
Schedulers.trampoline()
-
The trampoline() scheduler is very similar to immediate() because it also schedules tasks in the same thread, effectively blocking. However, as opposed to immediate(), the upcoming task is executed when all previously scheduled tasks complete. immediate() invokes a given task right away, whereas trampoline() waits for the current task to finish. Trampoline is a pattern in functional programming that allows implementing recursion without infinitely growing the call stack. This is best explained with an example, first involving immediate(). By the way, notice that we do not interact directly with a Scheduler instance but first create a Worker. This makes sense as you will quickly see in “Scheduler implementation details overview”.
Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();

log("Main start");
worker.schedule(() -> {
    log(" Outer start");
    sleepOneSecond();
    worker.schedule(() -> {
        log(" Inner start");
        sleepOneSecond();
        log(" Inner end");
    });
    log(" Outer end");
});
log("Main end");
worker.unsubscribe();
The output is as expected; you could actually replace
schedule()
with a simple method invocation:1044 | main | Main start 1094 | main | Outer start 2097 | main | Inner start 3097 | main | Inner end 3100 | main | Outer end 3100 | main | Main end
Inside the
Outer
block weschedule()
Inner
block that gets invoked immediately, interrupting theOuter
task. WhenInner
is done, the control goes back toOuter
. Again, this is simply a convoluted way of invoking a task in a blocking manner indirectly viaimmediate()
Scheduler
. But what happens if we replaceSchedulers.immediate()
withSchedulers.trampoline()
? The output is quite different:1030 | main | Main start 1096 | main | Outer start 2101 | main | Outer end 2101 | main | Inner start 3101 | main | Inner end 3101 | main | Main end
Do you see how Outer manages to complete before Inner even starts? This is because the Inner task was queued inside the trampoline() Scheduler, which was already occupied by the Outer task. When Outer finished, the first task from the queue (Inner) began. We can go even further to make sure you understand the difference:
log("Main start");
worker.schedule(() -> {
    log(" Outer start");
    sleepOneSecond();
    worker.schedule(() -> {
        log(" Middle start");
        sleepOneSecond();
        worker.schedule(() -> {
            log(" Inner start");
            sleepOneSecond();
            log(" Inner end");
        });
        log(" Middle end");
    });
    log(" Outer end");
});
log("Main end");
The Worker from the immediate() Scheduler outputs the following:
1029 | main | Main start
1091 | main | Outer start
2093 | main | Middle start
3095 | main | Inner start
4096 | main | Inner end
4099 | main | Middle end
4099 | main | Outer end
4099 | main | Main end
Versus the trampoline() worker:
1041 | main | Main start
1095 | main | Outer start
2099 | main | Outer end
2099 | main | Middle start
3101 | main | Middle end
3101 | main | Inner start
4102 | main | Inner end
4102 | main | Main end
Schedulers.test()
This Scheduler is used only for testing purposes, and you will never see it in production code. Its main advantage is the ability to arbitrarily advance the clock, simulating time passing by. TestScheduler is described to a great extent in “Schedulers in Unit Testing”.
Schedulers alone are not very interesting. If you want to discover how they work internally and how to implement your own, check out the next section.
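Before moving on, the difference between immediate() and trampoline() can be reproduced without RxJava at all. The following is a minimal plain-Java sketch of the trampolining idea, not RxJava's actual implementation: the class TrampolineSketch and its methods are hypothetical names made up for this illustration, and the sketch assumes everything runs on a single thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical illustration only; this is not how RxJava implements trampoline().
// Not thread-safe: it assumes all calls happen on one thread.
public class TrampolineSketch {

    private final Queue<Runnable> queue = new ArrayDeque<>();
    private boolean draining = false;

    // Runs the task right away if nothing is running; otherwise queues it
    // until the currently running task and all earlier queued tasks finish.
    public void schedule(Runnable task) {
        queue.add(task);
        if (draining) {
            return; // the outer schedule() call is already draining the queue
        }
        draining = true;
        try {
            Runnable next;
            while ((next = queue.poll()) != null) {
                next.run();
            }
        } finally {
            draining = false;
        }
    }

    // Nested scheduling, mirroring the Outer/Inner example from the text.
    public static List<String> trampolineOrder() {
        List<String> events = new ArrayList<>();
        TrampolineSketch worker = new TrampolineSketch();
        worker.schedule(() -> {
            events.add("Outer start");
            worker.schedule(() -> {
                events.add("Inner start");
                events.add("Inner end");
            });
            events.add("Outer end");
        });
        return events;
    }

    // The same nesting with a direct call, which is what immediate() amounts to.
    public static List<String> immediateOrder() {
        List<String> events = new ArrayList<>();
        Runnable inner = () -> {
            events.add("Inner start");
            events.add("Inner end");
        };
        events.add("Outer start");
        inner.run();
        events.add("Outer end");
        return events;
    }

    public static void main(String[] args) {
        // prints [Outer start, Outer end, Inner start, Inner end]
        System.out.println(trampolineOrder());
        // prints [Outer start, Inner start, Inner end, Outer end]
        System.out.println(immediateOrder());
    }
}
```

The queue is what keeps the call stack flat: a nested schedule() call returns immediately, and the loop in the outermost call picks the task up later.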
Scheduler implementation details overview
Note
This section is entirely optional; feel free to jump straight to “Declarative Subscription with subscribeOn()” if you are not interested in implementation details.
Scheduler not only decouples tasks and their execution (typically by running them in another thread), but it also abstracts away the clock, as we will learn in “Virtual Time”. The API of the Scheduler is a bit simpler compared to, for example, ScheduledExecutorService:
abstract class Scheduler {
    abstract Worker createWorker();

    long now();

    abstract static class Worker implements Subscription {

        abstract Subscription schedule(Action0 action);

        abstract Subscription schedule(Action0 action,
            long delayTime, TimeUnit unit);

        long now();
    }
}
When RxJava wants to schedule a task (presumably, but not necessarily in the background), it must first ask for an instance of Worker. It is the Worker that allows scheduling the task without any delay or at some point in time. Both Scheduler and Worker have an overridable source of time (the now() method) that they use to determine when a given task is supposed to run.
Naively, you can think of a Scheduler like a thread pool and a Worker like a thread inside that pool.
The separation between Scheduler and Worker is necessary to easily implement some of the guidelines enforced by the Rx contract, namely invoking Subscriber’s methods sequentially, not concurrently. Worker’s contract provides just that: two tasks scheduled on the same Worker will never run concurrently. However, independent Workers from the same Scheduler can run tasks concurrently just fine.
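The Worker contract can be illustrated with plain java.util.concurrent. The sketch below is an assumption-laden illustration, not RxJava code: SerialWorkerSketch and its nested Worker are hypothetical names, and the queue-plus-counter technique is just one common way to serialize tasks on top of a shared thread pool. Tasks scheduled on the same Worker run one at a time, even though the pool has several threads.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the "one Worker never runs two tasks at once" rule.
public class SerialWorkerSketch {

    static final class Worker {
        private final ExecutorService pool;
        private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
        private final AtomicInteger pending = new AtomicInteger();

        Worker(ExecutorService pool) {
            this.pool = pool;
        }

        void schedule(Runnable task) {
            queue.add(task);
            // Only the call that moves the counter from 0 to 1 starts a drain loop,
            // so at most one drain loop per Worker is active at any time.
            if (pending.getAndIncrement() == 0) {
                pool.execute(this::drain);
            }
        }

        // Sketch limitation: a task that throws stops this Worker for good.
        private void drain() {
            do {
                queue.poll().run();
            } while (pending.decrementAndGet() != 0);
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Schedules the given number of slow tasks on ONE worker backed by a 4-thread pool
    // and returns the highest number of tasks seen running at the same moment.
    public static int maxConcurrencyOnSingleWorker(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            Worker worker = new Worker(pool);
            AtomicInteger running = new AtomicInteger();
            AtomicInteger max = new AtomicInteger();
            CountDownLatch done = new CountDownLatch(tasks);
            for (int i = 0; i < tasks; i++) {
                worker.schedule(() -> {
                    max.accumulateAndGet(running.incrementAndGet(), Math::max);
                    sleepQuietly(5);
                    running.decrementAndGet();
                    done.countDown();
                });
            }
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
            return max.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Max concurrent tasks on one worker: "
            + maxConcurrencyOnSingleWorker(20));
    }
}
```

Two separate Worker instances created over the same pool would each have their own queue and counter, so their tasks could overlap freely, which matches the statement about independent Workers above.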
Rather than going through the API, let’s analyze the source code of an existing Scheduler, namely HandlerScheduler, as found in the RxAndroid project. This Scheduler simply runs all scheduled tasks on an Android UI thread. Updating the user interface is only allowed from that thread (see “Android Development with RxJava” for more details). This is similar to the Event Dispatch Thread (EDT) as found in Swing, where most of the updates to windows and components must be executed within a dedicated thread (the EDT). Unsurprisingly, there is also the RxSwing project for that.
The code snippet that follows is a stripped-down and incomplete class from RxAndroid, for educational purposes only:
package rx.android.schedulers;

import android.os.Handler;
import android.os.Looper;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.internal.schedulers.ScheduledAction;
import rx.subscriptions.Subscriptions;

import java.util.concurrent.TimeUnit;

public final class SimplifiedHandlerScheduler extends Scheduler {

    @Override
    public Worker createWorker() {
        return new HandlerWorker();
    }

    static class HandlerWorker extends Worker {

        private final Handler handler = new Handler(Looper.getMainLooper());

        @Override
        public void unsubscribe() {
            //Implementation coming soon...
        }

        @Override
        public boolean isUnsubscribed() {
            //Implementation coming soon...
            return false;
        }

        @Override
        public Subscription schedule(final Action0 action) {
            return schedule(action, 0, TimeUnit.MILLISECONDS);
        }

        @Override
        public Subscription schedule(
                Action0 action, long delayTime, TimeUnit unit) {
            ScheduledAction scheduledAction = new ScheduledAction(action);
            handler.postDelayed(scheduledAction, unit.toMillis(delayTime));
            scheduledAction.add(Subscriptions.create(() ->
                handler.removeCallbacks(scheduledAction)));
            return scheduledAction;
        }
    }
}
Details of the Android API are not important at the moment. What happens here is that every time we schedule something on a HandlerWorker, the block of code is passed to a special postDelayed() method that executes it on a dedicated Android thread. There is just one such thread, so events are serialized not only within, but also across Workers.
Before we pass action to be executed, we wrap it with ScheduledAction, which implements both Runnable and Subscription. RxJava is lazy whenever it can be; this also applies to scheduling tasks. If for any reason you decide that a given action should not be executed after all (this makes sense when the action was scheduled in the future, not immediately), simply run unsubscribe() on the Subscription returned from schedule(). It is the responsibility of the Worker to properly handle unsubscription (best effort at least).
Client code can also decide to unsubscribe() from Worker in its entirety. This should unsubscribe all queued tasks as well as release the Worker so that the underlying thread can potentially be reused later. The following code snippet enhances the SimplifiedHandlerScheduler by adding Worker unsubscription flow (only modified methods are included):
private CompositeSubscription compositeSubscription =
    new CompositeSubscription();

@Override
public void unsubscribe() {
    compositeSubscription.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
    return compositeSubscription.isUnsubscribed();
}

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
    if (compositeSubscription.isUnsubscribed()) {
        return Subscriptions.unsubscribed();
    }
    final ScheduledAction scheduledAction = new ScheduledAction(action);
    scheduledAction.addParent(compositeSubscription);
    compositeSubscription.add(scheduledAction);
    handler.postDelayed(scheduledAction, unit.toMillis(delayTime));
    scheduledAction.add(Subscriptions.create(() ->
        handler.removeCallbacks(scheduledAction)));
    return scheduledAction;
}
In “Controlling Listeners by Using Subscription and Subscriber<T>”, we explored the Subscription interface but never really looked at the implementation details. CompositeSubscription is one out of many implementations available that itself is just a container for child Subscriptions (a Composite design pattern). Unsubscribing from CompositeSubscription means unsubscribing from all children. You also can add and remove the children managed by CompositeSubscription.
In our custom Scheduler, CompositeSubscription is used to track all Subscriptions from the previous schedule() invocations (see compositeSubscription.add(scheduledAction)). On the other hand, the child ScheduledAction needs to know about its parent (see: addParent()) so that it can remove itself when the action is completed or canceled. Otherwise, Worker would accumulate stale child Subscriptions forever. When the client code decides that it no longer needs a HandlerWorker instance, it unsubscribes from it. The unsubscription is propagated to all (if any) outstanding child Subscriptions.
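The parent/child bookkeeping described above can be sketched in a few lines of plain Java. This is a simplified model under stated assumptions, not the source of rx.subscriptions.CompositeSubscription: the names Cancellable, SimpleCancellable, and Composite are hypothetical stand-ins, and remove() here merely forgets the child.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of a composite of cancellable children (Composite pattern).
public class CompositeSketch {

    // Stand-in for a subscription handle; not the rx.Subscription interface.
    interface Cancellable {
        void cancel();
        boolean isCancelled();
    }

    static final class SimpleCancellable implements Cancellable {
        private volatile boolean cancelled;

        @Override
        public void cancel() {
            cancelled = true;
        }

        @Override
        public boolean isCancelled() {
            return cancelled;
        }
    }

    static final class Composite implements Cancellable {
        private final Set<Cancellable> children = new HashSet<>();
        private boolean cancelled;

        void add(Cancellable child) {
            synchronized (this) {
                if (!cancelled) {
                    children.add(child);
                    return;
                }
            }
            // The parent is already cancelled: do not retain, cancel right away.
            child.cancel();
        }

        // Called by a child that completed, so the parent does not keep stale entries.
        synchronized void remove(Cancellable child) {
            children.remove(child);
        }

        @Override
        public void cancel() {
            List<Cancellable> toCancel;
            synchronized (this) {
                if (cancelled) {
                    return;
                }
                cancelled = true;
                toCancel = new ArrayList<>(children);
                children.clear();
            }
            // Propagate cancellation to every outstanding child.
            for (Cancellable child : toCancel) {
                child.cancel();
            }
        }

        @Override
        public synchronized boolean isCancelled() {
            return cancelled;
        }
    }

    // Returns the cancelled flags of: a finished task, a pending task, a late task.
    public static boolean[] demo() {
        Composite worker = new Composite();
        SimpleCancellable finished = new SimpleCancellable();
        SimpleCancellable pendingTask = new SimpleCancellable();
        worker.add(finished);
        worker.add(pendingTask);
        worker.remove(finished); // a completed action removes itself from its parent
        worker.cancel();         // releasing the worker cancels what is still outstanding
        SimpleCancellable late = new SimpleCancellable();
        worker.add(late);        // scheduling after release is cancelled immediately
        return new boolean[] {
            finished.isCancelled(), pendingTask.isCancelled(), late.isCancelled()
        };
    }

    public static void main(String[] args) {
        boolean[] flags = demo();
        System.out.println(flags[0] + " " + flags[1] + " " + flags[2]);
    }
}
```

The finished task is untouched because it already left the composite, whereas the pending and the late task are both cancelled. This mirrors why schedule() in the snippet above first checks isUnsubscribed() and why each ScheduledAction is told about its parent.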
That was a very brief introduction to Schedulers in RxJava. The details of their internals are not that useful in daily work; as a matter of fact, they are designed in such a way as to make using RxJava more intuitive and predictable. That being said, let’s quickly see how Schedulers solve many concurrency problems in Rx.
Declarative Subscription with subscribeOn()
In “Mastering Observable.create()” we saw that subscribe() by default uses the client thread. To recap, here is the simplest subscription that you can come up with, where no threading is involved whatsoever:
Observable<String> simple() {
    return Observable.create(subscriber -> {
        log("Subscribed");
        subscriber.onNext("A");
        subscriber.onNext("B");
        subscriber.onCompleted();
    });
}

//...

log("Starting");
final Observable<String> obs = simple();
log("Created");
final Observable<String> obs2 = obs
    .map(x -> x)
    .filter(x -> true);
log("Transformed");
obs2.subscribe(
    x -> log("Got " + x),
    Throwable::printStackTrace,
    () -> log("Completed")
);
log("Exiting");
Notice where the logging statements are placed and study the output carefully, especially with regard to which thread invoked the print statement:
33  | main | Starting
120 | main | Created
128 | main | Transformed
133 | main | Subscribed
133 | main | Got A
133 | main | Got B
133 | main | Completed
134 | main | Exiting
Pay attention: the order of statements is absolutely predictable. First, every line of code in the preceding code snippet runs in the main thread; there are no thread pools and no asynchronous emission of events involved. Second, the order of execution might not be entirely clear at first sight.
When the program starts, it prints Starting, which is understandable. After creating an instance of Observable<String>, we see the Created message. Notice that Subscribed appears later, when we actually subscribe. Without the subscribe() invocation, the block of code inside Observable.create() is never executed. Moreover, even the map() and filter() operators do not have any visible side effects; notice how the Transformed message is printed even before Subscribed.
Later, we receive all emitted events and the completion notification. Finally, the Exiting statement is printed and the program can return. This is an interesting observation: subscribe() was supposed to be registering a callback when events appear asynchronously. This is the assumption that you should make by default. However, in this case there is no threading involved and subscribe() is actually blocking. How is this so?
There is an inherent but hidden connection between subscribe() and create(). Every time you call subscribe() on an Observable, its OnSubscribe callback method is invoked (wrapping the lambda expression you passed to create()). It receives your Subscriber as an argument. By default, this happens in the same thread and is blocking, so whatever you do inside create() will block subscribe(). If your create() method sleeps for a few seconds, subscribe() will block. Moreover, if there are operators between Observable.create() and your Subscriber (lambda acting as callback), all these operators are invoked on behalf of the thread that invoked subscribe().
RxJava does not inject any concurrency facilities by default between Observable and Subscriber.
The reason behind that is that Observables tend to be backed by other concurrency mechanisms like event loops or custom threads, so Rx lets you take full control rather than imposing any convention.
This observation prepares the landscape for the subscribeOn() operator. By inserting subscribeOn() anywhere between an original Observable and subscribe(), you declaratively select the Scheduler where the OnSubscribe callback method will be invoked. No matter what you do inside create(), this work is offloaded to an independent Scheduler and your subscribe() invocation no longer blocks:
log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
    .subscribeOn(schedulerA)
    .subscribe(
        x -> log("Got " + x),
        Throwable::printStackTrace,
        () -> log("Completed")
    );
log("Exiting");

35  | main      | Starting
112 | main      | Created
123 | main      | Exiting
123 | Sched-A-0 | Subscribed
124 | Sched-A-0 | Got A
124 | Sched-A-0 | Got B
124 | Sched-A-0 | Completed
Do you see how the main thread exits before Observable even begins emitting any values? Technically, the order of log messages is no longer that predictable because two threads are running concurrently: main, which subscribed and wants to exit, and Sched-A-0, which emits events as soon as someone subscribed. The schedulerA as well as Sched-A-0 thread come from the following sample schedulers we built for illustration purposes:
import static java.util.concurrent.Executors.newFixedThreadPool;

ExecutorService poolA = newFixedThreadPool(10, threadFactory("Sched-A-%d"));
Scheduler schedulerA = Schedulers.from(poolA);

ExecutorService poolB = newFixedThreadPool(10, threadFactory("Sched-B-%d"));
Scheduler schedulerB = Schedulers.from(poolB);

ExecutorService poolC = newFixedThreadPool(10, threadFactory("Sched-C-%d"));
Scheduler schedulerC = Schedulers.from(poolC);

private ThreadFactory threadFactory(String pattern) {
    return new ThreadFactoryBuilder()
        .setNameFormat(pattern)
        .build();
}
These schedulers will be used across all examples, but they are fairly easy to remember: three independent schedulers, each managing 10 threads from an ExecutorService. To make the output nicer, each thread pool has a distinct naming pattern.
Before we begin, you must understand that in mature applications, in terms of Rx adoption, subscribeOn() is very seldom used.
Normally, Observables come from sources that are naturally asynchronous (like RxNetty, see “Nonblocking HTTP Server with Netty and RxNetty”) or apply scheduling on their own (like Hystrix, see “Managing Failures with Hystrix”).
You should use subscribeOn() only in special cases when the underlying Observable is known to be synchronous (create() being blocking). However, subscribeOn() is still a much better solution than hand-crafted threading within create():
//Don't do this
Observable<String> obs = Observable.create(subscriber -> {
    log("Subscribed");
    Runnable code = () -> {
        subscriber.onNext("A");
        subscriber.onNext("B");
        subscriber.onCompleted();
    };
    new Thread(code, "Async").start();
});
The preceding code mixes two concepts: producing events and choosing a concurrency strategy. Observable should be responsible only for production logic, whereas it is only the client code that can make a judicious decision about concurrency.
Remember that Observable is lazy but also immutable, in the sense that subscribeOn() affects only downstream subscribers. If someone subscribes to the exact same Observable without subscribeOn() in between, no concurrency will be involved by default.
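To see why subscribeOn() does not modify the original stream, it helps to model the operator as a decorator. The following plain-Java sketch is an assumption made for illustration and has nothing to do with RxJava's internals: Source is a hypothetical one-method stand-in for a cold Observable, and subscribeOn() here simply returns a new Source that moves the subscription onto an Executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model: subscribeOn() as a decorator around an immutable source.
public class SubscribeOnSketch {

    // Stand-in for a cold Observable: code that runs on every subscription.
    interface Source<T> {
        void subscribe(Consumer<T> onNext);
    }

    // Returns a NEW source; the upstream source itself is left untouched.
    static <T> Source<T> subscribeOn(Source<T> upstream, Executor executor) {
        return onNext -> executor.execute(() -> upstream.subscribe(onNext));
    }

    // Subscribes once and returns the name of the thread that emitted the value.
    public static String emittingThread(boolean offload) {
        // Emits the name of whichever thread runs the subscription logic.
        Source<String> simple =
            onNext -> onNext.accept(Thread.currentThread().getName());
        ExecutorService pool =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "Sched-A-0"));
        try {
            Source<String> source = offload ? subscribeOn(simple, pool) : simple;
            CompletableFuture<String> seen = new CompletableFuture<>();
            source.subscribe(seen::complete);
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct:    " + emittingThread(false));
        System.out.println("offloaded: " + emittingThread(true));
    }
}
```

Subscribing to simple directly still runs in the caller's thread, even though a decorated version exists elsewhere. Only subscribers that go through the decorated source see the other thread.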
Keep in mind that in this chapter our focus is on existing applications and introducing RxJava gradually. The subscribeOn() operator is quite useful in such circumstances; however, after you grasp reactive extensions and begin using them on a large scale, the value of subscribeOn() diminishes.
In entirely reactive software stacks, as found for example at Netflix, subscribeOn() is almost never used, yet all Observables are asynchronous.
Most of the time Observables come from asynchronous sources and they are treated as asynchronous by default.
Therefore, the use of subscribeOn() is very limited, mostly to retrofitting existing APIs or libraries.
In Chapter 5, we write truly asynchronous applications without explicit subscribeOn() and Schedulers altogether.
subscribeOn() Concurrency and Behavior
There are several nuances regarding how subscribeOn() works. First, a curious reader should be wondering what happens if two invocations of subscribeOn() appear between Observable and subscribe(). The answer is simple: the subscribeOn() closest to the original Observable wins.
This has important practical implications.
If you are designing an API and you use subscribeOn() internally, the client code has no way of overriding the Scheduler of your choice.
This can be a conscious design decision; after all, the API designer might know best which Scheduler is appropriate.
On the other hand, providing an overloaded version of said API that allows overriding the chosen Scheduler is always a good idea.
Let’s study how subscribeOn() behaves:
log("Starting");
Observable<String> obs = simple();
log("Created");
obs
    .subscribeOn(schedulerA)
    //many other operators
    .subscribeOn(schedulerB)
    .subscribe(
        x -> log("Got " + x),
        Throwable::printStackTrace,
        () -> log("Completed")
    );
log("Exiting");
The output reveals only schedulerA’s threads:
17 | main      | Starting
73 | main      | Created
83 | main      | Exiting
84 | Sched-A-0 | Subscribed
84 | Sched-A-0 | Got A
84 | Sched-A-0 | Got B
84 | Sched-A-0 | Completed
Interestingly, subscribing on schedulerB is not entirely ignored in favor of schedulerA.
schedulerB is still used for a short period of time, but it merely schedules a new action on schedulerA, which does all the work.
Thus, multiple subscribeOn() calls are not only ignored, but also introduce a small overhead.
Speaking of operators, we said that the create() method used when there is a new Subscriber is executed within the provided scheduler (if any). But which thread executes all these transformations happening between create() and subscribe()? We already know that all operators are executed by default in the same thread (scheduler); no concurrency is involved by default:
log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
    .doOnNext(this::log)
    .map(x -> x + '1')
    .doOnNext(this::log)
    .map(x -> x + '2')
    .subscribeOn(schedulerA)
    .doOnNext(this::log)
    .subscribe(
        x -> log("Got " + x),
        Throwable::printStackTrace,
        () -> log("Completed")
    );
log("Exiting");
We sprinkled the pipeline of operators occasionally with doOnNext() to see which thread is in control at this point. Remember that the position of subscribeOn() is not relevant; it can be right after Observable or just before subscribe(). The output is unsurprising:
20  | main      | Starting
104 | main      | Created
123 | main      | Exiting
124 | Sched-A-0 | Subscribed
124 | Sched-A-0 | A
124 | Sched-A-0 | A1
124 | Sched-A-0 | A12
124 | Sched-A-0 | Got A12
124 | Sched-A-0 | B
124 | Sched-A-0 | B1
124 | Sched-A-0 | B12
125 | Sched-A-0 | Got B12
Watch how create() is invoked and produces A and B events. These events travel sequentially through the scheduler’s thread to finally reach the Subscriber. Many newcomers to RxJava believe that using a Scheduler with a large number of threads will automatically fork processing of events concurrently and somehow join all the results together in the end. This is not the case. RxJava creates a single Worker instance (see: “Scheduler implementation details overview”) for the entire pipeline, mostly to guarantee sequential processing of events.
This means that if one of your operators is particularly slow (for example, map() reading data from disk in order to transform events passing by), this costly operation will be invoked within the same thread. A single broken operator can slow down the entire pipeline, from production to consumption. This is an antipattern in RxJava; operators should be nonblocking, fast, and as pure as possible.
Again, flatMap() comes to the rescue. Rather than blocking within map(), we can invoke flatMap() and asynchronously collect all the results.
Therefore, flatMap() and merge() are the operators to use when we want to achieve true parallelism.
But even with flatMap() it is not obvious. Imagine a grocery store (let’s call it “RxGroceries”) that provides an API for purchasing goods:
class RxGroceries {

    Observable<BigDecimal> purchase(String productName, int quantity) {
        return Observable.fromCallable(() ->
            doPurchase(productName, quantity));
    }

    BigDecimal doPurchase(String productName, int quantity) {
        log("Purchasing " + quantity + " " + productName);
        //real logic here
        log("Done " + quantity + " " + productName);
        return priceForProduct;
    }
}
Obviously, the implementation of doPurchase() is irrelevant here; just imagine it takes some time and resources to complete.
We simulate business logic by adding an artificial sleep of one second, slightly longer if quantity is bigger.
Blocking Observables like the one returned from purchase() are unusual in a real application, but let’s keep it this way for educational purposes.
When purchasing several goods we would like to parallelize as much as possible and calculate the total price for all goods in the end. The first attempt is fruitless:
Observable<BigDecimal> totalPrice = Observable
    .just("bread", "butter", "milk", "tomato", "cheese")
    .subscribeOn(schedulerA)  //BROKEN!!!
    .map(prod -> rxGroceries.doPurchase(prod, 1))
    .reduce(BigDecimal::add)
    .single();
The result is correct; it is an Observable with just a single value: the total price, calculated using reduce(). For each product, we invoke doPurchase() with quantity one. However, despite using schedulerA backed by a thread pool of 10, the code is entirely sequential:
144  | Sched-A-0 | Purchasing 1 bread
1144 | Sched-A-0 | Done 1 bread
1146 | Sched-A-0 | Purchasing 1 butter
2146 | Sched-A-0 | Done 1 butter
2146 | Sched-A-0 | Purchasing 1 milk
3147 | Sched-A-0 | Done 1 milk
3147 | Sched-A-0 | Purchasing 1 tomato
4147 | Sched-A-0 | Done 1 tomato
4147 | Sched-A-0 | Purchasing 1 cheese
5148 | Sched-A-0 | Done 1 cheese
Notice how each product blocks subsequent ones from processing. When the purchase of bread is done, butter begins immediately, but not earlier. Strangely, even replacing map() with flatMap() does not help, and the output is exactly the same:
Observable
    .just("bread", "butter", "milk", "tomato", "cheese")
    .subscribeOn(schedulerA)
    .flatMap(prod -> rxGroceries.purchase(prod, 1))
    .reduce(BigDecimal::add)
    .single();
The code does not work concurrently because there is just a single flow of events, which by design must run sequentially. Otherwise, your Subscriber would need to be aware of concurrent notifications (onNext(), onCompleted(), etc.), so it is a fair compromise. Luckily, the idiomatic solution is very close. The main Observable emitting products cannot be parallelized. However, for each product, we create a new, independent Observable as returned from purchase(). Because they are independent, we can safely schedule each one of them concurrently:
Observable<BigDecimal> totalPrice = Observable
    .just("bread", "butter", "milk", "tomato", "cheese")
    .flatMap(prod ->
        rxGroceries
            .purchase(prod, 1)
            .subscribeOn(schedulerA))
    .reduce(BigDecimal::add)
    .single();
Can you spot where subscribeOn() is? The main Observable is not really doing anything, so a special thread pool is unnecessary. However, each substream created within flatMap() is supplied with schedulerA. Every time subscribeOn() is used, the Scheduler gets a chance to return a new Worker, and therefore a separate thread (simplifying a bit):
113  | Sched-A-1 | Purchasing 1 butter
114  | Sched-A-0 | Purchasing 1 bread
125  | Sched-A-2 | Purchasing 1 milk
125  | Sched-A-3 | Purchasing 1 tomato
126  | Sched-A-4 | Purchasing 1 cheese
1126 | Sched-A-2 | Done 1 milk
1126 | Sched-A-0 | Done 1 bread
1126 | Sched-A-1 | Done 1 butter
1128 | Sched-A-3 | Done 1 tomato
1128 | Sched-A-4 | Done 1 cheese
Finally, we achieved true concurrency. Each purchase operation now begins at the same time and they all eventually finish. The flatMap() operator is carefully designed and implemented so that it collects all events from all independent streams and pushes them downstream sequentially. However, as we already learned in “Order of Events After flatMap()”, we can no longer rely on the order of downstream events; they neither begin nor complete in the same order as they were emitted (the original sequence began at bread). When events reach the reduce() operator, they are already sequential and well behaving.
By now, you should slowly move away from the classic Thread model and understand how Schedulers work. But if you find it difficult, here is a simple analogy:
- Observable without any Scheduler works like a single-threaded program with blocking method calls passing data between one another.
- Observable with a single subscribeOn() is like starting a big task in the background Thread. The program within that Thread is still sequential, but at least it runs in the background.
- Observable using flatMap() where each internal Observable has subscribeOn() works like ForkJoinPool from java.util.concurrent, where each substream is a fork of execution and flatMap() is a safe join stage.
Of course, the preceding tips only apply to blocking Observables, which are rarely seen in real applications.
If your underlying Observables are already asynchronous, achieving concurrency is a matter of understanding how they are combined and when subscription occurs.
For example, merge() on two streams will subscribe to both of them concurrently, whereas the concat() operator waits until the first stream finishes before it subscribes to the second one.
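The fork/join analogy from the list above can be written down with nothing but the JDK. The sketch below is an illustration under stated assumptions rather than a translation of the RxJava pipeline: ForkJoinAnalogy is a hypothetical class, doPurchase() is a made-up blocking call charging a fixed price of 1.50 per item, and CompletableFuture stands in for the substreams that flatMap() would merge.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Hypothetical plain-JDK analogy for flatMap() with subscribeOn() on each substream.
public class ForkJoinAnalogy {

    // Made-up blocking purchase: sleeps briefly, charges 1.50 per item.
    static BigDecimal doPurchase(String productName, int quantity) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new BigDecimal("1.50").multiply(BigDecimal.valueOf(quantity));
    }

    public static BigDecimal totalPrice(List<String> products) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            // Fork: start one independent task per product before waiting for any.
            List<CompletableFuture<BigDecimal>> purchases = products.stream()
                .map(prod -> CompletableFuture.supplyAsync(
                    () -> doPurchase(prod, 1), pool))
                .collect(Collectors.toList());
            // Join: gather the results one by one and reduce them sequentially.
            return purchases.stream()
                .map(CompletableFuture::join)
                .reduce(BigDecimal.ZERO, BigDecimal::add);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> products =
            Arrays.asList("bread", "butter", "milk", "tomato", "cheese");
        System.out.println("Total: " + totalPrice(products));
    }
}
```

Collecting the futures into a list before joining matters: joining inside the first map() would wait for each purchase before starting the next, which is the sequential behavior the earlier broken examples exhibited.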
Batching Requests Using groupBy()
Did you notice that RxGroceries.purchase() takes productName and quantity even though the quantity was always one? What if our grocery list had some products multiple times, indicating bigger demand? The first naive implementation simply sends the same request (for example, for egg) multiple times, each time asking for one. Fortunately, we can declaratively batch such requests by using groupBy(), and this still works with declarative concurrency:
import org.apache.commons.lang3.tuple.Pair;

Observable<BigDecimal> totalPrice = Observable
    .just("bread", "butter", "egg", "milk", "tomato",
        "cheese", "tomato", "egg", "egg")
    .groupBy(prod -> prod)
    .flatMap(grouped -> grouped
        .count()
        .map(quantity -> {
            String productName = grouped.getKey();
            return Pair.of(productName, quantity);
        }))
    .flatMap(order -> store
        .purchase(order.getKey(), order.getValue())
        .subscribeOn(schedulerA))
    .reduce(BigDecimal::add)
    .single();
This code is quite complex, so before revealing the output, let’s quickly go through it. First, we group products simply by their name, hence the identity function prod -> prod. In return we get an awkward Observable<GroupedObservable<String, String>>. There is nothing wrong with that. Next, flatMap() receives each GroupedObservable<String, String>, representing all products of the same name. So, for example, there will be an ["egg", "egg", "egg"] Observable there with a key "egg", as well. If groupBy() used a different key function, like prod.length(), the same sequence would have a key 3.
At this point, within flatMap() we need to construct an Observable of type Pair<String, Integer> which represents every unique product and its quantity. Both count() and map() return an Observable, so everything lines up perfectly. The second flatMap() receives order of type Pair<String, Integer> and makes a purchase; this time the quantity can be bigger. The output looks perfect; notice that bigger orders are slightly slower, but still it is much faster than having several repeated requests:
164  | Sched-A-0 | Purchasing 1 bread
165  | Sched-A-1 | Purchasing 1 butter
166  | Sched-A-2 | Purchasing 3 egg
166  | Sched-A-3 | Purchasing 1 milk
166  | Sched-A-4 | Purchasing 2 tomato
166  | Sched-A-5 | Purchasing 1 cheese
1151 | Sched-A-0 | Done 1 bread
1178 | Sched-A-1 | Done 1 butter
1180 | Sched-A-5 | Done 1 cheese
1183 | Sched-A-3 | Done 1 milk
1253 | Sched-A-4 | Done 2 tomato
1354 | Sched-A-2 | Done 3 egg
If you believe that your system can benefit from batching this way or the other, check out “Batching and Collapsing Commands”.
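For readers more familiar with java.util.stream, the grouping step alone (without any concurrency) has a close blocking counterpart. This is only an analogy, assuming the whole grocery list is available up front: BatchingSketch is a hypothetical class, and Collectors.groupingBy() with counting() plays the role that groupBy() followed by count() plays in the Observable pipeline.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical blocking analogy of groupBy() + count() using java.util.stream.
public class BatchingSketch {

    // Collapses repeated product names into one entry per product with its quantity.
    public static Map<String, Long> batch(List<String> products) {
        return products.stream()
            .collect(Collectors.groupingBy(
                Function.identity(),   // same key function as prod -> prod
                LinkedHashMap::new,    // keep first-seen order for readable output
                Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> products = Arrays.asList(
            "bread", "butter", "egg", "milk", "tomato",
            "cheese", "tomato", "egg", "egg");
        // One line per batched order instead of one per list entry
        batch(products).forEach((name, quantity) ->
            System.out.println("Purchasing " + quantity + " " + name));
    }
}
```

Nine list entries collapse into six orders, the same six purchases visible in the log above. The difference is that the stream version needs the complete list before it can produce anything, whereas groupBy() works on events as they arrive.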
Declarative Concurrency with observeOn()
Believe it or not, concurrency in RxJava can be described by two operators: the aforementioned subscribeOn() and observeOn(). They seem very similar and are confusing to newcomers, but their semantics are actually quite clear and reasonable.
subscribeOn() allows choosing which Scheduler will be used to invoke OnSubscribe (the lambda expression inside create()).
Therefore, any code inside create() is pushed to a different thread, for example to avoid blocking the main thread.
Conversely, observeOn() controls which Scheduler is used to invoke downstream Subscribers occurring after observeOn().
For example, calling create() happens in the io() Scheduler (via subscribeOn(io())) to avoid blocking the user interface.
However, updating the user interface widgets must happen in the UI thread (both Swing and Android have this constraint), so we use observeOn(), for example with AndroidSchedulers.mainThread(), before operators or subscribers changing the UI.
This way we can use one Scheduler to handle create() and all operators up to the first observeOn(), but other(s) to apply transformations.
This is best explained with an example:
log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
    .doOnNext(x -> log("Found 1: " + x))
    .observeOn(schedulerA)
    .doOnNext(x -> log("Found 2: " + x))
    .subscribe(
        x -> log("Got 1: " + x),
        Throwable::printStackTrace,
        () -> log("Completed")
    );
log("Exiting");
observeOn() occurs somewhere in the pipeline chain, and this time, as opposed to subscribeOn(), the position of observeOn() is quite important. No matter what Scheduler was running operators above observeOn() (if any), everything below uses the supplied Scheduler. In this example, there is no subscribeOn(), so the default is applied (no concurrency):
23  | main      | Starting
136 | main      | Created
163 | main      | Subscribed
163 | main      | Found 1: A
163 | main      | Found 1: B
163 | main      | Exiting
163 | Sched-A-0 | Found 2: A
164 | Sched-A-0 | Got 1: A
164 | Sched-A-0 | Found 2: B
164 | Sched-A-0 | Got 1: B
164 | Sched-A-0 | Completed
All of the operators above observeOn() are executed within the client thread, which happens to be the default in RxJava. But below observeOn(), the operators are executed within the supplied Scheduler. This will become even more obvious when both subscribeOn() and multiple observeOn() occur within the pipeline:
log("Starting");
final Observable<String> obs = simple();
log("Created");
obs
    .doOnNext(x -> log("Found 1: " + x))
    .observeOn(schedulerB)
    .doOnNext(x -> log("Found 2: " + x))
    .observeOn(schedulerC)
    .doOnNext(x -> log("Found 3: " + x))
    .subscribeOn(schedulerA)
    .subscribe(
        x -> log("Got 1: " + x),
        Throwable::printStackTrace,
        () -> log("Completed")
    );
log("Exiting");
Can you predict the output? Remember, everything below observeOn() is run within the supplied Scheduler, of course until another observeOn() is encountered. Additionally, subscribeOn() can occur anywhere between Observable and subscribe(), but this time it only affects operators down to the first observeOn():
21  | main      | Starting
98  | main      | Created
108 | main      | Exiting
129 | Sched-A-0 | Subscribed
129 | Sched-A-0 | Found 1: A
129 | Sched-A-0 | Found 1: B
130 | Sched-B-0 | Found 2: A
130 | Sched-B-0 | Found 2: B
130 | Sched-C-0 | Found 3: A
130 | Sched-C-0 | Got 1: A
130 | Sched-C-0 | Found 3: B
130 | Sched-C-0 | Got 1: B
130 | Sched-C-0 | Completed
Subscription occurs in schedulerA because that is what we specified in subscribeOn(). The "Found 1" operator was also executed within that Scheduler because it is before the first observeOn(). Later, the situation becomes more interesting. observeOn() switches the current Scheduler to schedulerB, and "Found 2" uses this one instead. The last observeOn(schedulerC) affects both the "Found 3" operator and the Subscriber. Remember that the Subscriber works within the context of the last encountered Scheduler.
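The thread-hopping rule can also be observed outside RxJava. The following is a plain-JDK analogy, not RxJava code: it assumes CompletableFuture as a stand-in, where supplyAsync(..., executor) plays roughly the role of subscribeOn() and each ...Async(..., executor) stage behaves like observeOn() followed by a single operator. The class name ThreadHopAnalogy and the helper named() are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadHopAnalogy {

    // Single-threaded executor whose only thread carries the given name.
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    // Runs three stages and returns the thread name seen in each, in order.
    static List<String> stageThreads() {
        ExecutorService a = named("Sched-A");
        ExecutorService b = named("Sched-B");
        ExecutorService c = named("Sched-C");
        List<String> seen = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                // production starts on executor a (compare subscribeOn)
                .supplyAsync(() -> {
                    seen.add(Thread.currentThread().getName());
                    return "A";
                }, a)
                // hop to executor b (compare observeOn(schedulerB))
                .thenApplyAsync(x -> {
                    seen.add(Thread.currentThread().getName());
                    return x;
                }, b)
                // hop to executor c; the final consumer runs here
                .thenAcceptAsync(x -> seen.add(Thread.currentThread().getName()), c)
                .join();
        } finally {
            a.shutdown();
            b.shutdown();
            c.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(stageThreads());
    }
}
```

Each stage records the thread it ran on, so the returned list follows the pipeline order in the same way "Found 1", "Found 2", and "Found 3" moved from one Scheduler to the next in the log above.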
subscribeOn() and observeOn() work really well together when you want to physically decouple the producer (Observable.create()) from the consumer (Subscriber). By default, there is no such decoupling, and RxJava simply uses the same thread. subscribeOn() alone is not enough: we merely choose a different thread, which producer and consumer still share. observeOn() is better, but then we block the client thread in the case of synchronous Observables.
Because most of the operators are nonblocking and the lambda expressions used inside them tend to be short and cheap, typically there is just one subscribeOn() and one observeOn() in the pipeline of operators. subscribeOn() can be placed close to the original Observable to improve readability, whereas observeOn() is close to subscribe() so that only the Subscriber uses that special Scheduler; other operators rely on the Scheduler from subscribeOn().
Here is a more advanced program that takes advantage of these two operators:
log("Starting");
Observable<String> obs = Observable.create(subscriber -> {
    log("Subscribed");
    subscriber.onNext("A");
    subscriber.onNext("B");
    subscriber.onNext("C");
    subscriber.onNext("D");
    subscriber.onCompleted();
});
log("Created");
obs
    .subscribeOn(schedulerA)
    .flatMap(record -> store(record).subscribeOn(schedulerB))
    .observeOn(schedulerC)
    .subscribe(
        x -> log("Got: " + x),
        Throwable::printStackTrace,
        () -> log("Completed")
    );
log("Exiting");
Here, store() is a simple nested operation:
Observable<UUID> store(String s) {
    return Observable.create(subscriber -> {
        log("Storing " + s);
        //hard work
        subscriber.onNext(UUID.randomUUID());
        subscriber.onCompleted();
    });
}
The production of events occurs in schedulerA, but each event is processed independently using schedulerB to improve concurrency, a technique we learned in “subscribeOn() Concurrency and Behavior”. The subscription in the end happens in yet another Scheduler, schedulerC. We are pretty sure you understand by now which Scheduler/thread will execute which action, but just in case (empty lines added for clarity):
26   | main | Starting
93   | main | Created
121  | main | Exiting

122  | Sched-A-0 | Subscribed
124  | Sched-B-0 | Storing A
124  | Sched-B-1 | Storing B
124  | Sched-B-2 | Storing C
124  | Sched-B-3 | Storing D

1136 | Sched-C-1 | Got: 44b8b999-e687-485f-b17a-a11f6a4bb9ce
1136 | Sched-C-1 | Got: 532ed720-eb35-4764-844e-690327ac4fe8
1136 | Sched-C-1 | Got: 13ddf253-c720-48fa-b248-4737579a2c2a
1136 | Sched-C-1 | Got: 0eced01d-3fa7-45ec-96fb-572ff1e33587
1137 | Sched-C-1 | Completed
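The effect of flatMap() combined with an inner subscribeOn(schedulerB) can be approximated without RxJava. This is only a sketch under stated assumptions: a fixed pool of four threads stands in for schedulerB, the Observable returned by store() is replaced with a plain method, and results are collected in submission order, whereas flatMap() emits them as they complete. FanOutAnalogy and storeAll() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class FanOutAnalogy {

    // Stand-in for the store() operation from the text.
    static UUID store(String record) {
        // hard work would happen here
        return UUID.randomUUID();
    }

    // Submits one independent task per record, then waits for all of them.
    static List<UUID> storeAll(List<String> records) {
        ExecutorService poolB = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<UUID>> pending = new ArrayList<>();
            for (String record : records) {
                // each record is stored on its own pool thread, like
                // store(record).subscribeOn(schedulerB) inside flatMap()
                pending.add(CompletableFuture.supplyAsync(() -> store(record), poolB));
            }
            List<UUID> ids = new ArrayList<>();
            for (CompletableFuture<UUID> future : pending) {
                ids.add(future.join());
            }
            return ids;
        } finally {
            poolB.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(storeAll(List.of("A", "B", "C", "D")));
    }
}
```

All tasks are submitted before any result is awaited, which is what lets the four "Storing" steps overlap instead of running one after another.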
observeOn() is especially important for applications with a UI for which we do not want to block the UI event-dispatching thread. On Android (see “Android Development with RxJava”) or Swing, some actions like updating the UI must be executed within a specific thread. But doing too much in that thread renders your UI unresponsive. In these cases, you put observeOn() close to subscribe() so that code within the subscription is invoked within the context of a particular Scheduler (like the UI thread). However, other transformations, even rather cheap ones, should be executed outside the UI thread. On the server, observeOn() is seldom used because the true source of concurrency is built into most Observables. This leads to an interesting conclusion: RxJava controls concurrency with just two operators (subscribeOn() and observeOn()), but the more you use reactive extensions, the less frequently you will see these in production code.
Other Uses for Schedulers
There are numerous operators that by default use some Scheduler. Typically, Schedulers.computation() is used if none is supplied; the JavaDoc always makes it clear. For example, the delay() operator takes upstream events and pushes them downstream after a given time. Obviously, it cannot hold the original thread during that period, so it must use a different Scheduler:
Observable
    .just('A', 'B')
    .delay(1, SECONDS, schedulerA)
    .subscribe(this::log);
Without supplying a custom schedulerA, all operators below delay() would use the computation() Scheduler. There is nothing inherently wrong with that; however, if your Subscriber is blocked on I/O, it would consume one Worker from the globally shared computation() scheduler, possibly affecting the entire system. Other important operators that support a custom Scheduler are interval(), range(), timer(), repeat(), skip(), take(), timeout(), and several others that have yet to be introduced. If you do not provide a scheduler to such operators, the computation() Scheduler is utilized, which is a safe default in most cases.
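Why a delaying operator needs its own Scheduler can be illustrated with the JDK alone. This sketch is an analogy rather than RxJava's implementation: it assumes a ScheduledExecutorService in place of the Scheduler passed to delay(), and the names DelayAnalogy and deliveringThread() are hypothetical. The delayed value is produced on the timer thread, so the calling thread is not the one that carries it downstream.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class DelayAnalogy {

    // Returns the name of the thread that delivers a value after the delay.
    static String deliveringThread(long delayMillis) {
        ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "Sched-A");
                t.setDaemon(true);
                return t;
            });
        // The task runs later, on the timer's thread, not on the caller's.
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return timer.schedule(task, delayMillis, TimeUnit.MILLISECONDS).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(deliveringThread(100));
    }
}
```

If the consumer of that value then blocked on I/O, it would tie up the timer thread, which is the same concern the text raises about blocking a Worker of the shared computation() scheduler.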
Mastering schedulers is essential to writing scalable and safe code using RxJava. The difference between subscribeOn() and observeOn() is especially important under high load where every task must be executed precisely when we expect. In truly reactive applications, for which all long-running operations are asynchronous, very few threads and thus Schedulers are needed. But there is always this one API or dependency that requires blocking code.
Last but not least, we must be sure that Schedulers used downstream can keep up with the load generated by Schedulers upstream. But this danger will be explained in great detail in Chapter 6.
Summary
This chapter described several patterns in traditional applications that can be replaced with RxJava. I hope you understand by now that high-frequency trading or streaming posts from social media are not the only use cases for RxJava. As a matter of fact, almost any API can be seamlessly replaced with Observable. Even if you don’t want or need the power of reactive extensions at the moment, it will allow you to evolve the implementation without introducing backward-incompatible changes. Moreover, it is the client that eventually harvests all the possibilities offered by RxJava, like laziness, declarative concurrency, or asynchronous chaining. Even better, because of seamless conversion from Observable to BlockingObservable, traditional clients can consume your API as they want, and you can always provide a simple bridge layer.
You should now be fairly confident with RxJava and understand the benefits of applying it even in legacy systems. Undoubtedly, working with reactive Observables is more challenging and has a somewhat steep learning curve. But the advantages and possibilities of growth simply can’t be exaggerated. Imagine if we could write entire applications using reactive extensions, from top to bottom, like a greenfield project for which we have control over every API, interface, and external system. Chapter 5 will discuss how you can write such an application and what the implications are.
1 In fact, RxJava tries to stay on the same thread via thread affinity in the event loop model to take advantage of this, as well.
2 See also “Bulkhead Pattern and Fail-Fast”
3 Compare it to lazy evaluation of expressions in Haskell.
4 Obviously, for any real project, you will use a production-grade logging system like Logback or Log4J 2.