aquatic_udp: glommio: limit request size, use array as buffer

This commit is contained in:
Joakim Frostegård 2021-10-27 14:28:15 +02:00
parent dd968821e4
commit 8747f8de4e

View file

@ -30,7 +30,8 @@ use crate::config::Config;
use super::common::*; use super::common::*;
const BUFFER_SIZE: usize = 1024; const INTERMEDIATE_BUFFER_SIZE: usize = 1024;
const MAX_REQUEST_SIZE: usize = 2048;
struct PendingScrapeResponse { struct PendingScrapeResponse {
pending_worker_responses: usize, pending_worker_responses: usize,
@ -51,7 +52,8 @@ struct Connection {
tls: ServerConnection, tls: ServerConnection,
stream: TcpStream, stream: TcpStream,
connection_id: ConnectionId, connection_id: ConnectionId,
request_buffer: Vec<u8>, request_buffer: [u8; MAX_REQUEST_SIZE],
request_buffer_position: usize,
} }
pub async fn run_socket_worker( pub async fn run_socket_worker(
@ -125,7 +127,8 @@ pub async fn run_socket_worker(
tls: ServerConnection::new(tls_config.clone()).unwrap(), tls: ServerConnection::new(tls_config.clone()).unwrap(),
stream, stream,
connection_id: ConnectionId(entry.key()), connection_id: ConnectionId(entry.key()),
request_buffer: Vec::new(), request_buffer: [0u8; MAX_REQUEST_SIZE],
request_buffer_position: 0,
}; };
let connections_to_remove = connections_to_remove.clone(); let connections_to_remove = connections_to_remove.clone();
@ -219,7 +222,7 @@ impl Connection {
loop { loop {
::log::debug!("read_tls"); ::log::debug!("read_tls");
let mut buf = [0u8; BUFFER_SIZE]; let mut buf = [0u8; INTERMEDIATE_BUFFER_SIZE];
let bytes_read = self.stream.read(&mut buf).await?; let bytes_read = self.stream.read(&mut buf).await?;
@ -240,10 +243,20 @@ impl Connection {
break; break;
} }
Ok(amt) => { Ok(amt) => {
self.request_buffer.extend_from_slice(&buf[..amt]); let end = self.request_buffer_position + amt;
if end > self.request_buffer.len() {
return Err(anyhow::anyhow!("request too large"));
} else {
let request_buffer_slice = &mut self.request_buffer[self.request_buffer_position..end];
request_buffer_slice.copy_from_slice(&buf[..amt]);
self.request_buffer_position = end;
added_plaintext = true; added_plaintext = true;
} }
}
Err(err) if err.kind() == ErrorKind::WouldBlock => { Err(err) if err.kind() == ErrorKind::WouldBlock => {
break; break;
} }
@ -258,11 +271,11 @@ impl Connection {
} }
if added_plaintext { if added_plaintext {
match Request::from_bytes(&self.request_buffer[..]) { match Request::from_bytes(&self.request_buffer[..self.request_buffer_position]) {
Ok(request) => { Ok(request) => {
::log::debug!("received request: {:?}", request); ::log::debug!("received request: {:?}", request);
self.request_buffer = Vec::new(); self.request_buffer_position = 0;
return Ok(Some(Either::Left(request))); return Ok(Some(Either::Left(request)));
} }