Kapitel 1. Grundlagen der Rust-Gleichzeitigkeit
Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com
Lange bevor Multi-Core-Prozessoren gang und gäbe waren, ermöglichten es die Betriebssysteme, dass auf einem einzigen Computer viele Programme gleichzeitig laufen. Dies wird erreicht, indem schnell zwischen den Prozessen gewechselt wird, so dass jeder Prozess immer wieder ein kleines Stückchen vorankommt. Heutzutage haben fast alle unsere Computer und sogar unsere Handys und Uhren Prozessoren mit mehreren Kernen, die wirklich mehrere Prozesseparallel ausführen können.
Betriebssysteme isolieren Prozesse so weit wie möglich voneinander, so dass ein Programm seine Arbeit tun kann, ohne zu wissen, was andere Prozesse tun.Ein Prozess kann zum Beispiel normalerweise nicht auf den Speicher eines anderen Prozesses zugreifen oder mit ihm kommunizieren, ohne den Kernel des Betriebssystems zu fragen.
Ein Programm kann jedoch als Teil desselben Prozesses zusätzliche Threads zur Ausführung erzeugen. Threads innerhalb desselben Prozesses sind nicht voneinander isoliert. Threads teilen sich den Speicher und können über diesen Speicher miteinander interagieren.
In diesem Kapitel wird erklärt, wie Threads in Rust erzeugt werden und welche grundlegenden Konzepte es gibt, z.B. wie man Daten sicher zwischen mehreren Threads austauscht. Die Konzepte, die in diesem Kapitel erklärt werden, sind die Grundlage für den Rest des Buches.
Hinweis
Wenn du mit diesen Teilen von Rust bereits vertraut bist, kannst du einfach weiterspringen. Bevor du jedoch mit den nächsten Kapiteln weitermachst, solltest du sicherstellen, dass du ein gutes Verständnis von Threads, innerer Veränderbarkeit, Send
und Sync
hast und weißt, was eine Mutex, eine Bedingungsvariable und Thread-Parking sind.
Fäden in Rust
Jedes Programm startet mit genau einem Thread: dem Hauptthread. Dieser Thread führt deine main
Funktion aus und kann bei Bedarf weitere Threads spawnen.
In Rust werden neue Threads mit der Funktion std::thread::spawn
aus der Standardbibliothek erzeugt. Sie benötigt ein einziges Argument: die Funktion, die der neue Thread ausführen soll. Der Thread stoppt, sobald diese Funktion zurückkehrt.
Schauen wir uns ein Beispiel an:
use
std
::thread
;
fn
main
()
{
thread
::spawn
(
f
);
thread
::spawn
(
f
);
println!
(
"Hello from the main thread."
);
}
fn
f
()
{
println!
(
"Hello from another thread!"
);
let
id
=
thread
::current
().
id
();
println!
(
"This is my thread id: {id:?}"
);
}
Wir legen zwei Threads an, die beide f
als Hauptfunktion ausführen. Beide Threads geben eine Nachricht aus und zeigen ihre Thread-ID an, während der Hauptthread auch seine eigene Nachricht ausgibt.
Wenn du unser obiges Beispielprogramm mehrmals ausführst, wirst du feststellen, dass die Ausgabe von Lauf zu Lauf variiert. Das ist die Ausgabe, die ich bei einembestimmten Lauf auf meinem Rechner erhalten habe:
Hello from the main thread. Hello from another thread! This is my thread id:
Überraschenderweise scheint ein Teil der Ausgabe zu fehlen.
Was hier passiert ist, ist, dass der Hauptthread die Funktion main
beendet hat, bevor die neu gespawnten Threads ihre Funktionen ausgeführt haben.
Wenn du von main
zurückkehrst, wird das gesamte Programm beendet, auch wenn andere Threads noch laufen.
In diesem Beispiel hatte einer der neu gestarteten Threads gerade genug Zeit, um bis zur Hälfte der zweiten Nachricht zu kommen, bevor das Programm vom Hauptthread beendet wurde.
Wenn wir sicherstellen wollen, dass die Threads fertig sind, bevor wir von main
zurückkehren, können wir auf sie warten, indem wir sie verbinden. Dazu müssen wir JoinHandle
verwenden, das von der Funktion spawn
zurückgegeben wird:
fn
main
()
{
let
t1
=
thread
::spawn
(
f
);
let
t2
=
thread
::spawn
(
f
);
println!
(
"Hello from the main thread."
);
t1
.
join
().
unwrap
();
t2
.
join
().
unwrap
();
}
Die Methode .join()
wartet, bis der Thread seine Ausführung beendet hat und gibt std::thread::Result
zurück. Wenn der Thread seine Funktion nicht erfolgreich beendet hat, weil er in Panik geraten ist, enthält dies die Panikmeldung. Wir könnten versuchen, diese Situation zu bewältigen, oder einfach .unwrap()
aufrufen, um in Panik zu geraten, wenn wir einem in Panik geratenen Thread beitreten.
Wenn du diese Version unseres Programms ausführst, wird die Ausgabe nicht mehr abgeschnitten:
Hello from the main thread. Hello from another thread! This is my thread id: ThreadId(3) Hello from another thread! This is my thread id: ThreadId(2)
Das Einzige, was sich zwischen den Durchläufen noch ändert, ist die Reihenfolge, in der die Nachrichtengedruckt werden:
Hello from the main thread. Hello from another thread! Hello from another thread! This is my thread id: ThreadId(2) This is my thread id: ThreadId(3)
Anstatt den Namen einer Funktion an std::thread::spawn
zu übergeben, wie in unserem Beispiel oben, ist es viel üblicher, ihr eine Closure zu übergeben.
So können wir Werte erfassen, die in den neuen Thread verschoben werden:
let
numbers
=
vec!
[
1
,
2
,
3
];
thread
::spawn
(
move
||
{
for
n
in
&
numbers
{
println!
(
"{n}"
);
}
}).
join
().
unwrap
();
Hier wird der Besitz von numbers
an den neu gestarteten Thread übertragen, da wir eine move
Closure verwendet haben. Hätten wir das Schlüsselwort move
nicht verwendet, hätte die Closure numbers
per Referenz erfasst. Dies hätte zu einem Compilerfehler geführt, da der neue Thread diese Variable überleben könnte.
Da ein Thread bis zum Ende der Programmausführung laufen kann, hat die Funktion spawn
eine 'static
Lebensdauer, die an ihren Argumenttyp gebunden ist.
Mit anderen Worten, es werden nur Funktionen akzeptiert, die für immer bestehen bleiben können. Eine Closure, die eine lokale Variable per Referenz erfasst, kann nicht für immer bestehen bleiben, da diese Referenz ungültig wird, sobald die lokale Variable nicht mehr existiert.
Um einen Wert aus dem Thread zurückzubekommen, musst du ihn aus der Closure zurückgeben. Diesen Rückgabewert erhältst du über die Result
, die von der Methode join
zurückgegeben wird:
let
numbers
=
Vec
::
from_iter
(
0
..=
1000
)
;
let
t
=
thread
::
spawn
(
move
|
|
{
let
len
=
numbers
.
len
(
)
;
let
sum
=
numbers
.
iter
(
)
.
sum
:
:
<
usize
>
(
)
;
sum
/
len
}
)
;
let
average
=
t
.
join
(
)
.
unwrap
(
)
;
println!
(
"
average: {average}
"
)
;
Hier wird der Wert, der von der Schließung des Threads () zurückgegebene Wert wird über die Methode join
an den Hauptthread zurückgeschickt ().
Wäre numbers
leer gewesen, wäre der Thread in Panik geraten, als er versuchte, durch Null zu dividieren (), und join
hätte stattdessen diese Panikmeldung zurückgegeben, was dazu geführt hätte, dass der Hauptthread aufgrund von unwrap
ebenfalls in Panik geraten wäre ().
Eingeschränkte Fäden
Wenn wir sicher wissen, dass ein gespawnter Thread einen bestimmten Bereich definitiv nicht überleben wird, kann dieser Thread sicher Dinge ausleihen, die nicht ewig leben, wie z.B. lokale Variablen, solange sie diesen Bereich überdauern.
Die Rust-Standardbibliothek stellt die Funktion std::thread::scope
zur Verfügung, mit der wir solche Scoped-Threads erzeugen können. Sie ermöglicht es uns, Threads zu erzeugen, die den Gültigkeitsbereich der Closure, die wir an die Funktion übergeben, nicht überschreiten können.
Wie das funktioniert, lässt sich am besten anhand eines Beispiels zeigen:
let
numbers
=
vec!
[
1
,
2
,
3
]
;
thread
::
scope
(
|
s
|
{
s
.
spawn
(
|
|
{
println!
(
"
length: {}
"
,
numbers
.
len
(
)
)
;
}
)
;
s
.
spawn
(
|
|
{
for
n
in
&
numbers
{
println!
(
"
{n}
"
)
;
}
}
)
;
}
)
;
Wir rufen die Funktion
std::thread::scope
mit einer Closure auf. Unsere Closure wird direkt ausgeführt und erhält ein Argument,s
, das den Geltungsbereich darstellt.Wir verwenden
s
, um Threads zu starten.Die Closures können lokale Variablen wienumbers
ausleihen.Wenn der Geltungsbereich endet, werden alle Threads, die noch nicht verbunden wurden, automatisch verbunden.
Dieses Muster garantiert, dass keiner der im Scope gespawnten Threads den Scope überleben kann. Aus diesem Grund hat diese scoped spawn
Methode keine 'static
Begrenzung auf ihren Argumenttyp, so dass wir alles referenzieren können, solange es den Scope überlebt, wie z.B. numbers
.
Im obigen Beispiel greifen beide neuen Threads gleichzeitig auf numbers
zu. Das ist in Ordnung, weil keiner von ihnen (und auch nicht der Hauptthread) den Thread verändert. Wenn wir den ersten Thread so ändern würden, dass er numbers
verändert, wie unten gezeigt, würde der Compiler nicht zulassen, dass wir einen weiteren Thread erzeugen, der ebenfalls numbers
verwendet:
let
mut
numbers
=
vec!
[
1
,
2
,
3
];
thread
::scope
(
|
s
|
{
s
.
spawn
(
||
{
numbers
.
push
(
1
);
});
s
.
spawn
(
||
{
numbers
.
push
(
2
);
// Error!
});
});
Die genaue Fehlermeldung hängt von der Version des Rust-Compilers ab, da dieser oft verbessert wurde, um bessere Diagnosen zu erstellen, aber wenn du versuchst, den obigen Code zu kompilieren, erhältst du etwa die folgende Meldung:
error[E0499]: cannot borrow `numbers` as mutable more than once at a time --> example.rs:7:13 | 4 | s.spawn(|| { | -- first mutable borrow occurs here 5 | numbers.push(1); | ------- first borrow occurs due to use of `numbers` in closure | 7 | s.spawn(|| { | ^^ second mutable borrow occurs here 8 | numbers.push(2); | ------- second borrow occurs due to use of `numbers` in closure
Geteiltes Eigentum und Referenzzählung
Bisher haben wir uns angeschaut, wie man das Eigentum an einem Wert mit Hilfe einer move
Closure ("Threads in Rust") auf einen Thread überträgt und wie man Daten von länger lebenden Eltern-Threads ausleiht ("Scoped Threads"). Wenn zwei Threads Daten gemeinsam nutzen, bei denen keiner der beiden garantiert überlebt, kann keiner der beiden der Eigentümer dieser Daten sein. Alle Daten, die sie gemeinsam nutzen, müssen so lange leben wie der am längsten lebende Thread.
Statik
Es gibt mehrere Möglichkeiten, etwas zu erstellen, das nicht einem einzelnen Thread gehört. Die einfachste ist ein static
Wert, der dem gesamten Programm und nicht einem einzelnen Thread "gehört". Im folgenden Beispiel können beide Threads auf X
zugreifen, aber keiner von ihnen besitzt ihn:
static
X
:[
i32
;
3
]
=
[
1
,
2
,
3
];
thread
::spawn
(
||
dbg!
(
&
X
));
thread
::spawn
(
||
dbg!
(
&
X
));
Ein static
Element hat einen konstanten Initialisierer, wird nie fallen gelassen und ist bereits vorhanden, bevor die Hauptfunktion des Programms überhaupt startet. Jeder Thread kann es ausleihen, da es garantiert immer vorhanden ist.
Auslaufende
Eine andere Möglichkeit, den Besitz zu teilen, ist das Lecken einer Zuweisung. Mit Box::leak
kann man den Besitz einer Box
freigeben und versprechen, sie nie wieder fallen zu lassen. Von diesem Zeitpunkt an lebt Box
für immer, ohne einen Besitzer, und kann von jedem Thread ausgeliehen werden, solange das Programm läuft.
let
x
:&
'
static
[
i32
;
3
]
=
Box
::leak
(
Box
::new
([
1
,
2
,
3
]));
thread
::spawn
(
move
||
dbg!
(
x
));
thread
::spawn
(
move
||
dbg!
(
x
));
Durch die move
Closure sieht es vielleicht so aus, als würden wir das Eigentum an den Threads übertragen, aber ein genauerer Blick auf den Typ von x
zeigt, dass wir den Threads nur eine Referenz auf die Daten geben.
Tipp
Referenzen sind Copy
, d.h. wenn du sie "verschiebst", bleibt das Original erhalten, genau wie bei einem Integer oder Boolean.
Beachte, dass die 'static
Lebensdauer nicht bedeutet, dass der Wert seit Beginn des Programms lebt, sondern nur, dass er bis zum Ende des Programms lebt. Die Vergangenheit ist einfach nicht relevant.
Der Nachteil von Box
ist, dass wir Speicher lecken. Wir weisen etwas zu, aber geben es nie wieder frei. Das kann in Ordnung sein, wenn es nur eine begrenzte Anzahl von Malen passiert. Aber wenn wir das immer wieder tun, wird dem Programm langsam der Speicher ausgehen.
Referenzzählung
Um sicherzustellen, dass gemeinsam genutzte Daten gelöscht und freigegeben werden, können wir den Besitz nicht vollständig aufgeben. Stattdessen können wir den Besitz teilen. Indem wir die Anzahl der Besitzer im Auge behalten, können wir sicherstellen, dass der Wert nur gelöscht wird, wenn es keine Besitzer mehr gibt.
Die Rust-Standardbibliothek bietet diese Funktionalität über den Typ std::rc::Rc
, kurz für "reference counted". Er ist einem Box
sehr ähnlich, mit dem Unterschied, dass beim Klonen kein neuer Wert zugewiesen wird, sondern ein Zähler, der neben dem enthaltenen Wert gespeichert ist, inkrementiert wird. Sowohl das Original als auch der geklonte Rc
verweisen auf dieselbe Zuweisung; sie teilen sich den Besitz.
use
std
::rc
::Rc
;
let
a
=
Rc
::new
([
1
,
2
,
3
]);
let
b
=
a
.
clone
();
assert_eq!
(
a
.
as_ptr
(),
b
.
as_ptr
());
// Same allocation!
Wenn du eine Rc
fallen lässt, wird der Zähler dekrementiert. Nur die letzte Rc
, bei der der Zähler auf Null fällt, ist diejenige, die die enthaltenen Daten fallen lässt und freigibt.
Wenn wir jedoch versuchen würden, eine Rc
an einen anderen Thread zu senden, würden wir auf den folgenden Compilerfehler stoßen:
error[E0277]: `Rc` cannot be sent between threads safely | 8 | thread::spawn(move || dbg!(b)); | ^^^^^^^^^^^^^^^
Wie sich herausstellte, ist Rc
nicht thread-sicher (mehr dazu in "Thread Safety: Send and Sync"). Wenn mehrere Threads eine Rc
auf dieselbe Allokation hätten, könnten sie versuchen, den Referenzzähler gleichzeitig zu ändern, was zu unvorhersehbaren Ergebnissen führen kann.
Stattdessen können wir std::sync::Arc
verwenden, was für "atomically reference counted" steht. Es ist identisch mit Rc
, außer dass es garantiert, dass Änderungen am Referenzzähler unteilbare atomare Operationen sind, was es sicher macht, es mit mehreren Threads zu verwenden. (Mehr dazu in Kapitel 2.)
use
std
::
sync
::
Arc
;
let
a
=
Arc
::
new
(
[
1
,
2
,
3
]
)
;
let
b
=
a
.
clone
(
)
;
thread
::
spawn
(
move
|
|
dbg!
(
a
)
)
;
thread
::
spawn
(
move
|
|
dbg!
(
b
)
)
;
Wir setzen ein Array in eine neue Zuordnung zusammen mit einem Referenzzähler, der bei eins beginnt.
Durch das Klonen von
Arc
wird die Anzahl der Referenzen auf zwei erhöht und wir erhalten eine zweiteArc
für dieselbe Zuordnung.Beide Threads erhalten ihre eigene
Arc
, über die sie auf das gemeinsame Array zugreifen können. Beide dekrementieren den Referenzzähler, wenn sie ihreArc
ablegen. Der letzte Thread, der seineArc
ablegt, sieht den Zähler auf Null fallen und ist derjenige, der das Array ablegt und deallokiert.
Da das Eigentum gemeinsam genutzt wird, haben referenzzählende Zeiger (Rc<T>
und Arc<T>
) die gleichen Einschränkungen wie gemeinsam genutzte Referenzen (&T
). Sie geben dir keinen veränderbaren Zugriff auf den enthaltenen Wert, da der Wertgleichzeitig von anderem Code ausgeliehen werden könnte.
Wenn wir zum Beispiel versuchen würden, die Ganzzahlenslice in einer Arc<[i32]>
zu sortieren, würde der Compiler uns davon abhalten und uns sagen, dass wirdie Daten nicht verändern dürfen:
error[E0596]: cannot borrow data in an `Arc` as mutable | 6 | a.sort(); | ^^^^^^^^
Ausleihen und Datenwettläufe
In Rust können Werte auf zwei Arten ausgeliehen werden:
- Unveränderliche Kreditaufnahme
-
Wenn du etwas mit
&
ausleihst, erhältst du eine unveränderliche Referenz. Eine solche Referenz kann kopiert werden. Der Zugriff auf die Daten, auf die sie verweist, wird von allen Kopien einer solchen Referenz gemeinsam genutzt. Wie der Name schon sagt, lässt der Compiler normalerweise nicht zu, dass du etwas durch eine solche Referenz änderst, da sich das auf anderen Code auswirken könnte, der gerade dieselben Daten ausleiht. - Mutable borrowing
-
Wenn du etwas mit
&mut
ausleihst, erhältst du eine veränderbare Referenz. Ein veränderbares Ausleihen garantiert, dass es das einzige aktive Ausleihen dieser Daten ist. Dadurch wird sichergestellt, dass das Ändern der Daten nichts ändert, was andere Codes gerade betrachten.
Diese beiden Konzepte zusammen verhindern Datenwettläufe: Situationen, in denen ein Thread Daten verändert, während ein anderer gleichzeitig darauf zugreift. Datenwettläufe sind in der Regel ein undefiniertes Verhalten, was bedeutet, dass der Compiler diese Situationen nicht zu berücksichtigen braucht. Er geht einfach davon aus, dass sie nicht vorkommen.
Um zu verdeutlichen, was das bedeutet, schauen wir uns ein Beispiel an, bei dem der Compiler mithilfe der Borrowing-Regeln eine sinnvolle Annahme treffen kann:
fn
f
(
a
:&
i32
,
b
:&
mut
i32
)
{
let
before
=
*
a
;
*
b
+=
1
;
let
after
=
*
a
;
if
before
!=
after
{
x
();
// never happens
}
}
Hier erhalten wir einen unveränderlichen Verweis auf eine Ganzzahl und speichern den Wert der Ganzzahl sowohl vor als auch nach der Inkrementierung der Ganzzahl, auf die b
verweist. Der Compiler kann davon ausgehen, dass die grundlegenden Regeln für Ausleihen und Datenrennen eingehalten werden, was bedeutet, dass b
unmöglich auf dieselbe Ganzzahl verweisen kann wie a
.
Tatsächlich kann nichts im gesamten Programm die Ganzzahl, auf die sich a
bezieht, verändern, solange a
sie ausleiht. Daher kann der Compiler leicht feststellen, dass sich *a
nicht ändert und die Bedingung der Anweisung if
niemals wahr sein wird, und kann den Aufruf von x
als Optimierung vollständig aus dem Programm entfernen.
Es ist unmöglich, ein Rust-Programm zu schreiben, das gegen die Annahmen des Compilers verstößt, es sei denn, du verwendest einen unsafe
Block, um einige der Sicherheitsüberprüfungen des Compilers zu deaktivieren.
Innere Wandlungsfähigkeit
Die im vorigen Abschnitt vorgestellten Regeln für das Ausleihen von Daten sind zwar einfach, können aber sehr einschränkend sein - vor allem, wenn mehrere Threads beteiligt sind. Wenn du diese Regeln befolgst, ist die Kommunikation zwischen den Threads extrem eingeschränkt und fast unmöglich, da keine Daten geändert werden können, auf die mehrere Threads Zugriff haben.
Zum Glück gibt es einen Ausweg: die interne Mutabilität. Ein Datentyp mit interner Mutabilität biegt die Regeln für das Borgen leicht um. Unter bestimmten Bedingungen können diese Typen eine Mutation durch eine "unveränderliche" Referenz erlauben.
In "Referenzzählung" haben wir bereits ein subtiles Beispiel für die innere Veränderbarkeit gesehen. Sowohl Rc
als auch Arc
verändern einen Referenzzähler, auch wenn es mehrere Klone gibt, die alle denselben Referenzzähler verwenden.
Sobald es um innere veränderliche Typen geht, wird die Bezeichnung "unveränderlich" oder "veränderlich" verwirrend und ungenau, da manche Dinge durch beide verändert werden können. Die genaueren Begriffe sind "gemeinsam genutzt" und "exklusiv": Eine gemeinsam genutzte Referenz (&T
) kann kopiert und mit anderen geteilt werden, während eine exklusive Referenz (&mut T
) garantiert, dass sie die einzige exklusive Ausleihe dieses T
ist.
Bei den meisten Typen erlauben gemeinsame Referenzen keine Mutation, aber es gibt Ausnahmen. Da wir in diesem Buch hauptsächlich mit diesen Ausnahmen arbeiten werden, verwenden wir im weiteren Verlauf des Buches die genaueren Begriffe.
Vorsicht
Bedenke, dass die innere Mutabilität nur die Regeln für gemeinsames Borgen verbiegt, um Mutation bei gemeinsamer Nutzung zu ermöglichen. Sie ändert nichts am exklusiven Borgen. Exklusives Borgen garantiert immer noch, dass es keine anderen aktiven Borgen gibt. Unsicherer Code, der zu mehr als einer aktiven exklusiven Referenz auf etwas führt, ruft immer undefiniertes Verhalten hervor, unabhängig von der inneren Mutabilität.
Schauen wir uns ein paar Typen mit innerer Veränderbarkeit an und wie sie Mutationen durch gemeinsame Referenzen ermöglichen können, ohne undefiniertes Verhalten zu verursachen.
Zelle
Ein std::cell::Cell<T>
umhüllt einfach ein T
, erlaubt aber Mutationen durch eine gemeinsame Referenz. Um undefiniertes Verhalten zu vermeiden, erlaubt es nur das Kopieren des Wertes nach außen (wenn T
Copy
ist) oder das Ersetzen durch einen anderen Wert als Ganzes. Außerdem kann es nur innerhalb eines einzigen Threads verwendet werden.
Schauen wir uns ein ähnliches Beispiel an wie das im vorherigen Abschnitt, aber diesmal mit Cell<i32>
statt i32
:
use
std
::cell
::Cell
;
fn
f
(
a
:&
Cell
<
i32
>
,
b
:&
Cell
<
i32
>
)
{
let
before
=
a
.
get
();
b
.
set
(
b
.
get
()
+
1
);
let
after
=
a
.
get
();
if
before
!=
after
{
x
();
// might happen
}
}
Anders als beim letzten Mal ist es jetzt möglich, dass die Bedingung if
wahr ist. Da Cell<i32>
intern veränderbar ist, kann der Compiler nicht mehr davon ausgehen, dass sich sein Wert nicht ändert, solange wir einen gemeinsamen Verweis darauf haben. Sowohl a
als auch b
könnten sich auf denselben Wert beziehen, so dass sich eine Veränderung durch b
auch auf a
auswirken könnte. Er kann jedoch weiterhin davon ausgehen, dass keine anderen Threads gleichzeitig auf die Zellen zugreifen.
Die Beschränkungen einer Cell
sind nicht immer einfach zu handhaben. Da wir den Wert, den sie enthält, nicht direkt ausleihen können, müssen wir einen Wert hinausschieben (und etwas an seiner Stelle lassen), ihn ändern und dann zurückschieben, um seinen Inhalt zu verändern:
fn
f
(
v
:&
Cell
<
Vec
<
i32
>>
)
{
let
mut
v2
=
v
.
take
();
// Replaces the contents of the Cell with an empty Vec
v2
.
push
(
1
);
v
.
set
(
v2
);
// Put the modified Vec back
}
RefCell
Im Gegensatz zu einem regulären Cell
erlaubt std::cell::RefCell
das Ausleihen seines Inhalts zu geringen Laufzeitkosten. Ein RefCell<T>
enthält nicht nur ein T
, sondern auch einen Zähler, der alle ausstehenden Ausleihen verfolgt.Wenn du versuchst, ihn auszuleihen, während er bereits veränderbar ausgeliehen ist (oder umgekehrt), gerät er in Panik, wodurch undefiniertes Verhalten vermieden wird. Genau wie ein Cell
kann ein RefCell
nur innerhalb eines einzigen Threads verwendet werden.
Das Ausleihen des Inhalts von RefCell
erfolgt durch den Aufruf von borrow
oder borrow_mut
:
use
std
::cell
::RefCell
;
fn
f
(
v
:&
RefCell
<
Vec
<
i32
>>
)
{
v
.
borrow_mut
().
push
(
1
);
// We can modify the `Vec` directly.
}
Während Cell
und RefCell
sehr nützlich sein können, werden sie ziemlich nutzlos, wenn wir etwas mit mehreren Threads machen müssen. Also gehen wir zu den Typen über, die für die Gleichzeitigkeit relevant sind.
Mutex und RwLock
Eine RwLock
oder Leser-Schreiber-Sperre ist die gleichzeitige Version einer RefCell
. Eine RwLock<T>
hält eine T
und verfolgt alle ausstehenden Ausleihen. Im Gegensatz zu einer RefCell
gerät sie jedoch nicht in Panik, wenn es zu Konflikten bei den Ausleihen kommt. Stattdessen blockiert sie den aktuellen Thread und versetzt ihn in den Ruhezustand, während sie darauf wartet, dass die konfliktbehafteten Ausleihen verschwinden. Wir müssen einfach geduldig warten, bis wir mit den Daten an der Reihe sind, nachdem die anderen Threads mit ihnen fertig sind.
Das Ausleihen des Inhalts einer RwLock
wird als Sperren bezeichnet. Durch das Sperren blockieren wir vorübergehend konkurrierende Ausleihen, sodass wir sie ausleihen können, ohne Datenwettläufe zu verursachen.
Eine Mutex
ist sehr ähnlich, aber konzeptionell etwas einfacher. Anstatt die Anzahl der gemeinsamen und exklusiven Ausleihen wie eine RwLock
zu verfolgen, erlaubt sie nur exklusive Ausleihen.
Wir gehen im Abschnitt "Sperren" näher auf diese Typen ein : Mutexe und RwLocks".
Atomics
Die atomaren Typen stellen die nebenläufige Version eines Cell
dar und sind das Hauptthema der Kapitel 2 und 3. Wie ein Cell
vermeiden sie undefiniertes Verhalten, indem sie uns Werte als Ganzes hinein- und herauskopieren lassen, ohne dass wir den Inhalt direkt ausleihen können.
Im Gegensatz zu Cell
können sie jedoch nicht beliebig groß sein. Deshalb gibt es keinen generischen Atomic<T>
Typ für alle T
, sondern nur spezielle atomare Typen wie AtomicU32
und AtomicPtr<T>
. Welche davon verfügbar sind, hängt von der Plattform ab, da sie vom Prozessor unterstützt werden müssen, um Datenwettläufe zu vermeiden. (Darauf gehen wir in Kapitel 7 ein).
Da sie so klein sind, enthalten Atomics oft nicht direkt die Informationen, die zwischen Threads ausgetauscht werden müssen. Stattdessen werden sie oft als Werkzeug verwendet, um andere - oft größere - Dinge zwischen Threads auszutauschen. Wenn Atomics verwendet werden, um etwas über andere Daten zu sagen, können die Dinge erstaunlich kompliziert werden.
UnsafeCell
Ein UnsafeCell
ist der primitive Baustein für innere Veränderbarkeit.
Ein UnsafeCell<T>
umhüllt einen T
, enthält aber keine Bedingungen oder Einschränkungen, um undefiniertes Verhalten zu vermeiden. Stattdessen gibt seine get()
Methode nur einen Rohzeiger auf den Wert, den sie umhüllt, der nur in unsafe
Blöcken sinnvoll verwendet werden kann. Es bleibt dem Benutzer überlassen, ihn so zu verwenden, dass er kein undefiniertes Verhalten verursacht.
Meistens wird UnsafeCell
nicht direkt verwendet, sondern von einem anderen Typ umhüllt, der Sicherheit durch eine begrenzte Schnittstelle bietet, wie Cell
oder Mutex
. Alle Typen mit innerer Veränderbarkeit - einschließlich aller oben besprochenen Typen - bauen auf UnsafeCell
auf.
Themensicherheit: Senden und Synchronisieren
In diesem Kapitel haben wir mehrere Typen kennengelernt, die nicht thread-sicher sind, d.h. nur in einem einzigen Thread verwendet werden können, wie z.B. Rc
, Cell
und andere. Da diese Einschränkung notwendig ist, um undefiniertes Verhalten zu vermeiden, muss der Compiler sie verstehen und für dich überprüfen, damit du diese Typen verwenden kannst, ohne unsafe
Blöcke verwenden zu müssen.
Die Sprache verwendet zwei spezielle Traits, um festzuhalten, welche Typen sicher über Threads hinweg verwendet werden können:
- Sende
-
Ein Typ ist
Send
, wenn er an einen anderen Thread gesendet werden kann. Mit anderen Worten, wenn das Eigentum an einem Wert dieses Typs an einen anderen Thread übertragen werden kann.Arc<i32>
ist zum BeispielSend
,Rc<i32>
aber nicht. - Sync
-
Ein Typ ist
Sync
, wenn er mit einem anderen Thread geteilt werden kann. Mit anderen Worten, ein TypT
istSync
, wenn und nur wenn eine geteilte Referenz auf diesen Typ,&T
,Send
ist. Zum Beispiel ist eini32
Sync
, aber einCell<i32>
nicht. (EinCell<i32>
ist jedochSend
.)
Alle primitiven Typen wie i32
, bool
und str
sind sowohl Send
als auch Sync
.
Beide Eigenschaften sind Auto-Eigenschaften, was bedeutet, dass sie automatisch für deine Typen auf der Grundlage ihrer Felder implementiert werden. Ein struct
mit Feldern, die alle Send
und Sync
sind, ist selbst auch Send
und Sync
.
Du kannst beides vermeiden, indem du deinem Typ ein Feld hinzufügst, das den Trait nicht implementiert. Zu diesem Zweck ist der spezielle Typ std::marker::PhantomData<T>
oft sehr nützlich. Dieser Typ wird vom Compiler wie ein T
behandelt, nur dass er zur Laufzeit nicht wirklich existiert. Es ist ein Typ mit der Größe Null, der keinen Platz benötigt.
Werfen wir einen Blick auf die folgende struct
:
use
std
::marker
::PhantomData
;
struct
X
{
handle
:i32
,
_not_sync
:PhantomData
<
Cell
<
()
>>
,
}
In diesem Beispiel wäre X
sowohl Send
als auch Sync
, wenn handle
sein einziges Feld wäre. Wir haben jedoch ein PhantomData<Cell<()>>
-Feld mit der Größe Null hinzugefügt, das so behandelt wird, als wäre es Cell<()>
. Da Cell<()>
nicht Sync
ist, ist es auch nicht X
. Es ist aber immer noch Send
, da alle seine Felder Send
implementieren.
Rohe Zeiger (*const T
und *mut T
) sind weder Send
noch Sync
, da der Compiler nicht viel darüber weiß, was sie darstellen.
Die Art und Weise, wie du dich für einen der Traits entscheidest, ist dieselbe wie bei jedem anderen Trait: Verwende einen impl
Block, um den Trait für deinen Typ zu implementieren:
struct
X
{
p
:*
mut
i32
,
}
unsafe
impl
Send
for
X
{}
unsafe
impl
Sync
for
X
{}
Beachte, dass die Implementierung dieser Eigenschaften das Schlüsselwort unsafe
erfordert, da der Compiler nicht für dich prüfen kann, ob sie korrekt sind. Es ist ein Versprechen, das du dem Compiler gibst, dem er einfach vertrauen muss.
Wenn du versuchst, etwas in einen anderen Thread zu verschieben, der nicht Send
ist, wird der Compiler dich höflich davon abhalten. Hier ist ein kleines Beispiel, um das zu demonstrieren:
fn
main
()
{
let
a
=
Rc
::new
(
123
);
thread
::spawn
(
move
||
{
// Error!
dbg!
(
a
);
});
}
Hier versuchen wir, eine Rc<i32>
an einen neuen Thread zu senden, aber Rc<i32>
implementiert im Gegensatz zu Arc<i32>
keine Send
.
Wenn wir versuchen, das obige Beispiel zu kompilieren, werden wir mit einer Fehlermeldung konfrontiert, die in etwa so aussieht:
error[E0277]: `Rc<i32>` cannot be sent between threads safely --> src/main.rs:3:5 | 3 | thread::spawn(move || { | ^^^^^^^^^^^^^ `Rc<i32>` cannot be sent between threads safely | = help: within `[closure]`, the trait `Send` is not implemented for `Rc<i32>` note: required because it's used within this closure --> src/main.rs:3:19 | 3 | thread::spawn(move || { | ^^^^^^^ note: required by a bound in `spawn`
Die Funktion thread::spawn
setzt voraus, dass ihr Argument Send
ist, und eine Closure ist nur dann Send
, wenn alle ihre Captures dies sind. Wenn wir versuchen, etwas zu fangen, das nicht Send
ist, wird unser Fehler aufgefangen, was uns vor undefiniertem Verhalten schützt.
Sperren: Mutexe und RwLocks
Das am häufigsten verwendete Werkzeug für die gemeinsame Nutzung von (veränderbaren) Daten zwischen Threads ist ein Mutex, eine Abkürzung für "Mutual Exclusion" (gegenseitiger Ausschluss). Die Aufgabe eines Mutex ist es, sicherzustellen, dass Threads exklusiven Zugriff auf bestimmte Daten haben, indem sie andere Threads, die zur gleichen Zeit versuchen, darauf zuzugreifen, vorübergehend blockieren.
Ein Mutex hat konzeptionell nur zwei Zustände: gesperrt und entsperrt. Wenn ein Thread einen entsperrten Mutex sperrt, wird der Mutex als gesperrt markiert und der Thread kann sofort weiterarbeiten. Wenn ein Thread dann versucht, einen bereits gesperrten Mutex zu sperren, wird dieser Vorgang blockiert. Der Thread wird in den Ruhezustand versetzt, während er darauf wartet, dass der Mutex entsperrt wird. Das Entsperren ist nur bei einem gesperrten Mutex möglich und sollte von demselben Thread durchgeführt werden, der ihn gesperrt hat. Wenn andere Threads darauf warten, den Mutex zu sperren, führt das Entsperren dazu, dass einer dieser Threads aufgeweckt wird, so dass er versuchen kann, den Mutex erneut zu sperren und seinen Weg fortzusetzen.
Der Schutz von Daten mit einer Mutex ist einfach die Vereinbarung zwischen allen Threads, dass sie nur dann auf die Daten zugreifen, wenn sie die Mutex gesperrt haben. Auf diese Weise können niemals zwei Threads gleichzeitig auf diese Daten zugreifen und einen Datenwettlauf verursachen.
Rusts Mutex
Die Rust-Standardbibliothek bietet diese Funktionalität durch std::sync::Mutex<T>
. Sie ist generisch über einen Typ T
, der der Typ der Daten ist, die der Mutex schützt. Indem T
Teil des Mutex ist, kann auf die Daten nur über den Mutex zugegriffen werden, was eine sichere Schnittstelle ermöglicht, die garantiert, dass alle Threads die Vereinbarung einhalten.
Um sicherzustellen, dass ein gesperrter Mutex nur von dem Thread entsperrt werden kann, der ihn gesperrt hat, verfügt er nicht über eine unlock()
Methode.
Stattdessen gibt seine lock()
Methode einen speziellen Typ zurück, der MutexGuard
genannt wird. Dieser Guard stellt die Garantie dar, dass wir den Mutex gesperrt haben.
Sie verhält sich wie eine exklusive Referenz durch die DerefMut
Eigenschaft und gibt uns exklusiven Zugriff auf die Daten, die die Mutex schützt.Das Entsperren der Mutex erfolgt durch das Aufheben der Wache. Wenn wir die Wache aufheben, geben wir unsere Fähigkeit auf, auf die Daten zuzugreifen, und die Drop
Implementierung der Wache wird die Mutex entsperren.
Schauen wir uns ein Beispiel an, um eine Mutex in der Praxis zu sehen:
use
std
::sync
::Mutex
;
fn
main
()
{
let
n
=
Mutex
::new
(
0
);
thread
::scope
(
|
s
|
{
for
_
in
0
..
10
{
s
.
spawn
(
||
{
let
mut
guard
=
n
.
lock
().
unwrap
();
for
_
in
0
..
100
{
*
guard
+=
1
;
}
});
}
});
assert_eq!
(
n
.
into_inner
().
unwrap
(),
1000
);
}
In diesem Fall haben wir eine Mutex<i32>
, eine Mutex, die eine ganze Zahl schützt, und wir starten zehn Threads, die die ganze Zahl jeweils hundertmal inkrementieren. Jeder Thread sperrt zunächst die Mutex, um eine MutexGuard
zu erhalten, und verwendet dann diese guard
, um auf die ganze Zahl zuzugreifen und sie zu ändern. guard
wird implizit sofort wieder gelöscht, wenn die Variable den Gültigkeitsbereich verlässt.
Nachdem die Threads fertig sind, können wir den Schutz der Ganzzahl mit into_inner()
sicher aufheben. Die Methode into_inner
übernimmt den Besitz des Mutex, was garantiert, dass nichts anderes mehr einen Verweis auf den Mutex haben kann, wodurch Sperren unnötig wird.
Auch wenn die Inkremente in Einerschritten erfolgen, würde ein Thread, der den Integer beobachtet, immer nur Vielfache von 100 sehen, da er den Integer nur betrachten kann, wenn der Mutex entsperrt ist. Dank des Mutex sind die hundert Inkremente nun eine einzige unteilbare atomare Operation.
Um den Effekt des Mutex deutlich zu sehen, können wir jeden Thread eine Sekunde warten lassen, bevor wir den Mutex entsperren:
use
std
::time
::Duration
;
fn
main
()
{
let
n
=
Mutex
::new
(
0
);
thread
::scope
(
|
s
|
{
for
_
in
0
..
10
{
s
.
spawn
(
||
{
let
mut
guard
=
n
.
lock
().
unwrap
();
for
_
in
0
..
100
{
*
guard
+=
1
;
}
thread
::sleep
(
Duration
::from_secs
(
1
));
// New!
});
}
});
assert_eq!
(
n
.
into_inner
().
unwrap
(),
1000
);
}
Wenn du das Programm jetzt ausführst, wirst du sehen, dass es etwa 10 Sekunden braucht, um fertig zu werden. Jeder Thread wartet nur eine Sekunde lang, aber der Mutex sorgt dafür, dass immer nur ein Thread gleichzeitig warten kann.
Wenn wir die Wache - und damit auch den Mutex - aufheben, bevor wir eine Sekunde schlafen, wird der Vorgang stattdessen parallel ablaufen:
fn
main
()
{
let
n
=
Mutex
::new
(
0
);
thread
::scope
(
|
s
|
{
for
_
in
0
..
10
{
s
.
spawn
(
||
{
let
mut
guard
=
n
.
lock
().
unwrap
();
for
_
in
0
..
100
{
*
guard
+=
1
;
}
drop
(
guard
);
// New: drop the guard before sleeping!
thread
::sleep
(
Duration
::from_secs
(
1
));
});
}
});
assert_eq!
(
n
.
into_inner
().
unwrap
(),
1000
);
}
Mit dieser Änderung dauert das Programm nur noch etwa eine Sekunde, da die 10 Threads ihren einsekündigen Sleep gleichzeitig ausführen können. Das zeigt, wie wichtig es ist, die Zeit, in der eine Mutex gesperrt ist, so kurz wie möglich zu halten. Wenn eine Mutex länger als nötig gesperrt bleibt, kann das die Vorteile der Parallelität komplett zunichte machen und zwingt alles dazu, seriell zu laufen.
Vergiftung durch Schlösser
Die unwrap()
Aufrufe in den obigen Beispielen beziehen sich auf Lock Poisoning.
Eine Mutex
in Rust wird als vergiftet markiert, wenn ein Thread in Panik gerät, während er die Sperre hält. Wenn das passiert, ist die Mutex
nicht mehr gesperrt, aber der Aufruf ihrer lock
Methode führt zu einer Err
, um anzuzeigen, dass sie vergiftet wurde.
Dies ist ein Mechanismus, der verhindern soll, dass Daten, die durch eine Mutex geschützt sind, in einem inkonsistenten Zustand bleiben. Wenn in unserem Beispiel oben ein Thread in Panik gerät, nachdem er die Ganzzahl weniger als 100 Mal erhöht hat, würde die Mutex entsperrt und die Ganzzahl in einem unerwarteten Zustand belassen, in dem sie nicht mehr ein Vielfaches von 100 ist, was die Annahmen anderer Threads zerstören könnte. Wenn die Mutex in diesem Fall automatisch als vergiftet markiert wird, muss der Benutzer diese Möglichkeit berücksichtigen.
Der Aufruf von lock()
auf einer vergifteten Mutex sperrt die Mutex immer noch. Err
, das von lock()
zurückgegeben wird, enthält die MutexGuard
, so dass wir einen inkonsistenten Zustand beiBedarf korrigieren können.
Obwohl Lock Poisoning ein mächtiger Mechanismus zu sein scheint, wird die Wiederherstellung eines potenziell inkonsistenten Zustands in der Praxis nicht oft durchgeführt. Der meiste Code ignoriert entweder Poisoning oder verwendet unwrap()
, um eine Panik auszulösen, wenn die Sperre vergiftet wurde, wodurch Panik auf alle Benutzer der Mutex übertragen wird.
Leser-Schreiber-Sperre
Bei einer Mutex geht es nur um den exklusiven Zugriff. Die MutexGuard
gibt uns einen exklusiven Verweis (&mut T
) auf die geschützten Daten, auch wenn wir die Daten nur ansehen wollten und ein gemeinsamer Verweis (&T
) ausgereicht hätte.
Eine Leser-Schreiber-Sperre ist eine etwas kompliziertere Version einer Mutex, die den Unterschied zwischen exklusivem und gemeinsamem Zugriff kennt und beides ermöglichen kann. Sie hat drei Zustände: entsperrt, gesperrt durch einen einzelnen Schreiber (für exklusiven Zugriff) und gesperrt durch eine beliebige Anzahl von Lesern (für gemeinsamen Zugriff). Sie wird häufig für Daten verwendet, die oft von mehreren Threads gelesen, aber nur ab und zu aktualisiert werden.
Die Rust-Standardbibliothek stellt diese Sperre über den Typ std::sync::RwLock<T>
zur Verfügung. Er funktioniert ähnlich wie der Standard Mutex
, nur dass seine Schnittstelle größtenteils in zwei Teile aufgeteilt ist. Statt einer einzigen Methode lock()
gibt es eine Methode read()
und eine write()
, um entweder als Leser oder als Schreiber zu sperren.
Es gibt zwei Guard-Typen, einen für Leser und einen für Schreiber: RwLockReadGuard
und RwLockWriteGuard
.
Ersterer implementiert nur Deref
, um sich wie eine gemeinsame Referenz auf die geschützten Daten zu verhalten, während letzterer auch DerefMut
implementiert, um sich wie eine exklusive Referenz zu verhalten.
Sie ist quasi die Multi-Thread-Version von RefCell
und verfolgt dynamisch die Anzahl der Verweise, um sicherzustellen, dass die Ausleihregeln eingehalten werden.
Sowohl Mutex<T>
als auch RwLock<T>
setzen voraus, dass T
Send
ist, da sie verwendet werden können, um eine T
an einen anderen Thread zu senden.
Ein RwLock<T>
setzt zusätzlich voraus, dass T
auch Sync
implementiert, da es mehreren Threads erlaubt, einen gemeinsamen Verweis (&T
) auf die geschützten Daten zu halten.
(Streng genommen kannst du eine Sperre für eine T
erstellen, die diese Anforderungen nicht erfüllt, aber du wärst nicht in der Lage, sie zwischen Threads zu teilen, da die Sperre selbst Sync
nicht implementiert).
Die Rust-Standardbibliothek stellt nur einen allgemeinen RwLock
Typ zur Verfügung, dessen Implementierung jedoch vom Betriebssystem abhängt. Es gibt viele feine Unterschiede zwischen den Implementierungen von Leser-Schreiber-Sperren. Die meisten Implementierungen blockieren neue Leser, wenn ein Schreiber wartet, selbst wenn die Sperre bereits lesend gesperrt ist.
Dies geschieht, um zu verhindern, dass der Schreiber verhungert, d. h., dass viele Leser gemeinsam verhindern, dass die Sperre jemals aufgehoben wird und kein Schreiber die Daten aktualisieren kann.
Warten: Parken und Bedingungsvariablen
Wenn Daten von mehreren Threads verändert werden, gibt es viele Situationen, in denen die Threads auf ein Ereignis warten müssen, damit eine Bedingung für die Daten wahr wird. Wenn wir zum Beispiel eine Mutex haben, die eine Vec
schützt, wollen wir vielleicht warten, bis sie etwas enthält.
Ein Mutex ermöglicht es Threads zwar zu warten, bis er entsperrt wird, bietet aber keine Funktion, um auf andere Bedingungen zu warten. Wäre ein Mutex alles, was wir hätten, müssten wir den Mutex immer wieder sperren, um zu überprüfen, ob sichnoch etwas aufVec
befindet.
Thema Parken
Eine Möglichkeit, auf die Benachrichtigung eines anderen Threads zu warten, ist das so genannte Thread-Parking. Ein Thread kann sich selbst parken, wodurch er in den Ruhezustand versetzt wird und keine CPU-Zyklen mehr verbraucht. Ein anderer Thread kann dann den geparkten Thread entparken und ihn aus seinem Nickerchen aufwecken.
Das Parken von Threads ist über die Funktion std::thread::park()
möglich. Zum Entparken rufst du die Methode unpark()
für ein Thread
Objekt auf, das den Thread repräsentiert, den du entparken möchtest. Ein solches Objekt kann aus dem Join-Handle, das von spawn
zurückgegeben wird, oder vom Thread selbst über std::thread::current()
bezogen werden.
Im folgenden Beispiel wird ein Mutex verwendet, um eine Warteschlange zwischen zwei Threads zu teilen. Im folgenden Beispiel konsumiert ein neu gespawnter Thread Elemente aus der Warteschlange, während der Hauptthread jede Sekunde ein neues Element in die Warteschlange einfügt. Thread-Parking wird verwendet, damit der konsumierende Thread wartet, wenn die Warteschlange leer ist.
use
std
::collections
::VecDeque
;
fn
main
()
{
let
queue
=
Mutex
::new
(
VecDeque
::new
());
thread
::scope
(
|
s
|
{
// Consuming thread
let
t
=
s
.
spawn
(
||
loop
{
let
item
=
queue
.
lock
().
unwrap
().
pop_front
();
if
let
Some
(
item
)
=
item
{
dbg!
(
item
);
}
else
{
thread
::park
();
}
});
// Producing thread
for
i
in
0
..
{
queue
.
lock
().
unwrap
().
push_back
(
i
);
t
.
thread
().
unpark
();
thread
::sleep
(
Duration
::from_secs
(
1
));
}
});
}
Der konsumierende Thread führt eine Endlosschleife aus, in der er mit dem Makro dbg
Elemente aus der Warteschlange zieht, um sie anzuzeigen. Wenn die Warteschlange leer ist, hält er an und geht mit der Funktion park()
in den Ruhezustand über. Wenn er entparkt wird, kehrt der Aufruf park()
zurück und loop
fährt fort, indem er erneut Elemente aus der Warteschlange zieht, bis diese leer ist. Und so weiter.
Der produzierende Thread produziert jede Sekunde eine neue Zahl, indem er sie in die Warteschlange schiebt. Jedes Mal, wenn er ein Element hinzufügt, verwendet er die Methode unpark()
für das Thread
Objekt, das auf den konsumierenden Thread verweist, um es zu entparken. Auf diese Weise wird der konsumierende Thread aufgeweckt, um das neue Element zu verarbeiten.
Eine wichtige Beobachtung an dieser Stelle ist, dass dieses Programm theoretisch immer noch korrekt, wenn auch ineffizient wäre, wenn wir das Parken abschaffen. Das ist wichtig, weil park()
nicht garantiert, dass es nur aufgrund eines passenden unpark()
zurückkehrt. Auch wenn es eher selten vorkommt, kann es zu ungewollten Aufweckungen kommen. Unser Beispiel kommt damit gut zurecht, weil der konsumierende Thread die Warteschlange sperrt, sieht, dass sie leer ist, und sie direkt wieder entsperrt und sich selbst wieder parkt.
Eine wichtige Eigenschaft des Thread-Parking ist, dass ein Aufruf von unpark()
, bevor der Thread sich selbst parkt, nicht verloren geht. Die Aufforderung zum Entparken wird immer noch aufgezeichnet, und wenn der Thread das nächste Mal versucht, sich selbst zu parken, löscht er diese Aufforderung und macht direkt weiter, ohne tatsächlich in den Ruhezustand zu gehen. Um zu sehen, warum das für den korrekten Betrieb entscheidend ist, gehen wir eine mögliche Reihenfolge der von beiden Threads ausgeführten Schritte durch:
-
Der konsumierende Thread - nennen wir ihn C - sperrt die Warteschlange.
-
C versucht, ein Element aus der Warteschlange zu entfernen, aber die Warteschlange ist leer, was zu
None
führt. -
C schaltet die Warteschlange frei.
-
Der produzierende Thread, den wir P nennen, sperrt die Warteschlange.
-
P schiebt ein neues Element in die Warteschlange.
-
P schaltet die Warteschlange wieder frei.
-
P ruft
unpark()
an, um C mitzuteilen, dass es neue Artikel gibt. -
C ruft
park()
auf, um sich schlafen zu legen, um auf weitere Artikel zu warten.
Während zwischen der Freigabe der Warteschlange in Schritt 3 und dem Parken in Schritt 8 wahrscheinlich nur ein sehr kurzer Moment liegt, könnten die Schritte 4 bis 7 in diesem Moment passieren, bevor der Thread sich selbst parkt. Wenn unpark()
nichts tun würde, wenn der Thread nicht geparkt ist, würde die Benachrichtigung verloren gehen. Der konsumierende Thread würde immer noch warten, selbst wenn sich ein Element in der Warteschlange befinden würde. Dank der Unpark-Anfragen, die für einen zukünftigen Aufruf von park()
gespeichert werden, müssen wir uns darüber keine Sorgen machen.
Allerdings stapeln sich die Unpark-Anfragen nicht. Wenn du unpark()
zweimal aufrufst und danach park()
zweimal aufrufst, geht der Thread trotzdem in den Schlaf. Die erste park()
löscht die Anfrage und kehrt direkt zurück, aber die zweite geht wie üblich in den Schlaf.
Das bedeutet, dass es in unserem obigen Beispiel wichtig ist, den Thread nur dann zu parken, wenn wir festgestellt haben, dass die Warteschlange leer ist, anstatt ihn nach jedem verarbeiteten Element zu parken. Obwohl es in diesem Beispiel aufgrund des riesigen (eine Sekunde) Sleep extrem unwahrscheinlich ist, ist es möglich, dass mehrere unpark()
Aufrufe nur einen einzigen park()
Aufruf aufwecken.
Leider bedeutet das, dass, wenn unpark()
direkt nach der Rückkehr vonpark()
aufgerufen wird, aber bevor die Warteschlange gesperrt und geleert wird, der Aufruf von unpark()
unnötig war, aber trotzdem dazu führt, dass der nächste Aufruf von park()
sofort zurückkehrt. Das führt dazu, dass die (leere) Warteschlange ein weiteres Mal gesperrt und entsperrt wird. Das beeinträchtigt zwar nicht die Korrektheit des Programms, aber seine Effizienz und Leistung.
Dieser Mechanismus funktioniert gut für einfache Situationen wie in unserem Beispiel, versagt aber schnell, wenn die Dinge komplizierter werden. Wenn wir zum Beispiel mehrere Verbraucher-Threads haben, die Elemente aus derselben Warteschlange nehmen, kann der Producer-Thread nicht wissen, welcher der Verbraucher tatsächlich wartet und aufgeweckt werden sollte. Der Producer muss genau wissen, wann ein Verbraucher wartet und auf welche Bedingung er wartet.
Bedingungsvariablen
Bedingungsvariablen sind eine häufig genutzte Option, um darauf zu warten, dass etwas mit Daten passiert, die durch einen Mutex geschützt sind. Sie haben zwei grundlegende Operationen: warten und benachrichtigen. Threads können auf eine Bedingungsvariable warten und dann aufgeweckt werden, wenn ein anderer Thread dieselbe Bedingungsvariable benachrichtigt. Mehrere Threads können auf dieselbe Bedingungsvariable warten, und die Benachrichtigungen können entweder an einen wartenden Thread oder an alle gesendet werden.
Das bedeutet, dass wir eine Bedingungsvariable für bestimmte Ereignisse oder Bedingungen erstellen können, an denen wir interessiert sind, z. B. wenn die Warteschlange nicht leer ist, und auf diese Bedingung warten. Jeder Thread, der dieses Ereignis oder diese Bedingung auslöst, benachrichtigt dann die Bedingungsvariable, ohne dass wir wissen müssen, welche oder wie viele Threads an dieser Benachrichtigung interessiert sind.
Um zu vermeiden, dass in dem kurzen Moment zwischen dem Entsperren eines Mutex und dem Warten auf eine Bedingungsvariable Benachrichtigungen verloren gehen, bieten Bedingungsvariablen eine Möglichkeit, den Mutex atomar zu entsperren und mit dem Warten zu beginnen. Das bedeutet, dass es einfach keinen Moment gibt, in dem Benachrichtigungen verloren gehen können.
Die Rust-Standardbibliothek stellt eine Bedingungsvariable als std::sync::Condvar
zur Verfügung. Die Methode wait
nimmt eine MutexGuard
entgegen, die beweist, dass wir den Mutex gesperrt haben. Sie hebt die Sperre des Mutex zunächst auf und geht schlafen. Später, wenn sie geweckt wird, sperrt sie den Mutex erneut und gibt eine neue MutexGuard
zurück (die beweist, dass der Mutex wieder gesperrt ist).
Es gibt zwei Benachrichtigungsfunktionen: notify_one
, um nur einen wartenden Thread (falls vorhanden) aufzuwecken, und notify_all
, um alle Threads aufzuwecken.
Ändern wir das Beispiel, das wir für das Fadenparken verwendet haben, und verwenden stattdessen Condvar
:
use
std
::sync
::Condvar
;
let
queue
=
Mutex
::new
(
VecDeque
::new
());
let
not_empty
=
Condvar
::new
();
thread
::scope
(
|
s
|
{
s
.
spawn
(
||
{
loop
{
let
mut
q
=
queue
.
lock
().
unwrap
();
let
item
=
loop
{
if
let
Some
(
item
)
=
q
.
pop_front
()
{
break
item
;
}
else
{
q
=
not_empty
.
wait
(
q
).
unwrap
();
}
};
drop
(
q
);
dbg!
(
item
);
}
});
for
i
in
0
..
{
queue
.
lock
().
unwrap
().
push_back
(
i
);
not_empty
.
notify_one
();
thread
::sleep
(
Duration
::from_secs
(
1
));
}
});
Wir mussten ein paar Dinge ändern:
-
Wir haben jetzt nicht nur eine
Mutex
, die die Warteschlange enthält, sondern auch eineCondvar
, um die Bedingung "nicht leer" zu kommunizieren. -
Wir müssen nicht mehr wissen, welcher Thread aufgeweckt werden soll, also speichern wir den Rückgabewert von
spawn
nicht mehr. Stattdessen benachrichtigen wir den Verbraucher über die Bedingungsvariable mit der Methodenotify_one
. -
Das Entriegeln, Warten und Wiederverriegeln wird von der Methode
wait
übernommen. Wir mussten den Kontrollfluss ein wenig umstrukturieren, um die Wache an die Methodewait
zu übergeben und sie trotzdem vor der Verarbeitung eines Items fallen zu lassen.
Jetzt können wir so viele konsumierende Threads spawnen, wie wir wollen, und später sogar noch mehr, ohne etwas ändern zu müssen. Die Bedingungsvariable kümmert sich darum, die Benachrichtigungen an den jeweiligen Thread weiterzuleiten, der daran interessiert ist.
Wenn wir ein komplizierteres System mit Threads hätten, die an verschiedenen Bedingungen interessiert sind, könnten wir für jede Bedingung eine Condvar
definieren. Wir könnten zum Beispiel eine definieren, die anzeigt, dass die Warteschlange nicht leer ist, und eine andere, die anzeigt, dass sie leer ist. Dann würde jeder Thread auf die Bedingung warten, die für das, was er tut, relevant ist.
Normalerweise wird ein Condvar
immer nur zusammen mit einem einzigen Mutex
verwendet. Wenn zwei Threads versuchen, gleichzeitig wait
auf eine Bedingungsvariable zuzugreifen und dabei zwei verschiedene Mutexe verwenden, kann dies zu einer Panik führen.
Ein Nachteil von Condvar
ist, dass es nur in Verbindung mit Mutex
funktioniert, aber für die meisten Anwendungsfälle ist das völlig in Ordnung, denn das ist genau das, was sowieso schon zum Schutz der Daten verwendet wird.
Sowohl thread::park()
als auch Condvar::wait()
haben auch eine Variante mit einem Zeitlimit:thread::park_timeout()
und Condvar::wait_timeout()
. Diese nehmen ein Duration
als zusätzliches Argument, das die Zeit angibt, nach der es das Warten auf eine Benachrichtigung aufgeben und bedingungslos aufwachen soll.
Zusammenfassung
-
Mehrere Threads können gleichzeitig im selben Programm laufen und jederzeit gestartet werden.
-
Wenn der Haupt-Thread endet, wird das gesamte Programm beendet.
-
Datenrennen sind undefiniertes Verhalten, das (in sicherem Code) durch das Typsystem von Rust vollständig verhindert wird.
-
Daten, die
Send
sind, können an andere Threads gesendet werden, und Daten, dieSync
sind, können zwischen Threads ausgetauscht werden. -
Reguläre Threads können so lange laufen wie das Programm selbst und können daher nur
'static
Daten wie statische Daten und geleakte Zuweisungen ausleihen. -
Referenzzählung (
Arc
) kann verwendet werden, um den Besitz zu teilen und sicherzustellen, dass Daten so lange leben, wie mindestens ein Thread sie benutzt. -
Scoped Threads sind nützlich, um die Lebensdauer eines Threads zu begrenzen, damit er nicht
'static
Daten, wie z. B. lokale Variablen, ausleihen kann. -
&T
ist eine gemeinsame Referenz.&mut T
ist eine exklusive Referenz. Reguläre Typen erlauben keine Mutation durch eine gemeinsame Referenz. -
Einige Typen haben dank
UnsafeCell
eine innere Veränderbarkeit, die eine Mutation durch gemeinsame Referenzen ermöglicht. -
Cell
undRefCell
sind die Standardtypen für Single-Thread-interne Veränderbarkeit.Mutex
undRwLock
sind die Multithread-Äquivalente. -
Cell
und atomics erlauben nur das Ersetzen des Wertes als Ganzes, währendRefCell
,Mutex
undRwLock
es dir ermöglichen, den Wert direkt zu verändern, indem du dynamisch Zugriffsregelnerzwingst. -
Fadenparken kann eine bequeme Möglichkeit sein, auf eine bestimmte Bedingung zu warten.
-
Wenn es bei einer Bedingung um Daten geht, die durch eine
Mutex
geschützt sind, ist die Verwendung einerCondvar
bequemer und kann effizienter sein als Thread Parking.
Get Rust Atomics und Schlösser now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.