Chapter 4. Concurrent Programming
Concurrency is the ability for different functions to execute in parallel without affecting each other unless explicitly programmed to do so. Each concurrent activity in Erlang is called a process. The only way for processes to interact with each other is through message passing, where data is sent from one process to another. The philosophy behind Erlang and its concurrency model is best described by Joe Armstrong’s tenets:
The world is concurrent.
Things in the world don’t share data.
Things communicate with messages.
Things fail.
The concurrency model and its error-handling mechanisms were built into Erlang from the start. With lightweight processes, it is not unusual to have hundreds of thousands, even millions, of processes running in parallel, often with a small memory footprint. The ability of the runtime system to scale concurrency to these levels directly affects the way programs are developed, differentiating Erlang from other concurrent programming languages.
What if you were to use Erlang to write an instant messaging (IM) server, supporting the transmission of messages between thousands of users in a system such as Google Talk or Facebook? The Erlang design philosophy is to spawn a new process for every event so that the program structure directly reflects the concurrency of multiple users exchanging messages. In an IM system, an event could be a presence update, a message being sent or received, or a login request. Each process will service the event it handles, and terminate when the request has been completed.
You could do the same in C or Java, but you would struggle when scaling the system to hundreds of thousands of concurrent events. An option might be to have a pool of processes handling specific event types or particular users, but certainly not a new process for every event. Erlang gets away with this because it does not use native threads to represent processes. It has its own scheduler in the virtual machine (VM), making the creation of processes very efficient while at the same time minimizing their memory footprint. This efficiency is maintained regardless of the number of concurrent processes in the system. The same argument applies for message passing, where the time to send a message is negligible and constant, regardless of the number of processes. This chapter introduces concurrent programming in Erlang, letting you in on one of the most powerful concurrency models available today.
Creating Processes
So far, we’ve looked at executing sequential code in a single process. To run concurrent code, you have to create more processes. You do this by spawning a process using the spawn(Module, Function, Arguments) BIF. This BIF creates a new process that evaluates the Function exported from the module Module with the list of Arguments as parameters. The spawn/3 BIF returns a process identifier, which from now on we will refer to as a pid.
In Figure 4-1, the process we call Pid1 executes the spawn BIF somewhere in its program. This call results in a new process with process identifier Pid2 being created. Pid2 is returned as the result of the call to spawn, and will typically be bound to a variable in an expression of the following format:

Pid2 = spawn(Module, Function, Arguments).

The pid of the new process, Pid2, is at this point known only within the process Pid1, as it is a local variable that has not been shared with anybody. The spawned process starts executing the exported function passed as the second argument to the BIF, and the arity of this function is dictated by the length of the list passed as the third argument to the BIF.
Warning

A common error when you start programming Erlang is to forget that the third argument to spawn is a list of arguments. So if you want to spawn the function m:f/1 with the argument a, you need to call:

spawn(m, f, [a])

not:

spawn(m, f, a).
Once spawned, a process will continue executing and remain alive until it terminates. If there is no more code to execute, a process is said to terminate normally. On the other hand, if a runtime error such as a bad match or a case failure occurs, the process is said to terminate abnormally.
Spawning a process will never fail, even if you are spawning a
nonexported or even a nonexistent function. As soon as the process is
created and spawn/3
returns the pid,
the newly created process will terminate with a runtime error:
1> spawn(no_module, nonexistent_function, []).
<0.32.0>
=ERROR REPORT==== 29-Feb-2008::21:48:29 ===
Error in process <0.32.0> with exit value:
{undef,[{no_module,nonexistent_function,[]}]}
In the preceding example, note how the error report is formatted. It is different from the ones you saw previously, as the error does not take place in the shell, but in the process with pid <0.32.0>. If the error occurs in a spawned process, it is detected by another part of the Erlang runtime system called the error logger, which by default prints an error report in the shell using the format shown earlier. Errors detected by the shell are instead formatted in a more readable form.
The processes() BIF returns a list of all of the processes running in the system. In most cases, you should have no problems using the BIF, but there have been extreme situations in large systems where calling processes() from the shell has been known to result in the runtime system running out of memory![13] Don’t forget that in industrial applications, you might be dealing with millions of processes running concurrently. In the current implementation of the runtime system, the absolute limit is in the hundreds of millions. Check the Erlang documentation for the latest figures. The default number is much lower, but you can easily change it by starting the Erlang shell with the command erl +P MaxProcesses, where MaxProcesses is an integer.
You can use the shell command i()
to find out what the
currently executing processes in the runtime system are doing. It will
print the process identifier, the function used to spawn the process, the
function in which the process is currently executing, as well as other
information covered later in this chapter. Look at the example in the
following shell printout. Can you spot the process that is running as the
error logger?
2> processes().
[<0.0.0>,<0.2.0>,<0.4.0>,<0.5.0>,<0.7.0>,<0.8.0>,<0.9.0>,
 <0.10.0>,<0.11.0>,<0.12.0>,<0.13.0>,<0.14.0>,<0.15.0>,
 <0.17.0>,<0.18.0>,<0.19.0>,<0.20.0>,<0.21.0>,<0.22.0>,
 <0.23.0>,<0.24.0>,<0.25.0>,<0.26.0>,<0.30.0>]
3> i().
Pid       Initial Call          Heap  Reds   Msgs  Registered       Current Function        Stack
<0.0.0>   otp_ring0:start/2     987   2684   0     init             init:loop/1             2
<0.2.0>   erlang:apply/2        2584  61740  0     erl_prim_loader  erl_prim_loader:loop/3  5
<0.4.0>   gen_event:init_it/6   610   219    0     error_logger     gen_event:fetch_msg/5   11
<0.5.0>   erlang:apply/2        1597  508    0     ...
If you are wondering why the processes()
BIF returned far more than 20
processes when you created only one that failed right after being spawned,
you are not alone. Large parts of the Erlang runtime system are
implemented in Erlang, the error_logger
and the Erlang shell being
two of the many examples. You will come across other system processes as
you work your way through the remaining chapters of this book.
Message Passing
Processes communicate with each other using message passing. Messages
are sent using the Pid ! Message
construct, where Pid
is a valid process
identifier and Message
is a value from
any Erlang data type (see Figure 4-2).
Each Erlang process has a mailbox in which incoming messages are stored. When a message is sent, it is copied from the sending process into the recipient’s mailbox for retrieval. Messages are stored in the mailbox in the order in which they are delivered. If two messages are sent from one process to another, the messages are guaranteed to be received in the same order in which they are sent. This guarantee is not extended to messages sent from different processes, however, and in this case the ordering is VM-dependent.
Sending a message will never fail; so if you try sending a message to a nonexistent process, it is thrown away without generating an error. Finally, message passing is asynchronous: a sending process will not be suspended after sending a message; it will instead immediately continue executing the next expression in its code.
To test sending messages in the shell, let’s use the self/0 BIF, which returns the pid of the process in which it is evaluated. The Erlang shell is nothing other than an Erlang process in a read-evaluate-print loop, waiting for you to type in an expression. When you terminate an expression with a full stop (.) and press Enter, the shell evaluates what you typed in and prints out a result. Since the shell is an Erlang process, there is nothing stopping us from sending messages to it. To retrieve and display all the messages sent to the shell process, and therefore currently held in the process mailbox, you can use the shell command flush/0, which also has the effect of removing (or flushing) those messages from the mailbox:
1> Pid = self().
<0.30.0>
2> Pid ! hello.
hello
3> flush().
Shell got hello
ok
4> <0.30.0> ! hello.
* 1: syntax error before: '<'
5> Pid2 = pid(0,30,0).
<0.30.0>
6> Pid2 ! hello2.
hello2
7> flush().
Shell got hello2
ok
What is happening in the preceding example? In command 1, the BIF self() returns a pid, which in the shell is bound to the variable Pid and displayed as <0.30.0>. In commands 2 and 3 you see the message being sent to Pid, and then flushed from the mailbox using the flush() command in the shell.
You cannot type pids directly in a module or in the shell, as in
both cases, they result in a syntax error; this is shown for the shell in
command 4. You need either to bind the process identifiers to a variable
when BIFs such as self
and spawn
return them, or generate a pid using
the pid/3
shell function, as
shown in command 5 and used in command 6. The flush()
in command 7 shows that the message
indeed went to the shell process.
Pid ! Message is a valid Erlang expression, and as with all valid expressions in Erlang, it has to return a value. The value, in this case, is the message sent. So if, for example, you need to send the same message to many processes, you can write either a sequence of message sends, such as Pid1!Msg,Pid2!Msg,Pid3!Msg, or a single expression, such as Pid3!Pid2!Pid1!Message, which is equivalent to writing Pid3!(Pid2!(Pid1!Message)), where Pid1!Message returns the message to send to Pid2, which in turn returns the message to be sent to Pid3.
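In everyday code, sending the same message to many processes is often written with a list comprehension rather than a chain of sends. The following is a minimal sketch; broadcast/2 is a hypothetical helper of our own, not a standard BIF:

```erlang
%% Hypothetical helper: send Msg to every pid in Pids.
%% Each Pid ! Msg expression evaluates to Msg; the comprehension
%% collects those values, which we discard by returning ok.
broadcast(Msg, Pids) ->
    [Pid ! Msg || Pid <- Pids],
    ok.
```

This works precisely because Pid ! Msg is an expression: the comprehension evaluates one send per pid in the list.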
As we already said, sending messages to nonexistent processes will always succeed. To test this, let’s make the shell process crash with an illegal operation. Crashing is the same as an abnormal process termination, something that is considered normal in Erlang, in the sense that Erlang provides mechanisms to deal with it. We will cover abnormal process terminations in more detail in the next chapter, so until then, do not get alarmed. Making the shell crash will automatically result in a new shell process (in this example, with pid <0.38.0>) being spawned by the runtime system.
With this in mind, we locate the shell pid, make the shell process terminate, and then send a message to it. Based on the semantics of message passing, this will result in the message being thrown away:
7> self().
<0.30.0>
8> 1/0.
** exception error: bad argument in an arithmetic expression
     in operator  '/'/2
        called as 1 / 0
9> self().
<0.38.0>
10> pid(0,30,0) ! hello.
hello
11> flush().
ok
The reason that message passing and spawn
always succeed, even if the recipient
process does not exist or the spawned process crashes on creation, has to
do with process dependencies, or rather, their
deliberate lack of dependencies. We say that process A depends
on process B when the fact of B terminating can prevent A from
functioning correctly.
Process dependencies are very important and will often influence your design. In massively concurrent systems, you do not want processes to depend on each other unless explicitly specified, and in such cases, you want to have as few dependencies as possible. To give a concrete example of this, imagine an IM server concurrently handling thousands of messages being exchanged by its users. Each message is handled by a process spawned for that particular function. If, due to a bug, one of these processes terminates, you would lose that particular message. Ensuring a lack of dependency between this process and the processes handling all the other messages guarantees that these messages are safely processed and delivered to their recipients regardless of the bug.
Receiving Messages
Messages are retrieved from the process mailbox using the receive clause. The receive clause is a construct delimited by the reserved words receive and end, and contains a number of clauses. These clauses are similar to case clauses, with a pattern in the head (to the left of the arrow) and a sequence of expressions in the body (to the right). On executing the receive statement, the first (and oldest) message in the mailbox is pattern-matched against each pattern in the receive expression in turn:
If a successful match occurs, the message is retrieved from the mailbox, the variables in the pattern are bound to the matching parts of the message, and the body of the clause is executed.
If none of the clauses matches, the subsequent messages in the mailbox are pattern-matched one by one against all of the clauses until either a message matches a clause or all of the messages have failed all of the possible pattern matches.
In the following example, if the message {reset, 151} is sent to the process executing the receive statement, the first clause will pattern-match, resulting in the variable Board being bound to the integer 151. This will result in the function reset(151) being called:
receive
    {reset, Board} -> reset(Board);
    _Other -> {error, unknown_msg}
end
Assume now that two new messages, restart and {reset, 151}, are received in that order by the executing process. As soon as the execution flow of the process reaches the receive statement, it will try to match the oldest message in the mailbox, restart. Pattern matching will fail in the first clause, but will match successfully in the second, binding the variable _Other to the atom restart. You can follow this example in Figure 4-3.
A receive statement will return the last evaluated expression in the body of the matched clause. In the example, this will be either the return value of the reset/1 call or the tuple {error, unknown_msg}. Although rarely done, it is possible to write expressions such as Result = receive Msg -> handle(Msg) end, in which a variable (here, Result) is bound to the return value of the receive clause. It is considered a better practice to make the receive clause the body of a separate function, and to bind the return value of the function to the variable.
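As a sketch of that practice, the receive clause can live in its own function whose return value the caller then binds. The name wait_for_reply/0 and the {reply, Reply} message format are our own illustrative choices:

```erlang
%% Hypothetical sketch: a receive wrapped in its own function.
%% The caller writes Result = wait_for_reply() instead of binding
%% the receive expression directly.
wait_for_reply() ->
    receive
        {reply, Reply} -> Reply
    end.
```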
Warning
Take a minute to think about the restart message in the example. Other than being a good practice, nothing is stopping us from sending the message in the format {restart}. It is a common misconception that messages have to be sent in tuples: this is not the case, as messages can consist of any valid Erlang term.
Using tuples with only one element is unnecessary, as they consume more memory and are slower to process. It is considered a bad practice to use them not only as messages, but also as terms in your programs. The guideline is that if your tuple has only one element, use the element on its own, without the tuple construct.
When none of the clauses in a case statement matches, a runtime error occurs. What happens in receive statements? The syntax and semantics of receive are very similar to those of case, the main difference being that the process is suspended in receive statements until a message is matched, whereas in a case statement a runtime error occurs.
In general, a receive statement has the following form:

receive
    Pattern1 when Guard1 -> exp11, .., exp1n;
    Pattern2 when Guard2 -> exp21, .., exp2n;
    ...
    Other -> expn1, .., expnn
end
The keywords used to delimit a receive clause are receive and end. Each pattern consists of any valid Erlang term, including bound and unbound variables, and may carry an optional guard. The expressions are valid Erlang terms or legal expressions that evaluate to terms. The return value of the receive clause will be the return value of the last evaluated expression in the body executed, in this case the final expression of the clause that matched.
To ensure that the receive statement always retrieves the first message in the mailbox, you can make the last clause a catch-all: an unbound variable (such as _Other in the first example) or the “don’t care” variable if you are not interested in its value.
Selective and Nonselective Receives
Look at Figure 4-4. What can you deduce from the incoming messages in the receive clause about Pid being already bound? The variable Pid is bound to the value passed to decode_digit when it is called, so it is already bound to this value in the pattern part of the receive clause. On receiving a message, a successful match will occur only if the first element of the tuple of size 2 sent as a message is exactly equal to (remember the =:= construct?) the value stored in the variable Pid.

We call this a selective receive, where we retrieve only the messages we are explicitly interested in based on certain criteria, leaving the remaining messages in the mailbox. Selective receives often select on process identifiers, but sequence references or other identifiers are also common, as are tagged tuples and guards. Now, contrast the bound variable Pid in Figure 4-4 with the unbound variable DigitList in Figure 4-5, which will be bound only once a message has been received.
In concurrent systems, it is common for race conditions to occur. A race condition occurs when the behavior of a system depends on the order in which certain events occur: these events “race” to influence the behavior. Due to the indeterminate nature of concurrency it is not possible to determine the order in which messages sent by different processes will arrive at the receiving process. This is when selective receive becomes crucial.
In Figure 4-5, Pid3 will receive the message foo followed by bar regardless of the order in which they are delivered. Selective receive of multiple messages is useful when synchronizing processes in a rendezvous, or when data from several sources needs to be collected before it can be processed. Contrast this with a programming language in which it is possible to process messages only in the order in which they arrive: the code for this would have to deal with the possibility of bar preceding foo and of foo preceding bar; the code becomes more complex, and much more likely to contain potential flaws.
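A rendezvous along the lines of Figure 4-5 might be sketched as two selective receives in sequence; the function names collect/0, handle_foo/0, and handle_bar/0 are hypothetical placeholders:

```erlang
%% Sketch: two selective receives in sequence. If bar arrives first,
%% it simply waits in the mailbox until the first receive has
%% consumed foo, so foo is always handled before bar.
collect() ->
    receive foo -> handle_foo() end,
    receive bar -> handle_bar() end,
    done.
```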
If the order of the messages is unimportant, you can just bind them to a variable. In Figure 4-6, the first message to arrive at the process Pid3 will be processed, regardless of the order in which the messages were sent. The variable Msg in the receive statement will be bound to one of the atoms foo or bar, depending on which is delivered first.
Figure 4-7 demonstrates how processes share data with each other. The process with PidA will send a tagged tuple with its own process identifier, retrieved through a call to the BIF self(), to the process with PidB. PidB will receive it, binding PidA’s value to the variable Pid. A new tagged tuple is sent to PidC, which also pattern-matches the message in its receive statement and binds the value of PidA to the variable Pid. PidC now uses the value bound to Pid to send the message foo back to PidA. In this way, processes can share information about each other, allowing communication between processes that initially did not have knowledge of each other.
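The message flow in Figure 4-7 could be sketched as three functions, one per process. The function and message names here (proc_a, proc_b, proc_c, and the msg tag) are illustrative choices of ours, not taken from the figure:

```erlang
%% PidA sends its own pid to PidB and waits for foo to come back.
proc_a(PidB) ->
    PidB ! {self(), msg},
    receive foo -> ok end.

%% PidB forwards the tagged tuple, passing PidA's pid on to PidC.
proc_b(PidC) ->
    receive
        {Pid, msg} -> PidC ! {Pid, msg}
    end.

%% PidC extracts PidA's pid from the message and replies directly,
%% even though PidA never sent it anything.
proc_c() ->
    receive
        {Pid, msg} -> Pid ! foo
    end.
```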
As processes do not share memory, the only way for them to share data is through message passing. Passing a message results in the data in the message being copied from the heap of the sending process to the heap of the receiving one, so this does not result in the two processes sharing a storage location (which each might read or write) but only in them each having their own copy of the data.
An Echo Example
Now that we have covered process creation and message passing, let’s use spawn, send, and receive in a small program. Open your editor and copy the contents of Example 4-1 or download it from the book’s website. When doing so, do not forget to export the function you are spawning, in this case loop/0. In the example, pay particular attention to the fact that two different processes will be executing and interacting with each other using code defined in the same module.
-module(echo).
-export([go/0, loop/0]).

go() ->
    Pid = spawn(echo, loop, []),
    Pid ! {self(), hello},
    receive
        {Pid, Msg} ->
            io:format("~w~n", [Msg])
    end,
    Pid ! stop.

loop() ->
    receive
        {From, Msg} ->
            From ! {self(), Msg},
            loop();
        stop ->
            true
    end.
So, what does this program do? Calling the function go/0 will initiate a process whose first action is to spawn a child process. This child process starts executing in the loop/0 function and is immediately suspended in the receive clause, as its mailbox is empty. The parent, still executing in go/0, uses the Pid for the child process, which is bound as a return value from the spawn BIF, to send the child a message containing a tuple with the parent’s process identifier (given as a result of calling self()) and the atom hello.
As soon as the message is sent, the parent is suspended in a receive clause. The child, which is waiting for an incoming message, successfully pattern-matches the {From, Msg} tuple, where From is matched to the process identifier belonging to the parent and Msg is matched to the atom hello. The child process uses From to return the message {self(), hello} back to the parent, where this call to self() returns the pid of the child. See Figure 4-8 for a visual depiction of this process.
At this point, the parent is suspended in the receive clause, and is waiting for a message. Note that it will only pattern-match on the tuple {Pid, Msg}, where the variable Pid is already bound (as a result of the spawn BIF) to the pid of the child process. This is a good (but not entirely secure) way to ensure that the message you receive is, in fact, a message you are expecting, and not just any message consisting of a tuple with two elements sent by another process. The message arrives and is successfully pattern-matched. The atom hello is bound to the Msg variable, which is passed as an argument to the io:format/2 call, printing it out in the shell. As soon as the parent has printed the atom hello in the shell, it sends the atom stop back to the child.
What has the child been doing while the parent was busy receiving the reply and printing it? Remember that processes will terminate if they have no more code to execute, so to avoid terminating, the child called the loop/0 function recursively, suspending itself in the receive clause. It receives the stop message sent to it by its parent, returns the atom true as its result, and terminates normally.
Try running the program in the shell and see what happens:
1> c(echo).
{ok,echo}
2> echo:go().
hello
stop
The atom hello is clearly the result of the io:format/2 call, but where does the atom stop come from? It is the value returned as the result of calling echo:go/0. To further familiarize yourself with concurrency, experiment with the echo example, putting io:format/2 statements in the loop/0 process and sending different messages to it. You could also experiment with the go/0 process, allowing it to send and receive more than one message. When experimenting, you will most likely get the shell to hang in a receive clause that will not match. If this happens, you will need to kill the shell and start again.
Registered Processes
It is not always practical to use pids to communicate with processes. To use
a pid, a process needs to be notified of it and store its
value. It is common to register processes that offer
specific services with an alias, a
name that can be used instead of the pid. You register a process with the
register(Alias, Pid)
BIF, where
Alias
is an atom and Pid
is the process identifier. You do not have
to be a parent or a child of the process to call the register
BIF; you just need to know its process
identifier.
Once a process has been registered, any process can send a message to it without having to be aware of its identifier (see Figure 4-9). All the sending process needs to do is use the Alias ! Message construct. In programs, the alias is usually hardcoded. Other BIFs directly related to process registration include unregister(Alias), which removes the alias; registered(), which returns a list of registered names; and whereis(Alias), which returns the pid associated with Alias.
Look at Example 4-2, which is a variant of Example 4-1. We have removed the Pid ! stop expression at the end of the go/0 function, and instead of binding the return value of spawn/3, we pass it as the second argument to the register BIF. The first argument to register is echo, the atom we use to name the process. This alias is used to send the message to the newly spawned child.
-module(echo).
-export([go/0, loop/0]).

go() ->
    register(echo, spawn(echo, loop, [])),
    echo ! {self(), hello},
    receive
        {_Pid, Msg} ->
            io:format("~w~n", [Msg])
    end.

loop() ->
    receive
        {From, Msg} ->
            From ! {self(), Msg},
            loop();
        stop ->
            true
    end.
It is not mandatory, but it is considered a good practice to give your process the same name as the module in which it is defined.
Update your echo module with the changes we just discussed and try out the new BIFs you have just read about in the shell. Test the new implementation of echo, inspecting its state with the i() and regs() shell commands. Note how the shell process sends the stop message to the echo process without knowing its pid, and how whereis/1 returns undefined if the process does not exist:
1> c(echo).
{ok,echo}
2> echo:go().
hello
ok
3> whereis(echo).
<0.37.0>
4> echo ! stop.
stop
5> whereis(echo).
undefined
6> regs().

** Registered procs on node nonode@nohost **
Name                  Pid        Initial Call          Reds    Msgs
application_controlle <0.5.0>    erlang:apply/2        4426    0
code_server           <0.20.0>   erlang:apply/2        112203  0
ddll_server           <0.10.0>   erl_ddll:init/1       32      0
erl_prim_loader       <0.2.0>    erlang:apply/2        206631  0
error_logger          <0.4.0>    gen_event:init_it/6   209     0
file_server           <0.19.0>   erlang:apply/2        12      0
file_server_2         <0.18.0>   file_server:init/1    25411   0
global_group          <0.17.0>   global_group:init/1   71      0
global_name_server    <0.12.0>   global:init/1         60      0
inet_db               <0.15.0>   inet_db:init/1        103     0
init                  <0.0.0>    otp_ring0:start/2     5017    0
kernel_safe_sup       <0.26.0>   supervisor:kernel/1   61      0
kernel_sup            <0.9.0>    supervisor:kernel/1   1377    0
rex                   <0.11.0>   rpc:init/1            44      0
user                  <0.23.0>   user:server/2         1459    0

** Registered ports on node nonode@nohost **
Name                  Id         Command
ok
The shell command regs()
prints
out all the registered processes. It might be an alternative to i()
when retrieving system information in a
system with large quantities of processes. In the preceding example, the
echo
process is not among the processes
listed, as we have stopped it. Instead, you are seeing all of the
registered system processes.
Warning
It is a feature of Erlang memory management that atoms are not
garbage collected. Once you’ve created an atom, it remains in the atom
table regardless of whether it is referenced in the code. This can be a
potential problem if you decide to register
transient processes with an alias derived from
converting a string to an atom with the list_to_atom/1
BIF. If you have millions of
users logging on to your system every day and you create a registered
process for the duration of their sessions, don’t be surprised if you
end up running out of memory.
You would be much better off storing the mapping of users to pids
in a session table. It is best to register only
processes with a long life span, and if you really must convert a string
to use as an alias, use list_to_existing_atom/1
to ensure that your
system does not suffer memory leakages.
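A session table can be as simple as a list of {User, Pid} pairs kept by a long-lived process. This sketch uses the standard lists:keyfind/3 BIF wrapper from the lists module, but the lookup/2 function and the tuple layout are our own invention:

```erlang
%% Sketch: look up a user's session pid in a list of {User, Pid}
%% pairs, instead of registering one alias per transient session.
lookup(User, Sessions) ->
    case lists:keyfind(User, 1, Sessions) of
        {User, Pid} -> {ok, Pid};
        false       -> {error, not_found}
    end.
```

Because user names stay ordinary strings, no new atoms are created no matter how many users log in.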
Sending messages to nonexistent registered processes causes the
calling process to terminate with a badarg
(see Figure 4-10). This behavior is
different from sending a message to a process identifier for a nonexistent
process, as registered processes are assumed to provide a service. The absence of
a registered process is therefore treated as a bug. If your program might
be sending messages to nonexistent registered processes and you do not
want the calling process to terminate, wrap a try ... catch
around the call.
Timeouts
You saw that if a process enters a receive
statement and none of the messages
matches, the process will get suspended. This could be similar to you
going to your mailbox at home, discovering there is no mail, and being
forced to wait there until the mailman arrives. It might be an option if
you are waiting for very urgent mail or have nothing better to do. In most
cases, though, all you want to do is check the mailbox, and if nothing has
arrived, continue with your household chores. Erlang processes can do just
that by using the receive ... after
construct:
receive
    Pattern1 when Guard1 -> exp11, .., exp1n;
    Pattern2 when Guard2 -> exp21, .., exp2n;
    ...
    Other -> expn1, .., expnn
after
    Timeout -> exp1, .., expn
end
When a process reaches the receive statement and no messages pattern-match, it will wait for Timeout milliseconds. If no message has arrived within Timeout milliseconds, the expressions in the body of the after clause are executed. Timeout is an integer denoting the time in milliseconds, or the atom infinity. Using infinity as a timeout value is the same as not including the after construct. It is allowed because Timeout can be a variable set every time the function is called, allowing the receive ... after clause to behave as desired in each call (see Figure 4-11).
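Because Timeout can be a variable, the same receive can act as a poll, a bounded wait, or a blocking wait depending on the argument it is given. A minimal sketch, with wait_msg/1 as a hypothetical function name:

```erlang
%% Sketch: wait_msg(0) just checks the mailbox and returns at once,
%% wait_msg(infinity) blocks until a message arrives, and any other
%% integer waits that many milliseconds before giving up.
wait_msg(Timeout) ->
    receive
        Msg -> {ok, Msg}
    after Timeout ->
        {error, timeout}
    end.
```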
Assume you have a process registered with the alias db, which is acting as a database server. Every time you want to look up an item, you send the database a message and wait for a response. At busy times, however, the request might take too long to be processed, so you return a timeout error by using the receive ... after construct. When doing so, however, you will end up receiving the response from the server after the timeout, risking that your replies get out of sync with the sequence of requests sent to the database server. The next time you send the database a request, you will match the oldest message in your receive clause. This message will be the response sent back after the timeout, and not the response to the request you just sent. When using receive ... after, you need to cater to these cases by flushing your mailbox and ensuring it is empty. In doing so, your code might look something like this:
read(Key) ->
flush(),
db ! {self(),{read, Key}},
receive
{read,R} -> {ok, R};
{error, Reason} -> {error, Reason}
after
1000 -> {error, timeout}
end.
flush() ->
receive
{read, _} -> flush();
{error, _} -> flush()
after 0 -> ok
end.
Another use for the receive ... after clause is to suspend a process for a period in milliseconds, or to send messages delayed by a certain amount of time. The definition of sleep/1 in the following code is taken directly from the timer library module, while send_after will send a message to the calling process after Time milliseconds:
-module(my_timer).
-export([send_after/2, sleep/1, send/3]).

send_after(Time, Msg) ->
    spawn(my_timer, send, [self(), Time, Msg]).

send(Pid, Time, Msg) ->
    receive
    after Time ->
        Pid ! Msg
    end.

sleep(T) ->
    receive
    after T ->
        true
    end.
Benchmarking
In this chapter, we have been talking about the low process creation and message passing times in Erlang. To demonstrate them, let’s run a benchmark in which the parent spawns a child and sends a message to it. Upon being spawned, the child creates a new process and waits for a message from its parent. Upon receiving the message, it terminates normally. The child’s child creates yet another process, resulting in hundreds, thousands, and even millions of processes.
This is a sequential benchmark that will barely take advantage of SMP on a multicore system, because at any one time, only a couple of processes will be executing in parallel:
-module(myring).
-export([start/1, start_proc/2]).

start(Num) ->
    start_proc(Num, self()).

start_proc(0, Pid) ->
    Pid ! ok;
start_proc(Num, Pid) ->
    NPid = spawn(?MODULE, start_proc, [Num-1, Pid]),
    NPid ! ok,
    receive ok -> ok end.
Let’s test the preceding example for 100,000, 1 million, and 10 million processes. To benchmark the program, we use the function call:
timer:tc(Module, Function, Arguments)
which takes a function and its arguments and executes it. It returns a tuple containing the time in microseconds it took to run the function alongside the return value of the function. Testing the program shows that it takes 0.48 seconds to spawn 100,000 processes, 4.2 seconds to spawn 1 million processes, and about 40 seconds to spawn 10 million processes. Try it out on your computer:
1> c(myring).
{ok,myring}
2> timer:tc(myring, start, [100000]).
{484000,ok}
3> timer:tc(myring, start, [1000000]).
{4289360,ok}
4> timer:tc(myring, start, [10000000]).
{40572800,ok}
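To turn those raw figures into a per-process cost, divide the elapsed time by the number of processes; with the timings above, each spawn-and-message round trip costs roughly 4 to 5 microseconds. The helper below is our own illustrative sketch, not part of the book's code:

%% Illustrative helper (hypothetical module name): average number of
%% microseconds per process in a myring:start/1 run.
-module(bench).
-export([per_process/1]).

per_process(N) when N > 0 ->
    {Micros, ok} = timer:tc(myring, start, [N]),
    Micros / N.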
Process Skeletons
There is a common pattern to the behavior of processes, regardless of their particular purpose. Processes have to be spawned and their aliases registered. The first action of newly spawned processes is to initialize the process loop data. The loop data is often the result of arguments passed to the spawn BIF and the initialization of the process. It is stored in a variable we refer to as the process state. The state is passed to a receive-evaluate function, which receives a message, handles it, updates the state, and passes it back as an argument to a tail-recursive call. If one of the messages it handles is a stop message, the receiving process will clean up after itself and terminate. This is a recurring theme among processes that we usually refer to as a design pattern, and it will occur regardless of the task the process has been assigned to perform. Figure 4-12 shows an example skeleton.
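As an illustration of the pattern just described, a minimal skeleton might look like the sketch below. The module name and the init, handle_msg, and terminate functions are our own placeholders for the generic actions, not code from the book:

%% Minimal sketch of the generic process skeleton (names illustrative).
-module(skeleton).
-export([start/0, stop/0, init/0]).

start() ->
    register(server, spawn(?MODULE, init, [])),  % spawn and register
    ok.

stop() ->
    server ! stop,
    ok.

init() ->
    State = initial_state,       % initialize the process loop data
    loop(State).

loop(State) ->
    receive
        stop ->
            terminate(State);    % clean up, then terminate
        Msg ->
            NewState = handle_msg(Msg, State),
            loop(NewState)       % tail-recursive call with updated state
    end.

handle_msg(_Msg, State) -> State.  % task-specific handling goes here
terminate(_State) -> ok.           % task-specific cleanup goes here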
From recurring patterns, let's now look at differences among processes:
The arguments passed to the spawn BIF calls will differ from one process to another.
You have to decide whether you should register a process and, if you do register it, what alias should be used.
In the function that initializes the process state, the actions taken will differ based on the tasks the process will perform.
The storing of the process state might be generic, but its contents will vary among processes.
When in the receive-evaluate loop, processes will receive different messages and handle them in different ways.
And finally, on termination, the cleanup will vary from process to process.
So, even if a skeleton of generic actions exists, these actions are complemented by specific ones that are directly related to the specific tasks assigned to the process.
Tail Recursion and Memory Leaks
We mentioned earlier that if processes have no more code to execute, they terminate. Suppose you want to write an echo process that indefinitely continues to send back the message it has received (or that does this until you explicitly send it a message to stop). You would keep the Erlang process alive using a tail-recursive call to the function that contains the receive statement. We often call this function the receive/evaluate loop of the process. Its task is to receive a message, handle it, and then recursively call itself.
This is where the importance of tail recursion in concurrent programming becomes evident. As you do not know how many times the function is going to be called, you must ensure that it executes in constant memory space without increasing the recursive call stack every time a message is handled. It is common to have processes handling thousands of messages per second over sustained periods of not only hours, days, or months, but also years! Using tail recursion, where the very last thing the receive/evaluate function does is to call itself, you ensure that this nonstop operation is possible without memory leakages.
What happens when a message doesn’t match any of the clauses in a receive statement? It remains in the process mailbox indefinitely, causing a memory leakage that over time could also cause the runtime system to run out of memory and crash. Not handling unknown messages should therefore be treated as a bug. Either these messages should not have been sent to this process in the first place, or they should be handled, possibly just by being retrieved from the mailbox and ignored.
The defensive approach of ignoring unknown messages with a “don’t care” variable in the receive clause, even if convenient, might not always be the best approach. Messages not being handled should probably not have been sent to the process in the first place. And if they were sent on purpose, they were probably not matched because of a bug in one of the receive clauses. Throwing these messages away would only make the bug harder to detect. If you do throw unknown messages away, make sure you log their occurrence so that the bugs can be found and corrected.
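One way of following this advice is a final catch-all clause that retrieves the unknown message and logs it before looping. The fragment below is our own sketch, assuming a handle/2 function defined elsewhere; it uses io:format for brevity, where a real system would use a proper logging facility:

%% Sketch: log unexpected messages instead of silently discarding them.
loop(State) ->
    receive
        {msg, Content} ->
            loop(handle(Content, State));
        Other ->
            %% Unknown message: log it so the bug can be found, then ignore it.
            io:format("~p got unexpected message: ~p~n", [self(), Other]),
            loop(State)
    end.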
A Case Study on Concurrency-Oriented Programming
On consulting assignments around the world, working with developers from C++ and Java backgrounds who have learned Erlang on their own, a common theme we have come across is the use of processes. This theme is irrespective of the experience level of the developers, and of what their system does. Instead of creating a process for every truly concurrent activity in the system, they tend to create one for every task. Programming concurrent applications in Erlang requires a different strategy for processes, which in turn means reasoning in a way different from what one may be accustomed to. The main difference from other concurrent languages is that in Erlang, processes are so cheap it is best to use a process for each truly concurrent activity in your system, not for every task. This case study is from one of Francesco’s first consulting assignments outside of Ericsson, soon after Erlang had been released as open source, and it clearly illustrates what we mean by the difference between a task and an activity.
He worked with a group of engineers who were developing an IM proxy for Jabber. The system received a packet through a socket, decoded it, and took certain actions based on its content. Once the actions were completed, it encoded a reply and sent it to a different socket to be forwarded to the recipient. Only one packet at a time could come through a socket, but many sockets could simultaneously be receiving and handling packets.
As described in Figure 4-13, the original system did not have a process for every truly concurrent activity—the processing of a packet from end to end—but instead used a process for every different task—decoding, encoding, and so forth. Each open socket in Erlang was associated with a process that was receiving and sending data through this socket. Once the packet was received, it was forwarded to a process that handled the decoding. Once decoded, the decoding process forwarded it to the handler that processed it. The result was sent to an encoding process, which after formatting it, forwarded the reply to the socket process that held the open connection belonging to the recipient.
At its best performance, the system could process five simultaneous messages, with the decoding, handling, and encoding being the bottleneck, regardless of the number of simultaneously connected sockets. There were two other processes, one used for error handling, where all errors were passed, and one managing a database, where data reads, writes, and deletes were executed.
When reviewing the system, we identified what we believed was a truly concurrent activity in the system. It was not the action of decoding, handling, and encoding that was the answer, but the handling of the individual packets themselves. Having a process for every packet and using that process to decode, handle, and encode the packet meant that if thousands of packets were received simultaneously, they would all be processed in parallel. Knowing that a socket can receive only one packet at any one time meant that we could use this socket process to handle the call. Once the packet was received, a function call ensured that it was decoded and handled. The result (possibly an error) was encoded and sent to the socket process managing the connection belonging to the final recipient. The error handler and database processes were not needed, as the consistency of data through the serialization of destructive database operations could have been achieved in the handler process, as could the management of errors.
If you look at Figure 4-14, you will notice that on top of the socket processes, a database process was added to the rewritten program. This was to ensure that data consistency was maintained, as many processes accessing the same data might corrupt it as a result of a race condition. All destructive database operations such as write and delete were serialized through this process. Even if you can execute most of your activities in parallel, it is essential to identify activities that need serializing and place them in a process of their own. By taking care in identifying the truly concurrent activities in your Erlang system, and spawning a process for each, you will ensure that you maximize the throughput while reducing the risk of bottlenecks.
Race Conditions, Deadlocks, and Process Starvation
Anyone who has programmed concurrent applications before moving to Erlang will have their favorite horror stories about memory corruption, deadlocks, race conditions, and process starvation. Some of these conditions arise as a result of shared memory and the need for semaphores to access it. Others are a result of priorities. Having a “no shared data” approach, where the only way for processes to share data is by copying it from one process to another, removes the need for locks and, as a result, the vast majority of bugs related to memory corruption, deadlocks, and race conditions.
Problems in concurrent programs may also arise as a result of
synchronous message passing, especially if the communication is across a
network. Erlang solves this through asynchronous message passing. And
finally, the scheduler, the per-process garbage collection mechanisms, and
the massive level of concurrency that can be supported in Erlang systems
ensure that all processes get a relatively fair time slice when executing.
In most systems, you can expect a majority of the processes to be
suspended in a receive
statement,
waiting for an event to trigger a chain of actions.
That being said, Erlang is not completely problem-free. You can avoid these problems, however, through careful and well-thought-out design. Let’s start with race conditions. If two processes are executing the following code in parallel, what can go wrong?
start() ->
    case whereis(db_server) of
        undefined ->
            Pid = spawn(db_server, init, []),
            register(db_server, Pid),
            {ok, Pid};
        Pid when is_pid(Pid) ->
            {error, already_started}
    end.
Assume that the database server process has not been started and two processes simultaneously start executing the start/0 function. The first process calls whereis(db_server), which returns the atom undefined. This pattern-matches the first clause, and as a result, a new database server is spawned. Its process identifier is bound to the variable Pid. If, after spawning the database server, the process runs out of reductions and is preempted, this will allow the second process to start executing. The call whereis(db_server) by the second process also returns undefined, as the first process had not gotten as far as registering it. The second process spawns the database server and might go a little further than the first one, registering it with the name db_server. At this stage, the second process is preempted and the first process continues where it left off. It tries to register the database server it created with the name db_server but fails with a runtime error, as there already is a process with that name. What we would have expected is the tuple {error, already_started} to be returned, instead of a runtime error.
Race conditions such as this one are rare in Erlang, but they do happen when you might least expect them. A variant of the preceding example was taken from one of the early Erlang libraries and reported as a bug in 1996.
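One possible way to close this race, sketched here under our own assumptions rather than taken from the Erlang libraries, is to spawn first and let register/2 itself arbitrate: the process that loses the race catches the badarg runtime error and cleans up its redundant server.

%% Hedged sketch: register/2 raises badarg if the name is taken, so the
%% loser of the race converts that error into {error, already_started}.
start() ->
    Pid = spawn(db_server, init, []),
    try register(db_server, Pid) of
        true -> {ok, Pid}
    catch
        error:badarg ->
            %% Another process registered db_server first; discard ours.
            exit(Pid, kill),
            {error, already_started}
    end.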
A second potential problem to keep in mind involves deadlocks. A good design of a system based on client/server principles is often enough to guarantee that your application will be deadlock-free. The only rule you have to follow is that if process A sends a message and waits for a response from process B, in effect doing a synchronous call, process B is not allowed, anywhere in its code, to do a synchronous call to process A, as the messages might cross and cause the deadlock. Deadlocks are extremely rare in Erlang as a direct result of the way in which programs are structured. In those rare occasions where they slip through the design phase, they are caught at a very early stage of testing.[15]
By calling the BIF process_flag(priority, Priority), where Priority can be set to the atom high, normal, or low, the behavior of the scheduler can be changed, giving processes with higher priority precedence when being dispatched. Not only should you use this feature sparingly; in fact, you should not use it at all! As large parts of the Erlang runtime system are written in Erlang running at normal priority, you will end up with deadlocks, starvation, and in extreme cases, a scheduler that gives low-priority processes more CPU time than their high-priority counterparts. With SMP, this behavior becomes even more nondeterministic. Endless flame wars and arguments regarding process priorities have been fought on the erlang-questions mailing list, deserving a whole chapter on the subject. We will limit ourselves to saying that under no circumstances should you use process priorities. A proper design of your concurrency model will ensure that your system is well balanced and deterministic, with no process starvation, deadlocks, or race conditions. You have been warned!
The Process Manager
The process manager is a debugging tool used to inspect the state of processes in Erlang systems. Whereas the debugger concentrates on tracing the sequential aspects of your program, the process manager deals with the concurrent ones. You can start the process manager by writing pman:start() in the shell. A window will open (see Figure 4-15), displaying contents similar to what you saw when experimenting with the i() command. Double-clicking any of the processes will open a trace output window. You can choose your settings by picking options in the File menu.
For each process with an output window, you can trace all the messages that are sent and received. You can trace BIF and function calls, as well as concurrency-related events, such as processes being spawned or terminated. You can also redirect your trace output from the window to a file. Finally, you can pick the inheritance level of your trace events. A very detailed and well-written user guide comes with the Erlang distribution, which we recommend as further reading.
Warning
At the time of writing, because its underlying Tcl/Tk graphics libraries are no longer supported, the process manager can be unstable when running on Microsoft Windows operating systems.
This chapter introduced the basics of concurrency in Erlang, which is based on message passing between concurrent processes, rather than on shared memory. Message passing is asynchronous, and the selective receive facility, under which messages can be handled independently of the order in which they are received, allows modular and concise concurrent programs to be written. In the next chapter, we’ll build on this introduction and look at design patterns for process-based systems.
Exercises
Exercise 4-1: An Echo Server
Write the server in Figure 4-16 that will wait in a receive loop until a message is sent to it. Depending on the message, it should either print its contents and loop again, or terminate. You want to hide the fact that you are dealing with a process, and access its services through a functional interface, which you can call from the shell.
This functional interface, exported in the echo.erl module, will spawn the process and send messages to it. The function interfaces are shown here:
echo:start() ⇒ ok
echo:print(Term) ⇒ ok
echo:stop() ⇒ ok
Hint: use the register/2 built-in function, and test your echo server using the process manager.
Warning: use an internal message protocol to avoid stopping the process when you, for example, call the function echo:print(stop).
Exercise 4-2: The Process Ring
Write a program that will create N processes connected in a ring, as shown in Figure 4-17. Once started, these processes will send M messages around the ring and then terminate gracefully when they receive a quit message. You can start the ring with the call ring:start(M, N, Message).
There are two basic strategies to tackling this exercise. The first one is to have a central process that sets up the ring and initiates sending the message. The second strategy consists of the new process spawning the next process in the ring. With this strategy, you have to find a method to connect the first process to the second process.
Regardless of the strategy you choose, make sure you have solved this exercise with pen and paper before you start coding. It differs from the ones you have solved before because you will have many processes executing the same function in the same module at the same time. Furthermore, processes will be using this function to interact with each other. When writing your program, make sure your code has many io:format statements in every loop iteration. This will give you a complete overview of what is happening (or not happening) and should help you solve the exercise.
[13] Partially because the return values of the operations in the shell are cached.
[14] The number of reductions will vary between releases. In the R12 release, the number of reductions starts at 2,000 and is reduced by one for every operation. In R13, the number of initial reductions depends on the number of scheduler threads.
[15] In 15 years of working with Erlang on systems with millions of lines of code, Francesco has come across only one deadlock that made it as far as the integration phase.