From 9ba72ce3fdc80a49101d594b0510b7fbed079371 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 30 Jan 2022 15:55:35 +0100 Subject: [PATCH 1/2] Upgrade itoa to version 1 --- Cargo.lock | 4 ++-- aquatic_http/Cargo.toml | 2 +- aquatic_http/src/workers/socket.rs | 2 +- aquatic_http_protocol/Cargo.toml | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c29959f..1326442 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -97,7 +97,7 @@ dependencies = [ "futures-lite", "futures-rustls", "glommio 0.6.0 (git+https://github.com/DataDog/glommio.git?rev=4e6b14772da2f4325271fbcf12d24cf91ed466e5)", - "itoa 0.4.8", + "itoa 1.0.1", "log", "memchr", "mimalloc", @@ -146,7 +146,7 @@ dependencies = [ "hashbrown 0.11.2", "hex", "httparse", - "itoa 0.4.8", + "itoa 1.0.1", "log", "memchr", "quickcheck", diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index 5d1df0b..67ac669 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -26,7 +26,7 @@ either = "1" futures-lite = "1" futures-rustls = "0.22" glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5" } -itoa = "0.4" +itoa = "1" log = "0.4" mimalloc = { version = "0.1", default-features = false } memchr = "2" diff --git a/aquatic_http/src/workers/socket.rs b/aquatic_http/src/workers/socket.rs index a3fb70b..60a4dd4 100644 --- a/aquatic_http/src/workers/socket.rs +++ b/aquatic_http/src/workers/socket.rs @@ -416,7 +416,7 @@ impl Connection { let mut response_bytes = Vec::with_capacity(39 + content_len_num_digits + body.len()); response_bytes.extend_from_slice(b"HTTP/1.1 200 OK\r\nContent-Length: "); - ::itoa::write(&mut response_bytes, content_len)?; + response_bytes.extend_from_slice(::itoa::Buffer::new().format(content_len).as_bytes()); response_bytes.extend_from_slice(b"\r\n\r\n"); response_bytes.append(&mut body); response_bytes.extend_from_slice(b"\r\n"); diff --git a/aquatic_http_protocol/Cargo.toml b/aquatic_http_protocol/Cargo.toml index 718646a..6934321 100644 --- a/aquatic_http_protocol/Cargo.toml +++ b/aquatic_http_protocol/Cargo.toml @@ -26,7 +26,7 @@ anyhow = "1" hashbrown = "0.11" hex = { version = "0.4", default-features = false } httparse = "1" -itoa = "0.4" +itoa = "1" log = "0.4" memchr = "2" rand = { version = "0.8", features = ["small_rng"] } From 84d57c1c86e9a1b45ed0e9bffa80333a1dc8d3fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 30 Jan 2022 17:13:00 +0100 Subject: [PATCH 2/2] http: make response sending more efficient by avoiding some copies --- Cargo.lock | 1 + aquatic_http/Cargo.toml | 1 + aquatic_http/src/common.rs | 32 ---------------- aquatic_http/src/workers/socket.rs | 61 ++++++++++++++++++++++++------ 4 files changed, 51 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1326442..555cbb5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -101,6 +101,7 @@ dependencies = [ "log", "memchr", "mimalloc", + "once_cell", "parking_lot", "privdrop", "quickcheck", diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index 67ac669..2052f00 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -32,6 +32,7 @@ mimalloc = { version = "0.1", default-features = false } memchr = "2" parking_lot = "0.11" privdrop = "0.5" +once_cell = "1" rand = { version = "0.8", features = ["small_rng"] } rustls-pemfile = "0.2" serde = { version = "1", features = ["derive"] } diff --git a/aquatic_http/src/common.rs b/aquatic_http/src/common.rs index ab499ed..5aea0dd 100644 --- a/aquatic_http/src/common.rs +++ b/aquatic_http/src/common.rs @@ -222,35 +222,3 @@ impl TorrentMaps { pub struct State { pub access_list: Arc, } - -pub fn num_digits_in_usize(mut number: usize) -> usize { - let mut num_digits = 1usize; - - while number >= 10 { - num_digits += 1; - - number /= 10; - } - - num_digits -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_num_digits_in_usize() { - let f = num_digits_in_usize; - - assert_eq!(f(0), 1); - assert_eq!(f(1), 1); - assert_eq!(f(9), 1); - assert_eq!(f(10), 2); - assert_eq!(f(11), 2); - assert_eq!(f(99), 2); - assert_eq!(f(100), 3); - assert_eq!(f(101), 3); - assert_eq!(f(1000), 4); - } -} diff --git a/aquatic_http/src/workers/socket.rs b/aquatic_http/src/workers/socket.rs index 60a4dd4..31c764b 100644 --- a/aquatic_http/src/workers/socket.rs +++ b/aquatic_http/src/workers/socket.rs @@ -22,6 +22,7 @@ use glommio::channels::shared_channel::ConnectedReceiver; use glommio::net::{TcpListener, TcpStream}; use glommio::timer::TimerActionRepeat; use glommio::{enclose, prelude::*}; +use once_cell::sync::Lazy; use slab::Slab; use crate::common::*; @@ -29,6 +30,14 @@ use crate::config::Config; const INTERMEDIATE_BUFFER_SIZE: usize = 1024; const MAX_REQUEST_SIZE: usize = 2048; +const MAX_RESPONSE_SIZE: usize = 4096; + +const RESPONSE_HEADER_A: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Length: "; +const RESPONSE_HEADER_B: &[u8] = b" "; +const RESPONSE_HEADER_C: &[u8] = b"\r\n\r\n"; + +static RESPONSE_HEADER: Lazy> = + Lazy::new(|| [RESPONSE_HEADER_A, RESPONSE_HEADER_B, RESPONSE_HEADER_C].concat()); struct PendingScrapeResponse { pending_worker_responses: usize, @@ -166,6 +175,7 @@ struct Connection { connection_id: ConnectionId, request_buffer: [u8; MAX_REQUEST_SIZE], request_buffer_position: usize, + response_buffer: [u8; MAX_RESPONSE_SIZE], } impl Connection { @@ -186,6 +196,10 @@ impl Connection { let tls_acceptor: TlsAcceptor = tls_config.into(); let stream = tls_acceptor.accept(stream).await?; + let mut response_buffer = [0; MAX_RESPONSE_SIZE]; + + response_buffer[..RESPONSE_HEADER.len()].copy_from_slice(&RESPONSE_HEADER); + let mut conn = Connection { config: config.clone(), access_list_cache: create_access_list_cache(&access_list), @@ -195,8 +209,9 @@ impl Connection { stream, peer_addr, connection_id, - request_buffer: [0u8; MAX_REQUEST_SIZE], + request_buffer: [0; MAX_REQUEST_SIZE], request_buffer_position: 0, + response_buffer, }; conn.run_request_response_loop().await?; @@ -406,22 +421,44 @@ impl Connection { } async fn write_response(&mut self, response: &Response) -> anyhow::Result<()> { - let mut body = Vec::new(); + // Write body and final newline to response buffer - response.write(&mut body).unwrap(); + let mut position = RESPONSE_HEADER.len(); - let content_len = body.len() + 2; // 2 is for newlines at end - let content_len_num_digits = num_digits_in_usize(content_len); + let body_len = response.write(&mut &mut self.response_buffer[position..])?; - let mut response_bytes = Vec::with_capacity(39 + content_len_num_digits + body.len()); + position += body_len; - response_bytes.extend_from_slice(b"HTTP/1.1 200 OK\r\nContent-Length: "); - response_bytes.extend_from_slice(::itoa::Buffer::new().format(content_len).as_bytes()); - response_bytes.extend_from_slice(b"\r\n\r\n"); - response_bytes.append(&mut body); - response_bytes.extend_from_slice(b"\r\n"); + (&mut self.response_buffer[position..position + 2]).copy_from_slice(b"\r\n"); - self.stream.write(&response_bytes[..]).await?; + position += 2; + + let content_len = body_len + 2; + + // Clear content-len header value + + { + let start = RESPONSE_HEADER_A.len(); + let end = start + RESPONSE_HEADER_B.len(); + + (&mut self.response_buffer[start..end]).copy_from_slice(RESPONSE_HEADER_B); + } + + // Set content-len header value + + { + let mut buf = ::itoa::Buffer::new(); + let content_len_bytes = buf.format(content_len).as_bytes(); + + let start = RESPONSE_HEADER_A.len(); + let end = start + content_len_bytes.len(); + + (&mut self.response_buffer[start..end]).copy_from_slice(content_len_bytes); + } + + // Write buffer to stream + + self.stream.write(&self.response_buffer[..position]).await?; self.stream.flush().await?; Ok(())