Chapter 4. Integrating Networking Into Our Own Async Runtime
In the previous chapter we built our own async runtime queue to understand how async tasks run through an async runtime. However, we only did some basic sleep and print operations. Focusing on simple operations is useful initially as we do not have to split focus between how the async runtime works and another concept. However, simple sleep and print functions are limiting. In this chapter we build on the async runtime we defined in the previous chapter and integrate networking protocols so they can run on our async runtime.
By the end of this chapter you will be able to integrate the Hyper crate for HTTP requests into our runtime using traits. This means that, after reading the documentation of a crate, you will be able to take this example and integrate other third-party dependencies into our async runtime via traits. Finally, we will go to a lower level by using the Mio crate to directly poll sockets in our futures. This will give you an understanding of how to exercise fine-grained control over how the socket is polled, read, and written to in our async runtime. With this exposure and some further external reading, you will be able to implement your own custom networking protocols.
It must be noted that this is the hardest chapter to follow, and it is not essential if you are not planning to integrate networking into a custom runtime. If you are not feeling it, feel free to skip this chapter and come back after reading the rest of the book. The reason that this chapter is placed at Chapter 4 is that it builds off the code written in Chapter 3.
Before we progress through the chapter we will need the following additional dependencies alongside the dependencies we used in the previous chapter:
[dependencies]
hyper = { version = "0.14.26", features = ["http1", "http2", "client", "runtime"] }
smol = "1.3.0"
anyhow = "1.0.70"
async-native-tls = "0.5.0"
http = "0.2.9"
tokio = "1.14.0"
We are using these dependencies:
- hyper: This crate is a fast and popular HTTP implementation. We are going to use it to make requests. We need the client feature to allow us to make HTTP requests, and the runtime feature to allow compatibility with a custom async runtime.
- smol: This crate is a small and fast async runtime. It is particularly good for lightweight tasks with low overhead.
- anyhow: This crate is an error-handling library.
- async-native-tls: This crate provides asynchronous TLS (Transport Layer Security) support.
- http: This crate provides types for working with HTTP requests and their responses.
- tokio: We have used this crate before for demonstrating async runtimes and will use it again in this chapter.
As you can see, we are going to be using Hyper for this example. This is to give you a different set of tools from what we have used in previous examples, and to demonstrate that tools like Tokio are layered into other commonly used libraries. Before we write any code, however, we must understand what executors and connectors are.
Executors and Connectors
An executor is responsible for running futures to completion. It is the part of the runtime that schedules tasks and makes sure they are executed when they are ready. We need an executor when we introduce networking into our runtime because, without it, our futures such as HTTP requests would be created but never actually run.
A connector in networking is a component that establishes a connection between our application and the service we want to connect to. It handles things like opening TCP connections and maintaining these connections through the lifetime of the request.
Integrating Hyper into Our Async Runtime
Now that we understand what executors and connectors mean, we can see how these concepts are essential when integrating a library like Hyper into our async runtime. Without an appropriate executor and connector, our runtime wouldn’t be able to handle the HTTP requests and connections that Hyper relies on.
If we look at Hyper's official documentation or various online tutorials, we can get the impression that we can perform a simple GET request using the Hyper crate with the following code:
use hyper::{Request, Client};

let url = "http://www.rust-lang.org";
let uri: Uri = url.parse().unwrap();
let request = Request::builder()
    .method("GET")
    .uri(uri)
    .header("User-Agent", "hyper/0.14.2")
    .header("Accept", "text/html")
    .body(hyper::Body::empty()).unwrap();
let future = async {
    let client = Client::new();
    client.request(request).await.unwrap()
};
let test = spawn_task!(future);
let response = future::block_on(test);
println!("Response status: {}", response.status());
However, if we run the tutorial code, we would get the following error:
thread '<unnamed>' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime'
This is because, under the hood, Hyper runs on the Tokio runtime by default and no executor is specified in our code. If you were to use Reqwest or other popular crates, chances are you would get a similar error.
To address the issue, we will create a custom executor that can handle our tasks within the custom async runtime we’ve built. Then, we’ll build a custom connector to manage the actual network connections, allowing our runtime to seamlessly integrate with Hyper and other similar libraries.
The first step is to import the following into our program:
use std::net::Shutdown;
use std::net::{TcpStream, ToSocketAddrs};
use std::pin::Pin;
use std::task::{Context, Poll};

use anyhow::{bail, Context as _, Error, Result};
use async_native_tls::TlsStream;
use http::Uri;
use hyper::{Body, Client, Request, Response};
use smol::{io, prelude::*, Async};
We can build our own executor with the code below:
struct CustomExecutor;

impl<F: Future + Send + 'static> hyper::rt::Executor<F> for CustomExecutor {
    fn execute(&self, fut: F) {
        spawn_task!(async {
            println!("sending request");
            fut.await;
        }).detach();
    }
}
In the above code, we define our custom executor and the behavior of the execute function. Inside our execute function, we call our spawn task macro, creating an async block that awaits the future passed into the execute function. We must note that we employ the detach function; otherwise, the task would move out of scope and simply be dropped, closing the channel and preventing our request from continuing. If we recall from the previous chapter, the detach function sends the pointer of the task to a loop to be polled until the task has finished before dropping the task.
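To make the point about detach concrete, here is a minimal sketch using the spawn_task! macro from the previous chapter; make_request is a hypothetical async function standing in for the HTTP call:

// A hedged sketch of why detach() matters. `make_request` is a hypothetical
// async function, not part of the chapter's code.
fn sketch() {
    // Without detach: the handle is dropped at the end of this scope, so the
    // task is cancelled and the request never completes.
    let _dropped_handle = spawn_task!(make_request());

    // With detach: the runtime keeps polling the task until it finishes,
    // even though we no longer hold a handle to it.
    spawn_task!(make_request()).detach();
}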
We now have a custom executor that we can pass into the Hyper client. However, our Hyper client will still fail to make the request because it is expecting the connection to be managed by the Tokio runtime. To fully integrate Hyper with our custom async runtime, we need to build our own async connector that handles network connections independently of Tokio.
Building An HTTP Connection
When it comes to networking requests, the protocols are very well defined and standardized. For instance, a TCP connection has a three-step handshake to establish a connection before packets of bytes are sent through that connection. There is zero benefit in implementing the TCP connection from scratch unless you have very specific needs that the standardized connection protocols cannot provide. In Figure 4-1, we can see that HTTP and HTTPS are application-layer protocols running over a transport protocol such as TCP:
With HTTP we are sending over a body, headers, and so on. With HTTPS there are even more steps, where a certificate is checked and sent over to the client before the client starts sending over data, because the data needs to be encrypted. Considering all the back and forth in these protocols and the waiting for responses, networking requests are a sensible target for async. We cannot get rid of the steps in networking without losing security and assurance that the connection is made. However, we can release the CPU from networking requests with async while waiting for responses.
For our connector, we are going to support HTTP and HTTPS, so we are going to need the following enum:
enum CustomStream {
    Plain(Async<TcpStream>),
    Tls(TlsStream<Async<TcpStream>>),
}
The Plain variant is an async TCP stream. Considering Figure 4-1, we can deduce that the Plain variant supports HTTP requests. With the Tls variant, remembering that HTTPS is merely a TLS layer between TCP and HTTP, we can see that the Tls variant supports HTTPS.
We can now use this custom stream enum to implement the hyper Service trait for a custom connector struct with the code below:
#[derive(Clone)]
struct CustomConnector;

impl hyper::service::Service<Uri> for CustomConnector {
    type Response = CustomStream;
    type Error = Error;
    type Future = Pin<Box<
        dyn Future<Output = Result<Self::Response, Self::Error>> + Send
    >>;

    fn poll_ready(&mut self, _: &mut Context<'_>)
        -> Poll<Result<(), Self::Error>> {
        ...
    }
    fn call(&mut self, uri: Uri) -> Self::Future {
        ...
    }
}
The Service trait is essentially for defining the future for the connection. We can see our connection is a thread safe future that returns our stream enum. This enum is either an async TCP connection, or an async TCP connection that is wrapped in a TLS stream.
We can also see that our poll_ready
function just returns a ready. The poll_ready
function is used by Hyper to check if a service is ready to process requests. If we return a pending, then the task will be polled until the service becomes ready. We return an error when the service can no longer process requests. Because we are using the Service
trait for a client call, we will always return ready for the poll_ready
. If we were implementing the Service
trait for a server, we could have the following poll_ready
function:
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
    Poll::Ready(Ok(()))
}
We can see that our poll_ready
function returns that the future is ready. We could ideally not bother defining the poll_ready
function as our implementation makes calling it redundant. However, the poll_ready
function is a requirement for the Service
trait.
We can now move onto the response function which is the call
function. The poll_ready
function needs to return an Ok before we can use the call
function. Our call
function has the following outline:
fn call(&mut self, uri: Uri) -> Self::Future {
    Box::pin(async move {
        let host = uri.host().context("cannot parse host")?;

        match uri.scheme_str() {
            Some("http") => {
                ...
            }
            Some("https") => {
                ...
            }
            scheme => bail!("unsupported scheme: {:?}", scheme),
        }
    })
}
We remember that pinning the async block gives us a pinned future, so whatever the async block returns will be the output of that pinned future. For our HTTPS block, we build the future with the code below:
let socket_addr = {
    let host = host.to_string();
    let port = uri.port_u16().unwrap_or(443);
    smol::unblock(move || (host.as_str(), port).to_socket_addrs())
        .await?
        .next()
        .context("cannot resolve address")?
};
let stream = Async::<TcpStream>::connect(socket_addr).await?;
let stream = async_native_tls::connect(host, stream).await?;
Ok(CustomStream::Tls(stream))
The port is 443 because this is the standard port for HTTPS. We then pass a closure into the unblock function. The closure returns the socket address. The unblock function runs blocking code on a thread pool so we can have an async interface over blocking code. So, while we are resolving the socket address, we can free up the thread to do something else. We then connect our TCP stream, and then wrap our stream in a native TLS connection. Once our connection is established, we finally return the CustomStream enum.
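As a standalone aside, here is a minimal sketch of smol::unblock outside of the connector, assuming a Cargo.toml file exists in the working directory; the blocking file read runs on smol's blocking thread pool while the executor thread stays free to drive other tasks:

fn main() {
    // Run a blocking call on smol's blocking thread pool and await the result.
    let contents: std::io::Result<String> = smol::block_on(async {
        smol::unblock(|| std::fs::read_to_string("Cargo.toml")).await
    });
    // Print the number of bytes read rather than the whole file.
    println!("{:?}", contents.map(|c| c.len()));
}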
When it comes to building our HTTP branch, the code is exactly the same, except that the port is 80 instead of 443 and the TLS connection is not required, resulting in us returning Ok(CustomStream::Plain(stream)).
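Based on that description, the HTTP branch that slots into the Some("http") arm can be sketched as follows, mirroring the HTTPS code above but defaulting to port 80 and skipping the TLS step:

// A hedged sketch of the HTTP branch described above: the same address
// resolution and TCP connection, but no TLS handshake.
let socket_addr = {
    let host = host.to_string();
    let port = uri.port_u16().unwrap_or(80);
    smol::unblock(move || (host.as_str(), port).to_socket_addrs())
        .await?
        .next()
        .context("cannot resolve address")?
};
let stream = Async::<TcpStream>::connect(socket_addr).await?;
Ok(CustomStream::Plain(stream))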
Our call function is now defined. However, if we try to make an HTTPS call to a website with our stream enum or connector struct at this point, we will get an error message stating that we have not implemented the Tokio AsyncRead and AsyncWrite traits for our stream enum. This is because Hyper requires these traits to be implemented in order for our connection enum to be used.
Implementing The Tokio AsyncRead Trait
The AsyncRead trait is similar to the std::io::Read trait but integrates with asynchronous task systems. When implementing our AsyncRead trait, we only have to define the poll_read function, which returns a Poll enum as a result. If we return a Poll::Ready, we are saying that the data was immediately read and placed into the output buffer. If we return a Poll::Pending, we are saying that no data was read into the buffer that we provided. We are also saying that the I/O object is not currently readable but may become readable in the future. Returning Pending results in the current future's task being scheduled to be unparked when the object is readable. The final Poll variant that we can return is a Ready with an error, which would usually be a standard I/O error.
Our implementation of our AsyncRead
trait is defined in the code below:
impl tokio::io::AsyncRead for CustomStream {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        match &mut *self {
            CustomStream::Plain(s) => {
                Pin::new(s)
                    .poll_read(cx, buf.initialize_unfilled())
                    .map_ok(|size| {
                        buf.advance(size);
                    })
            }
            CustomStream::Tls(s) => {
                Pin::new(s)
                    .poll_read(cx, buf.initialize_unfilled())
                    .map_ok(|size| {
                        buf.advance(size);
                    })
            }
        }
    }
}
Our different streams essentially get the same treatment: we just pass in either the async TCP stream or the async TCP stream wrapped in TLS. We then pin this stream and execute the stream's poll_read function, which essentially performs a read and returns a Poll enum with the size of how much the buffer grew due to the read. Once the poll_read is done, we then execute map_ok, which takes in an FnOnce(T), which is either a function or a closure that can only be called once.
Note
In the context of map_ok
, the closure’s purpose is to advance the buffer by the size returned from the poll_read
. This is a one-time operation for each read, and hence FnOnce
is sufficient and preferred. If the closure needed to be called multiple times, Fn
or FnMut
would be required. By using FnOnce
, we ensure that the closure can take ownership of the environment it captures, providing flexibility for what the closure can do. This is particularly useful in async programming, where ownership and lifetimes must be carefully managed.
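As a toy illustration of this point (not part of the chapter's code), a closure that consumes a value it captures satisfies FnOnce but not Fn or FnMut:

fn call_once<F: FnOnce() -> String>(f: F) -> String {
    // The closure can be called exactly once, because it may consume what it
    // captured.
    f()
}

fn main() {
    let message = String::from("advance the buffer");
    // `move` transfers ownership of `message` into the closure; returning it
    // consumes the captured value, so the closure only implements FnOnce.
    let result = call_once(move || message);
    println!("{}", result);
}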
The map_ok operates on the result returned from the poll_read. If the Poll result is ready but with an error, then the ready Poll with the error is returned. If the Poll result is Pending, then pending is returned. We can see that we pass the context into the poll_read, so a waker is utilized if we have a pending result. If we have a Ready with an Ok result, then the closure is called with the value from the poll_read, and a Ready Ok is returned from the map_ok function. The closure passed into our map_ok function advances the buffer.
There is a lot going on under the hood, but essentially, our stream is pinned, a read is performed on the pinned stream, and if the read is successful, we advance the size of the filled region of the buffer as the read data is now in the buffer. The polling in the poll_read
, and the matching of the poll enum in the map_ok
, enable this read process to be compatible with an async runtime.
So, we can now read into our buffer in an async manner but we also need to write in an async way for our HTTP request to be complete.
Implementing The Tokio AsyncWrite Trait
The AsyncWrite trait is similar to std::io::Write but interacts with asynchronous task systems. It writes bytes in an asynchronous manner and, like the AsyncRead trait we just implemented, comes from Tokio.
When implementing the AsyncWrite trait, we will need the following outline:
impl tokio::io::AsyncWrite for CustomStream {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        ...
    }
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<io::Result<()>> {
        ...
    }
    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<io::Result<()>> {
        ...
    }
}
The poll_write function should not be a surprise; however, we can also note that we have poll_flush and poll_shutdown functions. We can see that all of these functions return a variant of the Poll enum and accept the context. Therefore, we can deduce that all of them are able to put the task to sleep, to be woken again to check whether the future is ready for shutting down, flushing, or writing to the connection.
We should start with our poll_write
function which has the code body below:
match &mut *self {
    CustomStream::Plain(s) => Pin::new(s).poll_write(cx, buf),
    CustomStream::Tls(s) => Pin::new(s).poll_write(cx, buf),
}
Here we can see that we are matching the stream, pinning the stream, and executing the poll_write
function of the stream. At this point in the chapter it should not come as a surprise that the poll_write
function tries to write bytes from the buffer into an object. Like the read, if the write is successful, the number of bytes written is returned. If the object is not ready for writing, we will get a Pending
, and if we get a 0, then this usually means that the object is no longer able to accept bytes.
Inside the poll_write function of the stream, a loop is executed in which a mutable reference to the I/O handle is obtained. The loop then repeatedly tries to write to the underlying I/O until all the bytes from the buffer are written. Each write attempt has a result which is handled. If the error of the write is an io::ErrorKind::WouldBlock, this means that the write could not complete immediately, so the future waits for the resource to become available again by returning a pending, letting the future be polled again at a later time. Any other error is returned to the caller.
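We do not need to write this loop ourselves, but a rough sketch of the pattern just described might look like the following; this is an illustration of the general shape, not smol's actual implementation:

// A hedged sketch of a poll-based write loop over a non-blocking socket.
fn poll_write_sketch(
    stream: &mut std::net::TcpStream,
    cx: &mut std::task::Context<'_>,
    buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
    use std::io::Write;
    use std::task::Poll;
    loop {
        match stream.write(buf) {
            // The write succeeded: report how many bytes were accepted.
            Ok(n) => return Poll::Ready(Ok(n)),
            // Interrupted writes are simply retried.
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
            // The socket cannot accept bytes right now: ask to be polled
            // again later (a real reactor would register interest in the
            // socket rather than waking immediately like this).
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            // Any other error is returned to the caller.
            Err(e) => return Poll::Ready(Err(e)),
        }
    }
}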
Now that we have written the poll_write
, we can define the body of the poll_flush
function with the following code:
match &mut *self {
    CustomStream::Plain(s) => Pin::new(s).poll_flush(cx),
    CustomStream::Tls(s) => Pin::new(s).poll_flush(cx),
}
This has the same outline as our poll_write function; however, in this case we call the poll_flush function on our stream. A flush is like a write except that we ensure all the contents of the buffer immediately reach the destination. The underlying mechanism of the flush is exactly the same as the write, with the loop, except that the flush function is called in the loop as opposed to the write function.
We can now move on to our final function, the shutdown function, which takes the form below:
match &mut *self {
    CustomStream::Plain(s) => {
        s.get_ref().shutdown(Shutdown::Write)?;
        Poll::Ready(Ok(()))
    }
    CustomStream::Tls(s) => Pin::new(s).poll_close(cx),
}
We can see that there is a slight variation in the way we implement the different types of our custom stream. The plain stream is just shut down directly. Once the plain stream is shut down we return a Poll
that is ready. However, the TLS stream is an async implementation by itself. Because the TLS is async, we need to pin it to avoid it being moved in memory as it could be put on the task queue a number of times until the poll is complete, and we call the poll_close
function which will return a poll result by itself.
We have now implemented our async read and write traits for our hyper client. All we need to do now is connect and run HTTP requests to test our implementation.
Connecting And Running Our Client
At this stage we are wrapping up what we have done and testing it. We can create our connection and send request function with the code below:
impl hyper::client::connect::Connection for CustomStream {
    fn connected(&self) -> hyper::client::connect::Connected {
        hyper::client::connect::Connected::new()
    }
}

async fn fetch(req: Request<Body>) -> Result<Response<Body>> {
    Ok(Client::builder()
        .executor(CustomExecutor)
        .build::<_, Body>(CustomConnector)
        .request(req)
        .await?)
}
Now all we need to do is run our HTTP client on our async runtime in the main function with the following code:
fn main() {
    Runtime::new().with_low_num(2).with_high_num(4).run();
    let future = async {
        let req = Request::get("https://www.rust-lang.org")
            .body(Body::empty())
            .unwrap();
        let response = fetch(req).await.unwrap();
        let body_bytes = hyper::body::to_bytes(response.into_body())
            .await
            .unwrap();
        let html = String::from_utf8(body_bytes.to_vec()).unwrap();
        println!("{}", html);
    };
    let test = spawn_task!(future);
    let _outcome = future::block_on(test);
}
And here we have it: we can run our code and get the HTML from the Rust website. We can now say that our async runtime can communicate with the internet in an async manner, but what about accepting requests? We have already covered implementing traits from other crates to get an async implementation. We would get diminishing educational returns if we filled the rest of this chapter with implementing such traits to get a TCP listener running in an async manner. Instead, we are going to go one step lower and directly listen to events on sockets with the Mio crate.
Introduction To Mio
When it comes to implementing async functionality with sockets, we cannot really get any lower than Mio without directly calling the OS. Mio (Metal I/O) is a low-level, non-blocking I/O library in Rust that provides the building blocks for building high-performance async applications. It acts as a thin abstraction over the OS’s asynchronous I/O capabilities.
Mio is so crucial because it serves as a foundation for other higher-level async runtimes, including Tokio. These higher-level libraries abstract much of the complexity away to make it easier to work with. Mio is useful for developers who need fine-grained control over their I/O operations and want to optimise for performance. In Figure 4-2, you will see how Tokio is built on Mio.
In the previous parts of the chapter we connected Hyper to our runtime. In order to get the full picture, we are now going to explore Mio and integrate this in our runtime. Before we proceed, we need the following dependency in the Cargo.toml
:
mio = { version = "1.0.2", features = ["net", "os-poll"] }
We also need the imports below:
use mio::net::{TcpListener, TcpStream};
use mio::{Events, Interest, Poll as MioPoll, Token};
use std::io::{Read, Write};
use std::time::Duration;
use std::error::Error;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
Warning
It must be stressed that our exploration of Mio in this chapter is not an optimal approach to creating a TCP server. If you want to create a production server, you will need to take an approach similar to the hyper example below:
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    let listener = TcpListener::bind(addr).await?;
    loop {
        let (stream, _) = listener.accept().await?;
        let io = TokioIo::new(stream);
        tokio::task::spawn(async move {
            if let Err(err) = http1::Builder::new()
                .serve_connection(io, service_fn(hello))
                .await
            {
                println!("Error serving connection: {:?}", err);
            }
        });
    }
}
Here we can see that the main thread is waiting for incoming data, and when incoming data arrives, a new task is spawned to handle that data. This keeps the listener ready to accept more incoming data. Whilst our Mio examples will get us to understand how polling TCP connections works, it is most sensible to use the listener that the framework or library gives you when building a web application. We will discuss some web concepts to give context to our example, but a comprehensive overview of web development is beyond the scope of this book.
Now that we have laid all the groundwork, we can move onto polling TCP sockets in futures.
Polling Sockets in Futures
Mio is built for handling many different sockets (thousands). Therefore, we need to identify which socket triggered a notification. Tokens enable us to do this. When we register a socket with the event loop, we pass it a token, and that token is returned in the handler. The token is a tuple struct around a usize. This is because every OS allows a pointer-sized amount of data to be associated with a socket. So, in the handler, we can keep a mapping where the token is the key and the socket is the value, as sketched below.
Mio does not use callbacks here because we want a zero-cost abstraction, and tokens were the only way of achieving that. We can build callbacks, streams, and futures on top of Mio.
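Here is a hedged sketch of that token-to-socket mapping; the struct and field names are our own illustrative choices, not part of the chapter's code:

use std::collections::HashMap;
use mio::net::TcpStream;
use mio::Token;

struct Sockets {
    next_token: usize,
    connections: HashMap<Token, TcpStream>,
}

impl Sockets {
    // Hand out a unique token for each registered socket.
    fn add(&mut self, stream: TcpStream) -> Token {
        let token = Token(self.next_token);
        self.next_token += 1;
        self.connections.insert(token, stream);
        token
    }

    // Look up which socket an event belongs to using its token.
    fn get(&mut self, token: Token) -> Option<&mut TcpStream> {
        self.connections.get_mut(&token)
    }
}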
With tokens we now have the following steps:
- Register sockets with the event loop
- Wait for socket readiness
- Look up the socket state using the token
- Operate on the socket
- Repeat
In our example, we are just going to define our tokens with the code below, as our example is simple enough to negate the need for a mapping:
const SERVER: Token = Token(0);
const CLIENT: Token = Token(1);
Here, we just have to ensure that our tokens are unique. The integer passed into the Token
is used to differentiate the Token
from other tokens. Now that we have our tokens, we define the future that is going to poll the socket using the following struct:
struct ServerFuture {
    server: TcpListener,
    poll: MioPoll,
}

impl Future for ServerFuture {
    type Output = String;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Self::Output> {
        ...
    }
}
Here we are using the TcpListener
to accept incoming data, and the MioPoll
to poll the socket and tell the future when the socket is readable. Inside our future poll function we can define the events and poll the socket with the code below:
let mut events = Events::with_capacity(1);
let _ = self.poll.poll(
    &mut events, Some(Duration::from_millis(200))
).unwrap();

for event in events.iter() {
    ...
}
cx.waker().wake_by_ref();
return Poll::Pending
The poll will extract the events from the socket into the events collection. We also set the socket poll to time out after 200 milliseconds. This means that if there are no events in the socket, we proceed without any events and return a pending, so we will continue polling until we get an event.
When we do get events, we loop through them. In the preceding code we have set the capacity to one but we can increase the capacity to handle multiple events if needed. When processing an event, we need to clarify what type of event it is. For our future, we need to ensure that the socket is readable, and that the token is the SERVER token with the following code:
if event.token() == SERVER && event.is_readable() {
    let (mut stream, _) = self.server.accept().unwrap();
    let mut buffer = [0u8; 1024];
    let mut received_data = Vec::new();

    loop {
        ...
    }
    if !received_data.is_empty() {
        let received_str = String::from_utf8_lossy(&received_data);
        return Poll::Ready(received_str.to_string())
    }
    cx.waker().wake_by_ref();
    return Poll::Pending
}
The event is readable if there is data in the socket. If our event is the right event, we extract the TcpStream, define a received_data collection on the heap with a Vec, and use the buffer slice to perform the reads. If the data is empty, we return a Pending so we can poll the socket again when the data is not there yet. Otherwise, we convert the data to a string and return that string with a Ready. This means that our socket listener is finished once we have the data.
Note
If we wanted our socket to be continuously polled throughout the lifetime of our program, we would spawn a detached task where we pass the data into an async function to handle the data, as seen in the snippet below:
if !received_data.is_empty() {
    spawn_task!(
        some_async_handle_function(&received_data)
    ).detach();
    return Poll::Pending
}
In our loop, we read the data from the socket with the following code:
loop {
    match stream.read(&mut buffer) {
        Ok(n) if n > 0 => {
            received_data.extend_from_slice(&buffer[..n]);
        }
        Ok(_) => {
            break;
        }
        Err(e) => {
            eprintln!("Error reading from stream: {}", e);
            break;
        }
    }
}
It does not matter if the received message is bigger than the buffer: our loop will continue to extract all the bytes to be processed, adding them onto our Vec. If there are no more bytes, we can stop our loop and process the data.
We now have a future that will continue to be polled until it accepts data from a socket. Once it has received the data it will then terminate. We can also make this future continuously poll the socket. Considering this, there is an argument that we would use this continuous polling future to keep track of thousands of different sockets if needed. We would have one socket per future and spawn thousands of futures into our runtime. Now that we have our TcpListener
logic defined, we can move onto our client logic to send data over the socket to our future.
Sending Data Over Socket
For our client, we are going to run everything in the main function, which has the signature below:
fn main() -> Result<(), Box<dyn Error>> {
    Runtime::new().with_low_num(2).with_high_num(4).run();
    ...
    Ok(())
}
In our main, we initially create our listener and our stream for the client with the following code:
let addr = "127.0.0.1:13265".parse()?;
let mut server = TcpListener::bind(addr)?;
let mut stream = TcpStream::connect(server.local_addr()?)?;
For our example we only require one stream; however, we can create multiple streams if we need them. We then register our server with a Mio poll and use the server and poll to spawn the listener task with the following code:
let poll: MioPoll = MioPoll::new()?;
poll.registry()
    .register(&mut server, SERVER, Interest::READABLE)?;

let server_worker = ServerFuture {
    server,
    poll,
};
let test = spawn_task!(server_worker);
Now our task is continuously polling the TCP port for incoming events. We then create another poll with the CLIENT token for writeable events. This means that if the socket is not full, we can write to it. If the socket is full, the socket is no longer writable and needs to be flushed. Our client poll takes the following form:
let mut client_poll: MioPoll = MioPoll::new()?;
client_poll.registry()
    .register(&mut stream, CLIENT, Interest::WRITABLE)?;
Note
With Mio we can also create polls that can trigger if the Socket is readable or writable with the example below:
.register(&mut server, SERVER, Interest::READABLE | Interest::WRITABLE)?;
Now that we have created our poll, we can wait for the socket to become writable before writing to it. We do this with the poll call below:
let mut events = Events::with_capacity(128);
let _ = client_poll.poll(&mut events, None).unwrap();
It must be noted that there is a None for the timeout. This means that our current thread will be blocked until an event is yielded by the poll call. Once we have the event, we send a simple message to the socket with the following code:
for event in events.iter() {
    if event.token() == CLIENT && event.is_writable() {
        let message = "that's so dingo!\n";
        let _ = stream.write_all(message.as_bytes());
    }
}
The message is sent, and therefore, we can block our thread and then print out the message with the code below:
let outcome = future::block_on(test);
println!("outcome: {}", outcome);
When running the code, you might get the following printout:
Error reading from stream: Resource temporarily unavailable (os error 35)
outcome: that's so dingo!
It works, but we get the initial error. This can be a result of non-blocking TCP listeners; Mio is non-blocking. The “Resource temporarily unavailable” error is usually down to no data being available in the socket. This can happen when the TCP stream is created, but it is not a problem, as we handle these errors in our loop and return a Pending so the socket continues to be polled, as seen below:
use std::io::ErrorKind;
...
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
    cx.waker().wake_by_ref();
    return Poll::Pending;
}
Note
With Mio’s polling feature we have essentially implemented async communication through a TCP socket. We can also use Mio to send data between processes via a UnixDatagram. UnixDatagrams are sockets that are restricted to sending data on the same machine. Because of this, they are faster, require less context switching, and do not have to go through the network stack.
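As a minimal, Unix-only sketch (assuming Mio's net feature; the names and the retry loop are ours, and a real program would register both ends with a MioPoll rather than spinning), a UnixDatagram pair can exchange a message like this:

use mio::net::UnixDatagram;
use std::io::ErrorKind;

fn main() -> std::io::Result<()> {
    // Create a connected pair of datagram sockets on the same machine.
    let (sender, receiver) = UnixDatagram::pair()?;
    sender.send(b"hello from the same machine")?;

    let mut buf = [0u8; 64];
    loop {
        match receiver.recv(&mut buf) {
            Ok(n) => {
                println!("received: {}", String::from_utf8_lossy(&buf[..n]));
                break;
            }
            // Mio sockets are non-blocking, so retry if no data is ready yet.
            Err(e) if e.kind() == ErrorKind::WouldBlock => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(())
}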
Summary
We have finally managed to get our async runtime to do something apart from sleep and print. In this chapter we have explored how traits can help us integrate third-party crates into our runtime, and we have gone lower to poll TCP sockets using Mio. When it comes to getting a custom async runtime running, there is not much else standing in your way as long as you have access to the trait documentation. If you really want to get a firm grip on your async knowledge so far, you are in a position to create a basic web server that handles different endpoints. It would be difficult to implement all your communication in Mio, but using it just for the async parts is much easier. Hyper's server-side listener will cover the protocol complexity so you can focus on how to pass the requests as async tasks and the responses back to the client. For our journey in this book, we are exploring async programming as opposed to web programming. Therefore, we are going to move on to how we implement async programming to solve specific problems. We start in the next chapter with coroutines.