http: improve peer addr extraction logic

This commit is contained in:
Joakim Frostegård 2024-04-20 10:27:48 +02:00
parent e0c0dd7865
commit 921fb57e9e

View file

@ -14,7 +14,6 @@ use aquatic_http_protocol::response::{
FailureResponse, Response, ScrapeResponse, ScrapeStatistics, FailureResponse, Response, ScrapeResponse, ScrapeStatistics,
}; };
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
use either::Either;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
use futures_rustls::TlsAcceptor; use futures_rustls::TlsAcceptor;
@ -110,7 +109,6 @@ pub(super) async fn run_connection(
request_senders, request_senders,
valid_until, valid_until,
server_start_instant, server_start_instant,
opt_peer_addr,
peer_port, peer_port,
request_buffer, request_buffer,
request_buffer_position: 0, request_buffer_position: 0,
@ -119,7 +117,7 @@ pub(super) async fn run_connection(
worker_index_string: worker_index.to_string(), worker_index_string: worker_index.to_string(),
}; };
conn.run().await conn.run(opt_peer_addr).await
} else { } else {
let mut conn = Connection { let mut conn = Connection {
config, config,
@ -127,7 +125,6 @@ pub(super) async fn run_connection(
request_senders, request_senders,
valid_until, valid_until,
server_start_instant, server_start_instant,
opt_peer_addr,
peer_port, peer_port,
request_buffer, request_buffer,
request_buffer_position: 0, request_buffer_position: 0,
@ -136,7 +133,7 @@ pub(super) async fn run_connection(
worker_index_string: worker_index.to_string(), worker_index_string: worker_index.to_string(),
}; };
conn.run().await conn.run(opt_peer_addr).await
} }
} }
@ -146,9 +143,6 @@ struct Connection<S> {
request_senders: Rc<Senders<ChannelRequest>>, request_senders: Rc<Senders<ChannelRequest>>,
valid_until: Rc<RefCell<ValidUntil>>, valid_until: Rc<RefCell<ValidUntil>>,
server_start_instant: ServerStartInstant, server_start_instant: ServerStartInstant,
// If we're running behind a reverse proxy, gets set as soon as we get a
// valid requiest. Otherwise, must be set before calling `run`.
opt_peer_addr: Option<CanonicalSocketAddr>,
peer_port: u16, peer_port: u16,
request_buffer: Box<[u8; REQUEST_BUFFER_SIZE]>, request_buffer: Box<[u8; REQUEST_BUFFER_SIZE]>,
request_buffer_position: usize, request_buffer_position: usize,
@ -161,14 +155,21 @@ impl<S> Connection<S>
where where
S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static, S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static,
{ {
async fn run(&mut self) -> Result<(), ConnectionError> { async fn run(
&mut self,
// Set unless running behind reverse proxy
opt_stable_peer_addr: Option<CanonicalSocketAddr>,
) -> Result<(), ConnectionError> {
loop { loop {
let response = match self.read_request().await? { let (request, opt_peer_addr) = self.read_request().await?;
Either::Left(response) => Response::Failure(response),
Either::Right(request) => self.handle_request(request).await?,
};
self.write_response(&response).await?; let peer_addr = opt_stable_peer_addr
.or(opt_peer_addr)
.ok_or(anyhow::anyhow!("Could not extract peer addr"))?;
let response = self.handle_request(request, peer_addr).await?;
self.write_response(&response, peer_addr).await?;
if matches!(response, Response::Failure(_)) || !self.config.network.keep_alive { if matches!(response, Response::Failure(_)) || !self.config.network.keep_alive {
break; break;
@ -178,7 +179,9 @@ where
Ok(()) Ok(())
} }
async fn read_request(&mut self) -> Result<Either<FailureResponse, Request>, ConnectionError> { async fn read_request(
&mut self,
) -> Result<(Request, Option<CanonicalSocketAddr>), ConnectionError> {
self.request_buffer_position = 0; self.request_buffer_position = 0;
loop { loop {
@ -202,17 +205,19 @@ where
match parse_request(&self.config, buffer_slice) { match parse_request(&self.config, buffer_slice) {
Ok((request, opt_peer_ip)) => { Ok((request, opt_peer_ip)) => {
if self.config.network.runs_behind_reverse_proxy { let opt_peer_addr = if self.config.network.runs_behind_reverse_proxy {
let peer_ip = opt_peer_ip let peer_ip = opt_peer_ip
.expect("logic error: peer ip must have been extracted at this point"); .expect("logic error: peer ip must have been extracted at this point");
self.opt_peer_addr = Some(CanonicalSocketAddr::new(SocketAddr::new( Some(CanonicalSocketAddr::new(SocketAddr::new(
peer_ip, peer_ip,
self.peer_port, self.peer_port,
))); )))
} } else {
None
};
return Ok(Either::Right(request)); return Ok((request, opt_peer_addr));
} }
Err(RequestParseError::MoreDataNeeded) => continue, Err(RequestParseError::MoreDataNeeded) => continue,
Err(RequestParseError::RequiredPeerIpHeaderMissing(err)) => { Err(RequestParseError::RequiredPeerIpHeaderMissing(err)) => {
@ -220,12 +225,6 @@ where
} }
Err(RequestParseError::Other(err)) => { Err(RequestParseError::Other(err)) => {
::log::debug!("Failed parsing request: {:#}", err); ::log::debug!("Failed parsing request: {:#}", err);
let response = FailureResponse {
failure_reason: "Invalid request".into(),
};
return Ok(Either::Left(response));
} }
} }
} }
@ -238,11 +237,11 @@ where
/// response /// response
/// - If it is a scrape requests, split it up, pass on the parts to /// - If it is a scrape requests, split it up, pass on the parts to
/// relevant swarm workers and await a response /// relevant swarm workers and await a response
async fn handle_request(&mut self, request: Request) -> Result<Response, ConnectionError> { async fn handle_request(
let peer_addr = self &mut self,
.opt_peer_addr request: Request,
.expect("peer addr should already have been extracted by now"); peer_addr: CanonicalSocketAddr,
) -> Result<Response, ConnectionError> {
*self.valid_until.borrow_mut() = ValidUntil::new( *self.valid_until.borrow_mut() = ValidUntil::new(
self.server_start_instant, self.server_start_instant,
self.config.cleaning.max_connection_idle, self.config.cleaning.max_connection_idle,
@ -385,7 +384,11 @@ where
} }
} }
async fn write_response(&mut self, response: &Response) -> Result<(), ConnectionError> { async fn write_response(
&mut self,
response: &Response,
peer_addr: CanonicalSocketAddr,
) -> Result<(), ConnectionError> {
// Write body and final newline to response buffer // Write body and final newline to response buffer
let mut position = RESPONSE_HEADER.len(); let mut position = RESPONSE_HEADER.len();
@ -443,15 +446,7 @@ where
Response::Failure(_) => "error", Response::Failure(_) => "error",
}; };
// If we're behind a reverse proxy and we're sending an error let ip_version_str = peer_addr_to_ip_version_str(&peer_addr);
// response due to failing to parse a request, opt_peer_addr might
// not yet be set (and in that case, we don't know if the true peer
// connects over IPv4 or IPv6)
let ip_version_str = self
.opt_peer_addr
.as_ref()
.map(peer_addr_to_ip_version_str)
.unwrap_or("?");
::metrics::counter!( ::metrics::counter!(
"aquatic_responses_total", "aquatic_responses_total",