Up until now, we have been considering programs that run on a single
machine, while possibly making use of multiple processors to exploit
parallelism. But there is a far more plentiful source of parallelism: running a program on multiple machines
simultaneously. We call this distributed programming, and
Haskell supports it through a framework called distributed-process.[50]
Aside from the obvious advantages of multimachine parallelism, there are other reasons to write distributed programs. For example:
- A distributed server can make more efficient use of network resources by moving the servers closer to the clients. We will see an example of this in A Distributed Chat Server.
- A distributed program can exploit a heterogeneous environment, where certain resources are available only to certain machines. An example of this might be a cluster of machines with local disks, where a large data structure is spread across the disks and we wish to run our computation on the machine that has the appropriate part of the data structure on its local disk.
So what should distributed programming look like from the programmer’s
perspective? Should it look like Concurrent Haskell, with forkIO, MVar, and STM? In fact, there are some good reasons to treat
distributed computation very differently from computation on a
shared-memory multicore:
- There is a realistic possibility of partial hardware failure: that is, some of the machines involved in a computation may go down while others continue to run. Indeed, given a large enough cluster of machines, having nodes go down becomes the norm. It would be unacceptable to simply abort the entire program in this case. Recovery is likely to be application-specific, so it makes sense to make failure visible to the programmer and let him handle it in an appropriate way for his application.
- Communication time becomes significant. In the shared-memory setting, it is convenient and practical to allow unrestricted sharing. This is because, for example, passing a pointer to a large data structure from one thread to another has no cost (beyond the costs imposed by the hardware and the runtime memory manager, but again it is convenient and practical to ignore these). In a distributed setting, however, communication can be costly, and sharing a data structure between threads is something the programmer will want to think about and explicitly control.
- In a distributed setting, it becomes far more difficult to provide any global consistency guarantees of the kind that, for example, STM provides in the shared-memory setting. Achieving a consistent view of the state of the system becomes a very hard problem indeed. There are algorithms for achieving agreement between nodes in a distributed system, but the exact nature of the consistency requirements depend on the application, so we don’t want to build a particular algorithm into the system.
For these reasons, the Haskell developers decided that the model for
distributed programming should be based on explicit message
passing, and not the MVar
and STM
models that we provide for
shared-memory concurrency.[51] Think of it as having TChan
be the basic
primitive available for communication. It is possible to build
higher-level abstractions on top of the explicit message-passing
layer, just as we built higher-level abstractions on top of STM and
MVar
in earlier chapters.
There is no built-in support for distributed programming in Haskell. It is all implemented as libraries using the concurrency facilities we have covered in earlier chapters.
The package providing the core APIs for distributed programming is
called distributed-process. It must be used
together with a separate transport layer package that provides
infrastructure for sending and receiving messages between nodes in the
distributed network. The distributed-process
package is
deliberately independent of the transport layer so we can plug
in different transport layer implementations. The most common
transport layer is likely to be TCP/IP, as provided by the network-transport-tcp
package, but we could imagine a transport
layer that used shared memory to communicate among multiple nodes on
the same multicore machine, or transport layers supporting some of the
faster networks designed for clusters, such as InfiniBand.
Each transport layer needs a different mechanism for creating and
shutting down nodes on the network and discovering which nodes are
available (peer discovery). We will be using the package
distributed-process-simplelocalnet
that provides a simple
implementation on top of the network-transport-tcp
transport layer.
At the time of writing, the distributed-process
framework is
somewhat new and a little rough around the edges, but it is already
quite fully featured and we expect it to mature in due
course.[52]
It is reasonable to wonder whether we even need a framework to do
distributed message-passing. After all, can’t we just use the
network
package directly and program our own message passing?
Certainly you could do this, but the packages described in this chapter provide a lot
of functionality that makes it much easier to build a distributed
application. They let you think about your application as a single program that happens to run on multiple machines, rather than a collection of
programs running on different machines that talk to one another.
For example, with the distributed-process
framework, we can call a
function spawn
that spawns a process (like a thread) on a different
machine, and we can exchange messages with the remote process directly in the
form of Haskell data types. Even though we are writing a single
program to execute on multiple machines, there is no need for all the
machines to be identical; indeed, programmers often want to
exploit some non-uniformity. For example, we might want to run a
caching service on a machine with lots of memory while sending
compute-intensive tasks to machines with lots of fast cores. There
may also be nonuniformity in the network topology. We might want to
perform a database query on a machine close to the database server,
for example, or put services that communicate with each other
frequently close to one another in the network.
The distributed-process framework provides a whole suite of infrastructure to support the distributed application domain, including facilities for creating processes on remote nodes, sending and receiving messages, typed channels, and monitoring processes for failure.
We have included distribution in the concurrency part of this book for the simple reason that the explicit message-passing API we’ll describe is concurrent and nondeterministic. And yet, the main reason to want to use distribution is to exploit the parallelism of running on multiple machines simultaneously. So this setting is similar to parallel programming using threads described in Chapter 13, except that here we have only message passing and no shared state for coordination.
It is a little unfortunate that we have to resort to a
nondeterministic programming model to achieve parallelism just
because we want to exploit multiple machines. There are efforts
under way to build deterministic programming models atop the
distributed-process
framework, although at the time of writing these projects are too
experimental to include in this book.[53]
To get acquainted with the basics of distributed programming, we will start with a simple example: a ping/pong message exchange. To start with, there will be a single master process that creates a child process. The master process will send a “ping” message to the child, which will respond with a “pong” message and the program will then exit.
The ping example will illustrate the basic pattern for setting up a
program to use the distributed-process
framework and introduce the APIs for
creating processes and simple message passing. The first version of
the program will run on a single node (machine) so we can
get familiar with the basics of the interface before moving on to
working with multiple nodes.
For reference, the subset of the Control.Distributed.Process
API that we will be using is shown here:
data Process        -- instance Monad, MonadIO
data NodeId         -- instance Eq, Ord, Show, Typeable, Binary
data ProcessId      -- instance Eq, Ord, Show, Typeable, Binary

getSelfPid  :: Process ProcessId
getSelfNode :: Process NodeId

spawn     :: NodeId -> Closure (Process ()) -> Process ProcessId
send      :: Serializable a => ProcessId -> a -> Process ()
expect    :: Serializable a => Process a
terminate :: Process a
say       :: String -> Process ()
First, a bit of terminology. A distributed program consists of a set of
processes that may communicate with one another by sending and
receiving messages. A process is like a thread. Processes run
concurrently with one another, and every process has
a unique ProcessId. There are a
couple of important differences between threads and processes,
however:
- Threads are always created on the current node, whereas a process can be created on a remote node (we won’t be using this facility until the next section, though).
- Processes run in the Process monad, rather than the IO monad. Process is an instance of MonadIO, so you can perform IO operations in Process by wrapping them in liftIO. All message-passing operations are in Process, so only processes, not threads, can engage in message passing.
We start by defining the type of messages that our processes will send and receive:
distrib-ping/ping.hs
data Message = Ping ProcessId
             | Pong ProcessId
  deriving (Typeable, Generic)

instance Binary Message
The Ping
message contains the ProcessId
of the process that sent
it so that the target of the message knows where to send the response.
The Pong
response also includes the ProcessId
of the responder so
that the master process can tell which process a particular response
comes from.
Messages in a distributed program can be sent over the network, which involves serializing the Haskell data into a stream of bytes before it is sent and deserializing the bytes back into Haskell data at the other end. The distributed-process framework uses the Binary class from the binary package to implement serialization and deserialization, and hence every message type must be an instance of Binary.
The serialization format is under your control. If you want, you can define your own Binary instance that uses a specialized serialization format. Normally, however, you'll just want an automatically derived Binary instance. Fortunately, the binary package[54] lets you derive Binary instances using GHC's DeriveGeneric extension.[55] To do this, we first derive the Generic class for Message and then declare an instance of Binary for it; GHC fills in the method definitions of this instance for us.
Message types must also be an instance of Typeable, because they can be sent to dynamically typed channels (more about this later). For Typeable, we can derive the instance directly in the deriving clause.
Typeable
and Binary
are normally packaged up together and referred
to as Serializable
using the following class provided by Control.Distributed.Process.Serializable:
class    (Binary a, Typeable a) => Serializable a
instance (Binary a, Typeable a) => Serializable a
There's nothing magic about Serializable. Just think of Serializable a as shorthand for (Binary a, Typeable a). You'll see Serializable used a lot in the Control.Distributed.Process APIs.
Next, we’ll write the code for a “ping server” process. The ping
server must wait for a Ping
message and then respond with a
Pong
message.
pingServer :: Process ()
pingServer = do
  Ping from <- expect
  say $ printf "ping received from %s" (show from)
  mypid <- getSelfPid
  send from (Pong mypid)
First of all, notice that we are in the Process
monad. As we
mentioned earlier, virtually all of the Control.Distributed.Process
API is in this monad,
and only code running in the Process
monad can communicate with
other processes and spawn new processes. There has to be a way to get
into Process
in the first place; we’ll see how that happens
shortly, but for now let’s assume we’re already in Process
and we
need to program the ping server.
First, we receive the next message using expect:
expect :: Serializable a => Process a
The expect
function receives a message sent directly to this
process. Each process has a channel associated with it, and the
channel can receive messages of any type. The expect
call receives
a message of a particular type, where the type is determined by the
context. If the type cannot be determined, the compiler will
complain that the type is ambiguous, and the usual fix is to add a
type signature. In the example just shown, the type of messages to receive
is determined by the pattern match on the result, which matches
directly on the Ping
constructor and thus forces expect
to receive
messages of the type Message
.
The expect
function is a little like Haskell’s read
function, in
that it returns a value whose type depends on the context. But
whereas read
fails if its argument cannot be parsed as the desired
type, expect
skips over messages in the queue that do not match and
returns the first one that matches. Messages that don’t match the
expected type are left in the channel for the time being.
If there are no messages of the right type, expect
will block until
one arrives. Therefore, it should be used with care: the other
messages in the queue are ignored while expect
is waiting for the
right kind of message to arrive, which could lead to a deadlock.
We’ll see later how to wait for several different types of message at
the same time.
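As a sketch (not part of the ping example), here is how an explicit type annotation can resolve the ambiguity when nothing else determines the message type:

logNext :: Process ()
logNext = do
  m <- expect :: Process Message        -- annotation fixes the message type
  case m of
    Ping from -> say (printf "ping from %s" (show from))
    Pong from -> say (printf "pong from %s" (show from))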
The say function, called next, causes a message to be logged, which is a useful way to debug your program. Usually, the message will be logged to stderr, but it might
be sent somewhere else if the transport layer overrides the default
logging process.
Then we call getSelfPid
to
obtain the ProcessId
of the current process. The ProcessId
of the
current process is needed because the Pong
message will contain it:
getSelfPid :: Process ProcessId
Finally, we send a response back to the originator of the Ping. The function send is used to send a message to a process, and it has the following type:
send :: (Serializable a) => ProcessId -> a -> Process ()
We know which ProcessId
to send the Pong
to because it was
contained in the original Ping
message.
Now we need to be able to create processes running pingServer.
Although in this example we will be creating the process on the local
node, in general we might be creating the process on another
node. Functions that will be executed remotely in this way need to be
declared explicitly.[56] The
following declaration invokes a bit of Template Haskell magic that creates the necessary infrastructure to allow pingServer
to be executed remotely:[57]
remotable ['pingServer]
Next, we will write the code for the master process. As you might expect, this is an operation of type Process ():
master :: Process ()
master = do
  node <- getSelfNode
  say $ printf "spawning on %s" (show node)
  pid <- spawn node $(mkStaticClosure 'pingServer)

  mypid <- getSelfPid
  say $ printf "sending ping to %s" (show pid)
  send pid (Ping mypid)

  Pong _ <- expect
  say "pong."

  terminate
- Call getSelfNode, which returns the NodeId of the current node. A NodeId is needed when creating a new process.
- Call spawn to create the child process. Here is the function's signature:

  spawn :: NodeId -> Closure (Process ()) -> Process ProcessId

  The spawn function creates a new process on the given NodeId (which here is the current node). The new process runs the computation supplied as the second argument to spawn, which is a value of type Closure (Process ()). Ultimately, we want to spawn a computation of type Process (), but such values cannot be serialized because in practice a value of type Process () could refer to an arbitrary amount of local data, including things that cannot be sent to other nodes (such as a TVar). Hence the type Closure is used to represent serializable computations. How do we get one of these? First, the function to call must be declared remotable, as we did above. Then, if there are no arguments to pass, the Template Haskell function mkStaticClosure generates the appropriate code for the closure. (If there are arguments, then we need to use a different function, which we will see later.) The spawn operation returns the ProcessId of the new process, which we bind to pid.
- Call getSelfPid to return the ProcessId of the current process. We need this to send in the Ping message.
- Send the Ping message to the child process.
- Call expect to receive the Pong message from the child process.
- Finally, terminate the process by calling terminate. In this case, simply returning from master would terminate the process, but sometimes we need to end the process in a context where it is not practical to arrange for the top-level function to return, and in those cases terminate is useful. Moreover, it is good practice to indicate the end of the process explicitly.
All that remains to complete the program is to define our main
function, and here it is:
main :: IO ()
main = distribMain (\_ -> master) Main.__remoteTable
The main function calls distribMain from DistribUtils, which is a small module of utilities provided with the sample code to make these examples a bit less cluttered. The distribMain function is a wrapper around the lower-level startup facilities from the distributed-process-simplelocalnet package. It starts up the distributed-process framework with the distributed-process-simplelocalnet backend on a single node.
The first argument to distribMain is the Process computation to run as the master process on the node. It has type [NodeId] -> Process (), where the list of NodeIds contains the other nodes in our distributed network. Because this example is running on a single node, we ignore the [NodeId] and just invoke the master function as our master process.
The second argument to distribMain is the metadata used to execute remote calls; in this case we pass Main.__remoteTable, which is generated by the Template Haskell call to remotable that we showed earlier.
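For reference, here is the type of distribMain as it is used here, reconstructed from the description above (DistribUtils is part of the sample code, so treat this as a sketch):

distribMain :: ([NodeId] -> Process ()) -> (RemoteTable -> RemoteTable) -> IO ()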
When you run the program, you should see output like this:[58]
$ ./ping
pid://localhost:44444:0:3: spawning on nid://localhost:44444:0
pid://localhost:44444:0:3: sending ping to pid://localhost:44444:0:4
pid://localhost:44444:0:4: ping received from pid://localhost:44444:0:3
pid://localhost:44444:0:3: pong.
Each of these messages corresponds to one of the calls to say
in the
example program, and they are tagged with the date, time, and
ProcessId
of the process that called say
.
In this section, we built the simplest distributed program possible: it spawns a single child process and performs a simple ping/pong message exchange. Here are the key things to take away:
- To create a process, we call spawn, passing a NodeId and a Closure (Process ()). The former we got from getSelfNode (there are other ways, which we will encounter shortly), and the latter was generated by a call to the Template Haskell function mkStaticClosure.
- Processes run in the Process monad, which is a layer over the IO monad.
- Messages can be sent to a process using send and received by calling expect. Messages are ordinary Haskell data; the only requirement is that the type of the message is an instance of the Binary and Typeable classes.
There is a certain amount of boilerplate associated with distributed programming: deriving Binary instances, declaring remotable functions with remotable, starting up the framework with distribMain, and so on. Remember that the distributed-process framework is currently implemented as a library entirely in Haskell. There is no support for distributed programming built into the language or GHC itself, and this accounts for some of the boilerplate. As the framework matures, distributed programming will likely become a smoother experience.
The previous example showed how to create a process and exchange some simple messages. Now we will extend the program to be truly distributed. Instead of spawning a process on the local node, we will run the program on several nodes, create a process on each one, and perform the ping/pong protocol with all nodes simultaneously.
The Message
type and pingServer
remain exactly as before. The only
changes will be to the master
and main
functions. The new
master
function is shown below, along with a waitForPongs
helper function:
distrib-ping/ping-multi.hs
master :: [NodeId] -> Process ()
master peers = do
  ps <- forM peers $ \nid -> do
          say $ printf "spawning on %s" (show nid)
          spawn nid $(mkStaticClosure 'pingServer)

  mypid <- getSelfPid

  forM_ ps $ \pid -> do
    say $ printf "pinging %s" (show pid)
    send pid (Ping mypid)

  waitForPongs ps

  say "All pongs successfully received"
  terminate

waitForPongs :: [ProcessId] -> Process ()
waitForPongs [] = return ()
waitForPongs ps = do
  m <- expect
  case m of
    Pong p -> waitForPongs (filter (/= p) ps)
    _      -> say "MASTER received ping" >> terminate
- This time, the master process takes an argument of type [NodeId], containing a NodeId for each node in the distributed network. This list is supplied by the framework when it starts up, after it has discovered the set of peers in the network. We'll see shortly how to start up the program on multiple nodes.
- Spawn a new process on each of the peer nodes, and bind the resulting list of ProcessIds to ps.
- Call waitForPongs (defined below) to receive all the pong messages. When waitForPongs returns, the program emits a diagnostic and terminates.
- waitForPongs is a simple algorithm that removes each ProcessId from the list as its pong message is received and returns when the list is empty.
The main
function is almost the same as before:
main :: IO ()
main = distribMain master Main.__remoteTable
The only difference is that the [NodeId] argument gets passed along to master instead of being discarded here.
First, I’ll illustrate starting multiple nodes on the same machine and then progress on to multiple machines.
A distributed program consists of a single master node and one or more slave nodes. The master is the node that begins with a process running; the slave nodes just wait until processes are spawned on them.
Let’s start by creating two slave nodes:
$ ./ping-multi slave 44445 &
[3] 58837
$ ./ping-multi slave 44446 &
[4] 58847
The ping-multi
program takes two command-line arguments; these are
interpreted by the distribMain
function and tell it how to
initialize the framework. The first argument is either master
or
slave
and indicates which kind of node to create. The second
argument is the TCP port number that this node should use to
communicate on, with the default being 44444.[59] Always use different port numbers
when creating multiple nodes on the same machine.
I used &
to create these as background processes in the shell. If
you’re on Windows, just open a few Command Prompt windows and run the
program in each one.
Having started the slaves, we now start the master node:
$ ./ping-multi
pid://localhost:44444:0:3: spawning on nid://localhost:44445:0
pid://localhost:44444:0:3: spawning on nid://localhost:44446:0
pid://localhost:44444:0:3: pinging pid://localhost:44445:0:4
pid://localhost:44444:0:3: pinging pid://localhost:44446:0:4
pid://localhost:44446:0:4: ping received from pid://localhost:44444:0:3
pid://localhost:44445:0:4: ping received from pid://localhost:44444:0:3
pid://localhost:44444:0:3: All pongs successfully received
The first thing to note is that the master node automatically found
the two slave nodes. The distributed-process-simplelocalnet
package
includes a peer discovery mechanism that is designed to
automatically locate and connect to other instances running on the
same machine or other machines on the local network.
It is also possible to restart the master without restarting the
slaves—try invoking ping-multi
again, and you should see the same
result. The new master
node discovers and reconnects to the
existing slaves.
If we have multiple machines connected on the same network, we can run a distributed Haskell program on them. The first step is to distribute the binary to all the machines; every machine must be running the same binary. A mismatch in the binary on different machines can cause strange failures, such as errors when decoding messages.
Next, we start the slaves as before, but this time we start slaves on the remote machines and pass an extra argument:
$ ./ping-multi slave 192.168.1.100 44444
$ ./ping-multi slave 192.168.1.101 44444
(The above commands are executed on the appropriate machines.) The second argument is new and gives the IP address that identifies the slave. This is the address that the other nodes will use to contact it, so it must be an address that resolves to the correct machine. It doesn’t have to be an IP address, but using IP addresses is simpler and eliminates a potential source of failure (the DNS).
When the slaves are running, we can start the master:
$ ./ping-multi master 44444
pid://localhost:44444:0:3: spawning on nid://192.168.1.100:44444:0
pid://localhost:44444:0:3: spawning on nid://192.168.1.101:44444:0
pid://localhost:44444:0:3: pinging pid://192.168.1.100:44444:0:5
pid://localhost:44444:0:3: pinging pid://192.168.1.101:44444:0:5
pid://192.168.1.100:44444:0:5: ping received from pid://localhost:44444:0:3
pid://192.168.1.101:44444:0:5: ping received from pid://localhost:44444:0:3
pid://localhost:44444:0:3: All pongs successfully received
The program successfully identified the remote nodes, spawned a process on each one, and exchanged ping-pong messages with the process on each node.
In the examples so far, we saw messages being delivered to a process and the process receiving the messages by using expect. This scheme is quite convenient: we need to know only a process's ProcessId
to
send it messages, and we can send it messages of any type. However,
all the messages for a process go into the same queue, which has a
couple of disadvantages:
- Each time we call expect, the implementation has to search the queue for a message of the right type, which could be slow.
- If we are receiving messages of the same type from multiple senders, then we need to explicitly include some information in the message that lets us tell them apart (e.g., the ProcessId of the sender).
The distributed-process
framework provides an alternative means of
message passing based on typed channels, which addresses these two
problems. The interface is as follows:
data SendPort a     -- instance of Typeable, Binary
data ReceivePort a

newChan     :: Serializable a => Process (SendPort a, ReceivePort a)
sendChan    :: Serializable a => SendPort a -> a -> Process ()
receiveChan :: Serializable a => ReceivePort a -> Process a
A typed channel consists of two ports, a SendPort and a ReceivePort. Messages are sent to the SendPort with sendChan and received from the ReceivePort using receiveChan. As the name suggests, a typed channel can carry messages only of a particular type.
Typed channels imply a different pattern of interaction. For example, suppose we were making a request to another process and expecting a response. Using typed channels, we could program this as follows:
- The client creates a new channel for an interaction.
- The client sends the request, along with the SendPort.
- The server responds on the SendPort it was sent.
In general, the server might make its own channel and send that to the client, and the subsequent interaction would happen over these two channels.
The advantage of creating a channel to carry the response is that the client knows that a message arriving on this channel can only be a response to the original request, and it is not possible to mix up this response with other responses. The channel serves as a link between the original request and the response; we know that it is a response to this particular request, because it arrived on the right channel.
In the absence of typed channels, ensuring that the response can be uniquely identified would involve creating a new identifier to send along with the original message.[60]
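Here is a minimal sketch of that request/response pattern (the Request type and the server are assumed for illustration; they are not part of the ping example). Because the reply port is created fresh for this interaction, whatever arrives on it can only be the reply to this request:

data Request = Request String (SendPort Int)
  deriving (Typeable, Generic)

instance Binary Request

request :: ProcessId -> String -> Process Int
request server str = do
  (sendport, recvport) <- newChan      -- a channel just for this interaction
  send server (Request str sendport)   -- the request carries the reply port
  receiveChan recvport                 -- must be the reply to this request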
Let’s look at how to modify the ping example to use typed channels:
data Message = Ping (SendPort ProcessId)
  deriving (Typeable, Generic)

instance Binary Message
Note that we don’t need a Pong
message anymore. Instead, the Ping
message will contain a SendPort
on which to send the reply, and the
reply is just the ProcessId
of the sender. In fact, in this example
we don’t really need to send any content back at all—just sending
()
would be enough—but for the purposes of illustration we will
send back the ProcessId
.
pingServer :: Process ()
pingServer = do
  Ping chan <- expect
  say $ printf "ping received from %s" (show chan)
  mypid <- getSelfPid
  sendChan chan mypid
master :: [NodeId] -> Process ()
master peers = do
  ps <- forM peers $ \nid -> do
          say $ printf "spawning on %s" (show nid)
          spawn nid $(mkStaticClosure 'pingServer)

  mapM_ monitor ps

  ports <- forM ps $ \pid -> do
    say $ printf "pinging %s" (show pid)
    (sendport, recvport) <- newChan
    send pid (Ping sendport)
    return recvport

  forM_ ports $ \port -> do
    _ <- receiveChan port
    return ()

  say "All pongs successfully received"
  terminate
This code is simpler than the previous version in Multi-Node Ping. However, note that we still sent the Ping messages directly to the process, rather than using a typed channel. If we wanted to use a typed channel here too, things would get more complicated. We want to do something like this (considering just a single worker for simplicity):
do
  (s1, r1) <- newChan
  spawn nid ($(mkClosure 'pingServer) r1)
  (s2, r2) <- newChan
  sendChan s1 (Ping s2)
  receiveChan r2
This seems quite natural: we create a channel with send port s1 and receive port r1 on which to send the Ping message. Then we give the receive port of the channel to the pingServer process when we spawn it. The code shows how to use spawn to apply a function (here pingServer) to an argument (here r1): use mkClosure instead of mkStaticClosure, and then pass the argument to it (we'll come back to this later; the details aren't important right now).
But there's a big problem here. ReceivePorts are not Serializable, which prevents us from passing the ReceivePort r1 to the spawned process. GHC will reject the program with a type error.
Why are ReceivePorts not Serializable? If you think about it a bit, this makes a lot of sense. If a process were allowed to send a ReceivePort somewhere else, the implementation would have to deal with two things: routing messages to the correct destination when a ReceivePort has been forwarded (possibly multiple times), and routing messages to multiple destinations, because sending a ReceivePort would create a new copy. This would introduce a vast amount of complexity to the implementation, and it is not at all clear that it is a good feature to allow. So the distributed-process framework explicitly disallows it, which fortunately can be done using Haskell's type system.
This means that we have to jump through an extra hoop to fix the previous code, though. Instead of passing the ReceivePort to the spawned process, the spawned process must create the channel and send us back the SendPort. This means we need another channel so that the spawned process can send us back its SendPort.
do
  (s, r) <- newChan                     -- throw-away channel
  spawn nid ($(mkClosure 'pingServer) s)
  ping <- receiveChan r
  (sendpong, recvpong) <- newChan
  sendChan ping (Ping sendpong)
  receiveChan recvpong
Since this extra handshake is a bit of a hassle, you might well prefer
to send messages directly to the spawned process using send
rather
than using typed channels, which is exactly what the example code at the
beginning of this section did.
In the previous section, we waited for a response from each child process
in turn, whereas the old waitForPongs
version processed the messages
in the order they arrived. In this case it isn’t a problem, but
suppose some of these messages required a response. Then we might
have introduced some extra latency: if a process toward the end of
the list replies early, it won’t get a response until the
master process has dealt with the messages from the other processes
earlier in the list, some of which might take a while to reply.
So we need a way to wait for messages from multiple channels
simultaneously. The distributed-process
framework has an elegant
way to do this. Channels can be merged together to make a single
channel that receives messages from any of the original channels.
There are two ways to do this:
mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsRR     :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
The difference is in the order in which messages arrive on the merged channel. With mergePortsBiased, each receive searches the ports in left-to-right order for a message, returning the first message it finds. The alternative is mergePortsRR (the RR stands for "round robin"), which also searches left to right but rotates the list by one element after each receive, with the leftmost port moving to the end of the list.
One important thing to note is that merging channels does not affect the original channel; we can still receive messages from either source, and indeed there is no problem with merging multiple overlapping sets of channels.[61]
Here is the ping example with channels, where instead of waiting for the responses one by one, we merge the channels together and wait for all the responses simultaneously.
master :: [NodeId] -> Process ()
master peers = do
  ps <- forM peers $ \nid -> do
          say $ printf "spawning on %s" (show nid)
          spawn nid $(mkStaticClosure 'pingServer)

  ports <- forM ps $ \pid -> do
    say $ printf "pinging %s" (show pid)
    (sendport, recvport) <- newChan
    send pid (Ping sendport)
    return recvport

  oneport <- mergePortsBiased ports
  waitForPongs oneport ps

  say "All pongs successfully received"
  terminate

waitForPongs :: ReceivePort ProcessId -> [ProcessId] -> Process ()
waitForPongs _ [] = return ()
waitForPongs port ps = do
  pid <- receiveChan port
  waitForPongs port (filter (/= pid) ps)
One of the important benefits provided by the distributed-process
framework is handling and recovering
from failure. Failure is a fact of life in distributed computing, and we should be prepared for the possibility that any of our processes might fail at any time, whether due to network outage, a hardware crash, or software faults.
Here is a basic example showing how the failure of one process can be
caught and acted upon by another process. In the original ping
example from Defining a Message Type, recall that the Message
type has two constructors:
data Message = Ping ProcessId
             | Pong ProcessId
and the code for pingServer
matches explicitly on the
Ping
constructor:
pingServer :: Process ()
pingServer = do
  Ping from <- expect
  say $ printf "ping received from %s" (show from)
  mypid <- getSelfPid
  send from (Pong mypid)
What will happen if the message is a Pong, rather than a Ping? Both messages have the type Message, so expect cannot distinguish them; if the context requires a message of type Message, expect can return either a Ping or a Pong. Clearly, if expect returns a Pong here, then the pattern match against Ping will fail, and as usual in Haskell this throws an exception. Since there are no exception handlers, the exception will result in the termination of the pingServer process.
There are ways to prevent the error, of course, but for now let’s see
how we can catch this failure from another process. We’ll use
withMonitor, which has the following signature:
withMonitor :: ProcessId -> Process a -> Process a
withMonitor
takes a ProcessId
to monitor and an action to perform. During the action, if the specified process fails in any way, a special message of type ProcessMonitorNotification
is sent to the current process.
To wait for either the ProcessMonitorNotification message or a Pong, we need to know how to wait for different types of message at the same time. The basic pattern for this is as follows:
receiveWait
  [ match $ \p -> do ...
  , match $ \q -> do ...
  ]
where p
and q
are patterns that match different types of
message. The types of these functions are shown here:
receiveWait    ::        [Match b] -> Process b
receiveTimeout :: Int -> [Match b] -> Process (Maybe b)

match   :: Serializable a => (a -> Process b) -> Match b
matchIf :: Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
The function receiveWait
waits until any of the match
functions
applies to a message in the queue, and then executes the associated
action. The receiveTimeout
operation is similar, but instead of
waiting indefinitely for a matching message, it takes a time in
milliseconds and returns Nothing
if a matching message did not
arrive before the time.
Here is how we monitor the pingServer
process and then wait for
either a Pong
message or a ProcessMonitorNotification:
  withMonitor pid $ do
    send pid (Pong mypid)
    receiveWait
      [ match $ \(Pong _) -> do
          say "pong."
          terminate
      , match $ \(ProcessMonitorNotification _ref deadpid reason) -> do
          say (printf "process %s died: %s" (show deadpid) (show reason))
          terminate
      ]
Note that we deliberately send the child a Pong message to cause it to fail. Running the program results in this:
pid://localhost:44444:0:3: spawning on nid://localhost:44444:0
pid://localhost:44444:0:3: sending ping to pid://localhost:44444:0:4
pid://localhost:44444:0:3: process pid://localhost:44444:0:4 died: DiedException "user error (Pattern match failure in do expression at distrib-ping/ping-fail.hs:24:3-11)"
The third log message indicates that the master received the notification of the failed process, and gives the details of the failure: a pattern-match error, as we expected.
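The matchIf function was not needed above, but as a sketch (assumed, not part of the example program) it can be combined with match in the same receiveWait, for instance to accept a Pong only from a particular sender while still handling monitor notifications:

waitPongFrom :: ProcessId -> Process ()
waitPongFrom pid =
  receiveWait
    [ matchIf fromSender handlePong
    , match $ \(ProcessMonitorNotification _ref deadpid reason) ->
        say (printf "process %s died: %s" (show deadpid) (show reason))
    ]
 where
  fromSender (Pong p) = p == pid        -- only Pongs from this sender match
  fromSender _        = False
  handlePong (Pong p) = say (printf "pong from %s" (show p))
  handlePong _        = return ()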
It is worth asking whether having a single Message
data type for our
messages was a good idea in the first place. Perhaps we should have
made separate types, as in:
newtype Pong = Pong ProcessId
newtype Ping = Ping ProcessId
The choice comes down to whether we are using typed channels or not.
With typed channels, we could use only a single message type, whereas
using the per-process dynamically typed channel with send
and
expect
or receiveWait, we could use multiple message types. Having
one type for each message would avoid the possibility of a pattern-match
failure when matching on a message, but unless we also have a
catch-all case to match unrecognized messages, the other messages
could be left in the queue forever, which could amount to an
undetected error or deadlock. So there might well be cases where we
want to match both messages because one is definitely an
error, and so using a single message type would help ensure that we
always match on all the possible messages.
The more appropriate choice depends on the particular circumstances in your application.
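With the separate types above (given Binary and Typeable instances for each), a process can still wait for either kind of message in a single receiveWait, and a pattern-match failure can no longer occur; a sketch:

waitEither :: Process ()
waitEither =
  receiveWait
    [ match $ \(Ping from) -> say (printf "ping from %s" (show from))
    , match $ \(Pong from) -> say (printf "pong from %s" (show from))
    ]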
A summary of the API for process monitoring follows:
monitor     :: ProcessId -> Process MonitorRef
unmonitor   :: MonitorRef -> Process ()
withMonitor :: ProcessId -> Process a -> Process a

data ProcessMonitorNotification
  = ProcessMonitorNotification MonitorRef ProcessId DiedReason

data MonitorRef  -- abstract

data DiedReason
  = DiedNormal             -- Normal termination
  | DiedException !String  -- The process exited with an exception
  | DiedDisconnect         -- We got disconnected from the process node
  | DiedNodeDown           -- The process node died
  | DiedUnknownId          -- Invalid (process/node/channel) identifier
In addition to the withMonitor function mentioned earlier, a process can also be monitored by calling the monitor function. This function returns a token of type MonitorRef, which can be passed to unmonitor to stop monitoring the process. In general, it is better to use withMonitor than the monitor and unmonitor pair if possible, because withMonitor will automatically stop monitoring the remote process in the event of an exception. However, sometimes withMonitor doesn't fit the control flow, which is when monitor and unmonitor are useful.
In a distributed system, parts of the running program may fail at any time due to circumstances beyond our control. Such a failure typically results in one or more of the processes in our network becoming disconnected without warning; there is no exception and no opportunity to clean up whatever it was doing. Perhaps the hardware it was running on failed, or the network on which we were communicating with it stopped working.
A far-reaching approach for such failures can be seen in Erlang, a programming language with distributed programming at its heart. The only mechanism for communication is message passing, so every concurrent Erlang program is fundamentally distributable. The Erlang designers promote a particular philosophy for dealing with failure, often known by its catchphrase: “Let it crash.” The basic principle is that since in a distributed system we must already be prepared for a process to simply disappear, we might as well deal with all kinds of failure in this way because doing so makes failure handling much simpler. And since failure handling is difficult to test, making it simpler is highly desirable.
Concretely, instead of trying to enumerate local failure conditions and handle them in some way, we can just let them propagate to the top of the process and let the process die. The distributed program must be prepared for this eventuality already (since this is a distributed system), so the system will recover in some way: perhaps by restarting the failed process in some known-good state and logging the failure somewhere.
Thus the granularity at which we have to consider failure is the process, and we can design our applications such that individual processes can fail without catastrophic consequences. A process will probably have some internal state that is lost when it dies, but the parent should know how to construct the initial state to restart the process or to propagate the failure to a higher layer that can.
In A Chat Server, we built a multithreaded chat server using Concurrent Haskell and STM. In this section, we will extend the chat server to be distributed. The server will be running across multiple machines, clients may connect to any of the machines, and any client will be able to chat with any other client connected via any of the servers. Essentially, the distributed chat server will behave just like the single-threaded server (minus some subtle differences that we will discuss shortly), except that clients have a choice of machines to connect to.
A distributed chat network saves bandwidth. For example, suppose we set up a chat network with two servers A and B on each side of the Atlantic Ocean. Each server has a large number of clients connected, with each client connecting to its closest server. When a client on server A broadcasts a message, it needs to be sent across the trans-Atlantic link to server B only once, and server B then forwards it to each of its connected clients. The broadcast message crosses the Atlantic only once, instead of once for each of the clients on the other side.
We have already written all the code for the multithreaded server, so
it seems a shame to throw it away and rewrite it all to use
distributed-process
instead. Fortunately, we don’t have to do that. We can simply add some extra code to handle distribution, using the original server code nearly intact. Each client will still be managed
by ordinary IO threads synchronized using STM, but additionally we
will have some code communicating with the other servers using
distributed-process
. In Haskell, distributed programming is not
all or nothing. We can freely mix distributed and concurrent
programming in the same program. This means we can take advantage of
the simplicity and performance of ordinary concurrent programming on
each node, while using the heavier-weight distributed interfaces for
the parts of the program that need to work across multiple nodes.
In this first version, we will use a master/slave configuration in which the master will start up server instances on all the slaves once at the beginning. Later, we will consider how to modify the program so that all nodes are equal, and nodes may come and go at arbitrary times.
We will need a few changes to the data structures compared with the multithreaded server. When one client sends a message to another client connected to a different server, we need to know where to send the message. So each server will need to keep a list of all the clients connected to any server in the network, along with the server to which the client is connected. The information about a client now has two possibilities: either it is a local client (connected to this server), or a remote client (connected to a different server).
type ClientName = String

data Client
  = ClientLocal  LocalClient
  | ClientRemote RemoteClient

data RemoteClient = RemoteClient
       { remoteName :: ClientName
       , clientHome :: ProcessId
       }

data LocalClient = LocalClient
       { localName      :: ClientName
       , clientHandle   :: Handle
       , clientKicked   :: TVar (Maybe String)
       , clientSendChan :: TChan Message
       }

clientName :: Client -> ClientName
clientName (ClientLocal c)  = localName c
clientName (ClientRemote c) = remoteName c

newLocalClient :: ClientName -> Handle -> STM LocalClient
newLocalClient name handle = do
  c <- newTChan
  k <- newTVar Nothing
  return LocalClient { localName      = name
                     , clientHandle   = handle
                     , clientSendChan = c
                     , clientKicked   = k
                     }
LocalClient is what we previously called Client, and RemoteClient is a client connected to another server. The Client type is now a disjunction of these two, with constructors ClientLocal and ClientRemote.
The Message type is as before, except that we need to derive Typeable and Binary, because Messages will be sent over the network:
data Message = Notice String
             | Tell ClientName String
             | Broadcast ClientName String
             | Command String
  deriving (Typeable, Generic)

instance Binary Message
Servers need to communicate with one another, and the kinds of messages they need to send are richer than Message. For example, servers need to tell one another when a new client connects, or when one client kicks another. So we have a new type for messages sent between servers, which we call PMessage:
data PMessage
  = MsgServers            [ProcessId]
  | MsgSend               ClientName Message
  | MsgBroadcast          Message
  | MsgKick               ClientName ClientName
  | MsgNewClient          ClientName ProcessId
  | MsgClientDisconnected ClientName ProcessId
  deriving (Typeable, Generic)

instance Binary PMessage
Most of these are self-explanatory, except for one: MsgServers
is a
special message sent to each server node when it starts up, telling it
the ProcessId
s of all the server nodes in the network.
The Server
type previously contained only the mapping from
ClientName
to Client, but now it needs some more information:
data Server = Server
  { clients   :: TVar (Map ClientName Client)
  , proxychan :: TChan (Process ())
  , servers   :: TVar [ProcessId]
  , spid      :: ProcessId
  }

newServer :: [ProcessId] -> Process Server
newServer pids = do
  pid <- getSelfPid
  liftIO $ do
    s <- newTVarIO pids
    c <- newTVarIO Map.empty
    o <- newTChanIO
    return Server { clients = c, servers = s, proxychan = o, spid = pid }
clients
is the client mapping, as before; servers
is the list of
other server ProcessId
s, and spid
is the ProcessId
of this
server (for convenience).
The proxychan
field pertains to an added bit of complexity in our
distributed architecture. Remember that we are
leaving as much of the existing server infrastructure intact as
possible; that means the existing server threads are ordinary
forkIO
threads. A forkIO
thread cannot perform operations in the
Process
monad, yet we certainly need to be able to do that somehow
because certain actions by a client must trigger communication with
other servers in the network. So the trick we use is a proxy, which is a
process that reads actions from a TChan
and performs them in the
Process
monad. To have a Process
action performed from an IO
thread, we simply queue it on the proxy TChan
. Each server has a
single proxy channel, created when the server starts up and stored in
the proxychan
field of Server
.
Next, we need a few small utilities. First, a way to send a Message
to a LocalClient:
sendLocal :: LocalClient -> Message -> STM ()
sendLocal LocalClient{..} msg = writeTChan clientSendChan msg
The following function, sendRemote, sends a PMessage to a remote server. To do this, it needs to use the proxychan (which it gets from the Server), and it needs the pid of the destination process:
sendRemote :: Server -> ProcessId -> PMessage -> STM ()
sendRemote Server{..} pid pmsg = writeTChan proxychan (send pid pmsg)
Now that we can send both local and remote messages, we can define sendMessage, which sends a Message to any client:
sendMessage :: Server -> Client -> Message -> STM ()
sendMessage server (ClientLocal client) msg =
  sendLocal client msg
sendMessage server (ClientRemote client) msg =
  sendRemote server (clientHome client) (MsgSend (remoteName client) msg)
A variant sends a message to a named client or returns False
if the
client is not connected:
sendToName :: Server -> ClientName -> Message -> STM Bool
sendToName server@Server{..} name msg = do
  clientmap <- readTVar clients
  case Map.lookup name clientmap of
    Nothing     -> return False
    Just client -> sendMessage server client msg >> return True
Next, we consider broadcasting messages. First, we need a way to send
a PMessage
to all the connected servers:
sendRemoteAll :: Server -> PMessage -> STM ()
sendRemoteAll server@Server{..} pmsg = do
  pids <- readTVar servers
  mapM_ (\pid -> sendRemote server pid pmsg) pids
We also need a broadcastLocal
function that sends a message to the local
clients only:
broadcastLocal :: Server -> Message -> STM ()
broadcastLocal server@Server{..} msg = do
  clientmap <- readTVar clients
  mapM_ sendIfLocal (Map.elems clientmap)
 where
  sendIfLocal (ClientLocal c)  = sendLocal c msg
  sendIfLocal (ClientRemote _) = return ()
This function works by calling an auxiliary function sendIfLocal
on
each of the clients, which calls sendLocal
if the client is local
and does nothing if the client is remote.
Putting sendRemoteAll
and broadcastLocal
together, we can
broadcast a Message
to everyone:
broadcast :: Server -> Message -> STM ()
broadcast server@Server{..} msg = do
  sendRemoteAll server (MsgBroadcast msg)
  broadcastLocal server msg
The rest of the local server code is almost identical to that in
A Chat Server, so we don’t reproduce it here. The only important
differences are that we need to inform other servers whenever a client
connects or disconnects by calling sendRemoteAll
with a
MsgNewClient
or MsgClientDisconnected
respectively.
The interesting part is how we handle distribution. Previously, the main function was responsible for setting up the network socket and accepting new connections. This is now delegated to a function socketListener, which is otherwise identical to the previous main:
socketListener :: Server -> Int -> IO ()
socketListener server port = withSocketsDo $ do
  sock <- listenOn (PortNumber (fromIntegral port))
  printf "Listening on port %d\n" port
  forever $ do
      (handle, host, port) <- accept sock
      printf "Accepted connection from %s: %s\n" host (show port)
      forkFinally (talk server handle)
                  (\_ -> hClose handle)
We need a function to implement the proxy, described above in
Sending Messages. All it does is repeatedly read Process
()
values from the proxychan
and execute them:
proxy :: Server -> Process ()
proxy Server{..} = forever $ join $ liftIO $ atomically $ readTChan proxychan
Now, the chatServer
function is the main Process ()
action that
implements a chat server:
chatServer :: Int -> Process ()
chatServer port = do
  server <- newServer []
  liftIO $ forkIO (socketListener server port)
  spawnLocal (proxy server)
  forever $ do m <- expect; handleRemoteMessage server m
- Starts up the socketListener thread.
- Creates the proxy. Note here that we use spawnLocal, which is like spawn except that the new process is always created on the current node. This means that the computation to be spawned doesn't need to be serialized, so spawnLocal takes an ordinary Process value rather than a Closure, which makes it easier to use (its type is shown after this list).
- Repeatedly grabs the next message and calls handleRemoteMessage (defined next) to act on it.
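For reference, the type of spawnLocal:

spawnLocal :: Process () -> Process ProcessId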
handleRemoteMessage :: Server -> PMessage -> Process ()
handleRemoteMessage server@Server{..} m = liftIO $ atomically $
  case m of
    MsgServers pids  -> writeTVar servers (filter (/= spid) pids)
    MsgSend name msg -> void $ sendToName server name msg
    MsgBroadcast msg -> broadcastLocal server msg
    MsgKick who by   -> kick server who by

    MsgNewClient name pid -> do
        ok <- checkAddClient server (ClientRemote (RemoteClient name pid))
        when (not ok) $
          sendRemote server pid (MsgKick name "SYSTEM")

    MsgClientDisconnected name pid -> do
         clientmap <- readTVar clients
         case Map.lookup name clientmap of
            Nothing -> return ()
            Just (ClientRemote (RemoteClient _ pid')) | pid == pid' ->
              deleteClient server name
            Just _ ->
              return ()
- The special MsgServers message is sent once at startup to tell each server the ProcessIds of all the servers in the network. This is used to set the servers field of Server.
- MsgSend, MsgBroadcast, and MsgKick are straightforward. They cause the appropriate action to take place, just as if a local client had initiated it.
- MsgNewClient indicates that a client has connected to a remote server. We attempt to add the remote client to the local state, but it may be that this server already has a client with the same name. Unlike in the single-server case, where we relied on STM to ensure that inconsistencies like this could never arise, in a distributed system there is no global consistency. So we have to handle the case where two clients connect at the same time on different servers. The method we choose here is simple but brutal: reply with a MsgKick to kick the other client. It is likely that the remote server will simultaneously do the same, so both clients will end up being kicked, but at least the inconsistency is resolved, and this case will be rare in practice.
- MsgClientDisconnected is not difficult, but we do have to be careful to check that the client being disconnected is in fact the correct client, just in case an inconsistency has arisen (in particular, this might be the response to the MsgKick initiated by the MsgNewClient case just shown).
Now that the server code is in place, we just need to write the
code to start up the whole distributed network. The main
function
invokes master
on the master node:
port :: Int
port = 44444

master :: [NodeId] -> Process ()
master peers = do

  let run nid port = do
         say $ printf "spawning on %s" (show nid)
         spawn nid ($(mkClosure 'chatServer) port)

  pids <- zipWithM run peers [port+1..]
  mypid <- getSelfPid
  let all_pids = mypid : pids
  mapM_ (\pid -> send pid (MsgServers all_pids)) all_pids

  chatServer port

main = distribMain master Main.__remoteTable
The master
function is fairly straightforward. It spawns
chatServer
on each of the slaves, using increasing port numbers, and then sends a MsgServers
message to each server process containing a list of all the server ProcessId
s.[62]
We can start up a few nodes on a single machine like so:
$ ./chat slave 55551 & ./chat slave 55552 & ./chat master 55553
pid://localhost:55553:0:3: spawning on nid://localhost:55552:0
pid://localhost:55553:0:3: spawning on nid://localhost:55551:0
Listening on port 44444
Listening on port 44445
Listening on port 44446
(Remember the port numbers given on the command line are the ports
used by the distributed-process
framework; the ports that the chat
server listens to are hardcoded to 44444, 44445, …)
Then connect to one of the nodes:
$ nc localhost 44445
What is your name?
Fred
*** Fred has connected
And connect to a different node:
$ nc localhost 44446
What is your name?
Bob
*** Bob has connected
hi
<Bob>: hi
We should now see the new activity on the first connection:
*** Bob has connected
<Bob>: hi
Our distributed server works only with a fixed set of nodes, which makes it quite limited. In practice, we want to be able to add and remove nodes from the network at will. Nodes will disconnect due to network and hardware outages, and we would like to be able to add new nodes without restarting the entire network.
My sketch implementation can be found in distrib-chat/chat-noslave.hs, but you might want to try implementing this for yourself. Some hints on how to go about it follow.
We need to abandon the master/slave architecture; every node will be
equal. Instead of using our DistribUtils
module, we can use the
following sequence to initialize the simplelocalnet
backend and
start up a node:
main = do
  [port, chat_port] <- getArgs
  backend <- initializeBackend "localhost" port
                 (Main.__remoteTable initRemoteTable)
  node <- newLocalNode backend
  Node.runProcess node (master backend chat_port)
Now the function master
has type Backend -> String -> Process ()
and runs on every node. The outline of the rest of the
implementation is as follows:
- When a node starts up, it calls findPeers to get the other nodes in the network (the first few steps here are sketched in code after this list):

  findPeers :: Backend -> Int {- timeout -} -> IO [NodeId]

- It registers the current process as "chatServer" on the local node using the register function:

  register :: String -> ProcessId -> Process ()

- Next, we call whereisRemoteAsync for each of the other nodes, asking for the ProcessId of "chatServer":

  whereisRemoteAsync :: NodeId -> String -> Process ()

  The remote node will respond with a WhereIsReply:

  data WhereIsReply = WhereIsReply String (Maybe ProcessId)

  We won't wait for the reply immediately; it will be received along with other messages in the main message loop.

- Then we start up the chatServer as before, but now we need to also handle WhereIsReply messages. When one of these messages is received, if it indicates that we found a "chatServer" process on another node, then we move on to the next step.

- Send that ProcessId a message to tell it that we have joined the network. This is a new PMessage that we call MsgServerInfo. It contains the current ProcessId and the list of local clients we have (because clients may have already connected by now).

- On receipt of a MsgServerInfo, add that ProcessId to the servers list if it isn't already there.

- Add the information about the remote clients to the state. There may need to be some conflict resolution at this point if the remote server has clients with the same names as clients that we already know about.

- If the new server is not already known to us, then we should respond with a MsgServerInfo of our own to tell the other server which local clients are on this server.

- Start monitoring the remote process. Then we can be informed when the remote process dies and remove its clients from our local state.
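To make the first few steps concrete, here is a rough sketch of the startup sequence. It uses the real findPeers, register, whereisRemoteAsync, and expect primitives, but startupSketch itself, the timeout value, and the logging of replies are illustrative only; in the real server, the WhereIsReply messages would be handled in the main message loop alongside the other messages.

import Control.Distributed.Process
import Control.Distributed.Process.Backend.SimpleLocalnet
import Control.Monad (forM_, forever)
import Control.Monad.IO.Class (liftIO)

-- Discover peers, register ourselves, and ask each peer for its
-- "chatServer" process.
startupSketch :: Backend -> Process ()
startupSketch backend = do
  peers <- liftIO $ findPeers backend 1000000
  mypid <- getSelfPid
  register "chatServer" mypid
  forM_ peers $ \nid -> whereisRemoteAsync nid "chatServer"
  -- The replies arrive asynchronously; here we just log them.
  forever $ do
    WhereIsReply _name mpid <- expect
    case mpid of
      Just pid | pid /= mypid -> say ("found chatServer at " ++ show pid)
      _                       -> return ()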
A key-value store is a simple database that supports only operations to store and retrieve values associated with keys. Key-value stores have become popular over recent years because they offer scalability advantages over traditional relational databases in exchange for supporting fewer operations (in particular, they lack database joins).
This exercise is to use the distributed-process
framework to
implement a distributed fault-tolerant key-value store (albeit a
very simplistic one).
The interface exposed to clients is the following:
type Database

type Key   = String
type Value = String

createDB :: Process Database

set :: Database -> Key -> Value -> Process ()
get :: Database -> Key -> Process (Maybe Value)
Here, createDB
creates a database, and set
and get
perform
operations on it. The set
operation sets the given key to the given
value, and get
returns the current value associated with the given
key or Nothing
if the key has no entry.
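As a quick, hedged illustration of how a client might use this interface (this is not the supplied db.hs, just a sketch assuming the definitions above are in scope):

example :: Process ()
example = do
  db <- createDB
  set db "haskell" "lazy"
  r  <- get db "haskell"   -- Just "lazy"
  n  <- get db "missing"   -- Nothing
  say (show r ++ " " ++ show n)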
Part 1. In distrib-db/db.hs, I supplied a sample main
function that
acts as a client for the database, and you can use this to test your
database. The skeleton for the database code itself is in
Database.hs in the same directory. The first exercise is to
implement a single-node database by modifying Database.hs. That is:
- createDB should spawn a process to act as the database. It can spawn on the current node.
- get and set should talk to the database process via messages; you need to define the message type and the operations. (One possible shape is sketched after this list.)
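Here is one possible shape for the single-node version, offered only as a sketch: the Request type and its constructors are invented for illustration, while spawnLocal, send, expect, and getSelfPid are the real distributed-process primitives. Note how a ReqGet carries the caller's ProcessId, which becomes useful again in Part 2.

{-# LANGUAGE DeriveGeneric, DeriveDataTypeable #-}
import Control.Distributed.Process
import Data.Binary (Binary)
import Data.Typeable (Typeable)
import GHC.Generics (Generic)
import qualified Data.Map as Map

type Key      = String
type Value    = String
type Database = ProcessId

-- The request message type; ReqGet carries the client's ProcessId so the
-- database (or, later, a worker) can reply to the client directly.
data Request
  = ReqSet Key Value
  | ReqGet Key ProcessId
  deriving (Typeable, Generic)

instance Binary Request

createDB :: Process Database
createDB = spawnLocal (loop Map.empty)
  where
    loop store = do
      req <- expect
      case req of
        ReqSet k v    -> loop (Map.insert k v store)
        ReqGet k from -> do send from (Map.lookup k store :: Maybe Value)
                            loop store

set :: Database -> Key -> Value -> Process ()
set db k v = send db (ReqSet k v)

get :: Database -> Key -> Process (Maybe Value)
get db k = do
  mypid <- getSelfPid
  send db (ReqGet k mypid)
  expect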
When you run db.hs, it will call createDB
to create a database
and then populate it using the Database.hs source file itself. Every
word in the file is a key that maps to the word after it. The client
will then look up a couple of keys and then go into an interactive
mode where you can type in keys that are looked up in the database.
Try it out with your database implementation and satisfy yourself
that it is working.
Part 2. The second stage is to make the database distributed. In practice, the reason for doing this is to store a database much larger than we can store on a single machine and still have fast access to all of it.
The basic plan is that we are going to divide up the key space uniformly and store each portion of the key space on a separate node. The exact method used for splitting up the key space is important in practice because if you get it wrong, then the load might not be well-balanced between the nodes. For the purposes of this exercise, though, a simple scheme will do: take the first character of the key modulo the number of workers.
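For example, the partitioning function could be as simple as the following sketch (workerForKey is a hypothetical helper; ProcessId comes from Control.Distributed.Process, Key is the type from the interface above, and we assume keys are non-empty):

import Data.Char (ord)

-- Pick the worker responsible for a key: first character of the key
-- modulo the number of workers.
workerForKey :: [ProcessId] -> Key -> ProcessId
workerForKey workers k = workers !! (ord (head k) `mod` length workers)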
There will still be a single process handling requests from clients,
so we still have type Database = ProcessId
. However, this process
needs to delegate requests to the correct worker process according to
the key:
- Arrange to start worker processes on each of the nodes. The list of nodes in the network is passed to createDB.
- Write the code for the worker process. You probably need to put it in a different module (e.g., called Worker) due to restrictions imposed by Template Haskell. The worker process needs to maintain its own Map and handle get and set requests.
- Make the main database process delegate operations to the correct worker. You should be able to make the worker reply directly to the original client rather than having to forward the response from the worker back to the client. (A sketch of this delegation step follows the list.)
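A sketch of that delegation loop, reusing the hypothetical Request type and workerForKey helper from the sketches above: because a ReqGet carries the client's ProcessId, the chosen worker can reply straight to the client, and the frontend never sees the response.

import Control.Monad (forever)

frontend :: [ProcessId] -> Process ()
frontend workers = forever $ do
  req <- expect
  let key = case req of
              ReqSet k _ -> k
              ReqGet k _ -> k
  send (workerForKey workers key) req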
Compile db.hs against your distributed database to make sure it still works.
Part 3. Make the main database process monitor all the worker processes.
Detect failure of a worker and emit a message using say
. You will
need to use receiveWait
to wait for multiple types of messages; see
the ping-fail.hs example for hints.
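A sketch of the monitoring loop, using the real monitor, receiveWait, and match primitives. The matchAny case simply discards other messages; the real database process would handle its normal requests in the same receiveWait.

import Control.Distributed.Process
import Control.Monad (forM_, forever)

superviseWorkers :: [ProcessId] -> Process ()
superviseWorkers workers = do
  forM_ workers monitor
  forever $ receiveWait
    [ match $ \(ProcessMonitorNotification _ref pid reason) ->
        say ("worker " ++ show pid ++ " died: " ++ show reason)
    , matchAny $ \_ -> return ()  -- other messages handled elsewhere
    ]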
Note that we can’t yet do anything sensible if a worker dies. That is the next part of the exercise.
Part 4. Implement fault tolerance by replicating the database across multiple nodes.
- Instead of dividing the key space evenly across workers, put the workers in pairs and give each pair a slice of the key space. Both workers in the pair will have exactly the same data.
- Forward requests to both workers in the pair (it doesn't matter that there will be two responses in the case of a get). A small sketch of this forwarding step follows the list.
- If a worker dies, you will need to remove the worker from your internal list of workers so that you don't try to send it messages in the future.[63]
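A small sketch of the replicated forwarding step, again reusing the hypothetical Request type; each member of the pair is a Maybe so that a dead worker can simply be dropped:

import Data.Maybe (catMaybes)

forwardToPair :: (Maybe ProcessId, Maybe ProcessId) -> Request -> Process ()
forwardToPair (a, b) req = mapM_ (`send` req) (catMaybes [a, b])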
This should result in a distributed key-value store that is robust to individual nodes going down, as long as we don’t kill too many nodes too close together. Try it out—kill a node while the database is running and check that you can still look up keys.
A sample solution can be found in distrib-db/DatabaseSample.hs and distrib-db/WorkerSample.hs.
[50] Also known as “Cloud Haskell.”
[51] This is also known as the actor model.
[52] The distributed-process
package is in fact the second implementation of these ideas, the first prototype being the remote
package.
[53] For example, meta-par and HdpH.
[54] As of binary
version 0.6.3.0.
[55] As of GHC version 7.2.1.
[56] We expect that in the future, GHC will provide syntactic sugar to make remote code execution easier.
[57] Template Haskell is a feature provided by GHC that allows Haskell code to be manipulated and generated at compile time. For more details, see the GHC User’s Guide.
[58] The log messages produced by say
are normally prefixed by a timestamp, but I have omitted the timestamps here for clarity.
[59] The default port is chosen by our distribMain wrapper, not the distributed-process framework.
[60] Indeed, some of Erlang’s libraries use exactly this technique.
[61] The current implementation of channels uses STM, and channels are merged using orElse.
[62] This is mainly so that we can test the server on a single machine; in practice, you would want to choose the port number via a command-line option or some other method.
[63] A real fault-tolerant database would restart the worker on a new node and copy the database slice from its partner. The solution provided in this book doesn’t do this, but by all means have a go at doing it.