Chapter 1. Reactive Programming with RxJava
RxJava is a specific implementation of reactive programming for Java and Android that is influenced by functional programming. It favors function composition, avoidance of global state and side effects, and thinking in streams to compose asynchronous and event-based programs. It begins with the observer pattern of producer/consumer callbacks and extends it with dozens of operators that allow composing, transforming, scheduling, throttling, error handling, and lifecycle management.
RxJava is a mature open source library that has found broad adoption both on the server and on Android mobile devices. Along with the library, an active community of developers has built up around RxJava and reactive programming to contribute to the project, speak, write, and help one another.
This chapter will provide an overview of RxJava—what it is and how it works—and the rest of this book will take you through all of the details of how to use and apply it in your applications. You can begin reading this book with no prior experience with reactive programming, but we will start at the beginning and take you through the concepts and practices of RxJava so that you can apply its strengths to your use cases.
Reactive Programming and RxJava
Reactive programming is a general programming term that is focused on reacting to changes, such as data values or events. It can and often is done imperatively. A callback is an approach to reactive programming done imperatively. A spreadsheet is a great example of reactive programming: cells dependent on other cells automatically “react” when those other cells change.
On today’s computers everything ends up being imperative at some point as it hits the operating system and hardware. The computer must be told explicitly what needs to be done and how to do it. Humans do not think like CPUs and related systems, so we add abstractions. Reactive-functional programming is an abstraction, just like our higher-level imperative programming idioms are abstractions for the underlying binary and assembly instructions. The fact that everything ends up imperative is important to remember and understand because it helps us with the mental model of what reactive-functional programming is addressing and how it ultimately executes—there is no magic.
Reactive-functional programming therefore is an approach to programming—an abstraction on top of imperative systems—that allows us to program asynchronous and event-driven use cases without having to think like the computer itself and imperatively define the complex interactions of state, particularly across thread and network boundaries. Not having to think like the computer is a useful trait when it comes to asynchrony and event-driven systems, because concurrency and parallelism are involved, and these are very challenging characteristics to use correctly and efficiently. Within the Java community, the books Java Concurrency in Practice by Brian Goetz and Concurrent Programming in Java by Doug Lea (Addison-Wesley), and forums such as “Mechanical Sympathy” are representative of the depth, breadth, and complexity of mastering concurrency. My interactions with experts from these books, forums, and communities since I started using RxJava has convinced me even more than before of how difficult it really is to write high-performance, efficient, scalable, and correct concurrent software. And we haven’t even brought in distributed systems, which take concurrency and parallelism to another level.
So, the short answer to what reactive-functional programming is solving is concurrency and parallelism. More colloquially, it is solving callback hell, which results from addressing reactive and asynchronous use cases in an imperative way. Reactive programming such as that implemented by RxJava is influenced by functional programming and uses a declarative approach to avoiding the typical pitfalls of reactive-imperative code.
When You Need Reactive Programming
Reactive programming is useful in scenarios such as the following:
-
Processing user events such as mouse movement and clicks, keyboard typing, GPS signals changing over time as users move with their device, device gyroscope signals, touch events, and so on.
-
Responding to and processing any and all latency-bound IO events from disk or network, given that IO is inherently asynchronous (a request is made, time passes, a response might or might not be received, which then triggers further work).
-
Handling events or data pushed at an application by a producer it cannot control (system events from a server, the aforementioned user events, signals from hardware, events triggered by the analog world from sensors, and so on).
Now, if the code in question is handling only one event stream, reactive-imperative programming with a callback is going to be fine, and bringing in reactive-functional programming is not going to give you much benefit. You can have hundreds of different event streams, and if they are all completely independent of one another, imperative programming is not likely to be a problem. In such straightforward use cases, imperative approaches are going to be the most efficient because they eliminate the abstraction layer of reactive programming and stay closer to that for which current operating systems, languages, and compilers are optimized.
If your program is like most though, you need to combine events (or asynchronous responses from functions or network calls), have conditional logic interacting between them, and must handle failure scenarios and resource cleanup on any and all of them. This is where the reactive-imperative approach begins to dramatically increase in complexity and reactive-functional programming begins to shine. A non-scientific view I have come to accept is that reactive-functional programming has an initially higher learning curve and barrier to entry but that the ceiling for complexity is far lower than with reactive-imperative programming.
Hence this is where the tagline for Reactive Extensions (Rx) in general and RxJava specifically comes from, “a library for composing asynchronous and event-based programs.” RxJava is a concrete implementation of reactive programming principles influenced by functional and data-flow programming. There are different approaches to being “reactive,” and RxJava is but one of them. Let’s dig into how it works.
How RxJava Works
Central to RxJava is the Observable
type that represents a stream of data or events. It is intended for push (reactive) but can also be used for pull (interactive). It is lazy rather than eager. It can be used asynchronously or synchronously. It can represent 0, 1, many, or infinite values or events over time.
That’s a lot of buzzwords and details, so let’s unpack it. You’ll get the full details in “Anatomy of rx.Observable”.
Push versus Pull
The entire point of RxJava being reactive is to support push, so the Observable
and related Observer
type signatures support events being pushed at it. This in turn generally is accompanied by asynchrony, which is discussed in the next section. But the Observable
type also supports an asynchronous feedback channel (also sometimes referred to as async-pull or reactive-pull), as an approach to flow control or backpressure in async systems. A later section in this chapter will address flow control and how this mechanism fits in.
To support receiving events via push, an Observable
/Observer
pair connect via subscription. The Observable
represents the stream of data and can be subscribed to by an Observer
(which you’ll learn more about in “Capturing All Notifications by Using Observer<T>”):
interface
Observable
<
T
>
{
Subscription
subscribe
(
Observer
s
)
}
Upon subscription, the Observer
can have three types of events pushed to it:
-
Data via the
onNext()
function -
Errors (exceptions or throwables) via the
onError()
function -
Stream completion via the
onCompleted()
function
interface
Observer
<
T
>
{
void
onNext
(
T
t
)
void
onError
(
Throwable
t
)
void
onCompleted
()
}
The onNext()
method might never be called or might be called once, many, or infinite times. The onError()
and onCompleted()
are terminal events, meaning that only one of them can be called and only once. When a terminal event is called, the Observable
stream is finished and no further events can be sent over it. Terminal events might never occur if the stream is infinite and does not fail.
As will be shown in “Flow Control” and “Backpressure”, there is an additional type of signature to permit interactive pull:
interface
Producer
{
void
request
(
long
n
)
}
This is used with a more advanced Observer
called Subscriber
(with more details given in “Controlling Listeners by Using Subscription and Subscriber<T>”):
interface
Subscriber
<
T
>
implements
Observer
<
T
>,
Subscription
{
void
onNext
(
T
t
)
void
onError
(
Throwable
t
)
void
onCompleted
()
...
void
unsubscribe
()
void
setProducer
(
Producer
p
)
}
The unsubcribe
function as part of the Subscription
interface is used to allow a subscriber to unsubscribe from an Observable
stream. The setProducer
function and Producer
types are used to form a bidirectional communication channel between the producer and consumer used for flow control.
Async versus Sync
Generally, an Observable
is going to be asynchronous, but it doesn’t need to be. An Observable
can be synchronous, and in fact defaults to being synchronous. RxJava never adds concurrency unless it is asked to do so. A synchronous Observable
would be subscribed to, emit all data using the subscriber’s thread, and complete (if finite). An Observable
backed by blocking network I/O would synchronously block the subscribing thread and then emit via onNext()
when the blocking network I/O returned.
For example, the following is completely synchronous:
Observable
.
create
(
s
->
{
s
.
onNext
(
"Hello World!"
);
s
.
onCompleted
();
}).
subscribe
(
hello
->
System
.
out
.
println
(
hello
));
You will learn more about Observable.create
in “Mastering Observable.create()” and Observable.subscribe
in “Subscribing to Notifications from Observable”.
Now, as you are probably thinking, this is generally not the desired behavior of a reactive system, and you are right. It is bad form to use an Observable
with synchronous blocking I/O (if blocking I/O needs to be used, it needs to be made asynchronous with threads). However, sometimes it is appropriate to synchronously fetch data from an in-memory cache and return it immediately. The “Hello World” case shown in the previous example does not need concurrency, and in fact will be far slower if asynchronous scheduling is added to it. Thus, the actual criteria that is generally important is whether the Observable
event production is blocking or nonblocking, not whether it is synchronous or asynchronous. The “Hello World” example is nonblocking because it never blocks a thread, thus it is correct (though superfluous) use of an Observable
.
The RxJava Observable
is purposefully agnostic with regard to async versus sync, and whether concurrency exists or where it comes from. This is by design and allows the implementation of the Observable
to decide what is best. Why might this be useful?
First of all, concurrency can come from multiple places, not just threadpools. If the data source is already async because it is on an event loop, RxJava should not add more scheduling overhead or force a particular scheduling implementation. Concurrency can come from threadpools, event loops, actors, and so on. It can be added, or it can originate from the data source. RxJava is agnostic with respect to where the asynchrony originates.
Second, there are two good reasons to use synchronous behavior, which we’ll look at in the following subsections.
In-memory data
If data exists in a local in-memory cache (with constant microsecond/nanosecond lookup times), it does not make sense to pay the scheduling cost to make it asynchronous. The Observable
can just fetch the data synchronously and emit it on the subscribing thread, as shown here:
Observable
.
create
(
s
->
{
s
.
onNext
(
cache
.
get
(
SOME_KEY
));
s
.
onCompleted
();
}).
subscribe
(
value
->
System
.
out
.
println
(
value
));
This scheduling choice is powerful when the data might or might not be in memory. If it is in memory, emit it synchronously; if it’s not, perform the network call asynchronously and return the data when it arrives. This choice can reside conditionally within the Observable
:
// pseudo-code
Observable
.
create
(
s
->
{
T
fromCache
=
getFromCache
(
SOME_KEY
);
if
(
fromCache
!=
null
)
{
// emit synchronously
s
.
onNext
(
fromCache
);
s
.
onCompleted
();
}
else
{
// fetch asynchronously
getDataAsynchronously
(
SOME_KEY
)
.
onResponse
(
v
->
{
putInCache
(
SOME_KEY
,
v
);
s
.
onNext
(
v
);
s
.
onCompleted
();
})
.
onFailure
(
exception
->
{
s
.
onError
(
exception
);
});
}
}).
subscribe
(
s
->
System
.
out
.
println
(
s
));
Synchronous computation (such as operators)
The more common reason for remaining synchronous is stream composition and transformation via operators. RxJava mostly uses the large API of operators used to manipulate, combine, and transform data, such as map()
, filter()
, take()
, flatMap()
, and groupBy()
. Most of these operators are synchronous, meaning that they perform their computation synchronously inside the onNext()
as the events pass by.
These operators are synchronous for performance reasons. Take this as an example:
Observable
<
Integer
>
o
=
Observable
.
create
(
s
->
{
s
.
onNext
(
1
);
s
.
onNext
(
2
);
s
.
onNext
(
3
);
s
.
onCompleted
();
});
o
.
map
(
i
->
"Number "
+
i
)
.
subscribe
(
s
->
System
.
out
.
println
(
s
));
If the map
operator defaulted to being asynchronous, each number (1, 2, 3) would be scheduled onto a thread where the string concatenation would be performed (“Number " + i). This is very inefficient and generally has nondeterministic latency due to scheduling, context switching, and so on.
The important thing to understand here is that most Observable
function pipelines are synchronous (unless a specific operator needs to be async, such as timeout
or observeOn
), whereas the Observable
itself can be async. These topics receive more in-depth treatment in “Declarative Concurrency with observeOn()” and “Timing Out When Events Do Not Occur”.
The following example demonstrates this mixture of sync and async:
Observable
.
create
(
s
->
{
...
async
subscription
and
data
emission
...
})
.
doOnNext
(
i
->
System
.
out
.
println
(
Thread
.
currentThread
()))
.
filter
(
i
->
i
%
2
==
0
)
.
map
(
i
->
"Value "
+
i
+
" processed on "
+
Thread
.
currentThread
())
.
subscribe
(
s
->
System
.
out
.
println
(
"SOME VALUE =>"
+
s
));
System
.
out
.
println
(
"Will print BEFORE values are emitted"
)
In this example, the Observable
is async (it emits on a thread different from that of the subscriber), so subscribe
is nonblocking, and the println
at the end will output before events are propagated and “SOME VALUE ⇒” output is shown.
However, the filter()
and map()
functions are synchronously executed on the calling thread that emits the events. This is generally the behavior we want: an asynchronous pipeline (the Observable
and composed operators) with efficient synchronous computation of the events.
Thus, the Observable
type itself supports both sync and async concrete implementations, and this is by design.
Concurrency and Parallelism
Individual Observable
streams permit neither concurrency nor parallelism. Instead, they are achieved via composition of async Observables
.
Parallelism is simultaneous execution of tasks, typically on different CPUs or machines. Concurrency, on the other hand, is the composition or interleaving of multiple tasks. If a single CPU has multiple tasks (such as threads) on it, they are executing concurrently but not in parallel by “time slicing.” Each thread gets a portion of CPU time before yielding to another thread, even if a thread has not yet finished.
Parallel execution is concurrent by definition, but concurrency is not necessarily parallelism. In practice, this means being multithreaded is concurrency, but parallelism only occurs if those threads are being scheduled and executed on different CPUs at the exact same time. Thus, generically we speak about concurrency and being concurrent, but parallelism is a specific form of concurrency.
The contract of an RxJava Observable
is that events (onNext()
, onCompleted()
, onError()
) can never be emitted concurrently. In other words, a single Observable
stream must always be serialized and thread-safe. Each event can be emitted from a different thread, as long as the emissions are not concurrent. This means no interleaving or simultaneous execution of onNext()
. If onNext()
is still being executed on one thread, another thread cannot begin invoking it again (interleaving).
Here’s an example of what’s okay:
Observable
.
create
(
s
->
{
new
Thread
(()
->
{
s
.
onNext
(
"one"
);
s
.
onNext
(
"two"
);
s
.
onNext
(
"three"
);
s
.
onNext
(
"four"
);
s
.
onCompleted
();
}).
start
();
});
This code emits data sequentially, so it complies with the contract. (Note, however, that it is generally advised to not start a thread like that inside an Observable
. Use schedulers, instead, as discussed in “Multithreading in RxJava”.)
Here’s an example of code that is illegal:
// DO NOT DO THIS
Observable
.
create
(
s
->
{
// Thread A
new
Thread
(()
->
{
s
.
onNext
(
"one"
);
s
.
onNext
(
"two"
);
}).
start
();
// Thread B
new
Thread
(()
->
{
s
.
onNext
(
"three"
);
s
.
onNext
(
"four"
);
}).
start
();
// ignoring need to emit s.onCompleted() due to race of threads
});
// DO NOT DO THIS
This code is illegal because it has two threads that can both invoke onNext()
concurrently. This breaks the contract. (Also, it would need to safely wait for both threads to complete to call onComplete
, and as mentioned earlier, it is generally a bad idea to manually start threads like this.)
So, how do you take advantage of concurrency and/or parallelism with RxJava? Composition.
A single Observable
stream is always serialized, but each Observable
stream can operate independently of one another, and thus concurrently and/or in parallel. This is why merge
and flatMap
end up being so commonly used in RxJava—to compose asynchronous streams together concurrently. (You can learn more about the details of merge
and flatMap
in “Wrapping Up Using flatMap()” and “Treating Several Observables as One Using merge()”.)
Here is a contrived example showing the mechanics of two asynchronous Observables
running on separate threads and merged together:
Observable
<
String
>
a
=
Observable
.
create
(
s
->
{
new
Thread
(()
->
{
s
.
onNext
(
"one"
);
s
.
onNext
(
"two"
);
s
.
onCompleted
();
}).
start
();
});
Observable
<
String
>
b
=
Observable
.
create
(
s
->
{
new
Thread
(()
->
{
s
.
onNext
(
"three"
);
s
.
onNext
(
"four"
);
s
.
onCompleted
();
}).
start
();
});
// this subscribes to a and b concurrently,
// and merges into a third sequential stream
Observable
<
String
>
c
=
Observable
.
merge
(
a
,
b
);
Observable c
will receive items from both a
and b
, and due to their asynchrony, three things occur:
-
“one” will appear before “two”
-
“three” will appear before “four”
-
The order between one/two and three/four is unspecified
So why not just allow onNext()
to be invoked concurrently?
Primarily because onNext()
is meant for us humans to use, and concurrency is difficult. If onNext()
could be invoked concurrently, it would mean that every Observer
would need to code defensively for concurrent invocation, even when not expected or wanted.
A second reason is because some operations just aren’t possible with concurrent emission; for example, scan
and reduce
, which are common and important behaviors. Operators such as scan
and reduce
require sequential event propagation so that state can be accumulated on streams of events that are not both associative and commutative. Allowing concurrent Observable
streams (with concurrent onNext()
) would limit the types of events that can be processed and require thread-safe data structures.
Note
The Java 8 Stream
type supports concurrent emission. This is why java.util.stream.Stream
requires reduce
functions to be associative, because they must support concurrent invocation on parallel streams. The documentation of the java.util.stream
package about parallelism, ordering (related to commutativity), reduction operations, and associativity further illustrates the complexity of the same Stream
type permitting both sequential and concurrent emission.
A third reason is that performance is affected by synchronization overhead because all observers and operators would need to be thread-safe, even if most of the time data arrives sequentially. Despite the JVM often being good at eliminating synchronization overhead, it is not always possible (particularly with nonblocking algorithms using atomics) so this ends up being a performance tax not needed on sequential streams.
Additionally, it is often slower to do generic fine-grained parallelism. Parallelism typically needs to be done coarsely, such as in batches of work, to make up for the overhead of switching threads, scheduling work, and recombining. It is far more efficient to synchronously execute on a single thread and take advantage of the many memory and CPU optimizations for sequential computation. On a List
or array
, it is quite easy to have reasonable defaults for batched parallelism, because all the items are known upfront and can be split into batches (though even then it is often faster to just process the full list on a single CPU unless the list is very large, or the compute per item is significant). A stream, however, does not know the work ahead of time, it just receives data via onNext()
and therefore cannot automatically chunk the work.
In fact, prior to RxJava v1, a .parallel(Function f)
operator was added to try to behave like java.util.stream.Stream.parallel()
because that was considered a nice convenience. It was done in a way to not break the RxJava contract by splitting a single Observable
into many Observable
s that each executed in parallel, and then merging them back together. However, it ended up being removed from the library prior to v1 because it was very confusing and almost always resulted in worse performance. Adding computational parallelism to a stream of events almost always needs to be reasoned about and tested. Perhaps a ParallelObservable
could make sense, for which the operators are restricted to a subset that assume associativity, but in the years of RxJava being used, it has never ended up being worth the effort, because composition with merge
and flatMap
are effective building blocks to address the use cases.
Chapter 3 will teach how to use operators to compose Observables
to benefit from concurrency and parallelism.
Lazy versus Eager
The Observable
type is lazy, meaning it does nothing until it is subscribed to. This differs from an eager type such as a Future
, which when created represents active work. Lazyiness allows composing Observables
together without data loss due to race conditions without caching. In a Future
, this isn’t a concern, because the single value can be cached, so if the value is delivered before composition, the value will be fetched. With an unbounded stream, an unbounded buffer would be required to provide this same guarantee. Thus, the Observable
is lazy and will not start until subscribed to so that all composition can be done before data starts flowing.
In practice, this means two things:
- Subscription, not construction starts work
-
Due to the laziness of an
Observable
, creating one does not actually cause any work to happen (ignoring the “work” of allocating theObservable
object itself). All it does is define what work should be done when it is eventually subscribed to. Consider anObservable
defined like this:Observable
<
T
>
someData
=
Observable
.
create
(
s
->
{
getDataFromServerWithCallback
(
args
,
data
->
{
s
.
onNext
(
data
);
s
.
onCompleted
();
});
})
The
someData
reference now exists, butgetDataFromServerWithCallback
is not yet being executed. All that has happened is that theObservable
wrapper has been declared around a unit of work to be performed, the function that lives inside theObservable
.Subscribing to the
Observable
causes the work to be done:someData
.
subscribe
(
s
->
System
.
out
.
println
(
s
));
This lazily executes the work represented by the
Observable
. - Observables can be reused
-
Because the
Observable
is lazy, it also means a particular instance can be invoked more than once. Continuing with the previous example this means we can do the following:someData
.
subscribe
(
s
->
System
.
out
.
println
(
"Subscriber 1: "
+
s
));
someData
.
subscribe
(
s
->
System
.
out
.
println
(
"Subscriber 2: "
+
s
));
Now there will be two separate subscriptions, each calling
getDataFromServerWithCallback
and emitting events.This laziness differs from async types such as
Future
where theFuture
is created to represent work already started. AFuture
cannot be reused (subscribed to multiple times to trigger work). If a reference to aFuture
exists, it means work is already happening. You can see in the preceding sample code exactly where the eagerness is; thegetDataFromServerWithCallback
method is eager because it immediately executes when invoked. Wrapping anObservable
aroundgetDataFromServerWithCallback
allows it to be used lazily.
This laziness is powerful when doing composition. For example:
someData
.
onErrorResumeNext
(
lazyFallback
)
.
subscribe
(
s
->
System
.
out
.
println
(
s
));
In this case, lazyFallback
Observable
represents work that can be done, but will only be done if something subscribes to it, and that we only want subscribed to if someData
fails. Of course, eager types can be made lazy by using function calls (such as getDataAsFutureA()
).
Eagerness and laziness each have their strengths and weaknesses, but RxJava Observable
is lazy. Therefore, if you have an Observable
it won’t do anything until you subscribe to it.
This topic is discussed in greater detail in “Embracing Laziness”.
Duality
An Rx Observable
is the async “dual” of an Iterable
. By “dual,” we mean the Observable
provides all the functionality of an Iterable
except in the reverse flow of data: it is push instead of pull. The table that follows shows types that serve both push and pull functionality:
Pull (Iterable) | Push (Observable) |
---|---|
T next() |
onNext(T) |
throws Exception |
onError(Throwable) |
returns |
onCompleted() |
As per the table, instead of data being pulled out via next()
by the consumer, it is pushed to onNext(T)
by the producer. Successful termination is signaled via the onCompleted()
callback rather than blocking the thread until all items have been iterated. In place of exceptions being thrown up the callstack, errors are emitted as events to the onError(Throwable)
callback.
The fact that it behaves as a dual effectively means anything you can do synchronously via pull with an Iterable
and Iterator
can be done asynchronously via push with an Observable
and Observer
. This means that the same programming model can be applied to both!
For example, as of Java 8 an Iterable
can be upgraded to have function composition via the java.util.stream.Stream
type to work like this:
// Iterable<String> as Stream<String>
// that contains 75 strings
getDataFromLocalMemorySynchronously
()
.
skip
(
10
)
.
limit
(
5
)
.
map
(
s
->
s
+
"_transformed"
)
.
forEach
(
System
.
out
::
println
)
This will retrieve 75 strings from getDataFromLocalMemorySynchronously()
, get items 11–15 and ignore the rest, transform the strings, and print them out. (Learn more about operators such as take
, skip
, and limit
in “Slicing and Dicing Using skip(), takeWhile(), and Others”.)
An RxJava Observable
is used the same way:
// Observable<String>
// that emits 75 strings
getDataFromNetworkAsynchronously
()
.
skip
(
10
)
.
take
(
5
)
.
map
(
s
->
s
+
"_transformed"
)
.
subscribe
(
System
.
out
::
println
)
This will receive 5 strings (15 were emitted but the first 10 were dropped), and then unsubscribe (ignoring or stopping the rest of the strings that were to be emitted). It transforms and prints the strings just like the previous Iterable
/Stream
example.
In other words, the Rx Observable
allows programming with async data via push just like Streams
around Iterables
and Lists
using synchronous pull.
Cardinality
The Observable
supports asynchronously pushing multiple values. This nicely fits into the lower right of the following table, the async dual of Iterable
(or Stream
, List
, Enumerable
, etc.) and multivalued version of a Future
:
One | Many | |
---|---|---|
Synchronous |
T getData() |
Iterable<T> getData() |
Asynchronous |
Future<T> getData() |
Observable<T> getData() |
Note that this section refers to Future
generically. It uses Future.onSuccess(callback)
syntax to represent its behavior. Different implementations exist, such as CompletableFuture
, ListenableFuture
, or the Scala Future
. But whatever you do, don’t use java.util.Future
, which requires blocking to retrieve a value.
So, why might an Observable
be valuable instead of just Future
? The most obvious reason is that you are dealing with either an event stream or a multivalued response. The less obvious reason is composition of multiple single-valued responses. Let’s look at each of these.
Event stream
Event stream is straightforward. Over time the producer pushes events at the consumer, as demonstrated here:
// producer
Observable
<
Event
>
mouseEvents
=
...;
// consumer
mouseEvents
.
subscribe
(
e
->
doSomethingWithEvent
(
e
));
This doesn’t work very well with a Future
:
// producer
Future
<
Event
>
mouseEvents
=
...;
// consumer
mouseEvents
.
onSuccess
(
e
->
doSomethingWithEvent
(
e
));
The onSuccess
callback could have received the “last event,” but some questions remain: Does the consumer now need to poll? Will the producer enqueue them? Or will they be lost in between each fetch? The Observable
is definitely beneficial here. In the absence of Observable
, a callback approach would be better than modeling this with a Future
.
Multiple values
Multivalued responses are the next use of Observable
. Basically, anywhere that a List
, Iterable
, or Stream
would be used, Observable
can be used instead:
// producer
Observable
<
Friend
>
friends
=
...
// consumer
friends
.
subscribe
(
friend
->
sayHello
(
friend
));
Now, this can work with a Future
, like this:
// producer
Future
<
List
<
Friend
>>
friends
=
...
// consumer
friends
.
onSuccess
(
listOfFriends
->
{
listOfFriends
.
forEach
(
friend
->
sayHello
(
friend
));
});
So why use the Observable<Friend>
approach?
If the list of data to return is small, it probably doesn’t matter for performance and it becomes a subjective choice. If the list is large, though, or the remote data source must fetch different portions of the list from different locations, using the Observable<Friend>
approach can be a performance or latency benefit.
The most compelling reason is that items can be processed as received rather than waiting for the entire collection to arrive. This is particularly true when different network latencies on the backend can affect each item differently, which is actually fairly common due to long-tail latencies (such as in service-oriented or microservice architectures) and shared data stores. If waiting for the entire collection, the consumer will always experience the maximum latency of the aggregate work done for the collection. If items are returned as an Observable
stream, the consumer receives them immediately and “time to first item” can be significantly lower than the last and slowest item. To make this work, ordering of the stream must be sacrified so that the items can be emitted in whatever order the server gets them. If order is eventually important to the consumer, a ranking or position can be included in the item data or metadata, and the client can then sort or position the items as needed.
Additionally, it keeps memory usage limited to that needed per item rather than needing to allocate and collect memory for the entire collection.
Composition
A multivalued Observable
type is also useful when composing single-valued responses, such as from Futures
.
When merging together multiple Futures
, they emit another Future
with a single value, such as this:
CompletableFuture
<
String
>
f1
=
getDataAsFuture
(
1
);
CompletableFuture
<
String
>
f2
=
getDataAsFuture
(
2
);
CompletableFuture
<
String
>
f3
=
f1
.
thenCombine
(
f2
,
(
x
,
y
)
->
{
return
x
+
y
;
});
That might be exactly what is wanted, and is actually available in RxJava via Observable.zip
(which you’ll learn more about in “Pairwise Composing Using zip() and zipWith()”):
Observable
<
String
>
o1
=
getDataAsObservable
(
1
);
Observable
<
String
>
o2
=
getDataAsObservable
(
2
);
Observable
<
String
>
o3
=
Observable
.
zip
(
o1
,
o2
,
(
x
,
y
)
->
{
return
x
+
y
;
});
However, it means waiting until all Futures
are completed before emitting anything. Oftentimes, it is preferable to emit each returned Future
value as it completes. In this case, Observable.merge
(or the related flatMap
) is preferable. It allows composing the results (even if each is just an Observable
emitting one value) into a stream of values that are each emitted as soon as they are ready:
Observable
<
String
>
o1
=
getDataAsObservable
(
1
);
Observable
<
String
>
o2
=
getDataAsObservable
(
2
);
// o3 is now a stream of o1 and o2 that emits each item without waiting
Observable
<
String
>
o3
=
Observable
.
merge
(
o1
,
o2
);
Single
Now, despite Rx Observable
being great at handling multivalued streams, the simplicity of a single-valued representation is very nice for API design and consumption. Additionally, basic request/response behavior is extremely common in applications. For this reason, RxJava provides a Single
type, which is a lazy equivalent to a Future
. Think of it as a Future
with two benefits: first, it is lazy, so it can be subscribed to multiple times and easily composed, and second, it fits the RxJava API, so it can easily interact with an Observable
.
For example, consider these accessors:
public
static
Single
<
String
>
getDataA
()
{
return
Single
.<
String
>
create
(
o
->
{
o
.
onSuccess
(
"DataA"
);
}).
subscribeOn
(
Schedulers
.
io
());
}
public
static
Single
<
String
>
getDataB
()
{
return
Single
.
just
(
"DataB"
)
.
subscribeOn
(
Schedulers
.
io
());
}
These can then be used and optionally composed like this:
// merge a & b into an Observable stream of 2 values
Observable
<
String
>
a_merge_b
=
getDataA
().
mergeWith
(
getDataB
());
Note how two Single
s are merged into an Observable
. This could result in an emission of [A, B] or [B, A], depending on which completes first.
Going back to the previous example, we can now use Single
instead of Observable
to represent the data fetches, but merge them into a stream of values:
// Observable<String> o1 = getDataAsObservable(1);
// Observable<String> o2 = getDataAsObservable(2);
Single
<
String
>
s1
=
getDataAsSingle
(
1
);
Single
<
String
>
s2
=
getDataAsSingle
(
2
);
// o3 is now a stream of s1 and s2 that emits each item without waiting
Observable
<
String
>
o3
=
Single
.
merge
(
s1
,
s2
);
Using Single
instead of Observable
to represent a “stream of one” simplifies consumption because a developer must consider only the following behaviors for the Single
type:
-
It can respond with an error
-
Never respond
-
Respond with a success
Compare this with the additional states a consumer must consider with an Observable
:
-
It can respond with an error
-
Never respond
-
Respond successfully with no data and terminate
-
Respond successfully with a single value and terminate
-
Respond successfully with multiple values and terminate
-
Respond successfully with one or more values and never terminate (waiting for more data)
By using Single
, the mental model is simpler for consuming the API, and only after composition into an Observable
happens must a developer consider the additional states. This is often a better place for it to occur because typically the developer controls that code, whereas the data API is often from a third party.
You’ll learn more about Single
in “Observable versus Single”.
Completable
In addition to Single
, RxJava also has a Completable
type that addresses the surprisingly common use case of having no return type, just the need to represent successful or failed completion. Often Observable<Void>
or Single<Void>
ends up being used. This is awkward, so Completable
came to be, as demonstrated here:
Completable
c
=
writeToDatabase
(
"data"
);
This use case is common when doing asynchronous writes for which no return value is expected but notification of successful or failed completion is needed. The preceding code with Completable
is similar to this:
Observable
<
Void
>
c
=
writeToDatabase
(
"data"
);
The Completable
itself is an abstraction for two callbacks, completion and failure, like this:
static
Completable
writeToDatabase
(
Object
data
)
{
return
Completable
.
create
(
s
->
{
doAsyncWrite
(
data
,
// callback for successful completion
()
->
s
.
onCompleted
(),
// callback for failure with Throwable
error
->
s
.
onError
(
error
));
});
}
Zero to infinity
Observable
can support cardinalities from zero to infinity (which is explored more in “Infinite Streams”). But for simplicity and clarity, Single
is an "Observable
of One,” and Completable
is an "Observable
of None.”
With these newly introduced types, our table ends up looking like this:
Zero | One | Many | |
---|---|---|---|
Synchronous |
void doSomething() |
T getData() |
Iterable<T> getData() |
Asynchronous |
Completable doSomething() |
Single<T> getData() |
Observable<T> getData() |
Mechanical Sympathy: Blocking versus Nonblocking I/O
Thus far, the argument for the reactive-functional style of programming has primarily been about providing an abstraction over async callbacks to allow more manageable composition. And, it is fairly obvious that performing unrelated network requests concurrently rather than sequentially is beneficial to experienced latency, thus the reason for adopting asynchrony and needing composition.
But is there an efficiency reason for adopting the reactive approach (either imperative or functional) in how we perform I/O? Are there benefits to using nonblocking I/O, or is blocking I/O threads to wait on a single network request okay? Performance testing I was involved in at Netflix demonstrated that there are objective and measurable efficiency benefits to adopting nonblocking I/O and event loops over thread-per-request blocking I/O. This section provides reasons why this is the case as well as the data to help you make your own decision.
As referenced in “The Pursuit of Answers”, tests were done to compare performance of blocking and nonblocking I/O with Tomcat and Netty on Linux. Because this type of testing is always controversial and difficult to get right, I’ll be very clear that this test is only intended to be relevant for the following:
-
Behavior on typical Linux systems being used around 2015/2016
-
Java 8 (OpenJDK and Oracle)
-
Unmodified Tomcat and Netty as used in typical production environments
-
Representative web service request/response workload involving composition of multiple other web services
Considering that context, we learned the following:
-
Netty code is more efficient than Tomcat code, allowing it to consume less CPU per request.
-
The Netty event-loop architecture reduces thread migrations under load, which improves CPU cache warmth and memory locality, which improves CPU Instructions-per-Cycle (IPC), which lowers CPU cycle consumption per request.
-
Tomcat code has higher latencies under load due to its thread pool architecture, which involves thread pool locks (and lock contention) and thread migrations to service load.
The following graph best illustrates the difference between the architectures:
Note how the lines diverge as load increases. These are the thread migrations. The most interesting thing I learned was that the Netty application actually becomes more efficient as it is put under load and the threads become “hot” and stick to a CPU core. Tomcat, on the other hand, has a separate thread per request and thus cannot gain this benefit and retains higher thread migrations due to each thread needing to be scheduled for every request.
Netty CPU consumption remains mostly flat through increasing load and actually becomes slightly more efficient as the load is maxed out, as opposed to Tomcat, which becomes less efficient.
The resulting impact on latency and throughput is seen in the following graph:
Despite averages not being very valuable (as opposed to percentiles), this graph shows how both have similar latency with little load, but diverge significantly as load increases. Netty is able to better utilize the machine until higher load with less impact on latency:
This graph of maximum latency was chosen to show how the outliers affect users and system resources. Netty handles load far more gracefully and avoids the worst-case outliers.
The following image shows throughput:
Two strong benefits come out of these findings. First, better latency and throughput means both better user experience and lower infrastructure cost. Second, though, the event-loop architecture is more resilient under load. Instead of falling apart when the load is increased, the machine can be pushed to its limit and handles it gracefully. This is a very compelling argument for large-scale production systems that need to handle unexpected spikes of traffic and remain responsive.
I also found the event-loop architecture easier to operate. It does not1 require tuning to get optimal performance, whereas the thread-per-request architecture often needs tweaking of thread pool sizes (and subsequently garbage collection) depending on workload.
This is not intended to be an exhaustive study of the topic, but I found this experiment and resulting data as compelling evidence for pursuing the “reactive” architecture in the form of nonblocking IO and event loops. In other words, with hardware, the Linux kernel, and JVM circa 2015/2016, nonblocking I/O via event loops does have benefits.
Using Netty with RxJava will be further explored later in “Nonblocking HTTP Server with Netty and RxNetty”.
Reactive Abstraction
Ultimately RxJava types and operators are just an abstraction over imperative callbacks. However, this abstraction completely changes the coding style and provides very powerful tools for doing async and nonblocking programming. It takes effort to learn and requires a shift of thinking to be comfortable with function composition and thinking in streams, but when you’ve achieved this it is a very effective tool alongside our typical object-oriented and imperative programming styles.
The rest of this book takes you through the many details of how RxJava works and how to use it. Chapter 2 explains where Observable
s come from and how you can consume them. Chapter 3 will guide you through several dozen declarative and powerful transformations.
1 Beyond perhaps debating when the number of event loops is sized at 1x, 1.5x, or 2x the number of cores. I have not found strong differences between these values, though, and generally default to 1x.
Get Reactive Programming with RxJava now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.