From 018f32e9e900a539b6a8312b71baa32ab0ddf964 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Jul 2022 22:09:34 +0200 Subject: [PATCH 01/15] ws: add optional HTTP 200 response for GET /health without TLS --- aquatic_ws/src/config.rs | 6 ++++++ aquatic_ws/src/workers/socket.rs | 29 ++++++++++++++++++++++++++--- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index 7f051a1..9a061c7 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -67,6 +67,10 @@ pub struct NetworkConfig { pub websocket_max_message_size: usize, pub websocket_max_frame_size: usize, + + /// Return a HTTP 200 Ok response when request is GET /health over plain + /// HTTP (no TLS!) + pub enable_http_health_check: bool, } impl Default for NetworkConfig { @@ -81,6 +85,8 @@ impl Default for NetworkConfig { websocket_max_message_size: 64 * 1024, websocket_max_frame_size: 16 * 1024, + + enable_http_health_check: false, } } } diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index e21a72d..f90cada 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -14,7 +14,7 @@ use aquatic_common::{CanonicalSocketAddr, PanicSentinel}; use aquatic_ws_protocol::*; use async_tungstenite::WebSocketStream; use futures::stream::{SplitSink, SplitStream}; -use futures::StreamExt; +use futures::{AsyncWriteExt, StreamExt}; use futures_lite::future::race; use futures_rustls::server::TlsStream; use futures_rustls::TlsAcceptor; @@ -157,7 +157,7 @@ pub async fn run_socket_worker( stream, peer_addr, ).await { - ::log::debug!("Connection::run() error: {:?}", err); + ::log::debug!("Connection::run() error: {:#}", err); } // Remove reference in separate statement to avoid @@ -269,9 +269,32 @@ async fn run_connection( out_message_consumer_id: ConsumerId, connection_id: ConnectionId, tls_config: Arc, - stream: TcpStream, + mut stream: TcpStream, peer_addr: CanonicalSocketAddr, ) -> anyhow::Result<()> { + if config.network.enable_http_health_check { + let mut peek_buf = [0u8; 11]; + + stream + .peek(&mut peek_buf) + .await + .map_err(|err| anyhow::anyhow!("error peeking: {:#}", err))?; + + if &peek_buf == b"GET /health" { + stream + .write_all(b"HTTP/1.1 200 Ok\r\nContent-Length: 2\r\n\r\nOk") + .await + .map_err(|err| anyhow::anyhow!("error sending health check response: {:#}", err))?; + stream.flush().await.map_err(|err| { + anyhow::anyhow!("error flushing health check response: {:#}", err) + })?; + + return Err(anyhow::anyhow!( + "client requested health check, skipping websocket negotiation" + )); + } + } + let tls_acceptor: TlsAcceptor = tls_config.into(); let stream = tls_acceptor.accept(stream).await?; From a16ce91d4658c2c1f789e1f80803f9e97c189210 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Jul 2022 23:21:17 +0200 Subject: [PATCH 02/15] ws: make TLS optional, allow HTTP health checks without TLS only --- README.md | 6 +- TODO.md | 2 + aquatic_ws/src/config.rs | 7 +- aquatic_ws/src/lib.rs | 21 ++++-- aquatic_ws/src/workers/socket.rs | 123 +++++++++++++++++++++++-------- 5 files changed, 116 insertions(+), 43 deletions(-) diff --git a/README.md b/README.md index 20dc68f..e35c7b1 100644 --- a/README.md +++ b/README.md @@ -65,9 +65,9 @@ Generate configuration files. They come with comments and differ between protoco Make adjustments to the files. You will likely want to adjust `address` (listening address) under the `network` section. -Note that both `aquatic_http` and `aquatic_ws` require configuring TLS -certificate and private key files. More details are available in the -respective configuration files. +Note that both `aquatic_http` and `aquatic_ws` require configuring certificate +and private key files to run over TLS (which is optional for `aquatic_ws`). +More details are available in the respective configuration files. #### Workers diff --git a/TODO.md b/TODO.md index 5396993..2a3d1d4 100644 --- a/TODO.md +++ b/TODO.md @@ -2,6 +2,8 @@ ## High priority +ws: does support for non-tls connections affect performance? + ## Medium priority * quit whole program if any thread panics diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index 9a061c7..7e41bd0 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -60,6 +60,8 @@ pub struct NetworkConfig { /// Maximum number of pending TCP connections pub tcp_backlog: i32, + /// Enable TLS + pub enable_tls: bool, /// Path to TLS certificate (DER-encoded X.509) pub tls_certificate_path: PathBuf, /// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format) @@ -68,8 +70,8 @@ pub struct NetworkConfig { pub websocket_max_message_size: usize, pub websocket_max_frame_size: usize, - /// Return a HTTP 200 Ok response when request is GET /health over plain - /// HTTP (no TLS!) + /// Return a HTTP 200 Ok response when receiving GET /health, but only + /// when not running over TLS pub enable_http_health_check: bool, } @@ -80,6 +82,7 @@ impl Default for NetworkConfig { only_ipv6: false, tcp_backlog: 1024, + enable_tls: false, tls_certificate_path: "".into(), tls_private_key_path: "".into(), diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index 66cc57b..4fe33b3 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -26,6 +26,12 @@ pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); pub const SHARED_IN_CHANNEL_SIZE: usize = 1024; pub fn run(config: Config) -> ::anyhow::Result<()> { + if config.network.enable_tls && config.network.enable_http_health_check { + return Err(anyhow::anyhow!( + "configuration: network.enable_tls and network.enable_http_health_check can't both be set to true" + )); + } + let mut signals = Signals::new([SIGUSR1, SIGTERM])?; let state = State::default(); @@ -41,10 +47,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); - let tls_config = Arc::new(create_rustls_config( - &config.network.tls_certificate_path, - &config.network.tls_private_key_path, - )?); + let opt_tls_config = config + .network + .enable_tls + .then_some(Arc::new(create_rustls_config( + &config.network.tls_certificate_path, + &config.network.tls_private_key_path, + )?)); let mut executors = Vec::new(); @@ -52,7 +61,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); - let tls_config = tls_config.clone(); + let opt_tls_config = opt_tls_config.clone(); let control_mesh_builder = control_mesh_builder.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); @@ -72,7 +81,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { sentinel, config, state, - tls_config, + opt_tls_config, control_mesh_builder, request_mesh_builder, response_mesh_builder, diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index f90cada..49ab4fc 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -16,7 +16,6 @@ use async_tungstenite::WebSocketStream; use futures::stream::{SplitSink, SplitStream}; use futures::{AsyncWriteExt, StreamExt}; use futures_lite::future::race; -use futures_rustls::server::TlsStream; use futures_rustls::TlsAcceptor; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; @@ -55,7 +54,7 @@ pub async fn run_socket_worker( _sentinel: PanicSentinel, config: Config, state: State, - tls_config: Arc, + opt_tls_config: Option>, control_message_mesh_builder: MeshBuilder, in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>, out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>, @@ -139,9 +138,13 @@ pub async fn run_socket_worker( peer_addr, }); - ::log::info!("accepting stream: {}", key); + ::log::info!( + "accepting stream from {}, assigning id {}", + peer_addr.get(), + key + ); - let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, tls_config) async move { + let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, opt_tls_config) async move { if let Err(err) = run_connection( config.clone(), access_list, @@ -153,13 +156,15 @@ pub async fn run_socket_worker( out_message_receiver, out_message_consumer_id, ConnectionId(key), - tls_config, + opt_tls_config, stream, - peer_addr, + peer_addr ).await { - ::log::debug!("Connection::run() error: {:#}", err); + ::log::debug!("connection error: {:#}", err); } + // Clean up after closed connection + // Remove reference in separate statement to avoid // multiple RefCell borrows let opt_reference = connection_slab.borrow_mut().try_remove(key); @@ -268,36 +273,90 @@ async fn run_connection( out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>, out_message_consumer_id: ConsumerId, connection_id: ConnectionId, - tls_config: Arc, + opt_tls_config: Option>, mut stream: TcpStream, peer_addr: CanonicalSocketAddr, ) -> anyhow::Result<()> { - if config.network.enable_http_health_check { - let mut peek_buf = [0u8; 11]; + if let Some(tls_config) = opt_tls_config { + let tls_acceptor: TlsAcceptor = tls_config.into(); - stream - .peek(&mut peek_buf) - .await - .map_err(|err| anyhow::anyhow!("error peeking: {:#}", err))?; + let stream = tls_acceptor.accept(stream).await?; + + run_stream_agnostic_connection( + config.clone(), + access_list, + in_message_senders, + tq_prioritized, + tq_regular, + connection_slab.clone(), + out_message_sender, + out_message_receiver, + out_message_consumer_id, + connection_id, + stream, + peer_addr, + ) + .await + } else { + if config.network.enable_http_health_check { + let mut peek_buf = [0u8; 11]; - if &peek_buf == b"GET /health" { stream - .write_all(b"HTTP/1.1 200 Ok\r\nContent-Length: 2\r\n\r\nOk") + .peek(&mut peek_buf) .await - .map_err(|err| anyhow::anyhow!("error sending health check response: {:#}", err))?; - stream.flush().await.map_err(|err| { - anyhow::anyhow!("error flushing health check response: {:#}", err) - })?; + .map_err(|err| anyhow::anyhow!("error peeking: {:#}", err))?; - return Err(anyhow::anyhow!( - "client requested health check, skipping websocket negotiation" - )); + if &peek_buf == b"GET /health" { + stream + .write_all(b"HTTP/1.1 200 Ok\r\nContent-Length: 2\r\n\r\nOk") + .await + .map_err(|err| { + anyhow::anyhow!("error sending health check response: {:#}", err) + })?; + stream.flush().await.map_err(|err| { + anyhow::anyhow!("error flushing health check response: {:#}", err) + })?; + + return Err(anyhow::anyhow!( + "client requested health check, skipping websocket negotiation" + )); + } } + + run_stream_agnostic_connection( + config.clone(), + access_list, + in_message_senders, + tq_prioritized, + tq_regular, + connection_slab.clone(), + out_message_sender, + out_message_receiver, + out_message_consumer_id, + connection_id, + stream, + peer_addr, + ) + .await } +} - let tls_acceptor: TlsAcceptor = tls_config.into(); - let stream = tls_acceptor.accept(stream).await?; - +async fn run_stream_agnostic_connection< + S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static, +>( + config: Rc, + access_list: Arc, + in_message_senders: Rc>, + tq_prioritized: TaskQueueHandle, + tq_regular: TaskQueueHandle, + connection_slab: Rc>>, + out_message_sender: Rc>, + out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>, + out_message_consumer_id: ConsumerId, + connection_id: ConnectionId, + stream: S, + peer_addr: CanonicalSocketAddr, +) -> anyhow::Result<()> { let ws_config = tungstenite::protocol::WebSocketConfig { max_frame_size: Some(config.network.websocket_max_frame_size), max_message_size: Some(config.network.websocket_max_message_size), @@ -359,7 +418,7 @@ async fn run_connection( race(reader_handle, writer_handle).await.unwrap() } -struct ConnectionReader { +struct ConnectionReader { config: Rc, access_list_cache: AccessListCache, connection_slab: Rc>>, @@ -367,12 +426,12 @@ struct ConnectionReader { out_message_sender: Rc>, pending_scrape_slab: Rc>>, out_message_consumer_id: ConsumerId, - ws_in: SplitStream>>, + ws_in: SplitStream>, peer_addr: CanonicalSocketAddr, connection_id: ConnectionId, } -impl ConnectionReader { +impl ConnectionReader { async fn run_in_message_loop(&mut self) -> anyhow::Result<()> { loop { ::log::debug!("read_in_message"); @@ -557,17 +616,17 @@ impl ConnectionReader { } } -struct ConnectionWriter { +struct ConnectionWriter { config: Rc, out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>, connection_slab: Rc>>, - ws_out: SplitSink>, tungstenite::Message>, + ws_out: SplitSink, tungstenite::Message>, pending_scrape_slab: Rc>>, peer_addr: CanonicalSocketAddr, connection_id: ConnectionId, } -impl ConnectionWriter { +impl ConnectionWriter { async fn run_out_message_loop(&mut self) -> anyhow::Result<()> { loop { let (meta, out_message) = self.out_message_receiver.recv().await.ok_or_else(|| { From 88971cd8700b725e105200af3d29847d4ca5f2dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Jul 2022 23:29:45 +0200 Subject: [PATCH 03/15] ws: rename enable_http_health_check to enable_http_health_checks --- aquatic_ws/src/config.rs | 4 ++-- aquatic_ws/src/lib.rs | 2 +- aquatic_ws/src/workers/socket.rs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index 7e41bd0..792e56a 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -72,7 +72,7 @@ pub struct NetworkConfig { /// Return a HTTP 200 Ok response when receiving GET /health, but only /// when not running over TLS - pub enable_http_health_check: bool, + pub enable_http_health_checks: bool, } impl Default for NetworkConfig { @@ -89,7 +89,7 @@ impl Default for NetworkConfig { websocket_max_message_size: 64 * 1024, websocket_max_frame_size: 16 * 1024, - enable_http_health_check: false, + enable_http_health_checks: false, } } } diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index 4fe33b3..ee75e86 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -26,7 +26,7 @@ pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); pub const SHARED_IN_CHANNEL_SIZE: usize = 1024; pub fn run(config: Config) -> ::anyhow::Result<()> { - if config.network.enable_tls && config.network.enable_http_health_check { + if config.network.enable_tls && config.network.enable_http_health_checks { return Err(anyhow::anyhow!( "configuration: network.enable_tls and network.enable_http_health_check can't both be set to true" )); diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index 49ab4fc..5be08f8 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -298,7 +298,7 @@ async fn run_connection( ) .await } else { - if config.network.enable_http_health_check { + if config.network.enable_http_health_checks { let mut peek_buf = [0u8; 11]; stream From 064d6fb14b296853e9f123c27365840098e25e39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Jul 2022 23:31:18 +0200 Subject: [PATCH 04/15] Update TODO --- TODO.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/TODO.md b/TODO.md index 2a3d1d4..1c7a02e 100644 --- a/TODO.md +++ b/TODO.md @@ -2,7 +2,7 @@ ## High priority -ws: does support for non-tls connections affect performance? +ws: reverse proxy support for non-TLS connections (parse x-forwarded-for) ## Medium priority From 9f9015d51c0606362458014169290b1cc57cf40c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Jul 2022 23:44:16 +0200 Subject: [PATCH 05/15] GitHub CI: enable TLS for ws in file transfer test --- .github/actions/test-transfer/entrypoint.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/actions/test-transfer/entrypoint.sh b/.github/actions/test-transfer/entrypoint.sh index 1337eca..ea27737 100755 --- a/.github/actions/test-transfer/entrypoint.sh +++ b/.github/actions/test-transfer/entrypoint.sh @@ -82,6 +82,7 @@ echo "log_level = 'trace' [network] address = '127.0.0.1:3002' +enable_tls = true tls_certificate_path = './cert.crt' tls_private_key_path = './key.pk8' " > ws.toml From 3b94b8e588a3faa86de507502f76fb58100a50bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Jul 2022 00:53:31 +0200 Subject: [PATCH 06/15] WIP: ws: parse X-FORWARDED-FOR headers --- Cargo.lock | 1 + TODO.md | 6 +- aquatic_ws/Cargo.toml | 1 + aquatic_ws/src/config.rs | 7 +++ aquatic_ws/src/workers/socket.rs | 94 ++++++++++++++++++++++++-------- 5 files changed, 85 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a2d79a8..3e659a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -295,6 +295,7 @@ dependencies = [ "futures-rustls", "glommio", "hashbrown 0.12.3", + "httparse", "log", "mimalloc", "privdrop", diff --git a/TODO.md b/TODO.md index 1c7a02e..76e10ee 100644 --- a/TODO.md +++ b/TODO.md @@ -2,7 +2,11 @@ ## High priority -ws: reverse proxy support for non-TLS connections (parse x-forwarded-for) +* ws + * reverse proxy support for non-TLS connections (parse x-forwarded-for) + * how does this interact with IPv4/IPv6 differentiation? + * is it possible to skip determining peer IP's altogether or is it necessary + for IPv4/IPv6 separation / preventing peer mixups or abuse? ## Medium priority diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index c0d3724..4ea8a6b 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -29,6 +29,7 @@ futures-lite = "1" futures-rustls = "0.22" glommio = "0.7" hashbrown = { version = "0.12", features = ["serde"] } +httparse = "1" log = "0.4" mimalloc = { version = "0.1", default-features = false } privdrop = "0.5" diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index 792e56a..df93432 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -70,6 +70,11 @@ pub struct NetworkConfig { pub websocket_max_message_size: usize, pub websocket_max_frame_size: usize, + /// Trust X-Forwarded-For headers to get peer IP. Only use this if you are + /// running aquatic_ws behind a reverse proxy that sets them and your + /// instance is not accessible by other means. + pub trust_x_forwarded_for: bool, + /// Return a HTTP 200 Ok response when receiving GET /health, but only /// when not running over TLS pub enable_http_health_checks: bool, @@ -89,6 +94,8 @@ impl Default for NetworkConfig { websocket_max_message_size: 64 * 1024, websocket_max_frame_size: 16 * 1024, + trust_x_forwarded_for: false, + enable_http_health_checks: false, } } diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index 5be08f8..136a604 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -1,6 +1,7 @@ use std::borrow::Cow; use std::cell::RefCell; use std::collections::BTreeMap; +use std::net::{IpAddr, SocketAddr}; use std::os::unix::prelude::{FromRawFd, IntoRawFd}; use std::rc::Rc; use std::sync::Arc; @@ -47,7 +48,8 @@ struct ConnectionReference { valid_until: ValidUntil, peer_id: Option, announced_info_hashes: HashSet, - peer_addr: CanonicalSocketAddr, + /// May need to be parsed from X-Forwarded-For headers + peer_addr: Option, } pub async fn run_socket_worker( @@ -114,18 +116,6 @@ pub async fn run_socket_worker( while let Some(stream) = incoming.next().await { match stream { Ok(stream) => { - let peer_addr = match stream.peer_addr() { - Ok(peer_addr) => CanonicalSocketAddr::new(peer_addr), - Err(err) => { - ::log::info!( - "could not extract peer address, closing connection: {:#}", - err - ); - - continue; - } - }; - let (out_message_sender, out_message_receiver) = new_bounded(LOCAL_CHANNEL_SIZE); let out_message_sender = Rc::new(out_message_sender); @@ -135,14 +125,10 @@ pub async fn run_socket_worker( valid_until: ValidUntil::new(config.cleaning.max_connection_idle), peer_id: None, announced_info_hashes: Default::default(), - peer_addr, + peer_addr: None, }); - ::log::info!( - "accepting stream from {}, assigning id {}", - peer_addr.get(), - key - ); + ::log::info!("accepting stream, assigning id {}", key); let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, opt_tls_config) async move { if let Err(err) = run_connection( @@ -158,7 +144,6 @@ pub async fn run_socket_worker( ConnectionId(key), opt_tls_config, stream, - peer_addr ).await { ::log::debug!("connection error: {:#}", err); } @@ -171,12 +156,12 @@ pub async fn run_socket_worker( // Tell swarm workers to remove peer if let Some(reference) = opt_reference { - if let Some(peer_id) = reference.peer_id { + if let (Some(peer_id), Some(peer_addr)) = (reference.peer_id, reference.peer_addr) { for info_hash in reference.announced_info_hashes { let message = SwarmControlMessage::ConnectionClosed { info_hash, peer_id, - peer_addr: reference.peer_addr, + peer_addr, }; let consumer_index = @@ -275,8 +260,28 @@ async fn run_connection( connection_id: ConnectionId, opt_tls_config: Option>, mut stream: TcpStream, - peer_addr: CanonicalSocketAddr, ) -> anyhow::Result<()> { + let remote_addr = stream + .peer_addr() + .map_err(|err| anyhow::anyhow!("could not extract peer address: {:#}", err))?; + + let peer_addr = if config.network.trust_x_forwarded_for { + let ip = parse_x_forwarded_for_ip(&stream).await?; + // Using the reverse proxy connection port here should be fine, since + // we only use the CanonicalPeerAddr to differentiate connections from + // each other as well as to determine if they run on IPv4 or IPv6, + // not for sending responses or passing on to peers. + let port = remote_addr.port(); + + CanonicalSocketAddr::new(SocketAddr::new(ip, port)) + } else { + CanonicalSocketAddr::new(remote_addr) + }; + + if let Some(connection_reference) = connection_slab.borrow_mut().get_mut(connection_id.0) { + connection_reference.peer_addr = Some(peer_addr); + } + if let Some(tls_config) = opt_tls_config { let tls_acceptor: TlsAcceptor = tls_config.into(); @@ -341,6 +346,49 @@ async fn run_connection( } } +async fn parse_x_forwarded_for_ip(stream: &TcpStream) -> anyhow::Result { + let mut peek_buf = [0u8; 1024]; + + let mut position = 0usize; + + for _ in 0..16 { + let bytes_read = stream + .peek(&mut peek_buf[position..]) + .await + .map_err(|err| anyhow::anyhow!("error peeking: {:#}", err))?; + + position += bytes_read; + + if bytes_read == 0 { + return Err(anyhow::anyhow!( + "zero bytes read while parsing x-forwarded-for" + )); + } + + let mut headers = [httparse::EMPTY_HEADER; 32]; + let mut req = httparse::Request::new(&mut headers); + + if req.parse(&peek_buf)?.is_complete() { + for header in req.headers.iter() { + if header.name == "X-Forwarded-For" { + let ip: IpAddr = ::std::str::from_utf8(header.value)?.parse()?; + + // ip.is_global() { // FIXME + if true { + return Ok(ip); + } + } + } + + break; + } + } + + Err(anyhow::anyhow!( + "Could not determine source IP through X-Forwarded-For headers" + )) +} + async fn run_stream_agnostic_connection< S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static, >( From a62b2033a59bdd142751015c1578e70c06798151 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Jul 2022 16:12:00 +0200 Subject: [PATCH 07/15] ws: avoid X-Forwarded-For parsing since we only need to know IPv4/IPv6 --- README.md | 2 +- TODO.md | 6 -- aquatic_ws/src/common.rs | 27 ++++++-- aquatic_ws/src/config.rs | 14 ++-- aquatic_ws/src/workers/socket.rs | 115 ++++++++----------------------- aquatic_ws/src/workers/swarm.rs | 23 +++---- 6 files changed, 65 insertions(+), 122 deletions(-) diff --git a/README.md b/README.md index e35c7b1..f21c53b 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ of sub-implementations for different protocols: |--------------|--------------------------------------------|------------------------------| | aquatic_udp | [BitTorrent over UDP] | Unix-like (using [mio]) | | aquatic_http | [BitTorrent over HTTP] with TLS ([rustls]) | Linux 5.8+ (using [glommio]) | -| aquatic_ws | [WebTorrent] over TLS ([rustls]) | Linux 5.8+ (using [glommio]) | +| aquatic_ws | [WebTorrent] over TLS ([rustls], optional) | Linux 5.8+ (using [glommio]) | Features at a glance: diff --git a/TODO.md b/TODO.md index 76e10ee..5396993 100644 --- a/TODO.md +++ b/TODO.md @@ -2,12 +2,6 @@ ## High priority -* ws - * reverse proxy support for non-TLS connections (parse x-forwarded-for) - * how does this interact with IPv4/IPv6 differentiation? - * is it possible to skip determining peer IP's altogether or is it necessary - for IPv4/IPv6 separation / preventing peer mixups or abuse? - ## Medium priority * quit whole program if any thread panics diff --git a/aquatic_ws/src/common.rs b/aquatic_ws/src/common.rs index 27aa134..a099e68 100644 --- a/aquatic_ws/src/common.rs +++ b/aquatic_ws/src/common.rs @@ -1,11 +1,28 @@ -use std::sync::Arc; +use std::{net::IpAddr, sync::Arc}; use aquatic_common::access_list::AccessListArcSwap; -use aquatic_common::CanonicalSocketAddr; pub use aquatic_common::ValidUntil; use aquatic_ws_protocol::{InfoHash, PeerId}; +#[derive(Copy, Clone, Debug)] +pub enum IpVersion { + V4, + V6, +} + +impl IpVersion { + pub fn canonical_from_ip(ip: IpAddr) -> IpVersion { + match ip { + IpAddr::V4(_) => Self::V4, + IpAddr::V6(addr) => match addr.octets() { + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, _, _, _, _] => Self::V4, + _ => Self::V6, + }, + } + } +} + #[derive(Default, Clone)] pub struct State { pub access_list: Arc, @@ -17,7 +34,7 @@ pub struct PendingScrapeId(pub usize); #[derive(Copy, Clone, Debug)] pub struct ConsumerId(pub usize); -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, PartialEq)] pub struct ConnectionId(pub usize); #[derive(Clone, Copy, Debug)] @@ -26,7 +43,7 @@ pub struct ConnectionMeta { /// sending back response through correct channel to correct worker. pub out_message_consumer_id: ConsumerId, pub connection_id: ConnectionId, - pub peer_addr: CanonicalSocketAddr, + pub ip_version: IpVersion, pub pending_scrape_id: Option, } @@ -35,6 +52,6 @@ pub enum SwarmControlMessage { ConnectionClosed { info_hash: InfoHash, peer_id: PeerId, - peer_addr: CanonicalSocketAddr, + ip_version: IpVersion, }, } diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index df93432..2026bd4 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -9,6 +9,9 @@ use aquatic_common::cli::LogLevel; use aquatic_toml_config::TomlConfig; /// aquatic_ws configuration +/// +/// Running behind a reverse proxy is supported, but IPv4 peer requests have +/// to be proxied to IPv4 requests, and IPv6 requests to IPv6 requests. #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] #[serde(default, deny_unknown_fields)] pub struct Config { @@ -70,13 +73,8 @@ pub struct NetworkConfig { pub websocket_max_message_size: usize, pub websocket_max_frame_size: usize, - /// Trust X-Forwarded-For headers to get peer IP. Only use this if you are - /// running aquatic_ws behind a reverse proxy that sets them and your - /// instance is not accessible by other means. - pub trust_x_forwarded_for: bool, - - /// Return a HTTP 200 Ok response when receiving GET /health, but only - /// when not running over TLS + /// Return a HTTP 200 Ok response when receiving GET /health. Can not be + /// combined with enable_tls. pub enable_http_health_checks: bool, } @@ -94,8 +92,6 @@ impl Default for NetworkConfig { websocket_max_message_size: 64 * 1024, websocket_max_frame_size: 16 * 1024, - trust_x_forwarded_for: false, - enable_http_health_checks: false, } } diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index 136a604..14ba2bb 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -1,7 +1,6 @@ use std::borrow::Cow; use std::cell::RefCell; use std::collections::BTreeMap; -use std::net::{IpAddr, SocketAddr}; use std::os::unix::prelude::{FromRawFd, IntoRawFd}; use std::rc::Rc; use std::sync::Arc; @@ -11,7 +10,7 @@ use anyhow::Context; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; -use aquatic_common::{CanonicalSocketAddr, PanicSentinel}; +use aquatic_common::PanicSentinel; use aquatic_ws_protocol::*; use async_tungstenite::WebSocketStream; use futures::stream::{SplitSink, SplitStream}; @@ -48,8 +47,7 @@ struct ConnectionReference { valid_until: ValidUntil, peer_id: Option, announced_info_hashes: HashSet, - /// May need to be parsed from X-Forwarded-For headers - peer_addr: Option, + ip_version: IpVersion, } pub async fn run_socket_worker( @@ -116,6 +114,15 @@ pub async fn run_socket_worker( while let Some(stream) = incoming.next().await { match stream { Ok(stream) => { + let ip_version = match stream.peer_addr() { + Ok(addr) => IpVersion::canonical_from_ip(addr.ip()), + Err(err) => { + ::log::info!("could not extract ip version (v4 or v6): {:#}", err); + + continue; + } + }; + let (out_message_sender, out_message_receiver) = new_bounded(LOCAL_CHANNEL_SIZE); let out_message_sender = Rc::new(out_message_sender); @@ -125,7 +132,7 @@ pub async fn run_socket_worker( valid_until: ValidUntil::new(config.cleaning.max_connection_idle), peer_id: None, announced_info_hashes: Default::default(), - peer_addr: None, + ip_version, }); ::log::info!("accepting stream, assigning id {}", key); @@ -143,6 +150,7 @@ pub async fn run_socket_worker( out_message_consumer_id, ConnectionId(key), opt_tls_config, + ip_version, stream, ).await { ::log::debug!("connection error: {:#}", err); @@ -156,12 +164,12 @@ pub async fn run_socket_worker( // Tell swarm workers to remove peer if let Some(reference) = opt_reference { - if let (Some(peer_id), Some(peer_addr)) = (reference.peer_id, reference.peer_addr) { + if let Some(peer_id) = reference.peer_id { for info_hash in reference.announced_info_hashes { let message = SwarmControlMessage::ConnectionClosed { info_hash, peer_id, - peer_addr, + ip_version: reference.ip_version, }; let consumer_index = @@ -259,29 +267,9 @@ async fn run_connection( out_message_consumer_id: ConsumerId, connection_id: ConnectionId, opt_tls_config: Option>, + ip_version: IpVersion, mut stream: TcpStream, ) -> anyhow::Result<()> { - let remote_addr = stream - .peer_addr() - .map_err(|err| anyhow::anyhow!("could not extract peer address: {:#}", err))?; - - let peer_addr = if config.network.trust_x_forwarded_for { - let ip = parse_x_forwarded_for_ip(&stream).await?; - // Using the reverse proxy connection port here should be fine, since - // we only use the CanonicalPeerAddr to differentiate connections from - // each other as well as to determine if they run on IPv4 or IPv6, - // not for sending responses or passing on to peers. - let port = remote_addr.port(); - - CanonicalSocketAddr::new(SocketAddr::new(ip, port)) - } else { - CanonicalSocketAddr::new(remote_addr) - }; - - if let Some(connection_reference) = connection_slab.borrow_mut().get_mut(connection_id.0) { - connection_reference.peer_addr = Some(peer_addr); - } - if let Some(tls_config) = opt_tls_config { let tls_acceptor: TlsAcceptor = tls_config.into(); @@ -299,10 +287,14 @@ async fn run_connection( out_message_consumer_id, connection_id, stream, - peer_addr, + ip_version, ) .await } else { + // Implementing this over TLS is too cumbersome, since the crate used + // for TLS streams doesn't support peak and tungstenite doesn't + // properly support sending a HTTP error response in accept_hdr + // callback. if config.network.enable_http_health_checks { let mut peek_buf = [0u8; 11]; @@ -340,55 +332,12 @@ async fn run_connection( out_message_consumer_id, connection_id, stream, - peer_addr, + ip_version, ) .await } } -async fn parse_x_forwarded_for_ip(stream: &TcpStream) -> anyhow::Result { - let mut peek_buf = [0u8; 1024]; - - let mut position = 0usize; - - for _ in 0..16 { - let bytes_read = stream - .peek(&mut peek_buf[position..]) - .await - .map_err(|err| anyhow::anyhow!("error peeking: {:#}", err))?; - - position += bytes_read; - - if bytes_read == 0 { - return Err(anyhow::anyhow!( - "zero bytes read while parsing x-forwarded-for" - )); - } - - let mut headers = [httparse::EMPTY_HEADER; 32]; - let mut req = httparse::Request::new(&mut headers); - - if req.parse(&peek_buf)?.is_complete() { - for header in req.headers.iter() { - if header.name == "X-Forwarded-For" { - let ip: IpAddr = ::std::str::from_utf8(header.value)?.parse()?; - - // ip.is_global() { // FIXME - if true { - return Ok(ip); - } - } - } - - break; - } - } - - Err(anyhow::anyhow!( - "Could not determine source IP through X-Forwarded-For headers" - )) -} - async fn run_stream_agnostic_connection< S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static, >( @@ -403,7 +352,7 @@ async fn run_stream_agnostic_connection< out_message_consumer_id: ConsumerId, connection_id: ConnectionId, stream: S, - peer_addr: CanonicalSocketAddr, + ip_version: IpVersion, ) -> anyhow::Result<()> { let ws_config = tungstenite::protocol::WebSocketConfig { max_frame_size: Some(config.network.websocket_max_frame_size), @@ -429,7 +378,7 @@ async fn run_stream_agnostic_connection< pending_scrape_slab, out_message_consumer_id, ws_in, - peer_addr, + ip_version, connection_id, }; @@ -450,7 +399,6 @@ async fn run_stream_agnostic_connection< connection_slab, ws_out, pending_scrape_slab, - peer_addr, connection_id, }; @@ -475,7 +423,7 @@ struct ConnectionReader { pending_scrape_slab: Rc>>, out_message_consumer_id: ConsumerId, ws_in: SplitStream>, - peer_addr: CanonicalSocketAddr, + ip_version: IpVersion, connection_id: ConnectionId, } @@ -658,7 +606,7 @@ impl ConnectionReader { ConnectionMeta { connection_id: self.connection_id, out_message_consumer_id: self.out_message_consumer_id, - peer_addr: self.peer_addr, + ip_version: self.ip_version, pending_scrape_id, } } @@ -670,7 +618,6 @@ struct ConnectionWriter { connection_slab: Rc>>, ws_out: SplitSink, tungstenite::Message>, pending_scrape_slab: Rc>>, - peer_addr: CanonicalSocketAddr, connection_id: ConnectionId, } @@ -681,10 +628,6 @@ impl ConnectionWriter { anyhow::anyhow!("ConnectionWriter couldn't receive message, sender is closed") })?; - if meta.peer_addr != self.peer_addr { - return Err(anyhow::anyhow!("peer addresses didn't match")); - } - match out_message { OutMessage::ScrapeResponse(out_message) => { let pending_scrape_id = meta @@ -753,11 +696,7 @@ impl ConnectionWriter { } Ok(Err(err)) => Err(err.into()), Err(err) => { - ::log::info!( - "send_out_message: send to {} took to long: {}", - self.peer_addr.get(), - err - ); + ::log::info!("send_out_message: sending to peer took to long: {}", err); Ok(()) } diff --git a/aquatic_ws/src/workers/swarm.rs b/aquatic_ws/src/workers/swarm.rs index 0ea07ac..22a9f02 100644 --- a/aquatic_ws/src/workers/swarm.rs +++ b/aquatic_ws/src/workers/swarm.rs @@ -208,14 +208,11 @@ where SwarmControlMessage::ConnectionClosed { info_hash, peer_id, - peer_addr, + ip_version, } => { - ::log::debug!( - "Removing peer {} from torrents because connection was closed", - peer_addr.get() - ); + ::log::debug!("Removing peer from torrents because connection was closed"); - if peer_addr.is_ipv4() { + if let IpVersion::V4 = ip_version { if let Some(torrent_data) = torrents.borrow_mut().ipv4.get_mut(&info_hash) { torrent_data.remove_peer(peer_id); } @@ -305,18 +302,18 @@ fn handle_announce_request( request_sender_meta: ConnectionMeta, request: AnnounceRequest, ) { - let torrent_data: &mut TorrentData = if request_sender_meta.peer_addr.is_ipv4() { + let torrent_data: &mut TorrentData = if let IpVersion::V4 = request_sender_meta.ip_version { torrent_maps.ipv4.entry(request.info_hash).or_default() } else { torrent_maps.ipv6.entry(request.info_hash).or_default() }; - // If there is already a peer with this peer_id, check that socket - // addr is same as that of request sender. Otherwise, ignore request. - // Since peers have access to each others peer_id's, they could send - // requests using them, causing all sorts of issues. + // If there is already a peer with this peer_id, check that connection id + // is same as that of request sender. Otherwise, ignore request. Since + // peers have access to each others peer_id's, they could send requests + // using them, causing all sorts of issues. if let Some(previous_peer) = torrent_data.peers.get(&request.peer_id) { - if request_sender_meta.peer_addr != previous_peer.connection_meta.peer_addr { + if request_sender_meta.connection_id != previous_peer.connection_meta.connection_id { return; } } @@ -454,7 +451,7 @@ fn handle_scrape_request( files: HashMap::with_capacity(num_to_take), }; - let torrent_map: &mut TorrentMap = if meta.peer_addr.is_ipv4() { + let torrent_map: &mut TorrentMap = if let IpVersion::V4 = meta.ip_version { &mut torrent_maps.ipv4 } else { &mut torrent_maps.ipv6 From 2313ca77ec8432cf69b6b6ae24efd6dc28a039fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Jul 2022 16:15:29 +0200 Subject: [PATCH 08/15] http: document that running behind a reverse proxy is not supported --- aquatic_http/src/config.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/aquatic_http/src/config.rs b/aquatic_http/src/config.rs index e4a6a35..517d75b 100644 --- a/aquatic_http/src/config.rs +++ b/aquatic_http/src/config.rs @@ -10,6 +10,8 @@ use serde::Deserialize; use aquatic_common::cli::LogLevel; /// aquatic_http configuration +/// +/// Does not support running behind a reverse proxy. #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] #[serde(default, deny_unknown_fields)] pub struct Config { From 9f60c4e46014c75bf841fee1f7b9ad7bbbfd109c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Jul 2022 16:16:25 +0200 Subject: [PATCH 09/15] Run cargo update Updating bytes v1.1.0 -> v1.2.0 Updating slab v0.4.6 -> v0.4.7 --- Cargo.lock | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3e659a3..be3fcea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -588,9 +588,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +checksum = "f0b3de4a0c5e67e16066a0715723abd91edc2f9001d09c46e1dca929351e130e" [[package]] name = "cache-padded" @@ -2459,9 +2459,12 @@ checksum = "04d2ecae5fcf33b122e2e6bd520a57ccf152d2dde3b38c71039df1a6867264ee" [[package]] name = "slab" -version = "0.4.6" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32" +checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef" +dependencies = [ + "autocfg", +] [[package]] name = "smallvec" From 304ff0a8eeaa2b1c0bc4c407332e84d24d19e8c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Jul 2022 16:26:46 +0200 Subject: [PATCH 10/15] GitHub CI: speed up test compilation --- .github/workflows/cargo-build-and-test.yml | 2 +- Cargo.toml | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/cargo-build-and-test.yml b/.github/workflows/cargo-build-and-test.yml index bd68943..4f3ede3 100644 --- a/.github/workflows/cargo-build-and-test.yml +++ b/.github/workflows/cargo-build-and-test.yml @@ -29,7 +29,7 @@ jobs: cargo build --verbose -p aquatic_http cargo build --verbose -p aquatic_ws - name: Run tests - run: cargo test --verbose --workspace --all-targets + run: cargo test --verbose --workspace --profile "test-fast" build-macos: diff --git a/Cargo.toml b/Cargo.toml index 2092976..0a7239c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,3 +35,7 @@ inherits = "release-debug" [profile.release-debug] inherits = "release" debug = true + +[profile.test-fast] +inherits = "release" +lto = false From 355f3e04c691725a7af45330339fe15e78208108 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Jul 2022 17:01:07 +0200 Subject: [PATCH 11/15] Update ws code comments; update TODO.md --- TODO.md | 3 +++ aquatic_ws/src/workers/socket.rs | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/TODO.md b/TODO.md index 5396993..1cc19ae 100644 --- a/TODO.md +++ b/TODO.md @@ -2,6 +2,9 @@ ## High priority +* ws + * add integration test for non-TLS configuration, maybe behind reverse proxy + ## Medium priority * quit whole program if any thread panics diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index 14ba2bb..fdfd5a9 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -292,7 +292,7 @@ async fn run_connection( .await } else { // Implementing this over TLS is too cumbersome, since the crate used - // for TLS streams doesn't support peak and tungstenite doesn't + // for TLS streams doesn't support peek and tungstenite doesn't // properly support sending a HTTP error response in accept_hdr // callback. if config.network.enable_http_health_checks { From 72c66e6e1a78fbe059398e09d92b13b994008445 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Jul 2022 17:12:39 +0200 Subject: [PATCH 12/15] GitHub CI: add transfer test for non-TLS WebTorrent --- .github/actions/test-transfer/entrypoint.sh | 102 ++++++++++++++------ 1 file changed, 75 insertions(+), 27 deletions(-) diff --git a/.github/actions/test-transfer/entrypoint.sh b/.github/actions/test-transfer/entrypoint.sh index ea27737..2fab330 100755 --- a/.github/actions/test-transfer/entrypoint.sh +++ b/.github/actions/test-transfer/entrypoint.sh @@ -85,8 +85,16 @@ address = '127.0.0.1:3002' enable_tls = true tls_certificate_path = './cert.crt' tls_private_key_path = './key.pk8' +" > ws-tls.toml +./target/debug/aquatic ws -c ws-tls.toml > "$HOME/ws-tls.log" 2>&1 & + +echo "log_level = 'trace' + +[network] +address = '127.0.0.1:3003' +enable_http_health_checks = true " > ws.toml -./target/debug/aquatic ws -c ws.toml > "$HOME/wss.log" 2>&1 & +./target/debug/aquatic ws -c ws.toml > "$HOME/ws.log" 2>&1 & # Setup directories @@ -101,21 +109,30 @@ mkdir torrents # echo "http-test-ipv4" > seed/http-test-ipv4 echo "tls-test-ipv4" > seed/tls-test-ipv4 echo "udp-test-ipv4" > seed/udp-test-ipv4 -echo "wss-test-ipv4" > seed/wss-test-ipv4 +echo "ws-tls-test-ipv4" > seed/ws-tls-test-ipv4 +echo "ws-test-ipv4" > seed/ws-test-ipv4 # mktorrent -p -o "torrents/http-ipv4.torrent" -a "http://127.0.0.1:3000/announce" "seed/http-test-ipv4" mktorrent -p -o "torrents/tls-ipv4.torrent" -a "https://example.com:3001/announce" "seed/tls-test-ipv4" mktorrent -p -o "torrents/udp-ipv4.torrent" -a "udp://127.0.0.1:3000" "seed/udp-test-ipv4" -mktorrent -p -o "torrents/wss-ipv4.torrent" -a "wss://example.com:3002" "seed/wss-test-ipv4" +mktorrent -p -o "torrents/ws-tls-ipv4.torrent" -a "wss://example.com:3002" "seed/ws-tls-test-ipv4" +mktorrent -p -o "torrents/ws-ipv4.torrent" -a "ws://example.com:3003" "seed/ws-test-ipv4" cp -r torrents torrents-seed cp -r torrents torrents-leech -# Setup wss seeding client +# Setup ws-tls seeding client -echo "Starting seeding wss client" +echo "Starting seeding ws-tls (wss) client" cd seed -GOPPROF=http GODEBUG=x509ignoreCN=0 $HOME/gotorrent download --dht=false --tcppeers=false --utppeers=false --pex=false --stats --seed ../torrents/wss-ipv4.torrent > "$HOME/wss-seed.log" 2>&1 & +GOPPROF=http GODEBUG=x509ignoreCN=0 $HOME/gotorrent download --dht=false --tcppeers=false --utppeers=false --pex=false --stats --seed ../torrents/ws-tls-ipv4.torrent > "$HOME/ws-tls-seed.log" 2>&1 & +cd .. + +# Setup ws seeding client + +echo "Starting seeding ws client" +cd seed +GOPPROF=http GODEBUG=x509ignoreCN=0 $HOME/gotorrent download --dht=false --tcppeers=false --utppeers=false --pex=false --stats --seed ../torrents/ws-ipv4.torrent > "$HOME/ws-seed.log" 2>&1 & cd .. # Start seeding rtorrent client @@ -139,9 +156,14 @@ schedule2 = watch_directory,5,5,load.start=$HOME/torrents-leech/*.torrent" > ~/. echo "Starting leeching client.." screen -dmS rtorrent-leech rtorrent -echo "Starting leeching wss client" +echo "Starting leeching ws-tls (wss) client" cd leech -GOPPROF=http GODEBUG=x509ignoreCN=0 $HOME/gotorrent download --dht=false --tcppeers=false --utppeers=false --pex=false --stats --addr ":43000" ../torrents/wss-ipv4.torrent > "$HOME/wss-leech.log" 2>&1 & +GOPPROF=http GODEBUG=x509ignoreCN=0 $HOME/gotorrent download --dht=false --tcppeers=false --utppeers=false --pex=false --stats --addr ":43000" ../torrents/ws-tls-ipv4.torrent > "$HOME/ws-tls-leech.log" 2>&1 & +cd .. + +echo "Starting leeching ws client" +cd leech +GOPPROF=http GODEBUG=x509ignoreCN=0 $HOME/gotorrent download --dht=false --tcppeers=false --utppeers=false --pex=false --stats --addr ":43001" ../torrents/ws-ipv4.torrent > "$HOME/ws-leech.log" 2>&1 & cd .. # Check for completion @@ -149,7 +171,8 @@ cd .. # HTTP_IPv4="Ok" TLS_IPv4="Failed" UDP_IPv4="Failed" -WSS_IPv4="Failed" +WS_TLS_IPv4="Failed" +WS_IPv4="Failed" i="0" @@ -181,17 +204,24 @@ do fi fi fi - if test -f "leech/wss-test-ipv4"; then - if grep -q "wss-test-ipv4" "leech/wss-test-ipv4"; then - if [ "$WSS_IPv4" != "Ok" ]; then - WSS_IPv4="Ok" - echo "WSS_IPv4 is Ok" + if test -f "leech/ws-tls-test-ipv4"; then + if grep -q "ws-tls-test-ipv4" "leech/ws-tls-test-ipv4"; then + if [ "$WS_TLS_IPv4" != "Ok" ]; then + WS_TLS_IPv4="Ok" + echo "WS_TLS_IPv4 is Ok" + fi + fi + fi + if test -f "leech/ws-test-ipv4"; then + if grep -q "ws-test-ipv4" "leech/ws-test-ipv4"; then + if [ "$WS_IPv4" != "Ok" ]; then + WS_IPv4="Ok" + echo "WS_IPv4 is Ok" fi fi fi - # if [ "$HTTP_IPv4" = "Ok" ] && [ "$TLS_IPv4" = "Ok" ] && [ "$UDP_IPv4" = "Ok" ] && [ "$WSS_IPv4" = "Ok" ]; then - if [ "$TLS_IPv4" = "Ok" ] && [ "$UDP_IPv4" = "Ok" ] && [ "$WSS_IPv4" = "Ok" ]; then + if [ "$TLS_IPv4" = "Ok" ] && [ "$UDP_IPv4" = "Ok" ] && [ "$WS_TLS_IPv4" = "Ok" ] && [ "$WS_IPv4" = "Ok" ]; then break fi @@ -205,7 +235,8 @@ echo "Waited for $i seconds" # echo "::set-output name=http_ipv4::$HTTP_IPv4" echo "::set-output name=http_tls_ipv4::$TLS_IPv4" echo "::set-output name=udp_ipv4::$UDP_IPv4" -echo "::set-output name=wss_ipv4::$WSS_IPv4" +echo "::set-output name=ws_tls_ipv4::$WS_TLS_IPv4" +echo "::set-output name=ws_ipv4::$WS_IPv4" # echo "" # echo "# --- HTTP log --- #" @@ -226,31 +257,48 @@ cat "udp.log" sleep 1 echo "" -echo "# --- WSS tracker log --- #" -cat "wss.log" +echo "# --- WS over TLS tracker log --- #" +cat "ws-tls.log" sleep 1 echo "" -echo "# --- WSS seed log --- #" -cat "wss-seed.log" +echo "# --- WS tracker log --- #" +cat "ws.log" sleep 1 echo "" -echo "# --- WSS leech log --- #" -cat "wss-leech.log" +echo "# --- WS over TLS seed log --- #" +cat "ws-tls-seed.log" + +sleep 1 + +echo "" +echo "# --- WS over TLS leech log --- #" +cat "ws-tls-leech.log" + +sleep 1 + +echo "" +echo "# --- WS seed log --- #" +cat "ws-seed.log" + +sleep 1 + +echo "" +echo "# --- WS leech log --- #" +cat "ws-leech.log" sleep 1 echo "" echo "# --- Test results --- #" -# echo "HTTP (IPv4): $HTTP_IPv4" echo "HTTP over TLS (IPv4): $TLS_IPv4" echo "UDP (IPv4): $UDP_IPv4" -echo "WSS (IPv4): $WSS_IPv4" +echo "WSS (IPv4): $WS_TLS_IPv4" +echo "WS (IPv4): $WS_IPv4" -# if [ "$HTTP_IPv4" != "Ok" ] || [ "$TLS_IPv4" != "Ok" ] || [ "$UDP_IPv4" != "Ok" ] || [ "$WSS_IPv4" != "Ok" ]; then -if [ "$TLS_IPv4" != "Ok" ] || [ "$UDP_IPv4" != "Ok" ] || [ "$WSS_IPv4" != "Ok" ]; then +if [ "$TLS_IPv4" != "Ok" ] || [ "$UDP_IPv4" != "Ok" ] || [ "$WS_TLS_IPv4" != "Ok" ] || [ "$WS_IPv4" != "Ok" ]; then exit 1 fi From 7b06bf873600ca95d16e64782c96c8e44732509c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Jul 2022 17:29:13 +0200 Subject: [PATCH 13/15] ws: don't attempt to parse TLS files when running without TLS --- aquatic_http/src/config.rs | 2 +- aquatic_ws/src/config.rs | 2 +- aquatic_ws/src/lib.rs | 11 ++++++----- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/aquatic_http/src/config.rs b/aquatic_http/src/config.rs index 517d75b..95bc281 100644 --- a/aquatic_http/src/config.rs +++ b/aquatic_http/src/config.rs @@ -10,7 +10,7 @@ use serde::Deserialize; use aquatic_common::cli::LogLevel; /// aquatic_http configuration -/// +/// /// Does not support running behind a reverse proxy. #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] #[serde(default, deny_unknown_fields)] diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index 2026bd4..b295aa3 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -9,7 +9,7 @@ use aquatic_common::cli::LogLevel; use aquatic_toml_config::TomlConfig; /// aquatic_ws configuration -/// +/// /// Running behind a reverse proxy is supported, but IPv4 peer requests have /// to be proxied to IPv4 requests, and IPv6 requests to IPv6 requests. #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index ee75e86..927c810 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -47,13 +47,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); - let opt_tls_config = config - .network - .enable_tls - .then_some(Arc::new(create_rustls_config( + let opt_tls_config = if config.network.enable_tls { + Some(Arc::new(create_rustls_config( &config.network.tls_certificate_path, &config.network.tls_private_key_path, - )?)); + )?)) + } else { + None + }; let mut executors = Vec::new(); From 72552022681b140e024e7ff67fad32d44c044534 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Jul 2022 17:30:58 +0200 Subject: [PATCH 14/15] ws: add error message context when loading certificate files --- aquatic_ws/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index 927c810..7d48a08 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -4,6 +4,7 @@ pub mod workers; use std::sync::Arc; +use anyhow::Context; use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker}; use aquatic_common::cpu_pinning::WorkerIndex; use aquatic_common::rustls_config::create_rustls_config; @@ -51,7 +52,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { Some(Arc::new(create_rustls_config( &config.network.tls_certificate_path, &config.network.tls_private_key_path, - )?)) + ).with_context(|| "create rustls config")?)) } else { None }; From 05f5d2a2a4f90d04ce6280a6630873e36b30a085 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Jul 2022 17:50:38 +0200 Subject: [PATCH 15/15] http_private: update sqlx to v0.6 --- Cargo.lock | 127 ++++++++++---------------------- aquatic_http_private/Cargo.toml | 2 +- 2 files changed, 38 insertions(+), 91 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index be3fcea..7e83c12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -73,7 +73,7 @@ dependencies = [ "log", "privdrop", "rand", - "rustls 0.20.6", + "rustls", "rustls-pemfile", "serde", "simple_logger", @@ -130,7 +130,7 @@ dependencies = [ "quickcheck_macros", "rand", "rand_distr", - "rustls 0.20.6", + "rustls", "serde", ] @@ -150,13 +150,13 @@ dependencies = [ "log", "mimalloc", "rand", - "rustls 0.20.6", + "rustls", "serde", "signal-hook", "socket2 0.4.4", "sqlx", "tokio", - "tokio-rustls 0.23.4", + "tokio-rustls", ] [[package]] @@ -302,7 +302,7 @@ dependencies = [ "quickcheck", "quickcheck_macros", "rand", - "rustls 0.20.6", + "rustls", "rustls-pemfile", "serde", "signal-hook", @@ -329,7 +329,7 @@ dependencies = [ "quickcheck_macros", "rand", "rand_distr", - "rustls 0.20.6", + "rustls", "serde", "serde_json", "tungstenite", @@ -403,9 +403,9 @@ dependencies = [ [[package]] name = "atoi" -version = "0.4.0" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "616896e05fc0e2649463a93a15183c6a16bf03413a7af88ef1285ddedfa9cda5" +checksum = "d7c57d12312ff59c811c0643f4d80830505833c9ffaebd193d819392b265be8e" dependencies = [ "num-traits", ] @@ -689,18 +689,18 @@ dependencies = [ [[package]] name = "crc" -version = "2.1.0" +version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49fc9a695bca7f35f5f4c15cddc84415f66a74ea78eef08e90c5024f2b540e23" +checksum = "53757d12b596c16c78b83458d732a5d1a17ab3f53f2f7412f6fb57cc8a140ab3" dependencies = [ "crc-catalog", ] [[package]] name = "crc-catalog" -version = "1.1.1" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccaeedb56da03b09f598226e25e80088cb4cd25f316e6e4df7d695f0feeb1403" +checksum = "2d0165d2900ae6778e36e80bbc4da3b5eefccee9ba939761f9c2882a5d9af3ff" [[package]] name = "criterion" @@ -1103,8 +1103,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01fe9932a224b72b45336d96040aa86386d674a31d0af27d800ea7bc8ca97fe" dependencies = [ "futures-io", - "rustls 0.20.6", - "webpki 0.22.0", + "rustls", + "webpki", ] [[package]] @@ -1256,15 +1256,6 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" -[[package]] -name = "hashbrown" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" -dependencies = [ - "ahash", -] - [[package]] name = "hashbrown" version = "0.12.3" @@ -1277,11 +1268,11 @@ dependencies = [ [[package]] name = "hashlink" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf" +checksum = "d452c155cb93fecdfb02a73dd57b5d8e442c2063bd7aac72f1bc5e4263a43086" dependencies = [ - "hashbrown 0.11.2", + "hashbrown 0.12.3", ] [[package]] @@ -1729,9 +1720,9 @@ dependencies = [ [[package]] name = "num-bigint" -version = "0.3.3" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f6f7833f2cbf2360a6cfd58cd41a53aa7a90bd4c202f5b1c7dd2ed73c57b2c3" +checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f" dependencies = [ "autocfg", "num-integer", @@ -2237,19 +2228,6 @@ version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" -[[package]] -name = "rustls" -version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" -dependencies = [ - "base64", - "log", - "ring", - "sct 0.6.1", - "webpki 0.21.4", -] - [[package]] name = "rustls" version = "0.20.6" @@ -2258,8 +2236,8 @@ checksum = "5aab8ee6c7097ed6057f43c187a62418d0c05a4bd5f18b3571db50ee0f9ce033" dependencies = [ "log", "ring", - "sct 0.7.0", - "webpki 0.22.0", + "sct", + "webpki", ] [[package]] @@ -2298,16 +2276,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" -[[package]] -name = "sct" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "sct" version = "0.7.0" @@ -2542,9 +2510,9 @@ dependencies = [ [[package]] name = "sqlx" -version = "0.5.13" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "551873805652ba0d912fec5bbb0f8b4cdd96baf8e2ebf5970e5671092966019b" +checksum = "1f82cbe94f41641d6c410ded25bbf5097c240cefdf8e3b06d04198d0a96af6a4" dependencies = [ "sqlx-core", "sqlx-macros", @@ -2552,9 +2520,9 @@ dependencies = [ [[package]] name = "sqlx-core" -version = "0.5.13" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e48c61941ccf5ddcada342cd59e3e5173b007c509e1e8e990dafc830294d9dc5" +checksum = "6b69bf218860335ddda60d6ce85ee39f6cf6e5630e300e19757d1de15886a093" dependencies = [ "ahash", "atoi", @@ -2584,7 +2552,8 @@ dependencies = [ "percent-encoding", "rand", "rsa", - "rustls 0.19.1", + "rustls", + "rustls-pemfile", "sha-1", "sha2", "smallvec", @@ -2594,15 +2563,14 @@ dependencies = [ "thiserror", "tokio-stream", "url", - "webpki 0.21.4", "webpki-roots", ] [[package]] name = "sqlx-macros" -version = "0.5.13" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc0fba2b0cae21fc00fe6046f8baa4c7fcb49e379f0f592b04696607f69ed2e1" +checksum = "f40c63177cf23d356b159b60acd27c54af7423f1736988502e36bae9a712118f" dependencies = [ "dotenv", "either", @@ -2619,13 +2587,13 @@ dependencies = [ [[package]] name = "sqlx-rt" -version = "0.5.13" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4db708cd3e459078f85f39f96a00960bd841f66ee2a669e90bf36907f5a79aae" +checksum = "874e93a365a598dc3dadb197565952cb143ae4aa716f7bcc933a8d836f6bf89f" dependencies = [ "once_cell", "tokio", - "tokio-rustls 0.22.0", + "tokio-rustls", ] [[package]] @@ -2793,26 +2761,15 @@ dependencies = [ "syn", ] -[[package]] -name = "tokio-rustls" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" -dependencies = [ - "rustls 0.19.1", - "tokio", - "webpki 0.21.4", -] - [[package]] name = "tokio-rustls" version = "0.23.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" dependencies = [ - "rustls 0.20.6", + "rustls", "tokio", - "webpki 0.22.0", + "webpki", ] [[package]] @@ -3136,16 +3093,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "webpki" -version = "0.21.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "webpki" version = "0.22.0" @@ -3158,11 +3105,11 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.21.1" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940" +checksum = "f1c760f0d366a6c24a02ed7816e23e691f5d92291f94d15e836006fd11b04daf" dependencies = [ - "webpki 0.21.4", + "webpki", ] [[package]] diff --git a/aquatic_http_private/Cargo.toml b/aquatic_http_private/Cargo.toml index 1994dbe..31b4d29 100644 --- a/aquatic_http_private/Cargo.toml +++ b/aquatic_http_private/Cargo.toml @@ -31,6 +31,6 @@ rustls = "0.20" serde = { version = "1", features = ["derive"] } signal-hook = { version = "0.3" } socket2 = { version = "0.4", features = ["all"] } -sqlx = { version = "0.5", features = [ "runtime-tokio-rustls" , "mysql" ] } +sqlx = { version = "0.6", features = [ "runtime-tokio-rustls" , "mysql" ] } tokio = { version = "1", features = ["full"] } tokio-rustls = "0.23"