From f2b157a1490c9de6d07adb010832413f845b008b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Oct 2021 22:31:56 +0200 Subject: [PATCH 01/21] aquatic_udp: split some code into mio and glommio versions --- Cargo.lock | 311 ++++++++++++++++-- aquatic_udp/Cargo.toml | 3 + aquatic_udp/src/lib/common.rs | 27 +- aquatic_udp/src/lib/glommio/mod.rs | 1 + aquatic_udp/src/lib/glommio/network.rs | 200 +++++++++++ aquatic_udp/src/lib/lib.rs | 8 +- .../src/lib/{ => mio}/handlers/announce.rs | 0 aquatic_udp/src/lib/{ => mio}/handlers/mod.rs | 0 .../src/lib/{ => mio}/handlers/scrape.rs | 0 aquatic_udp/src/lib/mio/mod.rs | 2 + aquatic_udp/src/lib/{ => mio}/network.rs | 26 -- aquatic_udp_bench/src/main.rs | 2 +- 12 files changed, 527 insertions(+), 53 deletions(-) create mode 100644 aquatic_udp/src/lib/glommio/mod.rs create mode 100644 aquatic_udp/src/lib/glommio/network.rs rename aquatic_udp/src/lib/{ => mio}/handlers/announce.rs (100%) rename aquatic_udp/src/lib/{ => mio}/handlers/mod.rs (100%) rename aquatic_udp/src/lib/{ => mio}/handlers/scrape.rs (100%) create mode 100644 aquatic_udp/src/lib/mio/mod.rs rename aquatic_udp/src/lib/{ => mio}/network.rs (93%) diff --git a/Cargo.lock b/Cargo.lock index c8f0e2c..1814d40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -109,7 +109,7 @@ dependencies = [ "rand", "serde", "smartstring", - "socket2", + "socket2 0.4.2", ] [[package]] @@ -160,6 +160,8 @@ dependencies = [ "aquatic_common", "aquatic_udp_protocol", "crossbeam-channel", + "futures-lite", + "glommio", "hashbrown 0.11.2", "histogram", "indexmap", @@ -172,7 +174,7 @@ dependencies = [ "quickcheck_macros", "rand", "serde", - "socket2", + "socket2 0.4.2", ] [[package]] @@ -208,7 +210,7 @@ dependencies = [ "rand", "rand_distr", "serde", - "socket2", + "socket2 0.4.2", ] [[package]] @@ -244,7 +246,7 @@ dependencies = [ "quickcheck_macros", "rand", "serde", - "socket2", + "socket2 0.4.2", "tungstenite", ] @@ -306,7 +308,7 @@ checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ "hermit-abi", "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -353,6 +355,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitmaps" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "303cec55cd9c5fde944b061b902f142b52a8bb5438cc822481ea1e3ebc96bbcb" + [[package]] name = "block-buffer" version = "0.9.0" @@ -374,6 +382,12 @@ dependencies = [ "serde", ] +[[package]] +name = "buddy-alloc" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ff9f338986406db85e2b5deb40a9255b796ca03a194c7457403d215173f3fd5" + [[package]] name = "bumpalo" version = "3.7.1" @@ -392,6 +406,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +[[package]] +name = "cache-padded" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba" + [[package]] name = "cast" version = "0.2.7" @@ -423,7 +443,7 @@ dependencies = [ "num-integer", "num-traits", "time", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -437,6 +457,15 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "concurrent-queue" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" +dependencies = [ + "cache-padded", +] + [[package]] name = "console" version = "0.15.0" @@ -447,7 +476,7 @@ dependencies = [ "libc", "once_cell", "terminal_size", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -511,6 +540,20 @@ dependencies = [ "itertools", ] +[[package]] +name = "crossbeam" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ae5588f6b3c3cb05239e90bd110f257254aecd01e4635400391aeae07497845" +dependencies = [ + "cfg-if", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.1" @@ -541,10 +584,20 @@ dependencies = [ "cfg-if", "crossbeam-utils", "lazy_static", - "memoffset", + "memoffset 0.6.4", "scopeguard", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b10ddc024425c88c2ad148c1b0fd53f4c6d38db9697c9f1588381212fa657c9" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.5" @@ -592,6 +645,12 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "enclose" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1056f553da426e9c025a662efa48b52e62e0a3a7648aa2d15aeaaf7f0d329357" + [[package]] name = "encode_unicode" version = "0.3.6" @@ -630,6 +689,15 @@ dependencies = [ "synstructure", ] +[[package]] +name = "fastrand" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b394ed3d285a429378d3b384b9eb1285267e7df4b166df24b7a6939a04dc392e" +dependencies = [ + "instant", +] + [[package]] name = "float-cmp" version = "0.8.0" @@ -670,6 +738,33 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures-core" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88d1c26957f23603395cd326b0ffe64124b818f4449552f960d815cfba83a53d" + +[[package]] +name = "futures-io" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "522de2a0fe3e380f1bc577ba0474108faf3f6b18321dbf60b3b9c39a75073377" + +[[package]] +name = "futures-lite" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "generic-array" version = "0.14.4" @@ -697,6 +792,37 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0a01e0497841a3b2db4f8afa483cce65f7e96a3498bd6c541734792aeac8fe7" +[[package]] +name = "glommio" +version = "0.6.0" +source = "git+https://github.com/DataDog/glommio.git#4e6b14772da2f4325271fbcf12d24cf91ed466e5" +dependencies = [ + "ahash 0.7.6", + "bitflags", + "bitmaps", + "buddy-alloc", + "cc", + "concurrent-queue", + "crossbeam", + "enclose", + "futures-lite", + "intrusive-collections", + "lazy_static", + "libc", + "lockfree", + "log", + "membarrier", + "nix", + "pin-project-lite", + "rlimit", + "scoped-tls", + "scopeguard", + "smallvec", + "socket2 0.3.19", + "tracing", + "typenum", +] + [[package]] name = "half" version = "1.8.0" @@ -813,6 +939,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "intrusive-collections" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb4ed164b4cf1c6bd6e18c097490331a0e58fbb0f39e8f6b5ac7f168006511cd" +dependencies = [ + "memoffset 0.5.6", +] + [[package]] name = "itertools" version = "0.10.1" @@ -837,6 +972,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kernel32-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +dependencies = [ + "winapi 0.2.8", + "winapi-build", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -873,6 +1018,15 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "lockfree" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74ee94b5ad113c7cb98c5a040f783d0952ee4fe100993881d1673c2cb002dd23" +dependencies = [ + "owned-alloc", +] + [[package]] name = "log" version = "0.4.14" @@ -888,12 +1042,33 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" +[[package]] +name = "membarrier" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "925b0811d7e5fb2b666b5906c5047b7ec23aab78edc4d51b7b0f82dc5c955b1c" +dependencies = [ + "cfg-if", + "kernel32-sys", + "lazy_static", + "libc", +] + [[package]] name = "memchr" version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +[[package]] +name = "memoffset" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "043175f069eda7b85febe4a74abbaeff828d9f8b448515d3151a14a3542811aa" +dependencies = [ + "autocfg", +] + [[package]] name = "memoffset" version = "0.6.4" @@ -932,7 +1107,7 @@ dependencies = [ "log", "miow", "ntapi", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -941,7 +1116,7 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -972,7 +1147,7 @@ dependencies = [ "cc", "cfg-if", "libc", - "memoffset", + "memoffset 0.6.4", ] [[package]] @@ -987,7 +1162,7 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1096,6 +1271,18 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "owned-alloc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30fceb411f9a12ff9222c5f824026be368ff15dc2f13468d850c7d3f502205d6" + +[[package]] +name = "parking" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" + [[package]] name = "parking_lot" version = "0.11.2" @@ -1118,7 +1305,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1127,6 +1314,12 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "pin-project-lite" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d31d11c69a6b52a174b42bdc0c30e5e11670f90788b2c471c31c1d17d449443" + [[package]] name = "pkg-config" version = "0.3.20" @@ -1330,7 +1523,16 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" dependencies = [ - "winapi", + "winapi 0.3.9", +] + +[[package]] +name = "rlimit" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc0bf25554376fd362f54332b8410a625c71f15445bca32ffdfdf4ec9ac91726" +dependencies = [ + "libc", ] [[package]] @@ -1370,9 +1572,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" dependencies = [ "lazy_static", - "winapi", + "winapi 0.3.9", ] +[[package]] +name = "scoped-tls" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2" + [[package]] name = "scopeguard" version = "1.1.0" @@ -1532,6 +1740,17 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "socket2" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e" +dependencies = [ + "cfg-if", + "libc", + "winapi 0.3.9", +] + [[package]] name = "socket2" version = "0.4.2" @@ -1539,7 +1758,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5dc90fe6c7be1a323296982db1836d1ea9e47b6839496dde9a541bc496df3516" dependencies = [ "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1582,7 +1801,7 @@ dependencies = [ "rand", "redox_syscall", "remove_dir_all", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1601,7 +1820,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "633c1a546cee861a1a6d0dc69ebeca693bf4296661ba7852b9d21d159e0506df" dependencies = [ "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1640,7 +1859,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" dependencies = [ "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1677,6 +1896,38 @@ dependencies = [ "serde", ] +[[package]] +name = "tracing" +version = "0.1.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4f480b8f81512e825f337ad51e94c1eb5d3bbdf2b363dcd01e2b19a9ffe3f8e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4" +dependencies = [ + "lazy_static", +] + [[package]] name = "tungstenite" version = "0.15.0" @@ -1777,6 +2028,12 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "walkdir" version = "2.3.2" @@ -1784,7 +2041,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" dependencies = [ "same-file", - "winapi", + "winapi 0.3.9", "winapi-util", ] @@ -1858,6 +2115,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "winapi" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" + [[package]] name = "winapi" version = "0.3.9" @@ -1868,6 +2131,12 @@ dependencies = [ "winapi-x86_64-pc-windows-gnu", ] +[[package]] +name = "winapi-build" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" + [[package]] name = "winapi-i686-pc-windows-gnu" version = "0.4.0" @@ -1880,7 +2149,7 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 265b707..8eb57c0 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -32,6 +32,9 @@ rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } socket2 = { version = "0.4.1", features = ["all"] } +glommio = { git = "https://github.com/DataDog/glommio.git" } +futures-lite = "1" + [dev-dependencies] quickcheck = "1.0" quickcheck_macros = "1.0" diff --git a/aquatic_udp/src/lib/common.rs b/aquatic_udp/src/lib/common.rs index d733c52..5a81e46 100644 --- a/aquatic_udp/src/lib/common.rs +++ b/aquatic_udp/src/lib/common.rs @@ -1,5 +1,5 @@ use std::hash::Hash; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::{atomic::AtomicUsize, Arc}; use std::time::Instant; @@ -195,6 +195,31 @@ impl Default for State { } } +#[derive(Default)] +pub struct ConnectionMap(HashMap<(ConnectionId, SocketAddr), ValidUntil>); + +impl ConnectionMap { + pub fn insert( + &mut self, + connection_id: ConnectionId, + socket_addr: SocketAddr, + valid_until: ValidUntil, + ) { + self.0.insert((connection_id, socket_addr), valid_until); + } + + pub fn contains(&mut self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool { + self.0.contains_key(&(connection_id, socket_addr)) + } + + pub fn clean(&mut self) { + let now = Instant::now(); + + self.0.retain(|_, v| v.0 > now); + self.0.shrink_to_fit(); + } +} + #[cfg(test)] mod tests { #[test] diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs new file mode 100644 index 0000000..a61610b --- /dev/null +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -0,0 +1 @@ +pub mod network; diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs new file mode 100644 index 0000000..2cd9138 --- /dev/null +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -0,0 +1,200 @@ +/// TODO +/// - move connection checks to socket workers +/// - ignore scrape requests. forward announce requests to request workers +/// sharded by info hash (with some nice algo to make it difficult for an +/// attacker to know which one they get forwarded to). this way, shared +/// state can be avoided. +use std::io::Cursor; +use std::net::{IpAddr, SocketAddr}; +use std::rc::Rc; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + +use futures_lite::StreamExt; +use glommio::channels::local_channel::{new_unbounded, LocalReceiver, LocalSender}; +use glommio::channels::shared_channel::{SharedReceiver, SharedSender}; +use glommio::net::UdpSocket; +use glommio::prelude::*; +use rand::prelude::{Rng, SeedableRng, StdRng}; + +use aquatic_udp_protocol::{IpVersion, Request, Response}; + +use crate::common::*; +use crate::config::Config; + +pub fn run_socket_worker( + state: State, + config: Config, + request_sender: SharedSender<(ConnectedRequest, SocketAddr)>, + response_receiver: SharedReceiver<(ConnectedResponse, SocketAddr)>, + num_bound_sockets: Arc, +) { + LocalExecutorBuilder::default() + .spawn(|| async move { + let (local_sender, local_receiver) = new_unbounded(); + + let mut socket = UdpSocket::bind(config.network.address).unwrap(); + + let recv_buffer_size = config.network.socket_recv_buffer_size; + + if recv_buffer_size != 0 { + socket.set_buffer_size(recv_buffer_size); + } + + let socket = Rc::new(socket); + + num_bound_sockets.fetch_add(1, Ordering::SeqCst); + + spawn_local(read_requests( + config.clone(), + state.access_list.clone(), + request_sender, + local_sender, + socket.clone(), + )) + .await; + spawn_local(send_responses(response_receiver, local_receiver, socket)).await; + }) + .expect("failed to spawn local executor") + .join() + .unwrap(); +} + +async fn read_requests( + config: Config, + access_list: Arc, + request_sender: SharedSender<(ConnectedRequest, SocketAddr)>, + local_sender: LocalSender<(Response, SocketAddr)>, + socket: Rc, +) { + let request_sender = request_sender.connect().await; + + let mut rng = StdRng::from_entropy(); + + let valid_until = ValidUntil::new(config.cleaning.max_connection_age); + let access_list_mode = config.access_list.mode; + + let mut connections = ConnectionMap::default(); + + let mut buf = [0u8; 2048]; + + loop { + match socket.recv_from(&mut buf).await { + Ok((amt, src)) => { + let request = Request::from_bytes(&buf[..amt], config.protocol.max_scrape_torrents); + + match request { + Ok(Request::Connect(request)) => { + let connection_id = ConnectionId(rng.gen()); + + connections.insert(connection_id, src, valid_until); + + let response = Response::Connect(ConnectResponse { + connection_id, + transaction_id: request.transaction_id, + }); + + local_sender.try_send((response, src)); + } + Ok(Request::Announce(request)) => { + if connections.contains(request.connection_id, src) { + if access_list.allows(access_list_mode, &request.info_hash.0) { + if let Err(err) = request_sender + .try_send((ConnectedRequest::Announce(request), src)) + { + ::log::warn!("request_sender.try_send failed: {:?}", err) + } + } else { + let response = Response::Error(ErrorResponse { + transaction_id: request.transaction_id, + message: "Info hash not allowed".into(), + }); + + local_sender.try_send((response, src)); + } + } + } + Ok(Request::Scrape(request)) => { + if connections.contains(request.connection_id, src) { + if let Err(err) = + request_sender.try_send((ConnectedRequest::Scrape(request), src)) + { + ::log::warn!("request_sender.try_send failed: {:?}", err) + } + } + } + Err(err) => { + ::log::debug!("Request::from_bytes error: {:?}", err); + + if let RequestParseError::Sendable { + connection_id, + transaction_id, + err, + } = err + { + if connections.contains(connection_id, src) { + let response = ErrorResponse { + transaction_id, + message: err.right_or("Parse error").into(), + }; + + local_sender.try_send((response.into(), src)); + } + } + } + } + } + Err(err) => { + ::log::error!("recv_from: {:?}", err); + } + } + + yield_if_needed().await; + } +} + +async fn send_responses( + response_receiver: SharedReceiver<(ConnectedResponse, SocketAddr)>, + local_receiver: LocalReceiver<(Response, SocketAddr)>, + socket: Rc, +) { + let response_receiver = response_receiver.connect().await; + + let mut buf = [0u8; MAX_PACKET_SIZE]; + let mut buf = Cursor::new(&mut buf[..]); + + let mut stream = local_receiver + .stream() + .race(response_receiver.map(|(response, addr)| (response.into(), addr))); + + while let Some((response, src)) = stream.next().await { + buf.set_position(0); + + response + .write(&mut buf, ip_version_from_ip(src.ip())) + .expect("write response"); + + let position = buf.position() as usize; + + if let Err(err) = socket.send_to(&buf.get_ref()[..position], src).await { + ::log::info!("send_to failed: {:?}", err); + } + + yield_if_needed().await; + } +} + +fn ip_version_from_ip(ip: IpAddr) -> IpVersion { + match ip { + IpAddr::V4(_) => IpVersion::IPv4, + IpAddr::V6(ip) => { + if let [0, 0, 0, 0, 0, 0xffff, ..] = ip.segments() { + IpVersion::IPv4 + } else { + IpVersion::IPv6 + } + } + } +} diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index a740a3e..c1aa9e7 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -11,8 +11,8 @@ use privdrop::PrivDrop; pub mod common; pub mod config; -pub mod handlers; -pub mod network; +pub mod glommio; +pub mod mio; pub mod tasks; use common::State; @@ -74,7 +74,7 @@ pub fn start_workers(config: Config, state: State) -> ::anyhow::Result ::anyhow::Result); - -impl ConnectionMap { - fn insert( - &mut self, - connection_id: ConnectionId, - socket_addr: SocketAddr, - valid_until: ValidUntil, - ) { - self.0.insert((connection_id, socket_addr), valid_until); - } - - fn contains(&mut self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool { - self.0.contains_key(&(connection_id, socket_addr)) - } - - fn clean(&mut self) { - let now = Instant::now(); - - self.0.retain(|_, v| v.0 > now); - self.0.shrink_to_fit(); - } -} - pub fn run_socket_worker( state: State, config: Config, diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index c89de8b..7b3f75c 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -15,7 +15,7 @@ use std::time::Duration; use aquatic_cli_helpers::run_app_with_cli_and_config; use aquatic_udp::common::*; use aquatic_udp::config::Config; -use aquatic_udp::handlers; +use aquatic_udp::mio::handlers; use config::BenchConfig; From 80754ab4ad94b81217c1727e548609539e9aaece Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Oct 2021 23:17:42 +0200 Subject: [PATCH 02/21] aquatic_udp: start work on announce handler in glommio version --- aquatic_udp/src/lib/common/announce.rs | 178 ++++++++++++++++++ .../src/lib/{common.rs => common/mod.rs} | 3 + aquatic_udp/src/lib/common/network.rs | 0 aquatic_udp/src/lib/glommio/handlers.rs | 49 +++++ aquatic_udp/src/lib/glommio/mod.rs | 1 + aquatic_udp/src/lib/glommio/network.rs | 21 ++- aquatic_udp/src/lib/mio/handlers/announce.rs | 176 +---------------- 7 files changed, 244 insertions(+), 184 deletions(-) create mode 100644 aquatic_udp/src/lib/common/announce.rs rename aquatic_udp/src/lib/{common.rs => common/mod.rs} (99%) create mode 100644 aquatic_udp/src/lib/common/network.rs create mode 100644 aquatic_udp/src/lib/glommio/handlers.rs diff --git a/aquatic_udp/src/lib/common/announce.rs b/aquatic_udp/src/lib/common/announce.rs new file mode 100644 index 0000000..4a71787 --- /dev/null +++ b/aquatic_udp/src/lib/common/announce.rs @@ -0,0 +1,178 @@ +use rand::rngs::SmallRng; + +use aquatic_common::extract_response_peers; + +use crate::common::*; + +pub fn handle_announce_request( + config: &Config, + rng: &mut SmallRng, + torrents: &mut TorrentMap, + request: AnnounceRequest, + peer_ip: I, + peer_valid_until: ValidUntil, +) -> AnnounceResponse { + let peer_key = PeerMapKey { + ip: peer_ip, + peer_id: request.peer_id, + }; + + let peer_status = PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left); + + let peer = Peer { + ip_address: peer_ip, + port: request.port, + status: peer_status, + valid_until: peer_valid_until, + }; + + let torrent_data = torrents.entry(request.info_hash).or_default(); + + let opt_removed_peer = match peer_status { + PeerStatus::Leeching => { + torrent_data.num_leechers += 1; + + torrent_data.peers.insert(peer_key, peer) + } + PeerStatus::Seeding => { + torrent_data.num_seeders += 1; + + torrent_data.peers.insert(peer_key, peer) + } + PeerStatus::Stopped => torrent_data.peers.remove(&peer_key), + }; + + match opt_removed_peer.map(|peer| peer.status) { + Some(PeerStatus::Leeching) => { + torrent_data.num_leechers -= 1; + } + Some(PeerStatus::Seeding) => { + torrent_data.num_seeders -= 1; + } + _ => {} + } + + let max_num_peers_to_take = calc_max_num_peers_to_take(config, request.peers_wanted.0); + + let response_peers = extract_response_peers( + rng, + &torrent_data.peers, + max_num_peers_to_take, + peer_key, + Peer::to_response_peer, + ); + + AnnounceResponse { + transaction_id: request.transaction_id, + announce_interval: AnnounceInterval(config.protocol.peer_announce_interval), + leechers: NumberOfPeers(torrent_data.num_leechers as i32), + seeders: NumberOfPeers(torrent_data.num_seeders as i32), + peers: response_peers, + } +} + +#[inline] +fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize { + if peers_wanted <= 0 { + config.protocol.max_response_peers as usize + } else { + ::std::cmp::min( + config.protocol.max_response_peers as usize, + peers_wanted as usize, + ) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + use std::net::Ipv4Addr; + + use indexmap::IndexMap; + use quickcheck::{quickcheck, TestResult}; + use rand::thread_rng; + + use super::*; + + fn gen_peer_map_key_and_value(i: u32) -> (PeerMapKey, Peer) { + let ip_address = Ipv4Addr::from(i.to_be_bytes()); + let peer_id = PeerId([0; 20]); + + let key = PeerMapKey { + ip: ip_address, + peer_id, + }; + let value = Peer { + ip_address, + port: Port(1), + status: PeerStatus::Leeching, + valid_until: ValidUntil::new(0), + }; + + (key, value) + } + + #[test] + fn test_extract_response_peers() { + fn prop(data: (u16, u16)) -> TestResult { + let gen_num_peers = data.0 as u32; + let req_num_peers = data.1 as usize; + + let mut peer_map: PeerMap = IndexMap::with_capacity(gen_num_peers as usize); + + let mut opt_sender_key = None; + let mut opt_sender_peer = None; + + for i in 0..gen_num_peers { + let (key, value) = gen_peer_map_key_and_value((i << 16) + i); + + if i == 0 { + opt_sender_key = Some(key); + opt_sender_peer = Some(value.to_response_peer()); + } + + peer_map.insert(key, value); + } + + let mut rng = thread_rng(); + + let peers = extract_response_peers( + &mut rng, + &peer_map, + req_num_peers, + opt_sender_key.unwrap_or_else(|| gen_peer_map_key_and_value(1).0), + Peer::to_response_peer, + ); + + // Check that number of returned peers is correct + + let mut success = peers.len() <= req_num_peers; + + if req_num_peers >= gen_num_peers as usize { + success &= peers.len() == gen_num_peers as usize + || peers.len() + 1 == gen_num_peers as usize; + } + + // Check that returned peers are unique (no overlap) and that sender + // isn't returned + + let mut ip_addresses = HashSet::with_capacity(peers.len()); + + for peer in peers { + if peer == opt_sender_peer.clone().unwrap() + || ip_addresses.contains(&peer.ip_address) + { + success = false; + + break; + } + + ip_addresses.insert(peer.ip_address); + } + + TestResult::from_bool(success) + } + + quickcheck(prop as fn((u16, u16)) -> TestResult); + } +} \ No newline at end of file diff --git a/aquatic_udp/src/lib/common.rs b/aquatic_udp/src/lib/common/mod.rs similarity index 99% rename from aquatic_udp/src/lib/common.rs rename to aquatic_udp/src/lib/common/mod.rs index 5a81e46..5662e1f 100644 --- a/aquatic_udp/src/lib/common.rs +++ b/aquatic_udp/src/lib/common/mod.rs @@ -12,6 +12,9 @@ pub use aquatic_udp_protocol::*; use crate::config::Config; +pub mod announce; +pub mod network; + pub const MAX_PACKET_SIZE: usize = 4096; pub trait Ip: Hash + PartialEq + Eq + Clone + Copy { diff --git a/aquatic_udp/src/lib/common/network.rs b/aquatic_udp/src/lib/common/network.rs new file mode 100644 index 0000000..e69de29 diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs new file mode 100644 index 0000000..771f59b --- /dev/null +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -0,0 +1,49 @@ +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + +use glommio::prelude::*; +use glommio::channels::shared_channel::{SharedReceiver, SharedSender}; +use rand::SeedableRng; +use rand::prelude::SmallRng; + +use crate::config::Config; +use crate::common::*; +use crate::common::announce::handle_announce_request; + +pub fn run_request_worker( + config: Config, + request_receiver: SharedReceiver<(AnnounceRequest, SocketAddr)>, + response_sender: SharedSender<(AnnounceResponse, SocketAddr)>, +) { + LocalExecutorBuilder::default() + .spawn(|| async move { + let request_receiver = request_receiver.connect().await; + let response_sender = response_sender.connect().await; + + let mut rng = SmallRng::from_entropy(); + + let mut torrents_ipv4 = TorrentMap::::default(); + let mut torrents_ipv6 = TorrentMap::::default(); + + // Needs to be updated periodically + let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); + + while let Some((request, addr)) = request_receiver.recv().await { + let response = match addr.ip() { + IpAddr::V4(ip) => { + handle_announce_request(&config, &mut rng, &mut torrents_ipv4, request, ip, peer_valid_until) + }, + IpAddr::V6(ip) => { + handle_announce_request(&config, &mut rng, &mut torrents_ipv6, request, ip, peer_valid_until) + }, + }; + + if let Err(err) = response_sender.try_send((response, addr)) { + ::log::warn!("response_sender.try_send: {:?}", err); + } + } + + }) + .expect("failed to spawn local executor") + .join() + .unwrap(); +} \ No newline at end of file diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index a61610b..030b2ee 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -1 +1,2 @@ +pub mod handlers; pub mod network; diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 2cd9138..1a934e7 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -27,8 +27,8 @@ use crate::config::Config; pub fn run_socket_worker( state: State, config: Config, - request_sender: SharedSender<(ConnectedRequest, SocketAddr)>, - response_receiver: SharedReceiver<(ConnectedResponse, SocketAddr)>, + request_sender: SharedSender<(AnnounceRequest, SocketAddr)>, + response_receiver: SharedReceiver<(AnnounceResponse, SocketAddr)>, num_bound_sockets: Arc, ) { LocalExecutorBuilder::default() @@ -65,7 +65,7 @@ pub fn run_socket_worker( async fn read_requests( config: Config, access_list: Arc, - request_sender: SharedSender<(ConnectedRequest, SocketAddr)>, + request_sender: SharedSender<(AnnounceRequest, SocketAddr)>, local_sender: LocalSender<(Response, SocketAddr)>, socket: Rc, ) { @@ -102,7 +102,7 @@ async fn read_requests( if connections.contains(request.connection_id, src) { if access_list.allows(access_list_mode, &request.info_hash.0) { if let Err(err) = request_sender - .try_send((ConnectedRequest::Announce(request), src)) + .try_send((request, src)) { ::log::warn!("request_sender.try_send failed: {:?}", err) } @@ -118,11 +118,12 @@ async fn read_requests( } Ok(Request::Scrape(request)) => { if connections.contains(request.connection_id, src) { - if let Err(err) = - request_sender.try_send((ConnectedRequest::Scrape(request), src)) - { - ::log::warn!("request_sender.try_send failed: {:?}", err) - } + let response = Response::Error(ErrorResponse { + transaction_id: request.transaction_id, + message: "Scrape requests not supported".into(), + }); + + local_sender.try_send((response, src)); } } Err(err) => { @@ -156,7 +157,7 @@ async fn read_requests( } async fn send_responses( - response_receiver: SharedReceiver<(ConnectedResponse, SocketAddr)>, + response_receiver: SharedReceiver<(AnnounceResponse, SocketAddr)>, local_receiver: LocalReceiver<(Response, SocketAddr)>, socket: Rc, ) { diff --git a/aquatic_udp/src/lib/mio/handlers/announce.rs b/aquatic_udp/src/lib/mio/handlers/announce.rs index 913a0d6..43406f5 100644 --- a/aquatic_udp/src/lib/mio/handlers/announce.rs +++ b/aquatic_udp/src/lib/mio/handlers/announce.rs @@ -4,10 +4,11 @@ use std::vec::Drain; use parking_lot::MutexGuard; use rand::rngs::SmallRng; -use aquatic_common::{convert_ipv4_mapped_ipv6, extract_response_peers}; +use aquatic_common::convert_ipv4_mapped_ipv6; use aquatic_udp_protocol::*; use crate::common::*; +use crate::common::announce::handle_announce_request; use crate::config::Config; #[inline] @@ -45,176 +46,3 @@ pub fn handle_announce_requests( (ConnectedResponse::Announce(response), src) })); } - -fn handle_announce_request( - config: &Config, - rng: &mut SmallRng, - torrents: &mut TorrentMap, - request: AnnounceRequest, - peer_ip: I, - peer_valid_until: ValidUntil, -) -> AnnounceResponse { - let peer_key = PeerMapKey { - ip: peer_ip, - peer_id: request.peer_id, - }; - - let peer_status = PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left); - - let peer = Peer { - ip_address: peer_ip, - port: request.port, - status: peer_status, - valid_until: peer_valid_until, - }; - - let torrent_data = torrents.entry(request.info_hash).or_default(); - - let opt_removed_peer = match peer_status { - PeerStatus::Leeching => { - torrent_data.num_leechers += 1; - - torrent_data.peers.insert(peer_key, peer) - } - PeerStatus::Seeding => { - torrent_data.num_seeders += 1; - - torrent_data.peers.insert(peer_key, peer) - } - PeerStatus::Stopped => torrent_data.peers.remove(&peer_key), - }; - - match opt_removed_peer.map(|peer| peer.status) { - Some(PeerStatus::Leeching) => { - torrent_data.num_leechers -= 1; - } - Some(PeerStatus::Seeding) => { - torrent_data.num_seeders -= 1; - } - _ => {} - } - - let max_num_peers_to_take = calc_max_num_peers_to_take(config, request.peers_wanted.0); - - let response_peers = extract_response_peers( - rng, - &torrent_data.peers, - max_num_peers_to_take, - peer_key, - Peer::to_response_peer, - ); - - AnnounceResponse { - transaction_id: request.transaction_id, - announce_interval: AnnounceInterval(config.protocol.peer_announce_interval), - leechers: NumberOfPeers(torrent_data.num_leechers as i32), - seeders: NumberOfPeers(torrent_data.num_seeders as i32), - peers: response_peers, - } -} - -#[inline] -fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize { - if peers_wanted <= 0 { - config.protocol.max_response_peers as usize - } else { - ::std::cmp::min( - config.protocol.max_response_peers as usize, - peers_wanted as usize, - ) - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashSet; - use std::net::Ipv4Addr; - - use indexmap::IndexMap; - use quickcheck::{quickcheck, TestResult}; - use rand::thread_rng; - - use super::*; - - fn gen_peer_map_key_and_value(i: u32) -> (PeerMapKey, Peer) { - let ip_address = Ipv4Addr::from(i.to_be_bytes()); - let peer_id = PeerId([0; 20]); - - let key = PeerMapKey { - ip: ip_address, - peer_id, - }; - let value = Peer { - ip_address, - port: Port(1), - status: PeerStatus::Leeching, - valid_until: ValidUntil::new(0), - }; - - (key, value) - } - - #[test] - fn test_extract_response_peers() { - fn prop(data: (u16, u16)) -> TestResult { - let gen_num_peers = data.0 as u32; - let req_num_peers = data.1 as usize; - - let mut peer_map: PeerMap = IndexMap::with_capacity(gen_num_peers as usize); - - let mut opt_sender_key = None; - let mut opt_sender_peer = None; - - for i in 0..gen_num_peers { - let (key, value) = gen_peer_map_key_and_value((i << 16) + i); - - if i == 0 { - opt_sender_key = Some(key); - opt_sender_peer = Some(value.to_response_peer()); - } - - peer_map.insert(key, value); - } - - let mut rng = thread_rng(); - - let peers = extract_response_peers( - &mut rng, - &peer_map, - req_num_peers, - opt_sender_key.unwrap_or_else(|| gen_peer_map_key_and_value(1).0), - Peer::to_response_peer, - ); - - // Check that number of returned peers is correct - - let mut success = peers.len() <= req_num_peers; - - if req_num_peers >= gen_num_peers as usize { - success &= peers.len() == gen_num_peers as usize - || peers.len() + 1 == gen_num_peers as usize; - } - - // Check that returned peers are unique (no overlap) and that sender - // isn't returned - - let mut ip_addresses = HashSet::with_capacity(peers.len()); - - for peer in peers { - if peer == opt_sender_peer.clone().unwrap() - || ip_addresses.contains(&peer.ip_address) - { - success = false; - - break; - } - - ip_addresses.insert(peer.ip_address); - } - - TestResult::from_bool(success) - } - - quickcheck(prop as fn((u16, u16)) -> TestResult); - } -} From f28808b30c8e00f964deae91a1f33f5b84ccc666 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Oct 2021 23:23:27 +0200 Subject: [PATCH 03/21] aquatic_udp: add todo comments --- aquatic_udp/src/lib/glommio/handlers.rs | 3 ++- aquatic_udp/src/lib/glommio/network.rs | 8 +++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 771f59b..2dc9763 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -21,10 +21,11 @@ pub fn run_request_worker( let mut rng = SmallRng::from_entropy(); + // Need to be cleaned periodically: use timer? let mut torrents_ipv4 = TorrentMap::::default(); let mut torrents_ipv6 = TorrentMap::::default(); - // Needs to be updated periodically + // Needs to be updated periodically: use timer? let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); while let Some((request, addr)) = request_receiver.recv().await { diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 1a934e7..1feb540 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -1,9 +1,7 @@ /// TODO -/// - move connection checks to socket workers -/// - ignore scrape requests. forward announce requests to request workers -/// sharded by info hash (with some nice algo to make it difficult for an -/// attacker to know which one they get forwarded to). this way, shared -/// state can be avoided. +/// - forward announce requests to request workers sharded by info hash (with +/// some nice algo to make it difficult for an attacker to know which one +/// they get forwarded to) use std::io::Cursor; use std::net::{IpAddr, SocketAddr}; use std::rc::Rc; From 4a8651e1c617bc276a7e20496b1db12115510831 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Oct 2021 01:01:10 +0200 Subject: [PATCH 04/21] aquatic_udp: use mesh in glommio version --- aquatic_udp/src/lib/common/announce.rs | 2 +- aquatic_udp/src/lib/common/network.rs | 1 + aquatic_udp/src/lib/glommio/handlers.rs | 54 +++++++++++++------- aquatic_udp/src/lib/glommio/mod.rs | 32 ++++++++++++ aquatic_udp/src/lib/glommio/network.rs | 52 ++++++++++--------- aquatic_udp/src/lib/mio/handlers/announce.rs | 2 +- 6 files changed, 100 insertions(+), 43 deletions(-) diff --git a/aquatic_udp/src/lib/common/announce.rs b/aquatic_udp/src/lib/common/announce.rs index 4a71787..2a63b61 100644 --- a/aquatic_udp/src/lib/common/announce.rs +++ b/aquatic_udp/src/lib/common/announce.rs @@ -175,4 +175,4 @@ mod tests { quickcheck(prop as fn((u16, u16)) -> TestResult); } -} \ No newline at end of file +} diff --git a/aquatic_udp/src/lib/common/network.rs b/aquatic_udp/src/lib/common/network.rs index e69de29..8b13789 100644 --- a/aquatic_udp/src/lib/common/network.rs +++ b/aquatic_udp/src/lib/common/network.rs @@ -0,0 +1 @@ + diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 2dc9763..ad28f75 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -1,23 +1,26 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use futures_lite::stream::empty; +use futures_lite::StreamExt; +use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role}; use glommio::prelude::*; -use glommio::channels::shared_channel::{SharedReceiver, SharedSender}; -use rand::SeedableRng; use rand::prelude::SmallRng; +use rand::SeedableRng; -use crate::config::Config; -use crate::common::*; use crate::common::announce::handle_announce_request; +use crate::common::*; +use crate::config::Config; pub fn run_request_worker( config: Config, - request_receiver: SharedReceiver<(AnnounceRequest, SocketAddr)>, - response_sender: SharedSender<(AnnounceResponse, SocketAddr)>, + request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>, + response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>, ) { LocalExecutorBuilder::default() .spawn(|| async move { - let request_receiver = request_receiver.connect().await; - let response_sender = response_sender.connect().await; + let (_, mut request_receivers) = + request_mesh_builder.join(Role::Consumer).await.unwrap(); + let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap(); let mut rng = SmallRng::from_entropy(); @@ -28,23 +31,38 @@ pub fn run_request_worker( // Needs to be updated periodically: use timer? let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); - while let Some((request, addr)) = request_receiver.recv().await { + let mut stream = empty().boxed_local(); + + for (_, receiver) in request_receivers.streams() { + stream = Box::pin(stream.race(receiver)); + } + + while let Some((producer_index, request, addr)) = stream.next().await { let response = match addr.ip() { - IpAddr::V4(ip) => { - handle_announce_request(&config, &mut rng, &mut torrents_ipv4, request, ip, peer_valid_until) - }, - IpAddr::V6(ip) => { - handle_announce_request(&config, &mut rng, &mut torrents_ipv6, request, ip, peer_valid_until) - }, + IpAddr::V4(ip) => handle_announce_request( + &config, + &mut rng, + &mut torrents_ipv4, + request, + ip, + peer_valid_until, + ), + IpAddr::V6(ip) => handle_announce_request( + &config, + &mut rng, + &mut torrents_ipv6, + request, + ip, + peer_valid_until, + ), }; - if let Err(err) = response_sender.try_send((response, addr)) { + if let Err(err) = response_senders.try_send_to(producer_index, (response, addr)) { ::log::warn!("response_sender.try_send: {:?}", err); } } - }) .expect("failed to spawn local executor") .join() .unwrap(); -} \ No newline at end of file +} diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 030b2ee..9381d7f 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -1,2 +1,34 @@ +use std::sync::{atomic::AtomicUsize, Arc}; + +use glommio::channels::channel_mesh::MeshBuilder; + +use crate::config::Config; + pub mod handlers; pub mod network; + +fn start_workers(config: Config) { + let num_peers = config.socket_workers + config.request_workers; + + let request_mesh_builder = MeshBuilder::partial(num_peers, 1024); + let response_mesh_builder = MeshBuilder::partial(num_peers, 1024); + + let num_bound_sockets = Arc::new(AtomicUsize::new(0)); + + for _ in 0..(config.socket_workers) { + network::run_socket_worker( + config.clone(), + request_mesh_builder.clone(), + response_mesh_builder.clone(), + num_bound_sockets.clone(), + ); + } + + for _ in 0..(config.request_workers) { + handlers::run_request_worker( + config.clone(), + request_mesh_builder.clone(), + response_mesh_builder.clone(), + ); + } +} diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 1feb540..a4c3688 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -1,7 +1,5 @@ /// TODO -/// - forward announce requests to request workers sharded by info hash (with -/// some nice algo to make it difficult for an attacker to know which one -/// they get forwarded to) +/// - Don't use race, use other means to receive from multiple channels use std::io::Cursor; use std::net::{IpAddr, SocketAddr}; use std::rc::Rc; @@ -11,8 +9,8 @@ use std::sync::{ }; use futures_lite::StreamExt; +use glommio::channels::channel_mesh::{MeshBuilder, Partial, Receivers, Role, Senders}; use glommio::channels::local_channel::{new_unbounded, LocalReceiver, LocalSender}; -use glommio::channels::shared_channel::{SharedReceiver, SharedSender}; use glommio::net::UdpSocket; use glommio::prelude::*; use rand::prelude::{Rng, SeedableRng, StdRng}; @@ -23,10 +21,9 @@ use crate::common::*; use crate::config::Config; pub fn run_socket_worker( - state: State, config: Config, - request_sender: SharedSender<(AnnounceRequest, SocketAddr)>, - response_receiver: SharedReceiver<(AnnounceResponse, SocketAddr)>, + request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>, + response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>, num_bound_sockets: Arc, ) { LocalExecutorBuilder::default() @@ -45,15 +42,21 @@ pub fn run_socket_worker( num_bound_sockets.fetch_add(1, Ordering::SeqCst); + let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); + + let (_, response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap(); + + let response_consumer_index = response_receivers.consumer_id().unwrap(); + spawn_local(read_requests( config.clone(), - state.access_list.clone(), - request_sender, + request_senders, + response_consumer_index, local_sender, socket.clone(), )) .await; - spawn_local(send_responses(response_receiver, local_receiver, socket)).await; + spawn_local(send_responses(response_receivers, local_receiver, socket)).await; }) .expect("failed to spawn local executor") .join() @@ -62,18 +65,17 @@ pub fn run_socket_worker( async fn read_requests( config: Config, - access_list: Arc, - request_sender: SharedSender<(AnnounceRequest, SocketAddr)>, + request_senders: Senders<(usize, AnnounceRequest, SocketAddr)>, + response_consumer_index: usize, local_sender: LocalSender<(Response, SocketAddr)>, socket: Rc, ) { - let request_sender = request_sender.connect().await; - let mut rng = StdRng::from_entropy(); let valid_until = ValidUntil::new(config.cleaning.max_connection_age); let access_list_mode = config.access_list.mode; + let access_list = AccessList::default(); let mut connections = ConnectionMap::default(); let mut buf = [0u8; 2048]; @@ -99,9 +101,13 @@ async fn read_requests( Ok(Request::Announce(request)) => { if connections.contains(request.connection_id, src) { if access_list.allows(access_list_mode, &request.info_hash.0) { - if let Err(err) = request_sender - .try_send((request, src)) - { + let request_consumer_index = + (request.info_hash.0[0] as usize) % config.request_workers; + + if let Err(err) = request_senders.try_send_to( + request_consumer_index, + (response_consumer_index, request, src), + ) { ::log::warn!("request_sender.try_send failed: {:?}", err) } } else { @@ -155,18 +161,18 @@ async fn read_requests( } async fn send_responses( - response_receiver: SharedReceiver<(AnnounceResponse, SocketAddr)>, + mut response_receivers: Receivers<(AnnounceResponse, SocketAddr)>, local_receiver: LocalReceiver<(Response, SocketAddr)>, socket: Rc, ) { - let response_receiver = response_receiver.connect().await; - let mut buf = [0u8; MAX_PACKET_SIZE]; let mut buf = Cursor::new(&mut buf[..]); - let mut stream = local_receiver - .stream() - .race(response_receiver.map(|(response, addr)| (response.into(), addr))); + let mut stream = local_receiver.stream().boxed_local(); + + for (_, receiver) in response_receivers.streams().into_iter() { + stream = Box::pin(stream.race(receiver.map(|(response, addr)| (response.into(), addr)))); + } while let Some((response, src)) = stream.next().await { buf.set_position(0); diff --git a/aquatic_udp/src/lib/mio/handlers/announce.rs b/aquatic_udp/src/lib/mio/handlers/announce.rs index 43406f5..99e6a46 100644 --- a/aquatic_udp/src/lib/mio/handlers/announce.rs +++ b/aquatic_udp/src/lib/mio/handlers/announce.rs @@ -7,8 +7,8 @@ use rand::rngs::SmallRng; use aquatic_common::convert_ipv4_mapped_ipv6; use aquatic_udp_protocol::*; -use crate::common::*; use crate::common::announce::handle_announce_request; +use crate::common::*; use crate::config::Config; #[inline] From 3aebdfda8a6e3326228ea30e9a713efb24267a44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Oct 2021 01:40:06 +0200 Subject: [PATCH 05/21] aquatic_udp: more work on splitting mio and glommio logic --- aquatic_udp/src/lib/glommio/mod.rs | 4 +- aquatic_udp/src/lib/lib.rs | 114 +++-------------------- aquatic_udp/src/lib/mio/mod.rs | 120 +++++++++++++++++++++++++ aquatic_udp/src/lib/mio/network.rs | 2 +- aquatic_udp/src/lib/{ => mio}/tasks.rs | 14 --- 5 files changed, 134 insertions(+), 120 deletions(-) rename aquatic_udp/src/lib/{ => mio}/tasks.rs (84%) diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 9381d7f..b33e784 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -7,7 +7,7 @@ use crate::config::Config; pub mod handlers; pub mod network; -fn start_workers(config: Config) { +pub fn run(config: Config) -> anyhow::Result<()> { let num_peers = config.socket_workers + config.request_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, 1024); @@ -31,4 +31,6 @@ fn start_workers(config: Config) { response_mesh_builder.clone(), ); } + + Ok(()) } diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index c1aa9e7..fa4d882 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -1,121 +1,27 @@ -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; -use std::thread::Builder; -use std::time::Duration; +use std::sync::Arc; -use anyhow::Context; -use crossbeam_channel::unbounded; -use privdrop::PrivDrop; +use aquatic_common::access_list::{AccessList, AccessListMode}; pub mod common; pub mod config; pub mod glommio; pub mod mio; -pub mod tasks; -use common::State; use config::Config; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub fn run(config: Config) -> ::anyhow::Result<()> { - let state = State::default(); + mio::run(config) +} - tasks::update_access_list(&config, &state.access_list); - - let num_bound_sockets = start_workers(config.clone(), state.clone())?; - - if config.privileges.drop_privileges { - let mut counter = 0usize; - - loop { - let sockets = num_bound_sockets.load(Ordering::SeqCst); - - if sockets == config.socket_workers { - PrivDrop::default() - .chroot(config.privileges.chroot_path.clone()) - .user(config.privileges.user.clone()) - .apply()?; - - break; - } - - ::std::thread::sleep(Duration::from_millis(10)); - - counter += 1; - - if counter == 500 { - panic!("Sockets didn't bind in time for privilege drop."); +pub fn update_access_list(config: &Config, access_list: &Arc) { + match config.access_list.mode { + AccessListMode::White | AccessListMode::Black => { + if let Err(err) = access_list.update_from_path(&config.access_list.path) { + ::log::error!("Update access list from path: {:?}", err); } } - } - - loop { - ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); - - tasks::update_access_list(&config, &state.access_list); - - state.torrents.lock().clean(&config, &state.access_list); + AccessListMode::Off => {} } } - -pub fn start_workers(config: Config, state: State) -> ::anyhow::Result> { - let (request_sender, request_receiver) = unbounded(); - let (response_sender, response_receiver) = unbounded(); - - for i in 0..config.request_workers { - let state = state.clone(); - let config = config.clone(); - let request_receiver = request_receiver.clone(); - let response_sender = response_sender.clone(); - - Builder::new() - .name(format!("request-{:02}", i + 1)) - .spawn(move || { - mio::handlers::run_request_worker(state, config, request_receiver, response_sender) - }) - .with_context(|| "spawn request worker")?; - } - - let num_bound_sockets = Arc::new(AtomicUsize::new(0)); - - for i in 0..config.socket_workers { - let state = state.clone(); - let config = config.clone(); - let request_sender = request_sender.clone(); - let response_receiver = response_receiver.clone(); - let num_bound_sockets = num_bound_sockets.clone(); - - Builder::new() - .name(format!("socket-{:02}", i + 1)) - .spawn(move || { - mio::network::run_socket_worker( - state, - config, - i, - request_sender, - response_receiver, - num_bound_sockets, - ) - }) - .with_context(|| "spawn socket worker")?; - } - - if config.statistics.interval != 0 { - let state = state.clone(); - let config = config.clone(); - - Builder::new() - .name("statistics-collector".to_string()) - .spawn(move || loop { - ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); - - tasks::gather_and_print_statistics(&state, &config); - }) - .with_context(|| "spawn statistics worker")?; - } - - Ok(num_bound_sockets) -} diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index 030b2ee..a59f2ec 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -1,2 +1,122 @@ +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; +use std::thread::Builder; +use std::time::Duration; + +use anyhow::Context; +use crossbeam_channel::unbounded; +use privdrop::PrivDrop; + pub mod handlers; pub mod network; +pub mod tasks; + +use crate::common::State; +use crate::config::Config; +use crate::update_access_list; + +pub fn run(config: Config) -> ::anyhow::Result<()> { + let state = State::default(); + + update_access_list(&config, &state.access_list); + + let num_bound_sockets = Arc::new(AtomicUsize::new(0)); + + start_workers(config.clone(), state.clone(), num_bound_sockets.clone())?; + + if config.privileges.drop_privileges { + let mut counter = 0usize; + + loop { + let sockets = num_bound_sockets.load(Ordering::SeqCst); + + if sockets == config.socket_workers { + PrivDrop::default() + .chroot(config.privileges.chroot_path.clone()) + .user(config.privileges.user.clone()) + .apply()?; + + break; + } + + ::std::thread::sleep(Duration::from_millis(10)); + + counter += 1; + + if counter == 500 { + panic!("Sockets didn't bind in time for privilege drop."); + } + } + } + + loop { + ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); + + update_access_list(&config, &state.access_list); + + state.torrents.lock().clean(&config, &state.access_list); + } +} + +pub fn start_workers( + config: Config, + state: State, + num_bound_sockets: Arc, +) -> ::anyhow::Result<()> { + let (request_sender, request_receiver) = unbounded(); + let (response_sender, response_receiver) = unbounded(); + + for i in 0..config.request_workers { + let state = state.clone(); + let config = config.clone(); + let request_receiver = request_receiver.clone(); + let response_sender = response_sender.clone(); + + Builder::new() + .name(format!("request-{:02}", i + 1)) + .spawn(move || { + handlers::run_request_worker(state, config, request_receiver, response_sender) + }) + .with_context(|| "spawn request worker")?; + } + + for i in 0..config.socket_workers { + let state = state.clone(); + let config = config.clone(); + let request_sender = request_sender.clone(); + let response_receiver = response_receiver.clone(); + let num_bound_sockets = num_bound_sockets.clone(); + + Builder::new() + .name(format!("socket-{:02}", i + 1)) + .spawn(move || { + network::run_socket_worker( + state, + config, + i, + request_sender, + response_receiver, + num_bound_sockets, + ) + }) + .with_context(|| "spawn socket worker")?; + } + + if config.statistics.interval != 0 { + let state = state.clone(); + let config = config.clone(); + + Builder::new() + .name("statistics-collector".to_string()) + .spawn(move || loop { + ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); + + tasks::gather_and_print_statistics(&state, &config); + }) + .with_context(|| "spawn statistics worker")?; + } + + Ok(()) +} diff --git a/aquatic_udp/src/lib/mio/network.rs b/aquatic_udp/src/lib/mio/network.rs index 6171244..7e79b0a 100644 --- a/aquatic_udp/src/lib/mio/network.rs +++ b/aquatic_udp/src/lib/mio/network.rs @@ -4,7 +4,7 @@ use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; -use std::time::{Duration, Instant}; +use std::time::Duration; use std::vec::Drain; use crossbeam_channel::{Receiver, Sender}; diff --git a/aquatic_udp/src/lib/tasks.rs b/aquatic_udp/src/lib/mio/tasks.rs similarity index 84% rename from aquatic_udp/src/lib/tasks.rs rename to aquatic_udp/src/lib/mio/tasks.rs index 2665c45..2fde39d 100644 --- a/aquatic_udp/src/lib/tasks.rs +++ b/aquatic_udp/src/lib/mio/tasks.rs @@ -1,24 +1,10 @@ use std::sync::atomic::Ordering; -use std::sync::Arc; use histogram::Histogram; -use aquatic_common::access_list::AccessListMode; - use crate::common::*; use crate::config::Config; -pub fn update_access_list(config: &Config, access_list: &Arc) { - match config.access_list.mode { - AccessListMode::White | AccessListMode::Black => { - if let Err(err) = access_list.update_from_path(&config.access_list.path) { - ::log::error!("Update access list from path: {:?}", err); - } - } - AccessListMode::Off => {} - } -} - pub fn gather_and_print_statistics(state: &State, config: &Config) { let interval = config.statistics.interval; From c6ba1bc61c450d192d98519cd4222c5b084674ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Oct 2021 01:53:47 +0200 Subject: [PATCH 06/21] aquatic_udp: glommio: start executors in mod.rs, update TODO --- aquatic_udp/src/lib/glommio/handlers.rs | 80 +++++++++++-------------- aquatic_udp/src/lib/glommio/mod.rs | 46 ++++++++++---- aquatic_udp/src/lib/glommio/network.rs | 51 +++++++--------- 3 files changed, 93 insertions(+), 84 deletions(-) diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index ad28f75..c9b0316 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -3,7 +3,6 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use futures_lite::stream::empty; use futures_lite::StreamExt; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role}; -use glommio::prelude::*; use rand::prelude::SmallRng; use rand::SeedableRng; @@ -11,58 +10,51 @@ use crate::common::announce::handle_announce_request; use crate::common::*; use crate::config::Config; -pub fn run_request_worker( +pub async fn run_request_worker( config: Config, request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>, response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>, ) { - LocalExecutorBuilder::default() - .spawn(|| async move { - let (_, mut request_receivers) = - request_mesh_builder.join(Role::Consumer).await.unwrap(); - let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap(); + let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); + let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap(); - let mut rng = SmallRng::from_entropy(); + let mut rng = SmallRng::from_entropy(); - // Need to be cleaned periodically: use timer? - let mut torrents_ipv4 = TorrentMap::::default(); - let mut torrents_ipv6 = TorrentMap::::default(); + // Need to be cleaned periodically: use timer? + let mut torrents_ipv4 = TorrentMap::::default(); + let mut torrents_ipv6 = TorrentMap::::default(); - // Needs to be updated periodically: use timer? - let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); + // Needs to be updated periodically: use timer? + let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); - let mut stream = empty().boxed_local(); + let mut stream = empty().boxed_local(); - for (_, receiver) in request_receivers.streams() { - stream = Box::pin(stream.race(receiver)); - } + for (_, receiver) in request_receivers.streams() { + stream = Box::pin(stream.race(receiver)); + } - while let Some((producer_index, request, addr)) = stream.next().await { - let response = match addr.ip() { - IpAddr::V4(ip) => handle_announce_request( - &config, - &mut rng, - &mut torrents_ipv4, - request, - ip, - peer_valid_until, - ), - IpAddr::V6(ip) => handle_announce_request( - &config, - &mut rng, - &mut torrents_ipv6, - request, - ip, - peer_valid_until, - ), - }; + while let Some((producer_index, request, addr)) = stream.next().await { + let response = match addr.ip() { + IpAddr::V4(ip) => handle_announce_request( + &config, + &mut rng, + &mut torrents_ipv4, + request, + ip, + peer_valid_until, + ), + IpAddr::V6(ip) => handle_announce_request( + &config, + &mut rng, + &mut torrents_ipv6, + request, + ip, + peer_valid_until, + ), + }; - if let Err(err) = response_senders.try_send_to(producer_index, (response, addr)) { - ::log::warn!("response_sender.try_send: {:?}", err); - } - } - }) - .expect("failed to spawn local executor") - .join() - .unwrap(); + if let Err(err) = response_senders.try_send_to(producer_index, (response, addr)) { + ::log::warn!("response_sender.try_send: {:?}", err); + } + } } diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index b33e784..89c6b9b 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -1,6 +1,7 @@ use std::sync::{atomic::AtomicUsize, Arc}; use glommio::channels::channel_mesh::MeshBuilder; +use glommio::prelude::*; use crate::config::Config; @@ -15,21 +16,44 @@ pub fn run(config: Config) -> anyhow::Result<()> { let num_bound_sockets = Arc::new(AtomicUsize::new(0)); + let mut executors = Vec::new(); + for _ in 0..(config.socket_workers) { - network::run_socket_worker( - config.clone(), - request_mesh_builder.clone(), - response_mesh_builder.clone(), - num_bound_sockets.clone(), - ); + let config = config.clone(); + let request_mesh_builder = request_mesh_builder.clone(); + let response_mesh_builder = response_mesh_builder.clone(); + let num_bound_sockets = num_bound_sockets.clone(); + + let executor = LocalExecutorBuilder::default().spawn(|| async move { + network::run_socket_worker( + config, + request_mesh_builder, + response_mesh_builder, + num_bound_sockets, + ) + .await + }); + + executors.push(executor); } for _ in 0..(config.request_workers) { - handlers::run_request_worker( - config.clone(), - request_mesh_builder.clone(), - response_mesh_builder.clone(), - ); + let config = config.clone(); + let request_mesh_builder = request_mesh_builder.clone(); + let response_mesh_builder = response_mesh_builder.clone(); + + let executor = LocalExecutorBuilder::default().spawn(|| async move { + handlers::run_request_worker(config, request_mesh_builder, response_mesh_builder).await + }); + + executors.push(executor); + } + + for executor in executors { + executor + .expect("failed to spawn local executor") + .join() + .unwrap(); } Ok(()) diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index a4c3688..f685e8f 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -1,5 +1,3 @@ -/// TODO -/// - Don't use race, use other means to receive from multiple channels use std::io::Cursor; use std::net::{IpAddr, SocketAddr}; use std::rc::Rc; @@ -20,47 +18,42 @@ use aquatic_udp_protocol::{IpVersion, Request, Response}; use crate::common::*; use crate::config::Config; -pub fn run_socket_worker( +pub async fn run_socket_worker( config: Config, request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>, response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>, num_bound_sockets: Arc, ) { - LocalExecutorBuilder::default() - .spawn(|| async move { - let (local_sender, local_receiver) = new_unbounded(); + let (local_sender, local_receiver) = new_unbounded(); - let mut socket = UdpSocket::bind(config.network.address).unwrap(); + let mut socket = UdpSocket::bind(config.network.address).unwrap(); - let recv_buffer_size = config.network.socket_recv_buffer_size; + let recv_buffer_size = config.network.socket_recv_buffer_size; - if recv_buffer_size != 0 { - socket.set_buffer_size(recv_buffer_size); - } + if recv_buffer_size != 0 { + socket.set_buffer_size(recv_buffer_size); + } - let socket = Rc::new(socket); + let socket = Rc::new(socket); - num_bound_sockets.fetch_add(1, Ordering::SeqCst); + num_bound_sockets.fetch_add(1, Ordering::SeqCst); - let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); + let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); - let (_, response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap(); + let (_, response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap(); - let response_consumer_index = response_receivers.consumer_id().unwrap(); + let response_consumer_index = response_receivers.consumer_id().unwrap(); - spawn_local(read_requests( - config.clone(), - request_senders, - response_consumer_index, - local_sender, - socket.clone(), - )) - .await; - spawn_local(send_responses(response_receivers, local_receiver, socket)).await; - }) - .expect("failed to spawn local executor") - .join() - .unwrap(); + spawn_local(read_requests( + config.clone(), + request_senders, + response_consumer_index, + local_sender, + socket.clone(), + )) + .await; + + spawn_local(send_responses(response_receivers, local_receiver, socket)).await; } async fn read_requests( From 93907822f8e2dee166ce37a36f908cdcbf9c629f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Oct 2021 02:03:23 +0200 Subject: [PATCH 07/21] aquatic_udp: use StreamExt::or when receiving from channels --- aquatic_udp/src/lib/glommio/handlers.rs | 2 +- aquatic_udp/src/lib/glommio/network.rs | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index c9b0316..3d52f93 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -30,7 +30,7 @@ pub async fn run_request_worker( let mut stream = empty().boxed_local(); for (_, receiver) in request_receivers.streams() { - stream = Box::pin(stream.race(receiver)); + stream = Box::pin(stream.or(receiver)); } while let Some((producer_index, request, addr)) = stream.next().await { diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index f685e8f..daa9a2d 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -68,7 +68,9 @@ async fn read_requests( let valid_until = ValidUntil::new(config.cleaning.max_connection_age); let access_list_mode = config.access_list.mode; + // Needs to be updated periodically: use timer? let access_list = AccessList::default(); + // Needs to be cleaned periodically: use timer? let mut connections = ConnectionMap::default(); let mut buf = [0u8; 2048]; @@ -164,7 +166,7 @@ async fn send_responses( let mut stream = local_receiver.stream().boxed_local(); for (_, receiver) in response_receivers.streams().into_iter() { - stream = Box::pin(stream.race(receiver.map(|(response, addr)| (response.into(), addr)))); + stream = Box::pin(stream.or(receiver.map(|(response, addr)| (response.into(), addr)))); } while let Some((response, src)) = stream.next().await { From 9dc615a8f923ea92ef5f7e0f277122d340b1f1da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Oct 2021 02:11:27 +0200 Subject: [PATCH 08/21] aquatic_udp: glommio: add yield_if_needed(), add comment --- aquatic_udp/src/lib/glommio/handlers.rs | 3 +++ aquatic_udp/src/lib/glommio/network.rs | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 3d52f93..d7b5220 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -3,6 +3,7 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use futures_lite::stream::empty; use futures_lite::StreamExt; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role}; +use glommio::prelude::*; use rand::prelude::SmallRng; use rand::SeedableRng; @@ -56,5 +57,7 @@ pub async fn run_request_worker( if let Err(err) = response_senders.try_send_to(producer_index, (response, addr)) { ::log::warn!("response_sender.try_send: {:?}", err); } + + yield_if_needed().await; } } diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index daa9a2d..834d07a 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -65,9 +65,10 @@ async fn read_requests( ) { let mut rng = StdRng::from_entropy(); - let valid_until = ValidUntil::new(config.cleaning.max_connection_age); let access_list_mode = config.access_list.mode; + // Needs to be updated periodically: use timer? + let valid_until = ValidUntil::new(config.cleaning.max_connection_age); // Needs to be updated periodically: use timer? let access_list = AccessList::default(); // Needs to be cleaned periodically: use timer? From 63c03a95d5c9848c048bf45c3ee6e07bdc0d5b42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Oct 2021 02:26:54 +0200 Subject: [PATCH 09/21] Update TODO --- TODO.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/TODO.md b/TODO.md index ea65cc2..2a5d962 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,15 @@ # TODO +* aquatic_udp glommio + * use fairer stream receiver scheduling than StreamExt::or, like running a + task for each stream - but then what about access to the TorrentMaps? + * update access lists + * clean connections + * clean torrents (using access list) + * update peer valid until + * privdrop + * a lot of "common code" is only used in mio implementation + * access lists: * use arc-swap Cache * add CI tests From 68b2bdd4a6e8267c9bfa1c7124eb9276cfc85deb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Oct 2021 10:51:49 +0200 Subject: [PATCH 10/21] aquatic_udp: do task-per-receiver in glommio handlers --- aquatic_udp/src/lib/glommio/handlers.rs | 43 +++++++++++++++++-------- 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index d7b5220..14e0e16 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -1,8 +1,9 @@ +use std::cell::RefCell; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::rc::Rc; -use futures_lite::stream::empty; -use futures_lite::StreamExt; -use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role}; +use futures_lite::{Stream, StreamExt}; +use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::prelude::*; use rand::prelude::SmallRng; use rand::SeedableRng; @@ -19,27 +20,41 @@ pub async fn run_request_worker( let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap(); - let mut rng = SmallRng::from_entropy(); + let response_senders = Rc::new(response_senders); // Need to be cleaned periodically: use timer? - let mut torrents_ipv4 = TorrentMap::::default(); - let mut torrents_ipv6 = TorrentMap::::default(); + let torrents_ipv4 = Rc::new(RefCell::new(TorrentMap::::default())); + let torrents_ipv6 = Rc::new(RefCell::new(TorrentMap::::default())); + + for (_, receiver) in request_receivers.streams() { + handle_request_stream( + &config, + torrents_ipv4.clone(), + torrents_ipv6.clone(), + response_senders.clone(), + receiver + ).await; + } +} + +async fn handle_request_stream( + config: &Config, + torrents_ipv4: Rc>>, + torrents_ipv6: Rc>>, + response_senders: Rc>, + mut stream: S, +) where S: Stream + ::std::marker::Unpin { + let mut rng = SmallRng::from_entropy(); // Needs to be updated periodically: use timer? let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); - let mut stream = empty().boxed_local(); - - for (_, receiver) in request_receivers.streams() { - stream = Box::pin(stream.or(receiver)); - } - while let Some((producer_index, request, addr)) = stream.next().await { let response = match addr.ip() { IpAddr::V4(ip) => handle_announce_request( &config, &mut rng, - &mut torrents_ipv4, + &mut torrents_ipv4.borrow_mut(), request, ip, peer_valid_until, @@ -47,7 +62,7 @@ pub async fn run_request_worker( IpAddr::V6(ip) => handle_announce_request( &config, &mut rng, - &mut torrents_ipv6, + &mut torrents_ipv6.borrow_mut(), request, ip, peer_valid_until, From b5aa07c21f4660e7dac2dcf321fa9017564d53a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Oct 2021 11:20:59 +0200 Subject: [PATCH 11/21] aquatic_udp: glommio networking: use task-per-receiver --- aquatic_udp/src/lib/glommio/handlers.rs | 9 ++++--- aquatic_udp/src/lib/glommio/network.rs | 33 +++++++++++++------------ 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 14e0e16..0fff931 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -32,8 +32,9 @@ pub async fn run_request_worker( torrents_ipv4.clone(), torrents_ipv6.clone(), response_senders.clone(), - receiver - ).await; + receiver, + ) + .await; } } @@ -43,7 +44,9 @@ async fn handle_request_stream( torrents_ipv6: Rc>>, response_senders: Rc>, mut stream: S, -) where S: Stream + ::std::marker::Unpin { +) where + S: Stream + ::std::marker::Unpin, +{ let mut rng = SmallRng::from_entropy(); // Needs to be updated periodically: use timer? diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 834d07a..427ec61 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -6,9 +6,9 @@ use std::sync::{ Arc, }; -use futures_lite::StreamExt; -use glommio::channels::channel_mesh::{MeshBuilder, Partial, Receivers, Role, Senders}; -use glommio::channels::local_channel::{new_unbounded, LocalReceiver, LocalSender}; +use futures_lite::{Stream, StreamExt}; +use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; +use glommio::channels::local_channel::{new_unbounded, LocalSender}; use glommio::net::UdpSocket; use glommio::prelude::*; use rand::prelude::{Rng, SeedableRng, StdRng}; @@ -40,7 +40,7 @@ pub async fn run_socket_worker( let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); - let (_, response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap(); + let (_, mut response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap(); let response_consumer_index = response_receivers.consumer_id().unwrap(); @@ -53,7 +53,15 @@ pub async fn run_socket_worker( )) .await; - spawn_local(send_responses(response_receivers, local_receiver, socket)).await; + for (_, receiver) in response_receivers.streams().into_iter() { + spawn_local(send_responses( + socket.clone(), + receiver.map(|(response, addr)| (response.into(), addr)), + )) + .await; + } + + send_responses(socket, local_receiver.stream()).await; } async fn read_requests( @@ -156,20 +164,13 @@ async fn read_requests( } } -async fn send_responses( - mut response_receivers: Receivers<(AnnounceResponse, SocketAddr)>, - local_receiver: LocalReceiver<(Response, SocketAddr)>, - socket: Rc, -) { +async fn send_responses(socket: Rc, mut stream: S) +where + S: Stream + ::std::marker::Unpin, +{ let mut buf = [0u8; MAX_PACKET_SIZE]; let mut buf = Cursor::new(&mut buf[..]); - let mut stream = local_receiver.stream().boxed_local(); - - for (_, receiver) in response_receivers.streams().into_iter() { - stream = Box::pin(stream.or(receiver.map(|(response, addr)| (response.into(), addr)))); - } - while let Some((response, src)) = stream.next().await { buf.set_position(0); From cad3618fad5aefa14678cc2c38c5e9e4584188b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Oct 2021 11:54:00 +0200 Subject: [PATCH 12/21] aquatic_udp: glommio: start work on periodic cleaning --- aquatic_udp/src/lib/glommio/handlers.rs | 33 ++++++++++++++++--------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 0fff931..9560920 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -1,10 +1,12 @@ use std::cell::RefCell; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::net::{IpAddr, SocketAddr}; use std::rc::Rc; +use std::time::Duration; use futures_lite::{Stream, StreamExt}; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; -use glommio::prelude::*; +use glommio::{enclose, prelude::*}; +use glommio::timer::TimerActionRepeat; use rand::prelude::SmallRng; use rand::SeedableRng; @@ -22,15 +24,25 @@ pub async fn run_request_worker( let response_senders = Rc::new(response_senders); - // Need to be cleaned periodically: use timer? - let torrents_ipv4 = Rc::new(RefCell::new(TorrentMap::::default())); - let torrents_ipv6 = Rc::new(RefCell::new(TorrentMap::::default())); + let torrents= Rc::new(RefCell::new(TorrentMaps::default())); + + async fn clean( + config: Config, + torrents: Rc>, + ) -> Option { + torrents.borrow_mut(); // .clean(config, access_list); + + Some(Duration::from_secs(config.cleaning.interval)) + } + + TimerActionRepeat::repeat(enclose!((config, torrents) move || { + clean(config.clone(), torrents.clone()) + })); for (_, receiver) in request_receivers.streams() { handle_request_stream( &config, - torrents_ipv4.clone(), - torrents_ipv6.clone(), + torrents.clone(), response_senders.clone(), receiver, ) @@ -40,8 +52,7 @@ pub async fn run_request_worker( async fn handle_request_stream( config: &Config, - torrents_ipv4: Rc>>, - torrents_ipv6: Rc>>, + torrents: Rc>, response_senders: Rc>, mut stream: S, ) where @@ -57,7 +68,7 @@ async fn handle_request_stream( IpAddr::V4(ip) => handle_announce_request( &config, &mut rng, - &mut torrents_ipv4.borrow_mut(), + &mut torrents.borrow_mut().ipv4, request, ip, peer_valid_until, @@ -65,7 +76,7 @@ async fn handle_request_stream( IpAddr::V6(ip) => handle_announce_request( &config, &mut rng, - &mut torrents_ipv6.borrow_mut(), + &mut torrents.borrow_mut().ipv6, request, ip, peer_valid_until, From 38617c70f49d04543c1cf66a1fe822e2772d2608 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Oct 2021 22:51:05 +0200 Subject: [PATCH 13/21] Refactor AccessList; update it periodically in aquatic_udp glommio --- Cargo.lock | 1 + aquatic_common/src/access_list.rs | 61 +++++++++++++++++-------- aquatic_http/src/lib/common.rs | 4 +- aquatic_http/src/lib/lib.rs | 5 +- aquatic_http/src/lib/network/mod.rs | 1 + aquatic_http/src/lib/tasks.rs | 2 +- aquatic_udp/Cargo.toml | 1 + aquatic_udp/src/lib/common/mod.rs | 39 ++++------------ aquatic_udp/src/lib/common/network.rs | 29 ++++++++++++ aquatic_udp/src/lib/glommio/handlers.rs | 40 +++++++++++----- aquatic_udp/src/lib/glommio/network.rs | 1 + aquatic_udp/src/lib/lib.rs | 4 +- aquatic_udp/src/lib/mio/mod.rs | 16 +++++-- aquatic_udp/src/lib/mio/network.rs | 2 + aquatic_ws/src/lib/common.rs | 4 +- aquatic_ws/src/lib/lib.rs | 5 +- aquatic_ws/src/lib/network/mod.rs | 1 + aquatic_ws/src/lib/tasks.rs | 2 +- 18 files changed, 141 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1814d40..1baa87a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,6 +163,7 @@ dependencies = [ "futures-lite", "glommio", "hashbrown 0.11.2", + "hex", "histogram", "indexmap", "log", diff --git a/aquatic_common/src/access_list.rs b/aquatic_common/src/access_list.rs index 93b8373..28e24e3 100644 --- a/aquatic_common/src/access_list.rs +++ b/aquatic_common/src/access_list.rs @@ -18,6 +18,12 @@ pub enum AccessListMode { Off, } +impl AccessListMode { + pub fn is_on(&self) -> bool { + !matches!(self, Self::Off) + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AccessListConfig { pub mode: AccessListMode, @@ -36,54 +42,71 @@ impl Default for AccessListConfig { } } -pub struct AccessList(ArcSwap>); +#[derive(Default)] +pub struct AccessList(HashSet<[u8; 20]>); -impl Default for AccessList { - fn default() -> Self { - Self(ArcSwap::from(Arc::new(HashSet::default()))) +impl AccessList { + pub fn insert_from_line(&mut self, line: &str) -> anyhow::Result<()> { + self.0.insert(parse_info_hash(line)?); + + Ok(()) + } + pub fn allows(&self, mode: AccessListMode, info_hash: &[u8; 20]) -> bool { + match mode { + AccessListMode::White => self.0.contains(info_hash), + AccessListMode::Black => !self.0.contains(info_hash), + AccessListMode::Off => true, + } } } -impl AccessList { - fn parse_info_hash(line: String) -> anyhow::Result<[u8; 20]> { - let mut bytes = [0u8; 20]; +pub trait AccessListQuery { + fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()>; + fn allows(&self, list_mode: AccessListMode, info_hash_bytes: &[u8; 20]) -> bool; +} - hex::decode_to_slice(line, &mut bytes)?; +pub type AccessListArcSwap = ArcSwap; - Ok(bytes) - } - - pub fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()> { +impl AccessListQuery for AccessListArcSwap { + fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()> { let file = File::open(path)?; let reader = BufReader::new(file); let mut new_list = HashSet::new(); for line in reader.lines() { - new_list.insert(Self::parse_info_hash(line?)?); + new_list.insert(parse_info_hash(&line?)?); } - self.0.store(Arc::new(new_list)); + self.store(Arc::new(AccessList(new_list))); Ok(()) } - pub fn allows(&self, list_mode: AccessListMode, info_hash_bytes: &[u8; 20]) -> bool { - match list_mode { - AccessListMode::White => self.0.load().contains(info_hash_bytes), - AccessListMode::Black => !self.0.load().contains(info_hash_bytes), + fn allows(&self, mode: AccessListMode, info_hash_bytes: &[u8; 20]) -> bool { + match mode { + AccessListMode::White => self.load().0.contains(info_hash_bytes), + AccessListMode::Black => !self.load().0.contains(info_hash_bytes), AccessListMode::Off => true, } } } +fn parse_info_hash(line: &str) -> anyhow::Result<[u8; 20]> { + let mut bytes = [0u8; 20]; + + hex::decode_to_slice(line, &mut bytes)?; + + Ok(bytes) +} + #[cfg(test)] mod tests { use super::*; #[test] fn test_parse_info_hash() { - let f = AccessList::parse_info_hash; + let f = parse_info_hash; assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeee".into()).is_ok()); assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeeef".into()).is_err()); diff --git a/aquatic_http/src/lib/common.rs b/aquatic_http/src/lib/common.rs index e36e390..e4a9832 100644 --- a/aquatic_http/src/lib/common.rs +++ b/aquatic_http/src/lib/common.rs @@ -2,7 +2,7 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::Arc; use std::time::Instant; -use aquatic_common::access_list::AccessList; +use aquatic_common::access_list::{AccessList, AccessListArcSwap}; use crossbeam_channel::{Receiver, Sender}; use either::Either; use hashbrown::HashMap; @@ -165,7 +165,7 @@ impl TorrentMaps { #[derive(Clone)] pub struct State { - pub access_list: Arc, + pub access_list: Arc, pub torrent_maps: Arc>, } diff --git a/aquatic_http/src/lib/lib.rs b/aquatic_http/src/lib/lib.rs index 2662b2b..3f96ef2 100644 --- a/aquatic_http/src/lib/lib.rs +++ b/aquatic_http/src/lib/lib.rs @@ -31,7 +31,10 @@ pub fn run(config: Config) -> anyhow::Result<()> { tasks::update_access_list(&config, &state); - state.torrent_maps.lock().clean(&config, &state.access_list); + state + .torrent_maps + .lock() + .clean(&config, &state.access_list.load_full()); } } diff --git a/aquatic_http/src/lib/network/mod.rs b/aquatic_http/src/lib/network/mod.rs index c14ed92..dd7d889 100644 --- a/aquatic_http/src/lib/network/mod.rs +++ b/aquatic_http/src/lib/network/mod.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use std::vec::Drain; +use aquatic_common::access_list::AccessListQuery; use aquatic_http_protocol::request::Request; use hashbrown::HashMap; use log::{debug, error, info}; diff --git a/aquatic_http/src/lib/tasks.rs b/aquatic_http/src/lib/tasks.rs index 3273ebd..341a434 100644 --- a/aquatic_http/src/lib/tasks.rs +++ b/aquatic_http/src/lib/tasks.rs @@ -1,6 +1,6 @@ use histogram::Histogram; -use aquatic_common::access_list::AccessListMode; +use aquatic_common::access_list::{AccessListMode, AccessListQuery}; use crate::{common::*, config::Config}; diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 8eb57c0..a1bd561 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -21,6 +21,7 @@ aquatic_common = "0.1.0" aquatic_udp_protocol = "0.1.0" crossbeam-channel = "0.5" hashbrown = "0.11.2" +hex = "0.4" histogram = "0.6" indexmap = "1" log = "0.4" diff --git a/aquatic_udp/src/lib/common/mod.rs b/aquatic_udp/src/lib/common/mod.rs index 5662e1f..d42951a 100644 --- a/aquatic_udp/src/lib/common/mod.rs +++ b/aquatic_udp/src/lib/common/mod.rs @@ -1,8 +1,10 @@ +use std::borrow::Borrow; use std::hash::Hash; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::sync::{atomic::AtomicUsize, Arc}; use std::time::Instant; +use aquatic_common::access_list::AccessListArcSwap; use hashbrown::HashMap; use indexmap::IndexMap; use parking_lot::Mutex; @@ -127,19 +129,19 @@ pub struct TorrentMaps { impl TorrentMaps { /// Remove disallowed and inactive torrents - pub fn clean(&mut self, config: &Config, access_list: &Arc) { + pub fn clean>(&mut self, config: &Config, access_list: T) { let now = Instant::now(); let access_list_mode = config.access_list.mode; self.ipv4.retain(|info_hash, torrent| { - access_list.allows(access_list_mode, &info_hash.0) + access_list.borrow().allows(access_list_mode, &info_hash.0) && Self::clean_torrent_and_peers(now, torrent) }); self.ipv4.shrink_to_fit(); self.ipv6.retain(|info_hash, torrent| { - access_list.allows(access_list_mode, &info_hash.0) + access_list.borrow().allows(access_list_mode, &info_hash.0) && Self::clean_torrent_and_peers(now, torrent) }); self.ipv6.shrink_to_fit(); @@ -183,7 +185,7 @@ pub struct Statistics { #[derive(Clone)] pub struct State { - pub access_list: Arc, + pub access_list: Arc, pub torrents: Arc>, pub statistics: Arc, } @@ -191,38 +193,13 @@ pub struct State { impl Default for State { fn default() -> Self { Self { - access_list: Arc::new(AccessList::default()), + access_list: Arc::new(AccessListArcSwap::default()), torrents: Arc::new(Mutex::new(TorrentMaps::default())), statistics: Arc::new(Statistics::default()), } } } -#[derive(Default)] -pub struct ConnectionMap(HashMap<(ConnectionId, SocketAddr), ValidUntil>); - -impl ConnectionMap { - pub fn insert( - &mut self, - connection_id: ConnectionId, - socket_addr: SocketAddr, - valid_until: ValidUntil, - ) { - self.0.insert((connection_id, socket_addr), valid_until); - } - - pub fn contains(&mut self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool { - self.0.contains_key(&(connection_id, socket_addr)) - } - - pub fn clean(&mut self) { - let now = Instant::now(); - - self.0.retain(|_, v| v.0 > now); - self.0.shrink_to_fit(); - } -} - #[cfg(test)] mod tests { #[test] diff --git a/aquatic_udp/src/lib/common/network.rs b/aquatic_udp/src/lib/common/network.rs index 8b13789..833c99f 100644 --- a/aquatic_udp/src/lib/common/network.rs +++ b/aquatic_udp/src/lib/common/network.rs @@ -1 +1,30 @@ +use std::{net::SocketAddr, time::Instant}; +pub use aquatic_common::{access_list::AccessList, ValidUntil}; +pub use aquatic_udp_protocol::*; +use hashbrown::HashMap; + +#[derive(Default)] +pub struct ConnectionMap(HashMap<(ConnectionId, SocketAddr), ValidUntil>); + +impl ConnectionMap { + pub fn insert( + &mut self, + connection_id: ConnectionId, + socket_addr: SocketAddr, + valid_until: ValidUntil, + ) { + self.0.insert((connection_id, socket_addr), valid_until); + } + + pub fn contains(&mut self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool { + self.0.contains_key(&(connection_id, socket_addr)) + } + + pub fn clean(&mut self) { + let now = Instant::now(); + + self.0.retain(|_, v| v.0 > now); + self.0.shrink_to_fit(); + } +} diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 9560920..a6770a6 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -3,10 +3,11 @@ use std::net::{IpAddr, SocketAddr}; use std::rc::Rc; use std::time::Duration; -use futures_lite::{Stream, StreamExt}; +use futures_lite::{AsyncBufReadExt, Stream, StreamExt}; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; -use glommio::{enclose, prelude::*}; +use glommio::io::{BufferedFile, StreamReaderBuilder}; use glommio::timer::TimerActionRepeat; +use glommio::{enclose, prelude::*}; use rand::prelude::SmallRng; use rand::SeedableRng; @@ -24,19 +25,34 @@ pub async fn run_request_worker( let response_senders = Rc::new(response_senders); - let torrents= Rc::new(RefCell::new(TorrentMaps::default())); + let torrents = Rc::new(RefCell::new(TorrentMaps::default())); + let access_list = Rc::new(RefCell::new(AccessList::default())); - async fn clean( - config: Config, - torrents: Rc>, - ) -> Option { - torrents.borrow_mut(); // .clean(config, access_list); + TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || { + enclose!((config, torrents, access_list) move || async move { + if config.access_list.mode.is_on(){ + let access_list_file = BufferedFile::open(config.access_list.path).await.unwrap(); - Some(Duration::from_secs(config.cleaning.interval)) - } + let mut reader = StreamReaderBuilder::new(access_list_file).build(); - TimerActionRepeat::repeat(enclose!((config, torrents) move || { - clean(config.clone(), torrents.clone()) + loop { + let mut buf = String::with_capacity(42); + + match reader.read_line(&mut buf).await { + Ok(_) => { + access_list.borrow_mut().insert_from_line(&buf).unwrap() // FIXME + }, + Err(err) => { + + } + } + } + } + + torrents.borrow_mut().clean(&config, &*access_list.borrow()); + + Some(Duration::from_secs(config.cleaning.interval)) + })() })); for (_, receiver) in request_receivers.streams() { diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 427ec61..9f007b0 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -15,6 +15,7 @@ use rand::prelude::{Rng, SeedableRng, StdRng}; use aquatic_udp_protocol::{IpVersion, Request, Response}; +use crate::common::network::ConnectionMap; use crate::common::*; use crate::config::Config; diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index fa4d882..134427c 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use aquatic_common::access_list::{AccessList, AccessListMode}; +use aquatic_common::access_list::{AccessListArcSwap, AccessListMode, AccessListQuery}; pub mod common; pub mod config; @@ -15,7 +15,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { mio::run(config) } -pub fn update_access_list(config: &Config, access_list: &Arc) { +pub fn update_access_list(config: &Config, access_list: &Arc) { match config.access_list.mode { AccessListMode::White | AccessListMode::Black => { if let Err(err) = access_list.update_from_path(&config.access_list.path) { diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index a59f2ec..f75ce9f 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -1,9 +1,12 @@ -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; use std::thread::Builder; use std::time::Duration; +use std::{ + ops::Deref, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; use anyhow::Context; use crossbeam_channel::unbounded; @@ -56,7 +59,10 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { update_access_list(&config, &state.access_list); - state.torrents.lock().clean(&config, &state.access_list); + state + .torrents + .lock() + .clean(&config, state.access_list.load_full().deref()); } } diff --git a/aquatic_udp/src/lib/mio/network.rs b/aquatic_udp/src/lib/mio/network.rs index 7e79b0a..e755fdd 100644 --- a/aquatic_udp/src/lib/mio/network.rs +++ b/aquatic_udp/src/lib/mio/network.rs @@ -7,6 +7,7 @@ use std::sync::{ use std::time::Duration; use std::vec::Drain; +use aquatic_common::access_list::AccessListQuery; use crossbeam_channel::{Receiver, Sender}; use mio::net::UdpSocket; use mio::{Events, Interest, Poll, Token}; @@ -15,6 +16,7 @@ use socket2::{Domain, Protocol, Socket, Type}; use aquatic_udp_protocol::{IpVersion, Request, Response}; +use crate::common::network::ConnectionMap; use crate::common::*; use crate::config::Config; diff --git a/aquatic_ws/src/lib/common.rs b/aquatic_ws/src/lib/common.rs index 4f30d8b..3d6b1df 100644 --- a/aquatic_ws/src/lib/common.rs +++ b/aquatic_ws/src/lib/common.rs @@ -2,7 +2,7 @@ use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use std::time::Instant; -use aquatic_common::access_list::AccessList; +use aquatic_common::access_list::{AccessList, AccessListArcSwap}; use crossbeam_channel::{Receiver, Sender}; use hashbrown::HashMap; use indexmap::IndexMap; @@ -136,7 +136,7 @@ impl TorrentMaps { #[derive(Clone)] pub struct State { - pub access_list: Arc, + pub access_list: Arc, pub torrent_maps: Arc>, } diff --git a/aquatic_ws/src/lib/lib.rs b/aquatic_ws/src/lib/lib.rs index f6ee599..6141466 100644 --- a/aquatic_ws/src/lib/lib.rs +++ b/aquatic_ws/src/lib/lib.rs @@ -33,7 +33,10 @@ pub fn run(config: Config) -> anyhow::Result<()> { tasks::update_access_list(&config, &state); - state.torrent_maps.lock().clean(&config, &state.access_list); + state + .torrent_maps + .lock() + .clean(&config, &state.access_list.load_full()); } } diff --git a/aquatic_ws/src/lib/network/mod.rs b/aquatic_ws/src/lib/network/mod.rs index f12859d..bbb293d 100644 --- a/aquatic_ws/src/lib/network/mod.rs +++ b/aquatic_ws/src/lib/network/mod.rs @@ -2,6 +2,7 @@ use std::io::ErrorKind; use std::time::Duration; use std::vec::Drain; +use aquatic_common::access_list::AccessListQuery; use crossbeam_channel::Receiver; use hashbrown::HashMap; use log::{debug, error, info}; diff --git a/aquatic_ws/src/lib/tasks.rs b/aquatic_ws/src/lib/tasks.rs index 55cb62d..f96426c 100644 --- a/aquatic_ws/src/lib/tasks.rs +++ b/aquatic_ws/src/lib/tasks.rs @@ -1,4 +1,4 @@ -use aquatic_common::access_list::AccessListMode; +use aquatic_common::access_list::{AccessListMode, AccessListQuery}; use histogram::Histogram; use crate::common::*; From bfcdc1c842353d4ffe74e9269984e5abc85e535a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Oct 2021 22:58:59 +0200 Subject: [PATCH 14/21] aquatic_udp glommio: improve AccessList update code --- aquatic_udp/src/lib/glommio/handlers.rs | 46 +++++++++++++++---------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index a6770a6..6b341b9 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -28,26 +28,10 @@ pub async fn run_request_worker( let torrents = Rc::new(RefCell::new(TorrentMaps::default())); let access_list = Rc::new(RefCell::new(AccessList::default())); + // Periodically clean torrents and update access list TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || { enclose!((config, torrents, access_list) move || async move { - if config.access_list.mode.is_on(){ - let access_list_file = BufferedFile::open(config.access_list.path).await.unwrap(); - - let mut reader = StreamReaderBuilder::new(access_list_file).build(); - - loop { - let mut buf = String::with_capacity(42); - - match reader.read_line(&mut buf).await { - Ok(_) => { - access_list.borrow_mut().insert_from_line(&buf).unwrap() // FIXME - }, - Err(err) => { - - } - } - } - } + update_access_list(config.clone(), access_list.clone()).await; torrents.borrow_mut().clean(&config, &*access_list.borrow()); @@ -106,3 +90,29 @@ async fn handle_request_stream( yield_if_needed().await; } } + +pub async fn update_access_list( + config: Config, + access_list: Rc>, +){ + if config.access_list.mode.is_on(){ + let access_list_file = BufferedFile::open(config.access_list.path).await.unwrap(); + + let mut reader = StreamReaderBuilder::new(access_list_file).build(); + + loop { + let mut buf = String::with_capacity(42); + + match reader.read_line(&mut buf).await { + Ok(_) => { + access_list.borrow_mut().insert_from_line(&buf); + }, + Err(err) => { + break; + } + } + + yield_if_needed().await; + } + } +} From 02889e1bb8b2262ef0174b72d402ee0ef372d25c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Oct 2021 23:04:51 +0200 Subject: [PATCH 15/21] aquatic_udp: glommio: move update_access_list to new common.rs --- aquatic_udp/src/lib/glommio/common.rs | 35 +++++++++++++++++++++++++ aquatic_udp/src/lib/glommio/handlers.rs | 29 ++------------------ aquatic_udp/src/lib/glommio/mod.rs | 1 + 3 files changed, 38 insertions(+), 27 deletions(-) create mode 100644 aquatic_udp/src/lib/glommio/common.rs diff --git a/aquatic_udp/src/lib/glommio/common.rs b/aquatic_udp/src/lib/glommio/common.rs new file mode 100644 index 0000000..c18ac51 --- /dev/null +++ b/aquatic_udp/src/lib/glommio/common.rs @@ -0,0 +1,35 @@ +use std::cell::RefCell; +use std::rc::Rc; + +use futures_lite::{AsyncBufReadExt}; +use glommio::io::{BufferedFile, StreamReaderBuilder}; +use glommio::{prelude::*}; + +use crate::common::*; +use crate::config::Config; + +pub async fn update_access_list( + config: Config, + access_list: Rc>, +){ + if config.access_list.mode.is_on(){ + let access_list_file = BufferedFile::open(config.access_list.path).await.unwrap(); + + let mut reader = StreamReaderBuilder::new(access_list_file).build(); + + loop { + let mut buf = String::with_capacity(42); + + match reader.read_line(&mut buf).await { + Ok(_) => { + access_list.borrow_mut().insert_from_line(&buf); + }, + Err(err) => { + break; + } + } + + yield_if_needed().await; + } + } +} diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 6b341b9..3a308b1 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -3,9 +3,8 @@ use std::net::{IpAddr, SocketAddr}; use std::rc::Rc; use std::time::Duration; -use futures_lite::{AsyncBufReadExt, Stream, StreamExt}; +use futures_lite::{Stream, StreamExt}; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; -use glommio::io::{BufferedFile, StreamReaderBuilder}; use glommio::timer::TimerActionRepeat; use glommio::{enclose, prelude::*}; use rand::prelude::SmallRng; @@ -14,6 +13,7 @@ use rand::SeedableRng; use crate::common::announce::handle_announce_request; use crate::common::*; use crate::config::Config; +use crate::glommio::common::update_access_list; pub async fn run_request_worker( config: Config, @@ -91,28 +91,3 @@ async fn handle_request_stream( } } -pub async fn update_access_list( - config: Config, - access_list: Rc>, -){ - if config.access_list.mode.is_on(){ - let access_list_file = BufferedFile::open(config.access_list.path).await.unwrap(); - - let mut reader = StreamReaderBuilder::new(access_list_file).build(); - - loop { - let mut buf = String::with_capacity(42); - - match reader.read_line(&mut buf).await { - Ok(_) => { - access_list.borrow_mut().insert_from_line(&buf); - }, - Err(err) => { - break; - } - } - - yield_if_needed().await; - } - } -} diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 89c6b9b..31fd483 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -5,6 +5,7 @@ use glommio::prelude::*; use crate::config::Config; +mod common; pub mod handlers; pub mod network; From f9ed98f10d4dafaf928b0d00e5597a0e0c3e2a85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Oct 2021 23:09:54 +0200 Subject: [PATCH 16/21] aquatic_udp: run rustfmt --- aquatic_udp/src/lib/glommio/common.rs | 13 +++++-------- aquatic_udp/src/lib/glommio/handlers.rs | 1 - 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/aquatic_udp/src/lib/glommio/common.rs b/aquatic_udp/src/lib/glommio/common.rs index c18ac51..5dd6eeb 100644 --- a/aquatic_udp/src/lib/glommio/common.rs +++ b/aquatic_udp/src/lib/glommio/common.rs @@ -1,18 +1,15 @@ use std::cell::RefCell; use std::rc::Rc; -use futures_lite::{AsyncBufReadExt}; +use futures_lite::AsyncBufReadExt; use glommio::io::{BufferedFile, StreamReaderBuilder}; -use glommio::{prelude::*}; +use glommio::prelude::*; use crate::common::*; use crate::config::Config; -pub async fn update_access_list( - config: Config, - access_list: Rc>, -){ - if config.access_list.mode.is_on(){ +pub async fn update_access_list(config: Config, access_list: Rc>) { + if config.access_list.mode.is_on() { let access_list_file = BufferedFile::open(config.access_list.path).await.unwrap(); let mut reader = StreamReaderBuilder::new(access_list_file).build(); @@ -23,7 +20,7 @@ pub async fn update_access_list( match reader.read_line(&mut buf).await { Ok(_) => { access_list.borrow_mut().insert_from_line(&buf); - }, + } Err(err) => { break; } diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 3a308b1..397faf3 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -90,4 +90,3 @@ async fn handle_request_stream( yield_if_needed().await; } } - From a853a410b1af1828317b67f8d3c1ff9857f28e08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Oct 2021 23:10:54 +0200 Subject: [PATCH 17/21] Update TODO --- TODO.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/TODO.md b/TODO.md index 2a5d962..f0a6971 100644 --- a/TODO.md +++ b/TODO.md @@ -1,11 +1,8 @@ # TODO * aquatic_udp glommio - * use fairer stream receiver scheduling than StreamExt::or, like running a - task for each stream - but then what about access to the TorrentMaps? * update access lists * clean connections - * clean torrents (using access list) * update peer valid until * privdrop * a lot of "common code" is only used in mio implementation From 2cc357f2f2330f08d9681dbec75e2685e60d0460 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 20 Oct 2021 01:21:07 +0200 Subject: [PATCH 18/21] aquatic_udp: glommio: spawn request handlers as tasks --- aquatic_udp/src/lib/glommio/handlers.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 397faf3..acb2ff5 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -40,18 +40,17 @@ pub async fn run_request_worker( })); for (_, receiver) in request_receivers.streams() { - handle_request_stream( - &config, + spawn_local(handle_request_stream( + config.clone(), torrents.clone(), response_senders.clone(), receiver, - ) - .await; + )).await; } } async fn handle_request_stream( - config: &Config, + config: Config, torrents: Rc>, response_senders: Rc>, mut stream: S, From 047d138b2bec9c5cd617f1a1964931b74b6491a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 20 Oct 2021 02:06:51 +0200 Subject: [PATCH 19/21] aquatic_udp glommio: detach tasks, await them later; add debug logging --- aquatic_udp/src/lib/glommio/handlers.rs | 14 ++++++++++++-- aquatic_udp/src/lib/glommio/mod.rs | 6 ++++-- aquatic_udp/src/lib/glommio/network.rs | 18 ++++++++++-------- aquatic_udp/src/lib/lib.rs | 2 +- 4 files changed, 27 insertions(+), 13 deletions(-) diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index acb2ff5..12db332 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -39,13 +39,21 @@ pub async fn run_request_worker( })() })); + let mut handles = Vec::new(); + for (_, receiver) in request_receivers.streams() { - spawn_local(handle_request_stream( + let handle = spawn_local(handle_request_stream( config.clone(), torrents.clone(), response_senders.clone(), receiver, - )).await; + )).detach(); + + handles.push(handle); + } + + for handle in handles { + handle.await; } } @@ -82,6 +90,8 @@ async fn handle_request_stream( ), }; + ::log::debug!("preparing to send response to channel: {:?}", response); + if let Err(err) = response_senders.try_send_to(producer_index, (response, addr)) { ::log::warn!("response_sender.try_send: {:?}", err); } diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 31fd483..59f37d1 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -9,11 +9,13 @@ mod common; pub mod handlers; pub mod network; +pub const SHARED_CHANNEL_SIZE: usize = 4096; + pub fn run(config: Config) -> anyhow::Result<()> { let num_peers = config.socket_workers + config.request_workers; - let request_mesh_builder = MeshBuilder::partial(num_peers, 1024); - let response_mesh_builder = MeshBuilder::partial(num_peers, 1024); + let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); + let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); let num_bound_sockets = Arc::new(AtomicUsize::new(0)); diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 9f007b0..48840b6 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -51,15 +51,13 @@ pub async fn run_socket_worker( response_consumer_index, local_sender, socket.clone(), - )) - .await; + )).detach(); for (_, receiver) in response_receivers.streams().into_iter() { spawn_local(send_responses( socket.clone(), receiver.map(|(response, addr)| (response.into(), addr)), - )) - .await; + )).detach(); } send_responses(socket, local_receiver.stream()).await; @@ -90,6 +88,8 @@ async fn read_requests( Ok((amt, src)) => { let request = Request::from_bytes(&buf[..amt], config.protocol.max_scrape_torrents); + ::log::debug!("read request: {:?}", request); + match request { Ok(Request::Connect(request)) => { let connection_id = ConnectionId(rng.gen()); @@ -101,7 +101,7 @@ async fn read_requests( transaction_id: request.transaction_id, }); - local_sender.try_send((response, src)); + local_sender.try_send((response, src)).unwrap(); } Ok(Request::Announce(request)) => { if connections.contains(request.connection_id, src) { @@ -121,7 +121,7 @@ async fn read_requests( message: "Info hash not allowed".into(), }); - local_sender.try_send((response, src)); + local_sender.try_send((response, src)).unwrap(); } } } @@ -132,7 +132,7 @@ async fn read_requests( message: "Scrape requests not supported".into(), }); - local_sender.try_send((response, src)); + local_sender.try_send((response, src)).unwrap(); } } Err(err) => { @@ -150,7 +150,7 @@ async fn read_requests( message: err.right_or("Parse error").into(), }; - local_sender.try_send((response.into(), src)); + local_sender.try_send((response.into(), src)).unwrap(); } } } @@ -174,6 +174,8 @@ where while let Some((response, src)) = stream.next().await { buf.set_position(0); + + ::log::debug!("preparing to send response: {:?}", response.clone()); response .write(&mut buf, ip_version_from_ip(src.ip())) diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index 134427c..41003fc 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -12,7 +12,7 @@ use config::Config; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub fn run(config: Config) -> ::anyhow::Result<()> { - mio::run(config) + glommio::run(config) } pub fn update_access_list(config: &Config, access_list: &Arc) { From 81b7777a4a5661002468e8c0835f4b23ca944999 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 21 Oct 2021 15:26:16 +0200 Subject: [PATCH 20/21] aquatic_udp: feature-gate glommio version --- Cargo.lock | 3 ++- aquatic_udp/Cargo.toml | 8 ++++++-- aquatic_udp/src/lib/lib.rs | 9 ++++++++- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1baa87a..1c49d63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -159,6 +159,7 @@ dependencies = [ "aquatic_cli_helpers", "aquatic_common", "aquatic_udp_protocol", + "cfg-if", "crossbeam-channel", "futures-lite", "glommio", @@ -796,7 +797,7 @@ checksum = "f0a01e0497841a3b2db4f8afa483cce65f7e96a3498bd6c541734792aeac8fe7" [[package]] name = "glommio" version = "0.6.0" -source = "git+https://github.com/DataDog/glommio.git#4e6b14772da2f4325271fbcf12d24cf91ed466e5" +source = "git+https://github.com/DataDog/glommio.git?rev=4e6b14772da2f4325271fbcf12d24cf91ed466e5#4e6b14772da2f4325271fbcf12d24cf91ed466e5" dependencies = [ "ahash 0.7.6", "bitflags", diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index a1bd561..a79294c 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -14,11 +14,15 @@ path = "src/lib/lib.rs" [[bin]] name = "aquatic_udp" +[features] +with-glommio = ["glommio", "futures-lite"] + [dependencies] anyhow = "1" aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" aquatic_udp_protocol = "0.1.0" +cfg-if = "1" crossbeam-channel = "0.5" hashbrown = "0.11.2" hex = "0.4" @@ -33,8 +37,8 @@ rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } socket2 = { version = "0.4.1", features = ["all"] } -glommio = { git = "https://github.com/DataDog/glommio.git" } -futures-lite = "1" +glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5", optional = true } +futures-lite = { version = "1", optional = true } [dev-dependencies] quickcheck = "1.0" diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index 41003fc..3ea2bba 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -4,6 +4,7 @@ use aquatic_common::access_list::{AccessListArcSwap, AccessListMode, AccessListQ pub mod common; pub mod config; +#[cfg(all(feature = "with-glommio", target_os = "linux"))] pub mod glommio; pub mod mio; @@ -12,7 +13,13 @@ use config::Config; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub fn run(config: Config) -> ::anyhow::Result<()> { - glommio::run(config) + cfg_if::cfg_if! { + if #[cfg(all(feature = "with-glommio", target_os = "linux"))] { + glommio::run(config) + } else { + mio::run(config) + } + } } pub fn update_access_list(config: &Config, access_list: &Arc) { From b10f7b89e769c1cea0d19385a15180ae17b4b134 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 21 Oct 2021 15:35:21 +0200 Subject: [PATCH 21/21] aquatic_udp: move code only use in mio impl out of crate::common --- TODO.md | 1 - aquatic_udp/src/lib/common/mod.rs | 47 ------------------- aquatic_udp/src/lib/glommio/handlers.rs | 3 +- aquatic_udp/src/lib/glommio/mod.rs | 5 ++ aquatic_udp/src/lib/glommio/network.rs | 8 ++-- aquatic_udp/src/lib/lib.rs | 17 +------ aquatic_udp/src/lib/mio/common.rs | 49 ++++++++++++++++++++ aquatic_udp/src/lib/mio/handlers/announce.rs | 1 + aquatic_udp/src/lib/mio/handlers/mod.rs | 2 +- aquatic_udp/src/lib/mio/handlers/scrape.rs | 2 + aquatic_udp/src/lib/mio/mod.rs | 18 ++++++- aquatic_udp/src/lib/mio/network.rs | 2 + aquatic_udp/src/lib/mio/tasks.rs | 2 +- aquatic_udp_bench/src/announce.rs | 1 + aquatic_udp_bench/src/main.rs | 1 + aquatic_udp_bench/src/scrape.rs | 1 + 16 files changed, 89 insertions(+), 71 deletions(-) create mode 100644 aquatic_udp/src/lib/mio/common.rs diff --git a/TODO.md b/TODO.md index f0a6971..b290b67 100644 --- a/TODO.md +++ b/TODO.md @@ -5,7 +5,6 @@ * clean connections * update peer valid until * privdrop - * a lot of "common code" is only used in mio implementation * access lists: * use arc-swap Cache diff --git a/aquatic_udp/src/lib/common/mod.rs b/aquatic_udp/src/lib/common/mod.rs index d42951a..8c84c3c 100644 --- a/aquatic_udp/src/lib/common/mod.rs +++ b/aquatic_udp/src/lib/common/mod.rs @@ -1,13 +1,10 @@ use std::borrow::Borrow; use std::hash::Hash; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; -use std::sync::{atomic::AtomicUsize, Arc}; use std::time::Instant; -use aquatic_common::access_list::AccessListArcSwap; use hashbrown::HashMap; use indexmap::IndexMap; -use parking_lot::Mutex; pub use aquatic_common::{access_list::AccessList, ValidUntil}; pub use aquatic_udp_protocol::*; @@ -35,25 +32,6 @@ impl Ip for Ipv6Addr { } } -pub enum ConnectedRequest { - Announce(AnnounceRequest), - Scrape(ScrapeRequest), -} - -pub enum ConnectedResponse { - Announce(AnnounceResponse), - Scrape(ScrapeResponse), -} - -impl Into for ConnectedResponse { - fn into(self) -> Response { - match self { - Self::Announce(response) => Response::Announce(response), - Self::Scrape(response) => Response::Scrape(response), - } - } -} - #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] pub enum PeerStatus { Seeding, @@ -175,31 +153,6 @@ impl TorrentMaps { } } -#[derive(Default)] -pub struct Statistics { - pub requests_received: AtomicUsize, - pub responses_sent: AtomicUsize, - pub bytes_received: AtomicUsize, - pub bytes_sent: AtomicUsize, -} - -#[derive(Clone)] -pub struct State { - pub access_list: Arc, - pub torrents: Arc>, - pub statistics: Arc, -} - -impl Default for State { - fn default() -> Self { - Self { - access_list: Arc::new(AccessListArcSwap::default()), - torrents: Arc::new(Mutex::new(TorrentMaps::default())), - statistics: Arc::new(Statistics::default()), - } - } -} - #[cfg(test)] mod tests { #[test] diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 12db332..9e18322 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -47,7 +47,8 @@ pub async fn run_request_worker( torrents.clone(), response_senders.clone(), receiver, - )).detach(); + )) + .detach(); handles.push(handle); } diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 59f37d1..0da8ab9 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -1,3 +1,8 @@ +//! Work-in-progress glommio (io_uring) implementation +//! +//! * Doesn't support scrape requests +//! * Currently not faster than mio implementation + use std::sync::{atomic::AtomicUsize, Arc}; use glommio::channels::channel_mesh::MeshBuilder; diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 48840b6..93d5f04 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -51,13 +51,15 @@ pub async fn run_socket_worker( response_consumer_index, local_sender, socket.clone(), - )).detach(); + )) + .detach(); for (_, receiver) in response_receivers.streams().into_iter() { spawn_local(send_responses( socket.clone(), receiver.map(|(response, addr)| (response.into(), addr)), - )).detach(); + )) + .detach(); } send_responses(socket, local_receiver.stream()).await; @@ -174,7 +176,7 @@ where while let Some((response, src)) = stream.next().await { buf.set_position(0); - + ::log::debug!("preparing to send response: {:?}", response.clone()); response diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index 3ea2bba..187d563 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -1,6 +1,4 @@ -use std::sync::Arc; - -use aquatic_common::access_list::{AccessListArcSwap, AccessListMode, AccessListQuery}; +use cfg_if::cfg_if; pub mod common; pub mod config; @@ -13,7 +11,7 @@ use config::Config; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub fn run(config: Config) -> ::anyhow::Result<()> { - cfg_if::cfg_if! { + cfg_if! { if #[cfg(all(feature = "with-glommio", target_os = "linux"))] { glommio::run(config) } else { @@ -21,14 +19,3 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } } } - -pub fn update_access_list(config: &Config, access_list: &Arc) { - match config.access_list.mode { - AccessListMode::White | AccessListMode::Black => { - if let Err(err) = access_list.update_from_path(&config.access_list.path) { - ::log::error!("Update access list from path: {:?}", err); - } - } - AccessListMode::Off => {} - } -} diff --git a/aquatic_udp/src/lib/mio/common.rs b/aquatic_udp/src/lib/mio/common.rs new file mode 100644 index 0000000..8bf2233 --- /dev/null +++ b/aquatic_udp/src/lib/mio/common.rs @@ -0,0 +1,49 @@ +use aquatic_common::access_list::AccessListArcSwap; +use parking_lot::Mutex; +use std::sync::{atomic::AtomicUsize, Arc}; + +use crate::common::*; + +pub enum ConnectedRequest { + Announce(AnnounceRequest), + Scrape(ScrapeRequest), +} + +pub enum ConnectedResponse { + Announce(AnnounceResponse), + Scrape(ScrapeResponse), +} + +impl Into for ConnectedResponse { + fn into(self) -> Response { + match self { + Self::Announce(response) => Response::Announce(response), + Self::Scrape(response) => Response::Scrape(response), + } + } +} + +#[derive(Default)] +pub struct Statistics { + pub requests_received: AtomicUsize, + pub responses_sent: AtomicUsize, + pub bytes_received: AtomicUsize, + pub bytes_sent: AtomicUsize, +} + +#[derive(Clone)] +pub struct State { + pub access_list: Arc, + pub torrents: Arc>, + pub statistics: Arc, +} + +impl Default for State { + fn default() -> Self { + Self { + access_list: Arc::new(AccessListArcSwap::default()), + torrents: Arc::new(Mutex::new(TorrentMaps::default())), + statistics: Arc::new(Statistics::default()), + } + } +} diff --git a/aquatic_udp/src/lib/mio/handlers/announce.rs b/aquatic_udp/src/lib/mio/handlers/announce.rs index 99e6a46..549d061 100644 --- a/aquatic_udp/src/lib/mio/handlers/announce.rs +++ b/aquatic_udp/src/lib/mio/handlers/announce.rs @@ -10,6 +10,7 @@ use aquatic_udp_protocol::*; use crate::common::announce::handle_announce_request; use crate::common::*; use crate::config::Config; +use crate::mio::common::*; #[inline] pub fn handle_announce_requests( diff --git a/aquatic_udp/src/lib/mio/handlers/mod.rs b/aquatic_udp/src/lib/mio/handlers/mod.rs index 0634702..af8e5a8 100644 --- a/aquatic_udp/src/lib/mio/handlers/mod.rs +++ b/aquatic_udp/src/lib/mio/handlers/mod.rs @@ -6,8 +6,8 @@ use rand::{rngs::SmallRng, SeedableRng}; use aquatic_udp_protocol::*; -use crate::common::*; use crate::config::Config; +use crate::mio::common::*; mod announce; mod scrape; diff --git a/aquatic_udp/src/lib/mio/handlers/scrape.rs b/aquatic_udp/src/lib/mio/handlers/scrape.rs index b544ccf..b1a6742 100644 --- a/aquatic_udp/src/lib/mio/handlers/scrape.rs +++ b/aquatic_udp/src/lib/mio/handlers/scrape.rs @@ -6,6 +6,8 @@ use parking_lot::MutexGuard; use aquatic_common::convert_ipv4_mapped_ipv6; use aquatic_udp_protocol::*; +use crate::mio::common::*; + use crate::common::*; #[inline] diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index f75ce9f..bf863ee 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -12,13 +12,16 @@ use anyhow::Context; use crossbeam_channel::unbounded; use privdrop::PrivDrop; +pub mod common; pub mod handlers; pub mod network; pub mod tasks; -use crate::common::State; +use aquatic_common::access_list::{AccessListArcSwap, AccessListMode, AccessListQuery}; + use crate::config::Config; -use crate::update_access_list; + +use common::State; pub fn run(config: Config) -> ::anyhow::Result<()> { let state = State::default(); @@ -126,3 +129,14 @@ pub fn start_workers( Ok(()) } + +pub fn update_access_list(config: &Config, access_list: &Arc) { + match config.access_list.mode { + AccessListMode::White | AccessListMode::Black => { + if let Err(err) = access_list.update_from_path(&config.access_list.path) { + ::log::error!("Update access list from path: {:?}", err); + } + } + AccessListMode::Off => {} + } +} diff --git a/aquatic_udp/src/lib/mio/network.rs b/aquatic_udp/src/lib/mio/network.rs index e755fdd..fe34023 100644 --- a/aquatic_udp/src/lib/mio/network.rs +++ b/aquatic_udp/src/lib/mio/network.rs @@ -20,6 +20,8 @@ use crate::common::network::ConnectionMap; use crate::common::*; use crate::config::Config; +use super::common::*; + pub fn run_socket_worker( state: State, config: Config, diff --git a/aquatic_udp/src/lib/mio/tasks.rs b/aquatic_udp/src/lib/mio/tasks.rs index 2fde39d..4d9fe16 100644 --- a/aquatic_udp/src/lib/mio/tasks.rs +++ b/aquatic_udp/src/lib/mio/tasks.rs @@ -2,7 +2,7 @@ use std::sync::atomic::Ordering; use histogram::Histogram; -use crate::common::*; +use super::common::*; use crate::config::Config; pub fn gather_and_print_statistics(state: &State, config: &Config) { diff --git a/aquatic_udp_bench/src/announce.rs b/aquatic_udp_bench/src/announce.rs index 12b35e3..d71f77c 100644 --- a/aquatic_udp_bench/src/announce.rs +++ b/aquatic_udp_bench/src/announce.rs @@ -8,6 +8,7 @@ use rand_distr::Pareto; use aquatic_udp::common::*; use aquatic_udp::config::Config; +use aquatic_udp::mio::common::*; use crate::common::*; use crate::config::BenchConfig; diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index 7b3f75c..28c210e 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -15,6 +15,7 @@ use std::time::Duration; use aquatic_cli_helpers::run_app_with_cli_and_config; use aquatic_udp::common::*; use aquatic_udp::config::Config; +use aquatic_udp::mio::common::*; use aquatic_udp::mio::handlers; use config::BenchConfig; diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index 7b62152..a7d5c18 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -8,6 +8,7 @@ use rand_distr::Pareto; use aquatic_udp::common::*; use aquatic_udp::config::Config; +use aquatic_udp::mio::common::*; use crate::common::*; use crate::config::BenchConfig;