From 3b94b8e588a3faa86de507502f76fb58100a50bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Jul 2022 00:53:31 +0200 Subject: [PATCH] WIP: ws: parse X-FORWARDED-FOR headers --- Cargo.lock | 1 + TODO.md | 6 +- aquatic_ws/Cargo.toml | 1 + aquatic_ws/src/config.rs | 7 +++ aquatic_ws/src/workers/socket.rs | 94 ++++++++++++++++++++++++-------- 5 files changed, 85 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a2d79a8..3e659a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -295,6 +295,7 @@ dependencies = [ "futures-rustls", "glommio", "hashbrown 0.12.3", + "httparse", "log", "mimalloc", "privdrop", diff --git a/TODO.md b/TODO.md index 1c7a02e..76e10ee 100644 --- a/TODO.md +++ b/TODO.md @@ -2,7 +2,11 @@ ## High priority -ws: reverse proxy support for non-TLS connections (parse x-forwarded-for) +* ws + * reverse proxy support for non-TLS connections (parse x-forwarded-for) + * how does this interact with IPv4/IPv6 differentiation? + * is it possible to skip determining peer IPs altogether or is it necessary + for IPv4/IPv6 separation / preventing peer mixups or abuse? ## Medium priority diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index c0d3724..4ea8a6b 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -29,6 +29,7 @@ futures-lite = "1" futures-rustls = "0.22" glommio = "0.7" hashbrown = { version = "0.12", features = ["serde"] } +httparse = "1" log = "0.4" mimalloc = { version = "0.1", default-features = false } privdrop = "0.5" diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index 792e56a..df93432 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -70,6 +70,11 @@ pub struct NetworkConfig { pub websocket_max_message_size: usize, pub websocket_max_frame_size: usize, + /// Trust X-Forwarded-For headers to get peer IP. 
Only use this if you are + /// running aquatic_ws behind a reverse proxy that sets them and your + /// instance is not accessible by other means. + pub trust_x_forwarded_for: bool, + /// Return a HTTP 200 Ok response when receiving GET /health, but only /// when not running over TLS pub enable_http_health_checks: bool, @@ -89,6 +94,8 @@ impl Default for NetworkConfig { websocket_max_message_size: 64 * 1024, websocket_max_frame_size: 16 * 1024, + trust_x_forwarded_for: false, + enable_http_health_checks: false, } } diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index 5be08f8..136a604 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -1,6 +1,7 @@ use std::borrow::Cow; use std::cell::RefCell; use std::collections::BTreeMap; +use std::net::{IpAddr, SocketAddr}; use std::os::unix::prelude::{FromRawFd, IntoRawFd}; use std::rc::Rc; use std::sync::Arc; @@ -47,7 +48,8 @@ struct ConnectionReference { valid_until: ValidUntil, peer_id: Option, announced_info_hashes: HashSet, - peer_addr: CanonicalSocketAddr, + /// May need to be parsed from X-Forwarded-For headers + peer_addr: Option, } pub async fn run_socket_worker( @@ -114,18 +116,6 @@ pub async fn run_socket_worker( while let Some(stream) = incoming.next().await { match stream { Ok(stream) => { - let peer_addr = match stream.peer_addr() { - Ok(peer_addr) => CanonicalSocketAddr::new(peer_addr), - Err(err) => { - ::log::info!( - "could not extract peer address, closing connection: {:#}", - err - ); - - continue; - } - }; - let (out_message_sender, out_message_receiver) = new_bounded(LOCAL_CHANNEL_SIZE); let out_message_sender = Rc::new(out_message_sender); @@ -135,14 +125,10 @@ pub async fn run_socket_worker( valid_until: ValidUntil::new(config.cleaning.max_connection_idle), peer_id: None, announced_info_hashes: Default::default(), - peer_addr, + peer_addr: None, }); - ::log::info!( - "accepting stream from {}, assigning id {}", - peer_addr.get(), - key 
- ); + ::log::info!("accepting stream, assigning id {}", key); let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, opt_tls_config) async move { if let Err(err) = run_connection( @@ -158,7 +144,6 @@ pub async fn run_socket_worker( ConnectionId(key), opt_tls_config, stream, - peer_addr ).await { ::log::debug!("connection error: {:#}", err); } @@ -171,12 +156,12 @@ pub async fn run_socket_worker( // Tell swarm workers to remove peer if let Some(reference) = opt_reference { - if let Some(peer_id) = reference.peer_id { + if let (Some(peer_id), Some(peer_addr)) = (reference.peer_id, reference.peer_addr) { for info_hash in reference.announced_info_hashes { let message = SwarmControlMessage::ConnectionClosed { info_hash, peer_id, - peer_addr: reference.peer_addr, + peer_addr, }; let consumer_index = @@ -275,8 +260,28 @@ async fn run_connection( connection_id: ConnectionId, opt_tls_config: Option>, mut stream: TcpStream, - peer_addr: CanonicalSocketAddr, ) -> anyhow::Result<()> { + let remote_addr = stream + .peer_addr() + .map_err(|err| anyhow::anyhow!("could not extract peer address: {:#}", err))?; + + let peer_addr = if config.network.trust_x_forwarded_for { + let ip = parse_x_forwarded_for_ip(&stream).await?; + // Using the reverse proxy connection port here should be fine, since + // we only use the CanonicalPeerAddr to differentiate connections from + // each other as well as to determine if they run on IPv4 or IPv6, + // not for sending responses or passing on to peers. 
+        let port = remote_addr.port();
+
+        CanonicalSocketAddr::new(SocketAddr::new(ip, port))
+    } else {
+        CanonicalSocketAddr::new(remote_addr)
+    };
+
+    if let Some(connection_reference) = connection_slab.borrow_mut().get_mut(connection_id.0) {
+        connection_reference.peer_addr = Some(peer_addr);
+    }
+
     if let Some(tls_config) = opt_tls_config {
         let tls_acceptor: TlsAcceptor = tls_config.into();
 
@@ -341,6 +346,75 @@ async fn run_connection(
     }
 }
 
+/// Determine the peer IP from the `X-Forwarded-For` header of the HTTP
+/// request that opens the connection.
+///
+/// The request is inspected with `peek`, so no bytes are consumed and the
+/// request can still be read in full afterwards.
+///
+/// Header names are matched case-insensitively. When the header holds a
+/// comma-separated list (or occurs several times), the last entry is used:
+/// it is the one appended by the nearest reverse proxy, whereas earlier
+/// entries originate from the client side and can be spoofed.
+///
+/// Returns an error if peeking fails, the peer closes the connection, the
+/// request head is malformed, or no parsable `X-Forwarded-For` value is found
+/// within the first 1024 bytes.
+async fn parse_x_forwarded_for_ip(stream: &TcpStream) -> anyhow::Result<IpAddr> {
+    let mut peek_buf = [0u8; 1024];
+
+    // NOTE(review): peek presumably returns as soon as any data is queued, so
+    // these retries do not wait for more bytes to arrive. Consider yielding
+    // or sleeping between attempts.
+    for _ in 0..16 {
+        // Peeking never consumes data, so each call returns the queued bytes
+        // from the start of the request. Always fill the buffer from index 0
+        // instead of appending at an offset.
+        let bytes_read = stream
+            .peek(&mut peek_buf)
+            .await
+            .map_err(|err| anyhow::anyhow!("error peeking: {:#}", err))?;
+
+        if bytes_read == 0 {
+            return Err(anyhow::anyhow!(
+                "zero bytes read while parsing x-forwarded-for"
+            ));
+        }
+
+        let mut headers = [httparse::EMPTY_HEADER; 32];
+        let mut req = httparse::Request::new(&mut headers);
+
+        // Only parse the bytes actually received: the rest of the buffer is
+        // zero padding and must not be fed to the parser.
+        if req.parse(&peek_buf[..bytes_read])?.is_complete() {
+            for header in req.headers.iter().rev() {
+                if header.name.eq_ignore_ascii_case("X-Forwarded-For") {
+                    let value = ::std::str::from_utf8(header.value)?;
+                    // rsplit always yields at least one item
+                    let last_entry = value.rsplit(',').next().unwrap_or(value);
+                    let ip: IpAddr = last_entry.trim().parse()?;
+
+                    // FIXME: reject non-global addresses (ip.is_global())
+                    return Ok(ip);
+                }
+            }
+
+            break;
+        }
+
+        if bytes_read == peek_buf.len() {
+            // Buffer is full but the request head is still incomplete;
+            // peeking again cannot reveal more data.
+            break;
+        }
+    }
+
+    Err(anyhow::anyhow!(
+        "Could not determine source IP through X-Forwarded-For headers"
+    ))
+}
+
 async fn run_stream_agnostic_connection<
     S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static,
 >(