Chapter 4. Cloud Native Patterns
Progress is possible only if we train ourselves to think about programs without thinking of them as pieces of executable code.1
Edsger W. Dijkstra, August 1979
In 1991, while still at Sun Microsystems, L Peter Deutsch2 formulated the “fallacies of distributed computing,” which lists some of the false assumptions that programmers new (and not so new) to distributed applications often make:
- The network is reliable: switches fail, routers get misconfigured
- Latency is zero: it takes time to move data across a network
- Bandwidth is infinite: a network can handle only so much data at a time
- The network is secure: don't share secrets in plain text; encrypt everything
- Topology doesn't change: servers and services come and go
- There is one administrator: multiple admins lead to heterogeneous solutions
- Transport cost is zero: moving data around costs time and money
- The network is homogeneous: every network is (sometimes very) different
If I might be so audacious, I’d like to add a ninth one:
- Services are reliable: services that you depend on can fail at any time
In this chapter, I’ll present a selection of idiomatic patterns—tested, proven development paradigms—designed to address one or more of the conditions described in Deutsch’s fallacies and demonstrate how to implement them in Go. None of the patterns discussed in this book are original to this book—some have been around for as long as distributed applications have existed—but most haven’t been previously published together in a single work. Many of them are unique to Go or have novel implementations in Go relative to other languages.
Unfortunately, this book won’t cover infrastructure-level patterns like the Bulkhead or Gatekeeper patterns. Largely, this is because our focus is on application-layer development in Go, and those patterns, while indispensable, function at an entirely different abstraction level. If you’re interested in learning more, I recommend Cloud Native Infrastructure by Justin Garrison and Kris Nova (O’Reilly) and Designing Distributed Systems by Brendan Burns (O’Reilly).
The Context Package
Most of the code examples in this chapter make use of the context package, which was introduced in Go 1.7 to provide an idiomatic means of carrying deadlines, cancellation signals, and request-scoped values between processes. It contains a single interface, context.Context, whose methods are listed in the following:
type Context interface {
    // Deadline returns the time when this Context should be canceled; it
    // returns ok==false if no deadline is set.
    Deadline() (deadline time.Time, ok bool)

    // Done returns a channel that's closed when this Context is canceled.
    Done() <-chan struct{}

    // Err indicates why this context was canceled after the Done channel is
    // closed. If Done is not yet closed, Err returns nil.
    Err() error

    // Value returns the value associated with this context for key, or nil
    // if no value is associated with key. Use with care.
    Value(key any) any
}
Three of these methods can be used to learn something about a Context value's cancellation status or behavior. The fourth, Value, can be used to retrieve a value associated with an arbitrary key. Context's Value method is the focus of some controversy in the Go world and will be discussed more in "Defining Request-Scoped Values".
What Context Can Do for You
A context.Context value is used by passing it directly to a service request, which may in turn pass it to one or more subrequests. What makes this useful is that when a Context is canceled, all functions holding it (or a derived Context; more on this in "Defining Context Deadlines and Timeouts") will receive the signal, allowing them to coordinate their cancellation and reduce wasted effort.
Take, for example, a request from a user to a service, which in turn makes a request to a database. In an ideal scenario, the user, application, and database requests can be diagrammed as in Figure 4-1.
But what if the user terminates their request before it’s fully completed? In most cases, oblivious to the overall context of the request, the processes will continue to live on anyway (Figure 4-2), consuming resources in order to provide a result that’ll never be used.
However, by passing a Context to each subsequent request, all long-running processes can be sent a simultaneous "done" signal, allowing the cancellation signal to be coordinated among each of the processes (Figure 4-3).

Importantly, Context values are also thread safe: they can be safely used by multiple concurrently executing goroutines without fear of unexpected behaviors.
Creating Context
A brand-new context.Context can be obtained using one of two functions:

Background() Context
Returns an empty Context that's never canceled, has no values, and has no deadline. It's typically used by the main function, initialization, and tests, and as the top-level Context for incoming requests.

TODO() Context
Also provides an empty Context, but it's intended to be used as a placeholder when it's unclear which Context to use or when a parent Context is not yet available.
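As a minimal sketch of the difference, a program's entry point might look like the following; handleRequest here is a hypothetical function standing in for whatever receives the Context:

func main() {
    ctx := context.Background()     // The root Context for the whole program

    // If the right Context isn't known or available yet, a placeholder:
    // ctx := context.TODO()

    handleRequest(ctx)              // Hypothetical function accepting the Context
}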
Defining Context Deadlines and Timeouts
The context package also includes a number of functions for creating derived Context values that allow you to direct cancellation behavior, either by applying a timeout or by a function hook that can explicitly trigger a cancellation:
WithDeadline(Context, time.Time) (Context, CancelFunc)
Accepts a specific time at which the Context will be canceled and the Done channel will be closed.

WithTimeout(Context, time.Duration) (Context, CancelFunc)
Accepts a duration after which the Context will be canceled and the Done channel will be closed.

WithCancel(Context) (Context, CancelFunc)
Unlike the previous functions, WithCancel accepts nothing additional and only returns a function that can be called to explicitly cancel the Context.
All three of these functions return a derived Context that includes any requested decoration and a context.CancelFunc, a zero-parameter function that can be called to explicitly cancel the Context and all of its derived values.
Tip
When a Context is canceled, all Contexts that are derived from it are also canceled. Contexts that it was derived from are not.
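A minimal sketch of the derivation rule described in this tip:

parent, cancelParent := context.WithCancel(context.Background())
child, cancelChild := context.WithCancel(parent)
defer cancelChild()

cancelParent()              // Canceling parent also cancels child...

<-child.Done()              // ...so this receive returns immediately
fmt.Println(child.Err())    // "context canceled"
fmt.Println(parent.Err())   // "context canceled"; canceling only child
                            // would have left parent.Err() nil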
For the sake of completeness, it's worth mentioning that there are also three recently introduced functions that parallel the three just mentioned but that also allow you to specify a specific error value as the cancellation cause:
WithDeadlineCause(Context, time.Time, error) (Context, CancelFunc)
Introduced in Go 1.21. Behaves like WithDeadline but also sets the cause of the returned Context when the deadline is exceeded. The returned CancelFunc does not set the cause.

WithTimeoutCause(Context, time.Duration, error) (Context, CancelFunc)
Introduced in Go 1.21. Behaves like WithTimeout but also sets the cause of the returned Context when the timeout expires. The returned CancelFunc does not set the cause.

WithCancelCause(Context) (Context, CancelCauseFunc)
Introduced in Go 1.20. Behaves like WithCancel but returns a CancelCauseFunc instead of a CancelFunc. Calling cancel with a non-nil error (the "cause") records that error in ctx; it can then be retrieved using Cause(ctx).
The ability to explicitly define a cause can provide useful context3 for logging or otherwise deciding on an appropriate response.
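As a brief sketch of the cause-aware variant, using WithCancelCause and the package-level Cause function:

ctx, cancel := context.WithCancelCause(context.Background())

cancel(errors.New("upstream dependency returned 503"))  // Record the cause

<-ctx.Done()
fmt.Println(ctx.Err())          // "context canceled"
fmt.Println(context.Cause(ctx)) // "upstream dependency returned 503"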
Defining Request-Scoped Values
Finally, the context package includes a function that can be used to define an arbitrary request-scoped key-value pair that can be accessed from the returned Context—and all Context values derived from it—via the Value method:
WithValue(parent Context, key, val any) Context
WithValue returns a derivation of parent in which key is associated with the value val.
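A minimal sketch of how request-scoped values are commonly defined in practice; the unexported key type guards against key collisions between packages, and the request-ID accessors are purely illustrative:

type contextKey string

const requestIDKey contextKey = "request-id"

// WithRequestID returns a derived Context carrying the given request ID.
func WithRequestID(ctx context.Context, id string) context.Context {
    return context.WithValue(ctx, requestIDKey, id)
}

// RequestID retrieves the request ID, reporting whether one was set.
func RequestID(ctx context.Context) (string, bool) {
    id, ok := ctx.Value(requestIDKey).(string)
    return id, ok
}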
Using a Context
When a service request is initiated, either by an incoming request or triggered by the main function, the top-level process will use the Background function to create a new Context value, possibly decorating it with one or more of the context.With* functions, before passing it along to any subrequests. Those subrequests then need only watch the Done channel for cancellation signals.

For example, take a look at the following Stream function:
func Stream(ctx context.Context, out chan<- Value) error {
    // Create a derived Context with a 10s timeout; dctx
    // will be canceled upon timeout, but ctx will not.
    // cancel is a function that will explicitly cancel dctx.
    dctx, cancel := context.WithTimeout(ctx, 10*time.Second)

    // Release resources if SlowOperation completes before timeout
    defer cancel()

    res, err := SlowOperation(dctx)     // res is a Value channel
    if err != nil {                     // True if dctx times out
        return err
    }

    for {
        select {
        case out <- <-res:              // Read from res; send to out

        case <-ctx.Done():              // Triggered if ctx is canceled
            return ctx.Err()            // but not if dctx is canceled
        }
    }
}
Stream receives a ctx Context as an input parameter, which it sends to WithTimeout to create dctx, a derived Context with a 10-second timeout. Because of this decoration, the SlowOperation(dctx) call could possibly time out after 10 seconds and return an error. Functions using the original ctx, however, will not have this timeout decoration and will not time out.

Further down, the original ctx value is used in a for loop around a select statement to retrieve values from the res channel provided by the SlowOperation function. Note the case <-ctx.Done() statement, which is executed when the ctx.Done channel closes to return an appropriate error value.
Layout of This Chapter
The general presentation of each pattern in this chapter is loosely based on the one used in the famous "Gang of Four" Design Patterns book4 but is simpler and less formal. Each pattern opens with a very brief description of its purpose and the reasons for using it, followed by these sections:
- Applicability: Context and descriptions of where this pattern may be applied.
- Participants: A listing of the components of the pattern and their roles.
- Implementation: A discussion of the solution and its implementation.
- Sample code: A demonstration of how the code may be implemented in Go.
Stability Patterns
The stability patterns presented here address one or more of the assumptions called out by the fallacies of distributed computing. They’re generally intended to be applied by distributed applications to improve their own stability and the stability of the larger system they’re a part of.
Circuit Breaker
Circuit Breaker automatically degrades service functions in response to a likely fault, preventing larger or cascading failures by eliminating recurring errors and providing reasonable error responses.
Applicability
If the fallacies of distributed computing were to be distilled to one point, it would be that errors and failures are an undeniable fact of life for distributed, cloud native systems. Services become misconfigured, databases crash, networks partition. We can’t prevent it; we can only accept and account for it.
Failing to do so can have some rather unpleasant consequences. We’ve all seen them, and they aren’t pretty. Some services might keep futilely trying to do their job and returning nonsense to their client; others might fail catastrophically and maybe even fall into a crash/restart death spiral. It doesn’t matter, because in the end they’re all wasting resources, obscuring the source of original failure, and making cascading failures even more likely.
On the other hand, a service that’s designed with the assumption that its dependencies can fail at any time can respond reasonably when they do. The Circuit Breaker allows a service to detect such failures and to “open the circuit” by temporarily ceasing to execute requests, instead providing clients with an error message consistent with the service’s communication contract.
For example, imagine a service that (ideally) receives a request from a client, executes a database query, and returns a response. What if the database fails? The service might continue futilely trying to query it anyway, flooding the logs with error messages and eventually timing out or returning useless errors. Such a service can use a Circuit Breaker to “open the circuit” when the database fails, preventing the service from making any more doomed database requests (at least for a while) and allowing it to respond to the client immediately with a meaningful notification.
Participants
This pattern includes the following participants:
- Circuit: The function that interacts with the service.
- Breaker: A closure with the same function signature as Circuit.
Implementation
Essentially, the Circuit Breaker is just a specialized Adapter pattern, with Breaker wrapping Circuit to add some additional error handling logic.

Like the electrical switch from which this pattern derives its name, Breaker has two possible states: closed and open. In the closed state, everything is functioning normally. All requests received from the client by Breaker are forwarded unchanged to Circuit, and all responses from Circuit are forwarded back to the client. In the open state, Breaker doesn't forward requests to Circuit. Instead it "fails fast" by responding with an informative error message.

Breaker internally tracks the errors returned by Circuit; if the number of consecutive errors returned by Circuit exceeds a defined threshold, Breaker trips and its state switches to open.
Most implementations of Circuit Breaker include some logic to automatically close the circuit after some period of time. Keep in mind, though, that hammering an already malfunctioning service with lots of retries can cause its own problems, so it’s standard to include some kind of backoff logic that reduces the rate of retries over time. The subject of backoff is actually fairly nuanced, but it will be covered in detail in “Play It Again: Retrying Requests”.
In a multinode service, this implementation can be extended to include some shared storage mechanism, such as a Memcached or Redis network cache, to track the circuit state.
Sample code
We begin by creating a Circuit type that specifies the signature of the function that's interacting with your database or other upstream service. In practice, this can take whatever form is appropriate for your functionality. It should include an error in its return list, however:
type Circuit func(context.Context) (string, error)
In this example, Circuit is a function that accepts a Context value, which was described in depth in "The Context Package". Your implementation may vary.
The Breaker function accepts any function that conforms to the Circuit type definition, and an integer representing the number of consecutive failures allowed before the circuit automatically opens. In return it provides another function, which also conforms to the Circuit type definition:
func Breaker(circuit Circuit, threshold int) Circuit {
    var failures int
    var last = time.Now()
    var m sync.RWMutex

    return func(ctx context.Context) (string, error) {
        m.RLock()                       // Establish a "read lock"

        d := failures - threshold
        if d >= 0 {
            shouldRetryAt := last.Add(time.Duration(2<<d) * time.Second)
            if !time.Now().After(shouldRetryAt) {
                m.RUnlock()
                return "", errors.New("service unavailable")
            }
        }

        m.RUnlock()                     // Release read lock

        response, err := circuit(ctx)   // Issue the request proper

        m.Lock()                        // Lock around shared resources
        defer m.Unlock()

        last = time.Now()               // Record time of attempt

        if err != nil {                 // Circuit returned an error,
            failures++                  // so we count the failure
            return response, err        // and return
        }

        failures = 0                    // Reset failures counter

        return response, nil
    }
}
The Breaker function constructs another function, also of type Circuit, which wraps circuit to provide the desired functionality. You may recognize this from "Anonymous Functions and Closures" as a closure: a nested function with access to the variables of its parent function. As you will see, all of the "stability" functions implemented for this chapter work this way.

The closure works by counting the number of consecutive errors returned by circuit. If that value meets the failure threshold, then it returns the error "service unavailable" without actually calling circuit. Any successful calls to circuit cause failures to reset to 0, and the cycle begins again.

The closure even includes an automatic reset mechanism that allows requests to call circuit again after several seconds, with an exponential backoff in which the durations of the delays between retries roughly double with each attempt. Though simple and quite common, this actually isn't the ideal backoff algorithm. We'll review exactly why in "Backoff Algorithms".

This function also includes our first use of a mutex (also known as a lock).5 Mutexes are a common idiom in concurrent programming, and we're going to use them quite a bit in this chapter, so if you're fuzzy on mutexes in Go, see "Mutexes".
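Using Breaker looks just like calling the wrapped function directly. The following is a minimal sketch; getUser stands in for some database-backed Circuit, and the threshold of 5 is illustrative only:

func getUser(ctx context.Context) (string, error) {
    // Imagine a database query here.
    return "gopher", nil
}

func main() {
    protected := Breaker(getUser, 5)    // Open the circuit after 5 consecutive failures

    res, err := protected(context.Background())
    fmt.Println(res, err)
}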
Debounce
Debounce limits the frequency of a function invocation so that only the first or last in a cluster of calls is actually performed.
Applicability
Debounce is the second of our patterns to be labeled with an electrical circuit theme. Specifically, it’s named after a phenomenon in which a switch’s contacts “bounce” when they’re opened or closed, causing the circuit to fluctuate a bit before settling down. It’s usually no big deal, but this “contact bounce” can be a real problem in logic circuits where a series of on/off pulses can be interpreted as a data stream. The practice of eliminating contact bounce so that only one signal is transmitted by an opening or closing contact is called debouncing.
In the world of services, we sometimes find ourselves performing a cluster of potentially slow or costly operations where only one would do. Using the Debounce pattern, a series of similar calls that are tightly clustered in time are restricted to only one call, typically the first or last in a batch.
This technique has been used in the JavaScript world for years to limit the number of operations that could slow the browser by taking only the first in a series of user events, or to delay a call until a user is ready. You’ve probably seen an application of this technique in practice before. We’re all familiar with the experience of using a search bar whose autocomplete pop-up doesn’t display until after you pause typing, or spam-clicking a button only to see the clicks after the first ignored.
Those of us who specialize in backend services can learn a lot from our frontend colleagues, who have been working for years to account for the reliability, latency, and bandwidth issues inherent to distributed systems. For example, this approach could be used to retrieve some slowly updating remote resource without bogging down, wasting both client and server time with wasteful requests.
This pattern is similar to “Throttle”, in that it limits how often a function can be called. But where Debounce restricts clusters of invocations, Throttle simply limits according to time period. For more on the difference between the Debounce and Throttle patterns, see “What’s the Difference Between Throttle and Debounce?”.
Participants
This pattern includes the following participants:
- Circuit: The function to regulate
- Debounce: A closure with the same function signature as Circuit
Implementation
The Debounce implementation is actually very similar to the one for Circuit Breaker in that it wraps Circuit to provide the rate-limiting logic. That logic is actually quite straightforward: on each call of the outer function—regardless of its outcome—a time interval is set. Any subsequent call made before that time interval expires is ignored; any call made afterward is passed along to the inner function. This implementation, in which the inner function is called once and subsequent calls are ignored, is called function-first and is useful because it allows the initial response from the inner function to be cached and returned.
A function-last implementation will wait for a pause after a series of calls before calling the inner function. This variant is common in the JavaScript world when a programmer wants a certain amount of input before making a function call, such as when a search bar waits for a pause in typing before autocompleting. Function-last tends to be less common in backend services because it doesn’t provide an immediate response, but it can be useful if your function doesn’t need results right away.
Sample code
Just like in the Circuit Breaker implementation, we start by defining a derived function type with the signature of the function we want to limit. Also like Circuit Breaker, we call it Circuit; it's identical to the one declared in that example. Again, Circuit can take whatever form is appropriate for your functionality, but it should include an error in its returns:
type Circuit func(context.Context) (string, error)
The similarity with the Circuit Breaker implementation is quite intentional: their compatibility makes them “chainable,” as demonstrated in the following:
func myFunction(ctx context.Context) (string, error) { /* ... */ }

wrapped := Breaker(Debounce(myFunction))
response, err := wrapped(ctx)
The function-first implementation of Debounce—DebounceFirst—is very straightforward compared to function-last because it needs to track only the last time it was called and return a cached result if it's called again less than d duration after:
func DebounceFirst(circuit Circuit, d time.Duration) Circuit {
    var threshold time.Time
    var result string
    var err error
    var m sync.Mutex

    return func(ctx context.Context) (string, error) {
        m.Lock()

        defer func() {
            threshold = time.Now().Add(d)   // Reset the cluster window on every call
            m.Unlock()
        }()

        if time.Now().Before(threshold) {   // Still inside a cluster: return the
            return result, err              // cached result without calling circuit
        }

        result, err = circuit(ctx)          // First call in a cluster: call circuit

        return result, err
    }
}
This implementation of DebounceFirst takes pains to ensure thread safety by wrapping the entire function in a mutex. While this will force overlapping calls at the start of a cluster to have to wait until the result is cached, it also guarantees that circuit is called exactly once, at the very beginning of a cluster. A defer ensures that the value of threshold, representing the time when a cluster ends (if there are no further calls), is reset with every call.

There's a potential problem with this approach: it effectively just caches the result of the function and returns it if it's called again. But what if the circuit function has important side effects? The following variation, DebounceFirstContext, is a little more sophisticated in that every call to it produces a call to circuit, but each call after the first cancels the context of the call before it:
func DebounceFirstContext(circuit Circuit, d time.Duration) Circuit {
    var threshold time.Time
    var m sync.Mutex
    var lastCtx context.Context
    var lastCancel context.CancelFunc

    return func(ctx context.Context) (string, error) {
        m.Lock()

        if time.Now().Before(threshold) {
            lastCancel()                // Cancel the previous call's context
        }

        lastCtx, lastCancel = context.WithCancel(ctx)
        cctx := lastCtx                 // Copy under the lock; reading lastCtx after
                                        // unlocking would race with later calls
        threshold = time.Now().Add(d)

        m.Unlock()

        result, err := circuit(cctx)

        return result, err
    }
}
In DebounceFirstContext we use the same general structure and locking scheme as in DebounceFirst, but this time we set aside a Context and CancelFunc for circuit, which allows us to explicitly cancel circuit with each subsequent invocation before calling it again. In this way, circuit is still called (and any expected side effects triggered) each time while explicitly canceling any prior calls.

What if we want to call our Circuit function at the end of a cluster of calls? For this we'll use a function-last implementation. Unfortunately, it's a bit more awkward because it involves the use of a timer function to determine whether enough time has passed since the function was last called, and to call circuit only when it has:
type Circuit func(context.Context) (string, error)

func DebounceLast(circuit Circuit, d time.Duration) Circuit {
    var m sync.Mutex
    var timer *time.Timer
    var cctx context.Context
    var cancel context.CancelFunc

    return func(ctx context.Context) (string, error) {
        m.Lock()

        if timer != nil {
            timer.Stop()                // Stop any pending timer
            cancel()                    // and cancel the previous call's context
        }

        cctx, cancel = context.WithCancel(ctx)

        ch := make(chan struct {
            result string
            err    error
        }, 1)

        timer = time.AfterFunc(d, func() {  // Call circuit once d has elapsed
            r, e := circuit(cctx)
            ch <- struct {
                result string
                err    error
            }{r, e}
        })

        m.Unlock()

        select {
        case res := <-ch:               // The timer fired and circuit returned
            return res.result, res.err
        case <-cctx.Done():             // A later call (or the caller) canceled us
            return "", cctx.Err()
        }
    }
}
In this implementation, a call to DebounceLast uses time.AfterFunc to execute the circuit function after the specified duration. This useful function lets us call an arbitrary function after a specific duration. It also provides a time.Timer value that can be used to cancel it. This is exactly what we need: you'll notice that any already-existing timer is stopped (via its Stop method) before starting a new one, ensuring that circuit is called only once.

You've probably noticed that we also use a channel to send an anonymous struct (yes, you can do that!). This not only allows us to transmit both of the return values of circuit beyond the asynchronous function it's called in, but it also conveniently allows us to use a select statement to react appropriately to a context cancellation event.
Retry
Retry accounts for a possible transient fault in a distributed system by transparently retrying a failed operation.
Applicability
Transient errors are a fact of life when working with complex distributed systems. These can be caused by any number of (hopefully) temporary conditions, especially if the downstream service or network resource has protective strategies in place, such as throttling that temporarily rejects requests under high workload, or adaptive strategies like autoscaling that can add capacity when needed.
These faults often resolve themselves after a bit of time, so repeating the request after a reasonable delay is likely (but not guaranteed) to be successful. Failing to account for transient faults can lead to a system that's unnecessarily brittle. On the other hand, implementing an automatic retry strategy can considerably improve the stability of the service, benefiting both it and its upstream consumers.
Warning
Retry should be used with only idempotent operations. If you are not familiar with the concept of idempotence, we will cover it in detail in “What Is Idempotence and Why Does It Matter?”.
Participants
This pattern includes the following participants:
- Effector: The function that interacts with the service.
- Retry: A function that accepts Effector and returns a closure with the same function signature as Effector.
Implementation
This pattern works similarly to Circuit Breaker or Debounce in that there is a derived function type, Effector, that defines a function signature. This signature can take whatever form is appropriate for your implementation, but when the function executing the potentially failing operation is implemented, it must match the signature defined by Effector.
The Retry function accepts the user-defined Effector function and returns an Effector function that wraps the user-defined function to provide the retry logic. Along with the user-defined function, Retry also accepts an integer describing the maximum number of retry attempts that it will make and a time.Duration that describes how long it'll wait between each retry attempt. If the maxRetries parameter is 0, then the retry logic effectively becomes a no-op.
Note
Although not included here, most retry implementations will include some kind of backoff logic.
Sample code
The signature for the function argument of the Retry function is Effector. It looks exactly like the function types for the previous patterns:
type Effector func(context.Context) (string, error)
The Retry function itself is relatively straightforward, at least compared to the functions we've seen so far in this chapter:
func Retry(effector Effector, maxRetries int, delay time.Duration) Effector {
    return func(ctx context.Context) (string, error) {
        for r := 0; ; r++ {
            response, err := effector(ctx)
            if err == nil || r >= maxRetries {
                return response, err
            }

            log.Printf("Attempt %d failed; retrying in %v", r+1, delay)

            select {
            case <-time.After(delay):
            case <-ctx.Done():
                return "", ctx.Err()
            }
        }
    }
}
You may have already noticed what it is that keeps the Retry function so slender: although it returns a function, that function doesn't have any external state. This means we don't need any elaborate mechanisms to manage concurrency.

Note the contents of the select block, which demonstrates a common idiom for implementing a channel read timeout in Go based on the time.After function, similar to the example in "Implementing channel timeouts". This very useful function returns a channel that emits a message after the specified time has elapsed, which activates its case and ends the current iteration of the retry loop.
To use Retry, we can implement the function that executes the potentially failing operation and whose signature matches the Effector type; this role is played by EmulateTransientError in the following example:
var count int

func EmulateTransientError(ctx context.Context) (string, error) {
    count++

    if count <= 3 {
        return "intentional fail", errors.New("error")
    } else {
        return "success", nil
    }
}

func main() {
    r := Retry(EmulateTransientError, 5, 2*time.Second)

    res, err := r(context.Background())

    fmt.Println(res, err)
}
In the main function, the EmulateTransientError function is passed to Retry, providing the function variable r. When r is called, EmulateTransientError is called, and called again after a delay if it returns an error, according to the retry logic shown previously. Finally, after the fourth attempt, EmulateTransientError returns a nil error, and the Retry function exits.
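As mentioned in the earlier note, most real-world retry implementations also apply some form of backoff. A minimal sketch of how the Retry function above could be extended to double its delay after each failed attempt (the doubling is illustrative only; better algorithms are discussed in "Backoff Algorithms"):

func RetryWithBackoff(effector Effector, maxRetries int, delay time.Duration) Effector {
    return func(ctx context.Context) (string, error) {
        for r := 0; ; r++ {
            response, err := effector(ctx)
            if err == nil || r >= maxRetries {
                return response, err
            }

            log.Printf("Attempt %d failed; retrying in %v", r+1, delay)

            select {
            case <-time.After(delay):
                delay *= 2              // Double the delay before the next attempt
            case <-ctx.Done():
                return "", ctx.Err()
            }
        }
    }
}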
Throttle
Throttle limits the frequency of a function call to some maximum number of invocations per unit of time.
Applicability
The Throttle pattern is named after a device used to manage the flow of a fluid, such as the amount of fuel going into a car engine. Like its namesake mechanism, Throttle restricts the number of times that a function can be called over a period of time. For example:
- A user may be allowed only 10 service requests per second.
- A client may restrict itself to call a particular function once every 500 milliseconds.
- An account may be allowed only three failed login attempts in a 24-hour period.
Perhaps the most common reason to apply a Throttle is to account for sharp activity spikes that could saturate the system with a possibly unreasonable number of requests that may be expensive to satisfy, or lead to service degradation and eventually failure. While it may be possible for a system to scale up to add sufficient capacity to meet user demand, this takes time, and the system may not be able to react quickly enough.
Participants
This pattern includes the following participants:
- Effector: The function to regulate
- Throttle: A function that accepts Effector and returns a closure with the same function signature as Effector
Implementation
The Throttle pattern is similar to many of the other patterns described in this chapter: it's implemented as a function that accepts an effector function and returns a Throttle closure with the same signature that provides the rate-limiting logic.
The most common algorithm for implementing rate-limiting behavior is the token bucket, which uses the analogy of a bucket that can hold some maximum number of tokens. When a function is called, a token is taken from the bucket, which then refills at some fixed rate.
The way that a Throttle treats a request when there are insufficient tokens in the bucket to pay for it can vary depending on the needs of the developer. Some common strategies are as follows:

- Return an error: This is the most basic strategy and is common when you're only trying to restrict unreasonable or potentially abusive numbers of client requests. A RESTful service adopting this strategy might respond with a status 429 (Too Many Requests).
- Replay the response of the last successful function call: This strategy can be useful when a service or expensive function call is likely to provide an identical result if called too soon. It's commonly used in the JavaScript world.
- Enqueue the request for execution when sufficient tokens are available: This approach can be useful when you want to eventually handle all requests, but it's also more complex and may require taking care to ensure that memory isn't exhausted.
Sample code
The following example implements a basic “token bucket” algorithm that uses the “error” strategy:
type Effector func(context.Context) (string, error)

func Throttle(e Effector, max uint, refill uint, d time.Duration) Effector {
    var tokens = max
    var once sync.Once
    var m sync.Mutex

    return func(ctx context.Context) (string, error) {
        if ctx.Err() != nil {
            return "", ctx.Err()
        }

        once.Do(func() {                // Start the refill ticker exactly once
            ticker := time.NewTicker(d)

            go func() {
                defer ticker.Stop()

                for {
                    select {
                    case <-ctx.Done():  // Stop refilling when ctx is canceled
                        return
                    case <-ticker.C:    // Add refill tokens, capped at max
                        m.Lock()
                        t := tokens + refill
                        if t > max {
                            t = max
                        }
                        tokens = t
                        m.Unlock()
                    }
                }
            }()
        })

        m.Lock()
        defer m.Unlock()

        if tokens <= 0 {
            return "", fmt.Errorf("too many calls")
        }

        tokens--

        return e(ctx)
    }
}
This Throttle implementation is similar to our other examples in that it wraps an effector function e with a closure that contains the rate-limiting logic. The bucket is initially allocated max tokens; each time the closure is triggered, it checks whether it has any remaining tokens. If tokens are available, it decrements the token count by one and triggers the effector function. If not, an error is returned. Tokens are added at a rate of refill tokens every duration d.
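Using Throttle follows the same shape as the other wrappers in this chapter. A minimal sketch, in which ping stands in for a real effector and the limits (a bucket of 10 tokens, refilled by 1 every second) are illustrative:

func ping(ctx context.Context) (string, error) {
    return "pong", nil              // Stand-in for a real, expensive operation
}

func main() {
    throttled := Throttle(ping, 10, 1, time.Second)

    for i := 0; i < 12; i++ {
        res, err := throttled(context.Background())
        fmt.Println(res, err)       // The last couple of calls are likely rejected
    }
}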
Timeout
Timeout allows a process to stop waiting for an answer once it’s clear that an answer may not be coming.
Applicability
The first of the fallacies of distributed computing is that “the network is reliable,” and it’s first for a reason. Switches fail, routers and firewalls get misconfigured, packets get blackholed. Even if your network is working perfectly, not every service is thoughtful enough to guarantee a meaningful and timely response—or any response at all—if and when it malfunctions.
Timeout represents a common solution to this dilemma and is so beautifully simple that it barely even qualifies as a pattern at all: given a service request or function call that’s running for a longer-than-expected time, the caller simply…stops waiting.
However, don’t mistake “simple” or “common” for “useless.” On the contrary, the ubiquity of the timeout strategy is a testament to its usefulness. The judicious use of timeouts can provide a degree of fault isolation, preventing cascading failures and reducing the chance that a problem in a downstream resource becomes your problem.
Participants
This pattern includes the following participants:
- Client: The client who wants to execute SlowFunction
- SlowFunction: The long-running function that implements the functionality desired by Client
- Timeout: A wrapper function around SlowFunction that implements the timeout logic
Implementation
There are several ways to implement a timeout in Go, but the most idiomatic way is to use the functionality provided by the context package. See "The Context Package" for more information.

In an ideal world, any possibly long-running function will accept a context.Context parameter directly. If so, your work is fairly straightforward: you need only pass it a Context value decorated with the context.WithTimeout function:
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

result, err := SomeFunction(ctx)
However, this isn’t always the case, and with third-party libraries you don’t always have the option of refactoring to accept a Context
value. In these cases, the best course
of action may be to wrap the function call in such a way that it does respect your
Context
.
For example, imagine you have a potentially long-running function that not only doesn’t accept a Context
value but comes from a package you don’t control. If Client were to call SlowFunction directly, it would be forced to wait until the function completes, if indeed it ever does. Now what?
Instead of calling SlowFunction directly, you can call it in a goroutine. This way you can capture the results if it returns in an acceptable period of time, but you can also move on if it doesn’t.
Warning
Timing out doesn’t actually cancel SlowFunction. If it doesn’t end somehow, the result will be a goroutine leak. See “Goroutines” for more on this phenomenon.
To do this, we can leverage a few tools that we've seen before: context.Context for timeouts, channels for communicating results, and select to catch whichever one acts first.
Sample code
The following example imagines the existence of the fictional function, Slow, whose execution may or may not complete in some reasonable amount of time and whose signature conforms with the following type definition:
type SlowFunction func(string) (string, error)
Rather than calling Slow directly, we instead provide a Timeout function, which wraps a provided SlowFunction in a closure and returns a WithContext function, which adds a context.Context to the SlowFunction's parameter list:
type WithContext func(context.Context, string) (string, error)

func Timeout(f SlowFunction) WithContext {
    return func(ctx context.Context, arg string) (string, error) {
        ch := make(chan struct {
            result string
            err    error
        }, 1)

        go func() {
            res, err := f(arg)
            ch <- struct {
                result string
                err    error
            }{res, err}
        }()

        select {
        case res := <-ch:
            return res.result, res.err
        case <-ctx.Done():
            return "", ctx.Err()
        }
    }
}
Within the function that Timeout constructs, the SlowFunction is run in a goroutine, with its return values being wrapped in a struct and sent into a channel constructed for that purpose, assuming it completes in time.

The next statement select's on two channels: the SlowFunction response channel and the Context value's Done channel. If the former completes first, the closure will return the SlowFunction's return values; otherwise, it returns the error provided by the Context.
Tip
If the SlowFunction is slow because it's expensive, a possible improvement would be to check whether ctx.Err() returns a non-nil value before calling the goroutine.
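A sketch of that check, placed at the top of the closure that Timeout returns (the rest of the code is unchanged from the sample above):

if err := ctx.Err(); err != nil {
    return "", err              // ctx is already canceled or expired; skip the work
}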
Using the Timeout function isn't much more complicated than consuming Slow directly, except that instead of one function call, we have two: the call to Timeout to retrieve the closure and the call to the closure itself:
func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    timeout := Timeout(Slow)
    res, err := timeout(ctx, "some input")

    fmt.Println(res, err)
}
Finally, although it's usually preferred to implement service timeouts using context.Context, channel timeouts can also be implemented using the channel provided by the time.After function. See "Implementing channel timeouts" for an example of how this is done.
Concurrency Patterns
A cloud native service will often be called upon to efficiently juggle multiple processes and handle high (and highly variable) levels of load, ideally without having to suffer the trouble and expense of scaling up. As such, it needs to be highly concurrent and able to manage multiple simultaneous requests from multiple clients. While Go is known for its concurrency support, bottlenecks can and do happen. Some of the patterns that have been developed to prevent them are presented here.
For the sake of simplicity, the code examples in this section don’t implement context cancellation. Generally, you shouldn’t have too much difficulty adding it on your own.
Fan-In
Fan-in multiplexes multiple input channels onto one output channel.
Applicability
Services that have some number of workers that all generate output may find it useful to combine all of the workers’ outputs to be processed as a single unified stream. For these scenarios we use the fan-in pattern, which can read from multiple input channels by multiplexing them onto a single destination channel.
Participants
This pattern includes the following participants:
- Sources: A set of one or more input channels with the same type. Accepted by Funnel.
- Destination: An output channel of the same type as Sources. Created and provided by Funnel.
- Funnel: Accepts Sources and immediately returns Destination. Any input from any Sources will be output by Destination.
Implementation
Funnel is implemented as a function that receives zero to N input channels (Sources). For each input channel in Sources, the Funnel function starts a separate goroutine to read values from its assigned channel and forward them to a single output channel shared by all of the goroutines (Destination).
Sample code
The Funnel function is a variadic function that receives sources: zero to N channels of some type (int in the following example):
func Funnel(sources ...<-chan int) <-chan int {
    dest := make(chan int)              // The shared output channel

    wg := sync.WaitGroup{}              // Used to automatically close dest
                                        // when all sources are closed

    wg.Add(len(sources))                // Set size of the WaitGroup

    for _, ch := range sources {        // Start a goroutine for each source
        go func(ch <-chan int) {
            defer wg.Done()             // Notify WaitGroup when ch closes

            for n := range ch {
                dest <- n
            }
        }(ch)
    }

    go func() {                         // Start a goroutine to close dest
        wg.Wait()                       // after all sources close
        close(dest)
    }()

    return dest
}
For each channel in the list of sources, Funnel starts a dedicated goroutine that reads values from its assigned channel and forwards them to dest, a single output channel shared by all of the goroutines.

Note the use of a sync.WaitGroup to ensure that the destination channel is closed appropriately. Initially, a WaitGroup is created and set to the total number of source channels. If a channel is closed, its associated goroutine exits, calling wg.Done. When all of the channels are closed, the WaitGroup's counter reaches zero, the lock imposed by wg.Wait is released, and the dest channel is closed.
Using Funnel is reasonably straightforward: given N source channels (or a slice of N channels), pass the channels to Funnel. The returned destination channel may be read in the usual way and will close when all source channels close:
func main() {
    var sources []<-chan int            // Declare an empty channel slice

    for i := 0; i < 3; i++ {
        ch := make(chan int)
        sources = append(sources, ch)   // Create a channel; add to sources

        go func() {                     // Run a toy goroutine for each
            defer close(ch)             // Close ch when the routine ends

            for i := 1; i <= 5; i++ {
                ch <- i
                time.Sleep(time.Second)
            }
        }()
    }

    dest := Funnel(sources...)
    for d := range dest {
        fmt.Println(d)
    }
}
This example creates a slice of three int channels, to each of which a goroutine sends the values 1 to 5 before closing it. The values received from the single dest channel are then printed in main. Running this will result in the appropriate 15 lines being printed before dest closes and the function ends.
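As noted at the start of this section, these examples omit context cancellation. One possible sketch of a context-aware Funnel has each forwarding goroutine also watch the Done channel so it can stop early:

func FunnelWithContext(ctx context.Context, sources ...<-chan int) <-chan int {
    dest := make(chan int)

    var wg sync.WaitGroup
    wg.Add(len(sources))

    for _, ch := range sources {
        go func(ch <-chan int) {
            defer wg.Done()

            for n := range ch {
                select {
                case dest <- n:
                case <-ctx.Done():      // Stop forwarding when ctx is canceled
                    return
                }
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(dest)
    }()

    return dest
}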
Fan-Out
Fan-out evenly distributes messages from an input channel to multiple output channels.
Applicability
Fan-out receives messages from an input channel, distributing them evenly among output channels, and is a useful pattern for parallelizing CPU and I/O utilization.
For example, imagine that you have an input source, such as a Reader on an input stream or a listener on a message broker that provides the inputs for some resource-intensive unit of work. Rather than coupling the input and computation processes, which would confine the effort to a single serial process, you might prefer to parallelize the workload by distributing it among some number of concurrent worker processes.
Participants
This pattern includes the following participants:
- Source: An input channel. Accepted by Split.
- Destinations: An output channel of the same type as Source. Created and provided by Split.
- Split: A function that accepts Source and immediately returns Destinations. Any input from Source will be output to a Destination.
Implementation
Fan-out may be relatively conceptually straightforward, but the devil is in the details.
Typically, fan-out is implemented as a Split function, which accepts a single Source channel and an integer representing the desired number of Destination channels. The Split function creates the Destination channels and executes some background process that retrieves values from the Source channel and forwards them to one of the Destinations.
The implementation of the forwarding logic can be done in one of two ways:
- Using a single goroutine that reads values from Source and forwards them to the Destinations in a round-robin fashion. This has the virtue of requiring only one master goroutine, but if the next channel isn't ready to read yet, it'll slow the entire process.
- Using separate goroutines for each Destination that compete to read the next value from Source and forward it to their respective Destination. This requires slightly more resources but is less likely to get bogged down by a single slow-running worker.
The next example uses the latter approach.
Sample code
In this example, the Split function accepts a single receive-only channel, source, and an integer describing the number of channels to split the input into, n. It returns a slice of n receive-only channels with the same type as source.

Internally, Split creates the destination channels. For each channel created, it executes a goroutine that retrieves values from source in a for loop and forwards them to their assigned output channel. Effectively, each goroutine competes for reads from source; if several are trying to read, the "winner" will be randomly determined. If source is closed, all goroutines terminate and all of the destination channels are closed:
func Split(source <-chan int, n int) []<-chan int {
    var dests []<-chan int              // Declare the dests slice

    for i := 0; i < n; i++ {            // Create n destination channels
        ch := make(chan int)
        dests = append(dests, ch)

        go func() {                     // Each channel gets a dedicated
            defer close(ch)             // goroutine that competes for reads

            for val := range source {
                ch <- val
            }
        }()
    }

    return dests
}
Given a channel of some specific type, the Split function will return a number of destination channels. Typically, each will be passed to a separate goroutine, as demonstrated in the following example:
func main() {
    source := make(chan int)            // The input channel
    dests := Split(source, 5)           // Retrieve 5 output channels

    go func() {                         // Send the numbers 1..10 to source
        for i := 1; i <= 10; i++ {      // and close it when we're done
            source <- i
        }

        close(source)
    }()

    var wg sync.WaitGroup               // Use WaitGroup to wait until
    wg.Add(len(dests))                  // the output channels all close

    for i, d := range dests {
        go func(i int, d <-chan int) {
            defer wg.Done()

            for val := range d {
                fmt.Printf("#%d got %d\n", i, val)
            }
        }(i, d)
    }

    wg.Wait()
}
This example creates an input channel, source, which it passes to Split to receive its output channels. Concurrently, it passes the values 1 to 10 into source in a goroutine while receiving values from dests in 5 others. When the inputs are complete, the source channel is closed, which causes the output channels to be closed, which ends the read loops, which causes wg.Done to be called by each of the read goroutines, which releases the lock on wg.Wait and allows the function to end.
Future
Future provides a placeholder for a value that’s not yet known.
Applicability
Futures (also known as Promises or Delays)6 are a synchronization construct that provide a placeholder for a value that’s still being generated by an asynchronous process.
This pattern isn’t used as frequently in Go as in some other languages because channels can often be used in a similar way. For example, the long-running blocking function BlockingInverse
(not shown) can be executed in a goroutine that returns the result (when it arrives) along a channel. The ConcurrentInverse
function that follows does exactly that, returning a channel that can be read when a result becomes available:
func ConcurrentInverse(m Matrix) <-chan Matrix {
    out := make(chan Matrix)

    go func() {
        out <- BlockingInverse(m)
        close(out)
    }()

    return out
}
Using ConcurrentInverse, one could then build a function to calculate the inverse product of two matrices:
func InverseProduct(a, b Matrix) Matrix {
    inva := ConcurrentInverse(a)
    invb := ConcurrentInverse(b)

    return Product(<-inva, <-invb)
}
This doesn’t seem so bad, but it comes with some baggage that makes it undesirable for something like a public API. First, the caller has to be careful to call
ConcurrentInverse
with the correct timing. To see what I mean, take a close look at the following:
return Product(<-ConcurrentInverse(a), <-ConcurrentInverse(b))
See the problem? Since the computation isn't started until ConcurrentInverse is actually called, this construct would be effectively executed serially, requiring twice the runtime.
What’s more, when using channels this way, functions with more than one return value will usually assign a dedicated channel to each member of the return list, which can become awkward as the return list grows or when the values need to be read by more than one goroutine.
The Future pattern contains this complexity by encapsulating it in an API that provides the consumer with a simple interface whose method can be called normally, blocking all calling routines until all of its results are resolved. The interface that the value satisfies doesn’t even have to be constructed specially for that purpose; any interface that’s convenient for the consumer can be used.
Participants
This pattern includes the following participants:
- Future: The interface that is received by the consumer to retrieve the eventual result
- SlowFunction: A wrapper function around some function to be asynchronously executed; provides Future
- InnerFuture: Satisfies the Future interface; includes an attached method that contains the result access logic
Implementation
The API presented to the consumer is fairly straightforward: the programmer calls SlowFunction, which returns a value that satisfies the Future interface. Future may be a bespoke interface, as in the following example, or it may be something more like an io.Reader that can be passed to its own functions.
In actuality, when SlowFunction is called, it executes the core function of interest as a goroutine. In doing so, it defines channels to capture the core function’s output, which it wraps in InnerFuture.
InnerFuture has one or more methods that satisfy the Future interface, which retrieve the values returned by the core function from the channels, cache them, and return them. If the values aren’t available on the channel, the request blocks. If they have already been retrieved, the cached values are returned.
Sample code
In this example, we use a Future interface that the InnerFuture will satisfy:
type Future interface {
    Result() (string, error)
}
The InnerFuture struct is used internally to provide the concurrent functionality. In this example, it satisfies the Future interface but could just as easily choose to satisfy something like io.Reader by attaching a Read method, if you prefer:
type InnerFuture struct {
    once sync.Once
    wg   sync.WaitGroup

    res   string
    err   error
    resCh <-chan string
    errCh <-chan error
}

func (f *InnerFuture) Result() (string, error) {
    f.once.Do(func() {
        f.wg.Add(1)
        defer f.wg.Done()

        f.res = <-f.resCh
        f.err = <-f.errCh
    })

    f.wg.Wait()

    return f.res, f.err
}
In this implementation, the struct itself contains a channel and a variable for each value returned by the Result method. When Result is first called, it attempts to read the results from the channels and send them back to the InnerFuture struct so that subsequent calls to Result can immediately return the cached values.

Note the use of sync.Once and sync.WaitGroup. The former does what it says on the tin: it ensures that the function that's passed to it is called exactly once. The WaitGroup is used to make this function call thread safe: any calls after the first will be blocked at wg.Wait until the channel reads are complete.
SlowFunction is a wrapper around the core functionality that you want to run concurrently. It has the job of creating the results channels, running the core function in a goroutine, and creating and returning the Future implementation (InnerFuture, in this example):
func SlowFunction(ctx context.Context) Future {
    resCh := make(chan string)
    errCh := make(chan error)

    go func() {
        select {
        case <-time.After(2 * time.Second):
            resCh <- "I slept for 2 seconds"
            errCh <- nil
        case <-ctx.Done():
            resCh <- ""
            errCh <- ctx.Err()
        }
    }()

    return &InnerFuture{resCh: resCh, errCh: errCh}
}
To make use of this pattern, you need only call the SlowFunction and use the returned Future as you would any other value:
func main() {
    ctx := context.Background()
    future := SlowFunction(ctx)

    // Do stuff while SlowFunction chugs along in the background.

    res, err := future.Result()
    if err != nil {
        fmt.Println("error:", err)
        return
    }

    fmt.Println(res)
}
This approach provides a reasonably good user experience. The programmer can create a Future and access it as they wish, and can even apply timeouts or deadlines with a Context.
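For instance, bounding how long the Future is allowed to take is just a matter of deriving the Context with a timeout; a short sketch using the SlowFunction shown above:

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

future := SlowFunction(ctx)     // SlowFunction sleeps for 2 seconds, so...
res, err := future.Result()
fmt.Println(res, err)           // ...err is "context deadline exceeded"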
Sharding
Sharding splits a large data structure into multiple partitions to localize the effects of read/write locks.
Applicability
The term sharding is typically used in the context of distributed state to describe data that is partitioned between server instances. This kind of horizontal sharding is commonly used by databases and other data stores to distribute load and provide redundancy.
A slightly different issue can sometimes affect highly concurrent services that have a shared data structure with a locking mechanism to protect it from conflicting writes. In this scenario, the locks that serve to ensure the fidelity of the data can also create a bottleneck when processes start to spend more time waiting for locks than they do doing their jobs. This unfortunate phenomenon is called lock contention.
While this might be resolved in some cases by scaling the number of instances, this also increases complexity and latency, because distributed locks need to be established, and writes need to establish consistency. An alternative strategy for reducing lock contention around shared data structures within an instance of a service is vertical sharding, in which a large data structure is partitioned into two or more structures, each representing a part of the whole. Using this strategy, only a portion of the overall structure needs to be locked at a time, decreasing overall lock contention.
Participants
This pattern includes the following participants:
- ShardedMap: An abstraction around one or more Shards providing read and write access as if the Shards were a single map
- Shard: An individually lockable collection representing a single data partition
Implementation
While idiomatic Go generally favors sharing memory by communicating over channels rather than protecting shared resources with locks,7 this isn't always possible. Maps are particularly unsafe for concurrent use, making the use of locks as a synchronization mechanism a necessary evil. Fortunately, Go provides sync.RWMutex for precisely this purpose. You may recall sync.RWMutex from "Mutexes".
This strategy generally works well enough. However, because locks allow access to only one process at a time, the average amount of time spent waiting for locks to clear in a read/write intensive application can increase dramatically with the number of concurrent processes acting on the resource. The resulting lock contention can potentially bottleneck key functionality.
Vertical sharding reduces lock contention by splitting the underlying data structure—usually a map—into several individually lockable maps. An abstraction layer provides access to the underlying shards as if they were a single structure (see Figure 4-5).
Internally, this is accomplished by creating an abstraction layer around what is essentially a map of maps. Whenever a value is read or written to the map abstraction, a hash value is calculated for the key, which is then modded by the number of shards to generate a shard index. This allows the map abstraction to isolate the necessary locking to only the shard at that index.
Sample code
In the following example, we use the standard sync, reflect, and hash/fnv packages to implement a basic sharded map: ShardedMap.
Internally, ShardedMap is just a slice of pointers to some number of Shard values, but we define it as a type so we can attach methods to it. Each Shard includes a map[K]V that contains that shard’s data and a composed sync.RWMutex so that it can be individually locked:
type Shard[K comparable, V any] struct {
    sync.RWMutex            // Compose from sync.RWMutex
    items map[K]V           // items contains the shard's data
}

type ShardedMap[K comparable, V any] []*Shard[K, V]
Go doesn’t have Java-style constructors, so we provide a NewShardedMap
function to retrieve a new ShardedMap
:
func NewShardedMap[K comparable, V any](nshards int) ShardedMap[K, V] {
    shards := make([]*Shard[K, V], nshards)    // Initialize a *Shards slice

    for i := 0; i < nshards; i++ {
        shard := make(map[K]V)
        shards[i] = &Shard[K, V]{items: shard}
    }

    return shards                              // A ShardedMap IS a slice!
}
ShardedMap
has two unexported methods, getShardIndex
and getShard
, which are used to calculate a key’s shard index and retrieve a key’s correct shard, respectively. These could be easily combined into a single method, but splitting them this way makes them easier to test:8
func (m ShardedMap[K, V]) getShardIndex(key K) int {
    str := reflect.ValueOf(key).String()   // Get string representation of key
    hash := fnv.New32a()                   // Get a hash implementation
    hash.Write([]byte(str))                // Write bytes to the hash
    sum := int(hash.Sum32())               // Get the resulting checksum
    return sum % len(m)                    // Mod by len(m) to get index
}

func (m ShardedMap[K, V]) getShard(key K) *Shard[K, V] {
    index := m.getShardIndex(key)
    return m[index]
}
To get the index, we first compute the hash of the string representation of the value, from which we calculate our final value by modding it against the number of shards. This definitely isn’t the most computationally efficient way to go about generating a hash, but it’s conceptually simple and works well enough for an example.
The precise hash algorithm doesn’t really matter very much—we use the FNV-1a hash function in this example—as long as it’s deterministic and provides a reasonably uniform value distribution.
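One caveat: reflect.Value’s String method returns a meaningful representation only for string kinds, so keys of any other single type would all produce the same string and land in the same shard. A hypothetical variant (not the book’s implementation) that stringifies any key with fmt.Sprintf before hashing might look like this:
// A hypothetical alternative to getShardIndex that uses fmt.Sprintf instead
// of reflect to produce the key's string representation. Any deterministic,
// reasonably uniform hash would work equally well here.
func (m ShardedMap[K, V]) getShardIndexSprintf(key K) int {
    str := fmt.Sprintf("%v", key)   // Works for any comparable key type
    hash := fnv.New32a()
    hash.Write([]byte(str))
    return int(hash.Sum32()) % len(m)
}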
Finally, we add methods to ShardedMap to allow a user to read and write values. Obviously, these don’t demonstrate all of the functionality a map might need; the full source for this example is in the GitHub repository associated with this book, so please feel free to implement the rest as an exercise. A Delete and a Contains method would be nice (a rough sketch of both follows the Get and Set methods below):
func (m ShardedMap[K, V]) Get(key K) V {
    shard := m.getShard(key)
    shard.RLock()
    defer shard.RUnlock()

    return shard.items[key]
}

func (m ShardedMap[K, V]) Set(key K, value V) {
    shard := m.getShard(key)
    shard.Lock()
    defer shard.Unlock()

    shard.items[key] = value
}
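As promised above, here’s a rough sketch of what Delete and Contains might look like; treat it as a starting point rather than the version in the book’s repository:
// Delete removes the value associated with key, if any.
func (m ShardedMap[K, V]) Delete(key K) {
    shard := m.getShard(key)
    shard.Lock()
    defer shard.Unlock()

    delete(shard.items, key)
}

// Contains reports whether a value is associated with key.
func (m ShardedMap[K, V]) Contains(key K) bool {
    shard := m.getShard(key)
    shard.RLock()
    defer shard.RUnlock()

    _, ok := shard.items[key]
    return ok
}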
When you do need to establish locks on all of the shards, it’s generally best to do so concurrently. In the following, we implement a Keys method using goroutines and our old friend sync.WaitGroup:
func (m ShardedMap[K, V]) Keys() []K {
    var keys []K                           // Declare an empty keys slice
    var mutex sync.Mutex                   // Mutex for write safety to keys

    var wg sync.WaitGroup                  // Create a wait group and add a
    wg.Add(len(m))                         // wait value for each slice

    for _, shard := range m {              // Run a goroutine for each slice in m
        go func(s *Shard[K, V]) {
            s.RLock()                      // Establish a read lock on s
            defer wg.Done()                // Tell the WaitGroup it's done
            defer s.RUnlock()              // Release the read lock

            for key := range s.items {     // Get the slice's keys
                mutex.Lock()
                keys = append(keys, key)
                mutex.Unlock()
            }
        }(shard)
    }

    wg.Wait()                              // Block until all goroutines are done

    return keys                            // Return combined keys slice
}
Using ShardedMap
isn’t quite like using a standard map, unfortunately, but while it’s different, it’s no more complicated:
func main() {
    m := NewShardedMap[string, int](5)

    keys := []string{"alpha", "beta", "gamma"}

    for i, k := range keys {
        m.Set(k, i+1)
        fmt.Printf("%5s: shard=%d value=%d\n", k, m.getShardIndex(k), m.Get(k))
    }

    fmt.Println(m.Keys())
}
Output:
alpha: shard=3 value=1
 beta: shard=2 value=2
gamma: shard=0 value=3
[gamma beta alpha]
Prior to the introduction of generics in Go 1.18 (and in the previous edition of this book), the ShardedMap had to be constructed with interface{} (now any) values, at least if it was going to be reusable. However, this came with the loss of type safety associated with the use of any and the subsequent requirement of type assertions. Fortunately, with the release of Go generics, this is a solved problem.
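To make the difference concrete, here’s a rough sketch contrasting an any-valued map (roughly how the pre-generics version behaved; the details are illustrative, not the first edition’s code) with the generic ShardedMap:
func main() {
    // With any-typed values, every read requires a runtime type assertion.
    anyMap := map[string]any{"alpha": 1}

    n, ok := anyMap["alpha"].(int)   // Can fail at runtime if the type is wrong
    if !ok {
        fmt.Println("alpha is not an int")
        return
    }
    fmt.Println(n)

    // With generics, the compiler guarantees that Get returns an int,
    // so no assertion is needed.
    m := NewShardedMap[string, int](3)
    m.Set("alpha", 1)
    fmt.Println(m.Get("alpha"))
}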
Worker Pool
A worker pool directs multiple processes to concurrently execute work on a collection of input.
Applicability
Probably the most commonly used of the patterns in this chapter, a worker pool (a “thread pool” in pretty much any other language) uses a fixed number of workers to efficiently manage the execution of multiple concurrent tasks.
Worker pools are very good at managing tasks that you want to run concurrently, but only to a point. Common applications include things like handling multiple incoming requests, processing task queues, handling steps in data pipelines, and executing long-duration batch processing jobs.
Participants
This pattern includes the following participants:
- Worker
-
A function that does some work on items from Jobs and sends the results to Results
- Jobs
-
A channel from which Worker receives the raw data to be worked on
- Results
-
A channel into which Worker sends the results of its work
Implementation
Programmers coming from other languages may be familiar with the concept of the “thread pool”: a group of threads that are standing by to do some work. This is a useful concurrency pattern in any language, but thread pools can often be tedious and sometimes complicated to work with.9
Fortunately, Go’s concurrency features—goroutines and channels—make it especially well-suited for building something like this. As such, our implementation is rather straightforward relative to many other languages: what we call a “worker” is really just a goroutine10 (Worker) that receives input data on an input channel (Jobs) and returns that work along an output channel (Results).
Both channels are shared among all workers, so any work sent along the jobs channel will be done by the first available worker. Importantly, each worker lives only as long as the jobs channel is open: it terminates automatically once that channel is closed and drained.
Sample code
If you’re accustomed to working with thread pools in other languages, you’ll hopefully find our example of a worker pool to be refreshingly simple. In my opinion, this pattern highlights the elegance of the design around Go’s concurrency features particularly well.
The meat of the pattern is the worker, which (as its name implies) does the work. Below is the blueprint for a single worker function:
func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Println("Worker", id, "started job", j)
        time.Sleep(time.Second)
        results <- j * 2
    }
}
As you can see, this example worker is a fairly standard function that accepts a jobs channel from which it receives work and a results channel into which it sends the results of that work. It also takes an id integer, which is used purely for demonstration purposes.
This worker’s functionality is pretty trivial: it reads an int value from the jobs channel and, after a brief pause to simulate effort, writes that value’s double to the results channel. Note that it exits automatically when the jobs channel closes.
Warning
You should always know how and when any goroutine you create is going to terminate, or you risk creating a goroutine leak.
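To see how easily this can happen with this pattern, consider the following hypothetical misuse of the worker function above: because jobs is never closed, every worker blocks in its range loop for the life of the program:
// A minimal sketch of a goroutine leak (don't do this). The jobs channel is
// never closed, so each worker's range loop never terminates and all three
// goroutines are stranded until the program exits.
func leaky() {
    jobs := make(chan int)
    results := make(chan int, 3)

    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    jobs <- 1                    // One job is processed...
    fmt.Println(<-results)       // ...and its result retrieved...
}                                // ...but the workers never exit.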
Now, if we were using just one worker, this wouldn’t be much more useful than just looping over the inputs and doing the computation one by one. But of course, as you’ve probably already inferred, we won’t be using just one worker:
func main() {
    jobs := make(chan int, 10)
    results := make(chan int)

    wg := sync.WaitGroup{}

    for w := 1; w <= 3; w++ {       // Spawn 3 worker processes
        go worker(w, jobs, results)
    }

    for j := 1; j <= 10; j++ {      // Send jobs to workers
        wg.Add(1)
        jobs <- j
    }

    go func() {
        wg.Wait()
        close(jobs)
        close(results)
    }()

    for r := range results {
        fmt.Println("Got result:", r)
        wg.Done()
    }
}
In this example, we start by creating the two channels: jobs to put work into and results to receive the completed work from. We also create a sync.WaitGroup, which lets us pause until all the work is done.
Now the interesting bit: we use the go keyword to start three workers, passing both channels into each. We haven’t given them any work to do yet, so they’re all blocked waiting for jobs.
Now that we have the workers in place, let’s see what they can do by sending them some work units via the jobs channel. Each work unit is received and processed by exactly one of the three workers. Finally, we read the results coming from the results channel until it’s closed by the anonymous goroutine once all of the work has been accounted for.
This pattern may strike you as trivial, and that’s okay. But it demonstrates how, with a couple of channels and goroutines, we can create a fully functional worker pool in Go.
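One natural extension, in keeping with this chapter’s emphasis on the context package, is a worker that can also be stopped early by a Context. The following is a sketch of how that might look (workerCtx is a hypothetical name, not part of the example above):
// A sketch of a context-aware worker: it stops when jobs is closed or when
// the Context is canceled, whichever happens first.
func workerCtx(ctx context.Context, id int, jobs <-chan int, results chan<- int) {
    for {
        select {
        case <-ctx.Done():            // Canceled or timed out: stop now
            return
        case j, ok := <-jobs:
            if !ok {                  // jobs closed and drained: no more work
                return
            }
            results <- j * 2
        }
    }
}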
Chord
The Chord (or Join) pattern performs an atomic consumption of messages from each member of a group of channels.
Applicability
In music, a chord is a group of multiple notes sounded together to produce a harmony. Like its namesake musical concept, the Chord pattern consists of multiple signals being sent together, in this case along channels, to produce a single output.
More specifically, the Chord pattern receives inputs from multiple channels but emits a value only when all its channels have emitted a value.
This could be a useful pattern if, for example, you need to receive inputs from multiple monitored data sources before acting.
Participants
This pattern includes the following participants:
- Sources
-
A set of one or more input channels with the same type. Accepted by Chord.
- Destination
-
An output channel created and provided by Chord; each of its values is a slice containing one value of the Sources type per source channel.
- Chord
-
Accepts Sources and immediately returns Destination. Once every Sources channel has emitted at least one value, the most recent value from each is packaged into a slice and sent to Destination.
Implementation
The Chord pattern is very similar to “Fan-In” in that it concurrently reads inputs from multiple Sources channels and emits them back to the Destination channel, but that’s where the similarity ends. Where Fan-In immediately forwards all inputs, Chord waits to act until it’s received at least one value from each of its source channels, then it packages all the values into a slice and sends it on the Destination channel.
In fact, the first half of this pattern, which retrieves values from Sources and sends them into an intermediate channel for processing, looks and acts exactly like a Fan-In, to the extent that Chord can reasonably be considered an extension of Fan-In.
It’s that last half, though, that makes this pattern interesting. This is the part that keeps track of which channels have sent messages since the last output and packages the most recent input from each channel into a slice to be sent to Destination.
Sample code
Chord is probably the most conceptually complex pattern described in this chapter (which is why I saved it for last). However, as mentioned previously, the first half of the following example is taken directly from “Fan-In”.
Since you’re (hopefully) a Fan-in expert at this point, you’ll probably see that pattern in the first half or so of the following Chord
function:
func Chord(sources ...<-chan int) <-chan []int {
    type input struct {                    // Used to send inputs
        idx, input int                     // between goroutines
    }

    dest := make(chan []int)               // The output channel
    inputs := make(chan input)             // An intermediate channel

    wg := sync.WaitGroup{}                 // Used to close channels when
    wg.Add(len(sources))                   // all sources are closed

    for i, ch := range sources {           // Start goroutine for each source
        go func(i int, ch <-chan int) {
            defer wg.Done()                // Notify WaitGroup when ch closes

            for n := range ch {
                inputs <- input{i, n}      // Transfer input to next goroutine
            }
        }(i, ch)
    }

    go func() {
        wg.Wait()                          // Wait for all sources to close,
        close(inputs)                      // then close inputs channel
    }()

    go func() {
        res := make([]int, len(sources))   // Slice for incoming inputs
        sent := make([]bool, len(sources)) // Slice to track sent status
        count := len(sources)              // Counter for channels

        for r := range inputs {
            res[r.idx] = r.input           // Update incoming input

            if !sent[r.idx] {              // First input from channel?
                sent[r.idx] = true
                count--
            }

            if count == 0 {
                c := make([]int, len(res)) // Copy and send inputs slice
                copy(c, res)
                dest <- c

                count = len(sources)       // Reset counter
                clear(sent)                // Clear status tracker
            }
        }

        close(dest)
    }()

    return dest
}
Looking closely, you’ll notice that Chord
is composed of three key sections.
The first is a for
loop that spawns a number of goroutines, one for each channel in sources
. When any of these receives a value from its respective channel, it sends both its channel’s index and the value received to the intermediate inputs
channel to be picked up by another goroutine. When a sources
channel closes, the goroutine watching it ends as well, decrementing the WaitGroup
counter in the process.
The next section is a goroutine whose only job is to wait on the WaitGroup. When all of the sources channels close, the WaitGroup’s counter reaches zero, Wait returns, and the intermediate inputs channel is closed.
Finally, the last goroutine’s purpose is to read input
values from the intermediate inputs
channel and determine whether all channels in sources
have sent a value yet. Whenever a value is received from an input channel, the res
slice is updated at the appropriate index with the value. If this is the first time receiving a value from a particular channel, the count
counter is decremented.
Once count
hits zero, a few things happen: the res
slice (which contains all of the most recent inputs from all the sources channels
) is copied and sent into the destination channel, the count
counter is reset, and the sent
slice is cleared. With that, Chord
is reset for another read cycle.
Note how we copy the res slice before sending it. This is a safety measure made necessary by the fact that a slice value shares its underlying array: if we didn’t make a copy, subsequent inputs would modify the slice while it’s being read elsewhere.
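A tiny standalone illustration (hypothetical values, unrelated to Chord itself) shows the aliasing problem the copy avoids:
func main() {
    res := []int{1, 2, 3}
    dest := make(chan []int, 1)

    dest <- res                  // No copy: receiver shares res's backing array
    res[0] = 99                  // ...so this write is visible to the receiver

    fmt.Println(<-dest)          // Prints [99 2 3], not [1 2 3]

    c := make([]int, len(res))   // A copy has its own backing array,
    copy(c, res)                 // so it's safe to hand off
    dest <- c
    res[0] = 0
    fmt.Println(<-dest)          // Prints [99 2 3]: unaffected by the later write
}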
Now let’s see Chord
in action:
func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    ch3 := make(chan int)

    go func() {
        for n := 1; n <= 4; n++ {
            ch1 <- n
            ch1 <- n * 2               // Writing twice to ch1!
            ch2 <- n
            ch3 <- n

            time.Sleep(time.Second)
        }

        close(ch1)                     // Closing all input channels
        close(ch2)                     // causes res to be closed
        close(ch3)                     // as well
    }()

    res := Chord(ch1, ch2, ch3)

    for s := range res {               // Read results
        fmt.Println(s)
    }
}
The first thing this function does is create some input channels, into which we’ll send some data to see how Chord behaves.
It then spawns a goroutine that sends some data into the channels, closing them all when it’s done sending. Note the double send to the ch1
channel.
Running this produces a nice, consistent output and terminates normally:
[2 1 1]
[4 2 2]
[6 3 3]
[8 4 4]
This output tells us two things. First, the program terminated normally, so we know that the res channel was indeed closed when all of the input channels closed, breaking the final for loop as expected. Second, the value in position 0 of each output slice reflects the second value sent into ch1, highlighting that this Chord implementation responds with the most recently sent values.
Summary
This chapter covered quite a few very interesting—and useful—idioms. There are probably many more,11 but these are the ones I felt were most important, either because they’re somehow practical in a directly applicable way or because they showcase some interesting feature of the Go language. Often both.
In Chapter 5, we’ll move on to the next level, taking some of the things we discussed in Chapters 3 and 4 and putting them into practice by building a simple key-value store from scratch!
1 Spoken August 1979. Attested to by Vicki Almstrum, Tony Hoare, Niklaus Wirth, Wim Feijen, and Rajeev Joshi. In Pursuit of Simplicity: A Symposium Honoring Professor Edsger Wybe Dijkstra, May 12–13, 2000.
2 L (yes, his legal name is L) is a brilliant and fascinating human being. Look him up some time.
3 Pun unavoidable.
4 Erich Gamma et al., Design Patterns: Elements of Reusable Object-Oriented Software, 1st ed. (Addison-Wesley Professional, 1994).
5 If you prefer boring names.
6 While these terms are often used interchangeably, they can also have shades of meaning depending on their context. I know. Please don’t write me any angry letters about this.
7 See the article, “Share Memory by Communicating”, The Go Blog.
8 If you’re into that kind of thing.
9 Java developers, you know what I’m talking about.
10 Which is why we call it a “worker pool” instead of a “thread pool.”
11 Did I leave out your favorite? Let me know, and I’ll try to include it in the next edition!