From f2b157a1490c9de6d07adb010832413f845b008b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Oct 2021 22:31:56 +0200 Subject: [PATCH] aquatic_udp: split some code into mio and glommio versions --- Cargo.lock | 311 ++++++++++++++++-- aquatic_udp/Cargo.toml | 3 + aquatic_udp/src/lib/common.rs | 27 +- aquatic_udp/src/lib/glommio/mod.rs | 1 + aquatic_udp/src/lib/glommio/network.rs | 200 +++++++++++ aquatic_udp/src/lib/lib.rs | 8 +- .../src/lib/{ => mio}/handlers/announce.rs | 0 aquatic_udp/src/lib/{ => mio}/handlers/mod.rs | 0 .../src/lib/{ => mio}/handlers/scrape.rs | 0 aquatic_udp/src/lib/mio/mod.rs | 2 + aquatic_udp/src/lib/{ => mio}/network.rs | 26 -- aquatic_udp_bench/src/main.rs | 2 +- 12 files changed, 527 insertions(+), 53 deletions(-) create mode 100644 aquatic_udp/src/lib/glommio/mod.rs create mode 100644 aquatic_udp/src/lib/glommio/network.rs rename aquatic_udp/src/lib/{ => mio}/handlers/announce.rs (100%) rename aquatic_udp/src/lib/{ => mio}/handlers/mod.rs (100%) rename aquatic_udp/src/lib/{ => mio}/handlers/scrape.rs (100%) create mode 100644 aquatic_udp/src/lib/mio/mod.rs rename aquatic_udp/src/lib/{ => mio}/network.rs (93%) diff --git a/Cargo.lock b/Cargo.lock index c8f0e2c..1814d40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -109,7 +109,7 @@ dependencies = [ "rand", "serde", "smartstring", - "socket2", + "socket2 0.4.2", ] [[package]] @@ -160,6 +160,8 @@ dependencies = [ "aquatic_common", "aquatic_udp_protocol", "crossbeam-channel", + "futures-lite", + "glommio", "hashbrown 0.11.2", "histogram", "indexmap", @@ -172,7 +174,7 @@ dependencies = [ "quickcheck_macros", "rand", "serde", - "socket2", + "socket2 0.4.2", ] [[package]] @@ -208,7 +210,7 @@ dependencies = [ "rand", "rand_distr", "serde", - "socket2", + "socket2 0.4.2", ] [[package]] @@ -244,7 +246,7 @@ dependencies = [ "quickcheck_macros", "rand", "serde", - "socket2", + "socket2 0.4.2", "tungstenite", ] @@ -306,7 +308,7 @@ checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ "hermit-abi", "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -353,6 +355,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitmaps" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "303cec55cd9c5fde944b061b902f142b52a8bb5438cc822481ea1e3ebc96bbcb" + [[package]] name = "block-buffer" version = "0.9.0" @@ -374,6 +382,12 @@ dependencies = [ "serde", ] +[[package]] +name = "buddy-alloc" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ff9f338986406db85e2b5deb40a9255b796ca03a194c7457403d215173f3fd5" + [[package]] name = "bumpalo" version = "3.7.1" @@ -392,6 +406,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +[[package]] +name = "cache-padded" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba" + [[package]] name = "cast" version = "0.2.7" @@ -423,7 +443,7 @@ dependencies = [ "num-integer", "num-traits", "time", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -437,6 +457,15 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "concurrent-queue" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" +dependencies = [ + "cache-padded", +] + [[package]] name = "console" version = "0.15.0" @@ -447,7 +476,7 @@ dependencies = [ "libc", "once_cell", "terminal_size", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -511,6 +540,20 @@ dependencies = [ "itertools", ] +[[package]] +name = "crossbeam" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ae5588f6b3c3cb05239e90bd110f257254aecd01e4635400391aeae07497845" +dependencies = [ + "cfg-if", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.1" @@ -541,10 +584,20 @@ dependencies = [ "cfg-if", "crossbeam-utils", "lazy_static", - "memoffset", + "memoffset 0.6.4", "scopeguard", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b10ddc024425c88c2ad148c1b0fd53f4c6d38db9697c9f1588381212fa657c9" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.5" @@ -592,6 +645,12 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "enclose" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1056f553da426e9c025a662efa48b52e62e0a3a7648aa2d15aeaaf7f0d329357" + [[package]] name = "encode_unicode" version = "0.3.6" @@ -630,6 +689,15 @@ dependencies = [ "synstructure", ] +[[package]] +name = "fastrand" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b394ed3d285a429378d3b384b9eb1285267e7df4b166df24b7a6939a04dc392e" +dependencies = [ + "instant", +] + [[package]] name = "float-cmp" version = "0.8.0" @@ -670,6 +738,33 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures-core" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88d1c26957f23603395cd326b0ffe64124b818f4449552f960d815cfba83a53d" + +[[package]] +name = "futures-io" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "522de2a0fe3e380f1bc577ba0474108faf3f6b18321dbf60b3b9c39a75073377" + +[[package]] +name = "futures-lite" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "generic-array" version = "0.14.4" @@ -697,6 +792,37 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0a01e0497841a3b2db4f8afa483cce65f7e96a3498bd6c541734792aeac8fe7" +[[package]] +name = "glommio" +version = "0.6.0" +source = "git+https://github.com/DataDog/glommio.git#4e6b14772da2f4325271fbcf12d24cf91ed466e5" +dependencies = [ + "ahash 0.7.6", + "bitflags", + "bitmaps", + "buddy-alloc", + "cc", + "concurrent-queue", + "crossbeam", + "enclose", + "futures-lite", + "intrusive-collections", + "lazy_static", + "libc", + "lockfree", + "log", + "membarrier", + "nix", + "pin-project-lite", + "rlimit", + "scoped-tls", + "scopeguard", + "smallvec", + "socket2 0.3.19", + "tracing", + "typenum", +] + [[package]] name = "half" version = "1.8.0" @@ -813,6 +939,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "intrusive-collections" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb4ed164b4cf1c6bd6e18c097490331a0e58fbb0f39e8f6b5ac7f168006511cd" +dependencies = [ + "memoffset 0.5.6", +] + [[package]] name = "itertools" version = "0.10.1" @@ -837,6 +972,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kernel32-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +dependencies = [ + "winapi 0.2.8", + "winapi-build", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -873,6 +1018,15 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "lockfree" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74ee94b5ad113c7cb98c5a040f783d0952ee4fe100993881d1673c2cb002dd23" +dependencies = [ + "owned-alloc", +] + [[package]] name = "log" version = "0.4.14" @@ -888,12 +1042,33 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" +[[package]] +name = "membarrier" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "925b0811d7e5fb2b666b5906c5047b7ec23aab78edc4d51b7b0f82dc5c955b1c" +dependencies = [ + "cfg-if", + "kernel32-sys", + "lazy_static", + "libc", +] + [[package]] name = "memchr" version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +[[package]] +name = "memoffset" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "043175f069eda7b85febe4a74abbaeff828d9f8b448515d3151a14a3542811aa" +dependencies = [ + "autocfg", +] + [[package]] name = "memoffset" version = "0.6.4" @@ -932,7 +1107,7 @@ dependencies = [ "log", "miow", "ntapi", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -941,7 +1116,7 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -972,7 +1147,7 @@ dependencies = [ "cc", "cfg-if", "libc", - "memoffset", + "memoffset 0.6.4", ] [[package]] @@ -987,7 +1162,7 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1096,6 +1271,18 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "owned-alloc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30fceb411f9a12ff9222c5f824026be368ff15dc2f13468d850c7d3f502205d6" + +[[package]] +name = "parking" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" + [[package]] name = "parking_lot" version = "0.11.2" @@ -1118,7 +1305,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1127,6 +1314,12 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "pin-project-lite" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d31d11c69a6b52a174b42bdc0c30e5e11670f90788b2c471c31c1d17d449443" + [[package]] name = "pkg-config" version = "0.3.20" @@ -1330,7 +1523,16 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" dependencies = [ - "winapi", + "winapi 0.3.9", +] + +[[package]] +name = "rlimit" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc0bf25554376fd362f54332b8410a625c71f15445bca32ffdfdf4ec9ac91726" +dependencies = [ + "libc", ] [[package]] @@ -1370,9 +1572,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" dependencies = [ "lazy_static", - "winapi", + "winapi 0.3.9", ] +[[package]] +name = "scoped-tls" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2" + [[package]] name = "scopeguard" version = "1.1.0" @@ -1532,6 +1740,17 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "socket2" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e" +dependencies = [ + "cfg-if", + "libc", + "winapi 0.3.9", +] + [[package]] name = "socket2" version = "0.4.2" @@ -1539,7 +1758,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5dc90fe6c7be1a323296982db1836d1ea9e47b6839496dde9a541bc496df3516" dependencies = [ "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1582,7 +1801,7 @@ dependencies = [ "rand", "redox_syscall", "remove_dir_all", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1601,7 +1820,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "633c1a546cee861a1a6d0dc69ebeca693bf4296661ba7852b9d21d159e0506df" dependencies = [ "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1640,7 +1859,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" dependencies = [ "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1677,6 +1896,38 @@ dependencies = [ "serde", ] +[[package]] +name = "tracing" +version = "0.1.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4f480b8f81512e825f337ad51e94c1eb5d3bbdf2b363dcd01e2b19a9ffe3f8e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4" +dependencies = [ + "lazy_static", +] + [[package]] name = "tungstenite" version = "0.15.0" @@ -1777,6 +2028,12 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "walkdir" version = "2.3.2" @@ -1784,7 +2041,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" dependencies = [ "same-file", - "winapi", + "winapi 0.3.9", "winapi-util", ] @@ -1858,6 +2115,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "winapi" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" + [[package]] name = "winapi" version = "0.3.9" @@ -1868,6 +2131,12 @@ dependencies = [ "winapi-x86_64-pc-windows-gnu", ] +[[package]] +name = "winapi-build" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" + [[package]] name = "winapi-i686-pc-windows-gnu" version = "0.4.0" @@ -1880,7 +2149,7 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 265b707..8eb57c0 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -32,6 +32,9 @@ rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } socket2 = { version = "0.4.1", features = ["all"] } +glommio = { git = "https://github.com/DataDog/glommio.git" } +futures-lite = "1" + [dev-dependencies] quickcheck = "1.0" quickcheck_macros = "1.0" diff --git a/aquatic_udp/src/lib/common.rs b/aquatic_udp/src/lib/common.rs index d733c52..5a81e46 100644 --- a/aquatic_udp/src/lib/common.rs +++ b/aquatic_udp/src/lib/common.rs @@ -1,5 +1,5 @@ use std::hash::Hash; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::{atomic::AtomicUsize, Arc}; use std::time::Instant; @@ -195,6 +195,31 @@ impl Default for State { } } +#[derive(Default)] +pub struct ConnectionMap(HashMap<(ConnectionId, SocketAddr), ValidUntil>); + +impl ConnectionMap { + pub fn insert( + &mut self, + connection_id: ConnectionId, + socket_addr: SocketAddr, + valid_until: ValidUntil, + ) { + self.0.insert((connection_id, socket_addr), valid_until); + } + + pub fn contains(&mut self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool { + self.0.contains_key(&(connection_id, socket_addr)) + } + + pub fn clean(&mut self) { + let now = Instant::now(); + + self.0.retain(|_, v| v.0 > now); + self.0.shrink_to_fit(); + } +} + #[cfg(test)] mod tests { #[test] diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs new file mode 100644 index 0000000..a61610b --- /dev/null +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -0,0 +1 @@ +pub mod network; diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs new file mode 100644 index 0000000..2cd9138 --- /dev/null +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -0,0 +1,200 @@ +/// TODO +/// - move connection checks to socket workers +/// - ignore scrape requests. forward announce requests to request workers +/// sharded by info hash (with some nice algo to make it difficult for an +/// attacker to know which one they get forwarded to). this way, shared +/// state can be avoided. +use std::io::Cursor; +use std::net::{IpAddr, SocketAddr}; +use std::rc::Rc; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + +use futures_lite::StreamExt; +use glommio::channels::local_channel::{new_unbounded, LocalReceiver, LocalSender}; +use glommio::channels::shared_channel::{SharedReceiver, SharedSender}; +use glommio::net::UdpSocket; +use glommio::prelude::*; +use rand::prelude::{Rng, SeedableRng, StdRng}; + +use aquatic_udp_protocol::{IpVersion, Request, Response}; + +use crate::common::*; +use crate::config::Config; + +pub fn run_socket_worker( + state: State, + config: Config, + request_sender: SharedSender<(ConnectedRequest, SocketAddr)>, + response_receiver: SharedReceiver<(ConnectedResponse, SocketAddr)>, + num_bound_sockets: Arc, +) { + LocalExecutorBuilder::default() + .spawn(|| async move { + let (local_sender, local_receiver) = new_unbounded(); + + let mut socket = UdpSocket::bind(config.network.address).unwrap(); + + let recv_buffer_size = config.network.socket_recv_buffer_size; + + if recv_buffer_size != 0 { + socket.set_buffer_size(recv_buffer_size); + } + + let socket = Rc::new(socket); + + num_bound_sockets.fetch_add(1, Ordering::SeqCst); + + spawn_local(read_requests( + config.clone(), + state.access_list.clone(), + request_sender, + local_sender, + socket.clone(), + )) + .await; + spawn_local(send_responses(response_receiver, local_receiver, socket)).await; + }) + .expect("failed to spawn local executor") + .join() + .unwrap(); +} + +async fn read_requests( + config: Config, + access_list: Arc, + request_sender: SharedSender<(ConnectedRequest, SocketAddr)>, + local_sender: LocalSender<(Response, SocketAddr)>, + socket: Rc, +) { + let request_sender = request_sender.connect().await; + + let mut rng = StdRng::from_entropy(); + + let valid_until = ValidUntil::new(config.cleaning.max_connection_age); + let access_list_mode = config.access_list.mode; + + let mut connections = ConnectionMap::default(); + + let mut buf = [0u8; 2048]; + + loop { + match socket.recv_from(&mut buf).await { + Ok((amt, src)) => { + let request = Request::from_bytes(&buf[..amt], config.protocol.max_scrape_torrents); + + match request { + Ok(Request::Connect(request)) => { + let connection_id = ConnectionId(rng.gen()); + + connections.insert(connection_id, src, valid_until); + + let response = Response::Connect(ConnectResponse { + connection_id, + transaction_id: request.transaction_id, + }); + + local_sender.try_send((response, src)); + } + Ok(Request::Announce(request)) => { + if connections.contains(request.connection_id, src) { + if access_list.allows(access_list_mode, &request.info_hash.0) { + if let Err(err) = request_sender + .try_send((ConnectedRequest::Announce(request), src)) + { + ::log::warn!("request_sender.try_send failed: {:?}", err) + } + } else { + let response = Response::Error(ErrorResponse { + transaction_id: request.transaction_id, + message: "Info hash not allowed".into(), + }); + + local_sender.try_send((response, src)); + } + } + } + Ok(Request::Scrape(request)) => { + if connections.contains(request.connection_id, src) { + if let Err(err) = + request_sender.try_send((ConnectedRequest::Scrape(request), src)) + { + ::log::warn!("request_sender.try_send failed: {:?}", err) + } + } + } + Err(err) => { + ::log::debug!("Request::from_bytes error: {:?}", err); + + if let RequestParseError::Sendable { + connection_id, + transaction_id, + err, + } = err + { + if connections.contains(connection_id, src) { + let response = ErrorResponse { + transaction_id, + message: err.right_or("Parse error").into(), + }; + + local_sender.try_send((response.into(), src)); + } + } + } + } + } + Err(err) => { + ::log::error!("recv_from: {:?}", err); + } + } + + yield_if_needed().await; + } +} + +async fn send_responses( + response_receiver: SharedReceiver<(ConnectedResponse, SocketAddr)>, + local_receiver: LocalReceiver<(Response, SocketAddr)>, + socket: Rc, +) { + let response_receiver = response_receiver.connect().await; + + let mut buf = [0u8; MAX_PACKET_SIZE]; + let mut buf = Cursor::new(&mut buf[..]); + + let mut stream = local_receiver + .stream() + .race(response_receiver.map(|(response, addr)| (response.into(), addr))); + + while let Some((response, src)) = stream.next().await { + buf.set_position(0); + + response + .write(&mut buf, ip_version_from_ip(src.ip())) + .expect("write response"); + + let position = buf.position() as usize; + + if let Err(err) = socket.send_to(&buf.get_ref()[..position], src).await { + ::log::info!("send_to failed: {:?}", err); + } + + yield_if_needed().await; + } +} + +fn ip_version_from_ip(ip: IpAddr) -> IpVersion { + match ip { + IpAddr::V4(_) => IpVersion::IPv4, + IpAddr::V6(ip) => { + if let [0, 0, 0, 0, 0, 0xffff, ..] = ip.segments() { + IpVersion::IPv4 + } else { + IpVersion::IPv6 + } + } + } +} diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index a740a3e..c1aa9e7 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -11,8 +11,8 @@ use privdrop::PrivDrop; pub mod common; pub mod config; -pub mod handlers; -pub mod network; +pub mod glommio; +pub mod mio; pub mod tasks; use common::State; @@ -74,7 +74,7 @@ pub fn start_workers(config: Config, state: State) -> ::anyhow::Result ::anyhow::Result); - -impl ConnectionMap { - fn insert( - &mut self, - connection_id: ConnectionId, - socket_addr: SocketAddr, - valid_until: ValidUntil, - ) { - self.0.insert((connection_id, socket_addr), valid_until); - } - - fn contains(&mut self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool { - self.0.contains_key(&(connection_id, socket_addr)) - } - - fn clean(&mut self) { - let now = Instant::now(); - - self.0.retain(|_, v| v.0 > now); - self.0.shrink_to_fit(); - } -} - pub fn run_socket_worker( state: State, config: Config, diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index c89de8b..7b3f75c 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -15,7 +15,7 @@ use std::time::Duration; use aquatic_cli_helpers::run_app_with_cli_and_config; use aquatic_udp::common::*; use aquatic_udp::config::Config; -use aquatic_udp::handlers; +use aquatic_udp::mio::handlers; use config::BenchConfig;