diff --git a/.github/workflows/cargo-build-and-test.yml b/.github/workflows/cargo-build-and-test.yml index 44172a9..bd68943 100644 --- a/.github/workflows/cargo-build-and-test.yml +++ b/.github/workflows/cargo-build-and-test.yml @@ -12,7 +12,7 @@ env: jobs: build-test-linux: runs-on: ubuntu-latest - timeout-minutes: 20 + timeout-minutes: 25 steps: - uses: actions/checkout@v2 - name: Install latest stable Rust diff --git a/Cargo.lock b/Cargo.lock index 2420b84..879d688 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -211,7 +211,7 @@ dependencies = [ "aquatic_udp_protocol", "blake3", "cfg-if", - "constant_time_eq 0.2.2", + "constant_time_eq 0.2.3", "crossbeam-channel", "getrandom", "hashbrown 0.12.1", @@ -434,9 +434,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.5.9" +version = "0.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33d590cacd53140ff87cc2e192eb22fc3dc23c5b3f93b0d4f020677f98e8c629" +checksum = "c2cc6e8e8c993cb61a005fab8c1e5093a29199b7253b05a6883999312935c1ff" dependencies = [ "async-trait", "axum-core", @@ -676,9 +676,9 @@ checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" [[package]] name = "constant_time_eq" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e31aa570361918e61453e3b5377976b23e4599e8bb5b840380ecd3a20e691d2" +checksum = "8b96535642698e10693d204abac955eb269a05ea99f8363ef70ab54cfe439337" [[package]] name = "cpufeatures" @@ -821,9 +821,9 @@ dependencies = [ [[package]] name = "crypto-common" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57952ca27b5e3606ff4dd79b0020231aaf9d6aa76dc05fd30137538c50bd3ce8" +checksum = "5999502d32b9c48d492abe66392408144895020ec4709e549e840799f3bb74c0" dependencies = [ "generic-array", "typenum", @@ -891,9 +891,9 @@ dependencies = [ [[package]] name = "either" -version = "1.6.1" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +checksum = "3f107b87b6afc2a64fd13cac55fe06d6c8859f12d4b14cbcdd2c67d0976781be" [[package]] name = "enclose" @@ -1927,18 +1927,18 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" [[package]] name = "pin-project" -version = "1.0.10" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e" +checksum = "78203e83c48cffbe01e4a2d35d566ca4de445d79a85372fc64e378bfc812a260" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.0.10" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb" +checksum = "710faf75e1b33345361201d36d04e98ac1ed8909151a017ed384700836104c74" dependencies = [ "proc-macro2", "quote", @@ -2331,15 +2331,15 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.10" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a41d061efea015927ac527063765e73601444cdc344ba855bc7bd44578b25e1c" +checksum = "a2333e6df6d6598f2b1974829f853c2b4c5f4a6e503c10af918081aa6f8564e1" [[package]] name = "serde" -version = "1.0.137" +version = "1.0.138" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61ea8d54c77f8315140a05f4c7237403bf38b72704d031543aa1d16abbf517d1" +checksum = "1578c6245786b9d168c5447eeacfb96856573ca56c9d68fdcf394be134882a47" dependencies = [ "serde_derive", ] @@ -2375,9 +2375,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.137" +version = "1.0.138" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f26faba0c3959972377d3b2d306ee9f71faee9714294e41bb777f83f88578be" +checksum = "023e9b1467aef8a10fb88f25611870ada9800ef7e22afce356bb0d2387b6f27c" dependencies = [ "proc-macro2", "quote", @@ -2386,9 +2386,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.81" +version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b7ce2b32a1aed03c558dc61a5cd328f15aff2dbc17daad8fb8af04d2100e15c" +checksum = "82c2c1fdcd807d1098552c5b9a36e425e42e9fbd7c6a37a8425f390f781f7fa7" dependencies = [ "itoa 1.0.2", "ryu", @@ -2438,9 +2438,9 @@ dependencies = [ [[package]] name = "simd-json" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3868f37d8473eb1410c71a8b88a5411778f72ae82bd72340b2355d1c133f2b6a" +checksum = "d67c573ee0994adb422b5f9ea7c55693ee943c39f4eb20aa76475a5b7039bb87" dependencies = [ "halfbrown", "serde", @@ -2482,9 +2482,9 @@ checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32" [[package]] name = "smallvec" -version = "1.8.1" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc88c725d61fc6c3132893370cac4a0200e3fedf5da8331c570664b1987f5ca2" +checksum = "2fd0db749597d91ff862fd1d55ea87f7855a744a8425a64695b6fca237d1dad1" [[package]] name = "smartstring" @@ -2910,9 +2910,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.21" +version = "0.1.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc6b8ad3567499f98a1db7a752b07a7c8c7c7c34c332ec00effb2b0027974b7c" +checksum = "11c75893af559bc8e10716548bdef5cb2b983f8e637db9d0e15126b61b484ee2" dependencies = [ "proc-macro2", "quote", @@ -2973,9 +2973,9 @@ checksum = "5bd2fe26506023ed7b5e1e315add59d6f584c621d037f9368fea9cfb988f368c" [[package]] name = "unicode-normalization" -version = "0.1.20" +version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81dee68f85cab8cf68dec42158baf3a79a1cdc065a8b103025965d6ccb7f6cbd" +checksum = "854cbdc4f7bc6ae19c820d44abdc3277ac3e1b2b93db20a636825d9322fb60e6" dependencies = [ "tinyvec", ] @@ -3266,6 +3266,6 @@ checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" [[package]] name = "zeroize" -version = "1.5.5" +version = "1.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94693807d016b2f2d2e14420eb3bfcca689311ff775dcf113d74ea624b7cdf07" +checksum = "20b578acffd8516a6c3f2a1bdefc1ec37e547bb4e0fb8b6b01a4cafc886b4442" diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 8088efc..51b2ef4 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -8,6 +8,7 @@ description = "High-performance open UDP BitTorrent tracker" repository = "https://github.com/greatest-ape/aquatic" keywords = ["udp", "server", "peer-to-peer", "torrent", "bittorrent"] readme = "../README.md" +rust-version = "1.62" [lib] name = "aquatic_udp" diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index 923bd22..0bd6f11 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -1,14 +1,10 @@ use std::collections::BTreeMap; use std::hash::Hash; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::net::{Ipv4Addr, Ipv6Addr}; use std::sync::atomic::AtomicUsize; use std::sync::Arc; -use std::time::Instant; -use anyhow::Context; -use constant_time_eq::constant_time_eq; use crossbeam_channel::{Sender, TrySendError}; -use getrandom::getrandom; use aquatic_common::access_list::AccessListArcSwap; use aquatic_common::CanonicalSocketAddr; @@ -18,91 +14,6 @@ use crate::config::Config; pub const BUFFER_SIZE: usize = 8192; -/// HMAC (BLAKE3) based ConnectionID creator and validator -/// -/// Structure of created ConnectionID (bytes making up inner i64): -/// - &[0..4]: connection expiration time as number of seconds after -/// ConnectionValidator instance was created, encoded as u32 bytes. -/// Value fits around 136 years. -/// - &[4..8]: truncated keyed BLAKE3 hash of above 4 bytes and octets of -/// client IP address -/// -/// The purpose of using ConnectionIDs is to prevent IP spoofing, mainly to -/// prevent the tracker from being used as an amplification vector for DDoS -/// attacks. By including 32 bits of BLAKE3 keyed hash output in its contents, -/// such abuse should be rendered impractical. -#[derive(Clone)] -pub struct ConnectionValidator { - start_time: Instant, - max_connection_age: u32, - keyed_hasher: blake3::Hasher, -} - -impl ConnectionValidator { - /// Create new instance. Must be created once and cloned if used in several - /// threads. - pub fn new(config: &Config) -> anyhow::Result { - let mut key = [0; 32]; - - getrandom(&mut key) - .with_context(|| "Couldn't get random bytes for ConnectionValidator key")?; - - let keyed_hasher = blake3::Hasher::new_keyed(&key); - - Ok(Self { - keyed_hasher, - start_time: Instant::now(), - max_connection_age: config.cleaning.max_connection_age, - }) - } - - pub fn create_connection_id(&mut self, source_addr: CanonicalSocketAddr) -> ConnectionId { - let valid_until = - (self.start_time.elapsed().as_secs() as u32 + self.max_connection_age).to_ne_bytes(); - - let hash = self.hash(valid_until, source_addr.get().ip()); - - let mut connection_id_bytes = [0u8; 8]; - - (&mut connection_id_bytes[..4]).copy_from_slice(&valid_until); - (&mut connection_id_bytes[4..]).copy_from_slice(&hash); - - ConnectionId(i64::from_ne_bytes(connection_id_bytes)) - } - - pub fn connection_id_valid( - &mut self, - source_addr: CanonicalSocketAddr, - connection_id: ConnectionId, - ) -> bool { - let bytes = connection_id.0.to_ne_bytes(); - let (valid_until, hash) = bytes.split_at(4); - let valid_until: [u8; 4] = valid_until.try_into().unwrap(); - - if !constant_time_eq(hash, &self.hash(valid_until, source_addr.get().ip())) { - return false; - } - - u32::from_ne_bytes(valid_until) > self.start_time.elapsed().as_secs() as u32 - } - - fn hash(&mut self, valid_until: [u8; 4], ip_addr: IpAddr) -> [u8; 4] { - self.keyed_hasher.update(&valid_until); - - match ip_addr { - IpAddr::V4(ip) => self.keyed_hasher.update(&ip.octets()), - IpAddr::V6(ip) => self.keyed_hasher.update(&ip.octets()), - }; - - let mut hash = [0u8; 4]; - - self.keyed_hasher.finalize_xof().fill(&mut hash); - self.keyed_hasher.reset(); - - hash - } -} - #[derive(Debug)] pub struct PendingScrapeRequest { pub slab_key: usize, @@ -274,9 +185,7 @@ impl State { #[cfg(test)] mod tests { - use std::net::{Ipv6Addr, SocketAddr}; - - use quickcheck_macros::quickcheck; + use std::net::Ipv6Addr; use crate::config::Config; @@ -334,42 +243,4 @@ mod tests { assert!(buf.len() <= BUFFER_SIZE); } - - #[quickcheck] - fn test_connection_validator( - original_addr: IpAddr, - different_addr: IpAddr, - max_connection_age: u32, - ) -> quickcheck::TestResult { - let original_addr = CanonicalSocketAddr::new(SocketAddr::new(original_addr, 0)); - let different_addr = CanonicalSocketAddr::new(SocketAddr::new(different_addr, 0)); - - if original_addr == different_addr { - return quickcheck::TestResult::discard(); - } - - let mut validator = { - let mut config = Config::default(); - - config.cleaning.max_connection_age = max_connection_age; - - ConnectionValidator::new(&config).unwrap() - }; - - let connection_id = validator.create_connection_id(original_addr); - - let original_valid = validator.connection_id_valid(original_addr, connection_id); - let different_valid = validator.connection_id_valid(different_addr, connection_id); - - if different_valid { - return quickcheck::TestResult::failed(); - } - - if max_connection_age == 0 { - quickcheck::TestResult::from_bool(!original_valid) - } else { - // Note: depends on that running this test takes less than a second - quickcheck::TestResult::from_bool(original_valid) - } - } } diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 1214eee..858a70a 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -86,7 +86,7 @@ pub struct NetworkConfig { pub socket_recv_buffer_size: usize, pub poll_event_capacity: usize, pub poll_timeout_ms: u64, - /// Store this many responses at most for retryin on send failure + /// Store this many responses at most for retrying (once) on send failure /// /// Useful on operating systems that do not provide an udp send buffer, /// such as FreeBSD. Setting the value to zero disables resending diff --git a/aquatic_udp/src/lib.rs b/aquatic_udp/src/lib.rs index 80ba09f..aa66e11 100644 --- a/aquatic_udp/src/lib.rs +++ b/aquatic_udp/src/lib.rs @@ -17,10 +17,10 @@ use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::PanicSentinelWatcher; use common::{ - ConnectedRequestSender, ConnectedResponseSender, ConnectionValidator, RequestWorkerIndex, - SocketWorkerIndex, State, + ConnectedRequestSender, ConnectedResponseSender, RequestWorkerIndex, SocketWorkerIndex, State, }; use config::Config; +use workers::socket::validator::ConnectionValidator; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/aquatic_udp/src/workers/socket/mod.rs b/aquatic_udp/src/workers/socket/mod.rs index bc48932..fd45296 100644 --- a/aquatic_udp/src/workers/socket/mod.rs +++ b/aquatic_udp/src/workers/socket/mod.rs @@ -1,6 +1,7 @@ mod requests; mod responses; mod storage; +pub mod validator; use std::time::{Duration, Instant}; @@ -22,8 +23,7 @@ use crate::config::Config; use requests::read_requests; use responses::send_responses; use storage::PendingScrapeResponseSlab; - -use self::responses::send_responses_with_resends; +use validator::ConnectionValidator; pub fn run_socket_worker( _sentinel: PanicSentinel, @@ -52,7 +52,7 @@ pub fn run_socket_worker( let mut access_list_cache = create_access_list_cache(&state.access_list); let mut local_responses: Vec<(Response, CanonicalSocketAddr)> = Vec::new(); - let mut resend_buffer: Vec<(Response, CanonicalSocketAddr)> = Vec::new(); + let mut opt_resend_buffer = (config.network.resend_buffer_max_len > 0).then_some(Vec::new()); let poll_timeout = Duration::from_millis(config.network.poll_timeout_ms); @@ -63,7 +63,6 @@ pub fn run_socket_worker( let mut last_pending_scrape_cleaning = Instant::now(); let mut iter_counter = 0usize; - let response_resending_active = config.network.resend_buffer_max_len > 0; loop { poll.poll(&mut events, Some(poll_timeout)) @@ -88,28 +87,16 @@ pub fn run_socket_worker( } } - if response_resending_active { - send_responses_with_resends( - &state, - &config, - &mut socket, - &mut buffer, - &response_receiver, - &mut pending_scrape_responses, - local_responses.drain(..), - &mut resend_buffer, - ); - } else { - send_responses( - &state, - &config, - &mut socket, - &mut buffer, - &response_receiver, - &mut pending_scrape_responses, - local_responses.drain(..), - ); - } + send_responses( + &state, + &config, + &mut socket, + &mut buffer, + &response_receiver, + &mut pending_scrape_responses, + local_responses.drain(..), + &mut opt_resend_buffer, + ); // Run periodic ValidUntil updates and state cleaning if iter_counter % 256 == 0 { diff --git a/aquatic_udp/src/workers/socket/requests.rs b/aquatic_udp/src/workers/socket/requests.rs index 610f0ae..779f080 100644 --- a/aquatic_udp/src/workers/socket/requests.rs +++ b/aquatic_udp/src/workers/socket/requests.rs @@ -10,6 +10,7 @@ use crate::common::*; use crate::config::Config; use super::storage::PendingScrapeResponseSlab; +use super::validator::ConnectionValidator; pub fn read_requests( config: &Config, @@ -30,9 +31,15 @@ pub fn read_requests( loop { match socket.recv_from(&mut buffer[..]) { - Ok((amt, src)) => { + Ok((bytes_read, src)) => { + if src.port() == 0 { + ::log::info!("Ignored request from {} because source port is zero", src); + + continue; + } + let res_request = - Request::from_bytes(&buffer[..amt], config.protocol.max_scrape_torrents); + Request::from_bytes(&buffer[..bytes_read], config.protocol.max_scrape_torrents); let src = CanonicalSocketAddr::new(src); @@ -41,12 +48,12 @@ pub fn read_requests( if res_request.is_ok() { requests_received_ipv4 += 1; } - bytes_received_ipv4 += amt; + bytes_received_ipv4 += bytes_read; } else { if res_request.is_ok() { requests_received_ipv6 += 1; } - bytes_received_ipv6 += amt; + bytes_received_ipv6 += bytes_read; } handle_request( diff --git a/aquatic_udp/src/workers/socket/responses.rs b/aquatic_udp/src/workers/socket/responses.rs index 998380a..6ed78d1 100644 --- a/aquatic_udp/src/workers/socket/responses.rs +++ b/aquatic_udp/src/workers/socket/responses.rs @@ -22,9 +22,24 @@ pub fn send_responses( response_receiver: &Receiver<(ConnectedResponse, CanonicalSocketAddr)>, pending_scrape_responses: &mut PendingScrapeResponseSlab, local_responses: Drain<(Response, CanonicalSocketAddr)>, + opt_resend_buffer: &mut Option>, ) { + if let Some(resend_buffer) = opt_resend_buffer { + for (response, addr) in resend_buffer.drain(..) { + send_response(state, config, socket, buffer, response, addr, &mut None); + } + } + for (response, addr) in local_responses { - let _ = send_response(state, config, socket, buffer, &response, addr); + send_response( + state, + config, + socket, + buffer, + response, + addr, + opt_resend_buffer, + ); } for (response, addr) in response_receiver.try_iter() { @@ -37,132 +52,92 @@ pub fn send_responses( }; if let Some(response) = opt_response { - let _ = send_response(state, config, socket, buffer, &response, addr); + send_response( + state, + config, + socket, + buffer, + response, + addr, + opt_resend_buffer, + ); } } } -pub fn send_responses_with_resends( - state: &State, - config: &Config, - socket: &mut UdpSocket, - buffer: &mut [u8], - response_receiver: &Receiver<(ConnectedResponse, CanonicalSocketAddr)>, - pending_scrape_responses: &mut PendingScrapeResponseSlab, - local_responses: Drain<(Response, CanonicalSocketAddr)>, - resend_buffer: &mut Vec<(Response, CanonicalSocketAddr)>, -) { - let resend_buffer_max_len = config.network.resend_buffer_max_len; - - for (response, addr) in resend_buffer.drain(..) { - let _ = send_response(state, config, socket, buffer, &response, addr); - } - - for (response, addr) in local_responses { - match send_response(state, config, socket, buffer, &response, addr) { - Err(err) if error_should_cause_resend(&err) => { - if resend_buffer.len() < resend_buffer_max_len { - resend_buffer.push((response, addr)); - } else { - ::log::warn!("response resend buffer full, dropping response"); - } - } - _ => (), - } - } - - for (response, addr) in response_receiver.try_iter() { - let opt_response = match response { - ConnectedResponse::Scrape(r) => pending_scrape_responses - .add_and_get_finished(r) - .map(Response::Scrape), - ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)), - ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)), - }; - - if let Some(response) = opt_response { - match send_response(state, config, socket, buffer, &response, addr) { - Err(err) if error_should_cause_resend(&err) => { - if resend_buffer.len() < resend_buffer_max_len { - resend_buffer.push((response, addr)); - } else { - ::log::warn!("response resend buffer full, dropping response"); - } - } - _ => (), - } - } - } -} - -fn error_should_cause_resend(err: &::std::io::Error) -> bool { - (err.raw_os_error() == Some(ENOBUFS)) | (err.kind() == ErrorKind::WouldBlock) -} - fn send_response( state: &State, config: &Config, socket: &mut UdpSocket, buffer: &mut [u8], - response: &Response, - addr: CanonicalSocketAddr, -) -> std::io::Result<()> { + response: Response, + canonical_addr: CanonicalSocketAddr, + resend_buffer: &mut Option>, +) { let mut cursor = Cursor::new(buffer); - let canonical_addr_is_ipv4 = addr.is_ipv4(); + if let Err(err) = response.write(&mut cursor) { + ::log::error!("Converting response to bytes failed: {:#}", err); + + return; + } + + let bytes_written = cursor.position() as usize; let addr = if config.network.address.is_ipv4() { - addr.get_ipv4() + canonical_addr + .get_ipv4() .expect("found peer ipv6 address while running bound to ipv4 address") } else { - addr.get_ipv6_mapped() + canonical_addr.get_ipv6_mapped() }; - match response.write(&mut cursor) { - Ok(()) => { - let amt = cursor.position() as usize; + match socket.send_to(&cursor.get_ref()[..bytes_written], addr) { + Ok(amt) if config.statistics.active() => { + let stats = if canonical_addr.is_ipv4() { + &state.statistics_ipv4 + } else { + &state.statistics_ipv6 + }; - match socket.send_to(&cursor.get_ref()[..amt], addr) { - Ok(amt) if config.statistics.active() => { - let stats = if canonical_addr_is_ipv4 { - &state.statistics_ipv4 - } else { - &state.statistics_ipv6 - }; + stats.bytes_sent.fetch_add(amt, Ordering::Relaxed); - stats.bytes_sent.fetch_add(amt, Ordering::Relaxed); - - match response { - Response::Connect(_) => { - stats.responses_sent_connect.fetch_add(1, Ordering::Relaxed); - } - Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => { - stats - .responses_sent_announce - .fetch_add(1, Ordering::Relaxed); - } - Response::Scrape(_) => { - stats.responses_sent_scrape.fetch_add(1, Ordering::Relaxed); - } - Response::Error(_) => { - stats.responses_sent_error.fetch_add(1, Ordering::Relaxed); - } - } - - Ok(()) + match response { + Response::Connect(_) => { + stats.responses_sent_connect.fetch_add(1, Ordering::Relaxed); } - Ok(_) => Ok(()), - Err(err) => { - ::log::warn!("Sending response to {} failed: {:#}", addr, err); - - Err(err) + Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => { + stats + .responses_sent_announce + .fetch_add(1, Ordering::Relaxed); + } + Response::Scrape(_) => { + stats.responses_sent_scrape.fetch_add(1, Ordering::Relaxed); + } + Response::Error(_) => { + stats.responses_sent_error.fetch_add(1, Ordering::Relaxed); } } } + Ok(_) => (), Err(err) => { - ::log::error!("Converting response to bytes failed: {:#}", err); + match resend_buffer { + Some(resend_buffer) + if (err.raw_os_error() == Some(ENOBUFS)) + || (err.kind() == ErrorKind::WouldBlock) => + { + if resend_buffer.len() < config.network.resend_buffer_max_len { + ::log::info!("Adding response to resend queue, since sending it to {} failed with: {:#}", addr, err); - Err(err) + resend_buffer.push((response, canonical_addr)); + } else { + ::log::warn!("Response resend buffer full, dropping response"); + } + } + _ => { + ::log::warn!("Sending response to {} failed: {:#}", addr, err); + } + } } } } diff --git a/aquatic_udp/src/workers/socket/validator.rs b/aquatic_udp/src/workers/socket/validator.rs new file mode 100644 index 0000000..74e2f76 --- /dev/null +++ b/aquatic_udp/src/workers/socket/validator.rs @@ -0,0 +1,143 @@ +use std::net::IpAddr; +use std::time::Instant; + +use anyhow::Context; +use constant_time_eq::constant_time_eq; +use getrandom::getrandom; + +use aquatic_common::CanonicalSocketAddr; +use aquatic_udp_protocol::ConnectionId; + +use crate::config::Config; + +/// HMAC (BLAKE3) based ConnectionID creator and validator +/// +/// Structure of created ConnectionID (bytes making up inner i64): +/// - &[0..4]: connection expiration time as number of seconds after +/// ConnectionValidator instance was created, encoded as u32 bytes. +/// Value fits around 136 years. +/// - &[4..8]: truncated keyed BLAKE3 hash of above 4 bytes and octets of +/// client IP address +/// +/// The purpose of using ConnectionIDs is to prevent IP spoofing, mainly to +/// prevent the tracker from being used as an amplification vector for DDoS +/// attacks. By including 32 bits of BLAKE3 keyed hash output in its contents, +/// such abuse should be rendered impractical. +#[derive(Clone)] +pub struct ConnectionValidator { + start_time: Instant, + max_connection_age: u32, + keyed_hasher: blake3::Hasher, +} + +impl ConnectionValidator { + /// Create new instance. Must be created once and cloned if used in several + /// threads. + pub fn new(config: &Config) -> anyhow::Result { + let mut key = [0; 32]; + + getrandom(&mut key) + .with_context(|| "Couldn't get random bytes for ConnectionValidator key")?; + + let keyed_hasher = blake3::Hasher::new_keyed(&key); + + Ok(Self { + keyed_hasher, + start_time: Instant::now(), + max_connection_age: config.cleaning.max_connection_age, + }) + } + + pub fn create_connection_id(&mut self, source_addr: CanonicalSocketAddr) -> ConnectionId { + let valid_until = + (self.start_time.elapsed().as_secs() as u32 + self.max_connection_age).to_ne_bytes(); + + let hash = self.hash(valid_until, source_addr.get().ip()); + + let mut connection_id_bytes = [0u8; 8]; + + (&mut connection_id_bytes[..4]).copy_from_slice(&valid_until); + (&mut connection_id_bytes[4..]).copy_from_slice(&hash); + + ConnectionId(i64::from_ne_bytes(connection_id_bytes)) + } + + pub fn connection_id_valid( + &mut self, + source_addr: CanonicalSocketAddr, + connection_id: ConnectionId, + ) -> bool { + let bytes = connection_id.0.to_ne_bytes(); + let (valid_until, hash) = bytes.split_at(4); + let valid_until: [u8; 4] = valid_until.try_into().unwrap(); + + if !constant_time_eq(hash, &self.hash(valid_until, source_addr.get().ip())) { + return false; + } + + u32::from_ne_bytes(valid_until) > self.start_time.elapsed().as_secs() as u32 + } + + fn hash(&mut self, valid_until: [u8; 4], ip_addr: IpAddr) -> [u8; 4] { + self.keyed_hasher.update(&valid_until); + + match ip_addr { + IpAddr::V4(ip) => self.keyed_hasher.update(&ip.octets()), + IpAddr::V6(ip) => self.keyed_hasher.update(&ip.octets()), + }; + + let mut hash = [0u8; 4]; + + self.keyed_hasher.finalize_xof().fill(&mut hash); + self.keyed_hasher.reset(); + + hash + } +} + +#[cfg(test)] +mod tests { + use std::net::SocketAddr; + + use quickcheck_macros::quickcheck; + + use super::*; + + #[quickcheck] + fn test_connection_validator( + original_addr: IpAddr, + different_addr: IpAddr, + max_connection_age: u32, + ) -> quickcheck::TestResult { + let original_addr = CanonicalSocketAddr::new(SocketAddr::new(original_addr, 0)); + let different_addr = CanonicalSocketAddr::new(SocketAddr::new(different_addr, 0)); + + if original_addr == different_addr { + return quickcheck::TestResult::discard(); + } + + let mut validator = { + let mut config = Config::default(); + + config.cleaning.max_connection_age = max_connection_age; + + ConnectionValidator::new(&config).unwrap() + }; + + let connection_id = validator.create_connection_id(original_addr); + + let original_valid = validator.connection_id_valid(original_addr, connection_id); + let different_valid = validator.connection_id_valid(different_addr, connection_id); + + if different_valid { + return quickcheck::TestResult::failed(); + } + + if max_connection_age == 0 { + quickcheck::TestResult::from_bool(!original_valid) + } else { + // Note: depends on that running this test takes less than a second + quickcheck::TestResult::from_bool(original_valid) + } + } +}