diff --git a/README.md b/README.md index e35c7b1..f21c53b 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ of sub-implementations for different protocols: |--------------|--------------------------------------------|------------------------------| | aquatic_udp | [BitTorrent over UDP] | Unix-like (using [mio]) | | aquatic_http | [BitTorrent over HTTP] with TLS ([rustls]) | Linux 5.8+ (using [glommio]) | -| aquatic_ws | [WebTorrent] over TLS ([rustls]) | Linux 5.8+ (using [glommio]) | +| aquatic_ws | [WebTorrent] over TLS ([rustls], optional) | Linux 5.8+ (using [glommio]) | Features at a glance: diff --git a/TODO.md b/TODO.md index 76e10ee..5396993 100644 --- a/TODO.md +++ b/TODO.md @@ -2,12 +2,6 @@ ## High priority -* ws - * reverse proxy support for non-TLS connections (parse x-forwarded-for) - * how does this interact with IPv4/IPv6 differentiation? - * is it possible to skip determining peer IP's altogether or is it necessary - for IPv4/IPv6 separation / preventing peer mixups or abuse? 
- ## Medium priority * quit whole program if any thread panics diff --git a/aquatic_ws/src/common.rs b/aquatic_ws/src/common.rs index 27aa134..a099e68 100644 --- a/aquatic_ws/src/common.rs +++ b/aquatic_ws/src/common.rs @@ -1,11 +1,28 @@ -use std::sync::Arc; +use std::{net::IpAddr, sync::Arc}; use aquatic_common::access_list::AccessListArcSwap; -use aquatic_common::CanonicalSocketAddr; pub use aquatic_common::ValidUntil; use aquatic_ws_protocol::{InfoHash, PeerId}; +#[derive(Copy, Clone, Debug)] +pub enum IpVersion { + V4, + V6, +} + +impl IpVersion { + pub fn canonical_from_ip(ip: IpAddr) -> IpVersion { + match ip { + IpAddr::V4(_) => Self::V4, + IpAddr::V6(addr) => match addr.octets() { + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, _, _, _, _] => Self::V4, + _ => Self::V6, + }, + } + } +} + #[derive(Default, Clone)] pub struct State { pub access_list: Arc, @@ -17,7 +34,7 @@ pub struct PendingScrapeId(pub usize); #[derive(Copy, Clone, Debug)] pub struct ConsumerId(pub usize); -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, PartialEq)] pub struct ConnectionId(pub usize); #[derive(Clone, Copy, Debug)] @@ -26,7 +43,7 @@ pub struct ConnectionMeta { /// sending back response through correct channel to correct worker. 
pub out_message_consumer_id: ConsumerId, pub connection_id: ConnectionId, - pub peer_addr: CanonicalSocketAddr, + pub ip_version: IpVersion, pub pending_scrape_id: Option, } @@ -35,6 +52,6 @@ pub enum SwarmControlMessage { ConnectionClosed { info_hash: InfoHash, peer_id: PeerId, - peer_addr: CanonicalSocketAddr, + ip_version: IpVersion, }, } diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index df93432..2026bd4 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -9,6 +9,9 @@ use aquatic_common::cli::LogLevel; use aquatic_toml_config::TomlConfig; /// aquatic_ws configuration +/// +/// Running behind a reverse proxy is supported, but IPv4 peer requests have +/// to be proxied to IPv4 requests, and IPv6 requests to IPv6 requests. #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] #[serde(default, deny_unknown_fields)] pub struct Config { @@ -70,13 +73,8 @@ pub struct NetworkConfig { pub websocket_max_message_size: usize, pub websocket_max_frame_size: usize, - /// Trust X-Forwarded-For headers to get peer IP. Only use this if you are - /// running aquatic_ws behind a reverse proxy that sets them and your - /// instance is not accessible by other means. - pub trust_x_forwarded_for: bool, - - /// Return a HTTP 200 Ok response when receiving GET /health, but only - /// when not running over TLS + /// Return a HTTP 200 Ok response when receiving GET /health. Can not be + /// combined with enable_tls. 
pub enable_http_health_checks: bool, } @@ -94,8 +92,6 @@ impl Default for NetworkConfig { websocket_max_message_size: 64 * 1024, websocket_max_frame_size: 16 * 1024, - trust_x_forwarded_for: false, - enable_http_health_checks: false, } } diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index 136a604..14ba2bb 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -1,7 +1,6 @@ use std::borrow::Cow; use std::cell::RefCell; use std::collections::BTreeMap; -use std::net::{IpAddr, SocketAddr}; use std::os::unix::prelude::{FromRawFd, IntoRawFd}; use std::rc::Rc; use std::sync::Arc; @@ -11,7 +10,7 @@ use anyhow::Context; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; -use aquatic_common::{CanonicalSocketAddr, PanicSentinel}; +use aquatic_common::PanicSentinel; use aquatic_ws_protocol::*; use async_tungstenite::WebSocketStream; use futures::stream::{SplitSink, SplitStream}; @@ -48,8 +47,7 @@ struct ConnectionReference { valid_until: ValidUntil, peer_id: Option, announced_info_hashes: HashSet, - /// May need to be parsed from X-Forwarded-For headers - peer_addr: Option, + ip_version: IpVersion, } pub async fn run_socket_worker( @@ -116,6 +114,15 @@ pub async fn run_socket_worker( while let Some(stream) = incoming.next().await { match stream { Ok(stream) => { + let ip_version = match stream.peer_addr() { + Ok(addr) => IpVersion::canonical_from_ip(addr.ip()), + Err(err) => { + ::log::info!("could not extract ip version (v4 or v6): {:#}", err); + + continue; + } + }; + let (out_message_sender, out_message_receiver) = new_bounded(LOCAL_CHANNEL_SIZE); let out_message_sender = Rc::new(out_message_sender); @@ -125,7 +132,7 @@ pub async fn run_socket_worker( valid_until: ValidUntil::new(config.cleaning.max_connection_idle), peer_id: None, announced_info_hashes: 
Default::default(), - peer_addr: None, + ip_version, }); ::log::info!("accepting stream, assigning id {}", key); @@ -143,6 +150,7 @@ pub async fn run_socket_worker( out_message_consumer_id, ConnectionId(key), opt_tls_config, + ip_version, stream, ).await { ::log::debug!("connection error: {:#}", err); @@ -156,12 +164,12 @@ pub async fn run_socket_worker( // Tell swarm workers to remove peer if let Some(reference) = opt_reference { - if let (Some(peer_id), Some(peer_addr)) = (reference.peer_id, reference.peer_addr) { + if let Some(peer_id) = reference.peer_id { for info_hash in reference.announced_info_hashes { let message = SwarmControlMessage::ConnectionClosed { info_hash, peer_id, - peer_addr, + ip_version: reference.ip_version, }; let consumer_index = @@ -259,29 +267,9 @@ async fn run_connection( out_message_consumer_id: ConsumerId, connection_id: ConnectionId, opt_tls_config: Option>, + ip_version: IpVersion, mut stream: TcpStream, ) -> anyhow::Result<()> { - let remote_addr = stream - .peer_addr() - .map_err(|err| anyhow::anyhow!("could not extract peer address: {:#}", err))?; - - let peer_addr = if config.network.trust_x_forwarded_for { - let ip = parse_x_forwarded_for_ip(&stream).await?; - // Using the reverse proxy connection port here should be fine, since - // we only use the CanonicalPeerAddr to differentiate connections from - // each other as well as to determine if they run on IPv4 or IPv6, - // not for sending responses or passing on to peers. 
- let port = remote_addr.port(); - - CanonicalSocketAddr::new(SocketAddr::new(ip, port)) - } else { - CanonicalSocketAddr::new(remote_addr) - }; - - if let Some(connection_reference) = connection_slab.borrow_mut().get_mut(connection_id.0) { - connection_reference.peer_addr = Some(peer_addr); - } - if let Some(tls_config) = opt_tls_config { let tls_acceptor: TlsAcceptor = tls_config.into(); @@ -299,10 +287,14 @@ async fn run_connection( out_message_consumer_id, connection_id, stream, - peer_addr, + ip_version, ) .await } else { + // Implementing this over TLS is too cumbersome, since the crate used + // for TLS streams doesn't support peek and tungstenite doesn't + // properly support sending an HTTP error response in accept_hdr + // callback. if config.network.enable_http_health_checks { let mut peek_buf = [0u8; 11]; @@ -340,55 +332,12 @@ async fn run_connection( out_message_consumer_id, connection_id, stream, - peer_addr, + ip_version, ) .await } } -async fn parse_x_forwarded_for_ip(stream: &TcpStream) -> anyhow::Result { - let mut peek_buf = [0u8; 1024]; - - let mut position = 0usize; - - for _ in 0..16 { - let bytes_read = stream - .peek(&mut peek_buf[position..]) - .await - .map_err(|err| anyhow::anyhow!("error peeking: {:#}", err))?; - - position += bytes_read; - - if bytes_read == 0 { - return Err(anyhow::anyhow!( - "zero bytes read while parsing x-forwarded-for" - )); - } - - let mut headers = [httparse::EMPTY_HEADER; 32]; - let mut req = httparse::Request::new(&mut headers); - - if req.parse(&peek_buf)?.is_complete() { - for header in req.headers.iter() { - if header.name == "X-Forwarded-For" { - let ip: IpAddr = ::std::str::from_utf8(header.value)?.parse()?; - - // ip.is_global() { // FIXME - if true { - return Ok(ip); - } - } - } - - break; - } - } - - Err(anyhow::anyhow!( - "Could not determine source IP through X-Forwarded-For headers" - )) -} - async fn run_stream_agnostic_connection< S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static, >( @@ 
-403,7 +352,7 @@ async fn run_stream_agnostic_connection< out_message_consumer_id: ConsumerId, connection_id: ConnectionId, stream: S, - peer_addr: CanonicalSocketAddr, + ip_version: IpVersion, ) -> anyhow::Result<()> { let ws_config = tungstenite::protocol::WebSocketConfig { max_frame_size: Some(config.network.websocket_max_frame_size), @@ -429,7 +378,7 @@ async fn run_stream_agnostic_connection< pending_scrape_slab, out_message_consumer_id, ws_in, - peer_addr, + ip_version, connection_id, }; @@ -450,7 +399,6 @@ async fn run_stream_agnostic_connection< connection_slab, ws_out, pending_scrape_slab, - peer_addr, connection_id, }; @@ -475,7 +423,7 @@ struct ConnectionReader { pending_scrape_slab: Rc>>, out_message_consumer_id: ConsumerId, ws_in: SplitStream>, - peer_addr: CanonicalSocketAddr, + ip_version: IpVersion, connection_id: ConnectionId, } @@ -658,7 +606,7 @@ impl ConnectionReader { ConnectionMeta { connection_id: self.connection_id, out_message_consumer_id: self.out_message_consumer_id, - peer_addr: self.peer_addr, + ip_version: self.ip_version, pending_scrape_id, } } @@ -670,7 +618,6 @@ struct ConnectionWriter { connection_slab: Rc>>, ws_out: SplitSink, tungstenite::Message>, pending_scrape_slab: Rc>>, - peer_addr: CanonicalSocketAddr, connection_id: ConnectionId, } @@ -681,10 +628,6 @@ impl ConnectionWriter { anyhow::anyhow!("ConnectionWriter couldn't receive message, sender is closed") })?; - if meta.peer_addr != self.peer_addr { - return Err(anyhow::anyhow!("peer addresses didn't match")); - } - match out_message { OutMessage::ScrapeResponse(out_message) => { let pending_scrape_id = meta @@ -753,11 +696,7 @@ impl ConnectionWriter { } Ok(Err(err)) => Err(err.into()), Err(err) => { - ::log::info!( - "send_out_message: send to {} took to long: {}", - self.peer_addr.get(), - err - ); + ::log::info!("send_out_message: sending to peer took to long: {}", err); Ok(()) } diff --git a/aquatic_ws/src/workers/swarm.rs b/aquatic_ws/src/workers/swarm.rs index 
0ea07ac..22a9f02 100644 --- a/aquatic_ws/src/workers/swarm.rs +++ b/aquatic_ws/src/workers/swarm.rs @@ -208,14 +208,11 @@ where SwarmControlMessage::ConnectionClosed { info_hash, peer_id, - peer_addr, + ip_version, } => { - ::log::debug!( - "Removing peer {} from torrents because connection was closed", - peer_addr.get() - ); + ::log::debug!("Removing peer from torrents because connection was closed"); - if peer_addr.is_ipv4() { + if let IpVersion::V4 = ip_version { if let Some(torrent_data) = torrents.borrow_mut().ipv4.get_mut(&info_hash) { torrent_data.remove_peer(peer_id); } @@ -305,18 +302,18 @@ fn handle_announce_request( request_sender_meta: ConnectionMeta, request: AnnounceRequest, ) { - let torrent_data: &mut TorrentData = if request_sender_meta.peer_addr.is_ipv4() { + let torrent_data: &mut TorrentData = if let IpVersion::V4 = request_sender_meta.ip_version { torrent_maps.ipv4.entry(request.info_hash).or_default() } else { torrent_maps.ipv6.entry(request.info_hash).or_default() }; - // If there is already a peer with this peer_id, check that socket - // addr is same as that of request sender. Otherwise, ignore request. - // Since peers have access to each others peer_id's, they could send - // requests using them, causing all sorts of issues. + // If there is already a peer with this peer_id, check that connection id + // is the same as that of request sender. Otherwise, ignore request. Since + // peers have access to each other's peer_ids, they could send requests + // using them, causing all sorts of issues. 
if let Some(previous_peer) = torrent_data.peers.get(&request.peer_id) { - if request_sender_meta.peer_addr != previous_peer.connection_meta.peer_addr { + if request_sender_meta.connection_id != previous_peer.connection_meta.connection_id { return; } } @@ -454,7 +451,7 @@ fn handle_scrape_request( files: HashMap::with_capacity(num_to_take), }; - let torrent_map: &mut TorrentMap = if meta.peer_addr.is_ipv4() { + let torrent_map: &mut TorrentMap = if let IpVersion::V4 = meta.ip_version { &mut torrent_maps.ipv4 } else { &mut torrent_maps.ipv6