Multi-threaded HTTP/WebSocket server in Rust
Building on my previous posts about the MIO-based server and parser combinators, this post is about making a very simple HTTP server that runs on multiple threads and implements the WebSocket protocol.
TL;DR: code.
Benchmark
A quick benchmark with wrk on an 8 vCPU, 30 GB machine shows 110k rps vs 280k rps when socket reading/writing is distributed over 8 threads. Important note: this benchmark is not representative on its own; only the comparison between the two runs matters, and it shows a ~2.5x speedup. Amdahl’s Law in action: the main thread is still responsible for listening for incoming connections and registering socket events.
Single-threaded
instance-1:~/mio-tcp-server$ wrk -d 1m -c 128 -t 8 http://127.0.0.1:8080/
Running 1m test @ http://127.0.0.1:8080/
  8 threads and 128 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     1.15ms  131.93us   2.65ms   68.80%
    Req/Sec    13.91k     0.86k   19.76k    66.96%
  6645523 requests in 1.00m, 557.71MB read
Requests/sec: 110731.94
Transfer/sec:      9.29MB
Multi-threaded (8 cores, 8 threads)
instance-1:~/mio-websocket-server$ wrk -d 1m -c 128 -t 8 http://127.0.0.1:9000/
Running 1m test @ http://127.0.0.1:9000/
  8 threads and 128 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   479.65us  554.02us  28.64ms   96.33%
    Req/Sec    35.09k     2.01k   59.50k    72.71%
  16765024 requests in 1.00m, 1.37GB read
Requests/sec: 279225.41
Transfer/sec:     23.43MB
Key Concepts
Handler
A container for the token, the actual socket, and the send/receive buffers. A ByteStream from parser-combinators is useful for stateless parsing (receive stream) and buffered sending (send stream).
Handler wraps a socket provided by the listener as a connection, and has pull() to read from the socket into the receive stream, push() to write data from the send stream to the socket, and put() to store data for buffering into the send stream.
struct Handler {
    token: Token,            // identifies this connection's registration in Poll
    socket: TcpStream,       // the accepted client socket
    is_open: bool,           // false once the peer has disconnected
    recv_stream: ByteStream, // bytes read from the socket, waiting to be parsed
    send_stream: ByteStream, // bytes buffered for sending
}
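To make the description above concrete, here is a minimal sketch of what pull(), push(), and put() might look like. The real methods operate on the ByteStream from the parser-combinators post; a plain Vec<u8> stands in for it here, so the internals (chunk size, extend/drain calls) are assumptions, not the post's actual code:
use std::io::{ErrorKind, Read, Write};

// Sketch only: assumes recv_stream/send_stream behave like a Vec<u8>
// (the real ByteStream API may differ).
impl Handler {
    // pull(): read everything currently available from the socket into the receive stream
    fn pull(&mut self) {
        let mut chunk = [0u8; 4096];
        loop {
            match self.socket.read(&mut chunk) {
                Ok(0) => { self.is_open = false; break; } // peer closed the connection
                Ok(n) => self.recv_stream.extend_from_slice(&chunk[..n]),
                Err(ref e) if e.kind() == ErrorKind::WouldBlock => break, // nothing more to read
                Err(_) => { self.is_open = false; break; }
            }
        }
    }

    // push(): write buffered data from the send stream to the socket
    fn push(&mut self) {
        while !self.send_stream.is_empty() {
            match self.socket.write(&self.send_stream) {
                Ok(0) => break,
                Ok(n) => { self.send_stream.drain(..n); } // drop the bytes that were written
                Err(ref e) if e.kind() == ErrorKind::WouldBlock => break, // socket not writable yet
                Err(_) => { self.is_open = false; break; }
            }
        }
    }

    // put(): buffer data to be sent on the next push()
    fn put(&mut self, data: &[u8]) {
        self.send_stream.extend_from_slice(data);
    }
}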
Worker Thread
A worker thread receives handlers from the main thread and runs the “payload”, then returns the handler (most likely in an updated state) back to the main thread. All I/O on connected sockets actually happens on worker threads.
loop {
    let mut handler = event_rx.lock().unwrap().recv().unwrap();
    debug!("token {} background thread", handler.token.0);
    handler.pull();
    // do something useful here
    handler.push();
    ready_tx.send(handler).unwrap();
}
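For context, one way the pool itself might be wired up (a sketch, not necessarily the post's exact code): the event receiver is shared behind an Arc<Mutex<..>>, which is why the loop above locks event_rx before recv(), and each worker gets its own clone of the ready sender:
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

let (event_tx, event_rx) = mpsc::channel::<Handler>(); // main -> workers
let (ready_tx, ready_rx) = mpsc::channel::<Handler>(); // workers -> main
let event_rx = Arc::new(Mutex::new(event_rx));         // shared by all workers
// event_tx and ready_rx stay with the listener thread described below

for _ in 0..8 {
    let event_rx = Arc::clone(&event_rx);
    let ready_tx = ready_tx.clone();
    thread::spawn(move || loop {
        // block until the main thread hands over a handler
        let mut handler = event_rx.lock().unwrap().recv().unwrap();
        handler.pull();
        // ... parse request, build response into the send stream ...
        handler.push();
        ready_tx.send(handler).unwrap();
    });
}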
The payload (the “something useful” part) might actually be parsing the receive buffer:
fn handle(req: Request) -> Response { ... }

if let Some(req) = parse_http_request(&mut handler.recv_stream) {
    handler.recv_stream.pull(); // roll over the receive stream
    let res = handle(req);      // handle the request - get response
    handler.put(res);           // put response into send stream
}
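For a quick smoke test the payload can even skip the parser entirely and write a canned response into the send stream (a sketch, assuming put() accepts raw bytes):
// answer every request with a fixed response - no parsing at all
let body = "Hello, world!";
let res = format!(
    "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nConnection: keep-alive\r\n\r\n{}",
    body.len(),
    body
);
handler.put(res.as_bytes());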
Listener Thread
The main thread owns the server socket that accepts connections, as well as the Poll instance that allows getting and processing socket events. Once a read/write event for a specific handler is received, it is time to send the handler to a worker thread for processing.
Meanwhile, handlers returning from worker threads need to be re-registered for their next socket events. So the next thing for the listener thread to do is to re-register each handler for the respective socket event: if a handler has a non-empty send stream, it needs a writable event; otherwise the assumption is that it is ready to read some more data.
loop {
    poll.poll(&mut events, Some(Duration::from_millis(20))).unwrap();

    // 1. process socket events
    for event in &events {
        match event.token() {
            Token(0) => {
                loop {
                    match listener.accept() {
                        Ok((socket, _)) => {
                            // accept connection, create Handler
                        },
                        Err(_) => break
                    }
                }
            },
            token if event.readiness().is_readable() => {
                debug!("token {} readable", token.0);
                if let Some(handler) = handlers.remove(&token) {
                    event_tx.send(handler).unwrap();
                }
            },
            token if event.readiness().is_writable() => {
                debug!("token {} writable", token.0);
                if let Some(handler) = handlers.remove(&token) {
                    event_tx.send(handler).unwrap();
                }
            },
            _ => unreachable!()
        }
    }

    // 2. process updates received from handlers
    loop {
        let opt = ready_rx.try_recv();
        match opt {
            Ok(handler) if !handler.is_open => {
                // socket is closed, drop the handler
            },
            Ok(handler) => {
                if handler.send_stream.len() > 0 {
                    // register handler for writing
                } else {
                    // register handler for reading
                }
                handlers.insert(handler.token, handler);
            },
            _ => break,
        }
    }
}
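The two elided parts of the loop might look roughly like this with mio 0.6-style registration (a sketch: the next_token counter and the ByteStream constructor are assumptions, and oneshot registration keeps further events quiet while a handler is away on a worker thread):
// accepting: allocate a token, register for readable events, store the Handler
// (next_token: an assumed usize counter starting above 0, the listener's token)
Ok((socket, _)) => {
    let token = Token(next_token);
    next_token += 1;
    poll.register(&socket, token, Ready::readable(),
                  PollOpt::edge() | PollOpt::oneshot()).unwrap();
    handlers.insert(token, Handler {
        token,
        socket,
        is_open: true,
        recv_stream: ByteStream::new(), // constructor assumed
        send_stream: ByteStream::new(),
    });
},

// re-registering a handler returned from a worker:
// writable if there is buffered output, readable otherwise
let interest = if handler.send_stream.len() > 0 {
    Ready::writable()
} else {
    Ready::readable()
};
poll.reregister(&handler.socket, handler.token, interest,
                PollOpt::edge() | PollOpt::oneshot()).unwrap();
handlers.insert(handler.token, handler);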
HTTP to WebSocket
A WebSocket Upgrade request is just a regular HTTP request, but it needs some special processing, like calculating the Sec-WebSocket-Accept response header based on the Sec-WebSocket-Key request header, as below:
fn res_sec_websocket_accept(req_sec_websocket_key: &String) -> String {
    // per RFC 6455: SHA-1 over the client's key concatenated with a fixed GUID,
    // then base64-encode the digest
    let mut hasher = Sha1::new();
    hasher.input(req_sec_websocket_key.to_owned() + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
    base64::encode(hasher.result())
}
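The computed value goes into the 101 Switching Protocols reply defined by RFC 6455; a minimal sketch of building that response as a raw string (the helper name is made up):
// Sketch: the raw handshake reply for an upgrade request, given its Sec-WebSocket-Key
fn websocket_upgrade_response(req_sec_websocket_key: &String) -> String {
    format!(
        "HTTP/1.1 101 Switching Protocols\r\n\
         Upgrade: websocket\r\n\
         Connection: Upgrade\r\n\
         Sec-WebSocket-Accept: {}\r\n\
         \r\n",
        res_sec_websocket_accept(req_sec_websocket_key)
    )
}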
For more details on WebSocket, see the nice guide on MDN and rust-parser-combinators. I must admit, parsing binary WebSocket frames is just as straightforward as parsing text-based HTTP requests, so parser combinators seem to do well.
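For reference, the fixed part of a frame header is just two bytes of flags plus a 7-bit length (RFC 6455); a hand-rolled sketch, separate from the parser-combinator version in the post:
// Sketch: decode the first two bytes of a WebSocket frame header by hand
struct FrameHeader {
    fin: bool,       // final fragment of a message
    opcode: u8,      // 0x1 text, 0x2 binary, 0x8 close, 0x9 ping, 0xA pong
    masked: bool,    // client-to-server frames are always masked
    payload_len: u8, // 0-125 literal; 126/127 mean an extended length follows
}

fn parse_frame_header(b0: u8, b1: u8) -> FrameHeader {
    FrameHeader {
        fin: (b0 & 0x80) != 0,
        opcode: b0 & 0x0f,
        masked: (b1 & 0x80) != 0,
        payload_len: b1 & 0x7f,
    }
}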
It Works!
Finally, putting all the pieces together allows connecting to the server from a browser.
Plans
- The more I’m moving towards fundamental constructs like sockets and threads, the more the code around them looks like the Actor Model. So I have already been doing-some-actors for some time.
- With a clean and simple Actor Model implementation and an HTTP/WebSocket protocol parser, the classic demo would be to build… a chat application! This is what is coming next, most likely.
- An Actor from the Actor Model seems to be way too low-level for direct usage in application-level code.
  - Somehow many people don’t feel wrong writing class User extends Actor (thus coupling a domain-model entity with a specific Actor Model implementation) - for me it seems the same as writing class User extends Mutex. Just my opinion.
  - Thus a nice and clean (and preferably type-safe) API on top of that might be extremely useful! Something similar to Akka Streams, maybe.