Implementing request/response using context in Go network clients
Learn how NATS requests work and how to use the context package for cancellation.
The first two parts of this series created a general-purpose client that can subscribe to channels on a NATS server, send messages, and wait for responses. But one of the most common communication models is request/response, where two clients engage in one-to-one bidirectional communication. NATS is a pure PubSub system, meaning that everything is built on top of publish and subscribe operations. The NATS Go client supports the Request/Response model of communication by building it on top of the PubSub methods we have already developed.
Because making a request involves awaiting for a response, a Go package that is gaining increasing adoption is context
, which was designed for request APIs by providing support for deadlines, cancellation signals, and request-scoped data. Cancellation propagation is an important topic in Go, because it allows us quickly reclaim any resources that may have been in use as soon as the inflight request or a parent context is cancelled. If there is a blocking call in your library, users of your library can benefit from a context-aware API to let them manage the cancellation in an idiomatic way. Contexts also allow you to set a hard deadline to timeout the request, and to include request-scoped data such as a request ID, which might be useful for tracing the request.
When making a NATS request, we may want to give up waiting after a certain time by using context.WithTimeout
or arbitrarily cancel an inflight request and propagate the cancellation by calling the given CancelFunc
.
The prototype of the Request method now looks like:
// Request takes a context, a subject, and a payload
// and returns a response as a byte array or an error.
func
(
c
*
Client
)
Request
(
ctx
context
.
Context
,
subj
string
,
payload
[]
byte
)
([]
byte
,
error
)
{
// ...
}
And with context
based cancellation, its sample usage is:
package
main
import
(
"context"
"log"
"time"
)
func
main
()
{
nc
:=
NewClient
()
err
:=
nc
.
Connect
(
"127.0.0.1:4222"
)
if
err
!=
nil
{
log
.
Fatalf
(
"Error: %s"
,
err
)
}
defer
nc
.
Close
()
nc
.
Subscribe
(
"help"
,
func
(
subject
,
reply
string
,
payload
[]
byte
)
{
log
.
Printf
(
"[Received] %s"
,
string
(
payload
))
nc
.
Publish
(
reply
,
[]
byte
(
"ok!"
))
})
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
2
*
time
.
Second
)
defer
cancel
()
for
{
response
,
err
:=
nc
.
Request
(
ctx
,
"help"
,
[]
byte
(
"please"
))
if
err
!=
nil
{
break
}
log
.
Println
(
"[Response]"
,
string
(
response
))
}
}
Internally, we will implement Request/Response support by creating a unique subscription to an inbox for each client. When publishing a request to the server, the client will pass the identifier for the subscription, and then wait for another subscriber to publish the response to the inbox of the client.
The resulting protocol looks like this:
# Requestor
SUB _INBOX.nwOOaWSeWrt0ok20pKFfNz.* 2 PUBhelp
_INBOX.nwOOaWSeWrt0ok20pKFfNz.nwOOaWSeWrt0ok20pKFfPo 6 please MSG _INBOX.nwOOaWSeWrt0ok20pKFfNz.nwOOaWSeWrt0ok20pKFfPo2
10 I canhelp
# Responder
SUBhelp
1 MSGhelp
1
_INBOX.nwOOaWSeWrt0ok20pKFfNz.nwOOaWSeWrt0ok20pKFfPo 6 please PUB _INBOX.nwOOaWSeWrt0ok20pKFfNz.nwOOaWSeWrt0ok20pKFfPo 10 I canhelp
To generate unique identifiers, we can use the crypto/rand package from Go (the NATS client itself uses the custom-written nuid library, which generates identifiers with better performance). A sample implementation follows of the setup required to have the client receive requests similar to the new request/response scheme in the 1.3.0 release of the NATS client.
type
Client
struct
{
// respSub is the wildcard subject on which the responses
// are received.
respSub
string
// respMap is a map of request inboxes to the channel that is
// signaled when the response is received.
respMap
map
[
string
]
chan
[]
byte
// respMux is the subscription on which the requests
// are received.
respMux
*
Subscription
// respSetup is used to set up the wildcard subscription
// for requests.
respSetup
sync
.
Once
}
// Request takes a context, a subject, and a payload
// and returns a response as a byte array or an error.
func
(
c
*
Client
)
Request
(
ctx
context
.
Context
,
subj
string
,
payload
[]
byte
)
([]
byte
,
error
)
{
c
.
Lock
()
// Set up request subscription if we haven't done so already.
if
c
.
respMap
==
nil
{
u
:=
make
([]
byte
,
11
)
io
.
ReadFull
(
rand
.
Reader
,
u
)
c
.
respSub
=
fmt
.
Sprintf
(
"%s.%s.*"
,
"_INBOX"
,
hex
.
EncodeToString
(
u
))
c
.
respMap
=
make
(
map
[
string
]
chan
[]
byte
)
}
// Buffered channel awaits a single response.
dataCh
:=
make
(
chan
[]
byte
,
1
)
u
:=
make
([]
byte
,
11
)
io
.
ReadFull
(
rand
.
Reader
,
u
)
token
:=
hex
.
EncodeToString
(
u
)
c
.
respMap
[
token
]
=
dataCh
ginbox
:=
c
.
respSub
prefix
:=
c
.
respSub
[:
29
]
// _INBOX. + unique prefix
respInbox
:=
fmt
.
Sprintf
(
"%s.%s"
,
prefix
,
token
)
createSub
:=
c
.
respMux
==
nil
c
.
Unlock
()
if
createSub
{
var
err
error
c
.
respSetup
.
Do
(
func
()
{
fn
:=
func
(
subj
,
reply
string
,
data
[]
byte
)
{
// _INBOX. + unique prefix + . + token
respToken
:=
subj
[
30
:]
// Dequeue the first response only.
c
.
Lock
()
mch
:=
c
.
respMap
[
respToken
]
delete
(
c
.
respMap
,
respToken
)
c
.
Unlock
()
select
{
case
mch
<-
data
:
default
:
return
}
}
var
sub
*
Subscription
sub
,
err
=
c
.
Subscribe
(
ginbox
,
fn
)
c
.
Lock
()
c
.
respMux
=
sub
c
.
Unlock
()
})
if
err
!=
nil
{
return
nil
,
err
}
}
// ...
Then we publish the request, tagging it with the unique inbox for this request. We signal a flush and wait for the global request handler subscription unique to the client receive the message, or let the context timeout be canceled.
func
(
c
*
Client
)
Request
(
ctx
context
.
Context
,
subj
string
,
payload
[]
byte
)
([]
byte
,
error
)
{
// ...
// Publish the request along with the payload, then wait
// for the reply to be sent to the client or for the
// context to timeout:
//
// PUB subject reply-inbox #number-of-bytes\r\n
// <payload>\r\n
//
msgh
:=
[]
byte
(
"PUB "
)
msgh
=
append
(
msgh
,
subj
...
)
msgh
=
append
(
msgh
,
' '
)
msgh
=
append
(
msgh
,
respInbox
...
)
msgh
=
append
(
msgh
,
' '
)
var
b
[
12
]
byte
var
i
=
len
(
b
)
if
len
(
payload
)
>
0
{
for
l
:=
len
(
payload
);
l
>
0
;
l
/=
10
{
i
-=
1
b
[
i
]
=
digits
[
l
%
10
]
}
}
else
{
i
-=
1
b
[
i
]
=
digits
[
0
]
}
msgh
=
append
(
msgh
,
b
[
i
:]
...
)
msgh
=
append
(
msgh
,
_CRLF_
...
)
c
.
Lock
()
_
,
err
:=
c
.
w
.
Write
(
msgh
)
if
err
==
nil
{
_
,
err
=
c
.
w
.
Write
(
payload
)
}
if
err
==
nil
{
_
,
err
=
c
.
w
.
WriteString
(
_CRLF_
)
}
if
err
!=
nil
{
c
.
Unlock
()
return
nil
,
err
}
// Signal a flush for the request if there are none pending (length is zero).
if
len
(
c
.
fch
)
==
0
{
select
{
case
c
.
fch
<-
struct
{}{}:
default
:
}
}
c
.
Unlock
()
// Wait for the response via the data channel or give up if context is done
select
{
case
data
:=
<-
dataCh
:
return
data
,
nil
case
<-
ctx
.
Done
():
return
nil
,
ctx
.
Err
()
}
return
nil
,
nil
}
Wrapping up
This series of articles demonstrated some of the Go techniques that have been adopted in the NATS client during its evolution. We’ll finish with a couple of observations that can help you adapt these solutions to your particular needs.
- Channels might not always be the best solution for communicating. They’re really great for signaling, but sometimes not the best for sharing data. In the case of the NATS client, this meant that a simple mutex and a linked list gave better results.
- Remember that the Go standard library is often the most useful for the general case. Depending of the circumstances of your program, sometimes what you get in the standard library might not be the best. Go has great tooling to take data based decisions to identify the best approach.
- The context package is very useful and helps you write more readable code when you’re dealing with cancellation and timeouts. Keep in mind, though, that the community is still actively figuring out the best practices around the use of this package.
Lastly, it should be noted that the Go community moves fast and things continue to be improved under the hood, so it is important to keep up with the ecosystem and to give feedback as well, in order to help improve the libraries and tools further (see, for example, the number of experience reports collected from the community). Everyone on the forums is super helpful, and there are many great resources. Joining Gopher Slack is great idea too, if you are a Slack user.
In this series we have shown only a few examples of how the NATS project has benefited from the flexibility and simplicity from Go. If you’re further interested in more techniques that have worked for the NATS team in writing high performance Go code, you can also check the GopherCon 2014 talk on the subject by Derek Collison (creator of NATS), and the excellent StrangeLoop 2017 talk from Tyler Treat from the NATS team. These have many more insights on improving the performance of Go programs.