diff --git a/Cargo.lock b/Cargo.lock index c8f0e2c..1c49d63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -109,7 +109,7 @@ dependencies = [ "rand", "serde", "smartstring", - "socket2", + "socket2 0.4.2", ] [[package]] @@ -159,8 +159,12 @@ dependencies = [ "aquatic_cli_helpers", "aquatic_common", "aquatic_udp_protocol", + "cfg-if", "crossbeam-channel", + "futures-lite", + "glommio", "hashbrown 0.11.2", + "hex", "histogram", "indexmap", "log", @@ -172,7 +176,7 @@ dependencies = [ "quickcheck_macros", "rand", "serde", - "socket2", + "socket2 0.4.2", ] [[package]] @@ -208,7 +212,7 @@ dependencies = [ "rand", "rand_distr", "serde", - "socket2", + "socket2 0.4.2", ] [[package]] @@ -244,7 +248,7 @@ dependencies = [ "quickcheck_macros", "rand", "serde", - "socket2", + "socket2 0.4.2", "tungstenite", ] @@ -306,7 +310,7 @@ checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ "hermit-abi", "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -353,6 +357,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitmaps" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "303cec55cd9c5fde944b061b902f142b52a8bb5438cc822481ea1e3ebc96bbcb" + [[package]] name = "block-buffer" version = "0.9.0" @@ -374,6 +384,12 @@ dependencies = [ "serde", ] +[[package]] +name = "buddy-alloc" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ff9f338986406db85e2b5deb40a9255b796ca03a194c7457403d215173f3fd5" + [[package]] name = "bumpalo" version = "3.7.1" @@ -392,6 +408,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +[[package]] +name = "cache-padded" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba" + [[package]] name = "cast" version = "0.2.7" @@ -423,7 +445,7 @@ dependencies = [ "num-integer", "num-traits", "time", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -437,6 +459,15 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "concurrent-queue" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" +dependencies = [ + "cache-padded", +] + [[package]] name = "console" version = "0.15.0" @@ -447,7 +478,7 @@ dependencies = [ "libc", "once_cell", "terminal_size", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -511,6 +542,20 @@ dependencies = [ "itertools", ] +[[package]] +name = "crossbeam" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ae5588f6b3c3cb05239e90bd110f257254aecd01e4635400391aeae07497845" +dependencies = [ + "cfg-if", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.1" @@ -541,10 +586,20 @@ dependencies = [ "cfg-if", "crossbeam-utils", "lazy_static", - "memoffset", + "memoffset 0.6.4", "scopeguard", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b10ddc024425c88c2ad148c1b0fd53f4c6d38db9697c9f1588381212fa657c9" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.5" @@ -592,6 +647,12 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "enclose" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1056f553da426e9c025a662efa48b52e62e0a3a7648aa2d15aeaaf7f0d329357" + [[package]] name = "encode_unicode" version = "0.3.6" @@ -630,6 +691,15 @@ dependencies = [ "synstructure", ] +[[package]] +name = "fastrand" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b394ed3d285a429378d3b384b9eb1285267e7df4b166df24b7a6939a04dc392e" +dependencies = [ + "instant", +] + [[package]] name = "float-cmp" version = "0.8.0" @@ -670,6 +740,33 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures-core" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88d1c26957f23603395cd326b0ffe64124b818f4449552f960d815cfba83a53d" + +[[package]] +name = "futures-io" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "522de2a0fe3e380f1bc577ba0474108faf3f6b18321dbf60b3b9c39a75073377" + +[[package]] +name = "futures-lite" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "generic-array" version = "0.14.4" @@ -697,6 +794,37 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0a01e0497841a3b2db4f8afa483cce65f7e96a3498bd6c541734792aeac8fe7" +[[package]] +name = "glommio" +version = "0.6.0" +source = "git+https://github.com/DataDog/glommio.git?rev=4e6b14772da2f4325271fbcf12d24cf91ed466e5#4e6b14772da2f4325271fbcf12d24cf91ed466e5" +dependencies = [ + "ahash 0.7.6", + "bitflags", + "bitmaps", + "buddy-alloc", + "cc", + "concurrent-queue", + "crossbeam", + "enclose", + "futures-lite", + "intrusive-collections", + "lazy_static", + "libc", + "lockfree", + "log", + "membarrier", + "nix", + "pin-project-lite", + "rlimit", + "scoped-tls", + "scopeguard", + "smallvec", + "socket2 0.3.19", + "tracing", + "typenum", +] + [[package]] name = "half" version = "1.8.0" @@ -813,6 +941,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "intrusive-collections" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb4ed164b4cf1c6bd6e18c097490331a0e58fbb0f39e8f6b5ac7f168006511cd" +dependencies = [ + "memoffset 0.5.6", +] + [[package]] name = "itertools" version = "0.10.1" @@ -837,6 +974,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kernel32-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +dependencies = [ + "winapi 0.2.8", + "winapi-build", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -873,6 +1020,15 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "lockfree" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74ee94b5ad113c7cb98c5a040f783d0952ee4fe100993881d1673c2cb002dd23" +dependencies = [ + "owned-alloc", +] + [[package]] name = "log" version = "0.4.14" @@ -888,12 +1044,33 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" +[[package]] +name = "membarrier" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "925b0811d7e5fb2b666b5906c5047b7ec23aab78edc4d51b7b0f82dc5c955b1c" +dependencies = [ + "cfg-if", + "kernel32-sys", + "lazy_static", + "libc", +] + [[package]] name = "memchr" version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +[[package]] +name = "memoffset" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "043175f069eda7b85febe4a74abbaeff828d9f8b448515d3151a14a3542811aa" +dependencies = [ + "autocfg", +] + [[package]] name = "memoffset" version = "0.6.4" @@ -932,7 +1109,7 @@ dependencies = [ "log", "miow", "ntapi", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -941,7 +1118,7 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -972,7 +1149,7 @@ dependencies = [ "cc", "cfg-if", "libc", - "memoffset", + "memoffset 0.6.4", ] [[package]] @@ -987,7 +1164,7 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1096,6 +1273,18 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "owned-alloc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30fceb411f9a12ff9222c5f824026be368ff15dc2f13468d850c7d3f502205d6" + +[[package]] +name = "parking" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" + [[package]] name = "parking_lot" version = "0.11.2" @@ -1118,7 +1307,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1127,6 +1316,12 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "pin-project-lite" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d31d11c69a6b52a174b42bdc0c30e5e11670f90788b2c471c31c1d17d449443" + [[package]] name = "pkg-config" version = "0.3.20" @@ -1330,7 +1525,16 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" dependencies = [ - "winapi", + "winapi 0.3.9", +] + +[[package]] +name = "rlimit" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc0bf25554376fd362f54332b8410a625c71f15445bca32ffdfdf4ec9ac91726" +dependencies = [ + "libc", ] [[package]] @@ -1370,9 +1574,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" dependencies = [ "lazy_static", - "winapi", + "winapi 0.3.9", ] +[[package]] +name = "scoped-tls" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2" + [[package]] name = "scopeguard" version = "1.1.0" @@ -1532,6 +1742,17 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "socket2" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e" +dependencies = [ + "cfg-if", + "libc", + "winapi 0.3.9", +] + [[package]] name = "socket2" version = "0.4.2" @@ -1539,7 +1760,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5dc90fe6c7be1a323296982db1836d1ea9e47b6839496dde9a541bc496df3516" dependencies = [ "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1582,7 +1803,7 @@ dependencies = [ "rand", "redox_syscall", "remove_dir_all", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1601,7 +1822,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "633c1a546cee861a1a6d0dc69ebeca693bf4296661ba7852b9d21d159e0506df" dependencies = [ "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1640,7 +1861,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" dependencies = [ "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1677,6 +1898,38 @@ dependencies = [ "serde", ] +[[package]] +name = "tracing" +version = "0.1.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4f480b8f81512e825f337ad51e94c1eb5d3bbdf2b363dcd01e2b19a9ffe3f8e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4" +dependencies = [ + "lazy_static", +] + [[package]] name = "tungstenite" version = "0.15.0" @@ -1777,6 +2030,12 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "walkdir" version = "2.3.2" @@ -1784,7 +2043,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" dependencies = [ "same-file", - "winapi", + "winapi 0.3.9", "winapi-util", ] @@ -1858,6 +2117,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "winapi" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" + [[package]] name = "winapi" version = "0.3.9" @@ -1868,6 +2133,12 @@ dependencies = [ "winapi-x86_64-pc-windows-gnu", ] +[[package]] +name = "winapi-build" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" + [[package]] name = "winapi-i686-pc-windows-gnu" version = "0.4.0" @@ -1880,7 +2151,7 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] diff --git a/TODO.md b/TODO.md index ea65cc2..b290b67 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,11 @@ # TODO +* aquatic_udp glommio + * update access lists + * clean connections + * update peer valid until + * privdrop + * access lists: * use arc-swap Cache * add CI tests diff --git a/aquatic_common/src/access_list.rs b/aquatic_common/src/access_list.rs index 93b8373..28e24e3 100644 --- a/aquatic_common/src/access_list.rs +++ b/aquatic_common/src/access_list.rs @@ -18,6 +18,12 @@ pub enum AccessListMode { Off, } +impl AccessListMode { + pub fn is_on(&self) -> bool { + !matches!(self, Self::Off) + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AccessListConfig { pub mode: AccessListMode, @@ -36,54 +42,71 @@ impl Default for AccessListConfig { } } -pub struct AccessList(ArcSwap>); +#[derive(Default)] +pub struct AccessList(HashSet<[u8; 20]>); -impl Default for AccessList { - fn default() -> Self { - Self(ArcSwap::from(Arc::new(HashSet::default()))) +impl AccessList { + pub fn insert_from_line(&mut self, line: &str) -> anyhow::Result<()> { + self.0.insert(parse_info_hash(line)?); + + Ok(()) + } + pub fn allows(&self, mode: AccessListMode, info_hash: &[u8; 20]) -> bool { + match mode { + AccessListMode::White => self.0.contains(info_hash), + AccessListMode::Black => !self.0.contains(info_hash), + AccessListMode::Off => true, + } } } -impl AccessList { - fn parse_info_hash(line: String) -> anyhow::Result<[u8; 20]> { - let mut bytes = [0u8; 20]; +pub trait AccessListQuery { + fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()>; + fn allows(&self, list_mode: AccessListMode, info_hash_bytes: &[u8; 20]) -> bool; +} - hex::decode_to_slice(line, &mut bytes)?; +pub type AccessListArcSwap = ArcSwap; - Ok(bytes) - } - - pub fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()> { +impl AccessListQuery for AccessListArcSwap { + fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()> { let file = File::open(path)?; let reader = BufReader::new(file); let mut new_list = HashSet::new(); for line in reader.lines() { - new_list.insert(Self::parse_info_hash(line?)?); + new_list.insert(parse_info_hash(&line?)?); } - self.0.store(Arc::new(new_list)); + self.store(Arc::new(AccessList(new_list))); Ok(()) } - pub fn allows(&self, list_mode: AccessListMode, info_hash_bytes: &[u8; 20]) -> bool { - match list_mode { - AccessListMode::White => self.0.load().contains(info_hash_bytes), - AccessListMode::Black => !self.0.load().contains(info_hash_bytes), + fn allows(&self, mode: AccessListMode, info_hash_bytes: &[u8; 20]) -> bool { + match mode { + AccessListMode::White => self.load().0.contains(info_hash_bytes), + AccessListMode::Black => !self.load().0.contains(info_hash_bytes), AccessListMode::Off => true, } } } +fn parse_info_hash(line: &str) -> anyhow::Result<[u8; 20]> { + let mut bytes = [0u8; 20]; + + hex::decode_to_slice(line, &mut bytes)?; + + Ok(bytes) +} + #[cfg(test)] mod tests { use super::*; #[test] fn test_parse_info_hash() { - let f = AccessList::parse_info_hash; + let f = parse_info_hash; assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeee".into()).is_ok()); assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeeef".into()).is_err()); diff --git a/aquatic_http/src/lib/common.rs b/aquatic_http/src/lib/common.rs index e36e390..e4a9832 100644 --- a/aquatic_http/src/lib/common.rs +++ b/aquatic_http/src/lib/common.rs @@ -2,7 +2,7 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::Arc; use std::time::Instant; -use aquatic_common::access_list::AccessList; +use aquatic_common::access_list::{AccessList, AccessListArcSwap}; use crossbeam_channel::{Receiver, Sender}; use either::Either; use hashbrown::HashMap; @@ -165,7 +165,7 @@ impl TorrentMaps { #[derive(Clone)] pub struct State { - pub access_list: Arc, + pub access_list: Arc, pub torrent_maps: Arc>, } diff --git a/aquatic_http/src/lib/lib.rs b/aquatic_http/src/lib/lib.rs index 2662b2b..3f96ef2 100644 --- a/aquatic_http/src/lib/lib.rs +++ b/aquatic_http/src/lib/lib.rs @@ -31,7 +31,10 @@ pub fn run(config: Config) -> anyhow::Result<()> { tasks::update_access_list(&config, &state); - state.torrent_maps.lock().clean(&config, &state.access_list); + state + .torrent_maps + .lock() + .clean(&config, &state.access_list.load_full()); } } diff --git a/aquatic_http/src/lib/network/mod.rs b/aquatic_http/src/lib/network/mod.rs index c14ed92..dd7d889 100644 --- a/aquatic_http/src/lib/network/mod.rs +++ b/aquatic_http/src/lib/network/mod.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use std::vec::Drain; +use aquatic_common::access_list::AccessListQuery; use aquatic_http_protocol::request::Request; use hashbrown::HashMap; use log::{debug, error, info}; diff --git a/aquatic_http/src/lib/tasks.rs b/aquatic_http/src/lib/tasks.rs index 3273ebd..341a434 100644 --- a/aquatic_http/src/lib/tasks.rs +++ b/aquatic_http/src/lib/tasks.rs @@ -1,6 +1,6 @@ use histogram::Histogram; -use aquatic_common::access_list::AccessListMode; +use aquatic_common::access_list::{AccessListMode, AccessListQuery}; use crate::{common::*, config::Config}; diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 265b707..a79294c 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -14,13 +14,18 @@ path = "src/lib/lib.rs" [[bin]] name = "aquatic_udp" +[features] +with-glommio = ["glommio", "futures-lite"] + [dependencies] anyhow = "1" aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" aquatic_udp_protocol = "0.1.0" +cfg-if = "1" crossbeam-channel = "0.5" hashbrown = "0.11.2" +hex = "0.4" histogram = "0.6" indexmap = "1" log = "0.4" @@ -32,6 +37,9 @@ rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } socket2 = { version = "0.4.1", features = ["all"] } +glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5", optional = true } +futures-lite = { version = "1", optional = true } + [dev-dependencies] quickcheck = "1.0" quickcheck_macros = "1.0" diff --git a/aquatic_udp/src/lib/handlers/announce.rs b/aquatic_udp/src/lib/common/announce.rs similarity index 79% rename from aquatic_udp/src/lib/handlers/announce.rs rename to aquatic_udp/src/lib/common/announce.rs index 913a0d6..2a63b61 100644 --- a/aquatic_udp/src/lib/handlers/announce.rs +++ b/aquatic_udp/src/lib/common/announce.rs @@ -1,52 +1,10 @@ -use std::net::{IpAddr, SocketAddr}; -use std::vec::Drain; - -use parking_lot::MutexGuard; use rand::rngs::SmallRng; -use aquatic_common::{convert_ipv4_mapped_ipv6, extract_response_peers}; -use aquatic_udp_protocol::*; +use aquatic_common::extract_response_peers; use crate::common::*; -use crate::config::Config; -#[inline] -pub fn handle_announce_requests( - config: &Config, - torrents: &mut MutexGuard, - rng: &mut SmallRng, - requests: Drain<(AnnounceRequest, SocketAddr)>, - responses: &mut Vec<(ConnectedResponse, SocketAddr)>, -) { - let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); - - responses.extend(requests.map(|(request, src)| { - let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); - - let response = match peer_ip { - IpAddr::V4(ip) => handle_announce_request( - config, - rng, - &mut torrents.ipv4, - request, - ip, - peer_valid_until, - ), - IpAddr::V6(ip) => handle_announce_request( - config, - rng, - &mut torrents.ipv6, - request, - ip, - peer_valid_until, - ), - }; - - (ConnectedResponse::Announce(response), src) - })); -} - -fn handle_announce_request( +pub fn handle_announce_request( config: &Config, rng: &mut SmallRng, torrents: &mut TorrentMap, diff --git a/aquatic_udp/src/lib/common.rs b/aquatic_udp/src/lib/common/mod.rs similarity index 76% rename from aquatic_udp/src/lib/common.rs rename to aquatic_udp/src/lib/common/mod.rs index d733c52..8c84c3c 100644 --- a/aquatic_udp/src/lib/common.rs +++ b/aquatic_udp/src/lib/common/mod.rs @@ -1,17 +1,19 @@ +use std::borrow::Borrow; use std::hash::Hash; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; -use std::sync::{atomic::AtomicUsize, Arc}; use std::time::Instant; use hashbrown::HashMap; use indexmap::IndexMap; -use parking_lot::Mutex; pub use aquatic_common::{access_list::AccessList, ValidUntil}; pub use aquatic_udp_protocol::*; use crate::config::Config; +pub mod announce; +pub mod network; + pub const MAX_PACKET_SIZE: usize = 4096; pub trait Ip: Hash + PartialEq + Eq + Clone + Copy { @@ -30,25 +32,6 @@ impl Ip for Ipv6Addr { } } -pub enum ConnectedRequest { - Announce(AnnounceRequest), - Scrape(ScrapeRequest), -} - -pub enum ConnectedResponse { - Announce(AnnounceResponse), - Scrape(ScrapeResponse), -} - -impl Into for ConnectedResponse { - fn into(self) -> Response { - match self { - Self::Announce(response) => Response::Announce(response), - Self::Scrape(response) => Response::Scrape(response), - } - } -} - #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] pub enum PeerStatus { Seeding, @@ -124,19 +107,19 @@ pub struct TorrentMaps { impl TorrentMaps { /// Remove disallowed and inactive torrents - pub fn clean(&mut self, config: &Config, access_list: &Arc) { + pub fn clean>(&mut self, config: &Config, access_list: T) { let now = Instant::now(); let access_list_mode = config.access_list.mode; self.ipv4.retain(|info_hash, torrent| { - access_list.allows(access_list_mode, &info_hash.0) + access_list.borrow().allows(access_list_mode, &info_hash.0) && Self::clean_torrent_and_peers(now, torrent) }); self.ipv4.shrink_to_fit(); self.ipv6.retain(|info_hash, torrent| { - access_list.allows(access_list_mode, &info_hash.0) + access_list.borrow().allows(access_list_mode, &info_hash.0) && Self::clean_torrent_and_peers(now, torrent) }); self.ipv6.shrink_to_fit(); @@ -170,31 +153,6 @@ impl TorrentMaps { } } -#[derive(Default)] -pub struct Statistics { - pub requests_received: AtomicUsize, - pub responses_sent: AtomicUsize, - pub bytes_received: AtomicUsize, - pub bytes_sent: AtomicUsize, -} - -#[derive(Clone)] -pub struct State { - pub access_list: Arc, - pub torrents: Arc>, - pub statistics: Arc, -} - -impl Default for State { - fn default() -> Self { - Self { - access_list: Arc::new(AccessList::default()), - torrents: Arc::new(Mutex::new(TorrentMaps::default())), - statistics: Arc::new(Statistics::default()), - } - } -} - #[cfg(test)] mod tests { #[test] diff --git a/aquatic_udp/src/lib/common/network.rs b/aquatic_udp/src/lib/common/network.rs new file mode 100644 index 0000000..833c99f --- /dev/null +++ b/aquatic_udp/src/lib/common/network.rs @@ -0,0 +1,30 @@ +use std::{net::SocketAddr, time::Instant}; + +pub use aquatic_common::{access_list::AccessList, ValidUntil}; +pub use aquatic_udp_protocol::*; +use hashbrown::HashMap; + +#[derive(Default)] +pub struct ConnectionMap(HashMap<(ConnectionId, SocketAddr), ValidUntil>); + +impl ConnectionMap { + pub fn insert( + &mut self, + connection_id: ConnectionId, + socket_addr: SocketAddr, + valid_until: ValidUntil, + ) { + self.0.insert((connection_id, socket_addr), valid_until); + } + + pub fn contains(&mut self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool { + self.0.contains_key(&(connection_id, socket_addr)) + } + + pub fn clean(&mut self) { + let now = Instant::now(); + + self.0.retain(|_, v| v.0 > now); + self.0.shrink_to_fit(); + } +} diff --git a/aquatic_udp/src/lib/glommio/common.rs b/aquatic_udp/src/lib/glommio/common.rs new file mode 100644 index 0000000..5dd6eeb --- /dev/null +++ b/aquatic_udp/src/lib/glommio/common.rs @@ -0,0 +1,32 @@ +use std::cell::RefCell; +use std::rc::Rc; + +use futures_lite::AsyncBufReadExt; +use glommio::io::{BufferedFile, StreamReaderBuilder}; +use glommio::prelude::*; + +use crate::common::*; +use crate::config::Config; + +pub async fn update_access_list(config: Config, access_list: Rc>) { + if config.access_list.mode.is_on() { + let access_list_file = BufferedFile::open(config.access_list.path).await.unwrap(); + + let mut reader = StreamReaderBuilder::new(access_list_file).build(); + + loop { + let mut buf = String::with_capacity(42); + + match reader.read_line(&mut buf).await { + Ok(_) => { + access_list.borrow_mut().insert_from_line(&buf); + } + Err(err) => { + break; + } + } + + yield_if_needed().await; + } + } +} diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs new file mode 100644 index 0000000..9e18322 --- /dev/null +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -0,0 +1,102 @@ +use std::cell::RefCell; +use std::net::{IpAddr, SocketAddr}; +use std::rc::Rc; +use std::time::Duration; + +use futures_lite::{Stream, StreamExt}; +use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; +use glommio::timer::TimerActionRepeat; +use glommio::{enclose, prelude::*}; +use rand::prelude::SmallRng; +use rand::SeedableRng; + +use crate::common::announce::handle_announce_request; +use crate::common::*; +use crate::config::Config; +use crate::glommio::common::update_access_list; + +pub async fn run_request_worker( + config: Config, + request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>, + response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>, +) { + let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); + let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap(); + + let response_senders = Rc::new(response_senders); + + let torrents = Rc::new(RefCell::new(TorrentMaps::default())); + let access_list = Rc::new(RefCell::new(AccessList::default())); + + // Periodically clean torrents and update access list + TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || { + enclose!((config, torrents, access_list) move || async move { + update_access_list(config.clone(), access_list.clone()).await; + + torrents.borrow_mut().clean(&config, &*access_list.borrow()); + + Some(Duration::from_secs(config.cleaning.interval)) + })() + })); + + let mut handles = Vec::new(); + + for (_, receiver) in request_receivers.streams() { + let handle = spawn_local(handle_request_stream( + config.clone(), + torrents.clone(), + response_senders.clone(), + receiver, + )) + .detach(); + + handles.push(handle); + } + + for handle in handles { + handle.await; + } +} + +async fn handle_request_stream( + config: Config, + torrents: Rc>, + response_senders: Rc>, + mut stream: S, +) where + S: Stream + ::std::marker::Unpin, +{ + let mut rng = SmallRng::from_entropy(); + + // Needs to be updated periodically: use timer? + let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); + + while let Some((producer_index, request, addr)) = stream.next().await { + let response = match addr.ip() { + IpAddr::V4(ip) => handle_announce_request( + &config, + &mut rng, + &mut torrents.borrow_mut().ipv4, + request, + ip, + peer_valid_until, + ), + IpAddr::V6(ip) => handle_announce_request( + &config, + &mut rng, + &mut torrents.borrow_mut().ipv6, + request, + ip, + peer_valid_until, + ), + }; + + ::log::debug!("preparing to send response to channel: {:?}", response); + + if let Err(err) = response_senders.try_send_to(producer_index, (response, addr)) { + ::log::warn!("response_sender.try_send: {:?}", err); + } + + yield_if_needed().await; + } +} diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs new file mode 100644 index 0000000..0da8ab9 --- /dev/null +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -0,0 +1,68 @@ +//! Work-in-progress glommio (io_uring) implementation +//! +//! * Doesn't support scrape requests +//! * Currently not faster than mio implementation + +use std::sync::{atomic::AtomicUsize, Arc}; + +use glommio::channels::channel_mesh::MeshBuilder; +use glommio::prelude::*; + +use crate::config::Config; + +mod common; +pub mod handlers; +pub mod network; + +pub const SHARED_CHANNEL_SIZE: usize = 4096; + +pub fn run(config: Config) -> anyhow::Result<()> { + let num_peers = config.socket_workers + config.request_workers; + + let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); + let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); + + let num_bound_sockets = Arc::new(AtomicUsize::new(0)); + + let mut executors = Vec::new(); + + for _ in 0..(config.socket_workers) { + let config = config.clone(); + let request_mesh_builder = request_mesh_builder.clone(); + let response_mesh_builder = response_mesh_builder.clone(); + let num_bound_sockets = num_bound_sockets.clone(); + + let executor = LocalExecutorBuilder::default().spawn(|| async move { + network::run_socket_worker( + config, + request_mesh_builder, + response_mesh_builder, + num_bound_sockets, + ) + .await + }); + + executors.push(executor); + } + + for _ in 0..(config.request_workers) { + let config = config.clone(); + let request_mesh_builder = request_mesh_builder.clone(); + let response_mesh_builder = response_mesh_builder.clone(); + + let executor = LocalExecutorBuilder::default().spawn(|| async move { + handlers::run_request_worker(config, request_mesh_builder, response_mesh_builder).await + }); + + executors.push(executor); + } + + for executor in executors { + executor + .expect("failed to spawn local executor") + .join() + .unwrap(); + } + + Ok(()) +} diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs new file mode 100644 index 0000000..93d5f04 --- /dev/null +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -0,0 +1,207 @@ +use std::io::Cursor; +use std::net::{IpAddr, SocketAddr}; +use std::rc::Rc; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + +use futures_lite::{Stream, StreamExt}; +use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; +use glommio::channels::local_channel::{new_unbounded, LocalSender}; +use glommio::net::UdpSocket; +use glommio::prelude::*; +use rand::prelude::{Rng, SeedableRng, StdRng}; + +use aquatic_udp_protocol::{IpVersion, Request, Response}; + +use crate::common::network::ConnectionMap; +use crate::common::*; +use crate::config::Config; + +pub async fn run_socket_worker( + config: Config, + request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>, + response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>, + num_bound_sockets: Arc, +) { + let (local_sender, local_receiver) = new_unbounded(); + + let mut socket = UdpSocket::bind(config.network.address).unwrap(); + + let recv_buffer_size = config.network.socket_recv_buffer_size; + + if recv_buffer_size != 0 { + socket.set_buffer_size(recv_buffer_size); + } + + let socket = Rc::new(socket); + + num_bound_sockets.fetch_add(1, Ordering::SeqCst); + + let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); + + let (_, mut response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap(); + + let response_consumer_index = response_receivers.consumer_id().unwrap(); + + spawn_local(read_requests( + config.clone(), + request_senders, + response_consumer_index, + local_sender, + socket.clone(), + )) + .detach(); + + for (_, receiver) in response_receivers.streams().into_iter() { + spawn_local(send_responses( + socket.clone(), + receiver.map(|(response, addr)| (response.into(), addr)), + )) + .detach(); + } + + send_responses(socket, local_receiver.stream()).await; +} + +async fn read_requests( + config: Config, + request_senders: Senders<(usize, AnnounceRequest, SocketAddr)>, + response_consumer_index: usize, + local_sender: LocalSender<(Response, SocketAddr)>, + socket: Rc, +) { + let mut rng = StdRng::from_entropy(); + + let access_list_mode = config.access_list.mode; + + // Needs to be updated periodically: use timer? + let valid_until = ValidUntil::new(config.cleaning.max_connection_age); + // Needs to be updated periodically: use timer? + let access_list = AccessList::default(); + // Needs to be cleaned periodically: use timer? + let mut connections = ConnectionMap::default(); + + let mut buf = [0u8; 2048]; + + loop { + match socket.recv_from(&mut buf).await { + Ok((amt, src)) => { + let request = Request::from_bytes(&buf[..amt], config.protocol.max_scrape_torrents); + + ::log::debug!("read request: {:?}", request); + + match request { + Ok(Request::Connect(request)) => { + let connection_id = ConnectionId(rng.gen()); + + connections.insert(connection_id, src, valid_until); + + let response = Response::Connect(ConnectResponse { + connection_id, + transaction_id: request.transaction_id, + }); + + local_sender.try_send((response, src)).unwrap(); + } + Ok(Request::Announce(request)) => { + if connections.contains(request.connection_id, src) { + if access_list.allows(access_list_mode, &request.info_hash.0) { + let request_consumer_index = + (request.info_hash.0[0] as usize) % config.request_workers; + + if let Err(err) = request_senders.try_send_to( + request_consumer_index, + (response_consumer_index, request, src), + ) { + ::log::warn!("request_sender.try_send failed: {:?}", err) + } + } else { + let response = Response::Error(ErrorResponse { + transaction_id: request.transaction_id, + message: "Info hash not allowed".into(), + }); + + local_sender.try_send((response, src)).unwrap(); + } + } + } + Ok(Request::Scrape(request)) => { + if connections.contains(request.connection_id, src) { + let response = Response::Error(ErrorResponse { + transaction_id: request.transaction_id, + message: "Scrape requests not supported".into(), + }); + + local_sender.try_send((response, src)).unwrap(); + } + } + Err(err) => { + ::log::debug!("Request::from_bytes error: {:?}", err); + + if let RequestParseError::Sendable { + connection_id, + transaction_id, + err, + } = err + { + if connections.contains(connection_id, src) { + let response = ErrorResponse { + transaction_id, + message: err.right_or("Parse error").into(), + }; + + local_sender.try_send((response.into(), src)).unwrap(); + } + } + } + } + } + Err(err) => { + ::log::error!("recv_from: {:?}", err); + } + } + + yield_if_needed().await; + } +} + +async fn send_responses(socket: Rc, mut stream: S) +where + S: Stream + ::std::marker::Unpin, +{ + let mut buf = [0u8; MAX_PACKET_SIZE]; + let mut buf = Cursor::new(&mut buf[..]); + + while let Some((response, src)) = stream.next().await { + buf.set_position(0); + + ::log::debug!("preparing to send response: {:?}", response.clone()); + + response + .write(&mut buf, ip_version_from_ip(src.ip())) + .expect("write response"); + + let position = buf.position() as usize; + + if let Err(err) = socket.send_to(&buf.get_ref()[..position], src).await { + ::log::info!("send_to failed: {:?}", err); + } + + yield_if_needed().await; + } +} + +fn ip_version_from_ip(ip: IpAddr) -> IpVersion { + match ip { + IpAddr::V4(_) => IpVersion::IPv4, + IpAddr::V6(ip) => { + if let [0, 0, 0, 0, 0, 0xffff, ..] = ip.segments() { + IpVersion::IPv4 + } else { + IpVersion::IPv6 + } + } + } +} diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index a740a3e..187d563 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -1,121 +1,21 @@ -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; -use std::thread::Builder; -use std::time::Duration; - -use anyhow::Context; -use crossbeam_channel::unbounded; -use privdrop::PrivDrop; +use cfg_if::cfg_if; pub mod common; pub mod config; -pub mod handlers; -pub mod network; -pub mod tasks; +#[cfg(all(feature = "with-glommio", target_os = "linux"))] +pub mod glommio; +pub mod mio; -use common::State; use config::Config; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub fn run(config: Config) -> ::anyhow::Result<()> { - let state = State::default(); - - tasks::update_access_list(&config, &state.access_list); - - let num_bound_sockets = start_workers(config.clone(), state.clone())?; - - if config.privileges.drop_privileges { - let mut counter = 0usize; - - loop { - let sockets = num_bound_sockets.load(Ordering::SeqCst); - - if sockets == config.socket_workers { - PrivDrop::default() - .chroot(config.privileges.chroot_path.clone()) - .user(config.privileges.user.clone()) - .apply()?; - - break; - } - - ::std::thread::sleep(Duration::from_millis(10)); - - counter += 1; - - if counter == 500 { - panic!("Sockets didn't bind in time for privilege drop."); - } + cfg_if! { + if #[cfg(all(feature = "with-glommio", target_os = "linux"))] { + glommio::run(config) + } else { + mio::run(config) } } - - loop { - ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); - - tasks::update_access_list(&config, &state.access_list); - - state.torrents.lock().clean(&config, &state.access_list); - } -} - -pub fn start_workers(config: Config, state: State) -> ::anyhow::Result> { - let (request_sender, request_receiver) = unbounded(); - let (response_sender, response_receiver) = unbounded(); - - for i in 0..config.request_workers { - let state = state.clone(); - let config = config.clone(); - let request_receiver = request_receiver.clone(); - let response_sender = response_sender.clone(); - - Builder::new() - .name(format!("request-{:02}", i + 1)) - .spawn(move || { - handlers::run_request_worker(state, config, request_receiver, response_sender) - }) - .with_context(|| "spawn request worker")?; - } - - let num_bound_sockets = Arc::new(AtomicUsize::new(0)); - - for i in 0..config.socket_workers { - let state = state.clone(); - let config = config.clone(); - let request_sender = request_sender.clone(); - let response_receiver = response_receiver.clone(); - let num_bound_sockets = num_bound_sockets.clone(); - - Builder::new() - .name(format!("socket-{:02}", i + 1)) - .spawn(move || { - network::run_socket_worker( - state, - config, - i, - request_sender, - response_receiver, - num_bound_sockets, - ) - }) - .with_context(|| "spawn socket worker")?; - } - - if config.statistics.interval != 0 { - let state = state.clone(); - let config = config.clone(); - - Builder::new() - .name("statistics-collector".to_string()) - .spawn(move || loop { - ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); - - tasks::gather_and_print_statistics(&state, &config); - }) - .with_context(|| "spawn statistics worker")?; - } - - Ok(num_bound_sockets) } diff --git a/aquatic_udp/src/lib/mio/common.rs b/aquatic_udp/src/lib/mio/common.rs new file mode 100644 index 0000000..8bf2233 --- /dev/null +++ b/aquatic_udp/src/lib/mio/common.rs @@ -0,0 +1,49 @@ +use aquatic_common::access_list::AccessListArcSwap; +use parking_lot::Mutex; +use std::sync::{atomic::AtomicUsize, Arc}; + +use crate::common::*; + +pub enum ConnectedRequest { + Announce(AnnounceRequest), + Scrape(ScrapeRequest), +} + +pub enum ConnectedResponse { + Announce(AnnounceResponse), + Scrape(ScrapeResponse), +} + +impl Into for ConnectedResponse { + fn into(self) -> Response { + match self { + Self::Announce(response) => Response::Announce(response), + Self::Scrape(response) => Response::Scrape(response), + } + } +} + +#[derive(Default)] +pub struct Statistics { + pub requests_received: AtomicUsize, + pub responses_sent: AtomicUsize, + pub bytes_received: AtomicUsize, + pub bytes_sent: AtomicUsize, +} + +#[derive(Clone)] +pub struct State { + pub access_list: Arc, + pub torrents: Arc>, + pub statistics: Arc, +} + +impl Default for State { + fn default() -> Self { + Self { + access_list: Arc::new(AccessListArcSwap::default()), + torrents: Arc::new(Mutex::new(TorrentMaps::default())), + statistics: Arc::new(Statistics::default()), + } + } +} diff --git a/aquatic_udp/src/lib/mio/handlers/announce.rs b/aquatic_udp/src/lib/mio/handlers/announce.rs new file mode 100644 index 0000000..549d061 --- /dev/null +++ b/aquatic_udp/src/lib/mio/handlers/announce.rs @@ -0,0 +1,49 @@ +use std::net::{IpAddr, SocketAddr}; +use std::vec::Drain; + +use parking_lot::MutexGuard; +use rand::rngs::SmallRng; + +use aquatic_common::convert_ipv4_mapped_ipv6; +use aquatic_udp_protocol::*; + +use crate::common::announce::handle_announce_request; +use crate::common::*; +use crate::config::Config; +use crate::mio::common::*; + +#[inline] +pub fn handle_announce_requests( + config: &Config, + torrents: &mut MutexGuard, + rng: &mut SmallRng, + requests: Drain<(AnnounceRequest, SocketAddr)>, + responses: &mut Vec<(ConnectedResponse, SocketAddr)>, +) { + let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); + + responses.extend(requests.map(|(request, src)| { + let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); + + let response = match peer_ip { + IpAddr::V4(ip) => handle_announce_request( + config, + rng, + &mut torrents.ipv4, + request, + ip, + peer_valid_until, + ), + IpAddr::V6(ip) => handle_announce_request( + config, + rng, + &mut torrents.ipv6, + request, + ip, + peer_valid_until, + ), + }; + + (ConnectedResponse::Announce(response), src) + })); +} diff --git a/aquatic_udp/src/lib/handlers/mod.rs b/aquatic_udp/src/lib/mio/handlers/mod.rs similarity index 99% rename from aquatic_udp/src/lib/handlers/mod.rs rename to aquatic_udp/src/lib/mio/handlers/mod.rs index 0634702..af8e5a8 100644 --- a/aquatic_udp/src/lib/handlers/mod.rs +++ b/aquatic_udp/src/lib/mio/handlers/mod.rs @@ -6,8 +6,8 @@ use rand::{rngs::SmallRng, SeedableRng}; use aquatic_udp_protocol::*; -use crate::common::*; use crate::config::Config; +use crate::mio::common::*; mod announce; mod scrape; diff --git a/aquatic_udp/src/lib/handlers/scrape.rs b/aquatic_udp/src/lib/mio/handlers/scrape.rs similarity index 98% rename from aquatic_udp/src/lib/handlers/scrape.rs rename to aquatic_udp/src/lib/mio/handlers/scrape.rs index b544ccf..b1a6742 100644 --- a/aquatic_udp/src/lib/handlers/scrape.rs +++ b/aquatic_udp/src/lib/mio/handlers/scrape.rs @@ -6,6 +6,8 @@ use parking_lot::MutexGuard; use aquatic_common::convert_ipv4_mapped_ipv6; use aquatic_udp_protocol::*; +use crate::mio::common::*; + use crate::common::*; #[inline] diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs new file mode 100644 index 0000000..bf863ee --- /dev/null +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -0,0 +1,142 @@ +use std::thread::Builder; +use std::time::Duration; +use std::{ + ops::Deref, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; + +use anyhow::Context; +use crossbeam_channel::unbounded; +use privdrop::PrivDrop; + +pub mod common; +pub mod handlers; +pub mod network; +pub mod tasks; + +use aquatic_common::access_list::{AccessListArcSwap, AccessListMode, AccessListQuery}; + +use crate::config::Config; + +use common::State; + +pub fn run(config: Config) -> ::anyhow::Result<()> { + let state = State::default(); + + update_access_list(&config, &state.access_list); + + let num_bound_sockets = Arc::new(AtomicUsize::new(0)); + + start_workers(config.clone(), state.clone(), num_bound_sockets.clone())?; + + if config.privileges.drop_privileges { + let mut counter = 0usize; + + loop { + let sockets = num_bound_sockets.load(Ordering::SeqCst); + + if sockets == config.socket_workers { + PrivDrop::default() + .chroot(config.privileges.chroot_path.clone()) + .user(config.privileges.user.clone()) + .apply()?; + + break; + } + + ::std::thread::sleep(Duration::from_millis(10)); + + counter += 1; + + if counter == 500 { + panic!("Sockets didn't bind in time for privilege drop."); + } + } + } + + loop { + ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); + + update_access_list(&config, &state.access_list); + + state + .torrents + .lock() + .clean(&config, state.access_list.load_full().deref()); + } +} + +pub fn start_workers( + config: Config, + state: State, + num_bound_sockets: Arc, +) -> ::anyhow::Result<()> { + let (request_sender, request_receiver) = unbounded(); + let (response_sender, response_receiver) = unbounded(); + + for i in 0..config.request_workers { + let state = state.clone(); + let config = config.clone(); + let request_receiver = request_receiver.clone(); + let response_sender = response_sender.clone(); + + Builder::new() + .name(format!("request-{:02}", i + 1)) + .spawn(move || { + handlers::run_request_worker(state, config, request_receiver, response_sender) + }) + .with_context(|| "spawn request worker")?; + } + + for i in 0..config.socket_workers { + let state = state.clone(); + let config = config.clone(); + let request_sender = request_sender.clone(); + let response_receiver = response_receiver.clone(); + let num_bound_sockets = num_bound_sockets.clone(); + + Builder::new() + .name(format!("socket-{:02}", i + 1)) + .spawn(move || { + network::run_socket_worker( + state, + config, + i, + request_sender, + response_receiver, + num_bound_sockets, + ) + }) + .with_context(|| "spawn socket worker")?; + } + + if config.statistics.interval != 0 { + let state = state.clone(); + let config = config.clone(); + + Builder::new() + .name("statistics-collector".to_string()) + .spawn(move || loop { + ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); + + tasks::gather_and_print_statistics(&state, &config); + }) + .with_context(|| "spawn statistics worker")?; + } + + Ok(()) +} + +pub fn update_access_list(config: &Config, access_list: &Arc) { + match config.access_list.mode { + AccessListMode::White | AccessListMode::Black => { + if let Err(err) = access_list.update_from_path(&config.access_list.path) { + ::log::error!("Update access list from path: {:?}", err); + } + } + AccessListMode::Off => {} + } +} diff --git a/aquatic_udp/src/lib/network.rs b/aquatic_udp/src/lib/mio/network.rs similarity index 93% rename from aquatic_udp/src/lib/network.rs rename to aquatic_udp/src/lib/mio/network.rs index da6bd2a..fe34023 100644 --- a/aquatic_udp/src/lib/network.rs +++ b/aquatic_udp/src/lib/mio/network.rs @@ -4,11 +4,11 @@ use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; -use std::time::{Duration, Instant}; +use std::time::Duration; use std::vec::Drain; +use aquatic_common::access_list::AccessListQuery; use crossbeam_channel::{Receiver, Sender}; -use hashbrown::HashMap; use mio::net::UdpSocket; use mio::{Events, Interest, Poll, Token}; use rand::prelude::{Rng, SeedableRng, StdRng}; @@ -16,33 +16,11 @@ use socket2::{Domain, Protocol, Socket, Type}; use aquatic_udp_protocol::{IpVersion, Request, Response}; +use crate::common::network::ConnectionMap; use crate::common::*; use crate::config::Config; -#[derive(Default)] -struct ConnectionMap(HashMap<(ConnectionId, SocketAddr), ValidUntil>); - -impl ConnectionMap { - fn insert( - &mut self, - connection_id: ConnectionId, - socket_addr: SocketAddr, - valid_until: ValidUntil, - ) { - self.0.insert((connection_id, socket_addr), valid_until); - } - - fn contains(&mut self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool { - self.0.contains_key(&(connection_id, socket_addr)) - } - - fn clean(&mut self) { - let now = Instant::now(); - - self.0.retain(|_, v| v.0 > now); - self.0.shrink_to_fit(); - } -} +use super::common::*; pub fn run_socket_worker( state: State, diff --git a/aquatic_udp/src/lib/tasks.rs b/aquatic_udp/src/lib/mio/tasks.rs similarity index 84% rename from aquatic_udp/src/lib/tasks.rs rename to aquatic_udp/src/lib/mio/tasks.rs index 2665c45..4d9fe16 100644 --- a/aquatic_udp/src/lib/tasks.rs +++ b/aquatic_udp/src/lib/mio/tasks.rs @@ -1,24 +1,10 @@ use std::sync::atomic::Ordering; -use std::sync::Arc; use histogram::Histogram; -use aquatic_common::access_list::AccessListMode; - -use crate::common::*; +use super::common::*; use crate::config::Config; -pub fn update_access_list(config: &Config, access_list: &Arc) { - match config.access_list.mode { - AccessListMode::White | AccessListMode::Black => { - if let Err(err) = access_list.update_from_path(&config.access_list.path) { - ::log::error!("Update access list from path: {:?}", err); - } - } - AccessListMode::Off => {} - } -} - pub fn gather_and_print_statistics(state: &State, config: &Config) { let interval = config.statistics.interval; diff --git a/aquatic_udp_bench/src/announce.rs b/aquatic_udp_bench/src/announce.rs index 12b35e3..d71f77c 100644 --- a/aquatic_udp_bench/src/announce.rs +++ b/aquatic_udp_bench/src/announce.rs @@ -8,6 +8,7 @@ use rand_distr::Pareto; use aquatic_udp::common::*; use aquatic_udp::config::Config; +use aquatic_udp::mio::common::*; use crate::common::*; use crate::config::BenchConfig; diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index c89de8b..28c210e 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -15,7 +15,8 @@ use std::time::Duration; use aquatic_cli_helpers::run_app_with_cli_and_config; use aquatic_udp::common::*; use aquatic_udp::config::Config; -use aquatic_udp::handlers; +use aquatic_udp::mio::common::*; +use aquatic_udp::mio::handlers; use config::BenchConfig; diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index 7b62152..a7d5c18 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -8,6 +8,7 @@ use rand_distr::Pareto; use aquatic_udp::common::*; use aquatic_udp::config::Config; +use aquatic_udp::mio::common::*; use crate::common::*; use crate::config::BenchConfig; diff --git a/aquatic_ws/src/lib/common.rs b/aquatic_ws/src/lib/common.rs index 4f30d8b..3d6b1df 100644 --- a/aquatic_ws/src/lib/common.rs +++ b/aquatic_ws/src/lib/common.rs @@ -2,7 +2,7 @@ use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use std::time::Instant; -use aquatic_common::access_list::AccessList; +use aquatic_common::access_list::{AccessList, AccessListArcSwap}; use crossbeam_channel::{Receiver, Sender}; use hashbrown::HashMap; use indexmap::IndexMap; @@ -136,7 +136,7 @@ impl TorrentMaps { #[derive(Clone)] pub struct State { - pub access_list: Arc, + pub access_list: Arc, pub torrent_maps: Arc>, } diff --git a/aquatic_ws/src/lib/lib.rs b/aquatic_ws/src/lib/lib.rs index f6ee599..6141466 100644 --- a/aquatic_ws/src/lib/lib.rs +++ b/aquatic_ws/src/lib/lib.rs @@ -33,7 +33,10 @@ pub fn run(config: Config) -> anyhow::Result<()> { tasks::update_access_list(&config, &state); - state.torrent_maps.lock().clean(&config, &state.access_list); + state + .torrent_maps + .lock() + .clean(&config, &state.access_list.load_full()); } } diff --git a/aquatic_ws/src/lib/network/mod.rs b/aquatic_ws/src/lib/network/mod.rs index f12859d..bbb293d 100644 --- a/aquatic_ws/src/lib/network/mod.rs +++ b/aquatic_ws/src/lib/network/mod.rs @@ -2,6 +2,7 @@ use std::io::ErrorKind; use std::time::Duration; use std::vec::Drain; +use aquatic_common::access_list::AccessListQuery; use crossbeam_channel::Receiver; use hashbrown::HashMap; use log::{debug, error, info}; diff --git a/aquatic_ws/src/lib/tasks.rs b/aquatic_ws/src/lib/tasks.rs index 55cb62d..f96426c 100644 --- a/aquatic_ws/src/lib/tasks.rs +++ b/aquatic_ws/src/lib/tasks.rs @@ -1,4 +1,4 @@ -use aquatic_common::access_list::AccessListMode; +use aquatic_common::access_list::{AccessListMode, AccessListQuery}; use histogram::Histogram; use crate::common::*;