mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-04-01 18:25:30 +00:00
Merge pull request #45 from greatest-ape/work-2022-01-30
Improve http response sending efficiency; update dependencies
This commit is contained in:
commit
0ee2e2d2ae
5 changed files with 55 additions and 48 deletions
5
Cargo.lock
generated
5
Cargo.lock
generated
|
|
@ -97,10 +97,11 @@ dependencies = [
|
||||||
"futures-lite",
|
"futures-lite",
|
||||||
"futures-rustls",
|
"futures-rustls",
|
||||||
"glommio 0.6.0 (git+https://github.com/DataDog/glommio.git?rev=4e6b14772da2f4325271fbcf12d24cf91ed466e5)",
|
"glommio 0.6.0 (git+https://github.com/DataDog/glommio.git?rev=4e6b14772da2f4325271fbcf12d24cf91ed466e5)",
|
||||||
"itoa 0.4.8",
|
"itoa 1.0.1",
|
||||||
"log",
|
"log",
|
||||||
"memchr",
|
"memchr",
|
||||||
"mimalloc",
|
"mimalloc",
|
||||||
|
"once_cell",
|
||||||
"parking_lot",
|
"parking_lot",
|
||||||
"privdrop",
|
"privdrop",
|
||||||
"quickcheck",
|
"quickcheck",
|
||||||
|
|
@ -146,7 +147,7 @@ dependencies = [
|
||||||
"hashbrown 0.11.2",
|
"hashbrown 0.11.2",
|
||||||
"hex",
|
"hex",
|
||||||
"httparse",
|
"httparse",
|
||||||
"itoa 0.4.8",
|
"itoa 1.0.1",
|
||||||
"log",
|
"log",
|
||||||
"memchr",
|
"memchr",
|
||||||
"quickcheck",
|
"quickcheck",
|
||||||
|
|
|
||||||
|
|
@ -26,12 +26,13 @@ either = "1"
|
||||||
futures-lite = "1"
|
futures-lite = "1"
|
||||||
futures-rustls = "0.22"
|
futures-rustls = "0.22"
|
||||||
glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5" }
|
glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5" }
|
||||||
itoa = "0.4"
|
itoa = "1"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
mimalloc = { version = "0.1", default-features = false }
|
mimalloc = { version = "0.1", default-features = false }
|
||||||
memchr = "2"
|
memchr = "2"
|
||||||
parking_lot = "0.11"
|
parking_lot = "0.11"
|
||||||
privdrop = "0.5"
|
privdrop = "0.5"
|
||||||
|
once_cell = "1"
|
||||||
rand = { version = "0.8", features = ["small_rng"] }
|
rand = { version = "0.8", features = ["small_rng"] }
|
||||||
rustls-pemfile = "0.2"
|
rustls-pemfile = "0.2"
|
||||||
serde = { version = "1", features = ["derive"] }
|
serde = { version = "1", features = ["derive"] }
|
||||||
|
|
|
||||||
|
|
@ -222,35 +222,3 @@ impl TorrentMaps {
|
||||||
pub struct State {
|
pub struct State {
|
||||||
pub access_list: Arc<AccessListArcSwap>,
|
pub access_list: Arc<AccessListArcSwap>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn num_digits_in_usize(mut number: usize) -> usize {
|
|
||||||
let mut num_digits = 1usize;
|
|
||||||
|
|
||||||
while number >= 10 {
|
|
||||||
num_digits += 1;
|
|
||||||
|
|
||||||
number /= 10;
|
|
||||||
}
|
|
||||||
|
|
||||||
num_digits
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_num_digits_in_usize() {
|
|
||||||
let f = num_digits_in_usize;
|
|
||||||
|
|
||||||
assert_eq!(f(0), 1);
|
|
||||||
assert_eq!(f(1), 1);
|
|
||||||
assert_eq!(f(9), 1);
|
|
||||||
assert_eq!(f(10), 2);
|
|
||||||
assert_eq!(f(11), 2);
|
|
||||||
assert_eq!(f(99), 2);
|
|
||||||
assert_eq!(f(100), 3);
|
|
||||||
assert_eq!(f(101), 3);
|
|
||||||
assert_eq!(f(1000), 4);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,7 @@ use glommio::channels::shared_channel::ConnectedReceiver;
|
||||||
use glommio::net::{TcpListener, TcpStream};
|
use glommio::net::{TcpListener, TcpStream};
|
||||||
use glommio::timer::TimerActionRepeat;
|
use glommio::timer::TimerActionRepeat;
|
||||||
use glommio::{enclose, prelude::*};
|
use glommio::{enclose, prelude::*};
|
||||||
|
use once_cell::sync::Lazy;
|
||||||
use slab::Slab;
|
use slab::Slab;
|
||||||
|
|
||||||
use crate::common::*;
|
use crate::common::*;
|
||||||
|
|
@ -29,6 +30,14 @@ use crate::config::Config;
|
||||||
|
|
||||||
const INTERMEDIATE_BUFFER_SIZE: usize = 1024;
|
const INTERMEDIATE_BUFFER_SIZE: usize = 1024;
|
||||||
const MAX_REQUEST_SIZE: usize = 2048;
|
const MAX_REQUEST_SIZE: usize = 2048;
|
||||||
|
const MAX_RESPONSE_SIZE: usize = 4096;
|
||||||
|
|
||||||
|
const RESPONSE_HEADER_A: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Length: ";
|
||||||
|
const RESPONSE_HEADER_B: &[u8] = b" ";
|
||||||
|
const RESPONSE_HEADER_C: &[u8] = b"\r\n\r\n";
|
||||||
|
|
||||||
|
static RESPONSE_HEADER: Lazy<Vec<u8>> =
|
||||||
|
Lazy::new(|| [RESPONSE_HEADER_A, RESPONSE_HEADER_B, RESPONSE_HEADER_C].concat());
|
||||||
|
|
||||||
struct PendingScrapeResponse {
|
struct PendingScrapeResponse {
|
||||||
pending_worker_responses: usize,
|
pending_worker_responses: usize,
|
||||||
|
|
@ -166,6 +175,7 @@ struct Connection {
|
||||||
connection_id: ConnectionId,
|
connection_id: ConnectionId,
|
||||||
request_buffer: [u8; MAX_REQUEST_SIZE],
|
request_buffer: [u8; MAX_REQUEST_SIZE],
|
||||||
request_buffer_position: usize,
|
request_buffer_position: usize,
|
||||||
|
response_buffer: [u8; MAX_RESPONSE_SIZE],
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Connection {
|
impl Connection {
|
||||||
|
|
@ -186,6 +196,10 @@ impl Connection {
|
||||||
let tls_acceptor: TlsAcceptor = tls_config.into();
|
let tls_acceptor: TlsAcceptor = tls_config.into();
|
||||||
let stream = tls_acceptor.accept(stream).await?;
|
let stream = tls_acceptor.accept(stream).await?;
|
||||||
|
|
||||||
|
let mut response_buffer = [0; MAX_RESPONSE_SIZE];
|
||||||
|
|
||||||
|
response_buffer[..RESPONSE_HEADER.len()].copy_from_slice(&RESPONSE_HEADER);
|
||||||
|
|
||||||
let mut conn = Connection {
|
let mut conn = Connection {
|
||||||
config: config.clone(),
|
config: config.clone(),
|
||||||
access_list_cache: create_access_list_cache(&access_list),
|
access_list_cache: create_access_list_cache(&access_list),
|
||||||
|
|
@ -195,8 +209,9 @@ impl Connection {
|
||||||
stream,
|
stream,
|
||||||
peer_addr,
|
peer_addr,
|
||||||
connection_id,
|
connection_id,
|
||||||
request_buffer: [0u8; MAX_REQUEST_SIZE],
|
request_buffer: [0; MAX_REQUEST_SIZE],
|
||||||
request_buffer_position: 0,
|
request_buffer_position: 0,
|
||||||
|
response_buffer,
|
||||||
};
|
};
|
||||||
|
|
||||||
conn.run_request_response_loop().await?;
|
conn.run_request_response_loop().await?;
|
||||||
|
|
@ -406,22 +421,44 @@ impl Connection {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn write_response(&mut self, response: &Response) -> anyhow::Result<()> {
|
async fn write_response(&mut self, response: &Response) -> anyhow::Result<()> {
|
||||||
let mut body = Vec::new();
|
// Write body and final newline to response buffer
|
||||||
|
|
||||||
response.write(&mut body).unwrap();
|
let mut position = RESPONSE_HEADER.len();
|
||||||
|
|
||||||
let content_len = body.len() + 2; // 2 is for newlines at end
|
let body_len = response.write(&mut &mut self.response_buffer[position..])?;
|
||||||
let content_len_num_digits = num_digits_in_usize(content_len);
|
|
||||||
|
|
||||||
let mut response_bytes = Vec::with_capacity(39 + content_len_num_digits + body.len());
|
position += body_len;
|
||||||
|
|
||||||
response_bytes.extend_from_slice(b"HTTP/1.1 200 OK\r\nContent-Length: ");
|
(&mut self.response_buffer[position..position + 2]).copy_from_slice(b"\r\n");
|
||||||
::itoa::write(&mut response_bytes, content_len)?;
|
|
||||||
response_bytes.extend_from_slice(b"\r\n\r\n");
|
|
||||||
response_bytes.append(&mut body);
|
|
||||||
response_bytes.extend_from_slice(b"\r\n");
|
|
||||||
|
|
||||||
self.stream.write(&response_bytes[..]).await?;
|
position += 2;
|
||||||
|
|
||||||
|
let content_len = body_len + 2;
|
||||||
|
|
||||||
|
// Clear content-len header value
|
||||||
|
|
||||||
|
{
|
||||||
|
let start = RESPONSE_HEADER_A.len();
|
||||||
|
let end = start + RESPONSE_HEADER_B.len();
|
||||||
|
|
||||||
|
(&mut self.response_buffer[start..end]).copy_from_slice(RESPONSE_HEADER_B);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set content-len header value
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut buf = ::itoa::Buffer::new();
|
||||||
|
let content_len_bytes = buf.format(content_len).as_bytes();
|
||||||
|
|
||||||
|
let start = RESPONSE_HEADER_A.len();
|
||||||
|
let end = start + content_len_bytes.len();
|
||||||
|
|
||||||
|
(&mut self.response_buffer[start..end]).copy_from_slice(content_len_bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write buffer to stream
|
||||||
|
|
||||||
|
self.stream.write(&self.response_buffer[..position]).await?;
|
||||||
self.stream.flush().await?;
|
self.stream.flush().await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,7 @@ anyhow = "1"
|
||||||
hashbrown = "0.11"
|
hashbrown = "0.11"
|
||||||
hex = { version = "0.4", default-features = false }
|
hex = { version = "0.4", default-features = false }
|
||||||
httparse = "1"
|
httparse = "1"
|
||||||
itoa = "0.4"
|
itoa = "1"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
memchr = "2"
|
memchr = "2"
|
||||||
rand = { version = "0.8", features = ["small_rng"] }
|
rand = { version = "0.8", features = ["small_rng"] }
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue