From 921fb57e9e2008a6394c4c9fdb508bafa64ff0cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 20 Apr 2024 10:27:48 +0200 Subject: [PATCH] http: improve peer addr extraction logic --- crates/http/src/workers/socket/connection.rs | 77 +++++++++----------- 1 file changed, 36 insertions(+), 41 deletions(-) diff --git a/crates/http/src/workers/socket/connection.rs b/crates/http/src/workers/socket/connection.rs index c2d4089..787440c 100644 --- a/crates/http/src/workers/socket/connection.rs +++ b/crates/http/src/workers/socket/connection.rs @@ -14,7 +14,6 @@ use aquatic_http_protocol::response::{ FailureResponse, Response, ScrapeResponse, ScrapeStatistics, }; use arc_swap::ArcSwap; -use either::Either; use futures::stream::FuturesUnordered; use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; use futures_rustls::TlsAcceptor; @@ -110,7 +109,6 @@ pub(super) async fn run_connection( request_senders, valid_until, server_start_instant, - opt_peer_addr, peer_port, request_buffer, request_buffer_position: 0, @@ -119,7 +117,7 @@ pub(super) async fn run_connection( worker_index_string: worker_index.to_string(), }; - conn.run().await + conn.run(opt_peer_addr).await } else { let mut conn = Connection { config, @@ -127,7 +125,6 @@ pub(super) async fn run_connection( request_senders, valid_until, server_start_instant, - opt_peer_addr, peer_port, request_buffer, request_buffer_position: 0, @@ -136,7 +133,7 @@ pub(super) async fn run_connection( worker_index_string: worker_index.to_string(), }; - conn.run().await + conn.run(opt_peer_addr).await } } @@ -146,9 +143,6 @@ struct Connection { request_senders: Rc>, valid_until: Rc>, server_start_instant: ServerStartInstant, - // If we're running behind a reverse proxy, gets set as soon as we get a - // valid requiest. Otherwise, must be set before calling `run`. - opt_peer_addr: Option, peer_port: u16, request_buffer: Box<[u8; REQUEST_BUFFER_SIZE]>, request_buffer_position: usize, @@ -161,14 +155,21 @@ impl Connection where S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static, { - async fn run(&mut self) -> Result<(), ConnectionError> { + async fn run( + &mut self, + // Set unless running behind reverse proxy + opt_stable_peer_addr: Option, + ) -> Result<(), ConnectionError> { loop { - let response = match self.read_request().await? { - Either::Left(response) => Response::Failure(response), - Either::Right(request) => self.handle_request(request).await?, - }; + let (request, opt_peer_addr) = self.read_request().await?; - self.write_response(&response).await?; + let peer_addr = opt_stable_peer_addr + .or(opt_peer_addr) + .ok_or(anyhow::anyhow!("Could not extract peer addr"))?; + + let response = self.handle_request(request, peer_addr).await?; + + self.write_response(&response, peer_addr).await?; if matches!(response, Response::Failure(_)) || !self.config.network.keep_alive { break; @@ -178,7 +179,9 @@ where Ok(()) } - async fn read_request(&mut self) -> Result, ConnectionError> { + async fn read_request( + &mut self, + ) -> Result<(Request, Option), ConnectionError> { self.request_buffer_position = 0; loop { @@ -202,17 +205,19 @@ where match parse_request(&self.config, buffer_slice) { Ok((request, opt_peer_ip)) => { - if self.config.network.runs_behind_reverse_proxy { + let opt_peer_addr = if self.config.network.runs_behind_reverse_proxy { let peer_ip = opt_peer_ip .expect("logic error: peer ip must have been extracted at this point"); - self.opt_peer_addr = Some(CanonicalSocketAddr::new(SocketAddr::new( + Some(CanonicalSocketAddr::new(SocketAddr::new( peer_ip, self.peer_port, - ))); - } + ))) + } else { + None + }; - return Ok(Either::Right(request)); + return Ok((request, opt_peer_addr)); } Err(RequestParseError::MoreDataNeeded) => continue, Err(RequestParseError::RequiredPeerIpHeaderMissing(err)) => { @@ -220,12 +225,6 @@ where } Err(RequestParseError::Other(err)) => { ::log::debug!("Failed parsing request: {:#}", err); - - let response = FailureResponse { - failure_reason: "Invalid request".into(), - }; - - return Ok(Either::Left(response)); } } } @@ -238,11 +237,11 @@ where /// response /// - If it is a scrape requests, split it up, pass on the parts to /// relevant swarm workers and await a response - async fn handle_request(&mut self, request: Request) -> Result { - let peer_addr = self - .opt_peer_addr - .expect("peer addr should already have been extracted by now"); - + async fn handle_request( + &mut self, + request: Request, + peer_addr: CanonicalSocketAddr, + ) -> Result { *self.valid_until.borrow_mut() = ValidUntil::new( self.server_start_instant, self.config.cleaning.max_connection_idle, @@ -385,7 +384,11 @@ where } } - async fn write_response(&mut self, response: &Response) -> Result<(), ConnectionError> { + async fn write_response( + &mut self, + response: &Response, + peer_addr: CanonicalSocketAddr, + ) -> Result<(), ConnectionError> { // Write body and final newline to response buffer let mut position = RESPONSE_HEADER.len(); @@ -443,15 +446,7 @@ where Response::Failure(_) => "error", }; - // If we're behind a reverse proxy and we're sending an error - // response due to failing to parse a request, opt_peer_addr might - // not yet be set (and in that case, we don't know if the true peer - // connects over IPv4 or IPv6) - let ip_version_str = self - .opt_peer_addr - .as_ref() - .map(peer_addr_to_ip_version_str) - .unwrap_or("?"); + let ip_version_str = peer_addr_to_ip_version_str(&peer_addr); ::metrics::counter!( "aquatic_responses_total",