mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-04-02 18:55:32 +00:00
http: socket workers: simplify request buffering
This commit is contained in:
parent
91dcd3de4d
commit
380ca222de
1 changed files with 32 additions and 43 deletions
|
|
@ -28,9 +28,8 @@ use slab::Slab;
|
||||||
use crate::common::*;
|
use crate::common::*;
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
|
|
||||||
const INTERMEDIATE_BUFFER_SIZE: usize = 1024;
|
const REQUEST_BUFFER_SIZE: usize = 2048;
|
||||||
const MAX_REQUEST_SIZE: usize = 2048;
|
const RESPONSE_BUFFER_SIZE: usize = 4096;
|
||||||
const MAX_RESPONSE_SIZE: usize = 4096;
|
|
||||||
|
|
||||||
const RESPONSE_HEADER_A: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Length: ";
|
const RESPONSE_HEADER_A: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Length: ";
|
||||||
const RESPONSE_HEADER_B: &[u8] = b" ";
|
const RESPONSE_HEADER_B: &[u8] = b" ";
|
||||||
|
|
@ -173,9 +172,9 @@ struct Connection {
|
||||||
stream: TlsStream<TcpStream>,
|
stream: TlsStream<TcpStream>,
|
||||||
peer_addr: SocketAddr,
|
peer_addr: SocketAddr,
|
||||||
connection_id: ConnectionId,
|
connection_id: ConnectionId,
|
||||||
request_buffer: [u8; MAX_REQUEST_SIZE],
|
request_buffer: [u8; REQUEST_BUFFER_SIZE],
|
||||||
request_buffer_position: usize,
|
request_buffer_position: usize,
|
||||||
response_buffer: [u8; MAX_RESPONSE_SIZE],
|
response_buffer: [u8; RESPONSE_BUFFER_SIZE],
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Connection {
|
impl Connection {
|
||||||
|
|
@ -196,7 +195,7 @@ impl Connection {
|
||||||
let tls_acceptor: TlsAcceptor = tls_config.into();
|
let tls_acceptor: TlsAcceptor = tls_config.into();
|
||||||
let stream = tls_acceptor.accept(stream).await?;
|
let stream = tls_acceptor.accept(stream).await?;
|
||||||
|
|
||||||
let mut response_buffer = [0; MAX_RESPONSE_SIZE];
|
let mut response_buffer = [0; RESPONSE_BUFFER_SIZE];
|
||||||
|
|
||||||
response_buffer[..RESPONSE_HEADER.len()].copy_from_slice(&RESPONSE_HEADER);
|
response_buffer[..RESPONSE_HEADER.len()].copy_from_slice(&RESPONSE_HEADER);
|
||||||
|
|
||||||
|
|
@ -209,7 +208,7 @@ impl Connection {
|
||||||
stream,
|
stream,
|
||||||
peer_addr,
|
peer_addr,
|
||||||
connection_id,
|
connection_id,
|
||||||
request_buffer: [0; MAX_REQUEST_SIZE],
|
request_buffer: [0; REQUEST_BUFFER_SIZE],
|
||||||
request_buffer_position: 0,
|
request_buffer_position: 0,
|
||||||
response_buffer,
|
response_buffer,
|
||||||
};
|
};
|
||||||
|
|
@ -244,35 +243,28 @@ impl Connection {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn read_request(&mut self) -> anyhow::Result<Either<FailureResponse, Request>> {
|
async fn read_request(&mut self) -> anyhow::Result<Either<FailureResponse, Request>> {
|
||||||
let mut buf = [0u8; INTERMEDIATE_BUFFER_SIZE];
|
self.request_buffer_position = 0;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
::log::debug!("read");
|
if self.request_buffer_position == self.request_buffer.len() {
|
||||||
|
return Err(anyhow::anyhow!("request buffer is full"));
|
||||||
|
}
|
||||||
|
|
||||||
let bytes_read = self.stream.read(&mut buf).await?;
|
let bytes_read = self
|
||||||
|
.stream
|
||||||
|
.read(&mut self.request_buffer[self.request_buffer_position..])
|
||||||
|
.await?;
|
||||||
|
|
||||||
if bytes_read == 0 {
|
if bytes_read == 0 {
|
||||||
return Err(anyhow::anyhow!("peer closed connection"));
|
return Err(anyhow::anyhow!("peer closed connection"));
|
||||||
}
|
}
|
||||||
|
|
||||||
let request_buffer_end = self.request_buffer_position + bytes_read;
|
self.request_buffer_position += bytes_read;
|
||||||
|
|
||||||
if request_buffer_end > self.request_buffer.len() {
|
|
||||||
return Err(anyhow::anyhow!("request too large"));
|
|
||||||
} else {
|
|
||||||
let request_buffer_slice =
|
|
||||||
&mut self.request_buffer[self.request_buffer_position..request_buffer_end];
|
|
||||||
|
|
||||||
request_buffer_slice.copy_from_slice(&buf[..bytes_read]);
|
|
||||||
|
|
||||||
self.request_buffer_position = request_buffer_end;
|
|
||||||
|
|
||||||
match Request::from_bytes(&self.request_buffer[..self.request_buffer_position]) {
|
match Request::from_bytes(&self.request_buffer[..self.request_buffer_position]) {
|
||||||
Ok(request) => {
|
Ok(request) => {
|
||||||
::log::debug!("received request: {:?}", request);
|
::log::debug!("received request: {:?}", request);
|
||||||
|
|
||||||
self.request_buffer_position = 0;
|
|
||||||
|
|
||||||
return Ok(Either::Right(request));
|
return Ok(Either::Right(request));
|
||||||
}
|
}
|
||||||
Err(RequestParseError::Invalid(err)) => {
|
Err(RequestParseError::Invalid(err)) => {
|
||||||
|
|
@ -287,15 +279,12 @@ impl Connection {
|
||||||
Err(RequestParseError::NeedMoreData) => {
|
Err(RequestParseError::NeedMoreData) => {
|
||||||
::log::debug!(
|
::log::debug!(
|
||||||
"need more request data. current data: {:?}",
|
"need more request data. current data: {:?}",
|
||||||
std::str::from_utf8(
|
std::str::from_utf8(&self.request_buffer[..self.request_buffer_position])
|
||||||
&self.request_buffer[..self.request_buffer_position]
|
|
||||||
)
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/// Take a request and:
|
/// Take a request and:
|
||||||
/// - Return error response if request is not allowed
|
/// - Return error response if request is not allowed
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue