Kapitel 4. Gleichzeitigkeitsmuster in Go
Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com
Wir haben die Grundlagen der Gleichzeitigkeits-Primitive von Go erforscht und besprochen, wie man diese Primitive richtig einsetzt. In diesem Kapitel gehen wir der Frage nach, wie du diese Primitive zu Mustern zusammensetzen kannst, die dein System skalierbar und wartbar machen.
Bevor wir jedoch loslegen, müssen wir uns mit dem Format einiger der Muster in diesem Kapitel befassen. In vielen Beispielen werden wir Kanäle verwenden, die leere Schnittstellen (interface{}
) weitergeben. Die Verwendung von leeren Schnittstellen in Go ist umstritten, aber ich habe mich aus mehreren Gründen dafür entschieden. Der erste Grund ist, dass es einfacher ist, im weiteren Verlauf des Buches prägnante Beispiele zu schreiben. Zweitens bin ich der Meinung, dass dies in einigen Fällen repräsentativer für das ist, was das Muster zu erreichen versucht. Wir werden diesen Punkt im Abschnitt "Pipelines" genauer besprechen .
Wenn dir das zu umstritten ist, erinnere dich daran, dass du jederzeit Go-Generatoren für diesen Code erstellen kannst und die Muster generieren kannst, um den Typ zu verwenden, an dem du interessiert bist.
Also, lass uns eintauchen und einige Muster für die Gleichzeitigkeit in Go kennenlernen!
Gefangenschaft
Wenn du mit nebenläufigem Code arbeitest, gibt es ein paar verschiedene Möglichkeiten für einen sicheren Betrieb. Wir haben uns zwei davon angeschaut:
-
Synchronisationsprimitive für die gemeinsame Nutzung von Speicher (z. B.
sync.Mutex
) -
Synchronisierung über Kommunikation (z.B. Kanäle)
Es gibt jedoch noch ein paar andere Optionen, die innerhalb mehrerer gleichzeitiger Prozesse implizit sicher sind:
-
Unveränderliche Daten
-
Daten durch Einschluss geschützt
In gewisser Weise sind unveränderliche Daten ideal, weil sie implizit konkurrenzsicher sind. Jeder gleichzeitige Prozess kann mit denselben Daten arbeiten, darf sie aber nicht verändern. Wenn er neue Daten erstellen will, muss er eine neue Kopie der Daten mit den gewünschten Änderungen erstellen. Das entlastet nicht nur den Entwickler, sondern kann auch zu schnelleren Programmen führen, wenn es zu kleineren kritischen Abschnitten führt (oder sie ganz eliminiert). In Go kannst du dies erreichen, indem du Code schreibst, der Kopien von Werten anstelle von Zeigern auf Werte im Speicher verwendet. Einige Sprachen unterstützen die Verwendung von Zeigern mit explizit unveränderlichen Werten; Go gehört jedoch nicht zu diesen Sprachen.
Confinement kann auch eine geringere kognitive Belastung für den Entwickler und kleinere kritische Abschnitte bedeuten. Die Techniken zum Confinement von konkurrierenden Werten sind etwas komplizierter als die einfache Übergabe von Kopien von Werten, daher werden wir uns in diesem Kapitel eingehend mit diesen Confinement-Techniken beschäftigen.
Confinement ist die einfache, aber leistungsstarke Idee, die sicherstellt, dass Informationen immer nur von einem gleichzeitigen Prozess verfügbar sind. Wenn dies erreicht ist, ist ein nebenläufiges Programm implizit sicher und es ist keine Synchronisierung erforderlich. Es gibt zwei Arten von Confinement: ad hoc und lexikalisch.
Ad-hoc-Einschränkung bedeutet, dass du die Einschränkung durch eine Konvention erreichst - sei es durch die Sprachgemeinschaft, die Gruppe, in der du arbeitest, oder die Codebasis, in der du arbeitest. Meiner Meinung nach ist die Einhaltung von Konventionen in Projekten jeder Größe schwer zu erreichen, es sei denn, du verfügst über Werkzeuge, mit denen du deinen Code jedes Mal statisch analysieren kannst, wenn jemand eine Codeübergabe vornimmt. Hier ist ein Beispiel für Ad-hoc-Einschränkung, das zeigt, warum:
data
:=
make
([]
int
,
4
)
loopData
:=
func
(
handleData
chan
<-
int
)
{
defer
close
(
handleData
)
for
i
:=
range
data
{
handleData
<-
data
[
i
]
}
}
handleData
:=
make
(
chan
int
)
go
loopData
(
handleData
)
for
num
:=
range
handleData
{
fmt
.
Println
(
num
)
}
Wir sehen, dass der data
Slice von Ganzzahlen sowohl in der loopData
Funktion als auch in der Schleife über den handleData
Kanal zur Verfügung steht, aber aus Konvention greifen wir nur in der loopData
Funktion darauf zu. Da der Code aber von vielen Leuten angefasst wird und Fristen drohen, könnten Fehler gemacht werden und die Eingrenzung könnte zusammenbrechen und Probleme verursachen. Wie ich bereits erwähnt habe, kann ein Tool zur statischen Analyse diese Art von Problemen aufdecken, aber die statische Analyse einer Go-Codebasis setzt einen Reifegrad voraus, den nicht viele Teams erreichen. Deshalb bevorzuge ich lexikalisches Confinement: Es nutzt den Compiler, um das Confinement durchzusetzen.
Bei der lexikalischen Eingrenzung wird der lexikalische Geltungsbereich genutzt, um nur die richtigen Daten und Gleichzeitigkeitsprimitive für die Nutzung durch mehrere gleichzeitige Prozesse freizugeben. Das macht es unmöglich, etwas Falsches zu tun. Wir haben dieses Thema bereits in Kapitel 3 behandelt. Erinnere dich an den Abschnitt über Kanäle, in dem es darum geht, nur die Lese- oder Schreibaspekte eines Kanals für die gleichzeitigen Prozesse freizugeben, die sie benötigen. Schauen wir uns das Beispiel noch einmal an:
chanOwner
:=
func
(
)
<-
chan
int
{
results
:=
make
(
chan
int
,
5
)
go
func
(
)
{
defer
close
(
results
)
for
i
:=
0
;
i
<=
5
;
i
++
{
results
<-
i
}
}
(
)
return
results
}
consumer
:=
func
(
results
<-
chan
int
)
{
for
result
:=
range
results
{
fmt
.
Printf
(
"Received: %d\n"
,
result
)
}
fmt
.
Println
(
"Done receiving!"
)
}
results
:=
chanOwner
(
)
consumer
(
results
)
Hier instanziieren wir den Kanal innerhalb des lexikalischen Bereichs der Funktion
chanOwner
. Dadurch wird der Geltungsbereich des Schreibaspekts desresults
Kanals auf die darunter definierte Closure beschränkt. Mit anderen Worten: Der Schreibaspekt des Kanals wird eingeschränkt, um zu verhindern, dass andere Goroutinen in den Kanal schreiben können.Hier erhalten wir den Leseaspekt des Kanals und können ihn an den Verbraucher weitergeben, der nichts anderes tun kann, als von ihm zu lesen. Auch hier beschränkt sich die Haupt-Goroutine auf eine reine Leseansicht des Kanals.
Hier erhalten wir eine Nur-Lese-Kopie eines
int
Kanals. Indem wir angeben, dass wir nur Lesezugriff benötigen, beschränken wir die Nutzung des Kanals innerhalb der Funktionconsume
auf Lesezugriff.
Auf diese Weise ist es unmöglich, die Kanäle in diesem kleinen Beispiel zu nutzen. Das ist eine gute Einführung in das Confinement, aber wahrscheinlich kein sehr interessantes Beispiel, da Kanäle nebenläufigkeitssicher sind. Schauen wir uns ein Beispiel für Confinement an, bei dem eine Datenstruktur verwendet wird, die nicht nebenläufigkeitssicher ist, nämlich eine Instanz von bytes.Buffer
:
printData
:=
func
(
wg
*
sync
.
WaitGroup
,
data
[
]
byte
)
{
defer
wg
.
Done
(
)
var
buff
bytes
.
Buffer
for
_
,
b
:=
range
data
{
fmt
.
Fprintf
(
&
buff
,
"%c"
,
b
)
}
fmt
.
Println
(
buff
.
String
(
)
)
}
var
wg
sync
.
WaitGroup
wg
.
Add
(
2
)
data
:=
[
]
byte
(
"golang"
)
go
printData
(
&
wg
,
data
[
:
3
]
)
go
printData
(
&
wg
,
data
[
3
:
]
)
wg
.
Wait
(
)
Hier übergeben wir ein Slice, das die ersten drei Bytes in der
data
Struktur enthält.Hier übergeben wir ein Slice, das die letzten drei Bytes in der
data
Struktur enthält.
In diesem Beispiel kannst du sehen, dass printData
nicht auf das data
-Slice zugreifen kann, weil es nicht geschlossen ist, sondern ein Slice von byte
aufnehmen muss, um es zu bearbeiten. Wir übergeben verschiedene Teilmengen des Slice und beschränken so die Goroutinen, die wir starten, auf den Teil des Slice, den wir übergeben. Durch den lexikalischen Geltungsbereich haben wir es unmöglich gemacht1 Deshalb müssen wir den Speicherzugriff nicht synchronisieren und keine Daten über die Kommunikation austauschen.
Was ist also der Sinn? Warum sollten wir die Begrenzung verfolgen, wenn uns die Synchronisierung zur Verfügung steht? Die Antwort ist eine bessere Leistung und eine geringere kognitive Belastung für die Entwickler. Die Synchronisierung ist mit Kosten verbunden. Wenn du sie vermeiden kannst, hast du keine kritischen Abschnitte und musst daher auch nicht die Kosten für deren Synchronisierung tragen. Außerdem umgehst du eine ganze Reihe von Problemen, die bei der Synchronisierung auftreten können; die Entwickler müssen sich einfach nicht um diese Probleme kümmern. Gleichzeitiger Code, der lexikalische Begrenzung verwendet, hat außerdem den Vorteil, dass er in der Regel einfacher zu verstehen ist als gleichzeitiger Code ohne lexikalisch begrenzte Variablen. Das liegt daran, dass du im Kontext deines lexikalischen Bereichs synchronen Code schreiben kannst.
Allerdings kann es schwierig sein, Confinement zu etablieren, und deshalb müssen wir manchmal auf unsere wunderbaren Go-Gleichzeitigkeits-Primitive zurückgreifen.
Die for-select-Schleife
Etwas, das du in Go-Programmen immer wieder siehst, ist die for-select-Schleife. Sie ist nichts anderes als etwas wie das hier:
for
{
// Either loop infinitely or range over something
select
{
// Do some work with channels
}
}
Es gibt verschiedene Szenarien, in denen dieses Muster auftauchen kann.
- Iterationsvariablen auf einem Kanal aussenden
-
Oft möchtest du etwas, das iteriert werden kann, in Werte für einen Kanal umwandeln. Das ist nichts Besonderes und sieht normalerweise so aus:
for
_
,
s
:=
range
[]
string
{
"a"
,
"b"
,
"c"
}
{
select
{
case
<-
done
:
return
case
stringStream
<-
s
:
}
}
- Unendliche Schleifen, die darauf warten, gestoppt zu werden
-
Es ist sehr verbreitet, Goroutinen zu erstellen, die in einer Endlosschleife laufen, bis sie gestoppt werden. Es gibt mehrere Varianten davon. Welche du wählst, ist eine reine Frage des Stils.
Bei der ersten Variante wird die Anweisung
select
so kurz wie möglich gehalten:for
{
select
{
case
<-
done
:
return
default
:
}
// Do non-preemptable work
}
Wenn der Kanal
done
nicht geschlossen ist, verlassen wir die Anweisungselect
und fahren mit dem Rest derfor
Schleife fort.Die zweite Variante bettet die Arbeit in eine
default
Klausel derselect
Anweisung ein:for
{
select
{
case
<-
done
:
return
default
:
// Do non-preemptable work
}
}
Wenn wir die Anweisung
select
eingeben und der Kanaldone
noch nicht geschlossen wurde, führen wir stattdessen die Klauseldefault
aus.Mehr gibt es zu diesem Muster nicht zu sagen, aber es taucht überall auf und ist deshalb erwähnenswert.
Goroutine-Lecks verhindern
Wie wir im Abschnitt "Goroutinen" beschrieben haben , sind Goroutinen billig und einfach zu erstellen; das ist einer der Gründe, die Go zu einer so produktiven Sprache machen. Die Runtime kümmert sich um das Multiplexing der Goroutinen auf eine beliebige Anzahl von Betriebssystem-Threads, so dass wir uns über diese Abstraktionsebene nicht oft Gedanken machen müssen. Aber sie kosten Ressourcen, und Goroutinen werden nicht von der Laufzeitumgebung entsorgt. Unabhängig davon, wie klein ihr Speicherplatzbedarf ist, wollen wir sie nicht in unserem Prozess herumliegen lassen. Wie können wir also sicherstellen, dass sie aufgeräumt werden?
Fangen wir von vorne an und denken wir Schritt für Schritt darüber nach: Warum sollte es eine Goroutine geben? In Kapitel 2 haben wir festgestellt, dass Goroutinen Arbeitseinheiten darstellen, die parallel zueinander ablaufen können, aber nicht müssen. Die Goroutine hat mehrere Wege zur Beendigung:
-
Wenn sie ihre Arbeit beendet hat.
-
Wenn er seine Arbeit aufgrund eines nicht behebbaren Fehlers nicht fortsetzen kann.
-
Wenn man ihm sagt, dass er aufhören soll zu arbeiten.
Die ersten beiden Pfade sind kostenlos - das ist dein Algorithmus -, aber was ist mit der Arbeitsaufhebung? Das ist wegen des Netzwerkeffekts das Wichtigste: Wenn du eine Goroutine gestartet hast, arbeitet sie höchstwahrscheinlich mit mehreren anderen Goroutines in einer Art organisierter Weise zusammen. Wir könnten diese Verflechtung sogar als Graph darstellen: Ob eine untergeordnete Goroutine weiter ausgeführt werden soll oder nicht, kann von der Kenntnis des Zustands vieler anderer Goroutinen abhängen. Die übergeordnete Goroutine (oft die Hauptgoroutine), die über dieses vollständige Kontextwissen verfügt, sollte in der Lage sein, ihren untergeordneten Goroutinen mitzuteilen, dass sie die Ausführung beenden sollen. Im nächsten Kapitel werden wir uns weiter mit der Abhängigkeit von großen Goroutinen beschäftigen, aber jetzt wollen wir erst einmal überlegen, wie wir sicherstellen können, dass eine einzelne Child-Goroutine garantiert aufgeräumt wird. Beginnen wir mit einem einfachen Beispiel für ein Leck in einer Goroutine:
doWork
:=
func
(
strings
<-
chan
string
)
<-
chan
interface
{}
{
completed
:=
make
(
chan
interface
{})
go
func
()
{
defer
fmt
.
Println
(
"doWork exited."
)
defer
close
(
completed
)
for
s
:=
range
strings
{
// Do something interesting
fmt
.
Println
(
s
)
}
}()
return
completed
}
doWork
(
nil
)
// Perhaps more work is done here
fmt
.
Println
(
"Done."
)
Hier sehen wir, dass die Haupt-Goroutine einen Null-Kanal an doWork
übergibt. Daher werden in den Kanal strings
nie irgendwelche Strings geschrieben, und die Goroutine, die doWork
enthält, bleibt für die gesamte Dauer des Prozesses im Speicher (wir würden sogar in eine Sackgasse geraten, wenn wir die Goroutine innerhalb von doWork
und die Hauptgoroutine verbinden würden).
In diesem Beispiel ist die Lebensdauer des Prozesses sehr kurz, aber in einem realen Programm könnten Goroutinen leicht am Anfang eines langlebigen Programms gestartet werden. Im schlimmsten Fall könnte die Haupt-Goroutine während ihrer gesamten Lebensdauer weiterhin Goroutinen starten, was zu einer schleichenden Speicherauslastung führen würde.
Um dies zu verhindern, muss ein Signal zwischen der übergeordneten Goroutine und ihren Kindern eingerichtet werden, mit dem die übergeordnete Goroutine ihren Kindern den Abbruch signalisieren kann. In der Regel ist dieses Signal ein schreibgeschützter Kanal namens done
. Die Eltern-Goroutine gibt diesen Kanal an die Kind-Goroutine weiter und schließt den Kanal, wenn sie die Kind-Goroutine abbrechen will. Hier ist ein Beispiel:
doWork
:=
func
(
done
<-
chan
interface
{
}
,
strings
<-
chan
string
,
)
<-
chan
interface
{
}
{
terminated
:=
make
(
chan
interface
{
}
)
go
func
(
)
{
defer
fmt
.
Println
(
"doWork exited."
)
defer
close
(
terminated
)
for
{
select
{
case
s
:=
<-
strings
:
// Do something interesting
fmt
.
Println
(
s
)
case
<-
done
:
return
}
}
}
(
)
return
terminated
}
done
:=
make
(
chan
interface
{
}
)
terminated
:=
doWork
(
done
,
nil
)
go
func
(
)
{
// Cancel the operation after 1 second.
time
.
Sleep
(
1
*
time
.
Second
)
fmt
.
Println
(
"Canceling doWork goroutine..."
)
close
(
done
)
}
(
)
<-
terminated
fmt
.
Println
(
"Done."
)
Hier übergeben wir den Kanal
done
an die FunktiondoWork
. In der Regel ist dieser Kanal der erste Parameter.In dieser Zeile sehen wir, dass das allgegenwärtige for-select-Muster verwendet wird. Eine unserer Case-Anweisungen prüft, ob unser
done
Kanal signalisiert wurde. Ist dies der Fall, kehren wir aus der Goroutine zurück.Hier erstellen wir eine weitere Goroutine, die die Goroutine abbricht, die in
doWork
abbricht, wenn mehr als eine Sekunde vergeht.Hier verbinden wir die Goroutine, die von
doWork
gestartet wurde, mit der Hauptgoroutine.
Und das Ergebnis ist:
Canceling doWork goroutine... doWork exited. Done.
Du kannst sehen, dass unsere Goroutine trotz der Übergabe von nil
für unseren strings
Kanal erfolgreich beendet wird. Im Gegensatz zum vorherigen Beispiel werden in diesem Beispiel die beiden Goroutinen zusammengeführt, ohne dass es zu einem Deadlock kommt. Das liegt daran, dass wir, bevor wir die beiden Goroutinen verbinden, eine dritte Goroutine erstellen, um die Goroutine innerhalb von doWork
nach einer Sekunde abzubrechen. Wir haben unser Goroutine-Leck erfolgreich beseitigt!
Das vorige Beispiel behandelt den Fall, dass eine Goroutine einen Kanal empfängt, ganz gut. Aber was ist, wenn wir es mit der umgekehrten Situation zu tun haben: Eine Goroutine blockiert, wenn sie versucht, einen Wert in einen Kanal zu schreiben? Hier ist ein kurzes Beispiel, um das Problem zu verdeutlichen:
newRandStream
:=
func
(
)
<-
chan
int
{
randStream
:=
make
(
chan
int
)
go
func
(
)
{
defer
fmt
.
Println
(
"newRandStream closure exited."
)
defer
close
(
randStream
)
for
{
randStream
<-
rand
.
Int
(
)
}
}
(
)
return
randStream
}
randStream
:=
newRandStream
(
)
fmt
.
Println
(
"3 random ints:"
)
for
i
:=
1
;
i
<=
3
;
i
++
{
fmt
.
Printf
(
"%d: %d\n"
,
i
,
<-
randStream
)
}
Die Ausführung dieses Codes ergibt:
3 random ints: 1: 5577006791947779410 2: 8674665223082153551 3: 6129484611666145821
An der Ausgabe kannst du sehen, dass die Anweisung fmt.Println
nie ausgeführt wird. Nach der dritten Iteration unserer Schleife blockiert unsere Goroutine beim Versuch, die nächste zufällige Ganzzahl an einen Kanal zu senden, von dem nicht mehr gelesen wird. Wir haben keine Möglichkeit, dem Produzenten zu sagen, dass er aufhören kann. Die Lösung ist, genau wie im Fall des Empfängers, der Producer-Goroutine einen Kanal zu geben, der sie zum Beenden auffordert:
newRandStream
:=
func
(
done
<-
chan
interface
{})
<-
chan
int
{
randStream
:=
make
(
chan
int
)
go
func
()
{
defer
fmt
.
Println
(
"newRandStream closure exited."
)
defer
close
(
randStream
)
for
{
select
{
case
randStream
<-
rand
.
Int
():
case
<-
done
:
return
}
}
}()
return
randStream
}
done
:=
make
(
chan
interface
{})
randStream
:=
newRandStream
(
done
)
fmt
.
Println
(
"3 random ints:"
)
for
i
:=
1
;
i
<=
3
;
i
++
{
fmt
.
Printf
(
"%d: %d\n"
,
i
,
<-
randStream
)
}
close
(
done
)
// Simulate ongoing work
time
.
Sleep
(
1
*
time
.
Second
)
Dieser Code ergibt:
3 random ints: 1: 5577006791947779410 2: 8674665223082153551 3: 6129484611666145821 newRandStream closure exited.
Wir sehen jetzt, dass die Goroutine richtig aufgeräumt wird.
Da wir nun wissen, wie wir sicherstellen können, dass Goroutinen nicht auslaufen, können wir eine Konvention festlegen: Wenn eine Goroutine für die Erstellung einer Goroutine verantwortlich ist, ist sie auch dafür verantwortlich, dass sie die Goroutine stoppen kann.
Diese Konvention sorgt dafür, dass deine Programme zusammensetzbar sind und mitwachsen. Wir werden auf diese Technik und die Regeln in den Abschnitten "Pipelines" und "Das Kontextpaket" näher eingehen . Wie wir sicherstellen, dass Goroutinen gestoppt werden können, hängt von der Art und dem Zweck der Goroutine ab, aber alle bauen auf der Grundlage der Übergabe eines done
Kanals auf.
Der Oder-Kanal
Es kann vorkommen, dass du einen oder mehrere done
Kanäle zu einem einzigen done
Kanal zusammenfassen möchtest, der geschlossen wird, wenn einer seiner Komponenten-Kanäle geschlossen wird. Es ist durchaus akzeptabel, eine select
Anweisung zu schreiben, die diese Kopplung vornimmt, auch wenn sie sehr ausführlich ist. Manchmal kannst du jedoch die Anzahl der done
Kanäle, mit denen du arbeitest, zur Laufzeit nicht kennen. In diesem Fall oder wenn du einfach nur einen Einzeiler bevorzugst, kannst du diese Kanäle mit dem oder-Kanal-Muster kombinieren.
Dieses Muster erstellt einen zusammengesetzten done
Kanal durch Rekursion und Goroutinen. Schauen wir uns das mal an:
var
or
func
(
channels
...
<-
chan
interface
{
}
)
<-
chan
interface
{
}
or
=
func
(
channels
...
<-
chan
interface
{
}
)
<-
chan
interface
{
}
{
switch
len
(
channels
)
{
case
0
:
return
nil
case
1
:
return
channels
[
0
]
}
orDone
:=
make
(
chan
interface
{
}
)
go
func
(
)
{
defer
close
(
orDone
)
switch
len
(
channels
)
{
case
2
:
select
{
case
<-
channels
[
0
]
:
case
<-
channels
[
1
]
:
}
default
:
select
{
case
<-
channels
[
0
]
:
case
<-
channels
[
1
]
:
case
<-
channels
[
2
]
:
case
<-
or
(
append
(
channels
[
3
:
]
,
orDone
)
...
)
:
}
}
}
(
)
return
orDone
}
Hier haben wir unsere Funktion
or
, die einen variablen Teil der Kanäle aufnimmt und einen einzelnen Kanal zurückgibt.Da es sich um eine rekursive Funktion handelt, müssen wir Abbruchkriterien festlegen. Das erste ist, dass wir, wenn die variadische Scheibe leer ist, einfach einen Null-Kanal zurückgeben. Das entspricht der Idee, keine Kanäle zu übergeben; wir würden nicht erwarten, dass ein zusammengesetzter Kanal etwas tut.
Unser zweites Abbruchkriterium besagt, dass wir, wenn unsere variadische Scheibe nur ein Element enthält, nur dieses Element zurückgeben.
Das ist der Hauptteil der Funktion, in dem die Rekursion stattfindet. Wir erstellen eine Goroutine, damit wir auf Nachrichten in unseren Kanälen warten können, ohne zu blockieren.
Da wir rekursiv vorgehen, wird jeder rekursive Aufruf von
or
mindestens zwei Kanäle haben. Um die Anzahl der Goroutinen in Grenzen zu halten, haben wir hier einen Sonderfall für Aufrufe vonor
mit nur zwei Kanälen.Hier erstellen wir rekursiv einen Oder-Kanal aus allen Kanälen in unserem Slice nach dem dritten Index und wählen dann aus diesem aus. Durch diese Rekursionsbeziehung wird der Rest des Slice in Oder-Kanäle umstrukturiert, um einen Baum zu bilden, aus dem das erste Signal zurückkehrt. Wir geben auch den Kanal
orDone
ein, damit die Goroutinen, die den Baum oben verlassen, auch die Goroutinen unten im Baum verlassen.
Dies ist eine recht übersichtliche Funktion, mit der du eine beliebige Anzahl von Kanälen zu einem einzigen Kanal zusammenfassen kannst, der geschlossen wird, sobald einer seiner Komponenten-Kanäle geschlossen oder angeschrieben wird. Schauen wir uns einmal an, wie wir diese Funktion verwenden können. Hier ein kurzes Beispiel, bei dem Kanäle, die nach einer bestimmten Zeitspanne geschlossen werden, mit der Funktion or
zu einem einzigen Kanal zusammengefasst werden:
sig
:=
func
(
after
time
.
Duration
)
<-
chan
interface
{
}
{
c
:=
make
(
chan
interface
{
}
)
go
func
(
)
{
defer
close
(
c
)
time
.
Sleep
(
after
)
}
(
)
return
c
}
start
:=
time
.
Now
(
)
<-
or
(
sig
(
2
*
time
.
Hour
)
,
sig
(
5
*
time
.
Minute
)
,
sig
(
1
*
time
.
Second
)
,
sig
(
1
*
time
.
Hour
)
,
sig
(
1
*
time
.
Minute
)
,
)
fmt
.
Printf
(
"done after %v"
,
time
.
Since
(
start
)
)
Diese Funktion erstellt einfach einen Kanal, der geschlossen wird, wenn die in
after
angegebene Zeit verstrichen ist.Hier halten wir ungefähr fest, wann der Kanal von der Funktion
or
zu blockieren beginnt.Und hier drucken wir die Zeit aus, die für das Lesen benötigt wurde.
Wenn du dieses Programm ausführst, erhältst du:
done after 1.000216772s
Obwohl wir in unserem Aufruf an or
mehrere Kanäle platziert haben, die unterschiedlich lange brauchen, um sich zu schließen, führt unser Kanal, der sich nach einer Sekunde schließt, dazu, dass der gesamte Kanal, der durch den Aufruf an or
erstellt wurde, sich schließt. Das liegt daran, dass er trotz seiner Position im Baum, den die Funktion or
aufbaut, immer zuerst geschlossen wird und somit auch die Kanäle, die von seiner Schließung abhängen, geschlossen werden.
Wir erreichen diese Straffheit auf Kosten zusätzlicher Goroutinen - f(x)=⌊x/2⌋, wobei x
die Anzahl der Goroutinen ist - aber erinnere dich daran, dass eine der Stärken von Go die Fähigkeit ist, Goroutinen schnell zu erstellen, zu planen und auszuführen, und die Sprache fördert aktiv die Verwendung von Goroutinen, um Probleme korrekt zu modellieren. Sich über die Anzahl der erstellten Goroutinen Gedanken zu machen, ist wahrscheinlich eine verfrühte Optimierung. Wenn du zur Kompilierzeit nicht weißt, mit wie vielen done
Kanälen du arbeitest, gibt es auch keine andere Möglichkeit, done
Kanäle zu kombinieren.
Dieses Muster ist nützlich, wenn sich die Module in deinem System kreuzen. An diesen Schnittpunkten gibt es in der Regel mehrere Bedingungen für das Abbrechen von Goroutine-Bäumen in deinem Aufrufstapel. Mit der Funktion or
kannst du diese einfach zusammenfassen und den Stack weitergeben. In "Das Kontextpaket" werden wir uns eine andere Möglichkeit ansehen, die ebenfalls sehr schön und vielleicht etwas anschaulicher ist.
Wir werden uns auch ansehen, wie wir eine Variation dieses Musters verwenden können, um ein komplizierteres Muster in "Replicated Requests" zu bilden .
Fehlerbehandlung
In nebenläufigen Programmen kann es schwierig sein, die Fehlerbehandlung richtig hinzubekommen. Manchmal verbringen wir so viel Zeit damit, uns Gedanken darüber zu machen, wie unsere verschiedenen Prozesse Informationen austauschen und sich koordinieren, dass wir vergessen, wie sie mit fehlerhaften Zuständen umgehen sollen. Als Go das beliebte Ausnahmemodell für Fehler verwarf, machte es deutlich, dass die Fehlerbehandlung wichtig ist und dass wir bei der Entwicklung unserer Programme unseren Fehlerpfaden die gleiche Aufmerksamkeit schenken sollten wie unseren Algorithmen. In diesem Sinne wollen wir uns einmal ansehen, wie wir das bei der Arbeit mit mehreren gleichzeitigen Prozessen machen.
Die grundlegendste Frage beim Nachdenken über Fehlerbehandlung ist: "Wer sollte für die Behandlung des Fehlers verantwortlich sein?" Irgendwann muss das Programm aufhören, den Fehler auf dem Stapel nach oben zu befördern, und etwas mit ihm machen. Wer ist für diese Aufgabe verantwortlich?
Bei nebenläufigen Prozessen wird diese Frage ein wenig komplexer. Da ein nebenläufiger Prozess unabhängig von seinem Elternteil oder seinen Geschwistern arbeitet, kann es für ihn schwierig sein, herauszufinden, was das Richtige für den Fehler ist. Im folgenden Code findest du ein Beispiel für dieses Problem:
checkStatus
:=
func
(
done
<-
chan
interface
{
}
,
urls
...
string
,
)
<-
chan
*
http
.
Response
{
responses
:=
make
(
chan
*
http
.
Response
)
go
func
(
)
{
defer
close
(
responses
)
for
_
,
url
:=
range
urls
{
resp
,
err
:=
http
.
Get
(
url
)
if
err
!=
nil
{
fmt
.
Println
(
err
)
continue
}
select
{
case
<-
done
:
return
case
responses
<-
resp
:
}
}
}
(
)
return
responses
}
done
:=
make
(
chan
interface
{
}
)
defer
close
(
done
)
urls
:=
[
]
string
{
"https://www.google.com"
,
"https://badhost"
}
for
response
:=
range
checkStatus
(
done
,
urls
...
)
{
fmt
.
Printf
(
"Response: %v\n"
,
response
.
Status
)
}
Hier sehen wir, wie die Goroutine ihr Bestes tut, um zu signalisieren, dass ein Fehler aufgetreten ist. Was kann sie sonst tun? Sie kann ihn nicht zurückgeben! Wie viele Fehler sind zu viele? Macht sie weiter mit ihren Anfragen?
Die Ausführung dieses Codes ergibt:
Response: 200 OK Get https://badhost: dial tcp: lookup badhost on 127.0.1.1:53: no such host
Hier sehen wir, dass die Goroutine keine andere Wahl hat. Sie kann den Fehler nicht einfach schlucken und tut daher das einzig Vernünftige: Sie gibt den Fehler aus und hofft, dass jemand darauf achtet. Bringe deine Goroutinen nicht in diese missliche Lage. Ich schlage vor, dass du deine Sorgen trennst: Im Allgemeinen sollten deine nebenläufigen Prozesse ihre Fehler an einen anderen Teil deines Programms senden, der vollständige Informationen über den Zustand deines Programms hat und eine fundiertere Entscheidung darüber treffen kann, was zu tun ist. Das folgende Beispiel demonstriert eine korrekte Lösung für dieses Problem:
type
Result
struct
{
Error
error
Response
*
http
.
Response
}
checkStatus
:=
func
(
done
<-
chan
interface
{
}
,
urls
...
string
)
<-
chan
Result
{
results
:=
make
(
chan
Result
)
go
func
(
)
{
defer
close
(
results
)
for
_
,
url
:=
range
urls
{
var
result
Result
resp
,
err
:=
http
.
Get
(
url
)
result
=
Result
{
Error
:
err
,
Response
:
resp
}
select
{
case
<-
done
:
return
case
results
<-
result
:
}
}
}
(
)
return
results
}
done
:=
make
(
chan
interface
{
}
)
defer
close
(
done
)
urls
:=
[
]
string
{
"https://www.google.com"
,
"https://badhost"
}
for
result
:=
range
checkStatus
(
done
,
urls
...
)
{
if
result
.
Error
!=
nil
{
fmt
.
Printf
(
"error: %v"
,
result
.
Error
)
continue
}
fmt
.
Printf
(
"Response: %v\n"
,
result
.
Response
.
Status
)
}
Hier erstellen wir einen Typ, der sowohl die
*http.Response
als auch dieerror
umfasst, die bei einer Iteration der Schleife innerhalb unserer Goroutine möglich sind.Diese Zeile gibt einen Kanal zurück, der gelesen werden kann, um die Ergebnisse einer Iteration unserer Schleife abzurufen.
Hier erstellen wir eine
Result
Instanz mit den FeldernError
undResponse
.Hier schreiben wir die
Result
zu unserem Kanal.Hier, in unserer Hauptgoroutine, sind wir in der Lage, mit Fehlern umzugehen, die aus der von
checkStatus
gestarteten Goroutine kommen, und zwar auf intelligente Weise und mit dem vollen Kontext des größeren Programms.
Dieser Code ergibt:
Response: 200 OK error: Get https://badhost: dial tcp: lookup badhost on 127.0.1.1:53: no such host
Das Wichtigste ist, dass wir das mögliche Ergebnis mit dem möglichen Fehler gekoppelt haben. Dies repräsentiert die gesamte Menge möglicher Ergebnisse, die von der Goroutine checkStatus
erzeugt werden, und ermöglicht es unserer Hauptgoroutine, Entscheidungen darüber zu treffen, was zu tun ist, wenn Fehler auftreten. Im Großen und Ganzen haben wir die Belange der Fehlerbehandlung erfolgreich von unserer Produzenten-Goroutine getrennt. Das ist wünschenswert, weil die Goroutine, die die Erzeuger-Goroutine gestartet hat - in diesem Fall unsere Haupt-Goroutine - mehr Informationen über das laufende Programm hat und intelligentere Entscheidungen darüber treffen kann, was bei Fehlern zu tun ist.
Im vorherigen Beispiel haben wir Fehler einfach an stdio
geschrieben, aber wir könnten auch etwas anderes tun. Ändern wir unser Programm leicht ab, damit es nicht mehr versucht, den Status zu prüfen, wenn drei oder mehr Fehler auftreten:
done
:=
make
(
chan
interface
{})
defer
close
(
done
)
errCount
:=
0
urls
:=
[]
string
{
"a"
,
"https://www.google.com"
,
"b"
,
"c"
,
"d"
}
for
result
:=
range
checkStatus
(
done
,
urls
...
)
{
if
result
.
Error
!=
nil
{
fmt
.
Printf
(
"error: %v\n"
,
result
.
Error
)
errCount
++
if
errCount
>=
3
{
fmt
.
Println
(
"Too many errors, breaking!"
)
break
}
continue
}
fmt
.
Printf
(
"Response: %v\n"
,
result
.
Response
.
Status
)
}
Dieser Code erzeugt diese Ausgabe:
error: Get a: unsupported protocol scheme "" Response: 200 OK error: Get b: unsupported protocol scheme "" error: Get c: unsupported protocol scheme "" Too many errors, breaking!
Du kannst sehen, dass die Fehlerbehandlung dem bekannten Go-Muster folgt, da die Fehler von checkStatus
zurückgegeben und nicht intern in der Goroutine behandelt werden. Dies ist ein einfaches Beispiel, aber es ist nicht schwer, sich Situationen vorzustellen, in denen die Haupt-Goroutine die Ergebnisse mehrerer Goroutinen koordiniert und komplexere Regeln für das Fortsetzen oder Abbrechen von Unter-Goroutinen aufstellt. Die wichtigste Erkenntnis hier ist, dass Fehler als Bürger erster Klasse betrachtet werden sollten, wenn es darum geht, Werte zu konstruieren, die von Goroutinen zurückgegeben werden. Wenn deine Goroutine Fehler produzieren kann, sollten diese Fehler eng mit dem Ergebnistyp gekoppelt sein und über dieselben Kommunikationswege weitergegeben werden - genau wie bei normalen synchronen Funktionen.
Pipelines
Wenn du ein Programm schreibst, setzt du dich wahrscheinlich nicht hin und schreibst eine einzige lange Funktion - zumindest hoffe ich, dass du das nicht tust! Du konstruierst Abstraktionen in Form von Funktionen, Structs, Methoden usw. Warum tun wir das? Zum einen, um Details zu abstrahieren, die für den Gesamtablauf unwichtig sind, und zum anderen, damit wir an einem Bereich des Codes arbeiten können, ohne andere Bereiche zu beeinflussen. Hast du schon einmal ein System ändern müssen und festgestellt, dass du mehrere Bereiche anfassen musst, nur um eine logische Änderung vorzunehmen? Das könnte daran liegen, dass das System unter einer schlechten Abstraktion leidet.
Eine Pipeline ist nur ein weiteres Werkzeug, mit dem du eine Abstraktion in deinem System bilden kannst. Sie ist vor allem dann ein sehr leistungsfähiges Werkzeug, wenn dein Programm Datenströme oder -stapel verarbeiten muss. Das Wort Pipeline wurde vermutlich erstmals 1856 verwendet und bezog sich auf eine Reihe von Rohren, die Flüssigkeiten von einem Ort zum anderen transportierten. In der Informatik haben wir diesen Begriff übernommen, weil wir auch etwas von einem Ort zum anderen transportieren: Daten. Eine Pipeline ist nichts anderes als eine Reihe von Dingen, die Daten aufnehmen, sie verarbeiten und wieder abgeben. Wir nennen jeden dieser Vorgänge eine Stufe der Pipeline.
Mit einer Pipeline trennst du die Belange der einzelnen Stufen, was zahlreiche Vorteile mit sich bringt. Du kannst die einzelnen Stufen unabhängig voneinander ändern, du kannst die Kombination der Stufen mischen und anpassen, unabhängig davon, ob du die Stufen änderst, du kannst jede Stufe gleichzeitig mit vor- oder nachgelagerten Stufen bearbeiten und du kannst Teile deiner Pipeline auffächern oder die Rate begrenzen. Das Fan-Out wird im Abschnitt "Fan-Out, Fan-In" behandelt , die Ratenbegrenzung in Kapitel 5. Du musst dir jetzt keine Gedanken darüber machen, was diese Begriffe bedeuten; fangen wir einfach an und versuchen, eine Pipelinestufe zu konstruieren.
Wie bereits erwähnt, ist eine Stufe nur etwas, das Daten entgegennimmt, sie umwandelt und dann wieder zurückschickt. Hier ist eine Funktion, die man als Pipeline-Stufe bezeichnen könnte:
multiply
:=
func
(
values
[]
int
,
multiplier
int
)
[]
int
{
multipliedValues
:=
make
([]
int
,
len
(
values
))
for
i
,
v
:=
range
values
{
multipliedValues
[
i
]
=
v
*
multiplier
}
return
multipliedValues
}
Diese Funktion nimmt eine Reihe von Ganzzahlen mit einem Multiplikator auf, durchläuft sie in einer Schleife, multipliziert sie und gibt eine neue transformierte Reihe zurück. Sieht nach einer langweiligen Funktion aus, oder? Lass uns eine weitere Stufe erstellen:
add
:=
func
(
values
[]
int
,
additive
int
)
[]
int
{
addedValues
:=
make
([]
int
,
len
(
values
))
for
i
,
v
:=
range
values
{
addedValues
[
i
]
=
v
+
additive
}
return
addedValues
}
Eine weitere langweilige Funktion! Diese Funktion erstellt einfach ein neues Slice und fügt jedem Element einen Wert hinzu. An dieser Stelle fragst du dich vielleicht, was diese beiden Funktionen zu Pipeline-Stufen und nicht nur zu Funktionen macht. Lass uns versuchen, sie zu kombinieren:
ints
:=
[]
int
{
1
,
2
,
3
,
4
}
for
_
,
v
:=
range
add
(
multiply
(
ints
,
2
),
1
)
{
fmt
.
Println
(
v
)
}
Dieser Code ergibt:
3 5 7 9
Schau dir an, wie wir add
und multiply
in der Klausel range
kombinieren. Das sind Funktionen wie die, mit denen du jeden Tag arbeitest, aber weil wir sie so konstruiert haben, dass sie die Eigenschaften einer Pipelinestufe haben, können wir sie zu einer Pipeline kombinieren. Das ist interessant: Was sind die Eigenschaften einer Pipelinestufe?
-
Eine Stufe verbraucht und liefert den gleichen Typ.
-
Eine Bühne muss verdinglicht werden2 durch die Sprache verifiziert werden, damit sie weitergegeben werden kann. Funktionen in Go sind verdinglicht und eignen sich gut für diesen Zweck.
Diejenigen unter euch, die mit funktionaler Programmierung vertraut sind, werden vielleicht mit dem Kopf nicken und an Begriffe wie Funktionen höherer Ordnung und Monaden denken. In der Tat sind Pipeline-Stufen sehr eng mit der funktionalen Programmierung verwandt und können als eine Untermenge der Monaden betrachtet werden. Ich werde hier nicht explizit auf Monaden oder funktionale Programmierung eingehen, aber beides sind interessante Themen für sich und Kenntnisse über beide Themen sind nützlich, wenn auch unnötig, wenn es darum geht, Pipelines zu verstehen.
Hier erfüllen unsere Stufen add
und multiply
alle Eigenschaften einer Pipeline-Stufe: Sie verbrauchen beide ein Slice von int
und geben ein Slice von int
zurück, und da Go verifizierte Funktionen hat, können wir add
und multiple
weitergeben. Diese Eigenschaften führen zu den interessanten Eigenschaften von Pipeline-Stufen, die wir bereits erwähnt haben: Es ist nämlich sehr einfach, unsere Stufen auf einer höheren Ebene zu kombinieren, ohne die Stufen selbst zu verändern.
Wenn wir zum Beispiel eine zusätzliche Stufe zu unserer Pipeline hinzufügen möchten, um mit zwei zu multiplizieren, würden wir unsere bisherige Pipeline einfach in eine neue multiply
Stufe einpacken, etwa so:
ints
:=
[]
int
{
1
,
2
,
3
,
4
}
for
_
,
v
:=
range
multiply
(
add
(
multiply
(
ints
,
2
),
1
),
2
)
{
fmt
.
Println
(
v
)
}
Die Ausführung dieses Codes ergibt:
6 10 14 18
Beachte, dass wir das geschafft haben, ohne eine neue Funktion zu schreiben, eine der bestehenden Funktionen zu ändern oder zu modifizieren, was wir mit dem Ergebnis unserer Pipeline machen. Vielleicht erkennst du jetzt die Vorteile des Pipeline-Musters. Natürlich könnten wir diesen Code auch prozedural schreiben:
ints
:=
[]
int
{
1
,
2
,
3
,
4
}
for
_
,
v
:=
range
ints
{
fmt
.
Println
(
2
*
(
v
*
2
+
1
))
}
Auf den ersten Blick sieht das viel einfacher aus, aber wie du im weiteren Verlauf sehen wirst, bietet der prozedurale Code nicht dieselben Vorteile, die eine Pipeline beim Umgang mit Datenströmen bietet.
Hast du bemerkt, dass jeder Schritt einen Teil der Daten aufnimmt und einen Teil der Daten zurückgibt? Diese Stufen führen etwas durch, das wir Stapelverarbeitung nennen. Das bedeutet, dass sie Datenpakete auf einmal verarbeiten und nicht einen einzelnen Wert nach dem anderen. Es gibt noch eine andere Art von Pipeline-Stufe, die eine Stream-Verarbeitung durchführt. Das bedeutet, dass die Stufe ein Element nach dem anderen empfängt und ausgibt.
Es gibt Vor- und Nachteile der Stapelverarbeitung gegenüber der Stream-Verarbeitung, die wir gleich noch besprechen werden. Damit die ursprünglichen Daten unverändert bleiben, muss jede Stufe ein neues Slice gleicher Länge erstellen, um die Ergebnisse ihrer Berechnungen zu speichern. Das bedeutet, dass der Speicherbedarf unseres Programms zu jedem Zeitpunkt doppelt so groß ist wie das Slice, das wir an den Anfang unserer Pipeline schicken. Konvertieren wir unsere Stages in einen Stream und sehen wir uns an, wie das aussieht:
multiply
:=
func
(
value
,
multiplier
int
)
int
{
return
value
*
multiplier
}
add
:=
func
(
value
,
additive
int
)
int
{
return
value
+
additive
}
ints
:=
[]
int
{
1
,
2
,
3
,
4
}
for
_
,
v
:=
range
ints
{
fmt
.
Println
(
multiply
(
add
(
multiply
(
v
,
2
),
1
),
2
))
}
Dieser Code ergibt:
6 10 14 18
Jede Stufe empfängt und sendet einen diskreten Wert, und der Speicherbedarf unseres Programms ist nur noch so groß wie die Eingabe der Pipeline. Aber wir mussten die Pipeline in den Hauptteil der for
Schleife ziehen und die range
die schwere Arbeit des Fütterns unserer Pipeline machen lassen. Das schränkt nicht nur die Wiederverwendung der Pipeline ein, sondern auch die Skalierbarkeit, wie wir später in diesem Abschnitt sehen werden. Wir haben auch noch andere Probleme. Wir instanziieren unsere Pipeline für jede Iteration der Schleife. Obwohl es billig ist, Funktionsaufrufe zu machen, machen wir drei Funktionsaufrufe für jede Iteration der Schleife. Und was ist mit der Gleichzeitigkeit? Ich habe vorhin gesagt, dass einer der Vorteile von Pipelines die Möglichkeit ist, einzelne Phasen gleichzeitig zu verarbeiten, und ich habe etwas über Fan-out gesagt. Wo kommt das alles ins Spiel?
Ich könnte unsere Funktionen multiply
und add
wahrscheinlich noch ein wenig erweitern, um diese Konzepte vorzustellen, aber sie haben ihre Aufgabe erfüllt, das Konzept einer Pipeline einzuführen. Jetzt ist es an der Zeit, die bewährten Methoden für den Aufbau von Pipelines in Go kennenzulernen, und das beginnt mit dem Kanalprimitiv von Go.
Bewährte Methoden für den Bau von Pipelines
Kanäle eignen sich hervorragend für den Aufbau von Pipelines in Go, weil sie alle unsere grundlegenden Anforderungen erfüllen. Sie können Werte empfangen und ausgeben, sie können sicher gleichzeitig verwendet werden, sie können in einem Bereich angeordnet werden und sie sind in der Sprache verankert. Nehmen wir uns einen Moment Zeit und konvertieren das vorherige Beispiel, um stattdessen Kanäle zu verwenden:
generator
:=
func
(
done
<-
chan
interface
{},
integers
...
int
)
<-
chan
int
{
intStream
:=
make
(
chan
int
)
go
func
()
{
defer
close
(
intStream
)
for
_
,
i
:=
range
integers
{
select
{
case
<-
done
:
return
case
intStream
<-
i
:
}
}
}()
return
intStream
}
multiply
:=
func
(
done
<-
chan
interface
{},
intStream
<-
chan
int
,
multiplier
int
,
)
<-
chan
int
{
multipliedStream
:=
make
(
chan
int
)
go
func
()
{
defer
close
(
multipliedStream
)
for
i
:=
range
intStream
{
select
{
case
<-
done
:
return
case
multipliedStream
<-
i
*
multiplier
:
}
}
}()
return
multipliedStream
}
add
:=
func
(
done
<-
chan
interface
{},
intStream
<-
chan
int
,
additive
int
,
)
<-
chan
int
{
addedStream
:=
make
(
chan
int
)
go
func
()
{
defer
close
(
addedStream
)
for
i
:=
range
intStream
{
select
{
case
<-
done
:
return
case
addedStream
<-
i
+
additive
:
}
}
}()
return
addedStream
}
done
:=
make
(
chan
interface
{})
defer
close
(
done
)
intStream
:=
generator
(
done
,
1
,
2
,
3
,
4
)
pipeline
:=
multiply
(
done
,
add
(
done
,
multiply
(
done
,
intStream
,
2
),
1
),
2
)
for
v
:=
range
pipeline
{
fmt
.
Println
(
v
)
}
Dieser Code ergibt:
6 10 14 18
Es sieht so aus, als ob wir das gewünschte Ergebnis erreicht haben, aber um den Preis, dass wir viel mehr Code haben. Was genau haben wir gewonnen? Schauen wir uns zunächst an, was wir geschrieben haben. Wir haben jetzt drei Funktionen statt zwei. Sie sehen alle so aus, als würden sie eine Goroutine in ihrem Körper starten und das Muster verwenden, das wir in "Goroutine-Lecks verhindern" festgelegt haben: Sie nehmen einen Kanal auf, um zu signalisieren, dass die Goroutine beendet werden soll. Sie sehen alle so aus, als würden sie Kanäle zurückgeben, und einige von ihnen sehen so aus, als würden sie auch einen zusätzlichen Kanal aufnehmen. Interessant! Fangen wir an, das weiter aufzuschlüsseln:
done
:=
make
(
chan
interface
{})
defer
close
(
done
)
Das erste, was unser Programm macht, ist, einen done
Kanal zu erstellen und close
in einer defer
Anweisung aufzurufen. Wie bereits erwähnt, stellt dies sicher, dass unser Programm sauber beendet wird und keine Goroutine ausläuft. Das ist nichts Neues. Als Nächstes sehen wir uns die Funktion an, generator
:
generator
:=
func
(
done
<-
chan
interface
{},
integers
...
int
)
<-
chan
int
{
intStream
:=
make
(
chan
int
)
go
func
()
{
defer
close
(
intStream
)
for
_
,
i
:=
range
integers
{
select
{
case
<-
done
:
return
case
intStream
<-
i
:
}
}
}()
return
intStream
}
// ...
intStream
:=
generator
(
done
,
1
,
2
,
3
,
4
)
Die Funktion generator
nimmt einen variadischen Slice von Ganzzahlen entgegen, erstellt einen gepufferten Kanal von Ganzzahlen mit einer Länge, die dem eingehenden Integer-Slice entspricht, startet eine Goroutine und gibt den erstellten Kanal zurück. Dann durchläuft generator
in der erstellten Goroutine die variadische Scheibe, die übergeben wurde, und sendet die Werte der Scheiben an den erstellten Kanal.
Beachte, dass das Senden auf dem Kanal eine select
Anweisung mit einer Auswahl auf dem done
Kanal teilt. Auch dies ist das Muster, das wir in "Goroutine-Lecks verhindern" eingeführt haben, um undichte Goroutinen zu verhindern.
Kurz gesagt: Die Funktion generator
wandelt eine diskrete Menge von Werten in einen Datenstrom auf einem Kanal um. Passenderweise nennt man diese Art von Funktion einen Generator. Du wirst diese Funktion häufig bei der Arbeit mit Pipelines sehen, denn am Anfang der Pipeline gibt es immer eine Reihe von Daten, die du in einen Kanal umwandeln musst. Wir werden gleich ein paar Beispiele für lustige Generatoren vorstellen, aber lass uns zuerst unsere Analyse dieses Programms beenden. Als Nächstes bauen wir unsere Pipeline auf:
pipeline
:=
multiply
(
done
,
add
(
done
,
multiply
(
done
,
intStream
,
2
),
1
),
2
)
Es ist dieselbe Pipeline, mit der wir die ganze Zeit gearbeitet haben: Für einen Zahlenstrom multiplizieren wir sie mit zwei, addieren eine und multiplizieren dann das Ergebnis mit zwei. Diese Pipeline ähnelt unserer Pipeline mit den Funktionen aus dem vorherigen Beispiel, aber sie unterscheidet sich in wichtigen Punkten.
Erstens: Wir verwenden Kanäle. Das ist offensichtlich, aber wichtig, weil es zwei Dinge ermöglicht: Am Ende unserer Pipeline können wir eine Bereichsanweisung verwenden, um die Werte zu extrahieren, und in jeder Phase können wir sicher nebeneinander ausgeführt werden, weil unsere Eingaben und Ausgaben sicher in gleichzeitigen Kontexten sind.
Das bringt uns zu unserem zweiten Unterschied: Jede Stufe der Pipeline wird gleichzeitig ausgeführt. Das bedeutet, dass jede Stufe nur auf ihre Eingaben warten muss und ihre Ausgaben senden kann. Das hat weitreichende Folgen, wie wir im Abschnitt "Fan-Out, Fan-In" feststellen werden , aber für den Moment können wir einfach festhalten, dass unsere Stufen dadurch für eine gewisse Zeitspanne unabhängig voneinander ausgeführt werden können.
In unserem Beispiel gehen wir über diese Pipeline und die Werte werden durch das System gezogen:
for
v
:=
range
pipeline
{
fmt
.
Println
(
v
)
}
Die folgende Tabelle zeigt, wie die einzelnen Werte im System in die einzelnen Kanäle gelangen und wann die Kanäle geschlossen werden. Die Iteration ist der Basisnullpunkt der Iteration der for
Schleife, in der wir uns befinden, und der Wert für jede Spalte ist der Wert, der in die Pipeline-Stufe gelangt:
Iteration | Generator | Multiplizieren | hinzufügen | Multiplizieren | Wert |
---|---|---|---|---|---|
0 |
1 |
||||
0 |
1 |
||||
0 |
2 |
2 |
|||
0 |
2 |
3 |
|||
0 |
3 |
4 |
6 |
||
1 |
3 |
5 |
|||
1 |
4 |
6 |
10 |
||
2 |
(geschlossen) |
4 |
7 |
||
2 |
(geschlossen) |
8 |
14 |
||
3 |
(geschlossen) |
9 |
|||
3 |
(geschlossen) |
18 |
Schauen wir uns auch genauer an, wie wir das Muster verwenden, um Goroutinen das Beenden zu signalisieren. Wenn wir es mit mehreren voneinander abhängigen Goroutinen zu tun haben, wie funktioniert dann dieses Muster? Was würde passieren, wenn wir close
auf dem Kanal done
aufrufen würden, bevor das Programm fertig ausgeführt wurde?
Um diese Fragen zu beantworten, lass uns noch einmal einen Blick auf unsere Pipelinekonstruktion werfen:
pipeline
:=
multiply
(
done
,
add
(
done
,
multiply
(
done
,
intStream
,
2
),
1
),
2
)
Die Stufen sind auf zwei Arten miteinander verbunden: durch den gemeinsamen Kanal done
und durch die Kanäle, die an die nachfolgenden Stufen der Pipeline weitergegeben werden. Mit anderen Worten: Der Kanal, der von der Funktion multiply
erzeugt wird, wird an die Funktion add
weitergegeben und so weiter. Schauen wir uns die vorangegangene Tabelle noch einmal an. Bevor wir sie abschließen, rufen wir close
auf dem Kanal done
auf und sehen, was passiert:
Iteration | Generator | Multiplizieren | hinzufügen | Multiplizieren | Wert |
---|---|---|---|---|---|
0 |
1 |
||||
0 |
1 |
||||
0 |
2 |
2 |
|||
0 |
2 |
3 |
|||
1 |
3 |
4 |
6 |
||
close(done) |
(geschlossen) |
3 |
5 |
||
(geschlossen) |
6 |
||||
(geschlossen) |
7 |
||||
(geschlossen) |
|||||
(Ausgangsbereich) |
Siehst du, wie sich das Schließen des done
Kanals kaskadenartig durch die Pipeline zieht? Dies wird durch zwei Dinge in jeder Phase der Pipeline ermöglicht:
-
Ranging über den Eingangskanal. Wenn der Eingangskanal geschlossen wird, wird die Reichweite beendet.
-
Das Senden teilt eine
select
Anweisung mit demdone
Kanal.
Unabhängig davon, in welchem Zustand sich die Pipeline-Stufe befindet - das Warten auf den eingehenden Kanal oder das Warten auf das Senden - wird das Schließen des done
Kanals die Pipeline-Stufe zur Beendigung zwingen.
Hier ist eine Rekursionsbeziehung im Spiel. Zu Beginn der Pipeline haben wir festgestellt, dass wir diskrete Werte in einen Kanal umwandeln müssen. In diesem Prozess gibt es zwei Punkte, die vorweggenommen werden können:
-
Die Erstellung des diskreten Wertes erfolgt nicht sofort.
-
Senden des diskreten Wertes auf seinem Kanal.
Die erste ist dir überlassen. In unserem Beispiel werden die diskreten Werte in der Funktion generator
durch das Durchlaufen des variadischen Slice erzeugt, was so schnell ist, dass es nicht unterbrechbar sein muss. Die zweite wird über unsere select
Anweisung und den done
Kanal gehandhabt, der sicherstellt, dass generator
auch dann preemptable ist, wenn er beim Versuch, auf intStream
zu schreiben, blockiert wird.
Am anderen Ende der Pipeline wird die letzte Stufe durch Induktion als preemptable eingestuft. Sie ist preemptable, weil der Kanal, über den wir uns bewegen, geschlossen wird, wenn er preempted ist, und deshalb wird unser Bereich unterbrochen, wenn das passiert. Die letzte Phase ist preemptable, weil der Stream, auf den wir uns verlassen, preemptable ist.
Zwischen dem Beginn der Pipeline und dem Ende der Pipeline wird der Code immer über einen Kanal und auf einem anderen Kanal innerhalb einer select
Anweisung mit einem done
Kanal gesendet.
Wenn eine Stufe beim Abrufen eines Wertes aus dem Eingangskanal blockiert ist, wird sie entsperrt, wenn der Kanal geschlossen wird. Wir wissen durch Induktion, dass der Kanal geschlossen wird, weil es sich entweder um eine Stufe handelt, die wie die Stufe, in der wir uns befinden, geschrieben wurde, oder weil der Anfang der Pipeline, die wir eingerichtet haben, preemptable ist. Wenn eine Stufe beim Senden eines Wertes blockiert ist, ist sie dank der Anweisung select
preemptable.
Unsere gesamte Pipeline kann also immer durch das Schließen des done
Kanals vorzeitig beendet werden. Cool, oder?
Einige praktische Generatoren
Ich habe vorhin versprochen, dass ich über einige lustige Generatoren sprechen werde, die sehr nützlich sein können. Zur Erinnerung: Ein Generator für eine Pipeline ist eine Funktion, die eine Reihe von diskreten Werten in einen Strom von Werten auf einem Kanal umwandelt. Schauen wir uns einen Generator namens repeat
an:
repeat
:=
func
(
done
<-
chan
interface
{},
values
...
interface
{},
)
<-
chan
interface
{}
{
valueStream
:=
make
(
chan
interface
{})
go
func
()
{
defer
close
(
valueStream
)
for
{
for
_
,
v
:=
range
values
{
select
{
case
<-
done
:
return
case
valueStream
<-
v
:
}
}
}
}()
return
valueStream
}
Diese Funktion wiederholt die Werte, die du ihr übergibst, unendlich oft, bis du ihr sagst, dass sie aufhören soll. Werfen wir einen Blick auf eine weitere generische Pipeline-Stufe, die in Kombination mit repeat
hilfreich ist, take
:
take
:=
func
(
done
<-
chan
interface
{},
valueStream
<-
chan
interface
{},
num
int
,
)
<-
chan
interface
{}
{
takeStream
:=
make
(
chan
interface
{})
go
func
()
{
defer
close
(
takeStream
)
for
i
:=
0
;
i
<
num
;
i
++
{
select
{
case
<-
done
:
return
case
takeStream
<-
<-
valueStream
:
}
}
}()
return
takeStream
}
Diese Pipeline-Phase nimmt nur die ersten num
Elemente aus ihrem Eingang valueStream
und verlässt sie dann. Zusammen können die beiden sehr leistungsfähig sein:
done
:=
make
(
chan
interface
{})
defer
close
(
done
)
for
num
:=
range
take
(
done
,
repeat
(
done
,
1
),
10
)
{
fmt
.
Printf
(
"%v "
,
num
)
}
Die Ausführung dieses Codes ergibt:
1 1 1 1 1 1 1 1 1 1
In diesem einfachen Beispiel erstellen wir einen repeat
Generator, der eine unendliche Anzahl von Einsen erzeugt, aber nur die ersten 10 nimmt. Da das Senden des repeat
Generators das Empfangen der take
Stufe blockiert, ist der repeat
Generator sehr effizient. Obwohl wir die Möglichkeit haben, einen unendlichen Strom von Einsen zu erzeugen, erzeugen wir nur N+1
Instanzen, bei denen N
die Zahl ist, die wir an die take
Stufe übergeben.
Wir können das weiter ausbauen. Erstellen wir einen weiteren sich wiederholenden Generator, aber dieses Mal einen, der wiederholt eine Funktion aufruft. Nennen wir ihn repeatFn
:
repeatFn
:=
func
(
done
<-
chan
interface
{},
fn
func
()
interface
{},
)
<-
chan
interface
{}
{
valueStream
:=
make
(
chan
interface
{})
go
func
()
{
defer
close
(
valueStream
)
for
{
select
{
case
<-
done
:
return
case
valueStream
<-
fn
():
}
}
}()
return
valueStream
}
Benutzen wir sie, um 10 Zufallszahlen zu generieren:
done
:=
make
(
chan
interface
{})
defer
close
(
done
)
rand
:=
func
()
interface
{}
{
return
rand
.
Int
()}
for
num
:=
range
take
(
done
,
repeatFn
(
done
,
rand
),
10
)
{
fmt
.
Println
(
num
)
}
Das ergibt:
5577006791947779410 8674665223082153551 6129484611666145821 4037200794235010051 3916589616287113937 6334824724549167320 605394647632969758 1443635317331776148 894385949183117216 2775422040480279449
Das ist ziemlich cool - ein unendlicher Kanal mit zufälligen ganzen Zahlen, die nach Bedarf generiert werden!
Du fragst dich vielleicht, warum all diese Generatoren und Stufen auf den Kanälen von interface{}
empfangen und senden. Wir hätten diese Funktionen genauso gut für einen bestimmten Typ schreiben können oder einen Go-Generator.
Leere Schnittstellen sind in Go ein bisschen tabu, aber für Pipeline-Stufen ist es meiner Meinung nach in Ordnung, in Kanälen von interface{}
zu arbeiten, damit du eine Standardbibliothek von Pipeline-Mustern verwenden kannst. Wie wir bereits besprochen haben, kommt ein großer Teil des Nutzens einer Pipeline von wiederverwendbaren Stufen. Dies ist am besten möglich, wenn die Stufen auf der für sie geeigneten Ebene arbeiten. Bei den Generatoren repeat
und repeatFn
geht es darum, einen Datenstrom zu erzeugen, indem man eine Liste oder einen Operator in einer Schleife durchläuft. Bei der Stufe take
geht es darum, unsere Pipeline zu begrenzen. Keine dieser Operationen erfordert Informationen über die Typen, mit denen sie arbeiten, sondern nur die Kenntnis der Arität ihrer Parameter.
Wenn du mit bestimmten Typen arbeiten musst, kannst du eine Stufe einfügen, die die Typüberprüfung für dich durchführt. Der Leistungsaufwand für eine zusätzliche Pipeline-Stufe (und damit Goroutine) und die Typüberprüfung ist vernachlässigbar, wie wir gleich sehen werden. Hier ist ein kleines Beispiel, das eine toString
Pipeline-Stufe einführt:
toString
:=
func
(
done
<-
chan
interface
{},
valueStream
<-
chan
interface
{},
)
<-
chan
string
{
stringStream
:=
make
(
chan
string
)
go
func
()
{
defer
close
(
stringStream
)
for
v
:=
range
valueStream
{
select
{
case
<-
done
:
return
case
stringStream
<-
v
.(
string
):
}
}
}()
return
stringStream
}
Und ein Beispiel, wie du es verwenden kannst:
done
:=
make
(
chan
interface
{})
defer
close
(
done
)
var
message
string
for
token
:=
range
toString
(
done
,
take
(
done
,
repeat
(
done
,
"I"
,
"am."
),
5
))
{
message
+=
token
}
fmt
.
Printf
(
"message: %s..."
,
message
)
Dieser Code ergibt:
message: Iam.Iam.I...
Wir wollen uns also selbst beweisen, dass der Leistungsverlust durch die Generierung von Teilen unserer Pipeline vernachlässigbar ist. Wir schreiben zwei Benchmarking-Funktionen: eine, um die generischen Phasen zu testen, und eine, um die typspezifischen Phasen zu testen:
func
BenchmarkGeneric
(
b
*
testing
.
B
)
{
done
:=
make
(
chan
interface
{})
defer
close
(
done
)
b
.
ResetTimer
()
for
range
toString
(
done
,
take
(
done
,
repeat
(
done
,
"a"
),
b
.
N
))
{
}
}
func
BenchmarkTyped
(
b
*
testing
.
B
)
{
repeat
:=
func
(
done
<-
chan
interface
{},
values
...
string
)
<-
chan
string
{
valueStream
:=
make
(
chan
string
)
go
func
()
{
defer
close
(
valueStream
)
for
{
for
_
,
v
:=
range
values
{
select
{
case
<-
done
:
return
case
valueStream
<-
v
:
}
}
}
}()
return
valueStream
}
take
:=
func
(
done
<-
chan
interface
{},
valueStream
<-
chan
string
,
num
int
,
)
<-
chan
string
{
takeStream
:=
make
(
chan
string
)
go
func
()
{
defer
close
(
takeStream
)
for
i
:=
num
;
i
>
0
||
i
==
-
1
;
{
if
i
!=
-
1
{
i
--
}
select
{
case
<-
done
:
return
case
takeStream
<-
<-
valueStream
:
}
}
}()
return
takeStream
}
done
:=
make
(
chan
interface
{})
defer
close
(
done
)
b
.
ResetTimer
()
for
range
take
(
done
,
repeat
(
done
,
"a"
),
b
.
N
)
{
}
}
Die Ergebnisse der Ausführung dieses Codes sind:
BenchmarkGeneric-4 |
1000000 |
2266 |
ns/op |
BenchmarkTyped-4 |
1000000 |
1181 |
ns/op |
PASS |
|||
ok |
Befehlszeilen-Argumente |
3.486s |
Du kannst sehen, dass die typspezifischen Stufen doppelt so schnell sind, aber nur geringfügig schneller. In der Regel ist der begrenzende Faktor in deiner Pipeline entweder dein Generator oder eine der rechenintensiven Phasen. Wenn der Generator keinen Stream aus dem Speicher erzeugt, wie es bei den Generatoren repeat
und repeatFn
der Fall ist, bist du wahrscheinlich an die E/A-Grenze gestoßen. Das Lesen von der Festplatte oder aus dem Netzwerk wird den hier gezeigten geringen Leistungszuwachs wahrscheinlich in den Schatten stellen.
Wenn eine deiner Stufen sehr rechenintensiv ist, wird dies den Performance-Overhead sicherlich in den Schatten stellen. Wenn dir diese Technik immer noch nicht zusagt, kannst du auch einen Go-Generator für die Erstellung deiner Generatorstufen schreiben. Apropos rechenintensiv: Wie können wir das abmildern? Wird dadurch nicht die gesamte Pipeline verlangsamt?
Wie du das verhindern kannst, erfährst du in der Fan-out, Fan-in Technik.
Fan-Out, Fan-In
Du hast also eine Pipeline eingerichtet. Die Daten fließen wunderbar durch dein System und wandeln sich auf ihrem Weg durch die miteinander verknüpften Phasen. Es ist wie ein schöner Strom, ein schöner, langsamer Strom, und oh mein Gott, warum dauert das so lange?
Manchmal sind die Schritte in deiner Pipeline besonders rechenintensiv. In diesem Fall können vorgelagerte Phasen in deiner Pipeline blockiert werden, während du auf die Fertigstellung deiner teuren Phasen wartest. Und nicht nur das, auch die Pipeline selbst kann sehr lange brauchen, um als Ganzes ausgeführt zu werden. Wie können wir das Problem lösen?
Eine der interessanten Eigenschaften von Pipelines ist die Möglichkeit, den Datenstrom mit einer Kombination aus separaten, oft neu anordbaren Stufen zu bearbeiten. Du kannst die Stufen der Pipeline sogar mehrfach verwenden. Wäre es nicht interessant, eine einzelne Stufe unserer Pipeline in mehreren Goroutinen wiederzuverwenden, um die Abrufe aus einer vorgelagerten Stufe zu parallelisieren? Vielleicht würde das die Leistung der Pipeline verbessern.
Tatsächlich ist das möglich und dieses Muster hat einen Namen: fan-out, fan-in.
Fan-out ist ein Begriff, der den Prozess beschreibt, bei dem mehrere Goroutinen gestartet werden, um Eingaben aus der Pipeline zu verarbeiten, und Fan-in ist ein Begriff, der den Prozess beschreibt, bei dem mehrere Ergebnisse in einem Kanal kombiniert werden.
Was macht also eine Stufe einer Pipeline geeignet, um dieses Muster zu nutzen? Du könntest in Erwägung ziehen, eine deiner Phasen aufzufächern, wenn beide der folgenden Punkte zutreffen:
-
Sie verlässt sich nicht auf Werte, die die Bühne vorher berechnet hat.
-
Es dauert sehr lange, bis es läuft.
Die Eigenschaft der Ordnungsunabhängigkeit ist wichtig, weil du keine Garantie dafür hast, in welcher Reihenfolge die gleichzeitigen Kopien deiner Stufe ausgeführt werden und in welcher Reihenfolge sie zurückkehren werden.
Werfen wir einen Blick auf ein Beispiel. Im folgenden Beispiel habe ich eine sehr ineffiziente Methode entwickelt, um Primzahlen zu finden. Wir werden viele der Schritte verwenden, die wir in "Pipelines" erstellt haben :
rand
:=
func
()
interface
{}
{
return
rand
.
Intn
(
50000000
)
}
done
:=
make
(
chan
interface
{})
defer
close
(
done
)
start
:=
time
.
Now
()
randIntStream
:=
toInt
(
done
,
repeatFn
(
done
,
rand
))
fmt
.
Println
(
"Primes:"
)
for
prime
:=
range
take
(
done
,
primeFinder
(
done
,
randIntStream
),
10
)
{
fmt
.
Printf
(
"\t%d\n"
,
prime
)
}
fmt
.
Printf
(
"Search took: %v"
,
time
.
Since
(
start
))
Hier sind die Ergebnisse der Ausführung dieses Codes:
Primes: 24941317 36122539 6410693 10128161 25511527 2107939 14004383 7190363 45931967 2393161 Search took: 23.437511647s
Wir generieren einen Strom von Zufallszahlen, der auf 50.000.000 begrenzt ist, wandeln ihn in einen Integer-Stream um und geben ihn dann an unsere primeFinder
weiter. primeFinder
versucht naiv, die im Eingabestrom angegebene Zahl durch jede Zahl darunter zu teilen. Wenn das nicht gelingt, wird der Wert an die nächste Stufe weitergegeben. Das ist natürlich eine schreckliche Methode, um Primzahlen zu finden, aber sie erfüllt unsere Anforderung, dass es sehr lange dauert.
In unserer for
Schleife durchsuchen wir die gefundenen Primzahlen, geben sie nach und nach aus und - dank unserer take
Stufe - schließen wir die Pipeline, nachdem 10 Primzahlen gefunden wurden. Dann geben wir aus, wie lange die Suche gedauert hat, und der done
Kanal wird durch eine defer
Anweisung geschlossen und die Pipeline abgebaut.
Um Duplikate in unseren Ergebnissen zu vermeiden, könnten wir eine weitere Stufe in unserer Pipeline einführen, um die Primzahlen, die in einer Menge gefunden wurden, zwischenzuspeichern, aber der Einfachheit halber ignorieren wir diese einfach.
Du kannst sehen, dass es ungefähr 23 Sekunden gedauert hat, um 10 Primzahlen zu finden. Nicht gut. Normalerweise würden wir uns zuerst den Algorithmus selbst ansehen, vielleicht ein Algorithmus-Kochbuch zur Hand nehmen und schauen, ob wir die einzelnen Schritte verbessern können. Aber da der Zweck der Stufe hier ist, langsam zu sein, schauen wir uns stattdessen an, wie wir eine oder mehrere Stufen auffächern können, um langsame Operationen schneller zu erledigen.
Da es sich um ein relativ einfaches Beispiel handelt, haben wir nur zwei Stufen: die Zufallszahlengenerierung und die Primzahlensiebung. In einem größeren Programm könnte deine Pipeline aus viel mehr Phasen bestehen; woher wissen wir dann, welche wir auffächern sollen? Erinnere dich an unsere Kriterien von vorhin: Ordnungsunabhängigkeit und Dauer. Unser Zufallszahlengenerator ist zwar unabhängig von der Reihenfolge, aber er braucht nicht besonders lange, um zu laufen. Die primeFinder
Stufe ist ebenfalls unabhängig von der Reihenfolge - die Zahlen sind entweder Primzahlen oder nicht - und aufgrund unseres naiven Algorithmus dauert die Ausführung sehr lange. Das sieht nach einem guten Kandidaten für eine Auffächerung aus.
Zum Glück ist das Auffächern einer Stufe in einer Pipeline außerordentlich einfach. Alles, was wir tun müssen, ist, mehrere Versionen dieser Stufe zu starten. Also stattdessen:
primeStream
:=
primeFinder
(
done
,
randIntStream
)
Wir können das so machen:
numFinders
:=
runtime
.
NumCPU
()
finders
:=
make
([]
<-
chan
int
,
numFinders
)
for
i
:=
0
;
i
<
numFinders
;
i
++
{
finders
[
i
]
=
primeFinder
(
done
,
randIntStream
)
}
Hier starten wir so viele Kopien dieser Stufe, wie wir CPUs haben. Auf meinem Computer liefert runtime.NumCPU()
acht, also werde ich diese Zahl in unserer Diskussion weiter verwenden. In der Produktion würden wir wahrscheinlich ein paar empirische Tests durchführen, um die optimale Anzahl von CPUs zu ermitteln, aber hier bleiben wir einfach und gehen davon aus, dass eine CPU nur durch eine Kopie der Stufe findPrimes
beschäftigt wird.
Und das war's! Wir haben jetzt acht Goroutinen, die vom Zufallszahlengenerator abrufen und versuchen zu bestimmen, ob die Zahl eine Primzahl ist. Das Generieren von Zufallszahlen sollte nicht viel Zeit in Anspruch nehmen, so dass jede Goroutine für die Phase findPrimes
in der Lage sein sollte, festzustellen, ob ihre Zahl eine Primzahl ist, und dann sofort eine andere Zufallszahl zur Verfügung zu haben.
Wir haben aber immer noch ein Problem: Da wir jetzt vier Goroutinen haben, haben wir auch vier Kanäle, aber unser Bereich über Primzahlen erwartet nur einen Kanal. Das bringt uns zum Fan-in-Teil des Musters.
Wie wir bereits besprochen haben, bedeutet "fanning in", dass mehrere Datenströme zu einem einzigen zusammengeführt werden. Der Algorithmus dafür ist relativ einfach:
fanIn
:=
func
(
done
<-
chan
interface
{
}
,
channels
...
<-
chan
interface
{
}
,
)
<-
chan
interface
{
}
{
var
wg
sync
.
WaitGroup
multiplexedStream
:=
make
(
chan
interface
{
}
)
multiplex
:=
func
(
c
<-
chan
interface
{
}
)
{
defer
wg
.
Done
(
)
for
i
:=
range
c
{
select
{
case
<-
done
:
return
case
multiplexedStream
<-
i
:
}
}
}
// Select from all the channels
wg
.
Add
(
len
(
channels
)
)
for
_
,
c
:=
range
channels
{
go
multiplex
(
c
)
}
// Wait for all the reads to complete
go
func
(
)
{
wg
.
Wait
(
)
close
(
multiplexedStream
)
}
(
)
return
multiplexedStream
}
Hier nehmen wir unseren Standard
done
Kanal auf, damit unsere Goroutinen abgerissen werden können, und dann einen variablen Teil derinterface{}
Kanäle, um sie aufzufächern.Auf dieser Linie erstellen wir eine
sync.WaitGroup
, damit wir warten können, bis alle Kanäle geleert wurden.Hier erstellen wir eine Funktion,
multiplex
, die, wenn sie einen Kanal übergeben bekommt, aus dem Kanal liest und den gelesenen Wert an den KanalmultiplexedStream
weitergibt.Diese Zeile erhöht die
sync.WaitGroup
um die Anzahl der Kanäle, die wir multiplexen.Hier erstellen wir eine Goroutine, die darauf wartet, dass alle Kanäle, die wir multiplexen, entleert werden, damit wir den Kanal
multiplexedStream
schließen können.
Kurz gesagt: Beim Fanning wird der Multiplex-Kanal erstellt, aus dem die Verbraucher lesen werden, und dann wird eine Goroutine für jeden eingehenden Kanal und eine Goroutine zum Schließen des Multiplex-Kanals erstellt, wenn alle eingehenden Kanäle geschlossen worden sind. Da wir eine Goroutine erstellen werden, die darauf wartet, dass N
andere Goroutinen beendet werden, ist es sinnvoll, eine sync.WaitGroup
zu erstellen, um die Dinge zu koordinieren. Die Funktion multiplex
benachrichtigt auch die Funktion WaitGroup
, dass sie fertig ist.
Nehmen wir all das zusammen und schauen wir, ob sich die Laufzeit verkürzt:
done
:=
make
(
chan
interface
{})
defer
close
(
done
)
start
:=
time
.
Now
()
rand
:=
func
()
interface
{}
{
return
rand
.
Intn
(
50000000
)
}
randIntStream
:=
toInt
(
done
,
repeatFn
(
done
,
rand
))
numFinders
:=
runtime
.
NumCPU
()
fmt
.
Printf
(
"Spinning up %d prime finders.\n"
,
numFinders
)
finders
:=
make
([]
<-
chan
interface
{},
numFinders
)
fmt
.
Println
(
"Primes:"
)
for
i
:=
0
;
i
<
numFinders
;
i
++
{
finders
[
i
]
=
primeFinder
(
done
,
randIntStream
)
}
for
prime
:=
range
take
(
done
,
fanIn
(
done
,
finders
...
),
10
)
{
fmt
.
Printf
(
"\t%d\n"
,
prime
)
}
fmt
.
Printf
(
"Search took: %v"
,
time
.
Since
(
start
))
Hier sind die Ergebnisse:
Spinning up 8 prime finders. Primes: 6410693 24941317 10128161 36122539 25511527 2107939 14004383 7190363 2393161 45931967 Search took: 5.438491216s
Also runter von ~23 Sekunden auf ~5 Sekunden, nicht schlecht! Das zeigt deutlich den Vorteil des Fan-out- und Fan-in-Musters und unterstreicht den Nutzen von Pipelines. Wir haben unsere Ausführungszeit um ~78% reduziert, ohne die Struktur unseres Programms drastisch zu verändern.
Der oder-erledigt-Kanal
Manchmal wirst du mit Kanälen aus verschiedenen Teilen deines Systems arbeiten. Anders als bei Pipelines kannst du keine Aussagen darüber machen, wie sich ein Kanal verhält, wenn der Code, mit dem du arbeitest, über seinen done
Kanal abgebrochen wird. Das heißt, du weißt nicht, ob die Tatsache, dass deine Goroutine abgebrochen wurde, auch bedeutet, dass der Kanal, aus dem du liest, ebenfalls abgebrochen wurde. Aus diesem Grund müssen wir, wie in "Goroutine-Lecks verhindern" beschrieben , das Lesen aus dem Kanal mit einer select
Anweisung einschließen, die auch aus einem done
Kanal auswählt. Das ist völlig in Ordnung, aber dafür brauchen wir einen Code, der sich leicht wie folgt lesen lässt:
for
val
:=
range
myChan
{
// Do something with val
}
Und dann explodiert es in das hier:
loop
:
for
{
select
{
case
<-
done
:
break
loop
case
maybeVal
,
ok
:=
<-
myChan
:
if
ok
==
false
{
return
// or maybe break from for
}
// Do something with val
}
}
Das kann ziemlich schnell sehr anstrengend werden - vor allem, wenn du verschachtelte Schleifen hast. Da wir Goroutinen verwenden, um übersichtlicheren, nebenläufigen Code zu schreiben und nicht zu früh zu optimieren, können wir dies mit einer einzigen Goroutine lösen. Wir kapseln die Ausführlichkeit ein, damit andere das nicht tun müssen:
orDone
:=
func
(
done
,
c
<-
chan
interface
{})
<-
chan
interface
{}
{
valStream
:=
make
(
chan
interface
{})
go
func
()
{
defer
close
(
valStream
)
for
{
select
{
case
<-
done
:
return
case
v
,
ok
:=
<-
c
:
if
ok
==
false
{
return
}
select
{
case
valStream
<-
v
:
case
<-
done
:
}
}
}
}()
return
valStream
}
Auf diese Weise können wir zu einfachen for
Schleifen zurückkommen:
for
val
:=
range
orDone
(
done
,
myChan
)
{
// Do something with val
}
Vielleicht findest du in deinem Code Kanten, in denen du eine enge Schleife mit einer Reihe von select
Anweisungen brauchst, aber ich würde dir raten, dich zuerst um die Lesbarkeit zu bemühen und eine vorzeitige Optimierung zu vermeiden.
Der T-Stück-Kanal
Manchmal möchtest du die von einem Kanal eingehenden Werte aufteilen, damit du sie an zwei verschiedene Bereiche deiner Codebasis weiterleiten kannst. Stell dir einen Kanal mit Benutzerkommandos vor: Du möchtest vielleicht einen Strom von Benutzerkommandos auf einem Kanal empfangen, sie an etwas senden, das sie ausführt, und sie auch an etwas senden, das die Kommandos für spätere Prüfungen protokolliert.
In Anlehnung an den Befehl tee
in Unix-ähnlichen Systemen macht der tee-channel genau das. Du kannst ihm einen Kanal übergeben, von dem er lesen soll, und er gibt zwei separate Kanäle zurück, die denselben Wert erhalten:
tee
:=
func
(
done
<-
chan
interface
{
}
,
in
<-
chan
interface
{
}
,
)
(
_
,
_
<-
chan
interface
{
}
)
{
<-
chan
interface
{
}
)
{
out1
:=
make
(
chan
interface
{
}
)
out2
:=
make
(
chan
interface
{
}
)
go
func
(
)
{
defer
close
(
out1
)
defer
close
(
out2
)
for
val
:=
range
orDone
(
done
,
in
)
{
var
out1
,
out2
=
out1
,
out2
for
i
:=
0
;
i
<
2
;
i
++
{
select
{
case
<-
done
:
case
out1
<-
val
:
out1
=
nil
case
out2
<-
val
:
out2
=
nil
}
}
}
}
(
)
return
out1
,
out2
}
Wir werden lokale Versionen von
out1
undout2
verwenden wollen, also schatten wir diese Variablen.Wir verwenden eine
select
Anweisung, damit sich die Schreibvorgänge aufout1
undout2
nicht gegenseitig blockieren. Um sicherzustellen, dass beide Adressen erreicht werden, führen wir zwei Iterationen der Anweisungselect
durch: eine für jeden Ausgangskanal.Sobald wir in einen Kanal geschrieben haben, setzen wir seine shadowed copy auf
nil
, damit weitere Schreibvorgänge blockiert werden und der andere Kanal weiterlaufen kann.
Beachte, dass die Schreibvorgänge auf out1
und out2
eng miteinander verbunden sind. Die Iteration über in
kann erst fortgesetzt werden, wenn sowohl out1
als auch out2
beschrieben worden sind. Normalerweise ist das kein Problem, da der Durchsatz des Prozesses, der von den einzelnen Kanälen liest, ohnehin von etwas anderem als dem Tee-Befehl gesteuert werden sollte, aber es ist dennoch erwähnenswert. Hier ist ein kurzes Beispiel zur Veranschaulichung:
done
:=
make
(
chan
interface
{})
defer
close
(
done
)
out1
,
out2
:=
tee
(
done
,
take
(
done
,
repeat
(
done
,
1
,
2
),
4
))
for
val1
:=
range
out1
{
fmt
.
Printf
(
"out1: %v, out2: %v\n"
,
val1
,
<-
out2
)
}
Mit diesem Muster ist es einfach, die Kanäle als Verbindungspunkte deines Systems zu verwenden.
Der Brücken-Kanal
Es kann vorkommen, dass du Werte aus einer Reihe von Kanälen konsumieren möchtest:
<-chan <-chan interface{}
Das ist etwas anders als das Zusammenführen einer Reihe von Kanälen zu einem einzigen Kanal, wie wir in "Der oder-Kanal" oder "Fan-Out, Fan-In" gesehen haben . Eine Folge von Kanälen deutet auf ein geordnetes Schreiben hin, wenn auch aus unterschiedlichen Quellen. Ein Beispiel wäre eine Pipeline-Stufe, deren Lebensdauer unregelmäßig ist. Wenn wir den Mustern folgen, die wir in "Confinement" festgelegt haben, und sicherstellen, dass die Kanäle den Goroutinen gehören, die in sie schreiben, würde jedes Mal, wenn eine Pipelinestufe innerhalb einer neuen Goroutine neu gestartet wird, ein neuer Kanal erstellt werden. Das bedeutet, dass wir tatsächlich eine Reihe von Kanälen haben. Wir werden dieses Szenario in "Ungesunde Goroutinen heilen" genauer untersuchen .
Als Verbraucher kann es dem Code egal sein, dass seine Werte aus einer Reihe von Kanälen stammen. In diesem Fall kann der Umgang mit einem Kanal von Kanälen mühsam sein. Wenn wir stattdessen eine Funktion definieren, die den Kanal der Kanäle in einen einfachen Kanal umstrukturieren kann - eine Technik, die als Überbrückung der Kanäle bezeichnet wird -, wird es für den Verbraucher viel einfacher, sich auf das eigentliche Problem zu konzentrieren. So können wir das erreichen:
bridge
:=
func
(
done
<-
chan
interface
{
}
,
chanStream
<-
chan
<-
chan
interface
{
}
,
)
<-
chan
interface
{
}
{
valStream
:=
make
(
chan
interface
{
}
)
go
func
(
)
{
defer
close
(
valStream
)
for
{
var
stream
<-
chan
interface
{
}
select
{
case
maybeStream
,
ok
:=
<-
chanStream
:
if
ok
==
false
{
return
}
stream
=
maybeStream
case
<-
done
:
return
}
for
val
:=
range
orDone
(
done
,
stream
)
{
select
{
case
valStream
<-
val
:
case
<-
done
:
}
}
}
}
(
)
return
valStream
}
Dies ist der Kanal, der alle Werte von
bridge
zurückgibt.Diese Schleife ist dafür verantwortlich, Kanäle aus
chanStream
zu ziehen und sie einer verschachtelten Schleife zur Verwendung zur Verfügung zu stellen.Diese Schleife ist dafür verantwortlich, Werte aus dem Kanal zu lesen, der ihr zugewiesen wurde, und diese Werte auf
valStream
zu wiederholen. Wenn der Stream, über den wir gerade eine Schleife laufen, geschlossen ist, brechen wir die Schleife ab, um die Werte aus diesem Kanal zu lesen, und fahren mit der nächsten Iteration der Schleife fort, indem wir Kanäle auswählen, aus denen wir lesen. So erhalten wir einen ununterbrochenen Strom von Werten.
Das ist ein ziemlich unkomplizierter Code. Jetzt können wir bridge
verwenden, um eine Fassade aus einem einzelnen Kanal über einen Kanal von Kanälen zu präsentieren. Hier ist ein Beispiel, das eine Reihe von 10 Kanälen erstellt, in die jeweils ein Element geschrieben wird, und diese Kanäle an die Funktion bridge
übergibt:
genVals
:=
func
()
<-
chan
<-
chan
interface
{}
{
chanStream
:=
make
(
chan
(
<-
chan
interface
{}))
go
func
()
{
defer
close
(
chanStream
)
for
i
:=
0
;
i
<
10
;
i
++
{
stream
:=
make
(
chan
interface
{},
1
)
stream
<-
i
close
(
stream
)
chanStream
<-
stream
}
}()
return
chanStream
}
for
v
:=
range
bridge
(
nil
,
genVals
())
{
fmt
.
Printf
(
"%v "
,
v
)
}
Das Ergebnis ist:
0 1 2 3 4 5 6 7 8 9
Dank bridge
können wir den Kanal der Kanäle innerhalb einer einzigen Bereichsanweisung verwenden und uns auf die Logik unserer Schleife konzentrieren. Die Destrukturierung des Kanals der Kanäle wird dem Code überlassen, der speziell für dieses Anliegen bestimmt ist.
Warteschlange
Manchmal ist es sinnvoll, Arbeit für deine Pipeline anzunehmen, obwohl die Pipeline noch nicht bereit für mehr ist. Dieser Prozess wird Warteschlange genannt.
Das bedeutet nur, dass deine Stage nach getaner Arbeit die Daten an einem temporären Ort im Speicher ablegt, so dass andere Stages sie später abrufen können und deine Stage keinen Verweis auf sie halten muss. Im Abschnitt "Kanäle" haben wir gepufferte Kanäle, eine Art Warteschlange, besprochen, aber seitdem haben wir sie nicht mehr wirklich genutzt - aus gutem Grund.
Die Einführung von Warteschlangen in dein System ist zwar sehr nützlich, gehört aber normalerweise zu den letzten Techniken, die du bei der Optimierung deines Programms einsetzen solltest. Wenn du Warteschlangen zu früh hinzufügst, können sich Synchronisationsprobleme wie Deadlocks und Livelocks verbergen. Außerdem kannst du feststellen, dass du mehr oder weniger Warteschlangen brauchst, wenn sich dein Programm der Korrektheit annähert.
Wozu ist Queuing also gut? Beginnen wir mit der Beantwortung dieser Frage, indem wir uns mit einem der häufigsten Fehler befassen, den Menschen machen, wenn sie versuchen, die Leistung eines Systems zu verbessern: die Einführung von Warteschlangen, um Leistungsprobleme zu lösen. Warteschlangen werden die Gesamtlaufzeit deines Programms fast nie beschleunigen; sie sorgen nur dafür, dass sich das Programm anders verhält.
Um zu verstehen, warum, lass uns einen Blick auf eine einfache Pipeline werfen:
done
:=
make
(
chan
interface
{})
defer
close
(
done
)
zeros
:=
take
(
done
,
3
,
repeat
(
done
,
0
))
short
:=
sleep
(
done
,
1
*
time
.
Second
,
zeros
)
long
:=
sleep
(
done
,
4
*
time
.
Second
,
short
)
pipeline
:=
long
Diese Pipeline verknüpft vier Stufen miteinander:
-
Eine Wiederholungsstufe, die einen endlosen Strom von 0s erzeugt.
-
Eine Stufe, die die vorherigen Stufen abbricht, nachdem du drei Gegenstände gesehen hast.
-
Eine "kurze" Stufe, die eine Sekunde schläft.
-
Eine "lange" Etappe, die vier Sekunden schläft.
Für dieses Beispiel gehen wir davon aus, dass die Phasen 1 und 2 sofort ablaufen, und konzentrieren uns darauf, wie sich die Phasen, die schlafen, auf die Laufzeit der Pipeline auswirken.
Hier ist eine Tabelle, die die Zeit t
, die Iteration i
und die verbleibende Zeit für die langen und kurzen Stufen untersucht.
Zeit(t) | i | Lange Etappe | Kurze Etappe |
---|---|---|---|
0 |
0 |
1s |
|
1 |
0 |
4s |
1s |
2 |
0 |
3s |
(blockiert) |
3 |
0 |
2s |
(blockiert) |
4 |
0 |
1s |
(blockiert) |
5 |
1 |
4s |
1s |
6 |
1 |
3s |
(blockiert) |
7 |
1 |
2s |
(blockiert) |
8 |
1 |
1s |
(blockiert) |
9 |
2 |
4s |
(schließen) |
10 |
2 |
3s |
|
11 |
2 |
2s |
|
12 |
2 |
1s |
|
13 |
3 |
(schließen) |
Du kannst sehen, dass diese Pipeline ungefähr 13 Sekunden braucht, um zu laufen. Die kurze Phase dauert etwa 9 Sekunden.
Was passiert, wenn wir die Pipeline so verändern, dass sie einen Puffer enthält? Untersuchen wir dieselbe Pipeline mit einem Puffer von 2 zwischen der langen und der kurzen Stufe:
done
:=
make
(
chan
interface
{})
defer
close
(
done
)
zeros
:=
take
(
done
,
3
,
repeat
(
done
,
0
))
short
:=
sleep
(
done
,
1
*
time
.
Second
,
zeros
)
buffer
:=
buffer
(
done
,
2
,
short
)
// Buffers sends from short by 2
long
:=
sleep
(
done
,
4
*
time
.
Second
,
short
)
pipeline
:=
long
Hier ist die Laufzeit:
Zeit(t) | i | Lange Etappe | Puffer | Kurze Etappe |
---|---|---|---|---|
0 |
0 |
0/2 |
1s |
|
1 |
0 |
4s |
0/2 |
1s |
2 |
0 |
3s |
1/2 |
1s |
3 |
0 |
2s |
2/2 |
(schließen) |
4 |
0 |
1s |
2/2 |
|
5 |
1 |
4s |
1/2 |
|
6 |
1 |
3s |
1/2 |
|
7 |
1 |
2s |
1/2 |
|
8 |
1 |
1s |
1/2 |
|
9 |
2 |
4s |
0/2 |
|
10 |
2 |
3s |
0/2 |
|
11 |
2 |
2s |
0/2 |
|
12 |
2 |
1s |
0/2 |
|
13 |
3 |
(schließen) |
Die gesamte Pipeline brauchte immer noch 13 Sekunden! Aber schau dir die Laufzeit der kurzen Phase an. Sie ist nach nur 3 Sekunden abgeschlossen, im Gegensatz zu den 9 Sekunden, die sie vorher brauchte. Wir haben die Laufzeit dieser Phase um zwei Drittel verkürzt! Aber was nützt uns das, wenn die gesamte Pipeline immer noch 13 Sekunden für die Ausführung braucht?
Stelle dir stattdessen die folgende Pipeline vor:
p
:=
processRequest
(
done
,
acceptConnection
(
done
,
httpHandler
))
Hier wird die Pipeline erst beendet, wenn sie abgebrochen wird, und die Stufe, die Verbindungen annimmt, hört erst auf, Verbindungen anzunehmen, wenn die Pipeline abgebrochen wird. In diesem Szenario würdest du nicht wollen, dass die Verbindungen zu deinem Programm abbrechen, weil deine processRequest
Stage deine acceptConnection
Stage blockiert hat. Du möchtest, dass deine acceptConnection
Stage so weit wie möglich freigegeben wird. Andernfalls könnten die Nutzer deines Programms feststellen, dass ihre Anfragen komplett abgelehnt werden.
Die Antwort auf unsere Frage nach dem Nutzen der Einführung einer Warteschlange ist also nicht, dass die Laufzeit einer Stage verkürzt wird, sondern dass die Zeit, in der sie sich in einem blockierenden Zustand befindet, reduziert wird. Dadurch kann die Stufe weiterhin ihre Arbeit erledigen. In diesem Beispiel würden die Nutzerinnen und Nutzer wahrscheinlich eine Verzögerung bei ihren Anfragen erleben, aber ihnen würde der Service nicht komplett verweigert werden.
Der wahre Nutzen von Warteschlangen besteht also darin, Phasen zu entkoppeln, so dass die Laufzeit einer Phase keinen Einfluss auf die Laufzeit einer anderen Phase hat. Eine solche Entkopplung von Stufen verändert dann kaskadenartig das Laufzeitverhalten des gesamten Systems, was je nach System gut oder schlecht sein kann.
Dann kommen wir zu der Frage, wie du deine Warteschlangen abstimmen kannst. Wo sollen die Warteschlangen platziert werden? Wie groß sollte der Puffer sein? Die Antworten auf diese Fragen hängen von der Art deiner Pipeline ab.
Beginnen wir mit der Analyse von Situationen, in denen Warteschlangen die Gesamtleistung deines Systems steigern können. Die einzigen anwendbaren Situationen sind:
-
Wenn die Bündelung von Anfragen in einer Phase Zeit spart.
-
Wenn Verzögerungen in einer Stufe eine Rückkopplungsschleife in das System erzeugen.
Ein Beispiel für die erste Situation ist eine Stufe, die Eingaben an einem Ort puffert, der schneller ist (z. B. der Speicher) als der, an den sie gesendet werden soll (z. B. die Festplatte). Das ist natürlich der ganze Zweck des bufio
Pakets von Go. Hier ist ein Beispiel, das einen einfachen Vergleich zwischen einem gepufferten Schreiben in eine Warteschlange und einem ungepufferten Schreiben zeigt:
func
BenchmarkUnbufferedWrite
(
b
*
testing
.
B
)
{
performWrite
(
b
,
tmpFileOrFatal
())
}
func
BenchmarkBufferedWrite
(
b
*
testing
.
B
)
{
bufferredFile
:=
bufio
.
NewWriter
(
tmpFileOrFatal
())
performWrite
(
b
,
bufio
.
NewWriter
(
bufferredFile
))
}
func
tmpFileOrFatal
()
*
os
.
File
{
file
,
err
:=
ioutil
.
TempFile
(
""
,
"tmp"
)
if
err
!=
nil
{
log
.
Fatal
(
"error: %v"
,
err
)
}
return
file
}
func
performWrite
(
b
*
testing
.
B
,
writer
io
.
Writer
)
{
done
:=
make
(
chan
interface
{})
defer
close
(
done
)
b
.
ResetTimer
()
for
bt
:=
range
take
(
done
,
repeat
(
done
,
byte
(
0
)),
b
.
N
)
{
writer
.
Write
([]
byte
{
bt
.(
byte
)})
}
}
gotest
-bench=
. src/concurrency-patterns-in-go/queuing/buffering_test.go
Und hier sind die Ergebnisse dieses Benchmarks:
BenchmarkUnbufferedWrite-8 |
500000 |
3969 |
ns/op |
BenchmarkBufferedWrite-8 |
1000000 |
1356 |
ns/op |
PASS |
|||
ok |
Befehlszeilen-Argumente |
3.398s |
Wie erwartet, ist das gepufferte Schreiben schneller als das ungepufferte Schreiben. Das liegt daran, dass in bufio.Writer
die Schreibvorgänge intern in eine Warteschlange gestellt werden, bis ein ausreichender Chunk angehäuft wurde, und dann wird der Chunk herausgeschrieben. Dieser Vorgang wird aus offensichtlichen Gründen oft als Chunking bezeichnet.
Chunking ist schneller, weil bytes.Buffer
seinen zugewiesenen Speicher vergrößern muss, um die zu speichernden Bytes unterzubringen. Aus verschiedenen Gründen ist die Vergrößerung des Speichers kostspielig. Je weniger wir also den Speicher vergrößern müssen, desto effizienter arbeitet unser System als Ganzes. Die Warteschlange hat also die Leistung unseres Systems insgesamt erhöht.
Dies ist nur ein einfaches Beispiel für Chunking im Arbeitsspeicher, aber du wirst in der Praxis häufig auf Chunking treffen. Immer dann, wenn die Durchführung einer Operation einen Overhead erfordert, kann Chunking die Systemleistung erhöhen. Einige Beispiele dafür sind das Öffnen von Datenbanktransaktionen, die Berechnung von Prüfsummen für Nachrichten und die Zuweisung von zusammenhängendem Speicherplatz.
Neben dem Chunking kann Queuing auch helfen, wenn dein Algorithmus durch die Unterstützung von Lookbehinds oder Ordering optimiert werden kann.
Das zweite Szenario, bei dem eine Verzögerung in einer Phase zu mehr Input in der Pipeline führt, ist etwas schwieriger zu erkennen, aber auch wichtiger, weil es zu einem systemischen Zusammenbruch deiner vorgelagerten Systeme führen kann.
Diese Idee wird oft als negative Rückkopplungsschleife, Abwärtsspirale oder sogar Todesspirale bezeichnet. Das liegt daran, dass zwischen der Pipeline und ihren vorgelagerten Systemen eine wiederkehrende Beziehung besteht: Die Geschwindigkeit, mit der die vorgelagerten Stufen oder Systeme neue Anfragen stellen, ist in gewisser Weise damit verbunden, wie effizient die Pipeline ist.
Wenn die Effizienz der Pipeline unter eine bestimmte kritische Schwelle fällt, erhöhen die der Pipeline vorgelagerten Systeme ihren Input in die Pipeline, wodurch die Pipeline weiter an Effizienz verliert und die Todesspirale beginnt. Ohne eine Art Fail-Safe wird sich das System, das die Pipeline nutzt, niemals erholen.
Indem du eine Warteschlange am Eingang der Pipeline einführst, kannst du die Rückkopplungsschleife durchbrechen, allerdings auf Kosten einer Verzögerung der Anfragen. Aus der Perspektive des Aufrufers in der Pipeline scheint die Anfrage bearbeitet zu werden, aber sie braucht sehr lange. Solange der Aufrufer keine Zeitüberschreitung hat, bleibt deine Pipeline stabil. Wenn der Aufrufer eine Zeitüberschreitung erleidet, musst du sicherstellen, dass du eine Art Bereitschaftsprüfung für das Dequeuing unterstützt. Wenn du das nicht tust, kannst du ungewollt eine Rückkopplungsschleife erzeugen, indem du tote Anfragen bearbeitest und so die Effizienz deiner Pipeline verringerst.
Aus unseren Beispielen lässt sich also ein Muster ableiten: Warteschlangen sollten entweder implementiert werden:
-
Am Eingang zu deiner Pipeline.
-
In Phasen, in denen die Dosierung zu einer höheren Effizienz führt.
Du könntest versucht sein, Queuing an anderer Stelle einzufügen - z. B. nach einer rechenintensiven Phase -, aber lass dich nicht dazu verleiten! Wie wir gelernt haben, gibt es nur wenige Situationen, in denen die Warteschlangenbildung die Laufzeit deiner Pipeline verkürzt, und wenn du versuchst, dies zu umgehen, kann das katastrophale Folgen haben.
Das ist zunächst nicht intuitiv; um zu verstehen, warum, müssen wir den Durchsatz der Pipeline diskutieren. Keine Sorge, das ist gar nicht so schwierig und hilft uns auch bei der Beantwortung der Frage, wie wir die Größe unserer Warteschlangen bestimmen können.
In der Warteschlangentheorie gibt es ein Gesetz, das - bei ausreichenden Stichproben - den Durchsatz deiner Pipeline vorhersagt. Es heißt Little's Law, und du musst nur ein paar Dinge wissen, um es zu verstehen und zu nutzen.
Definieren wir zunächst das Little'sche Gesetz algebraisch. Es wird im Allgemeinen wie folgt ausgedrückt: L=λW
, wobei:
-
L
= die durchschnittliche Anzahl der Einheiten im System. -
λ
= die durchschnittliche Ankunftsrate der Einheiten. -
W
= die durchschnittliche Zeit, die eine Einheit im System verbringt.
Diese Gleichung gilt nur für sogenannte stabile Systeme. In einer Pipeline ist ein stabiles System ein System, in dem die Geschwindigkeit, mit der Arbeit in die Pipeline eintritt ( Ingress), der Geschwindigkeit entspricht, mit der sie das System verlässt ( Egress). Wenn die Rate des Ingresses die Rate des Egresses übersteigt, ist dein System instabil und befindet sich in einer Todesspirale. Wenn die Ingress-Rate geringer ist als die Egress-Rate, ist dein System zwar immer noch instabil, aber deine Ressourcen werden nicht vollständig genutzt. Das ist zwar nicht das Schlimmste, aber wenn die Unterauslastung in großem Umfang auftritt (z. B. in Clustern oder Rechenzentren), ist das vielleicht nicht so schlimm.
Gehen wir also davon aus, dass unsere Pipeline stabil ist. Wenn wir W
, die durchschnittliche Zeit, die eine Einheit im System verbringt, um den Faktor n
verringern wollen, haben wir nur eine Möglichkeit: die durchschnittliche Anzahl der Einheiten im System zu verringern: L/n = λ * W/n
. Und wir können die durchschnittliche Anzahl der Einheiten im System nur verringern, wenn wir die Ausstiegsrate erhöhen. Beachte auch, dass wir, wenn wir Warteschlangen zu unseren Stufen hinzufügen, L
erhöhen, was entweder die Ankunftsrate der Einheiten (nL = nλ * W
) oder die durchschnittliche Zeit, die eine Einheit im System verbringt, erhöht (nL = λ * nW
). Mit dem Little'schen Gesetz haben wir bewiesen, dass Warteschlangen nicht dazu beitragen, die Verweildauer in einem System zu verringern.
Da wir unsere Pipeline als Ganzes betrachten, ist die Verringerung von W
um den Faktor n
auf alle Stufen unserer Pipeline verteilt. In unserem Fall sollte das Little'sche Gesetz eigentlich so definiert werden:
- L = λΣiWi
Das ist eine andere Art zu sagen, dass deine Pipeline nur so schnell sein wird, wie deine langsamste Phase. Optimiere wahllos!
Das Little'sche Gesetz ist also klasse! Diese einfache Gleichung eröffnet uns viele Möglichkeiten, unsere Pipeline zu analysieren. Wir wollen sie nutzen, um einige interessante Fragen zu stellen. Bei unserer Analyse gehen wir davon aus, dass unsere Pipeline drei Stufen hat.
Versuchen wir herauszufinden, wie viele Anfragen pro Sekunde unsere Pipeline verarbeiten kann. Nehmen wir an, wir aktivieren das Sampling in unserer Pipeline und finden heraus, dass eine Anfrage (r
) etwa 1 Sekunde braucht, um die Pipeline zu durchlaufen. Geben wir diese Zahlen ein!
3r = λr/s * 1s
3r/s = λr/s
λr/s = 3r/s
Wir setzen L
auf 3, weil jede Stufe in unserer Pipeline eine Anfrage bearbeitet. Dann setzen wir W
auf 1 Sekunde, machen ein bisschen Algebra und voilà! In dieser Pipeline können wir drei Anfragen pro Sekunde bearbeiten.
Wie können wir bestimmen, wie groß unsere Warteschlange sein muss, um eine bestimmte Anzahl von Anfragen zu bearbeiten? Kann uns das Little'sche Gesetz dabei helfen?
Nehmen wir an, unsere Stichproben zeigen, dass eine Anfrage 1 ms zur Bearbeitung braucht. Wie groß müsste unsere Warteschlange sein, um 100.000 Anfragen pro Sekunde zu bearbeiten? Noch einmal: Gib die Zahlen ein!
Lr-3r = 100,000r/s * 0.0001s
Lr-3r = 10r
Lr = 7r
Auch hier hat unsere Pipeline drei Stufen, also verringern wir L
um 3. Wir setzen λ
auf 100.000 r/s und stellen fest, dass unsere Warteschlange eine Kapazität von 7 haben sollte, wenn wir so viele Anfragen bearbeiten wollen. Erinnere dich daran, dass deine Arbeit länger braucht, um das System zu durchlaufen, je größer die Warteschlange ist! Du tauschst also Systemauslastung gegen Verzögerung.
Was Little's Law nicht beantworten kann, ist der Umgang mit Fehlern. Denk daran, dass du alle Anfragen in deiner Warteschlange verlierst, wenn deine Pipeline aus irgendeinem Grund in Panik gerät. Davor solltest du dich hüten, wenn es schwierig ist oder nicht möglich ist, die Anfragen neu zu erstellen. Um dies zu vermeiden, kannst du entweder eine Warteschlangengröße von Null festlegen oder eine persistente Warteschlange verwenden, d. h. eine Warteschlange, die irgendwo gespeichert wird und aus der später bei Bedarf gelesen werden kann.
Warteschlangen können in deinem System nützlich sein, aber aufgrund ihrer Komplexität ist sie normalerweise eine der letzten Optimierungen, die ich vorschlagen würde.
Das Kontext-Paket
Wie wir gesehen haben, ist es in nebenläufigen Programmen oft notwendig, Operationen aufgrund von Zeitüberschreitungen, Abbrüchen oder Fehlern in anderen Teilen des Systems zu unterbrechen. Wir haben uns mit dem Idiom beschäftigt, einen done
Kanal zu erstellen, der durch dein Programm fließt und alle blockierenden gleichzeitigen Operationen aufhebt. Das funktioniert gut, ist aber auch etwas eingeschränkt.
Es wäre nützlich, wenn wir neben der einfachen Benachrichtigung über den Abbruch zusätzliche Informationen übermitteln könnten: warum der Abbruch erfolgt oder ob unsere Funktion eine Frist hat, bis zu der sie abgeschlossen sein muss.
Es hat sich herausgestellt, dass die Notwendigkeit, einen done
Kanal mit diesen Informationen zu umhüllen, in Systemen jeder Größe sehr häufig vorkommt, und deshalb haben die Go-Autoren beschlossen, ein Standardmuster dafür zu erstellen. Zunächst war es ein Experiment, das außerhalb der Standardbibliothek stattfand, aber in Go 1.7 wurde das Paket context
in die Standardbibliothek aufgenommen, so dass dieses Idiom nun zum Standard in Go gehört, wenn man mit nebenläufigem Code arbeitet.
Wenn wir einen Blick auf das context
Paket werfen, sehen wir, dass es sehr einfach ist:
var
Canceled
=
errors
.
New
(
"context canceled"
)
var
DeadlineExceeded
error
=
deadlineExceededError
{}
type
CancelFunc
type
Context
func
Background
()
Context
func
TODO
()
Context
func
WithCancel
(
parent
Context
)
(
ctx
Context
,
cancel
CancelFunc
)
func
WithDeadline
(
parent
Context
,
deadline
time
.
Time
)
(
Context
,
CancelFunc
)
func
WithTimeout
(
parent
Context
,
timeout
time
.
Duration
)
(
Context
,
CancelFunc
)
func
WithValue
(
parent
Context
,
key
,
val
interface
{})
Context
Wir werden uns diese Typen und Funktionen später noch einmal ansehen, aber jetzt konzentrieren wir uns erst einmal auf den Typ Context
. Das ist der Typ, der ähnlich wie ein done
Kanal durch dein System fließen wird. Wenn du das Paket context
verwendest, erhält jede Funktion, die deinem gleichzeitigen Aufruf auf oberster Ebene nachgeschaltet ist, ein Context
als erstes Argument. Der Typ sieht folgendermaßen aus:
type
Context
interface
{
// Deadline returns the time when work done on behalf of this
// context should be canceled. Deadline returns ok==false when no
// deadline is set. Successive calls to Deadline return the same
// results.
Deadline
()
(
deadline
time
.
Time
,
ok
bool
)
// Done returns a channel that's closed when work done on behalf
// of this context should be canceled. Done may return nil if this
// context can never be canceled. Successive calls to Done return
// the same value.
Done
()
<-
chan
struct
{}
// Err returns a non-nil error value after Done is closed. Err
// returns Canceled if the context was canceled or
// DeadlineExceeded if the context's deadline passed. No other
// values for Err are defined. After Done is closed, successive
// calls to Err return the same value.
Err
()
error
// Value returns the value associated with this context for key,
// or nil if no value is associated with key. Successive calls to
// Value with the same key returns the same result.
Value
(
key
interface
{})
interface
{}
}
Auch das sieht ziemlich einfach aus. Es gibt eine Done
Methode, die einen Kanal zurückgibt, der geschlossen wird, wenn unsere Funktion vorzeitig beendet werden soll. Außerdem gibt es einige neue, aber leicht verständliche Methoden: eine Deadline
Funktion, die anzeigt, ob eine Goroutine nach einer bestimmten Zeit abgebrochen wird, und eine Err
Methode, die einen Wert ungleich Null zurückgibt, wenn die Goroutine abgebrochen wurde. Aber die Methode Value
sieht ein bisschen fehl am Platz aus. Wozu ist sie da?
Die Go-Autoren haben festgestellt, dass eine der Hauptanwendungen von Goroutinen Programme sind, die Anfragen bedienen. In diesen Programmen müssen in der Regel neben den Informationen über die Vorkaufsrechte auch anfragespezifische Informationen weitergegeben werden. Das ist der Zweck der Funktion Value
. Wir werden später noch mehr darüber sprechen, aber im Moment müssen wir nur wissen, dass das Paket context
zwei Hauptzwecke erfüllt:
-
Um eine API für das Abbrechen von Zweigen deines Call-Graphs bereitzustellen.
-
Um einen Data-Bag für den Transport von anforderungsspezifischen Daten durch deinen Call-Graph bereitzustellen.
Konzentrieren wir uns auf den ersten Aspekt: die Kündigung.
Wie wir in "Goroutine-Lecks verhindern" gelernt haben , hat der Abbruch in einer Funktion drei Aspekte:
-
Der Elternteil einer Goroutine möchte sie vielleicht abbrechen.
-
Eine Goroutine möchte vielleicht ihre Kinder abbrechen.
-
Alle blockierenden Operationen innerhalb einer Goroutine müssen preemptable sein, damit sie abgebrochen werden können.
Das Paket context
hilft dabei, alle drei Punkte zu verwalten.
Wie bereits erwähnt, ist der Typ Context
das erste Argument für deine Funktion. Wenn du dir die Methoden der Schnittstelle Context
ansiehst, wirst du feststellen, dass es nichts gibt, was den Zustand der zugrunde liegenden Struktur verändern kann. Außerdem gibt es nichts, was es der Funktion, die Context
annimmt, erlaubt, sie abzubrechen. Dies schützt Funktionen auf dem Aufrufstapel davor, dass Kinder den Kontext aufheben. In Verbindung mit der Methode Done
, die einen Kanal done
bereitstellt, kann der Typ Context
die Stornierung durch seine Vorgänger sicher verwalten.
Das wirft eine Frage auf: Wenn Context
unveränderlich ist, wie beeinflussen wir dann das Verhalten von Abbrüchen in Funktionen unterhalb einer aktuellen Funktion im Aufrufstapel?
An dieser Stelle werden die Funktionen im Paket context
wichtig. Schauen wir uns ein paar von ihnen noch einmal an, um unser Gedächtnis aufzufrischen:
func
WithCancel
(
parent
Context
)
(
ctx
Context
,
cancel
CancelFunc
)
func
WithDeadline
(
parent
Context
,
deadline
time
.
Time
)
(
Context
,
CancelFunc
)
func
WithTimeout
(
parent
Context
,
timeout
time
.
Duration
)
(
Context
,
CancelFunc
)
Beachte, dass alle diese Funktionen eine Context
aufnehmen und auch eine zurückgeben. Einige von ihnen nehmen auch andere Argumente entgegen, wie deadline
und timeout
. Die Funktionen erzeugen alle neue Instanzen von Context
mit den jeweiligen Optionen dieser Funktionen.
WithCancel
gibt eine neue Context
zurück, die ihren done
Kanal schließt, wenn die zurückgegebene cancel
Funktion aufgerufen wird. WithDeadline
gibt eine neue Context
zurück, die ihren done
Kanal schließt, wenn die Uhr der Maschine über die angegebene deadline
hinausgeht. WithTimeout
gibt eine neue Context
zurück, die ihren done
Kanal nach der angegebenen timeout
Dauer schließt.
Wenn deine Funktion Funktionen, die unter ihr im Aufrufdiagramm stehen, auf irgendeine Weise abbrechen muss, ruft sie eine dieser Funktionen auf und übergibt die Context
, die ihr gegeben wurde, und leitet dann die zurückgegebene Context
an ihre Kinder weiter. Wenn deine Funktion das Abbruchverhalten nicht ändern muss, gibt die Funktion einfach die Context
weiter, die sie erhalten hat.
Auf diese Weise können aufeinanderfolgende Schichten des Aufrufgraphen eine Context
erstellen, die ihren Bedürfnissen entspricht, ohne ihre Eltern zu beeinflussen. Dies ist eine sehr kompatible, elegante Lösung für die Verwaltung von Verzweigungen deines Aufrufgraphen.
In diesem Sinne sollen die Instanzen von Context
durch den Aufrufgraphen deines Programms fließen. In einem objektorientierten Paradigma ist es üblich, Verweise auf häufig verwendete Daten als Mitgliedsvariablen zu speichern, aber es ist wichtig, dies nicht mit Instanzen von context.Context
. Instanzen von context.Context
sehen von außen vielleicht gleich aus, aber intern können sie sich bei jedem Stack-Frame ändern. Aus diesem Grund ist es wichtig, Instanzen von Context
immer an deine Funktionen zu übergeben. Auf diese Weise erhalten Funktionen die Context
, die für sie bestimmt ist, und nicht die Context
, die für einen Stack-Frame N
eine Ebene höher im Stack bestimmt ist.
Am Anfang deines asynchronen Aufrufgraphen wird deinem Code wahrscheinlich kein Context
übergeben worden sein. Um die Kette zu starten, stellt dir das Paket context
zwei Funktionen zur Verfügung, um leere Instanzen von Context
zu erzeugen:
func
Background
()
Context
func
TODO
()
Context
Background
gibt einfach einen leeren Context
zurück. TODO
ist nicht für den Einsatz in der Produktion gedacht, gibt aber auch einen leeren Context
zurück. TODO
soll als Platzhalter dienen, wenn du nicht weißt, welchen Context
du verwenden sollst, oder wenn du erwartest, dass dein Code mit einem Context
versehen wird, aber der vorgelagerte Code noch keinen geliefert hat.
Wenden wir also all dies an. Schauen wir uns ein Beispiel an, das das Kanalmuster done
verwendet, und sehen wir uns an, welche Vorteile wir durch die Verwendung des Pakets context
erzielen können. Hier ist ein Programm, das gleichzeitig eine Begrüßung und eine Verabschiedung druckt:
func
main
()
{
var
wg
sync
.
WaitGroup
done
:=
make
(
chan
interface
{})
defer
close
(
done
)
wg
.
Add
(
1
)
go
func
()
{
defer
wg
.
Done
()
if
err
:=
printGreeting
(
done
);
err
!=
nil
{
fmt
.
Printf
(
"%v"
,
err
)
return
}
}()
wg
.
Add
(
1
)
go
func
()
{
defer
wg
.
Done
()
if
err
:=
printFarewell
(
done
);
err
!=
nil
{
fmt
.
Printf
(
"%v"
,
err
)
return
}
}()
wg
.
Wait
()
}
func
printGreeting
(
done
<-
chan
interface
{})
error
{
greeting
,
err
:=
genGreeting
(
done
)
if
err
!=
nil
{
return
err
}
fmt
.
Printf
(
"%s world!\n"
,
greeting
)
return
nil
}
func
printFarewell
(
done
<-
chan
interface
{})
error
{
farewell
,
err
:=
genFarewell
(
done
)
if
err
!=
nil
{
return
err
}
fmt
.
Printf
(
"%s world!\n"
,
farewell
)
return
nil
}
func
genGreeting
(
done
<-
chan
interface
{})
(
string
,
error
)
{
switch
locale
,
err
:=
locale
(
done
);
{
case
err
!=
nil
:
return
""
,
err
case
locale
==
"EN/US"
:
return
"hello"
,
nil
}
return
""
,
fmt
.
Errorf
(
"unsupported locale"
)
}
func
genFarewell
(
done
<-
chan
interface
{})
(
string
,
error
)
{
switch
locale
,
err
:=
locale
(
done
);
{
case
err
!=
nil
:
return
""
,
err
case
locale
==
"EN/US"
:
return
"goodbye"
,
nil
}
return
""
,
fmt
.
Errorf
(
"unsupported locale"
)
}
func
locale
(
done
<-
chan
interface
{})
(
string
,
error
)
{
select
{
case
<-
done
:
return
""
,
fmt
.
Errorf
(
"canceled"
)
case
<-
time
.
After
(
1
*
time
.
Minute
):
}
return
"EN/US"
,
nil
}
Die Ausführung dieses Codes ergibt:
goodbye world! hello world!
Wenn wir die Wettlaufbedingung ignorieren (wir könnten uns verabschieden, bevor wir begrüßt werden!), können wir sehen, dass zwei Zweige unseres Programms gleichzeitig laufen. Wir haben die Standard-Vorkaufsmethode eingerichtet, indem wir einen done
Kanal erstellen und ihn durch unseren Call-Graphen weiterleiten. Wenn wir den Kanal done
an einem beliebigen Punkt in main
schließen, werden beide Zweige abgebrochen.
Durch die Einführung von Goroutinen in main
haben wir die Möglichkeit, dieses Programm auf verschiedene und interessante Arten zu steuern. Vielleicht wollen wir, dass genGreeting
eine Zeitüberschreitung verursacht, wenn es zu lange dauert. Vielleicht wollen wir nicht, dass genFarewell
locale
aufruft, wenn wir wissen, dass sein Elternteil bald abgebrochen wird. In jedem Stack-Frame kann eine Funktion den gesamten darunter liegenden Call Stack beeinflussen.
Mit dem Kanalmuster done
könnten wir dies erreichen, indem wir den eingehenden Kanal done
in andere Kanäle done
wickeln und dann zurückkehren, wenn einer von ihnen auslöst, aber wir hätten nicht die zusätzlichen Informationen über Fristen und Fehler, die uns ein Context
gibt.
Um den Vergleich zwischen dem done
Kanalmuster und der Verwendung des context
Pakets zu erleichtern, stellen wir dieses Programm als Baum dar. Jeder Knoten im Baum stellt einen Funktionsaufruf dar.
Ändern wir unser Programm so, dass wir das Paket context
anstelle eines done
Kanals verwenden. Da wir jetzt die Flexibilität eines context.Context
haben, können wir ein lustiges Szenario einführen.
Nehmen wir an, dass genGreeting
nur eine Sekunde warten will, bevor es den Aufruf an locale
abbricht - ein Timeout von einer Sekunde. Wir wollen auch eine intelligente Logik in main
einbauen. Wenn printGreeting
nicht erfolgreich ist, wollen wir auch unseren Aufruf an printFarewell
abbrechen. Schließlich macht es keinen Sinn, auf Wiedersehen zu sagen, wenn wir nicht auch Hallo sagen!
Die Umsetzung mit dem Paket context
ist trivial:
func
main
(
)
{
var
wg
sync
.
WaitGroup
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
(
)
)
defer
cancel
(
)
wg
.
Add
(
1
)
go
func
(
)
{
defer
wg
.
Done
(
)
if
err
:=
printGreeting
(
ctx
)
;
err
!=
nil
{
fmt
.
Printf
(
"cannot print greeting: %v\n"
,
err
)
cancel
(
)
}
}
(
)
wg
.
Add
(
1
)
go
func
(
)
{
defer
wg
.
Done
(
)
if
err
:=
printFarewell
(
ctx
)
;
err
!=
nil
{
fmt
.
Printf
(
"cannot print farewell: %v\n"
,
err
)
}
}
(
)
wg
.
Wait
(
)
}
func
printGreeting
(
ctx
context
.
Context
)
error
{
greeting
,
err
:=
genGreeting
(
ctx
)
if
err
!=
nil
{
return
err
}
fmt
.
Printf
(
"%s world!\n"
,
greeting
)
return
nil
}
func
printFarewell
(
ctx
context
.
Context
)
error
{
farewell
,
err
:=
genFarewell
(
ctx
)
if
err
!=
nil
{
return
err
}
fmt
.
Printf
(
"%s world!\n"
,
farewell
)
return
nil
}
func
genGreeting
(
ctx
context
.
Context
)
(
string
,
error
)
{
ctx
,
cancel
:=
context
.
WithTimeout
(
ctx
,
1
*
time
.
Second
)
defer
cancel
(
)
switch
locale
,
err
:=
locale
(
ctx
)
;
{
case
err
!=
nil
:
return
""
,
err
case
locale
==
"EN/US"
:
return
"hello"
,
nil
}
return
""
,
fmt
.
Errorf
(
"unsupported locale"
)
}
func
genFarewell
(
ctx
context
.
Context
)
(
string
,
error
)
{
switch
locale
,
err
:=
locale
(
ctx
)
;
{
case
err
!=
nil
:
return
""
,
err
case
locale
==
"EN/US"
:
return
"goodbye"
,
nil
}
return
""
,
fmt
.
Errorf
(
"unsupported locale"
)
}
func
locale
(
ctx
context
.
Context
)
(
string
,
error
)
{
select
{
case
<-
ctx
.
Done
(
)
:
return
""
,
ctx
.
Err
(
)
case
<-
time
.
After
(
1
*
time
.
Minute
)
:
}
return
"EN/US"
,
nil
}
Hier erstellt
main
ein neuesContext
mitcontext.Background()
und umhüllt es mitcontext.WithCancel
, um Stornierungen zu ermöglichen.In dieser Zeile bricht
main
dieContext
ab, wenn ein Fehler vonprintGreeting
zurückgegeben wird.Hier wickelt
genGreeting
seineContext
mitcontext.WithTimeout
ein. Dadurch wird das zurückgegebeneContext
nach 1 Sekunde automatisch annulliert und damit auch alle Kinder, an die esContext
weitergibt, nämlichlocale
.Diese Zeile gibt den Grund zurück, warum
Context
abgebrochen wurde. Dieser Fehler bläht sich bis zumain
auf, was den Abbruch bei verursacht.
Hier sind die Ergebnisse der Ausführung dieses Codes:
cannot print greeting: context deadline exceeded cannot print farewell: context canceled
Benutzen wir unser Aufrufdiagramm, um zu verstehen, was vor sich geht. Die Zahlen hier entsprechen den Codeaufrufen im vorangegangenen Beispiel.
An unserer Ausgabe sehen wir, dass das System perfekt funktioniert. Da wir sicherstellen, dass locale
mindestens eine Minute braucht, um zu laufen, wird unser Aufruf in genGreeting
immer eine Zeitüberschreitung haben, was bedeutet, dass main
den Aufruf unter printFarewell
immer abbrechen wird.
Beachte, dass genGreeting
in der Lage war, eine benutzerdefinierte context.Context
zu erstellen, die seinen Bedürfnissen entsprach, ohne die Context
seines Elternteils zu beeinflussen. Wenn genGreeting
erfolgreich zurückkehrte und printGreeting
einen weiteren Aufruf tätigen musste, konnte es dies tun, ohne Informationen über die Funktionsweise von genGreeting
preiszugeben. Diese Kompositionsfähigkeit ermöglicht es dir, große Systeme zu schreiben, ohne dass sich deine Anliegen in deinem Aufrufdiagramm vermischen.
Wir können dieses Programm noch weiter verbessern: Da wir wissen, dass locale
ungefähr eine Minute braucht, um zu laufen, können wir in locale
überprüfen, ob uns eine Frist gesetzt wurde, und wenn ja, ob wir sie einhalten werden. Dieses Beispiel zeigt, wie wir die context.Context
Methode Deadline
zu verwenden:
func
main
(
)
{
var
wg
sync
.
WaitGroup
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
(
)
)
defer
cancel
(
)
wg
.
Add
(
1
)
go
func
(
)
{
defer
wg
.
Done
(
)
if
err
:=
printGreeting
(
ctx
)
;
err
!=
nil
{
fmt
.
Printf
(
"cannot print greeting: %v\n"
,
err
)
cancel
(
)
}
}
(
)
wg
.
Add
(
1
)
go
func
(
)
{
defer
wg
.
Done
(
)
if
err
:=
printFarewell
(
ctx
)
;
err
!=
nil
{
fmt
.
Printf
(
"cannot print farewell: %v\n"
,
err
)
}
}
(
)
wg
.
Wait
(
)
}
func
printGreeting
(
ctx
context
.
Context
)
error
{
greeting
,
err
:=
genGreeting
(
ctx
)
if
err
!=
nil
{
return
err
}
fmt
.
Printf
(
"%s world!\n"
,
greeting
)
return
nil
}
func
printFarewell
(
ctx
context
.
Context
)
error
{
farewell
,
err
:=
genFarewell
(
ctx
)
if
err
!=
nil
{
return
err
}
fmt
.
Printf
(
"%s world!\n"
,
farewell
)
return
nil
}
func
genGreeting
(
ctx
context
.
Context
)
(
string
,
error
)
{
ctx
,
cancel
:=
context
.
WithTimeout
(
ctx
,
1
*
time
.
Second
)
defer
cancel
(
)
switch
locale
,
err
:=
locale
(
ctx
)
;
{
case
err
!=
nil
:
return
""
,
err
case
locale
==
"EN/US"
:
return
"hello"
,
nil
}
return
""
,
fmt
.
Errorf
(
"unsupported locale"
)
}
func
genFarewell
(
ctx
context
.
Context
)
(
string
,
error
)
{
switch
locale
,
err
:=
locale
(
ctx
)
;
{
case
err
!=
nil
:
return
""
,
err
case
locale
==
"EN/US"
:
return
"goodbye"
,
nil
}
return
""
,
fmt
.
Errorf
(
"unsupported locale"
)
}
func
locale
(
ctx
context
.
Context
)
(
string
,
error
)
{
if
deadline
,
ok
:=
ctx
.
Deadline
(
)
;
ok
{
if
deadline
.
Sub
(
time
.
Now
(
)
.
Add
(
1
*
time
.
Minute
)
)
<=
0
{
return
""
,
context
.
DeadlineExceeded
}
}
select
{
case
<-
ctx
.
Done
(
)
:
return
""
,
ctx
.
Err
(
)
case
<-
time
.
After
(
1
*
time
.
Minute
)
:
}
return
"EN/US"
,
nil
}
Hier prüfen wir, ob unser
Context
eine Frist angegeben hat. Wenn dies der Fall ist und die Uhr unseres Systems die Frist überschritten hat, kehren wir einfach mit einem speziellen Fehler zurück, der im Paketcontext
definiert ist:DeadlineExceeded
.
Obwohl der Unterschied in dieser Iteration des Programms gering ist, kann die Funktion locale
dadurch schnell fehlschlagen. In Programmen, bei denen der Aufruf des nächsten Teils der Funktion hohe Kosten verursacht, kann dies eine erhebliche Zeitersparnis bedeuten. Zumindest aber kann die Funktion sofort fehlschlagen, anstatt auf die tatsächliche Zeitüberschreitung warten zu müssen. Der einzige Haken an der Sache ist, dass du eine Vorstellung davon haben musst, wie lange dein untergeordneter Aufrufdiagramm dauern wird - eine Übung, die sehr schwierig sein kann.
Das bringt uns zu der anderen Hälfte dessen, was das Paket context
bietet: eine Datentasche für Context
zum Speichern und Abrufen von Daten, die von einer Anfrage abhängen. Erinnere dich daran, dass eine Funktion, die eine Goroutine und Context
erstellt, oft einen Prozess startet, der Anfragen bearbeitet, und dass Funktionen weiter unten im Stack Informationen über die Anfrage benötigen. Hier ist ein Beispiel dafür, wie man Daten in der Context
speichert und wie man sie abruft:
func
main
()
{
ProcessRequest
(
"jane"
,
"abc123"
)
}
func
ProcessRequest
(
userID
,
authToken
string
)
{
ctx
:=
context
.
WithValue
(
context
.
Background
(),
"userID"
,
userID
)
ctx
=
context
.
WithValue
(
ctx
,
"authToken"
,
authToken
)
HandleResponse
(
ctx
)
}
func
HandleResponse
(
ctx
context
.
Context
)
{
fmt
.
Printf
(
"handling response for %v (%v)"
,
ctx
.
Value
(
"userID"
),
ctx
.
Value
(
"authToken"
),
)
}
Das ergibt:
handling response for jane (abc123)
Ziemlich einfaches Zeug. Die einzigen Voraussetzungen sind, dass:
-
Der Schlüssel, den du verwendest, muss Go's Vorstellung von Vergleichbarkeit entsprechen; das heißt, die Gleichheitsoperatoren
==
und!=
müssen korrekte Ergebnisse liefern, wenn sie verwendet werden. -
Auf die zurückgegebenen Werte muss von mehreren Goroutinen aus sicher zugegriffen werden können.
Da sowohl der Schlüssel als auch der Wert von Context
als interface{}
definiert sind, verlieren wir die Typsicherheit von Go, wenn wir versuchen, Werte abzurufen. Der Schlüssel könnte von einem anderen Typ sein oder sich leicht von dem Schlüssel unterscheiden, den wir angeben. Der Wert könnte von einem anderen Typ sein, als wir erwarten. Aus diesen Gründen empfehlen die Go-Autoren, beim Speichern und Abrufen von Werten in Context
einige Regeln zu beachten.
Zunächst wird empfohlen, einen eigenen Schlüsseltyp in deinem Paket zu definieren. Solange andere Pakete das Gleiche tun, verhindert dies Kollisionen innerhalb der Context
. Zur Erinnerung: Schauen wir uns ein kurzes Programm an, das versucht, Schlüssel in einer Map zu speichern, die zwar unterschiedliche Typen, aber den gleichen Wert haben:
type
foo
int
type
bar
int
m
:=
make
(
map
[
interface
{}]
int
)
m
[
foo
(
1
)]
=
1
m
[
bar
(
1
)]
=
2
fmt
.
Printf
(
"%v"
,
m
)
Das ergibt:
map[1:1 1:2]
Du siehst, dass die zugrundeliegenden Werte zwar dieselben sind, aber durch die unterschiedlichen Typinformationen innerhalb einer Map unterschieden werden. Da der Typ, den du für die Schlüssel deines Pakets definierst, nicht exportiert wird, können andere Pakete nicht mit den Schlüsseln kollidieren, die du innerhalb deines Pakets erzeugst.
Da wir die Schlüssel, die wir zum Speichern der Daten verwenden, nicht exportieren, müssen wir Funktionen exportieren, die die Daten für uns abrufen. Das ist eine gute Lösung, denn so können die Verbraucher dieser Daten statische, typsichere Funktionen verwenden.
Wenn du das alles zusammennimmst, erhältst du etwa das folgende Beispiel:
func
main
()
{
ProcessRequest
(
"jane"
,
"abc123"
)
}
type
ctxKey
int
const
(
ctxUserID
ctxKey
=
iota
ctxAuthToken
)
func
UserID
(
c
context
.
Context
)
string
{
return
c
.
Value
(
ctxUserID
).(
string
)
}
func
AuthToken
(
c
context
.
Context
)
string
{
return
c
.
Value
(
ctxAuthToken
).(
string
)
}
func
ProcessRequest
(
userID
,
authToken
string
)
{
ctx
:=
context
.
WithValue
(
context
.
Background
(),
ctxUserID
,
userID
)
ctx
=
context
.
WithValue
(
ctx
,
ctxAuthToken
,
authToken
)
HandleResponse
(
ctx
)
}
func
HandleResponse
(
ctx
context
.
Context
)
{
fmt
.
Printf
(
"handling response for %v (auth: %v)"
,
UserID
(
ctx
),
AuthToken
(
ctx
),
)
}
Die Ausführung dieses Codes ergibt:
handling response for jane (auth: abc123)
Jetzt haben wir eine typsichere Methode, um Werte aus dem Context
abzurufen, und - wenn die Verbraucher in einem anderen Paket wären - wüssten sie nicht, welche Schlüssel verwendet wurden, um die Informationen zu speichern, und es wäre ihnen auch egal. Diese Technik wirft jedoch ein Problem auf.
Nehmen wir an, das Paket HandleResponse
befände sich in einem anderen Paket namens response
, und das Paket ProcessRequest
in einem Paket namens process
. Das Paket process
müsste das Paket response
importieren, um den Aufruf von HandleResponse
zu tätigen, aber HandleResponse
hätte keine Möglichkeit, auf die im Paket process
definierten Zugriffsfunktionen zuzugreifen, da der Import von process
eine zirkuläre Abhängigkeit bilden würde. Da die Typen, die zum Speichern der Schlüssel in Context
verwendet werden, für das Paket process
privat sind, hat das Paket response
keine Möglichkeit, diese Daten abzurufen!
Das zwingt die Architektur dazu, Pakete zu erstellen, die sich um Datentypen drehen, die von verschiedenen Orten importiert werden. Das ist sicherlich nichts Schlechtes, aber es ist etwas, das man beachten sollte.
Das Paket context
ist ziemlich nett, aber es wurde nicht einheitlich gelobt. Innerhalb der Go-Gemeinschaft ist das Paket context
etwas umstritten. Der Annullierungsaspekt des Pakets wurde ziemlich gut aufgenommen, aber die Möglichkeit, beliebige Daten in einem Context
zu speichern, und die typunsichere Art, in der die Daten gespeichert werden, haben zu einigen Meinungsverschiedenheiten geführt. Obwohl wir die fehlende Typsicherheit mit unseren Accessor-Funktionen teilweise behoben haben, können wir immer noch Fehler durch das Speichern falscher Typen einführen. Das größere Problem ist jedoch definitiv die Art dessen , was Entwickler in Instanzen von Context
speichern sollten.
Die am weitesten verbreitete Anleitung, was angemessen ist, ist dieser etwas zweideutige Kommentar im context
Paket:
Use context values only for request-scoped data that transits processes and API boundaries, not for passing optional parameters to functions.
Es ist ziemlich klar, was ein optionaler Parameter ist (du solltest nicht Context
benutzen, um deinen geheimen Wunsch zu erfüllen, dass Go optionale Parameter unterstützt), aber was sind "Request-scoped data"? Es heißt, dass sie "Prozesse und API-Grenzen überschreiten", aber das könnte viele Dinge beschreiben. Der beste Weg, das zu definieren, ist, gemeinsam mit deinem Team einige Heuristiken zu entwickeln und sie in Codeüberprüfungen zu bewerten. Hier sind meine Heuristiken:
- 1) Die Daten sollten Prozess- oder API-Grenzen überschreiten.
-
Wenn du die Daten im Speicher deines Prozesses generierst, sind sie wahrscheinlich kein guter Kandidat für anforderungsspezifische Daten, es sei denn, du übergibst sie auch über eine API-Grenze.
- 2) Die Daten sollten unveränderlich sein.
-
Wenn das nicht der Fall ist, stammt das, was du speicherst, per Definition nicht aus der Anfrage.
- 3) Die Daten sollten zu einfachen Typen tendieren.
-
Wenn anforderungsspezifische Daten Prozess- und API-Grenzen überschreiten sollen, ist es für die andere Seite viel einfacher, diese Daten herauszuziehen, wenn sie nicht auch noch einen komplexen Graphen von Paketen importieren muss.
- 4) Die Daten sollten Daten sein, keine Typen mit Methoden.
-
Operationen sind logisch und gehören zu den Dingen, die diese Daten verbrauchen.
- 5) Die Daten sollten bei der Gestaltung von Abläufen helfen, nicht sie steuern.
-
Wenn sich dein Algorithmus unterschiedlich verhält, je nachdem, was in seinen Parametern enthalten ist oder nicht, hast du wahrscheinlich den Bereich der optionalen Parameter überschritten.
Context
enthält, hast du wahrscheinlich das Gebiet der optionalen Parameter betreten.
Dies sind keine festen Regeln, sondern Heuristiken. Wenn du jedoch feststellst, dass die Daten, die du auf Context
speicherst, gegen alle fünf Richtlinien verstoßen, solltest du dir deine Entscheidung gut überlegen.
Ein weiterer Aspekt ist die Frage, wie viele Ebenen die Daten durchlaufen müssen, bevor sie genutzt werden können. Wenn zwischen der Annahme der Daten und ihrer Verwendung ein paar Frameworks und Dutzende von Funktionen liegen, möchtest du dann eher ausführliche, selbstdokumentierende Funktionssignaturen verwenden und die Daten als Parameter hinzufügen? Oder würdest du sie lieber in Context
platzieren und damit eine unsichtbare Abhängigkeit schaffen? Jeder Ansatz hat seine Vorzüge, und am Ende müssen du und dein Team diese Entscheidung treffen.
Selbst mit diesen Heuristiken bleibt die Frage, ob es sich bei einem Wert um anforderungsspezifische Daten handelt oder nicht, schwierig zu beantworten. Wirf einen Blick auf die folgende Tabelle. Sie listet meine Meinung darüber auf, ob die einzelnen Datentypen die fünf Heuristiken erfüllen. Stimmst du mir zu?
Daten | 1 | 2 | 3 | 4 | 5 |
---|---|---|---|---|---|
ID anfordern |
✓ |
✓ |
✓ |
✓ |
✓ |
Benutzer-ID |
✓ |
✓ |
✓ |
✓ |
|
URL |
✓ |
✓ |
|||
API Server Verbindung |
|||||
Autorisierungs-Token |
✓ |
✓ |
✓ |
✓ |
|
Token anfordern |
✓ |
✓ |
✓ |
Manchmal ist es klar, dass etwas nicht in einem Kontext gespeichert werden sollte, wie bei API-Serververbindungen, aber manchmal ist es nicht so klar. Was ist mit einem Autorisierungs-Token? Es ist unveränderlich und besteht wahrscheinlich nur aus ein paar Bytes, aber werden die Empfänger dieser Daten es nicht verwenden, um zu entscheiden, ob sie die Anfrage bearbeiten sollen? Gehören diese Daten in einen Kontext? Was für das eine Team akzeptabel ist, kann für ein anderes nicht akzeptabel sein, um die Sache noch komplizierter zu machen.
Letztlich gibt es hier keine einfachen Antworten. Da das Paket in die Standardbibliothek aufgenommen wurde, musst du dir eine Meinung über seine Verwendung bilden, aber diese Meinung könnte (und sollte wahrscheinlich) sich ändern, je nachdem, mit welchem Projekt du zu tun hast. Der letzte Rat, den ich dir mit auf den Weg geben möchte, ist, dass die Stornofunktionalität, die von Context
sehr nützlich ist, und deine Meinung über die Datentasche sollte dich nicht davon abhalten, sie zu benutzen.
Zusammenfassung
In diesem Kapitel haben wir eine Menge Stoff behandelt. Wir haben die Gleichzeitigkeits-Primitive von Go zu Mustern kombiniert, die dabei helfen, wartbaren gleichzeitigen Code zu schreiben. Jetzt, da du mit diesen Mustern vertraut bist, können wir besprechen, wie wir diese Muster in andere Muster einbinden können, die dir helfen, große Systeme zu schreiben. Das nächste Kapitel gibt dir einen Überblick über Techniken, mit denen du genau das tun kannst.
1 Ich ignoriere die Möglichkeit, den Speicher manuell über das Paket unsafe
zu manipulieren. Es heißt nicht umsonst unsafe
!
2 Im Kontext von Sprachen bedeutet Verdinglichung, dass die Sprache ein Konzept für die Entwickler offenlegt, damit sie direkt damit arbeiten können. Funktionen in Go werden als verifiziert bezeichnet, weil du Variablen definieren kannst, die den Typ einer Funktionssignatur haben. Das bedeutet auch, dass du Funktionen in deinem Programm weitergeben kannst.
Get Gleichzeitigkeit in Go now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.