Kapitel 1. Reaktive Programmierung mit RxJava
Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com
RxJava ist eine spezielle Implementierung der reaktiven Programmierung für Java und Android, die von der funktionalen Programmierung beeinflusst ist. Sie bevorzugt die Komposition von Funktionen, die Vermeidung von globalen Zuständen und Seiteneffekten und das Denken in Streams, um asynchrone und ereignisbasierte Programme zu komponieren. Sie beginnt mit dem Beobachtermuster von Producer/Consumer-Callbacks und erweitert es um Dutzende von Operatoren, die Komposition, Transformation, Scheduling, Throttling, Fehlerbehandlung und Lifecycle-Management ermöglichen.
RxJava ist eine ausgereifte Open-Source-Bibliothek, die sich sowohl auf dem Server als auch auf Android-Mobilgeräten durchgesetzt hat. Zusammen mit der Bibliothek hat sich eine aktive Gemeinschaft von Entwicklern um RxJava und die reaktive Programmierung gebildet, die zu dem Projekt beitragen, sprechen, schreiben und sich gegenseitig helfen.
In diesem Kapitel erhältst du einen Überblick über RxJava - was es ist und wie es funktioniert - und im Rest des Buches erfährst du alle Details, wie du es in deinen Anwendungen einsetzen kannst. Du kannst dieses Buch auch ohne vorherige Erfahrung mit reaktiver Programmierung lesen, aber wir fangen am Anfang an und führen dich durch die Konzepte und Praktiken von RxJava, damit du seine Stärken auf deine Anwendungsfälle anwenden kannst.
Reaktive Programmierung und RxJava
Reaktive Programmierung ist ein allgemeiner Programmierbegriff, der sich darauf konzentriert, auf Veränderungen zu reagieren, z. B. auf Datenwerte oder Ereignisse. Sie kann und wird oft zwingend durchgeführt. Ein Callback ist ein Ansatz zur reaktiven Programmierung, der zwingend erforderlich ist. Eine Tabellenkalkulation ist ein gutes Beispiel für reaktive Programmierung: Zellen, die von anderen Zellen abhängig sind, "reagieren" automatisch, wenn sich diese anderen Zellen ändern.
Bei den heutigen Computern ist alles irgendwann zwingend erforderlich, wenn es auf das Betriebssystem und die Hardware trifft. Dem Computer muss explizit gesagt werden, was er tun soll und wie er es tun soll. Menschen denken nicht wie CPUs und ähnliche Systeme, also fügen wir Abstraktionen hinzu. Reaktiv-funktionale Programmierung ist eine Abstraktion, genauso wie unsere höheren imperativen Programmiersprachen Abstraktionen für die zugrundeliegenden Binär- und Assemblerbefehle sind. Die Tatsache, dass am Ende alles imperativ ist, ist wichtig, um sich daran zu erinnern und zu verstehen, denn sie hilft uns dabei, das mentale Modell zu verstehen, das die reaktiv-funktionale Programmierung anspricht und wie sie letztendlich ausgeführt wird - es gibt keine Magie.
Reaktiv-funktionale Programmierung ist also ein Programmieransatz - eine Abstraktion über imperativen Systemen -, der es uns ermöglicht, asynchrone und ereignisgesteuerte Anwendungsfälle zu programmieren, ohne wie ein Computer zu denken und die komplexen Interaktionen des Zustands zwingend zu definieren, insbesondere über Thread- und Netzwerkgrenzen hinweg. Nicht wie ein Computer denken zu müssen, ist eine nützliche Eigenschaft, wenn es um asynchrone und ereignisgesteuerte Systeme geht, denn es geht um Gleichzeitigkeit und Parallelität, und diese Eigenschaften sind sehr schwierig richtig und effizient zu nutzen. In der Java-Community sind die Bücher Java Concurrency in Practice von Brian Goetz und Concurrent Programming in Java von Doug Lea (Addison-Wesley) sowie Foren wie "Mechanical Sympathy" repräsentativ für die Tiefe, Breite und Komplexität der Beherrschung der Gleichzeitigkeit. Seit ich mit RxJava arbeite, haben mich die Gespräche mit den Experten aus diesen Büchern, Foren und Communities noch mehr als zuvor davon überzeugt, wie schwierig es wirklich ist, leistungsstarke, effiziente, skalierbare und korrekte nebenläufige Software zu schreiben. Und dabei haben wir noch nicht einmal die verteilten Systeme berücksichtigt, die die Gleichzeitigkeit und Parallelität auf eine neue Ebene heben.
Die kurze Antwort auf die Frage, was die reaktiv-funktionale Programmierung löst, lautet also: Gleichzeitigkeit und Parallelität. Umgangssprachlich ausgedrückt, löst sie die Callback-Hölle, die entsteht, wenn reaktive und asynchrone Anwendungsfälle auf imperative Weise angegangen werden. Die reaktive Programmierung, wie sie von RxJava umgesetzt wird, ist von der funktionalen Programmierung beeinflusst und verwendet einen deklarativen Ansatz, um die typischen Fallstricke von reaktiv-imperativem Code zu vermeiden.
Wenn du reaktive Programmierung brauchst
Reaktive Programmierung ist in Szenarien wie den folgenden nützlich:
-
Verarbeitung von Benutzerereignissen wie Mausbewegungen und -klicks, Tastatureingaben, GPS-Signale, die sich im Laufe der Zeit ändern, wenn sich der Benutzer mit seinem Gerät bewegt, Gyroskop-Signale des Geräts, Berührungsereignisse und so weiter.
-
Reaktion auf und Verarbeitung aller latenzabhängigen IO-Ereignisse von der Festplatte oder aus dem Netzwerk, da IO von Natur aus asynchron ist (eine Anfrage wird gestellt, Zeit vergeht, eine Antwort wird empfangen oder nicht, was dann weitere Arbeit auslöst).
-
Verarbeitung von Ereignissen oder Daten, die einer Anwendung von einem Produzenten zugeführt werden, den sie nicht kontrollieren kann (Systemereignisse von einem Server, die oben erwähnten Benutzerereignisse, Signale von der Hardware, von der analogen Welt ausgelöste Ereignisse von Sensoren und so weiter).
Wenn der betreffende Code nur einen einzigen Ereignisstrom verarbeitet, ist die reaktiv-imperative Programmierung mit einem Callback in Ordnung und die reaktiv-funktionale Programmierung bringt dir keine großen Vorteile. Du kannst Hunderte von verschiedenen Ereignisströmen haben, und wenn sie alle völlig unabhängig voneinander sind, wird die imperative Programmierung wahrscheinlich kein Problem darstellen. In solchen einfachen Anwendungsfällen sind imperative Ansätze am effizientesten, weil sie die Abstraktionsebene der reaktiven Programmierung eliminieren und näher an dem bleiben, wofür die aktuellen Betriebssysteme, Sprachen und Compiler optimiert sind.
Wenn dein Programm jedoch wie die meisten anderen ist, musst du Ereignisse (oder asynchrone Antworten von Funktionen oder Netzwerkaufrufen) kombinieren, bedingte Logik zwischen ihnen einsetzen und Fehlerszenarien sowie die Bereinigung von Ressourcen für alle diese Ereignisse behandeln. An dieser Stelle wird der reaktiv-imperative Ansatz immer komplexer und die reaktiv-funktionale Programmierung kommt erst richtig zur Geltung. Eine unwissenschaftliche Ansicht, die ich mittlerweile akzeptiere, ist, dass die reaktiv-funktionale Programmierung anfangs eine höhere Lernkurve und Einstiegshürde hat, dass aber die Obergrenze für die Komplexität viel niedriger ist als bei der reaktiv-imperativen Programmierung.
Daher kommt auch der Slogan für Reactive Extensions (Rx) im Allgemeinen und RxJava im Besonderen: "eine Bibliothek zum Erstellen asynchroner und ereignisbasierter Programme". RxJava ist eine konkrete Umsetzung reaktiver Programmierprinzipien, die von funktionaler und Datenflussprogrammierung beeinflusst sind. Es gibt verschiedene Ansätze, um "reaktiv" zu sein, und RxJava ist nur einer von ihnen. Schauen wir uns an, wie es funktioniert.
So funktioniert RxJava
Das Herzstück von RxJava ist der Typ Observable
, der einen Strom von Daten oder Ereignissen darstellt. Er ist für Push (reaktiv) gedacht, kann aber auch für Pull (interaktiv) verwendet werden. Er ist eher lazy als eager. Er kann asynchron oder synchron verwendet werden. Er kann 0, 1, viele oder unendlich viele Werte oder Ereignisse über die Zeit darstellen.
Das sind eine Menge Schlagworte und Details, also packen wir sie aus. Die vollständigen Details findest du in "Anatomie von rx.Observable".
Push versus Pull
Da RxJava reaktiv ist, unterstützt es Push-Ereignisse, so dass die Observable
und die zugehörigen Observer
Typsignaturen Ereignisse unterstützen, die an sie weitergeleitet werden. Dies wiederum geht in der Regel mit Asynchronität einher, die im nächsten Abschnitt behandelt wird. Der Typ Observable
unterstützt aber auch einen asynchronen Rückkanal (manchmal auch als async-pull oder reactive-pull bezeichnet) als Ansatz zur Flusskontrolle oder zum Gegendruck in asynchronen Systemen. Ein späterer Abschnitt in diesem Kapitel wird sich mit der Flusskontrolle befassen und damit, wie dieser Mechanismus dazu passt.
Um den Empfang von Ereignissen per Push zu unterstützen, verbindet sich ein Observable
/Observer
Paar per Abonnement. Die Observable
repräsentiert den Datenstrom und kann von einem Observer
abonniert werden (mehr dazu erfährst du in "Erfassen aller Benachrichtigungen mit Hilfe von Observer<T>"):
interface
Observable
<
T
>
{
Subscription
subscribe
(
Observer
s
)
}
Nach der Anmeldung können drei Arten von Ereignissen an Observer
gesendet werden:
-
Daten über die Funktion
onNext()
-
Fehler (Exceptions oder Throwables) über die Funktion
onError()
-
Streamabschluss über die Funktion
onCompleted()
interface
Observer
<
T
>
{
void
onNext
(
T
t
)
void
onError
(
Throwable
t
)
void
onCompleted
()
}
Die Methode onNext()
kann nie, einmal, mehrmals oder unendlich oft aufgerufen werden. onError()
und onCompleted()
sind Terminal-Ereignisse, was bedeutet, dass nur eines von ihnen und nur einmal aufgerufen werden kann. Wenn ein Terminal-Ereignis aufgerufen wird, ist der Stream Observable
beendet und es können keine weiteren Ereignisse über ihn gesendet werden. Wenn der Stream unendlich ist und nicht fehlschlägt, können Terminalereignisse nie auftreten.
Wie in "Flusskontrolle" und "Gegendruck" gezeigt wird , gibt es eine zusätzliche Art von Unterschrift, die interaktives Ziehen ermöglicht:
interface
Producer
{
void
request
(
long
n
)
}
Dies wird mit einem fortschrittlicheren Observer
verwendet, das Subscriber
heißt (mehr dazu in "Steuerung von Listenern durch Verwendung von Subscription und Subscriber<T>"):
interface
Subscriber
<
T
>
implements
Observer
<
T
>,
Subscription
{
void
onNext
(
T
t
)
void
onError
(
Throwable
t
)
void
onCompleted
()
...
void
unsubscribe
()
void
setProducer
(
Producer
p
)
}
Die Funktion unsubcribe
als Teil der Schnittstelle Subscription
wird verwendet, um einem Abonnenten die Abmeldung von einem Observable
Stream zu ermöglichen. Die Funktion setProducer
und die Typen Producer
werden verwendet, um einen bidirektionalen Kommunikationskanal zwischen Produzent und Konsument zu bilden, der zur Flusskontrolle dient.
Async versus Sync
In der Regel ist eine Observable
asynchron, aber das muss nicht sein. Eine Observable
kann synchron sein und ist sogar standardmäßig synchron. RxJava fügt niemals Gleichzeitigkeit hinzu, es sei denn, es wird darum gebeten. Eine synchrone Observable
wird abonniert, sendet alle Daten über den Thread des Abonnenten und wird abgeschlossen (wenn sie endlich ist). Eine Observable
, die durch blockierende Netzwerk-E/A unterstützt wird, würde den abonnierenden Thread synchron blockieren und dann über onNext()
senden, wenn die blockierende Netzwerk-E/A zurückkehrt.
Das Folgende ist zum Beispiel komplett synchron:
Observable
.
create
(
s
->
{
s
.
onNext
(
"Hello World!"
);
s
.
onCompleted
();
}).
subscribe
(
hello
->
System
.
out
.
println
(
hello
));
Mehr über Observable.create
erfährst du in "Beherrschen von Observable.create()" und Observable.subscribe
in "Abonnieren von Benachrichtigungen aus Observable".
Du denkst jetzt wahrscheinlich, dass dies im Allgemeinen nicht das gewünschte Verhalten eines reaktiven Systems ist, und du hast recht. Es ist keine gute Idee, ein Observable
mit synchroner blockierender E/A zu verwenden (wenn blockierende E/A verwendet werden muss, muss sie mit Threads asynchron gemacht werden). Manchmal ist es jedoch sinnvoll, Daten synchron aus einem In-Memory-Cache zu holen und sie sofort zurückzugeben. Der "Hello World"-Fall aus dem vorherigen Beispiel braucht keine Gleichzeitigkeit und wird sogar viel langsamer, wenn er mit einem asynchronen Zeitplanungsprogramm versehen wird. Das eigentliche Kriterium, das im Allgemeinen wichtig ist, ist also, ob die Observable
Ereignisproduktion blockierend oder nicht blockierend ist, und nicht, ob sie synchron oder asynchron ist. Das "Hello World"-Beispiel ist nicht blockierend, weil es nie einen Thread blockiert, also ist es eine korrekte (wenn auch überflüssige) Verwendung eines Observable
.
RxJava Observable
ist bewusst unabhängig von Asynchronität und Synchronität und davon, ob es Gleichzeitigkeit gibt oder woher sie kommt. Das ist so gewollt und erlaubt es der Implementierung von Observable
zu entscheiden, was am besten ist. Warum könnte das nützlich sein?
Zunächst einmal kann die Gleichzeitigkeit von mehreren Stellen ausgehen, nicht nur von Threadpools. Wenn die Datenquelle bereits asynchron ist, weil sie sich in einer Ereignisschleife befindet, sollte RxJava nicht noch mehr Scheduling-Overhead hinzufügen oder eine bestimmte Scheduling-Implementierung erzwingen. Gleichzeitigkeit kann von Threadpools, Ereignisschleifen, Akteuren und so weiter kommen. Sie kann hinzugefügt werden oder sie kann aus der Datenquelle stammen. RxJava ist unabhängig davon, woher die Asynchronität kommt.
Zweitens gibt es zwei gute Gründe für synchrones Verhalten, die wir uns in den folgenden Unterabschnitten ansehen werden.
In-Memory-Daten
Wenn die Daten in einem lokalen In-Memory-Cache gespeichert sind (mit konstanten Suchzeiten im Mikrosekunden- oder Nanosekundenbereich), ist es nicht sinnvoll, die Kosten für das Scheduling zu tragen, um sie asynchron zu machen. Die Observable
kann die Daten einfach synchron abrufen und sie an den abonnierenden Thread weiterleiten, wie hier gezeigt:
Observable
.
create
(
s
->
{
s
.
onNext
(
cache
.
get
(
SOME_KEY
));
s
.
onCompleted
();
}).
subscribe
(
value
->
System
.
out
.
println
(
value
));
Dieses Zeitplannungsprogramm ist sehr leistungsfähig, wenn sich die Daten im Speicher befinden oder nicht. Wenn sie sich im Speicher befinden, gibst du sie synchron aus; wenn nicht, führst du den Netzwerkaufruf asynchron aus und gibst die Daten zurück, wenn sie ankommen. Diese Entscheidung kann bedingt in der Observable
enthalten sein:
// pseudo-code
Observable
.
create
(
s
->
{
T
fromCache
=
getFromCache
(
SOME_KEY
);
if
(
fromCache
!=
null
)
{
// emit synchronously
s
.
onNext
(
fromCache
);
s
.
onCompleted
();
}
else
{
// fetch asynchronously
getDataAsynchronously
(
SOME_KEY
)
.
onResponse
(
v
->
{
putInCache
(
SOME_KEY
,
v
);
s
.
onNext
(
v
);
s
.
onCompleted
();
})
.
onFailure
(
exception
->
{
s
.
onError
(
exception
);
});
}
}).
subscribe
(
s
->
System
.
out
.
println
(
s
));
Synchrone Berechnungen (z. B. Operatoren)
Der häufigste Grund, synchron zu bleiben, ist die Komposition und Transformation von Datenströmen mit Hilfe von Operatoren. RxJava verwendet meist die große API von Operatoren, die zur Manipulation, Kombination und Transformation von Daten verwendet werden, wie map()
, filter()
, take()
, flatMap()
und groupBy()
. Die meisten dieser Operatoren sind synchron, d. h., sie führen ihre Berechnungen synchron innerhalb von onNext()
aus, während die Ereignisse vorbeiziehen.
Diese Operatoren sind aus Leistungsgründen synchron. Nimm dies als Beispiel:
Observable
<
Integer
>
o
=
Observable
.
create
(
s
->
{
s
.
onNext
(
1
);
s
.
onNext
(
2
);
s
.
onNext
(
3
);
s
.
onCompleted
();
});
o
.
map
(
i
->
"Number "
+
i
)
.
subscribe
(
s
->
System
.
out
.
println
(
s
));
Wäre der Operator map
standardmäßig asynchron, würde jede Zahl (1, 2, 3) in einen Thread eingeplant, in dem die String-Verkettung durchgeführt würde ("Zahl " + i). Das ist sehr ineffizient und hat in der Regel eine unbestimmte Latenzzeit aufgrund von Scheduling, Kontextwechsel und so weiter.
Es ist wichtig zu verstehen, dass die meisten Observable
Funktionspipelines synchron sind (es sei denn, ein bestimmter Operator muss asynchron sein, wie timeout
oder observeOn
), während Observable
selbst asynchron sein kann. Diese Themen werden in den Kapiteln "Declarative Concurrency with observeOn()" und "Timing Out When Events Do Not Occurrence" ausführlicher behandelt .
Das folgende Beispiel demonstriert diese Mischung aus sync und async:
Observable
.
create
(
s
->
{
...
async
subscription
and
data
emission
...
})
.
doOnNext
(
i
->
System
.
out
.
println
(
Thread
.
currentThread
()))
.
filter
(
i
->
i
%
2
==
0
)
.
map
(
i
->
"Value "
+
i
+
" processed on "
+
Thread
.
currentThread
())
.
subscribe
(
s
->
System
.
out
.
println
(
"SOME VALUE =>"
+
s
));
System
.
out
.
println
(
"Will print BEFORE values are emitted"
)
In diesem Beispiel ist Observable
asynchron (es sendet auf einem anderen Thread als dem des Abonnenten), also ist subscribe
nicht blockierend und println
am Ende wird ausgegeben, bevor die Ereignisse propagiert werden und die Ausgabe "SOME VALUE ⇒" angezeigt wird.
Die Funktionen filter()
und map()
werden jedoch synchron in dem aufrufenden Thread ausgeführt, der die Ereignisse sendet. Das ist im Allgemeinen das Verhalten, das wir wollen: eine asynchrone Pipeline (die Operatoren Observable
und composed) mit effizienter synchroner Berechnung der Ereignisse.
Der Typ Observable
selbst unterstützt also sowohl synchrone als auch asynchrone konkrete Implementierungen, und das ist so gewollt.
Gleichzeitigkeit und Parallelität
Einzelne Observable
Streams erlauben weder Gleichzeitigkeit noch Parallelität. Stattdessen werden sie durch die Komposition von asynchronen Observables
erreicht.
Parallelität ist die gleichzeitige Ausführung von Aufgaben, normalerweise auf verschiedenen CPUs oder Maschinen. Gleichzeitigkeit hingegen ist die Zusammensetzung oder Verschachtelung mehrerer Aufgaben. Wenn auf einer einzelnen CPU mehrere Aufgaben (z. B. Threads) laufen, werden sie zwar gleichzeitig, aber nicht parallel durch "Time Slicing" ausgeführt. Jeder Thread erhält einen Teil der CPU-Zeit, bevor er an einen anderen Thread übergeben wird, auch wenn ein Thread noch nicht fertig ist.
Parallele Ausführung ist per Definition gleichlaufend, aber Gleichzeitigkeit ist nicht unbedingt Parallelität. In der Praxis bedeutet das, dass Multithreading Gleichzeitigkeit ist, aber Parallelität tritt nur auf, wenn diese Threads auf verschiedenen CPUs genau zur gleichen Zeit ausgeführt werden. Wir sprechen also allgemein von Gleichzeitigkeit und Gleichzeitigkeit, aber Parallelität ist eine besondere Form der Gleichzeitigkeit.
Der Vertrag von RxJava Observable
besagt, dass Ereignisse (onNext()
, onCompleted()
, onError()
) niemals gleichzeitig ausgegeben werden können. Mit anderen Worten: Ein einzelner Observable
Stream muss immer serialisiert und thread-sicher sein. Jedes Ereignis kann von einem anderen Thread ausgegeben werden, solange die Ausgaben nicht gleichzeitig erfolgen. Das bedeutet, dass onNext()
nicht verschachtelt oder gleichzeitig ausgeführt werden darf. Wenn onNext()
noch auf einem Thread ausgeführt wird, kann ein anderer Thread nicht damit beginnen, es erneut aufzurufen (Verschachtelung).
Hier ist ein Beispiel dafür, was in Ordnung ist:
Observable
.
create
(
s
->
{
new
Thread
(()
->
{
s
.
onNext
(
"one"
);
s
.
onNext
(
"two"
);
s
.
onNext
(
"three"
);
s
.
onNext
(
"four"
);
s
.
onCompleted
();
}).
start
();
});
Dieser Code gibt die Daten sequentiell aus und erfüllt somit den Vertrag. (Beachten Sie jedoch, dass es im Allgemeinen nicht ratsam ist, einen solchen Thread innerhalb eines Observable
zu starten. Verwenden Sie stattdessen Zeitplanungsprogramme, wie in "Multithreading in RxJava" beschrieben).
Hier ist ein Beispiel für einen illegalen Code:
// DO NOT DO THIS
Observable
.
create
(
s
->
{
// Thread A
new
Thread
(()
->
{
s
.
onNext
(
"one"
);
s
.
onNext
(
"two"
);
}).
start
();
// Thread B
new
Thread
(()
->
{
s
.
onNext
(
"three"
);
s
.
onNext
(
"four"
);
}).
start
();
// ignoring need to emit s.onCompleted() due to race of threads
});
// DO NOT DO THIS
Dieser Code ist illegal, weil er zwei Threads enthält, die beide gleichzeitig onNext()
aufrufen können. Das bricht den Vertrag. (Außerdem müsste er sicher warten, bis beide Threads fertig sind, um onComplete
aufzurufen, und wie bereits erwähnt, ist es generell eine schlechte Idee, solche Threads manuell zu starten).
Wie nutzt du also die Vorteile von Gleichzeitigkeit und/oder Parallelität mit RxJava? Komposition.
Ein einzelner Observable
Stream wird immer serialisiert, aber jeder Observable
Stream kann unabhängig voneinander und somit gleichzeitig und/oder parallel arbeiten. Aus diesem Grund werden merge
und flatMap
in RxJava so häufig verwendet, um asynchrone Streams gleichzeitig zusammenzustellen. (Mehr über die Details von merge
und flatMap
erfährst du in "Wrapping Up Using flatMap()" und "Treating Several Observables as One Using merge()") .
Das folgende Beispiel zeigt die Mechanik von zwei asynchronen Observables
, die auf getrennten Threads laufen und zusammengeführt werden:
Observable
<
String
>
a
=
Observable
.
create
(
s
->
{
new
Thread
(()
->
{
s
.
onNext
(
"one"
);
s
.
onNext
(
"two"
);
s
.
onCompleted
();
}).
start
();
});
Observable
<
String
>
b
=
Observable
.
create
(
s
->
{
new
Thread
(()
->
{
s
.
onNext
(
"three"
);
s
.
onNext
(
"four"
);
s
.
onCompleted
();
}).
start
();
});
// this subscribes to a and b concurrently,
// and merges into a third sequential stream
Observable
<
String
>
c
=
Observable
.
merge
(
a
,
b
);
Observable c
empfängt Artikel sowohl von a
als auch von b
, und aufgrund ihrer Asynchronität treten drei Dinge auf:
-
"Eins" wird vor "Zwei" erscheinen
-
"drei" wird vor "vier" erscheinen
-
Die Reihenfolge zwischen eins/zwei und drei/vier ist unbestimmt
Warum kann onNext()
nicht einfach gleichzeitig aufgerufen werden?
In erster Linie, weil onNext()
für uns Menschen gedacht ist, und Gleichzeitigkeit ist schwierig. Wenn onNext()
gleichzeitig aufgerufen werden könnte, würde das bedeuten, dass jeder Observer
defensiv für gleichzeitige Aufrufe programmieren müsste, auch wenn sie nicht erwartet oder gewünscht werden.
Ein zweiter Grund ist, dass einige Operationen bei gleichzeitiger Emission einfach nicht möglich sind, z. B. scan
und reduce
, die gängige und wichtige Verhaltensweisen sind. Operatoren wie scan
und reduce
erfordern eine sequentielle Ereignisfortpflanzung, damit der Zustand in Ereignisströmen akkumuliert werden kann, die nicht assoziativ und kommutativ sind. Die Zulassung gleichzeitiger Observable
Streams (mit gleichzeitigen onNext()
) würde die Arten von Ereignissen, die verarbeitet werden können, einschränken und erfordert thread-sichere Datenstrukturen.
Hinweis
Der Java 8 Stream
Typ unterstützt gleichzeitige Aufrufe. Aus diesem Grund verlangt java.util.stream.Stream
, dass die Funktionen von reduce
assoziativ sind, da sie gleichzeitige Aufrufe auf parallelen Streams unterstützen müssen. Die Dokumentation des java.util.stream
Pakets über Parallelität, Ordnung (in Verbindung mit Kommutativität), Reduktionsoperationen und Assoziativität zeigt die Komplexität desselben Stream
Typs, der sowohl sequentielle als auch gleichzeitige Emission ermöglicht.
Ein dritter Grund ist, dass die Leistung durch den Synchronisations-Overhead beeinträchtigt wird, da alle Beobachter und Operatoren thread-sicher sein müssen, auch wenn die Daten meistens sequentiell ankommen. Obwohl die JVM oft in der Lage ist, den Synchronisations-Overhead zu eliminieren, ist dies nicht immer möglich (insbesondere bei nicht blockierenden Algorithmen, die Atomics verwenden), so dass dies letztendlich eine Leistungssteuer ist, die bei sequenziellen Streams nicht benötigt wird.
Außerdem ist es oft langsamer, eine generische, feinkörnige Parallelität durchzuführen. Parallelität muss in der Regel grob erfolgen, z. B. in Stapeln, um den Aufwand für das Umschalten von Threads, die Planung der Arbeit und die Neukombination zu kompensieren. Es ist viel effizienter, eine synchrone Ausführung in einem einzigen Thread durchzuführen und die vielen Speicher- und CPU-Optimierungen für sequenzielle Berechnungen zu nutzen. Auf einer List
oder array
ist es recht einfach, vernünftige Vorgaben für die Stapelparallelität zu machen, weil alle Elemente im Voraus bekannt sind und in Stapel aufgeteilt werden können (obwohl es selbst dann oft schneller ist, die gesamte Liste auf einer einzigen CPU zu verarbeiten, es sei denn, die Liste ist sehr groß oder der Rechenaufwand pro Element ist erheblich). Ein Stream hingegen kennt die Arbeit nicht im Voraus, er empfängt die Daten nur über onNext()
und kann die Arbeit daher nicht automatisch aufteilen.
Tatsächlich wurde vor RxJava v1 ein .parallel(Function f)
Operator hinzugefügt, um zu versuchen, sich wie java.util.stream.Stream.parallel()
zu verhalten, weil das als eine nette Bequemlichkeit angesehen wurde. Damit wurde versucht, den RxJava-Vertrag nicht zu brechen, indem ein einzelnes Observable
in viele Observable
aufgeteilt wurde, die alle parallel ausgeführt und dann wieder zusammengeführt wurden. Sie wurde jedoch noch vor der Version 1 aus der Bibliothek entfernt, weil sie sehr verwirrend war und fast immer zu einer schlechteren Leistung führte. Das Hinzufügen von parallelen Berechnungen zu einem Strom von Ereignissen muss fast immer durchdacht und getestet werden. Vielleicht könnte eine ParallelObservable
sinnvoll sein, bei der die Operatoren auf eine Teilmenge beschränkt sind, die Assoziativität voraussetzt, aber in den Jahren, in denen RxJava verwendet wird, hat sich das nie gelohnt, weil die Komposition mit merge
und flatMap
effektive Bausteine für die Anwendungsfälle sind.
In Kapitel 3 lernst du, wie du mit Hilfe von Operatoren Observables
zusammenstellst, um von Gleichzeitigkeit und Parallelität zu profitieren.
Faul versus eifrig
Der Typ Observable
ist träge, das heißt, er tut nichts, bis er abonniert wird. Dies unterscheidet sich von einem eifrigen Typ wie Future
, der bei seiner Erstellung aktive Arbeit leistet. Lazyiness ermöglicht es, Observables
ohne Datenverlust aufgrund von Race Conditions ohne Caching zusammenzusetzen. Bei einem Future
ist dies kein Problem, da der einzelne Wert zwischengespeichert werden kann, d. h., wenn der Wert vor der Zusammenstellung geliefert wird, wird er geholt. Bei einem unbeschränkten Stream wäre ein unbeschränkter Puffer erforderlich, um die gleiche Garantie zu bieten. Daher ist Observable
faul und startet erst, wenn es abonniert wurde, damit die Zusammenstellung abgeschlossen werden kann, bevor die Daten fließen.
In der Praxis bedeutet das zwei Dinge:
- Subskription, nicht Bau beginnt Arbeit
-
Aufgrund der Faulheit von
Observable
verursacht die Erstellung eines solchen Objekts keine Arbeit (abgesehen von der "Arbeit" der Zuweisung desObservable
Objekts selbst). Es wird lediglich festgelegt, welche Arbeit geleistet werden soll, wenn es schließlich abonniert wird. Betrachte einObservable
, das wie folgt definiert ist:Observable
<
T
>
someData
=
Observable
.
create
(
s
->
{
getDataFromServerWithCallback
(
args
,
data
->
{
s
.
onNext
(
data
);
s
.
onCompleted
();
});
})
Die Referenz
someData
existiert jetzt, abergetDataFromServerWithCallback
wird noch nicht ausgeführt. Alles, was passiert ist, ist, dass derObservable
Wrapper um eine auszuführende Arbeitseinheit deklariert wurde, nämlich die Funktion, die sich innerhalb vonObservable
befindet.Wenn du die
Observable
abonnierst, wird die Arbeit erledigt:someData
.
subscribe
(
s
->
System
.
out
.
println
(
s
));
Dies führt die Arbeit, die durch die
Observable
dargestellt wird, träge aus. - Observables können wiederverwendet werden
-
Da
Observable
lazy ist, bedeutet das auch, dass eine bestimmte Instanz mehr als einmal aufgerufen werden kann. In Anlehnung an das vorherige Beispiel bedeutet das, dass wir Folgendes tun können:someData
.
subscribe
(
s
->
System
.
out
.
println
(
"Subscriber 1: "
+
s
));
someData
.
subscribe
(
s
->
System
.
out
.
println
(
"Subscriber 2: "
+
s
));
Jetzt gibt es zwei getrennte Abonnements, die jeweils
getDataFromServerWithCallback
aufrufen und Ereignisse ausgeben.Diese Faulheit unterscheidet sich von asynchronen Typen wie
Future
, bei denenFuture
erstellt wird, um bereits begonnene Arbeit zu repräsentieren. EinFuture
kann nicht wiederverwendet werden (mehrfach abonniert werden, um Arbeit auszulösen). Wenn ein Verweis auf einFuture
existiert, bedeutet das, dass die Arbeit bereits begonnen hat. Im vorangegangenen Beispielcode kannst du genau sehen, wo der Eifer liegt; die MethodegetDataFromServerWithCallback
ist eifrig, weil sie sofort ausgeführt wird, wenn sie aufgerufen wird. Wenn du eineObservable
-Methode um diegetDataFromServerWithCallback
-Methode wickelst, kannst du sie "faul" verwenden.
Diese Faulheit ist bei der Komposition sehr stark. Zum Beispiel:
someData
.
onErrorResumeNext
(
lazyFallback
)
.
subscribe
(
s
->
System
.
out
.
println
(
s
));
In diesem Fall stellt lazyFallback
Observable
eine Arbeit dar, die erledigt werden kann, aber nur erledigt wird, wenn sie abonniert wird, und die wir nur abonniert haben wollen, wenn someData
fehlschlägt. Natürlich können eager-Typen durch Funktionsaufrufe (z. B. getDataAsFutureA()
) faul gemacht werden.
Eifrigkeit und Faulheit haben beide ihre Stärken und Schwächen, aber RxJava Observable
ist faul. Wenn du also eine Observable
hast, wird sie nichts tun, bis du sie abonnierst.
Dieses Thema wird in "Die Faulheit umarmen" ausführlicher behandelt .
Dualität
Ein Rx Observable
ist das asynchrone "Zwillingspaar" eines Iterable
. Mit "Zwillingspaar" meinen wir, dass das Observable
alle Funktionen eines Iterable
bietet, außer dem umgekehrten Datenfluss: Es ist ein Push- statt ein Pull-Typ. In der folgenden Tabelle sind Typen aufgeführt, die sowohl Push- als auch Pull-Funktionen bieten:
Ziehen (iterierbar) | Drücken (Observable) |
---|---|
T next() |
onNext(T) |
throws Exception |
onError(Throwable) |
gibt zurück |
onCompleted() |
Wie in der Tabelle dargestellt, werden die Daten nicht vom Konsumenten über next()
abgeholt, sondern vom Produzenten an onNext(T)
gesendet. Die erfolgreiche Beendigung wird über den Callback onCompleted()
signalisiert, anstatt den Thread zu blockieren, bis alle Elemente iteriert wurden. Anstelle von Ausnahmen, die auf den Callstack geworfen werden, werden Fehler als Ereignisse an den onError(Throwable)
Callback gesendet.
Die Tatsache, dass es sich wie ein Dual verhält, bedeutet, dass alles, was du synchron per Pull mit einem Iterable
und Iterator
tun kannst, auch asynchron per Push mit einem Observable
und Observer
getan werden kann. Das bedeutet, dass das gleiche Programmiermodell auf beide angewendet werden kann!
Ab Java 8 kann zum Beispiel ein Iterable
so erweitert werden, dass die Funktionskomposition über den Typ java.util.stream.Stream
funktioniert:
// Iterable<String> as Stream<String>
// that contains 75 strings
getDataFromLocalMemorySynchronously
()
.
skip
(
10
)
.
limit
(
5
)
.
map
(
s
->
s
+
"_transformed"
)
.
forEach
(
System
.
out
::
println
)
Damit werden 75 Strings von getDataFromLocalMemorySynchronously()
abgerufen, die Einträge 11-15 erhalten und der Rest ignoriert, die Strings transformiert und ausgedruckt. (Mehr über Operatoren wie take
, skip
und limit
erfährst du in "Slicing and Dicing Using skip(), takeWhile(), and Others") .
Ein RxJava Observable
wird auf die gleiche Weise verwendet:
// Observable<String>
// that emits 75 strings
getDataFromNetworkAsynchronously
()
.
skip
(
10
)
.
take
(
5
)
.
map
(
s
->
s
+
"_transformed"
)
.
subscribe
(
System
.
out
::
println
)
Es empfängt 5 Strings (15 wurden gesendet, aber die ersten 10 wurden verworfen) und meldet sich dann ab (ignoriert oder stoppt den Rest der Strings, die gesendet werden sollten). Es wandelt die Strings um und druckt sie aus, genau wie das vorherige Iterable
/Stream
Beispiel.
Mit anderen Worten, der Rx Observable
ermöglicht die Programmierung mit asynchronen Daten per Push, genauso wie Streams
um Iterables
und Lists
mit synchronem Pull.
Kardinalität
Die Observable
unterstützt das asynchrone Pushen mehrerer Werte. Das passt gut in die untere rechte Ecke der folgenden Tabelle, dem asynchronen Dual von Iterable
(oder Stream
, List
, Enumerable
, etc.) und der mehrwertigen Version eines Future
:
Eine | Viele | |
---|---|---|
Synchron |
T getData() |
Iterable<T> getData() |
Asynchron |
Future<T> getData() |
Observable<T> getData() |
Beachte, dass sich dieser Abschnitt allgemein auf Future
bezieht. Er verwendet die Future.onSuccess(callback)
Syntax, um sein Verhalten darzustellen. Es gibt verschiedene Implementierungen, wie z. B. CompletableFuture
, ListenableFuture
, oder die Scala Future
. Aber was auch immer du tust, verwende nicht java.util.Future
, da dies eine Blockierung erfordert, um einen Wert abzurufen.
Warum also könnte Observable
wertvoll sein und nicht nur Future
? Der offensichtlichste Grund ist, dass du es entweder mit einem Ereignisstrom oder einer mehrwertigen Antwort zu tun hast. Der weniger offensichtliche Grund ist die Zusammensetzung von mehreren einwertigen Antworten. Schauen wir uns diese beiden Gründe an.
Event-Stream
Der Ereignisstrom ist ganz einfach. Mit der Zeit schickt der Produzent Ereignisse an den Konsumenten, wie hier gezeigt:
// producer
Observable
<
Event
>
mouseEvents
=
...;
// consumer
mouseEvents
.
subscribe
(
e
->
doSomethingWithEvent
(
e
));
Das funktioniert nicht sehr gut mit einer Future
:
// producer
Future
<
Event
>
mouseEvents
=
...;
// consumer
mouseEvents
.
onSuccess
(
e
->
doSomethingWithEvent
(
e
));
Der onSuccess
Rückruf könnte das "letzte Ereignis" erhalten haben, aber es bleiben einige Fragen offen: Muss der Konsument jetzt pollen? Wird der Produzent sie in die Warteschlange stellen? Oder werden sie zwischen den einzelnen Abrufen verloren gehen? Die Observable
ist hier definitiv von Vorteil. In Ermangelung von Observable
wäre ein Callback-Ansatz besser, als dies mit Future
zu modellieren.
Mehrere Werte
Mehrwertige Antworten sind die nächste Verwendung von Observable
. Grundsätzlich kann überall dort, wo List
, Iterable
oder Stream
verwendet werden, stattdessen Observable
verwendet werden:
// producer
Observable
<
Friend
>
friends
=
...
// consumer
friends
.
subscribe
(
friend
->
sayHello
(
friend
));
Das kann mit einer Future
funktionieren, etwa so:
// producer
Future
<
List
<
Friend
>>
friends
=
...
// consumer
friends
.
onSuccess
(
listOfFriends
->
{
listOfFriends
.
forEach
(
friend
->
sayHello
(
friend
));
});
Warum also der Observable<Friend>
Ansatz?
Wenn die Liste der zurückzugebenden Daten klein ist, spielt das für die Leistung wahrscheinlich keine Rolle und es ist eine subjektive Entscheidung. Wenn die Liste jedoch groß ist oder die entfernte Datenquelle verschiedene Teile der Liste von verschiedenen Orten abrufen muss, kann der Observable<Friend>
Ansatz einen Leistungs- oder Latenzvorteil bedeuten.
Der überzeugendste Grund ist, dass die Daten direkt nach Eingang verarbeitet werden können, anstatt darauf zu warten, dass die gesamte Sammlung eintrifft. Das gilt vor allem dann, wenn sich unterschiedliche Netzwerklatenzen im Backend auf jedes Element unterschiedlich auswirken können, was aufgrund von Long-Tail-Latenzen (z. B. in serviceorientierten oder Microservice-Architekturen) und gemeinsam genutzten Datenspeichern durchaus üblich ist. Wenn du auf die gesamte Sammlung wartest, erfährt der Verbraucher immer die maximale Latenz der gesamten Arbeit, die für die Sammlung geleistet wird. Wenn die Daten als Observable
zurückgegeben werden, erhält der Kunde sie sofort und die "Zeit bis zum ersten Datenpunkt" kann deutlich kürzer sein als beim letzten und langsamsten Datenpunkt. Damit das funktioniert, muss die Reihenfolge des Datenstroms geopfert werden, damit die Daten in der Reihenfolge gesendet werden können, in der der Server sie erhält. Wenn die Reihenfolge für den Verbraucher wichtig ist, kann eine Rangfolge oder Position in die Daten oder Metadaten der Objekte aufgenommen werden, und der Kunde kann die Objekte dann nach Bedarf sortieren oder positionieren.
Außerdem wird nur so viel Speicherplatz benötigt, wie für jeden Artikel benötigt wird, anstatt Speicher für die gesamte Sammlung zuzuweisen und zu sammeln.
Zusammensetzung
Ein mehrwertiger Observable
Typ ist auch nützlich, wenn du einwertige Antworten zusammenstellst, wie z.B. von Futures
.
Wenn sie mehrere Futures
zusammenführen, geben sie einen weiteren Future
mit einem einzigen Wert aus, wie zum Beispiel diesen:
CompletableFuture
<
String
>
f1
=
getDataAsFuture
(
1
);
CompletableFuture
<
String
>
f2
=
getDataAsFuture
(
2
);
CompletableFuture
<
String
>
f3
=
f1
.
thenCombine
(
f2
,
(
x
,
y
)
->
{
return
x
+
y
;
});
Das könnte genau das sein, was du willst, und ist in RxJava über Observable.zip
verfügbar (mehr dazu erfährst du in "Paarweises Zusammensetzen mit zip() und zipWith()"):
Observable
<
String
>
o1
=
getDataAsObservable
(
1
);
Observable
<
String
>
o2
=
getDataAsObservable
(
2
);
Observable
<
String
>
o3
=
Observable
.
zip
(
o1
,
o2
,
(
x
,
y
)
->
{
return
x
+
y
;
});
Das bedeutet jedoch, dass du warten musst, bis alle Futures
abgeschlossen sind, bevor du etwas ausgibst. Oft ist es besser, jeden zurückgegebenen Future
Wert zu senden, sobald er fertig ist. In diesem Fall ist Observable.merge
(oder das verwandte flatMap
) vorzuziehen. Sie ermöglicht es, die Ergebnisse (auch wenn es sich nur um einen Observable
handelt, der einen Wert sendet) zu einem Strom von Werten zusammenzufassen, die jeweils gesendet werden, sobald sie fertig sind:
Observable
<
String
>
o1
=
getDataAsObservable
(
1
);
Observable
<
String
>
o2
=
getDataAsObservable
(
2
);
// o3 is now a stream of o1 and o2 that emits each item without waiting
Observable
<
String
>
o3
=
Observable
.
merge
(
o1
,
o2
);
Single
Obwohl Rx Observable
sehr gut mit mehrwertigen Datenströmen umgehen kann, ist die Einfachheit einer einwertigen Darstellung sehr gut für das API-Design und die Nutzung. Außerdem ist das grundlegende Anfrage/Antwort-Verhalten in Anwendungen sehr verbreitet. Aus diesem Grund gibt es in RxJava den Typ Single
, der ein lazy Äquivalent zu Future
ist. Betrachten Sie ihn als Future
mit zwei Vorteilen: Erstens ist er lazy, d.h. er kann mehrfach abonniert und einfach zusammengesetzt werden, und zweitens passt er zur RxJava API, d.h. er kann einfach mit einer Observable
interagieren.
Betrachte zum Beispiel diese Accessors:
public
static
Single
<
String
>
getDataA
()
{
return
Single
.<
String
>
create
(
o
->
{
o
.
onSuccess
(
"DataA"
);
}).
subscribeOn
(
Schedulers
.
io
());
}
public
static
Single
<
String
>
getDataB
()
{
return
Single
.
just
(
"DataB"
)
.
subscribeOn
(
Schedulers
.
io
());
}
Diese können dann verwendet und optional wie folgt zusammengesetzt werden:
// merge a & b into an Observable stream of 2 values
Observable
<
String
>
a_merge_b
=
getDataA
().
mergeWith
(
getDataB
());
Beachte, wie zwei Single
s zu einem Observable
verschmolzen werden. Das kann zu einer Emission von [A, B] oder [B, A] führen, je nachdem, was zuerst fertig wird.
Um zum vorherigen Beispiel zurückzukehren, können wir jetzt Single
statt Observable
verwenden, um die Datenabrufe zu repräsentieren, aber sie zu einem Strom von Werten zusammenführen:
// Observable<String> o1 = getDataAsObservable(1);
// Observable<String> o2 = getDataAsObservable(2);
Single
<
String
>
s1
=
getDataAsSingle
(
1
);
Single
<
String
>
s2
=
getDataAsSingle
(
2
);
// o3 is now a stream of s1 and s2 that emits each item without waiting
Observable
<
String
>
o3
=
Single
.
merge
(
s1
,
s2
);
Die Verwendung von Single
anstelle von Observable
zur Darstellung eines "Stream of One" vereinfacht den Verbrauch, da ein Entwickler nur die folgenden Verhaltensweisen für den Typ Single
berücksichtigen muss:
-
Es kann mit einer Fehlermeldung antworten
-
Niemals antworten
-
Reagiere mit einem Erfolg
Vergleiche dies mit den zusätzlichen Staaten, die ein Verbraucher bei einer Observable
berücksichtigen muss:
-
Es kann mit einer Fehlermeldung antworten
-
Niemals antworten
-
Antwortet erfolgreich ohne Daten und beendet sich
-
Antworte erfolgreich mit einem einzigen Wert und beende
-
Reagiere erfolgreich mit mehreren Werten und beende
-
Antwortet erfolgreich mit einem oder mehreren Werten und bricht nie ab (wartet auf weitere Daten)
Durch die Verwendung von Single
ist das mentale Modell für die Nutzung der API einfacher, und erst nach der Komposition in eine Observable
muss ein Entwickler die zusätzlichen Zustände berücksichtigen. Das ist oft ein besserer Ort dafür, denn normalerweise kontrolliert der Entwickler diesen Code, während die Daten-API oft von einem Dritten stammt.
Mehr über Single
erfährst du in "Observable versus Single".
Vervollständigbar
Neben Single
gibt es in RxJava auch den Typ Completable
, der für den überraschend häufigen Fall gedacht ist, dass es keinen Rückgabetyp gibt, sondern nur die Notwendigkeit, einen erfolgreichen oder fehlgeschlagenen Abschluss darzustellen. Oft wird Observable<Void>
oder Single<Void>
verwendet. Das ist umständlich, deshalb wurde Completable
entwickelt, wie hier gezeigt wird:
Completable
c
=
writeToDatabase
(
"data"
);
Dieser Anwendungsfall tritt häufig bei asynchronen Schreibvorgängen auf, bei denen kein Rückgabewert erwartet wird, aber eine Benachrichtigung über den erfolgreichen oder fehlgeschlagenen Abschluss erforderlich ist. Der vorangehende Code mit Completable
ist ähnlich wie dieser:
Observable
<
Void
>
c
=
writeToDatabase
(
"data"
);
Die Completable
selbst ist eine Abstraktion für zwei Rückrufe, Vollendung und Fehlschlag, wie hier:
static
Completable
writeToDatabase
(
Object
data
)
{
return
Completable
.
create
(
s
->
{
doAsyncWrite
(
data
,
// callback for successful completion
()
->
s
.
onCompleted
(),
// callback for failure with Throwable
error
->
s
.
onError
(
error
));
});
}
Null bis unendlich
Observable
kann Kardinalitäten von null bis unendlich unterstützen (mehr dazu in "Unendliche Ströme"). Der Einfachheit und Klarheit halber ist Single
ein "Observable
of One" und Completable
ein "Observable
of None".
Mit diesen neu eingeführten Typen sieht unsere Tabelle am Ende wie folgt aus:
Null | Eine | Viele | |
---|---|---|---|
Synchron |
void doSomething() |
T getData() |
Iterable<T> getData() |
Asynchron |
Abschließbar doSomething() |
Single<T> getData() |
Observable<T> getData() |
Mechanische Sympathie: Blocking versus Nonblocking I/O
Bislang ging es bei den Argumenten für den reaktiv-funktionalen Programmierstil vor allem darum, eine Abstraktion über asynchrone Rückrufe zu schaffen, um eine leichtere Komposition zu ermöglichen. Es liegt auf der Hand, dass die gleichzeitige und nicht die sequentielle Ausführung von unverbundenen Netzwerkanfragen die Latenzzeit verkürzt, was der Grund für die Einführung von Asynchronität und Komposition ist.
Aber gibt es einen effizienten Grund für den reaktiven Ansatz (entweder imperativ oder funktional) bei der Ausführung von E/A? Gibt es Vorteile, wenn wir nicht blockierende E/A verwenden, oder ist es in Ordnung, E/A-Threads zu blockieren, um auf eine einzelne Netzwerkanfrage zu warten? Leistungstests, an denen ich bei Netflix beteiligt war, haben gezeigt, dass es objektive und messbare Effizienzvorteile gibt, wenn man nicht blockierende E/A und Ereignisschleifen gegenüber blockierenden E/A-Threads pro Anfrage einsetzt. In diesem Abschnitt erfährst du, warum das so ist und welche Daten dir helfen, deine eigene Entscheidung zu treffen.
Wie in "The Pursuit of Answers" erwähnt , wurden Tests durchgeführt, um die Leistung von blockierenden und nichtblockierenden E/A mit Tomcat und Netty unter Linux zu vergleichen. Da diese Art von Tests immer umstritten und schwer richtig zu machen ist, möchte ich klarstellen, dass dieser Test nur für das Folgende relevant ist:
-
Verhalten auf typischen Linux-Systemen, die um 2015/2016 verwendet wurden
-
Java 8 (OpenJDK und Oracle)
-
Unveränderter Tomcat und Netty, wie sie in typischen Produktionsumgebungen verwendet werden
-
Repräsentative Webdienst-Anfrage/Antwort-Arbeitslast, die die Komposition mehrerer anderer Webdienste beinhaltet
In diesem Zusammenhang haben wir Folgendes gelernt:
-
Der Netty-Code ist effizienter als der Tomcat-Code und verbraucht daher weniger CPU pro Anfrage.
-
Die Event-Loop-Architektur von Netty reduziert Thread-Migrationen unter Last, was die CPU-Cache-Wärme und die Speicherlokalität verbessert, was wiederum die CPU Instructions-per-Cycle (IPC) verbessert und den CPU-Zyklusverbrauch pro Anfrage senkt.
-
Der Code von Tomcat hat unter Last höhere Latenzen, weil die Thread-Pool-Architektur Sperren (und Sperrkonflikte) und Thread-Migrationen zur Bedienung der Last vorsieht.
Die folgende Grafik verdeutlicht den Unterschied zwischen den Architekturen am besten:
Beachte, wie die Linien mit zunehmender Belastung auseinanderlaufen. Das sind die Thread-Migrationen. Das Interessanteste, was ich gelernt habe, ist, dass die Netty-Anwendung tatsächlich effizienter wird, wenn sie unter Last steht und die Threads "heiß" werden und an einem CPU-Kern hängen bleiben. Tomcat hingegen hat für jede Anfrage einen eigenen Thread und kann daher diesen Vorteil nicht nutzen und behält höhere Thread-Migrationen bei, da jeder Thread für jede Anfrage geplant werden muss.
Der CPU-Verbrauch von Netty bleibt bei steigender Last weitgehend konstant und wird bei maximaler Last sogar etwas effizienter, im Gegensatz zu Tomcat, der weniger effizient wird.
Die daraus resultierenden Auswirkungen auf Latenz und Durchsatz sind in der folgenden Grafik zu sehen:
Obwohl Durchschnittswerte nicht sehr aussagekräftig sind (im Gegensatz zu Perzentilen), zeigt diese Grafik, dass die Latenzzeiten beider Systeme bei geringer Last ähnlich sind, aber bei steigender Last deutlich voneinander abweichen. Netty ist in der Lage, die Maschine bis zu einer höheren Auslastung besser auszulasten, was sich weniger auf die Latenz auswirkt:
Diese Grafik der maximalen Latenz wurde gewählt, um zu zeigen, wie sich die Ausreißer auf Nutzer und Systemressourcen auswirken. Netty handhabt die Last viel eleganter und vermeidet die schlimmsten Ausreißer.
Das folgende Bild zeigt den Durchsatz:
Aus diesen Erkenntnissen ergeben sich zwei große Vorteile. Erstens bedeuten bessere Latenzzeiten und ein höherer Durchsatz sowohl ein besseres Nutzererlebnis als auch geringere Infrastrukturkosten. Zweitens ist die Event-Loop-Architektur unter Last belastbarer. Anstatt zusammenzubrechen, wenn die Last erhöht wird, kann die Maschine bis an ihre Grenzen getrieben werden und bewältigt dies problemlos. Das ist ein sehr überzeugendes Argument für große Produktionssysteme, die unerwartete Lastspitzen bewältigen und reaktionsfähig bleiben müssen.
Außerdem fand ich die Event-Loop-Architektur einfacher zu bedienen. Sie muss nicht1 um eine optimale Leistung zu erzielen, während bei der Thread-per-Request-Architektur die Größe des Thread-Pools (und damit die Speicherbereinigung) je nach Arbeitsbelastung angepasst werden muss.
Dies soll keine erschöpfende Studie zu diesem Thema sein, aber ich fand dieses Experiment und die daraus resultierenden Daten als überzeugenden Beweis für die Verfolgung der "reaktiven" Architektur in Form von nicht blockierenden E/A und Ereignisschleifen. Mit anderen Worten: Mit der Hardware, dem Linux-Kernel und der JVM von 2015/2016 hat nonblocking I/O über Ereignisschleifen durchaus Vorteile.
Die Verwendung von Netty mit RxJava wird später in "Nonblocking HTTP Server mit Netty und RxNetty" näher erläutert .
Reaktive Abstraktion
Letztendlich sind die RxJava-Typen und Operatoren nur eine Abstraktion über imperative Callbacks. Diese Abstraktion verändert jedoch den Programmierstil komplett und bietet sehr leistungsfähige Werkzeuge für die asynchrone und blockierungsfreie Programmierung. Es ist mühsam zu lernen und erfordert ein Umdenken, um mit der Funktionskomposition und dem Denken in Streams zurechtzukommen, aber wenn du das geschafft hast, ist es ein sehr effektives Werkzeug neben unseren typischen objektorientierten und imperativen Programmierstilen.
Der Rest dieses Buches führt dich durch die vielen Details, wie RxJava funktioniert und wie du es verwenden kannst. Kapitel 2 erklärt, woher Observable
kommt und wie du sie nutzen kannst. Kapitel 3 führt dich durch mehrere Dutzend deklarative und leistungsstarke Transformationen.
1 Darüber hinaus gibt es vielleicht Diskussionen darüber, ob die Anzahl der Ereignisschleifen 1x, 1,5x oder 2x so groß wie die Anzahl der Kerne sein soll. Ich habe jedoch keine großen Unterschiede zwischen diesen Werten festgestellt und wähle in der Regel 1x.
Get Reaktive Programmierung mit RxJava now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.