diff --git a/Cargo.lock b/Cargo.lock index b67af2a..1cc25ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -209,8 +209,12 @@ dependencies = [ "aquatic_common", "aquatic_toml_config", "aquatic_udp_protocol", + "blake3", "cfg-if", + "constant_time_eq 0.2.1", "crossbeam-channel", + "getrandom", + "hashbrown 0.12.0", "hex", "log", "mimalloc", @@ -350,6 +354,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f" +[[package]] +name = "arrayref" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544" + [[package]] name = "arrayvec" version = "0.4.12" @@ -359,6 +369,12 @@ dependencies = [ "nodrop", ] +[[package]] +name = "arrayvec" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" + [[package]] name = "async-trait" version = "0.1.53" @@ -426,9 +442,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5611d4977882c5af1c0f7a34d51b5d87f784f86912bb543986b014ea4995ef93" +checksum = "47594e438a243791dba58124b6669561f5baa14cb12046641d8008bf035e5a25" dependencies = [ "async-trait", "axum-core", @@ -456,9 +472,9 @@ dependencies = [ [[package]] name = "axum-core" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95cd109b3e93c9541dcce5b0219dcf89169dcc58c1bebed65082808324258afb" +checksum = "9a671c9ae99531afdd5d3ee8340b8da547779430689947144c140fc74a740244" dependencies = [ "async-trait", "bytes", @@ -524,6 +540,20 @@ version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "303cec55cd9c5fde944b061b902f142b52a8bb5438cc822481ea1e3ebc96bbcb" +[[package]] +name = "blake3" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a08e53fc5a564bb15bfe6fae56bd71522205f1f91893f9c0116edad6496c183f" +dependencies = [ + "arrayref", + "arrayvec 0.7.2", + "cc", + "cfg-if", + "constant_time_eq 0.1.5", + "digest 0.10.3", +] + [[package]] name = "block-buffer" version = "0.9.0" @@ -655,6 +685,18 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d6f2aa4d0537bcc1c74df8755072bd31c1ef1a3a1b85a68e8404a8c353b7b8b" +[[package]] +name = "constant_time_eq" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" + +[[package]] +name = "constant_time_eq" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df04a53a7e91248c27eb6bfc1db165e8f47453e98478e4609f9cce020bb3c65a" + [[package]] name = "cpufeatures" version = "0.2.2" @@ -854,6 +896,7 @@ checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506" dependencies = [ "block-buffer 0.10.2", "crypto-common", + "subtle", ] [[package]] @@ -1348,9 +1391,9 @@ checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" [[package]] name = "httparse" -version = "1.6.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9100414882e15fb7feccb4897e5f0ff0ff1ca7d1a86a23208ada4d7a18e6c6c4" +checksum = "6330e8a36bd8c859f3fa6d9382911fbb7147ec39807f63b923933a247240b9ba" [[package]] name = "httpdate" @@ -1481,9 +1524,9 @@ checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" [[package]] name = "js-sys" -version = "0.3.56" +version = "0.3.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a38fc24e30fd564ce974c02bf1d337caddff65be6cc4735a1f7eab22a7440f04" +checksum = "671a26f820db17c2a2750743f1dd03bafd15b98c9f30c7c2628c024c05d73397" dependencies = [ "wasm-bindgen", ] @@ -1509,9 +1552,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.121" +version = "0.2.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f" +checksum = "cb691a747a7ab48abc15c5b42066eaafde10dc427e3b6ee2a1cf43db04c763bd" [[package]] name = "libm" @@ -1756,7 +1799,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bafe4179722c2894288ee77a9f044f02811c86af699344c498b0840c698a2465" dependencies = [ - "arrayvec", + "arrayvec 0.4.12", "itoa 0.4.8", ] @@ -2056,9 +2099,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.36" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7342d5883fbccae1cc37a2353b09c87c9b0f3afd73f5fb9bba687a1f733b029" +checksum = "ec757218438d5fda206afc041538b2f6d889286160d649a86a24d37e1235afd1" dependencies = [ "unicode-xid", ] @@ -2087,9 +2130,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "632d02bff7f874a36f33ea8bb416cd484b90cc66c1194b1a1110d067a7013f58" +checksum = "a1feb54ed693b93a84e14094943b84b7c4eae204c512b7ccb95ab0c66d278ad1" dependencies = [ "proc-macro2", ] @@ -2136,9 +2179,9 @@ dependencies = [ [[package]] name = "rayon" -version = "1.5.1" +version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90" +checksum = "fd249e82c21598a9a426a4e00dd7adc1d640b22445ec8545feef801d1a74c221" dependencies = [ "autocfg 1.1.0", "crossbeam-deque", @@ -2148,14 +2191,13 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.9.1" +version = "1.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e" +checksum = "9f51245e1e62e1f1629cbfec37b5793bbabcaeb90f30e94d2ba03564687353e4" dependencies = [ "crossbeam-channel", "crossbeam-deque", "crossbeam-utils", - "lazy_static", "num_cpus", ] @@ -2468,9 +2510,9 @@ dependencies = [ [[package]] name = "simdutf8" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c970da16e7c682fa90a261cf0724dee241c9f7831635ecc4e988ae8f3b505559" +checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" [[package]] name = "simple_logger" @@ -2493,9 +2535,9 @@ checksum = "76a77a8fd93886010f05e7ea0720e569d6d16c65329dbe3ec033bbbccccb017b" [[package]] name = "slab" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5" +checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32" [[package]] name = "smallvec" @@ -2681,9 +2723,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.90" +version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "704df27628939572cd88d33f171cd6f896f4eaca85252c6e0a72d8d8287ee86f" +checksum = "b683b2b825c8eef438b77c36a06dc262294da3d5a5813fac20da149241dcd44d" dependencies = [ "proc-macro2", "quote", @@ -2926,9 +2968,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.32" +version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f" +checksum = "80b9fa4360528139bc96100c160b7ae879f5567f49f1782b0b02035b0358ebf3" dependencies = [ "cfg-if", "log", @@ -2950,9 +2992,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.23" +version = "0.1.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa31669fa42c09c34d94d8165dd2012e8ff3c66aca50f3bb226b68f216f2706c" +checksum = "6dfce9f3241b150f36e8e54bb561a742d5daa1a47b5dd9a5ce369fd4a4db2210" dependencies = [ "lazy_static", ] @@ -3116,9 +3158,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.79" +version = "0.2.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25f1af7423d8588a3d840681122e72e6a24ddbcb3f0ec385cac0d12d24256c06" +checksum = "27370197c907c55e3f1a9fbe26f44e937fe6451368324e009cba39e139dc08ad" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -3126,9 +3168,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.79" +version = "0.2.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b21c0df030f5a177f3cba22e9bc4322695ec43e7257d865302900290bcdedca" +checksum = "53e04185bfa3a779273da532f5025e33398409573f348985af9a1cbf3774d3f4" dependencies = [ "bumpalo", "lazy_static", @@ -3141,9 +3183,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.79" +version = "0.2.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f4203d69e40a52ee523b2529a773d5ffc1dc0071801c87b3d270b471b80ed01" +checksum = "17cae7ff784d7e83a2fe7611cfe766ecf034111b49deb850a3dc7699c08251f5" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3151,9 +3193,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.79" +version = "0.2.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa8a30d46208db204854cadbb5d4baf5fcf8071ba5bf48190c3e59937962ebc" +checksum = "99ec0dc7a4756fffc231aab1b9f2f578d23cd391390ab27f952ae0c9b3ece20b" dependencies = [ "proc-macro2", "quote", @@ -3164,15 +3206,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.79" +version = "0.2.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d958d035c4438e28c70e4321a2911302f10135ce78a9c7834c0cab4123d06a2" +checksum = "d554b7f530dee5964d9a9468d95c1f8b8acae4f282807e7d27d4b03099a46744" [[package]] name = "web-sys" -version = "0.3.56" +version = "0.3.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c060b319f29dd25724f09a2ba1418f142f539b2be99fbf4d2d5a8f7330afb8eb" +checksum = "7b17e741662c70c8bd24ac5c5b18de314a2c26c32bf8346ee1e6f53de919c283" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/TODO.md b/TODO.md index ef68315..7c165ca 100644 --- a/TODO.md +++ b/TODO.md @@ -2,16 +2,13 @@ ## High priority -* aquatic_http_private - * Consider not setting Content-type: text/plain for responses and send vec as default octet stream instead - * stored procedure - * test ip format - * check user token length - * site will likely want num_seeders and num_leechers for all torrents.. - ## Medium priority * rename request workers to swarm workers + +* save space by making ValidUntil just contain u32 with seconds, measured + some Instant created at application start + * quit whole program if any thread panics * But it would be nice not to panic in workers, but to return errors instead. Once JoinHandle::is_finished is available in stable Rust (#90470), an @@ -29,6 +26,13 @@ * SinkExt::send maybe doesn't wake up properly? * related to https://github.com/sdroege/async-tungstenite/blob/master/src/compat.rs#L18 ? +* aquatic_http_private + * Consider not setting Content-type: text/plain for responses and send vec as default octet stream instead + * stored procedure + * test ip format + * check user token length + * site will likely want num_seeders and num_leechers for all torrents.. + * extract_response_peers * don't assume requesting peer is in list? diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index d4d5922..e8dcb7b 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -24,8 +24,12 @@ aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } aquatic_udp_protocol = { version = "0.2.0", path = "../aquatic_udp_protocol" } anyhow = "1" +blake3 = "1" cfg-if = "1" +constant_time_eq = "0.2" crossbeam-channel = "0.5" +getrandom = "0.2" +hashbrown = { version = "0.12", default-features = false } hex = "0.4" log = "0.4" mimalloc = { version = "0.1", default-features = false } diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index 36affe7..923bd22 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -1,18 +1,107 @@ use std::collections::BTreeMap; use std::hash::Hash; -use std::net::{Ipv4Addr, Ipv6Addr}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::sync::atomic::AtomicUsize; use std::sync::Arc; +use std::time::Instant; -use aquatic_common::CanonicalSocketAddr; +use anyhow::Context; +use constant_time_eq::constant_time_eq; use crossbeam_channel::{Sender, TrySendError}; +use getrandom::getrandom; use aquatic_common::access_list::AccessListArcSwap; +use aquatic_common::CanonicalSocketAddr; use aquatic_udp_protocol::*; use crate::config::Config; -pub const MAX_PACKET_SIZE: usize = 8192; +pub const BUFFER_SIZE: usize = 8192; + +/// HMAC (BLAKE3) based ConnectionID creator and validator +/// +/// Structure of created ConnectionID (bytes making up inner i64): +/// - &[0..4]: connection expiration time as number of seconds after +/// ConnectionValidator instance was created, encoded as u32 bytes. +/// Value fits around 136 years. +/// - &[4..8]: truncated keyed BLAKE3 hash of above 4 bytes and octets of +/// client IP address +/// +/// The purpose of using ConnectionIDs is to prevent IP spoofing, mainly to +/// prevent the tracker from being used as an amplification vector for DDoS +/// attacks. By including 32 bits of BLAKE3 keyed hash output in its contents, +/// such abuse should be rendered impractical. +#[derive(Clone)] +pub struct ConnectionValidator { + start_time: Instant, + max_connection_age: u32, + keyed_hasher: blake3::Hasher, +} + +impl ConnectionValidator { + /// Create new instance. Must be created once and cloned if used in several + /// threads. + pub fn new(config: &Config) -> anyhow::Result { + let mut key = [0; 32]; + + getrandom(&mut key) + .with_context(|| "Couldn't get random bytes for ConnectionValidator key")?; + + let keyed_hasher = blake3::Hasher::new_keyed(&key); + + Ok(Self { + keyed_hasher, + start_time: Instant::now(), + max_connection_age: config.cleaning.max_connection_age, + }) + } + + pub fn create_connection_id(&mut self, source_addr: CanonicalSocketAddr) -> ConnectionId { + let valid_until = + (self.start_time.elapsed().as_secs() as u32 + self.max_connection_age).to_ne_bytes(); + + let hash = self.hash(valid_until, source_addr.get().ip()); + + let mut connection_id_bytes = [0u8; 8]; + + (&mut connection_id_bytes[..4]).copy_from_slice(&valid_until); + (&mut connection_id_bytes[4..]).copy_from_slice(&hash); + + ConnectionId(i64::from_ne_bytes(connection_id_bytes)) + } + + pub fn connection_id_valid( + &mut self, + source_addr: CanonicalSocketAddr, + connection_id: ConnectionId, + ) -> bool { + let bytes = connection_id.0.to_ne_bytes(); + let (valid_until, hash) = bytes.split_at(4); + let valid_until: [u8; 4] = valid_until.try_into().unwrap(); + + if !constant_time_eq(hash, &self.hash(valid_until, source_addr.get().ip())) { + return false; + } + + u32::from_ne_bytes(valid_until) > self.start_time.elapsed().as_secs() as u32 + } + + fn hash(&mut self, valid_until: [u8; 4], ip_addr: IpAddr) -> [u8; 4] { + self.keyed_hasher.update(&valid_until); + + match ip_addr { + IpAddr::V4(ip) => self.keyed_hasher.update(&ip.octets()), + IpAddr::V6(ip) => self.keyed_hasher.update(&ip.octets()), + }; + + let mut hash = [0u8; 4]; + + self.keyed_hasher.finalize_xof().fill(&mut hash); + self.keyed_hasher.reset(); + + hash + } +} #[derive(Debug)] pub struct PendingScrapeRequest { @@ -185,9 +274,13 @@ impl State { #[cfg(test)] mod tests { - use std::net::Ipv6Addr; + use std::net::{Ipv6Addr, SocketAddr}; - use crate::{common::MAX_PACKET_SIZE, config::Config}; + use quickcheck_macros::quickcheck; + + use crate::config::Config; + + use super::*; #[test] fn test_peer_status_from_event_and_bytes_left() { @@ -213,7 +306,7 @@ mod tests { // Assumes that announce response with maximum amount of ipv6 peers will // be the longest #[test] - fn test_max_package_size() { + fn test_buffer_size() { use aquatic_udp_protocol::*; let config = Config::default(); @@ -239,6 +332,44 @@ mod tests { println!("Buffer len: {}", buf.len()); - assert!(buf.len() <= MAX_PACKET_SIZE); + assert!(buf.len() <= BUFFER_SIZE); + } + + #[quickcheck] + fn test_connection_validator( + original_addr: IpAddr, + different_addr: IpAddr, + max_connection_age: u32, + ) -> quickcheck::TestResult { + let original_addr = CanonicalSocketAddr::new(SocketAddr::new(original_addr, 0)); + let different_addr = CanonicalSocketAddr::new(SocketAddr::new(different_addr, 0)); + + if original_addr == different_addr { + return quickcheck::TestResult::discard(); + } + + let mut validator = { + let mut config = Config::default(); + + config.cleaning.max_connection_age = max_connection_age; + + ConnectionValidator::new(&config).unwrap() + }; + + let connection_id = validator.create_connection_id(original_addr); + + let original_valid = validator.connection_id_valid(original_addr, connection_id); + let different_valid = validator.connection_id_valid(different_addr, connection_id); + + if different_valid { + return quickcheck::TestResult::failed(); + } + + if max_connection_age == 0 { + quickcheck::TestResult::from_bool(!original_valid) + } else { + // Note: depends on that running this test takes less than a second + quickcheck::TestResult::from_bool(original_valid) + } } } diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index eb3f3d1..7823ed8 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -163,8 +163,6 @@ impl Default for StatisticsConfig { #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] #[serde(default)] pub struct CleaningConfig { - /// Clean connections this often (seconds) - pub connection_cleaning_interval: u64, /// Clean torrents this often (seconds) pub torrent_cleaning_interval: u64, /// Clean pending scrape responses this often (seconds) @@ -173,8 +171,8 @@ pub struct CleaningConfig { /// lingering for a long time. However, the cleaning also returns unused /// allocated memory to the OS, so the interval can be configured here. pub pending_scrape_cleaning_interval: u64, - /// Remove connections that are older than this (seconds) - pub max_connection_age: u64, + /// Allow clients to use a connection token for this long (seconds) + pub max_connection_age: u32, /// Remove peers who have not announced for this long (seconds) pub max_peer_age: u64, /// Remove pending scrape responses that have not been returned from request @@ -185,7 +183,6 @@ pub struct CleaningConfig { impl Default for CleaningConfig { fn default() -> Self { Self { - connection_cleaning_interval: 60, torrent_cleaning_interval: 60 * 2, pending_scrape_cleaning_interval: 60 * 10, max_connection_age: 60 * 2, diff --git a/aquatic_udp/src/lib.rs b/aquatic_udp/src/lib.rs index db644b9..80ba09f 100644 --- a/aquatic_udp/src/lib.rs +++ b/aquatic_udp/src/lib.rs @@ -2,39 +2,39 @@ pub mod common; pub mod config; pub mod workers; -use aquatic_common::PanicSentinelWatcher; -use config::Config; - use std::collections::BTreeMap; use std::thread::Builder; use anyhow::Context; -#[cfg(feature = "cpu-pinning")] -use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; -use aquatic_common::privileges::PrivilegeDropper; use crossbeam_channel::{bounded, unbounded}; - -use aquatic_common::access_list::update_access_list; use signal_hook::consts::{SIGTERM, SIGUSR1}; use signal_hook::iterator::Signals; -use common::{ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State}; +use aquatic_common::access_list::update_access_list; +#[cfg(feature = "cpu-pinning")] +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; +use aquatic_common::privileges::PrivilegeDropper; +use aquatic_common::PanicSentinelWatcher; -use crate::common::RequestWorkerIndex; +use common::{ + ConnectedRequestSender, ConnectedResponseSender, ConnectionValidator, RequestWorkerIndex, + SocketWorkerIndex, State, +}; +use config::Config; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); pub fn run(config: Config) -> ::anyhow::Result<()> { - let state = State::new(config.request_workers); - - update_access_list(&config.access_list, &state.access_list)?; - let mut signals = Signals::new([SIGUSR1, SIGTERM])?; + let state = State::new(config.request_workers); + let connection_validator = ConnectionValidator::new(&config)?; let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); + update_access_list(&config.access_list, &state.access_list)?; + let mut request_senders = Vec::new(); let mut request_receivers = BTreeMap::new(); @@ -97,6 +97,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let sentinel = sentinel.clone(); let state = state.clone(); let config = config.clone(); + let connection_validator = connection_validator.clone(); let request_sender = ConnectedRequestSender::new(SocketWorkerIndex(i), request_senders.clone()); let response_receiver = response_receivers.remove(&i).unwrap(); @@ -118,6 +119,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { state, config, i, + connection_validator, request_sender, response_receiver, priv_dropper, diff --git a/aquatic_udp/src/workers/request.rs b/aquatic_udp/src/workers/request.rs deleted file mode 100644 index 12eb6b9..0000000 --- a/aquatic_udp/src/workers/request.rs +++ /dev/null @@ -1,445 +0,0 @@ -use std::collections::BTreeMap; -use std::net::IpAddr; -use std::net::Ipv4Addr; -use std::net::Ipv6Addr; -use std::sync::atomic::Ordering; -use std::sync::Arc; -use std::time::Duration; -use std::time::Instant; - -use aquatic_common::access_list::create_access_list_cache; -use aquatic_common::access_list::AccessListArcSwap; -use aquatic_common::AmortizedIndexMap; -use aquatic_common::CanonicalSocketAddr; -use aquatic_common::PanicSentinel; -use aquatic_common::ValidUntil; -use crossbeam_channel::Receiver; -use rand::{rngs::SmallRng, SeedableRng}; - -use aquatic_common::extract_response_peers; - -use aquatic_udp_protocol::*; - -use crate::common::*; -use crate::config::Config; - -#[derive(Clone, Debug)] -struct Peer { - pub ip_address: I, - pub port: Port, - pub status: PeerStatus, - pub valid_until: ValidUntil, -} - -impl Peer { - fn to_response_peer(&self) -> ResponsePeer { - ResponsePeer { - ip_address: self.ip_address, - port: self.port, - } - } -} - -type PeerMap = AmortizedIndexMap>; - -struct TorrentData { - pub peers: PeerMap, - pub num_seeders: usize, - pub num_leechers: usize, -} - -impl Default for TorrentData { - fn default() -> Self { - Self { - peers: Default::default(), - num_seeders: 0, - num_leechers: 0, - } - } -} - -type TorrentMap = AmortizedIndexMap>; - -#[derive(Default)] -struct TorrentMaps { - pub ipv4: TorrentMap, - pub ipv6: TorrentMap, -} - -impl TorrentMaps { - /// Remove disallowed and inactive torrents - pub fn clean(&mut self, config: &Config, access_list: &Arc) { - let now = Instant::now(); - let access_list_mode = config.access_list.mode; - - let mut access_list_cache = create_access_list_cache(access_list); - - self.ipv4.retain(|info_hash, torrent| { - access_list_cache - .load() - .allows(access_list_mode, &info_hash.0) - && Self::clean_torrent_and_peers(now, torrent) - }); - self.ipv4.shrink_to_fit(); - - self.ipv6.retain(|info_hash, torrent| { - access_list_cache - .load() - .allows(access_list_mode, &info_hash.0) - && Self::clean_torrent_and_peers(now, torrent) - }); - self.ipv6.shrink_to_fit(); - } - - /// Returns true if torrent is to be kept - #[inline] - fn clean_torrent_and_peers(now: Instant, torrent: &mut TorrentData) -> bool { - let num_seeders = &mut torrent.num_seeders; - let num_leechers = &mut torrent.num_leechers; - - torrent.peers.retain(|_, peer| { - let keep = peer.valid_until.0 > now; - - if !keep { - match peer.status { - PeerStatus::Seeding => { - *num_seeders -= 1; - } - PeerStatus::Leeching => { - *num_leechers -= 1; - } - _ => (), - }; - } - - keep - }); - - torrent.peers.shrink_to_fit(); - - !torrent.peers.is_empty() - } -} - -pub fn run_request_worker( - _sentinel: PanicSentinel, - config: Config, - state: State, - request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, - response_sender: ConnectedResponseSender, - worker_index: RequestWorkerIndex, -) { - let mut torrents = TorrentMaps::default(); - let mut small_rng = SmallRng::from_entropy(); - - let timeout = Duration::from_millis(config.request_channel_recv_timeout_ms); - let mut peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); - - let cleaning_interval = Duration::from_secs(config.cleaning.torrent_cleaning_interval); - let statistics_update_interval = Duration::from_secs(config.statistics.interval); - - let mut last_cleaning = Instant::now(); - let mut last_statistics_update = Instant::now(); - - let mut iter_counter = 0usize; - - loop { - if let Ok((sender_index, request, src)) = request_receiver.recv_timeout(timeout) { - let response = match request { - ConnectedRequest::Announce(request) => handle_announce_request( - &config, - &mut small_rng, - &mut torrents, - request, - src, - peer_valid_until, - ), - ConnectedRequest::Scrape(request) => { - ConnectedResponse::Scrape(handle_scrape_request(&mut torrents, src, request)) - } - }; - - response_sender.try_send_to(sender_index, response, src); - } - - if iter_counter % 128 == 0 { - let now = Instant::now(); - - peer_valid_until = ValidUntil::new_with_now(now, config.cleaning.max_peer_age); - - if now > last_cleaning + cleaning_interval { - torrents.clean(&config, &state.access_list); - - if config.statistics.active() { - let peers_ipv4 = torrents.ipv4.values().map(|t| t.peers.len()).sum(); - let peers_ipv6 = torrents.ipv6.values().map(|t| t.peers.len()).sum(); - - state.statistics_ipv4.peers[worker_index.0] - .store(peers_ipv4, Ordering::Release); - state.statistics_ipv6.peers[worker_index.0] - .store(peers_ipv6, Ordering::Release); - } - - last_cleaning = now; - } - if config.statistics.active() - && now > last_statistics_update + statistics_update_interval - { - state.statistics_ipv4.torrents[worker_index.0] - .store(torrents.ipv4.len(), Ordering::Release); - state.statistics_ipv6.torrents[worker_index.0] - .store(torrents.ipv6.len(), Ordering::Release); - - last_statistics_update = now; - } - } - - iter_counter = iter_counter.wrapping_add(1); - } -} - -fn handle_announce_request( - config: &Config, - rng: &mut SmallRng, - torrents: &mut TorrentMaps, - request: AnnounceRequest, - src: CanonicalSocketAddr, - peer_valid_until: ValidUntil, -) -> ConnectedResponse { - match src.get().ip() { - IpAddr::V4(ip) => ConnectedResponse::AnnounceIpv4(handle_announce_request_inner( - config, - rng, - &mut torrents.ipv4, - request, - ip, - peer_valid_until, - )), - IpAddr::V6(ip) => ConnectedResponse::AnnounceIpv6(handle_announce_request_inner( - config, - rng, - &mut torrents.ipv6, - request, - ip, - peer_valid_until, - )), - } -} - -fn handle_announce_request_inner( - config: &Config, - rng: &mut SmallRng, - torrents: &mut TorrentMap, - request: AnnounceRequest, - peer_ip: I, - peer_valid_until: ValidUntil, -) -> AnnounceResponse { - let peer_status = PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left); - - let peer = Peer { - ip_address: peer_ip, - port: request.port, - status: peer_status, - valid_until: peer_valid_until, - }; - - let torrent_data = torrents.entry(request.info_hash).or_default(); - - let opt_removed_peer = match peer_status { - PeerStatus::Leeching => { - torrent_data.num_leechers += 1; - - torrent_data.peers.insert(request.peer_id, peer) - } - PeerStatus::Seeding => { - torrent_data.num_seeders += 1; - - torrent_data.peers.insert(request.peer_id, peer) - } - PeerStatus::Stopped => torrent_data.peers.remove(&request.peer_id), - }; - - match opt_removed_peer.map(|peer| peer.status) { - Some(PeerStatus::Leeching) => { - torrent_data.num_leechers -= 1; - } - Some(PeerStatus::Seeding) => { - torrent_data.num_seeders -= 1; - } - _ => {} - } - - let max_num_peers_to_take = calc_max_num_peers_to_take(config, request.peers_wanted.0); - - let response_peers = extract_response_peers( - rng, - &torrent_data.peers, - max_num_peers_to_take, - request.peer_id, - Peer::to_response_peer, - ); - - AnnounceResponse { - transaction_id: request.transaction_id, - announce_interval: AnnounceInterval(config.protocol.peer_announce_interval), - leechers: NumberOfPeers(torrent_data.num_leechers as i32), - seeders: NumberOfPeers(torrent_data.num_seeders as i32), - peers: response_peers, - } -} - -#[inline] -fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize { - if peers_wanted <= 0 { - config.protocol.max_response_peers as usize - } else { - ::std::cmp::min( - config.protocol.max_response_peers as usize, - peers_wanted as usize, - ) - } -} - -fn handle_scrape_request( - torrents: &mut TorrentMaps, - src: CanonicalSocketAddr, - request: PendingScrapeRequest, -) -> PendingScrapeResponse { - const EMPTY_STATS: TorrentScrapeStatistics = create_torrent_scrape_statistics(0, 0); - - let mut torrent_stats: BTreeMap = BTreeMap::new(); - - if src.is_ipv4() { - torrent_stats.extend(request.info_hashes.into_iter().map(|(i, info_hash)| { - let s = if let Some(torrent_data) = torrents.ipv4.get(&info_hash) { - create_torrent_scrape_statistics( - torrent_data.num_seeders as i32, - torrent_data.num_leechers as i32, - ) - } else { - EMPTY_STATS - }; - - (i, s) - })); - } else { - torrent_stats.extend(request.info_hashes.into_iter().map(|(i, info_hash)| { - let s = if let Some(torrent_data) = torrents.ipv6.get(&info_hash) { - create_torrent_scrape_statistics( - torrent_data.num_seeders as i32, - torrent_data.num_leechers as i32, - ) - } else { - EMPTY_STATS - }; - - (i, s) - })); - } - - PendingScrapeResponse { - slab_key: request.slab_key, - torrent_stats, - } -} - -#[inline(always)] -const fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics { - TorrentScrapeStatistics { - seeders: NumberOfPeers(seeders), - completed: NumberOfDownloads(0), // No implementation planned - leechers: NumberOfPeers(leechers), - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashSet; - use std::net::Ipv4Addr; - - use quickcheck::{quickcheck, TestResult}; - use rand::thread_rng; - - use super::*; - - fn gen_peer_id(i: u32) -> PeerId { - let mut peer_id = PeerId([0; 20]); - - peer_id.0[0..4].copy_from_slice(&i.to_ne_bytes()); - - peer_id - } - fn gen_peer(i: u32) -> Peer { - Peer { - ip_address: Ipv4Addr::from(i.to_be_bytes()), - port: Port(1), - status: PeerStatus::Leeching, - valid_until: ValidUntil::new(0), - } - } - - #[test] - fn test_extract_response_peers() { - fn prop(data: (u16, u16)) -> TestResult { - let gen_num_peers = data.0 as u32; - let req_num_peers = data.1 as usize; - - let mut peer_map: PeerMap = Default::default(); - - let mut opt_sender_key = None; - let mut opt_sender_peer = None; - - for i in 0..gen_num_peers { - let key = gen_peer_id(i); - let peer = gen_peer((i << 16) + i); - - if i == 0 { - opt_sender_key = Some(key); - opt_sender_peer = Some(peer.to_response_peer()); - } - - peer_map.insert(key, peer); - } - - let mut rng = thread_rng(); - - let peers = extract_response_peers( - &mut rng, - &peer_map, - req_num_peers, - opt_sender_key.unwrap_or_else(|| gen_peer_id(1)), - Peer::to_response_peer, - ); - - // Check that number of returned peers is correct - - let mut success = peers.len() <= req_num_peers; - - if req_num_peers >= gen_num_peers as usize { - success &= peers.len() == gen_num_peers as usize - || peers.len() + 1 == gen_num_peers as usize; - } - - // Check that returned peers are unique (no overlap) and that sender - // isn't returned - - let mut ip_addresses = HashSet::with_capacity(peers.len()); - - for peer in peers { - if peer == opt_sender_peer.clone().unwrap() - || ip_addresses.contains(&peer.ip_address) - { - success = false; - - break; - } - - ip_addresses.insert(peer.ip_address); - } - - TestResult::from_bool(success) - } - - quickcheck(prop as fn((u16, u16)) -> TestResult); - } -} diff --git a/aquatic_udp/src/workers/request/mod.rs b/aquatic_udp/src/workers/request/mod.rs new file mode 100644 index 0000000..1b8335a --- /dev/null +++ b/aquatic_udp/src/workers/request/mod.rs @@ -0,0 +1,185 @@ +mod storage; + +use std::net::IpAddr; +use std::sync::atomic::Ordering; +use std::time::Duration; +use std::time::Instant; + +use crossbeam_channel::Receiver; +use rand::{rngs::SmallRng, SeedableRng}; + +use aquatic_common::{CanonicalSocketAddr, PanicSentinel, ValidUntil}; + +use aquatic_udp_protocol::*; + +use crate::common::*; +use crate::config::Config; + +use storage::{Peer, TorrentMap, TorrentMaps}; + +pub fn run_request_worker( + _sentinel: PanicSentinel, + config: Config, + state: State, + request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, + response_sender: ConnectedResponseSender, + worker_index: RequestWorkerIndex, +) { + let mut torrents = TorrentMaps::default(); + let mut rng = SmallRng::from_entropy(); + + let timeout = Duration::from_millis(config.request_channel_recv_timeout_ms); + let mut peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); + + let cleaning_interval = Duration::from_secs(config.cleaning.torrent_cleaning_interval); + let statistics_update_interval = Duration::from_secs(config.statistics.interval); + + let mut last_cleaning = Instant::now(); + let mut last_statistics_update = Instant::now(); + + let mut iter_counter = 0usize; + + loop { + if let Ok((sender_index, request, src)) = request_receiver.recv_timeout(timeout) { + let response = match (request, src.get().ip()) { + (ConnectedRequest::Announce(request), IpAddr::V4(ip)) => { + let response = handle_announce_request( + &config, + &mut rng, + &mut torrents.ipv4, + request, + ip, + peer_valid_until, + ); + + ConnectedResponse::AnnounceIpv4(response) + } + (ConnectedRequest::Announce(request), IpAddr::V6(ip)) => { + let response = handle_announce_request( + &config, + &mut rng, + &mut torrents.ipv6, + request, + ip, + peer_valid_until, + ); + + ConnectedResponse::AnnounceIpv6(response) + } + (ConnectedRequest::Scrape(request), IpAddr::V4(_)) => { + ConnectedResponse::Scrape(handle_scrape_request(&mut torrents.ipv4, request)) + } + (ConnectedRequest::Scrape(request), IpAddr::V6(_)) => { + ConnectedResponse::Scrape(handle_scrape_request(&mut torrents.ipv6, request)) + } + }; + + response_sender.try_send_to(sender_index, response, src); + } + + // Run periodic tasks + if iter_counter % 128 == 0 { + let now = Instant::now(); + + peer_valid_until = ValidUntil::new_with_now(now, config.cleaning.max_peer_age); + + if now > last_cleaning + cleaning_interval { + let (ipv4, ipv6) = torrents.clean_and_get_num_peers(&config, &state.access_list); + + if config.statistics.active() { + state.statistics_ipv4.peers[worker_index.0].store(ipv4, Ordering::Release); + state.statistics_ipv6.peers[worker_index.0].store(ipv6, Ordering::Release); + } + + last_cleaning = now; + } + if config.statistics.active() + && now > last_statistics_update + statistics_update_interval + { + state.statistics_ipv4.torrents[worker_index.0] + .store(torrents.ipv4.num_torrents(), Ordering::Release); + state.statistics_ipv6.torrents[worker_index.0] + .store(torrents.ipv6.num_torrents(), Ordering::Release); + + last_statistics_update = now; + } + } + + iter_counter = iter_counter.wrapping_add(1); + } +} + +fn handle_announce_request( + config: &Config, + rng: &mut SmallRng, + torrents: &mut TorrentMap, + request: AnnounceRequest, + peer_ip: I, + peer_valid_until: ValidUntil, +) -> AnnounceResponse { + let max_num_peers_to_take = if request.peers_wanted.0 <= 0 { + config.protocol.max_response_peers as usize + } else { + ::std::cmp::min( + config.protocol.max_response_peers as usize, + request.peers_wanted.0.try_into().unwrap(), + ) + }; + + let peer = Peer { + ip_address: peer_ip, + port: request.port, + status: PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left), + valid_until: peer_valid_until, + }; + + let torrent_data = torrents.0.entry(request.info_hash).or_default(); + + torrent_data.update_peer(request.peer_id, peer); + + let response_peers = + torrent_data.extract_response_peers(rng, request.peer_id, max_num_peers_to_take); + + AnnounceResponse { + transaction_id: request.transaction_id, + announce_interval: AnnounceInterval(config.protocol.peer_announce_interval), + leechers: NumberOfPeers(torrent_data.num_leechers() as i32), + seeders: NumberOfPeers(torrent_data.num_seeders() as i32), + peers: response_peers, + } +} + +fn handle_scrape_request( + torrents: &mut TorrentMap, + request: PendingScrapeRequest, +) -> PendingScrapeResponse { + const EMPTY_STATS: TorrentScrapeStatistics = create_torrent_scrape_statistics(0, 0); + + let torrent_stats = request + .info_hashes + .into_iter() + .map(|(i, info_hash)| { + let stats = torrents + .0 + .get(&info_hash) + .map(|torrent_data| torrent_data.scrape_statistics()) + .unwrap_or(EMPTY_STATS); + + (i, stats) + }) + .collect(); + + PendingScrapeResponse { + slab_key: request.slab_key, + torrent_stats, + } +} + +#[inline(always)] +const fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics { + TorrentScrapeStatistics { + seeders: NumberOfPeers(seeders), + completed: NumberOfDownloads(0), // No implementation planned + leechers: NumberOfPeers(leechers), + } +} diff --git a/aquatic_udp/src/workers/request/storage.rs b/aquatic_udp/src/workers/request/storage.rs new file mode 100644 index 0000000..75bc311 --- /dev/null +++ b/aquatic_udp/src/workers/request/storage.rs @@ -0,0 +1,297 @@ +use std::net::Ipv4Addr; +use std::net::Ipv6Addr; +use std::sync::Arc; +use std::time::Instant; + +use aquatic_common::{ + access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache, AccessListMode}, + extract_response_peers, AmortizedIndexMap, ValidUntil, +}; + +use aquatic_udp_protocol::*; +use rand::prelude::SmallRng; + +use crate::common::*; +use crate::config::Config; + +use super::create_torrent_scrape_statistics; + +#[derive(Clone, Debug)] +pub struct Peer { + pub ip_address: I, + pub port: Port, + pub status: PeerStatus, + pub valid_until: ValidUntil, +} + +impl Peer { + pub fn to_response_peer(&self) -> ResponsePeer { + ResponsePeer { + ip_address: self.ip_address, + port: self.port, + } + } +} + +pub type PeerMap = AmortizedIndexMap>; + +pub struct TorrentData { + peers: PeerMap, + num_seeders: usize, + num_leechers: usize, +} + +impl TorrentData { + pub fn update_peer(&mut self, peer_id: PeerId, peer: Peer) { + let opt_removed_peer = match peer.status { + PeerStatus::Leeching => { + self.num_leechers += 1; + + self.peers.insert(peer_id, peer) + } + PeerStatus::Seeding => { + self.num_seeders += 1; + + self.peers.insert(peer_id, peer) + } + PeerStatus::Stopped => self.peers.remove(&peer_id), + }; + + match opt_removed_peer.map(|peer| peer.status) { + Some(PeerStatus::Leeching) => { + self.num_leechers -= 1; + } + Some(PeerStatus::Seeding) => { + self.num_seeders -= 1; + } + _ => {} + } + } + + pub fn extract_response_peers( + &self, + rng: &mut SmallRng, + peer_id: PeerId, + max_num_peers_to_take: usize, + ) -> Vec> { + extract_response_peers( + rng, + &self.peers, + max_num_peers_to_take, + peer_id, + Peer::to_response_peer, + ) + } + + pub fn num_leechers(&self) -> usize { + self.num_leechers + } + + pub fn num_seeders(&self) -> usize { + self.num_seeders + } + + pub fn scrape_statistics(&self) -> TorrentScrapeStatistics { + create_torrent_scrape_statistics( + self.num_seeders.try_into().unwrap_or(i32::MAX), + self.num_leechers.try_into().unwrap_or(i32::MAX), + ) + } + + /// Remove inactive peers and reclaim space + fn clean(&mut self, now: Instant) { + self.peers.retain(|_, peer| { + if peer.valid_until.0 > now { + true + } else { + match peer.status { + PeerStatus::Seeding => { + self.num_seeders -= 1; + } + PeerStatus::Leeching => { + self.num_leechers -= 1; + } + _ => (), + }; + + false + } + }); + + if !self.peers.is_empty() { + self.peers.shrink_to_fit(); + } + } +} + +impl Default for TorrentData { + fn default() -> Self { + Self { + peers: Default::default(), + num_seeders: 0, + num_leechers: 0, + } + } +} + +#[derive(Default)] +pub struct TorrentMap(pub AmortizedIndexMap>); + +impl TorrentMap { + /// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers + fn clean_and_get_num_peers( + &mut self, + access_list_cache: &mut AccessListCache, + access_list_mode: AccessListMode, + now: Instant, + ) -> usize { + let mut num_peers = 0; + + self.0.retain(|info_hash, torrent| { + if !access_list_cache + .load() + .allows(access_list_mode, &info_hash.0) + { + return false; + } + + torrent.clean(now); + + num_peers += torrent.peers.len(); + + !torrent.peers.is_empty() + }); + + self.0.shrink_to_fit(); + + num_peers + } + + pub fn num_torrents(&self) -> usize { + self.0.len() + } +} + +pub struct TorrentMaps { + pub ipv4: TorrentMap, + pub ipv6: TorrentMap, +} + +impl Default for TorrentMaps { + fn default() -> Self { + Self { + ipv4: TorrentMap(Default::default()), + ipv6: TorrentMap(Default::default()), + } + } +} + +impl TorrentMaps { + /// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers + pub fn clean_and_get_num_peers( + &mut self, + config: &Config, + access_list: &Arc, + ) -> (usize, usize) { + let mut cache = create_access_list_cache(access_list); + let mode = config.access_list.mode; + let now = Instant::now(); + + let ipv4 = self.ipv4.clean_and_get_num_peers(&mut cache, mode, now); + let ipv6 = self.ipv6.clean_and_get_num_peers(&mut cache, mode, now); + + (ipv4, ipv6) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + use std::net::Ipv4Addr; + + use quickcheck::{quickcheck, TestResult}; + use rand::thread_rng; + + use super::*; + + fn gen_peer_id(i: u32) -> PeerId { + let mut peer_id = PeerId([0; 20]); + + peer_id.0[0..4].copy_from_slice(&i.to_ne_bytes()); + + peer_id + } + fn gen_peer(i: u32) -> Peer { + Peer { + ip_address: Ipv4Addr::from(i.to_be_bytes()), + port: Port(1), + status: PeerStatus::Leeching, + valid_until: ValidUntil::new(0), + } + } + + #[test] + fn test_extract_response_peers() { + fn prop(data: (u16, u16)) -> TestResult { + let gen_num_peers = data.0 as u32; + let req_num_peers = data.1 as usize; + + let mut peer_map: PeerMap = Default::default(); + + let mut opt_sender_key = None; + let mut opt_sender_peer = None; + + for i in 0..gen_num_peers { + let key = gen_peer_id(i); + let peer = gen_peer((i << 16) + i); + + if i == 0 { + opt_sender_key = Some(key); + opt_sender_peer = Some(peer.to_response_peer()); + } + + peer_map.insert(key, peer); + } + + let mut rng = thread_rng(); + + let peers = extract_response_peers( + &mut rng, + &peer_map, + req_num_peers, + opt_sender_key.unwrap_or_else(|| gen_peer_id(1)), + Peer::to_response_peer, + ); + + // Check that number of returned peers is correct + + let mut success = peers.len() <= req_num_peers; + + if req_num_peers >= gen_num_peers as usize { + success &= peers.len() == gen_num_peers as usize + || peers.len() + 1 == gen_num_peers as usize; + } + + // Check that returned peers are unique (no overlap) and that sender + // isn't returned + + let mut ip_addresses = HashSet::with_capacity(peers.len()); + + for peer in peers { + if peer == opt_sender_peer.clone().unwrap() + || ip_addresses.contains(&peer.ip_address) + { + success = false; + + break; + } + + ip_addresses.insert(peer.ip_address); + } + + TestResult::from_bool(success) + } + + quickcheck(prop as fn((u16, u16)) -> TestResult); + } +} diff --git a/aquatic_udp/src/workers/socket.rs b/aquatic_udp/src/workers/socket.rs deleted file mode 100644 index c8b5e05..0000000 --- a/aquatic_udp/src/workers/socket.rs +++ /dev/null @@ -1,667 +0,0 @@ -use std::collections::BTreeMap; -use std::io::{Cursor, ErrorKind}; -use std::sync::atomic::Ordering; -use std::time::{Duration, Instant}; -use std::vec::Drain; - -use anyhow::Context; -use aquatic_common::privileges::PrivilegeDropper; -use crossbeam_channel::Receiver; -use mio::net::UdpSocket; -use mio::{Events, Interest, Poll, Token}; -use rand::prelude::{Rng, SeedableRng, StdRng}; -use slab::Slab; - -use aquatic_common::access_list::create_access_list_cache; -use aquatic_common::access_list::AccessListCache; -use aquatic_common::{AmortizedIndexMap, CanonicalSocketAddr}; -use aquatic_common::{PanicSentinel, ValidUntil}; -use aquatic_udp_protocol::*; -use socket2::{Domain, Protocol, Socket, Type}; - -use crate::common::*; -use crate::config::Config; - -#[derive(Default)] -pub struct ConnectionMap(AmortizedIndexMap<(ConnectionId, CanonicalSocketAddr), ValidUntil>); - -impl ConnectionMap { - pub fn insert( - &mut self, - connection_id: ConnectionId, - socket_addr: CanonicalSocketAddr, - valid_until: ValidUntil, - ) { - self.0.insert((connection_id, socket_addr), valid_until); - } - - pub fn contains(&self, connection_id: ConnectionId, socket_addr: CanonicalSocketAddr) -> bool { - self.0.contains_key(&(connection_id, socket_addr)) - } - - pub fn clean(&mut self) { - let now = Instant::now(); - - self.0.retain(|_, v| v.0 > now); - self.0.shrink_to_fit(); - } -} - -#[derive(Debug)] -pub struct PendingScrapeResponseSlabEntry { - num_pending: usize, - valid_until: ValidUntil, - torrent_stats: BTreeMap, - transaction_id: TransactionId, -} - -#[derive(Default)] -pub struct PendingScrapeResponseSlab(Slab); - -impl PendingScrapeResponseSlab { - pub fn prepare_split_requests( - &mut self, - config: &Config, - request: ScrapeRequest, - valid_until: ValidUntil, - ) -> impl IntoIterator { - let mut split_requests: AmortizedIndexMap = - Default::default(); - - if request.info_hashes.is_empty() { - ::log::warn!( - "Attempted to prepare PendingScrapeResponseSlab entry with zero info hashes" - ); - - return split_requests; - } - - let vacant_entry = self.0.vacant_entry(); - let slab_key = vacant_entry.key(); - - for (i, info_hash) in request.info_hashes.into_iter().enumerate() { - let split_request = split_requests - .entry(RequestWorkerIndex::from_info_hash(&config, info_hash)) - .or_insert_with(|| PendingScrapeRequest { - slab_key, - info_hashes: BTreeMap::new(), - }); - - split_request.info_hashes.insert(i, info_hash); - } - - vacant_entry.insert(PendingScrapeResponseSlabEntry { - num_pending: split_requests.len(), - valid_until, - torrent_stats: Default::default(), - transaction_id: request.transaction_id, - }); - - split_requests - } - - pub fn add_and_get_finished(&mut self, response: PendingScrapeResponse) -> Option { - let finished = if let Some(entry) = self.0.get_mut(response.slab_key) { - entry.num_pending -= 1; - - entry - .torrent_stats - .extend(response.torrent_stats.into_iter()); - - entry.num_pending == 0 - } else { - ::log::warn!( - "PendingScrapeResponseSlab.add didn't find entry for key {:?}", - response.slab_key - ); - - false - }; - - if finished { - let entry = self.0.remove(response.slab_key); - - Some(Response::Scrape(ScrapeResponse { - transaction_id: entry.transaction_id, - torrent_stats: entry.torrent_stats.into_values().collect(), - })) - } else { - None - } - } - - pub fn clean(&mut self) { - let now = Instant::now(); - - self.0.retain(|k, v| { - let keep = v.valid_until.0 > now; - - if !keep { - ::log::warn!( - "Removing PendingScrapeResponseSlab entry while cleaning. {:?}: {:?}", - k, - v - ); - } - - keep - }); - self.0.shrink_to_fit(); - } -} - -pub fn run_socket_worker( - _sentinel: PanicSentinel, - state: State, - config: Config, - token_num: usize, - request_sender: ConnectedRequestSender, - response_receiver: Receiver<(ConnectedResponse, CanonicalSocketAddr)>, - priv_dropper: PrivilegeDropper, -) { - let mut rng = StdRng::from_entropy(); - let mut buffer = [0u8; MAX_PACKET_SIZE]; - - let mut socket = - UdpSocket::from_std(create_socket(&config, priv_dropper).expect("create socket")); - let mut poll = Poll::new().expect("create poll"); - - let interests = Interest::READABLE; - - poll.registry() - .register(&mut socket, Token(token_num), interests) - .unwrap(); - - let mut events = Events::with_capacity(config.network.poll_event_capacity); - let mut connections = ConnectionMap::default(); - let mut pending_scrape_responses = PendingScrapeResponseSlab::default(); - let mut access_list_cache = create_access_list_cache(&state.access_list); - - let mut local_responses: Vec<(Response, CanonicalSocketAddr)> = Vec::new(); - - let poll_timeout = Duration::from_millis(config.network.poll_timeout_ms); - - let connection_cleaning_duration = - Duration::from_secs(config.cleaning.connection_cleaning_interval); - let pending_scrape_cleaning_duration = - Duration::from_secs(config.cleaning.pending_scrape_cleaning_interval); - - let mut connection_valid_until = ValidUntil::new(config.cleaning.max_connection_age); - let mut pending_scrape_valid_until = ValidUntil::new(config.cleaning.max_pending_scrape_age); - - let mut last_connection_cleaning = Instant::now(); - let mut last_pending_scrape_cleaning = Instant::now(); - - let mut iter_counter = 0usize; - - loop { - poll.poll(&mut events, Some(poll_timeout)) - .expect("failed polling"); - - for event in events.iter() { - let token = event.token(); - - if (token.0 == token_num) & event.is_readable() { - read_requests( - &config, - &state, - &mut connections, - &mut pending_scrape_responses, - &mut access_list_cache, - &mut rng, - &mut socket, - &mut buffer, - &request_sender, - &mut local_responses, - connection_valid_until, - pending_scrape_valid_until, - ); - } - } - - send_responses( - &state, - &config, - &mut socket, - &mut buffer, - &response_receiver, - &mut pending_scrape_responses, - local_responses.drain(..), - ); - - // Run periodic ValidUntil updates and state cleaning - if iter_counter % 128 == 0 { - let now = Instant::now(); - - connection_valid_until = - ValidUntil::new_with_now(now, config.cleaning.max_connection_age); - pending_scrape_valid_until = - ValidUntil::new_with_now(now, config.cleaning.max_pending_scrape_age); - - if now > last_connection_cleaning + connection_cleaning_duration { - connections.clean(); - - last_connection_cleaning = now; - } - if now > last_pending_scrape_cleaning + pending_scrape_cleaning_duration { - pending_scrape_responses.clean(); - - last_pending_scrape_cleaning = now; - } - } - - iter_counter = iter_counter.wrapping_add(1); - } -} - -#[inline] -fn read_requests( - config: &Config, - state: &State, - connections: &mut ConnectionMap, - pending_scrape_responses: &mut PendingScrapeResponseSlab, - access_list_cache: &mut AccessListCache, - rng: &mut StdRng, - socket: &mut UdpSocket, - buffer: &mut [u8], - request_sender: &ConnectedRequestSender, - local_responses: &mut Vec<(Response, CanonicalSocketAddr)>, - connection_valid_until: ValidUntil, - pending_scrape_valid_until: ValidUntil, -) { - let mut requests_received_ipv4: usize = 0; - let mut requests_received_ipv6: usize = 0; - let mut bytes_received_ipv4: usize = 0; - let mut bytes_received_ipv6 = 0; - - loop { - match socket.recv_from(&mut buffer[..]) { - Ok((amt, src)) => { - let res_request = - Request::from_bytes(&buffer[..amt], config.protocol.max_scrape_torrents); - - let src = CanonicalSocketAddr::new(src); - - // Update statistics for converted address - if src.is_ipv4() { - if res_request.is_ok() { - requests_received_ipv4 += 1; - } - bytes_received_ipv4 += amt; - } else { - if res_request.is_ok() { - requests_received_ipv6 += 1; - } - bytes_received_ipv6 += amt; - } - - handle_request( - config, - connections, - pending_scrape_responses, - access_list_cache, - rng, - request_sender, - local_responses, - connection_valid_until, - pending_scrape_valid_until, - res_request, - src, - ); - } - Err(err) => { - if err.kind() == ErrorKind::WouldBlock { - break; - } - - ::log::info!("recv_from error: {}", err); - } - } - } - - if config.statistics.active() { - state - .statistics_ipv4 - .requests_received - .fetch_add(requests_received_ipv4, Ordering::Release); - state - .statistics_ipv6 - .requests_received - .fetch_add(requests_received_ipv6, Ordering::Release); - state - .statistics_ipv4 - .bytes_received - .fetch_add(bytes_received_ipv4, Ordering::Release); - state - .statistics_ipv6 - .bytes_received - .fetch_add(bytes_received_ipv6, Ordering::Release); - } -} - -pub fn handle_request( - config: &Config, - connections: &mut ConnectionMap, - pending_scrape_responses: &mut PendingScrapeResponseSlab, - access_list_cache: &mut AccessListCache, - rng: &mut StdRng, - request_sender: &ConnectedRequestSender, - local_responses: &mut Vec<(Response, CanonicalSocketAddr)>, - connection_valid_until: ValidUntil, - pending_scrape_valid_until: ValidUntil, - res_request: Result, - src: CanonicalSocketAddr, -) { - let access_list_mode = config.access_list.mode; - - match res_request { - Ok(Request::Connect(request)) => { - let connection_id = ConnectionId(rng.gen()); - - connections.insert(connection_id, src, connection_valid_until); - - let response = Response::Connect(ConnectResponse { - connection_id, - transaction_id: request.transaction_id, - }); - - local_responses.push((response, src)) - } - Ok(Request::Announce(request)) => { - if connections.contains(request.connection_id, src) { - if access_list_cache - .load() - .allows(access_list_mode, &request.info_hash.0) - { - let worker_index = - RequestWorkerIndex::from_info_hash(config, request.info_hash); - - request_sender.try_send_to( - worker_index, - ConnectedRequest::Announce(request), - src, - ); - } else { - let response = Response::Error(ErrorResponse { - transaction_id: request.transaction_id, - message: "Info hash not allowed".into(), - }); - - local_responses.push((response, src)) - } - } - } - Ok(Request::Scrape(request)) => { - if connections.contains(request.connection_id, src) { - let split_requests = pending_scrape_responses.prepare_split_requests( - config, - request, - pending_scrape_valid_until, - ); - - for (request_worker_index, request) in split_requests { - request_sender.try_send_to( - request_worker_index, - ConnectedRequest::Scrape(request), - src, - ); - } - } - } - Err(err) => { - ::log::debug!("Request::from_bytes error: {:?}", err); - - if let RequestParseError::Sendable { - connection_id, - transaction_id, - err, - } = err - { - if connections.contains(connection_id, src) { - let response = ErrorResponse { - transaction_id, - message: err.right_or("Parse error").into(), - }; - - local_responses.push((response.into(), src)); - } - } - } - } -} - -#[inline] -fn send_responses( - state: &State, - config: &Config, - socket: &mut UdpSocket, - buffer: &mut [u8], - response_receiver: &Receiver<(ConnectedResponse, CanonicalSocketAddr)>, - pending_scrape_responses: &mut PendingScrapeResponseSlab, - local_responses: Drain<(Response, CanonicalSocketAddr)>, -) { - for (response, addr) in local_responses { - send_response(state, config, socket, buffer, response, addr); - } - - for (response, addr) in response_receiver.try_iter() { - let opt_response = match response { - ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r), - ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)), - ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)), - }; - - if let Some(response) = opt_response { - send_response(state, config, socket, buffer, response, addr); - } - } -} - -fn send_response( - state: &State, - config: &Config, - socket: &mut UdpSocket, - buffer: &mut [u8], - response: Response, - addr: CanonicalSocketAddr, -) { - let mut cursor = Cursor::new(buffer); - - let canonical_addr_is_ipv4 = addr.is_ipv4(); - - let addr = if config.network.address.is_ipv4() { - addr.get_ipv4() - .expect("found peer ipv6 address while running bound to ipv4 address") - } else { - addr.get_ipv6_mapped() - }; - - match response.write(&mut cursor) { - Ok(()) => { - let amt = cursor.position() as usize; - - match socket.send_to(&cursor.get_ref()[..amt], addr) { - Ok(amt) if config.statistics.active() => { - let stats = if canonical_addr_is_ipv4 { - &state.statistics_ipv4 - } else { - &state.statistics_ipv6 - }; - - stats.bytes_sent.fetch_add(amt, Ordering::Relaxed); - - match response { - Response::Connect(_) => { - stats.responses_sent_connect.fetch_add(1, Ordering::Relaxed); - } - Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => { - stats - .responses_sent_announce - .fetch_add(1, Ordering::Relaxed); - } - Response::Scrape(_) => { - stats.responses_sent_scrape.fetch_add(1, Ordering::Relaxed); - } - Response::Error(_) => { - stats.responses_sent_error.fetch_add(1, Ordering::Relaxed); - } - } - } - Ok(_) => {} - Err(err) => { - ::log::info!("send_to error: {}", err); - } - } - } - Err(err) => { - ::log::error!("Response::write error: {:?}", err); - } - } -} - -pub fn create_socket( - config: &Config, - priv_dropper: PrivilegeDropper, -) -> anyhow::Result<::std::net::UdpSocket> { - let socket = if config.network.address.is_ipv4() { - Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))? - } else { - Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))? - }; - - if config.network.only_ipv6 { - socket - .set_only_v6(true) - .with_context(|| "socket: set only ipv6")?; - } - - socket - .set_reuse_port(true) - .with_context(|| "socket: set reuse port")?; - - socket - .set_nonblocking(true) - .with_context(|| "socket: set nonblocking")?; - - let recv_buffer_size = config.network.socket_recv_buffer_size; - - if recv_buffer_size != 0 { - if let Err(err) = socket.set_recv_buffer_size(recv_buffer_size) { - ::log::error!( - "socket: failed setting recv buffer to {}: {:?}", - recv_buffer_size, - err - ); - } - } - - socket - .bind(&config.network.address.into()) - .with_context(|| format!("socket: bind to {}", config.network.address))?; - - priv_dropper.after_socket_creation()?; - - Ok(socket.into()) -} - -#[cfg(test)] -mod tests { - use quickcheck::TestResult; - use quickcheck_macros::quickcheck; - - use super::*; - - #[quickcheck] - fn test_pending_scrape_response_map( - request_data: Vec<(i32, i64, u8)>, - request_workers: u8, - ) -> TestResult { - if request_workers == 0 { - return TestResult::discard(); - } - - let mut config = Config::default(); - - config.request_workers = request_workers as usize; - - let valid_until = ValidUntil::new(1); - - let mut map = PendingScrapeResponseSlab::default(); - - let mut requests = Vec::new(); - - for (t, c, b) in request_data { - if b == 0 { - return TestResult::discard(); - } - - let mut info_hashes = Vec::new(); - - for i in 0..b { - let info_hash = InfoHash([i; 20]); - - info_hashes.push(info_hash); - } - - let request = ScrapeRequest { - transaction_id: TransactionId(t), - connection_id: ConnectionId(c), - info_hashes, - }; - - requests.push(request); - } - - let mut all_split_requests = Vec::new(); - - for request in requests.iter() { - let split_requests = - map.prepare_split_requests(&config, request.to_owned(), valid_until); - - all_split_requests.push( - split_requests - .into_iter() - .collect::>(), - ); - } - - assert_eq!(map.0.len(), requests.len()); - - let mut responses = Vec::new(); - - for split_requests in all_split_requests { - for (worker_index, split_request) in split_requests { - assert!(worker_index.0 < request_workers as usize); - - let torrent_stats = split_request - .info_hashes - .into_iter() - .map(|(i, info_hash)| { - ( - i, - TorrentScrapeStatistics { - seeders: NumberOfPeers((info_hash.0[0]) as i32), - leechers: NumberOfPeers(0), - completed: NumberOfDownloads(0), - }, - ) - }) - .collect(); - - let response = PendingScrapeResponse { - slab_key: split_request.slab_key, - torrent_stats, - }; - - if let Some(response) = map.add_and_get_finished(response) { - responses.push(response); - } - } - } - - assert!(map.0.is_empty()); - assert_eq!(responses.len(), requests.len()); - - TestResult::from_bool(true) - } -} diff --git a/aquatic_udp/src/workers/socket/mod.rs b/aquatic_udp/src/workers/socket/mod.rs new file mode 100644 index 0000000..d53d8b4 --- /dev/null +++ b/aquatic_udp/src/workers/socket/mod.rs @@ -0,0 +1,158 @@ +mod requests; +mod responses; +mod storage; + +use std::time::{Duration, Instant}; + +use anyhow::Context; +use crossbeam_channel::Receiver; +use mio::net::UdpSocket; +use mio::{Events, Interest, Poll, Token}; +use socket2::{Domain, Protocol, Socket, Type}; + +use aquatic_common::{ + access_list::create_access_list_cache, privileges::PrivilegeDropper, CanonicalSocketAddr, + PanicSentinel, ValidUntil, +}; +use aquatic_udp_protocol::*; + +use crate::common::*; +use crate::config::Config; + +use requests::read_requests; +use responses::send_responses; +use storage::PendingScrapeResponseSlab; + +pub fn run_socket_worker( + _sentinel: PanicSentinel, + state: State, + config: Config, + token_num: usize, + mut connection_validator: ConnectionValidator, + request_sender: ConnectedRequestSender, + response_receiver: Receiver<(ConnectedResponse, CanonicalSocketAddr)>, + priv_dropper: PrivilegeDropper, +) { + let mut buffer = [0u8; BUFFER_SIZE]; + + let mut socket = + UdpSocket::from_std(create_socket(&config, priv_dropper).expect("create socket")); + let mut poll = Poll::new().expect("create poll"); + + let interests = Interest::READABLE; + + poll.registry() + .register(&mut socket, Token(token_num), interests) + .unwrap(); + + let mut events = Events::with_capacity(config.network.poll_event_capacity); + let mut pending_scrape_responses = PendingScrapeResponseSlab::default(); + let mut access_list_cache = create_access_list_cache(&state.access_list); + + let mut local_responses: Vec<(Response, CanonicalSocketAddr)> = Vec::new(); + + let poll_timeout = Duration::from_millis(config.network.poll_timeout_ms); + + let pending_scrape_cleaning_duration = + Duration::from_secs(config.cleaning.pending_scrape_cleaning_interval); + + let mut pending_scrape_valid_until = ValidUntil::new(config.cleaning.max_pending_scrape_age); + let mut last_pending_scrape_cleaning = Instant::now(); + + let mut iter_counter = 0usize; + + loop { + poll.poll(&mut events, Some(poll_timeout)) + .expect("failed polling"); + + for event in events.iter() { + let token = event.token(); + + if (token.0 == token_num) & event.is_readable() { + read_requests( + &config, + &state, + &mut connection_validator, + &mut pending_scrape_responses, + &mut access_list_cache, + &mut socket, + &mut buffer, + &request_sender, + &mut local_responses, + pending_scrape_valid_until, + ); + } + } + + send_responses( + &state, + &config, + &mut socket, + &mut buffer, + &response_receiver, + &mut pending_scrape_responses, + local_responses.drain(..), + ); + + // Run periodic ValidUntil updates and state cleaning + if iter_counter % 256 == 0 { + let now = Instant::now(); + + pending_scrape_valid_until = + ValidUntil::new_with_now(now, config.cleaning.max_pending_scrape_age); + + if now > last_pending_scrape_cleaning + pending_scrape_cleaning_duration { + pending_scrape_responses.clean(); + + last_pending_scrape_cleaning = now; + } + } + + iter_counter = iter_counter.wrapping_add(1); + } +} + +fn create_socket( + config: &Config, + priv_dropper: PrivilegeDropper, +) -> anyhow::Result<::std::net::UdpSocket> { + let socket = if config.network.address.is_ipv4() { + Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))? + } else { + Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))? + }; + + if config.network.only_ipv6 { + socket + .set_only_v6(true) + .with_context(|| "socket: set only ipv6")?; + } + + socket + .set_reuse_port(true) + .with_context(|| "socket: set reuse port")?; + + socket + .set_nonblocking(true) + .with_context(|| "socket: set nonblocking")?; + + let recv_buffer_size = config.network.socket_recv_buffer_size; + + if recv_buffer_size != 0 { + if let Err(err) = socket.set_recv_buffer_size(recv_buffer_size) { + ::log::error!( + "socket: failed setting recv buffer to {}: {:?}", + recv_buffer_size, + err + ); + } + } + + socket + .bind(&config.network.address.into()) + .with_context(|| format!("socket: bind to {}", config.network.address))?; + + priv_dropper.after_socket_creation()?; + + Ok(socket.into()) +} diff --git a/aquatic_udp/src/workers/socket/requests.rs b/aquatic_udp/src/workers/socket/requests.rs new file mode 100644 index 0000000..610f0ae --- /dev/null +++ b/aquatic_udp/src/workers/socket/requests.rs @@ -0,0 +1,178 @@ +use std::io::ErrorKind; +use std::sync::atomic::Ordering; + +use mio::net::UdpSocket; + +use aquatic_common::{access_list::AccessListCache, CanonicalSocketAddr, ValidUntil}; +use aquatic_udp_protocol::*; + +use crate::common::*; +use crate::config::Config; + +use super::storage::PendingScrapeResponseSlab; + +pub fn read_requests( + config: &Config, + state: &State, + connection_validator: &mut ConnectionValidator, + pending_scrape_responses: &mut PendingScrapeResponseSlab, + access_list_cache: &mut AccessListCache, + socket: &mut UdpSocket, + buffer: &mut [u8], + request_sender: &ConnectedRequestSender, + local_responses: &mut Vec<(Response, CanonicalSocketAddr)>, + pending_scrape_valid_until: ValidUntil, +) { + let mut requests_received_ipv4: usize = 0; + let mut requests_received_ipv6: usize = 0; + let mut bytes_received_ipv4: usize = 0; + let mut bytes_received_ipv6 = 0; + + loop { + match socket.recv_from(&mut buffer[..]) { + Ok((amt, src)) => { + let res_request = + Request::from_bytes(&buffer[..amt], config.protocol.max_scrape_torrents); + + let src = CanonicalSocketAddr::new(src); + + // Update statistics for converted address + if src.is_ipv4() { + if res_request.is_ok() { + requests_received_ipv4 += 1; + } + bytes_received_ipv4 += amt; + } else { + if res_request.is_ok() { + requests_received_ipv6 += 1; + } + bytes_received_ipv6 += amt; + } + + handle_request( + config, + connection_validator, + pending_scrape_responses, + access_list_cache, + request_sender, + local_responses, + pending_scrape_valid_until, + res_request, + src, + ); + } + Err(err) if err.kind() == ErrorKind::WouldBlock => { + break; + } + Err(err) => { + ::log::warn!("recv_from error: {:#}", err); + } + } + } + + if config.statistics.active() { + state + .statistics_ipv4 + .requests_received + .fetch_add(requests_received_ipv4, Ordering::Release); + state + .statistics_ipv6 + .requests_received + .fetch_add(requests_received_ipv6, Ordering::Release); + state + .statistics_ipv4 + .bytes_received + .fetch_add(bytes_received_ipv4, Ordering::Release); + state + .statistics_ipv6 + .bytes_received + .fetch_add(bytes_received_ipv6, Ordering::Release); + } +} + +fn handle_request( + config: &Config, + connection_validator: &mut ConnectionValidator, + pending_scrape_responses: &mut PendingScrapeResponseSlab, + access_list_cache: &mut AccessListCache, + request_sender: &ConnectedRequestSender, + local_responses: &mut Vec<(Response, CanonicalSocketAddr)>, + pending_scrape_valid_until: ValidUntil, + res_request: Result, + src: CanonicalSocketAddr, +) { + let access_list_mode = config.access_list.mode; + + match res_request { + Ok(Request::Connect(request)) => { + let connection_id = connection_validator.create_connection_id(src); + + let response = Response::Connect(ConnectResponse { + connection_id, + transaction_id: request.transaction_id, + }); + + local_responses.push((response, src)) + } + Ok(Request::Announce(request)) => { + if connection_validator.connection_id_valid(src, request.connection_id) { + if access_list_cache + .load() + .allows(access_list_mode, &request.info_hash.0) + { + let worker_index = + RequestWorkerIndex::from_info_hash(config, request.info_hash); + + request_sender.try_send_to( + worker_index, + ConnectedRequest::Announce(request), + src, + ); + } else { + let response = Response::Error(ErrorResponse { + transaction_id: request.transaction_id, + message: "Info hash not allowed".into(), + }); + + local_responses.push((response, src)) + } + } + } + Ok(Request::Scrape(request)) => { + if connection_validator.connection_id_valid(src, request.connection_id) { + let split_requests = pending_scrape_responses.prepare_split_requests( + config, + request, + pending_scrape_valid_until, + ); + + for (request_worker_index, request) in split_requests { + request_sender.try_send_to( + request_worker_index, + ConnectedRequest::Scrape(request), + src, + ); + } + } + } + Err(err) => { + ::log::debug!("Request::from_bytes error: {:?}", err); + + if let RequestParseError::Sendable { + connection_id, + transaction_id, + err, + } = err + { + if connection_validator.connection_id_valid(src, connection_id) { + let response = ErrorResponse { + transaction_id, + message: err.right_or("Parse error").into(), + }; + + local_responses.push((response.into(), src)); + } + } + } + } +} diff --git a/aquatic_udp/src/workers/socket/responses.rs b/aquatic_udp/src/workers/socket/responses.rs new file mode 100644 index 0000000..7bb5510 --- /dev/null +++ b/aquatic_udp/src/workers/socket/responses.rs @@ -0,0 +1,104 @@ +use std::io::Cursor; +use std::sync::atomic::Ordering; +use std::vec::Drain; + +use crossbeam_channel::Receiver; +use mio::net::UdpSocket; + +use aquatic_common::CanonicalSocketAddr; +use aquatic_udp_protocol::*; + +use crate::common::*; +use crate::config::Config; + +use super::storage::PendingScrapeResponseSlab; + +pub fn send_responses( + state: &State, + config: &Config, + socket: &mut UdpSocket, + buffer: &mut [u8], + response_receiver: &Receiver<(ConnectedResponse, CanonicalSocketAddr)>, + pending_scrape_responses: &mut PendingScrapeResponseSlab, + local_responses: Drain<(Response, CanonicalSocketAddr)>, +) { + for (response, addr) in local_responses { + send_response(state, config, socket, buffer, response, addr); + } + + for (response, addr) in response_receiver.try_iter() { + let opt_response = match response { + ConnectedResponse::Scrape(r) => pending_scrape_responses + .add_and_get_finished(r) + .map(Response::Scrape), + ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)), + ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)), + }; + + if let Some(response) = opt_response { + send_response(state, config, socket, buffer, response, addr); + } + } +} + +fn send_response( + state: &State, + config: &Config, + socket: &mut UdpSocket, + buffer: &mut [u8], + response: Response, + addr: CanonicalSocketAddr, +) { + let mut cursor = Cursor::new(buffer); + + let canonical_addr_is_ipv4 = addr.is_ipv4(); + + let addr = if config.network.address.is_ipv4() { + addr.get_ipv4() + .expect("found peer ipv6 address while running bound to ipv4 address") + } else { + addr.get_ipv6_mapped() + }; + + match response.write(&mut cursor) { + Ok(()) => { + let amt = cursor.position() as usize; + + match socket.send_to(&cursor.get_ref()[..amt], addr) { + Ok(amt) if config.statistics.active() => { + let stats = if canonical_addr_is_ipv4 { + &state.statistics_ipv4 + } else { + &state.statistics_ipv6 + }; + + stats.bytes_sent.fetch_add(amt, Ordering::Relaxed); + + match response { + Response::Connect(_) => { + stats.responses_sent_connect.fetch_add(1, Ordering::Relaxed); + } + Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => { + stats + .responses_sent_announce + .fetch_add(1, Ordering::Relaxed); + } + Response::Scrape(_) => { + stats.responses_sent_scrape.fetch_add(1, Ordering::Relaxed); + } + Response::Error(_) => { + stats.responses_sent_error.fetch_add(1, Ordering::Relaxed); + } + } + } + Ok(_) => {} + Err(err) => { + ::log::warn!("send_to error: {:#}", err); + } + } + } + Err(err) => { + ::log::error!("Response::write error: {:?}", err); + } + } +} diff --git a/aquatic_udp/src/workers/socket/storage.rs b/aquatic_udp/src/workers/socket/storage.rs new file mode 100644 index 0000000..4717daf --- /dev/null +++ b/aquatic_udp/src/workers/socket/storage.rs @@ -0,0 +1,221 @@ +use std::collections::BTreeMap; +use std::time::Instant; + +use hashbrown::HashMap; +use slab::Slab; + +use aquatic_common::ValidUntil; +use aquatic_udp_protocol::*; + +use crate::common::*; +use crate::config::Config; + +#[derive(Debug)] +pub struct PendingScrapeResponseSlabEntry { + num_pending: usize, + valid_until: ValidUntil, + torrent_stats: BTreeMap, + transaction_id: TransactionId, +} + +#[derive(Default)] +pub struct PendingScrapeResponseSlab(Slab); + +impl PendingScrapeResponseSlab { + pub fn prepare_split_requests( + &mut self, + config: &Config, + request: ScrapeRequest, + valid_until: ValidUntil, + ) -> impl IntoIterator { + let capacity = config.request_workers.min(request.info_hashes.len()); + let mut split_requests: HashMap = + HashMap::with_capacity(capacity); + + if request.info_hashes.is_empty() { + ::log::warn!( + "Attempted to prepare PendingScrapeResponseSlab entry with zero info hashes" + ); + + return split_requests; + } + + let vacant_entry = self.0.vacant_entry(); + let slab_key = vacant_entry.key(); + + for (i, info_hash) in request.info_hashes.into_iter().enumerate() { + let split_request = split_requests + .entry(RequestWorkerIndex::from_info_hash(&config, info_hash)) + .or_insert_with(|| PendingScrapeRequest { + slab_key, + info_hashes: BTreeMap::new(), + }); + + split_request.info_hashes.insert(i, info_hash); + } + + vacant_entry.insert(PendingScrapeResponseSlabEntry { + num_pending: split_requests.len(), + valid_until, + torrent_stats: Default::default(), + transaction_id: request.transaction_id, + }); + + split_requests + } + + pub fn add_and_get_finished( + &mut self, + response: PendingScrapeResponse, + ) -> Option { + let finished = if let Some(entry) = self.0.get_mut(response.slab_key) { + entry.num_pending -= 1; + + entry + .torrent_stats + .extend(response.torrent_stats.into_iter()); + + entry.num_pending == 0 + } else { + ::log::warn!( + "PendingScrapeResponseSlab.add didn't find entry for key {:?}", + response.slab_key + ); + + false + }; + + if finished { + let entry = self.0.remove(response.slab_key); + + Some(ScrapeResponse { + transaction_id: entry.transaction_id, + torrent_stats: entry.torrent_stats.into_values().collect(), + }) + } else { + None + } + } + + pub fn clean(&mut self) { + let now = Instant::now(); + + self.0.retain(|k, v| { + if v.valid_until.0 > now { + true + } else { + ::log::warn!( + "Unconsumed PendingScrapeResponseSlab entry. {:?}: {:?}", + k, + v + ); + + false + } + }); + + self.0.shrink_to_fit(); + } +} + +#[cfg(test)] +mod tests { + use quickcheck::TestResult; + use quickcheck_macros::quickcheck; + + use super::*; + + #[quickcheck] + fn test_pending_scrape_response_slab( + request_data: Vec<(i32, i64, u8)>, + request_workers: u8, + ) -> TestResult { + if request_workers == 0 { + return TestResult::discard(); + } + + let mut config = Config::default(); + + config.request_workers = request_workers as usize; + + let valid_until = ValidUntil::new(1); + + let mut map = PendingScrapeResponseSlab::default(); + + let mut requests = Vec::new(); + + for (t, c, b) in request_data { + if b == 0 { + return TestResult::discard(); + } + + let mut info_hashes = Vec::new(); + + for i in 0..b { + let info_hash = InfoHash([i; 20]); + + info_hashes.push(info_hash); + } + + let request = ScrapeRequest { + transaction_id: TransactionId(t), + connection_id: ConnectionId(c), + info_hashes, + }; + + requests.push(request); + } + + let mut all_split_requests = Vec::new(); + + for request in requests.iter() { + let split_requests = + map.prepare_split_requests(&config, request.to_owned(), valid_until); + + all_split_requests.push( + split_requests + .into_iter() + .collect::>(), + ); + } + + assert_eq!(map.0.len(), requests.len()); + + let mut responses = Vec::new(); + + for split_requests in all_split_requests { + for (worker_index, split_request) in split_requests { + assert!(worker_index.0 < request_workers as usize); + + let torrent_stats = split_request + .info_hashes + .into_iter() + .map(|(i, info_hash)| { + ( + i, + TorrentScrapeStatistics { + seeders: NumberOfPeers((info_hash.0[0]) as i32), + leechers: NumberOfPeers(0), + completed: NumberOfDownloads(0), + }, + ) + }) + .collect(); + + let response = PendingScrapeResponse { + slab_key: split_request.slab_key, + torrent_stats, + }; + + if let Some(response) = map.add_and_get_finished(response) { + responses.push(response); + } + } + } + + assert!(map.0.is_empty()); + assert_eq!(responses.len(), requests.len()); + + TestResult::from_bool(true) + } +}