From 667cf04085e79ccb637b8d56c95b80f770b26ef6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 18 Mar 2022 15:15:34 +0100 Subject: [PATCH 1/9] ws: remove mio implementation --- Cargo.lock | 86 --- aquatic_ws/Cargo.toml | 22 +- aquatic_ws/src/{common/mod.rs => common.rs} | 9 +- aquatic_ws/src/config.rs | 143 ++--- aquatic_ws/src/glommio/common.rs | 10 - aquatic_ws/src/glommio/mod.rs | 107 ---- aquatic_ws/src/glommio/request.rs | 128 ---- aquatic_ws/src/lib.rs | 128 +++- aquatic_ws/src/mio/common.rs | 51 -- aquatic_ws/src/mio/mod.rs | 218 ------- aquatic_ws/src/mio/request.rs | 103 ---- aquatic_ws/src/mio/socket/connection.rs | 577 ------------------ aquatic_ws/src/mio/socket/mod.rs | 403 ------------ aquatic_ws/src/workers/mod.rs | 2 + .../handlers.rs => workers/request.rs} | 125 +++- aquatic_ws/src/{glommio => workers}/socket.rs | 2 - 16 files changed, 283 insertions(+), 1831 deletions(-) rename aquatic_ws/src/{common/mod.rs => common.rs} (96%) delete mode 100644 aquatic_ws/src/glommio/common.rs delete mode 100644 aquatic_ws/src/glommio/mod.rs delete mode 100644 aquatic_ws/src/glommio/request.rs delete mode 100644 aquatic_ws/src/mio/common.rs delete mode 100644 aquatic_ws/src/mio/mod.rs delete mode 100644 aquatic_ws/src/mio/request.rs delete mode 100644 aquatic_ws/src/mio/socket/connection.rs delete mode 100644 aquatic_ws/src/mio/socket/mod.rs create mode 100644 aquatic_ws/src/workers/mod.rs rename aquatic_ws/src/{common/handlers.rs => workers/request.rs} (59%) rename aquatic_ws/src/{glommio => workers}/socket.rs (99%) diff --git a/Cargo.lock b/Cargo.lock index 641b919..46e4cf9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -264,18 +264,14 @@ dependencies = [ "aquatic_ws_protocol", "async-tungstenite", "cfg-if", - "crossbeam-channel", "either", "futures", "futures-lite", "futures-rustls", "glommio", "hashbrown 0.12.0", - "histogram", "log", "mimalloc", - "mio", - "parking_lot", "privdrop", "quickcheck", "quickcheck_macros", @@ -285,7 +281,6 @@ dependencies = [ "serde", "signal-hook", "slab", - "socket2 0.4.4", "tungstenite", ] @@ -1066,12 +1061,6 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" -[[package]] -name = "histogram" -version = "0.6.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12cb882ccb290b8646e554b157ab0b71e64e8d5bef775cd66b6531e52d302669" - [[package]] name = "http" version = "0.2.6" @@ -1471,29 +1460,6 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" -[[package]] -name = "parking_lot" -version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" -dependencies = [ - "lock_api", - "parking_lot_core", -] - -[[package]] -name = "parking_lot_core" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28141e0cc4143da2443301914478dc976a61ffdb3f043058310c70df2fed8954" -dependencies = [ - "cfg-if", - "libc", - "redox_syscall", - "smallvec", - "windows-sys", -] - [[package]] name = "percent-encoding" version = "2.1.0" @@ -1687,15 +1653,6 @@ dependencies = [ "num_cpus", ] -[[package]] -name = "redox_syscall" -version = "0.2.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff" -dependencies = [ - "bitflags 1.3.2", -] - [[package]] name = "regex" version = "1.5.4" @@ -2397,46 +2354,3 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" - -[[package]] -name = "windows-sys" -version = "0.32.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3df6e476185f92a12c072be4a189a0210dcdcf512a1891d6dff9edb874deadc6" -dependencies = [ - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_msvc", -] - -[[package]] -name = "windows_aarch64_msvc" -version = "0.32.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8e92753b1c443191654ec532f14c199742964a061be25d77d7a96f09db20bf5" - -[[package]] -name = "windows_i686_gnu" -version = "0.32.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a711c68811799e017b6038e0922cb27a5e2f43a2ddb609fe0b6f3eeda9de615" - -[[package]] -name = "windows_i686_msvc" -version = "0.32.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "146c11bb1a02615db74680b32a68e2d61f553cc24c4eb5b4ca10311740e44172" - -[[package]] -name = "windows_x86_64_gnu" -version = "0.32.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c912b12f7454c6620635bbff3450962753834be2a594819bd5e945af18ec64bc" - -[[package]] -name = "windows_x86_64_msvc" -version = "0.32.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316" diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index 255cfa0..ea08336 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -14,10 +14,7 @@ name = "aquatic_ws" name = "aquatic_ws" [features] -default = ["with-mio"] cpu-pinning = ["aquatic_common/cpu-pinning"] -with-glommio = ["cpu-pinning", "async-tungstenite", "futures-lite", "futures", "futures-rustls", "glommio"] -with-mio = ["crossbeam-channel", "histogram", "mio", "parking_lot", "socket2"] [dependencies] aquatic_cli_helpers = "0.1.0" @@ -39,20 +36,11 @@ serde = { version = "1", features = ["derive"] } signal-hook = { version = "0.3" } slab = "0.4" tungstenite = "0.17" - -# mio -crossbeam-channel = { version = "0.5", optional = true } -histogram = { version = "0.6", optional = true } -mio = { version = "0.8", features = ["net", "os-poll"], optional = true } -parking_lot = { version = "0.12", optional = true } -socket2 = { version = "0.4", features = ["all"], optional = true } - -# glommio -async-tungstenite = { version = "0.17", optional = true } -futures-lite = { version = "1", optional = true } -futures = { version = "0.3", optional = true } -futures-rustls = { version = "0.22", optional = true } -glommio = { version = "0.7", optional = true } +async-tungstenite = "0.17" +futures-lite = "1" +futures = "0.3" +futures-rustls = "0.22" +glommio = "0.7" [dev-dependencies] quickcheck = "1" diff --git a/aquatic_ws/src/common/mod.rs b/aquatic_ws/src/common.rs similarity index 96% rename from aquatic_ws/src/common/mod.rs rename to aquatic_ws/src/common.rs index 0f43d2b..f7ced95 100644 --- a/aquatic_ws/src/common/mod.rs +++ b/aquatic_ws/src/common.rs @@ -1,5 +1,3 @@ -pub mod handlers; - use std::fs::File; use std::io::BufReader; use std::sync::Arc; @@ -14,6 +12,13 @@ use aquatic_ws_protocol::*; use crate::config::Config; +pub type TlsConfig = futures_rustls::rustls::ServerConfig; + +#[derive(Default, Clone)] +pub struct State { + pub access_list: Arc, +} + #[derive(Copy, Clone, Debug)] pub struct PendingScrapeId(pub usize); diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index 76c7f7a..10ae6e1 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -23,15 +23,28 @@ pub struct Config { pub log_level: LogLevel, pub network: NetworkConfig, pub protocol: ProtocolConfig, - #[cfg(feature = "with-mio")] - pub handlers: HandlerConfig, pub cleaning: CleaningConfig, pub privileges: PrivilegeConfig, pub access_list: AccessListConfig, #[cfg(feature = "cpu-pinning")] pub cpu_pinning: CpuPinningConfig, - #[cfg(feature = "with-mio")] - pub statistics: StatisticsConfig, +} + +impl Default for Config { + fn default() -> Self { + Self { + socket_workers: 1, + request_workers: 1, + log_level: LogLevel::default(), + network: NetworkConfig::default(), + protocol: ProtocolConfig::default(), + cleaning: CleaningConfig::default(), + privileges: PrivilegeConfig::default(), + access_list: AccessListConfig::default(), + #[cfg(feature = "cpu-pinning")] + cpu_pinning: Default::default(), + } + } } impl aquatic_cli_helpers::Config for Config { @@ -55,81 +68,6 @@ pub struct NetworkConfig { pub websocket_max_message_size: usize, pub websocket_max_frame_size: usize, - - #[cfg(feature = "with-mio")] - pub poll_event_capacity: usize, - #[cfg(feature = "with-mio")] - pub poll_timeout_microseconds: u64, -} - -#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] -pub struct ProtocolConfig { - /// Maximum number of torrents to accept in scrape request - pub max_scrape_torrents: usize, - /// Maximum number of offers to accept in announce request - pub max_offers: usize, - /// Ask peers to announce this often (seconds) - pub peer_announce_interval: usize, -} - -#[cfg(feature = "with-mio")] -#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] -pub struct HandlerConfig { - /// Maximum number of requests to receive from channel before locking - /// mutex and starting work - pub max_requests_per_iter: usize, - pub channel_recv_timeout_microseconds: u64, -} - -#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] -pub struct CleaningConfig { - /// Clean peers this often (seconds) - pub torrent_cleaning_interval: u64, - /// Remove peers that have not announced for this long (seconds) - pub max_peer_age: u64, - - // Clean connections this often (seconds) - #[cfg(feature = "with-glommio")] - pub connection_cleaning_interval: u64, - /// Close connections if no responses have been sent to them for this long (seconds) - #[cfg(feature = "with-glommio")] - pub max_connection_idle: u64, - - /// Remove connections that are older than this (seconds) - #[cfg(feature = "with-mio")] - pub max_connection_age: u64, -} - -#[cfg(feature = "with-mio")] -#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] -pub struct StatisticsConfig { - /// Print statistics this often (seconds). Do not print when set to zero. - pub interval: u64, -} - -impl Default for Config { - fn default() -> Self { - Self { - socket_workers: 1, - request_workers: 1, - log_level: LogLevel::default(), - network: NetworkConfig::default(), - protocol: ProtocolConfig::default(), - #[cfg(feature = "with-mio")] - handlers: Default::default(), - cleaning: CleaningConfig::default(), - privileges: PrivilegeConfig::default(), - access_list: AccessListConfig::default(), - #[cfg(feature = "cpu-pinning")] - cpu_pinning: Default::default(), - #[cfg(feature = "with-mio")] - statistics: Default::default(), - } - } } impl Default for NetworkConfig { @@ -143,15 +81,21 @@ impl Default for NetworkConfig { websocket_max_message_size: 64 * 1024, websocket_max_frame_size: 16 * 1024, - - #[cfg(feature = "with-mio")] - poll_event_capacity: 4096, - #[cfg(feature = "with-mio")] - poll_timeout_microseconds: 200_000, } } } +#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] +#[serde(default)] +pub struct ProtocolConfig { + /// Maximum number of torrents to accept in scrape request + pub max_scrape_torrents: usize, + /// Maximum number of offers to accept in announce request + pub max_offers: usize, + /// Ask peers to announce this often (seconds) + pub peer_announce_interval: usize, +} + impl Default for ProtocolConfig { fn default() -> Self { Self { @@ -162,14 +106,17 @@ impl Default for ProtocolConfig { } } -#[cfg(feature = "with-mio")] -impl Default for HandlerConfig { - fn default() -> Self { - Self { - max_requests_per_iter: 256, - channel_recv_timeout_microseconds: 200, - } - } +#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] +#[serde(default)] +pub struct CleaningConfig { + /// Clean peers this often (seconds) + pub torrent_cleaning_interval: u64, + /// Remove peers that have not announced for this long (seconds) + pub max_peer_age: u64, + // Clean connections this often (seconds) + pub connection_cleaning_interval: u64, + /// Close connections if no responses have been sent to them for this long (seconds) + pub max_connection_idle: u64, } impl Default for CleaningConfig { @@ -177,24 +124,12 @@ impl Default for CleaningConfig { Self { torrent_cleaning_interval: 30, max_peer_age: 1800, - #[cfg(feature = "with-glommio")] max_connection_idle: 60 * 5, - - #[cfg(feature = "with-mio")] - max_connection_age: 1800, - #[cfg(feature = "with-glommio")] connection_cleaning_interval: 30, } } } -#[cfg(feature = "with-mio")] -impl Default for StatisticsConfig { - fn default() -> Self { - Self { interval: 0 } - } -} - #[cfg(test)] mod tests { use super::Config; diff --git a/aquatic_ws/src/glommio/common.rs b/aquatic_ws/src/glommio/common.rs deleted file mode 100644 index 539176e..0000000 --- a/aquatic_ws/src/glommio/common.rs +++ /dev/null @@ -1,10 +0,0 @@ -use std::sync::Arc; - -use aquatic_common::access_list::AccessListArcSwap; - -pub type TlsConfig = futures_rustls::rustls::ServerConfig; - -#[derive(Default, Clone)] -pub struct State { - pub access_list: Arc, -} diff --git a/aquatic_ws/src/glommio/mod.rs b/aquatic_ws/src/glommio/mod.rs deleted file mode 100644 index f1ceb69..0000000 --- a/aquatic_ws/src/glommio/mod.rs +++ /dev/null @@ -1,107 +0,0 @@ -pub mod common; -pub mod request; -pub mod socket; - -use std::sync::{atomic::AtomicUsize, Arc}; - -use crate::{common::create_tls_config, config::Config}; -#[cfg(feature = "cpu-pinning")] -use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; -use aquatic_common::privileges::drop_privileges_after_socket_binding; - -use self::common::*; - -use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; - -pub const SHARED_IN_CHANNEL_SIZE: usize = 1024; - -pub fn run(config: Config, state: State) -> anyhow::Result<()> { - let num_peers = config.socket_workers + config.request_workers; - - let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE); - let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE * 16); - - let num_bound_sockets = Arc::new(AtomicUsize::new(0)); - - let tls_config = Arc::new(create_tls_config(&config).unwrap()); - - let mut executors = Vec::new(); - - for i in 0..(config.socket_workers) { - let config = config.clone(); - let state = state.clone(); - let tls_config = tls_config.clone(); - let request_mesh_builder = request_mesh_builder.clone(); - let response_mesh_builder = response_mesh_builder.clone(); - let num_bound_sockets = num_bound_sockets.clone(); - - let builder = LocalExecutorBuilder::default().name("socket"); - - let executor = builder.spawn(move || async move { - #[cfg(feature = "cpu-pinning")] - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::SocketWorker(i), - ); - - socket::run_socket_worker( - config, - state, - tls_config, - request_mesh_builder, - response_mesh_builder, - num_bound_sockets, - ) - .await - }); - - executors.push(executor); - } - - for i in 0..(config.request_workers) { - let config = config.clone(); - let state = state.clone(); - let request_mesh_builder = request_mesh_builder.clone(); - let response_mesh_builder = response_mesh_builder.clone(); - - let builder = LocalExecutorBuilder::default().name("request"); - - let executor = builder.spawn(move || async move { - #[cfg(feature = "cpu-pinning")] - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::RequestWorker(i), - ); - - request::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) - .await - }); - - executors.push(executor); - } - - drop_privileges_after_socket_binding( - &config.privileges, - num_bound_sockets, - config.socket_workers, - ) - .unwrap(); - - #[cfg(feature = "cpu-pinning")] - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - - for executor in executors { - executor - .expect("failed to spawn local executor") - .join() - .unwrap(); - } - - Ok(()) -} diff --git a/aquatic_ws/src/glommio/request.rs b/aquatic_ws/src/glommio/request.rs deleted file mode 100644 index bf9cdfb..0000000 --- a/aquatic_ws/src/glommio/request.rs +++ /dev/null @@ -1,128 +0,0 @@ -use std::cell::RefCell; -use std::rc::Rc; -use std::time::Duration; - -use futures::StreamExt; -use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; -use glommio::enclose; -use glommio::prelude::*; -use glommio::timer::TimerActionRepeat; -use rand::{rngs::SmallRng, SeedableRng}; - -use aquatic_ws_protocol::*; - -use crate::common::handlers::*; -use crate::common::*; -use crate::config::Config; - -use super::common::State; -use super::SHARED_IN_CHANNEL_SIZE; - -pub async fn run_request_worker( - config: Config, - state: State, - in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>, - out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>, -) { - let (_, mut in_message_receivers) = in_message_mesh_builder.join(Role::Consumer).await.unwrap(); - let (out_message_senders, _) = out_message_mesh_builder.join(Role::Producer).await.unwrap(); - - let out_message_senders = Rc::new(out_message_senders); - - let torrents = Rc::new(RefCell::new(TorrentMaps::default())); - let access_list = state.access_list; - - // Periodically clean torrents - TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || { - enclose!((config, torrents, access_list) move || async move { - torrents.borrow_mut().clean(&config, &access_list); - - Some(Duration::from_secs(config.cleaning.torrent_cleaning_interval)) - })() - })); - - let mut handles = Vec::new(); - - for (_, receiver) in in_message_receivers.streams() { - let handle = spawn_local(handle_request_stream( - config.clone(), - torrents.clone(), - out_message_senders.clone(), - receiver, - )) - .detach(); - - handles.push(handle); - } - - for handle in handles { - handle.await; - } -} - -async fn handle_request_stream( - config: Config, - torrents: Rc>, - out_message_senders: Rc>, - stream: S, -) where - S: futures_lite::Stream + ::std::marker::Unpin, -{ - let rng = Rc::new(RefCell::new(SmallRng::from_entropy())); - - let max_peer_age = config.cleaning.max_peer_age; - let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_peer_age))); - - TimerActionRepeat::repeat(enclose!((peer_valid_until) move || { - enclose!((peer_valid_until) move || async move { - *peer_valid_until.borrow_mut() = ValidUntil::new(max_peer_age); - - Some(Duration::from_secs(1)) - })() - })); - - let config = &config; - let torrents = &torrents; - let peer_valid_until = &peer_valid_until; - let rng = &rng; - let out_message_senders = &out_message_senders; - - stream - .for_each_concurrent( - SHARED_IN_CHANNEL_SIZE, - move |(meta, in_message)| async move { - let mut out_messages = Vec::new(); - - match in_message { - InMessage::AnnounceRequest(request) => handle_announce_request( - &config, - &mut rng.borrow_mut(), - &mut torrents.borrow_mut(), - &mut out_messages, - peer_valid_until.borrow().to_owned(), - meta, - request, - ), - InMessage::ScrapeRequest(request) => handle_scrape_request( - &config, - &mut torrents.borrow_mut(), - &mut out_messages, - meta, - request, - ), - }; - - for (meta, out_message) in out_messages.drain(..) { - ::log::info!("request worker trying to send OutMessage to socket worker"); - - out_message_senders - .send_to(meta.out_message_consumer_id.0, (meta, out_message)) - .await - .expect("failed sending out_message to socket worker"); - - ::log::info!("request worker sent OutMessage to socket worker"); - } - }, - ) - .await; -} diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index 2a4f0d9..a9a496c 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -1,28 +1,26 @@ use aquatic_common::access_list::update_access_list; #[cfg(feature = "cpu-pinning")] use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; -use cfg_if::cfg_if; +use common::State; use signal_hook::{consts::SIGUSR1, iterator::Signals}; -use crate::config::Config; +use std::sync::{atomic::AtomicUsize, Arc}; + +use crate::{common::create_tls_config, config::Config}; +use aquatic_common::privileges::drop_privileges_after_socket_binding; + +use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; pub mod common; pub mod config; -#[cfg(feature = "with-glommio")] -pub mod glommio; -#[cfg(feature = "with-mio")] -pub mod mio; +pub mod workers; pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker"; +pub const SHARED_IN_CHANNEL_SIZE: usize = 1024; + pub fn run(config: Config) -> ::anyhow::Result<()> { - cfg_if!( - if #[cfg(feature = "with-glommio")] { - let state = glommio::common::State::default(); - } else { - let state = mio::common::State::default(); - } - ); + let state = State::default(); update_access_list(&config.access_list, &state.access_list)?; @@ -32,13 +30,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let config = config.clone(); let state = state.clone(); - cfg_if!( - if #[cfg(feature = "with-glommio")] { - ::std::thread::spawn(move || glommio::run(config, state)); - } else { - ::std::thread::spawn(move || mio::run(config, state)); - } - ); + ::std::thread::spawn(move || run_workers(config, state)); } #[cfg(feature = "cpu-pinning")] @@ -59,3 +51,99 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { Ok(()) } + +pub fn run_workers(config: Config, state: State) -> anyhow::Result<()> { + let num_peers = config.socket_workers + config.request_workers; + + let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE); + let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE * 16); + + let num_bound_sockets = Arc::new(AtomicUsize::new(0)); + + let tls_config = Arc::new(create_tls_config(&config).unwrap()); + + let mut executors = Vec::new(); + + for i in 0..(config.socket_workers) { + let config = config.clone(); + let state = state.clone(); + let tls_config = tls_config.clone(); + let request_mesh_builder = request_mesh_builder.clone(); + let response_mesh_builder = response_mesh_builder.clone(); + let num_bound_sockets = num_bound_sockets.clone(); + + let builder = LocalExecutorBuilder::default().name("socket"); + + let executor = builder.spawn(move || async move { + #[cfg(feature = "cpu-pinning")] + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::SocketWorker(i), + ); + + workers::socket::run_socket_worker( + config, + state, + tls_config, + request_mesh_builder, + response_mesh_builder, + num_bound_sockets, + ) + .await + }); + + executors.push(executor); + } + + for i in 0..(config.request_workers) { + let config = config.clone(); + let state = state.clone(); + let request_mesh_builder = request_mesh_builder.clone(); + let response_mesh_builder = response_mesh_builder.clone(); + + let builder = LocalExecutorBuilder::default().name("request"); + + let executor = builder.spawn(move || async move { + #[cfg(feature = "cpu-pinning")] + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::RequestWorker(i), + ); + + workers::request::run_request_worker( + config, + state, + request_mesh_builder, + response_mesh_builder, + ) + .await + }); + + executors.push(executor); + } + + drop_privileges_after_socket_binding( + &config.privileges, + num_bound_sockets, + config.socket_workers, + ) + .unwrap(); + + #[cfg(feature = "cpu-pinning")] + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + + for executor in executors { + executor + .expect("failed to spawn local executor") + .join() + .unwrap(); + } + + Ok(()) +} diff --git a/aquatic_ws/src/mio/common.rs b/aquatic_ws/src/mio/common.rs deleted file mode 100644 index 5337ca7..0000000 --- a/aquatic_ws/src/mio/common.rs +++ /dev/null @@ -1,51 +0,0 @@ -use std::sync::Arc; - -use aquatic_common::access_list::AccessListArcSwap; -use aquatic_ws_protocol::*; -use crossbeam_channel::{Receiver, Sender}; -use log::error; -use mio::Token; -use parking_lot::Mutex; - -use crate::common::*; - -pub const LISTENER_TOKEN: Token = Token(0); -pub const CHANNEL_TOKEN: Token = Token(1); - -#[derive(Clone)] -pub struct State { - pub access_list: Arc, - pub torrent_maps: Arc>, -} - -impl Default for State { - fn default() -> Self { - Self { - access_list: Arc::new(Default::default()), - torrent_maps: Arc::new(Mutex::new(TorrentMaps::default())), - } - } -} - -pub type InMessageSender = Sender<(ConnectionMeta, InMessage)>; -pub type InMessageReceiver = Receiver<(ConnectionMeta, InMessage)>; -pub type OutMessageReceiver = Receiver<(ConnectionMeta, OutMessage)>; - -#[derive(Clone)] -pub struct OutMessageSender(Vec>); - -impl OutMessageSender { - pub fn new(senders: Vec>) -> Self { - Self(senders) - } - - #[inline] - pub fn send(&self, meta: ConnectionMeta, message: OutMessage) { - if let Err(err) = self.0[meta.out_message_consumer_id.0].send((meta, message)) { - error!("OutMessageSender: couldn't send message: {:?}", err); - } - } -} - -pub type SocketWorkerStatus = Option>; -pub type SocketWorkerStatuses = Arc>>; diff --git a/aquatic_ws/src/mio/mod.rs b/aquatic_ws/src/mio/mod.rs deleted file mode 100644 index 54a2fa4..0000000 --- a/aquatic_ws/src/mio/mod.rs +++ /dev/null @@ -1,218 +0,0 @@ -use std::sync::Arc; -use std::thread::Builder; -use std::time::Duration; - -use anyhow::Context; -#[cfg(feature = "cpu-pinning")] -use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; -use histogram::Histogram; -use mio::{Poll, Waker}; -use parking_lot::Mutex; -use privdrop::PrivDrop; - -pub mod common; -pub mod request; -pub mod socket; - -use crate::{common::create_tls_config, config::Config}; -use common::*; - -pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker"; - -const SHARED_IN_CHANNEL_SIZE: usize = 1024; - -pub fn run(config: Config, state: State) -> anyhow::Result<()> { - start_workers(config.clone(), state.clone()).expect("couldn't start workers"); - - // TODO: privdrop here instead - - #[cfg(feature = "cpu-pinning")] - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - - loop { - ::std::thread::sleep(Duration::from_secs( - config.cleaning.torrent_cleaning_interval, - )); - - state.torrent_maps.lock().clean(&config, &state.access_list); - } -} - -pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> { - let tls_config = Arc::new(create_tls_config(&config)?); - - let (in_message_sender, in_message_receiver) = - ::crossbeam_channel::bounded(SHARED_IN_CHANNEL_SIZE); - - let mut out_message_senders = Vec::new(); - let mut wakers = Vec::new(); - - let socket_worker_statuses: SocketWorkerStatuses = { - let mut statuses = Vec::new(); - - for _ in 0..config.socket_workers { - statuses.push(None); - } - - Arc::new(Mutex::new(statuses)) - }; - - for i in 0..config.socket_workers { - let config = config.clone(); - let state = state.clone(); - let socket_worker_statuses = socket_worker_statuses.clone(); - let in_message_sender = in_message_sender.clone(); - let tls_config = tls_config.clone(); - let poll = Poll::new()?; - let waker = Arc::new(Waker::new(poll.registry(), CHANNEL_TOKEN)?); - - let (out_message_sender, out_message_receiver) = - ::crossbeam_channel::bounded(SHARED_IN_CHANNEL_SIZE * 16); - - out_message_senders.push(out_message_sender); - wakers.push(waker); - - Builder::new() - .name(format!("socket-{:02}", i + 1)) - .spawn(move || { - #[cfg(feature = "cpu-pinning")] - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::SocketWorker(i), - ); - - socket::run_socket_worker( - config, - state, - i, - socket_worker_statuses, - poll, - in_message_sender, - out_message_receiver, - tls_config, - ); - })?; - } - - // Wait for socket worker statuses. On error from any, quit program. - // On success from all, drop privileges if corresponding setting is set - // and continue program. - loop { - ::std::thread::sleep(::std::time::Duration::from_millis(10)); - - if let Some(statuses) = socket_worker_statuses.try_lock() { - for opt_status in statuses.iter() { - if let Some(Err(err)) = opt_status { - return Err(::anyhow::anyhow!(err.to_owned())); - } - } - - if statuses.iter().all(Option::is_some) { - if config.privileges.drop_privileges { - PrivDrop::default() - .chroot(config.privileges.chroot_path.clone()) - .user(config.privileges.user.clone()) - .apply() - .context("Couldn't drop root privileges")?; - } - - break; - } - } - } - - let out_message_sender = OutMessageSender::new(out_message_senders); - - for i in 0..config.request_workers { - let config = config.clone(); - let state = state.clone(); - let in_message_receiver = in_message_receiver.clone(); - let out_message_sender = out_message_sender.clone(); - let wakers = wakers.clone(); - - Builder::new() - .name(format!("request-{:02}", i + 1)) - .spawn(move || { - #[cfg(feature = "cpu-pinning")] - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::RequestWorker(i), - ); - - request::run_request_worker( - config, - state, - in_message_receiver, - out_message_sender, - wakers, - ); - })?; - } - - if config.statistics.interval != 0 { - let state = state.clone(); - let config = config.clone(); - - Builder::new() - .name("statistics".to_string()) - .spawn(move || { - #[cfg(feature = "cpu-pinning")] - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - - loop { - ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); - - print_statistics(&state); - } - }) - .expect("spawn statistics thread"); - } - - Ok(()) -} - -fn print_statistics(state: &State) { - let mut peers_per_torrent = Histogram::new(); - - { - let torrents = &mut state.torrent_maps.lock(); - - for torrent in torrents.ipv4.values() { - let num_peers = (torrent.num_seeders + torrent.num_leechers) as u64; - - if let Err(err) = peers_per_torrent.increment(num_peers) { - eprintln!("error incrementing peers_per_torrent histogram: {}", err) - } - } - for torrent in torrents.ipv6.values() { - let num_peers = (torrent.num_seeders + torrent.num_leechers) as u64; - - if let Err(err) = peers_per_torrent.increment(num_peers) { - eprintln!("error incrementing peers_per_torrent histogram: {}", err) - } - } - } - - if peers_per_torrent.entries() != 0 { - println!( - "peers per torrent: min: {}, p50: {}, p75: {}, p90: {}, p99: {}, p999: {}, max: {}", - peers_per_torrent.minimum().unwrap(), - peers_per_torrent.percentile(50.0).unwrap(), - peers_per_torrent.percentile(75.0).unwrap(), - peers_per_torrent.percentile(90.0).unwrap(), - peers_per_torrent.percentile(99.0).unwrap(), - peers_per_torrent.percentile(99.9).unwrap(), - peers_per_torrent.maximum().unwrap(), - ); - } -} diff --git a/aquatic_ws/src/mio/request.rs b/aquatic_ws/src/mio/request.rs deleted file mode 100644 index d3b905a..0000000 --- a/aquatic_ws/src/mio/request.rs +++ /dev/null @@ -1,103 +0,0 @@ -use std::sync::Arc; -use std::time::Duration; - -use mio::Waker; -use parking_lot::MutexGuard; -use rand::{rngs::SmallRng, SeedableRng}; - -use aquatic_ws_protocol::*; - -use crate::common::handlers::{handle_announce_request, handle_scrape_request}; -use crate::common::*; -use crate::config::Config; - -use super::common::*; - -pub fn run_request_worker( - config: Config, - state: State, - in_message_receiver: InMessageReceiver, - out_message_sender: OutMessageSender, - wakers: Vec>, -) { - let mut wake_socket_workers: Vec = (0..config.socket_workers).map(|_| false).collect(); - - let mut announce_requests = Vec::new(); - let mut scrape_requests = Vec::new(); - let mut out_messages = Vec::new(); - - let mut rng = SmallRng::from_entropy(); - - let timeout = Duration::from_micros(config.handlers.channel_recv_timeout_microseconds); - - loop { - let mut opt_torrent_map_guard: Option> = None; - - for i in 0..config.handlers.max_requests_per_iter { - let opt_in_message = if i == 0 { - in_message_receiver.recv().ok() - } else { - in_message_receiver.recv_timeout(timeout).ok() - }; - - match opt_in_message { - Some((meta, InMessage::AnnounceRequest(r))) => { - announce_requests.push((meta, r)); - } - Some((meta, InMessage::ScrapeRequest(r))) => { - scrape_requests.push((meta, r)); - } - None => { - if let Some(torrent_guard) = state.torrent_maps.try_lock() { - opt_torrent_map_guard = Some(torrent_guard); - - break; - } - } - } - } - - let mut torrent_map_guard = - opt_torrent_map_guard.unwrap_or_else(|| state.torrent_maps.lock()); - - let valid_until = ValidUntil::new(config.cleaning.max_peer_age); - - for (meta, request) in announce_requests.drain(..) { - handle_announce_request( - &config, - &mut rng, - &mut torrent_map_guard, - &mut out_messages, - valid_until, - meta, - request, - ); - } - for (meta, request) in scrape_requests.drain(..) { - handle_scrape_request( - &config, - &mut torrent_map_guard, - &mut out_messages, - meta, - request, - ); - } - - ::std::mem::drop(torrent_map_guard); - - for (meta, out_message) in out_messages.drain(..) { - wake_socket_workers[meta.out_message_consumer_id.0] = true; - out_message_sender.send(meta, out_message); - } - - for (worker_index, wake) in wake_socket_workers.iter_mut().enumerate() { - if *wake { - if let Err(err) = wakers[worker_index].wake() { - ::log::error!("request handler couldn't wake poll: {:?}", err); - } - - *wake = false; - } - } - } -} diff --git a/aquatic_ws/src/mio/socket/connection.rs b/aquatic_ws/src/mio/socket/connection.rs deleted file mode 100644 index 50f5ca2..0000000 --- a/aquatic_ws/src/mio/socket/connection.rs +++ /dev/null @@ -1,577 +0,0 @@ -use std::{collections::VecDeque, io::ErrorKind, marker::PhantomData, net::Shutdown, sync::Arc}; - -use aquatic_common::ValidUntil; -use aquatic_ws_protocol::{InMessage, OutMessage}; -use mio::{net::TcpStream, Interest, Poll, Token}; -use rustls::{ServerConfig, ServerConnection}; -use tungstenite::{ - handshake::{server::NoCallback, MidHandshake}, - protocol::WebSocketConfig, - HandshakeError, ServerHandshake, -}; - -use crate::common::ConnectionMeta; - -const MAX_PENDING_MESSAGES: usize = 16; - -type TlsStream = rustls::StreamOwned; - -type WsHandshakeResult = - Result, HandshakeError>>; - -type ConnectionReadResult = ::std::io::Result>; - -pub trait RegistryStatus {} - -pub struct Registered; - -impl RegistryStatus for Registered {} - -pub struct NotRegistered; - -impl RegistryStatus for NotRegistered {} - -enum ConnectionReadStatus { - Message(T, InMessage), - Ok(T), - WouldBlock(T), -} - -enum ConnectionState { - TlsHandshaking(TlsHandshaking), - WsHandshaking(WsHandshaking), - WsConnection(WsConnection), -} - -pub struct Connection { - pub valid_until: ValidUntil, - meta: ConnectionMeta, - state: ConnectionState, - pub message_queue: VecDeque, - pub interest: Interest, - phantom_data: PhantomData, -} - -impl Connection { - pub fn get_meta(&self) -> ConnectionMeta { - self.meta - } -} - -impl Connection { - pub fn new( - tls_config: Arc, - ws_config: WebSocketConfig, - tcp_stream: TcpStream, - valid_until: ValidUntil, - meta: ConnectionMeta, - ) -> Self { - let state = - ConnectionState::TlsHandshaking(TlsHandshaking::new(tls_config, ws_config, tcp_stream)); - - Self { - valid_until, - meta, - state, - message_queue: Default::default(), - interest: Interest::READABLE, - phantom_data: PhantomData::default(), - } - } - - /// Read until stream blocks (or error occurs) - /// - /// Requires Connection not to be registered, since it might be dropped on errors - pub fn read( - mut self, - message_handler: &mut F, - ) -> ::std::io::Result> - where - F: FnMut(ConnectionMeta, InMessage), - { - loop { - let result = match self.state { - ConnectionState::TlsHandshaking(inner) => inner.read(), - ConnectionState::WsHandshaking(inner) => inner.read(), - ConnectionState::WsConnection(inner) => inner.read(), - }; - - match result { - Ok(ConnectionReadStatus::Message(state, message)) => { - self.state = state; - - message_handler(self.meta, message); - - // Stop looping even if WouldBlock wasn't necessarily reached. Otherwise, - // we might get stuck reading from this connection only. Since we register - // the connection again upon reinsertion into the ConnectionMap, we should - // be getting new events anyway. - return Ok(self); - } - Ok(ConnectionReadStatus::Ok(state)) => { - self.state = state; - - ::log::debug!("read connection"); - } - Ok(ConnectionReadStatus::WouldBlock(state)) => { - self.state = state; - - ::log::debug!("reading connection would block"); - - return Ok(self); - } - Err(err) => { - ::log::debug!("Connection::read error: {}", err); - - return Err(err); - } - } - } - } - - pub fn register(self, poll: &mut Poll, token: Token) -> Connection { - let state = match self.state { - ConnectionState::TlsHandshaking(inner) => { - ConnectionState::TlsHandshaking(inner.register(poll, token, self.interest)) - } - ConnectionState::WsHandshaking(inner) => { - ConnectionState::WsHandshaking(inner.register(poll, token, self.interest)) - } - ConnectionState::WsConnection(inner) => { - ConnectionState::WsConnection(inner.register(poll, token, self.interest)) - } - }; - - Connection { - valid_until: self.valid_until, - meta: self.meta, - state, - message_queue: self.message_queue, - interest: self.interest, - phantom_data: PhantomData::default(), - } - } - - pub fn close(self) { - ::log::debug!("will close connection to {}", self.meta.peer_addr.get()); - - match self.state { - ConnectionState::TlsHandshaking(inner) => inner.close(), - ConnectionState::WsHandshaking(inner) => inner.close(), - ConnectionState::WsConnection(inner) => inner.close(), - } - } -} - -impl Connection { - pub fn write_or_queue_message( - &mut self, - poll: &mut Poll, - message: OutMessage, - ) -> ::std::io::Result<()> { - let message_clone = message.clone(); - - match self.write_message(message) { - Ok(()) => Ok(()), - Err(err) if err.kind() == ErrorKind::WouldBlock => { - if self.message_queue.len() < MAX_PENDING_MESSAGES { - self.message_queue.push_back(message_clone); - - if !self.interest.is_writable() { - self.interest = Interest::WRITABLE; - - self.reregister(poll)?; - } - } else { - ::log::info!("Connection::message_queue is full, dropping message"); - } - - Ok(()) - } - Err(err) => Err(err), - } - } - - pub fn write(&mut self, poll: &mut Poll) -> ::std::io::Result<()> { - if let ConnectionState::WsConnection(_) = self.state { - while let Some(message) = self.message_queue.pop_front() { - let message_clone = message.clone(); - - match self.write_message(message) { - Ok(()) => {} - Err(err) if err.kind() == ErrorKind::WouldBlock => { - // Can't make message queue longer than it was before pop_front - self.message_queue.push_front(message_clone); - - return Ok(()); - } - Err(err) => { - return Err(err); - } - } - } - - if self.message_queue.is_empty() { - self.interest = Interest::READABLE; - } - - self.reregister(poll)?; - - Ok(()) - } else { - Err(std::io::Error::new( - ErrorKind::NotConnected, - "WebSocket connection not established", - )) - } - } - - fn write_message(&mut self, message: OutMessage) -> ::std::io::Result<()> { - if let ConnectionState::WsConnection(WsConnection { - ref mut web_socket, .. - }) = self.state - { - match web_socket.write_message(message.to_ws_message()) { - Ok(_) => {} - Err(tungstenite::Error::SendQueueFull(_message)) => { - return Err(std::io::Error::new( - ErrorKind::WouldBlock, - "Send queue full", - )) - } - Err(tungstenite::Error::Io(err)) => return Err(err), - Err(err) => return Err(std::io::Error::new(ErrorKind::Other, err))?, - } - - match web_socket.write_pending() { - Ok(()) => Ok(()), - Err(tungstenite::Error::Io(err)) => Err(err), - Err(err) => Err(std::io::Error::new(ErrorKind::Other, err))?, - } - } else { - Err(std::io::Error::new( - ErrorKind::NotConnected, - "WebSocket connection not established", - )) - } - } - - pub fn reregister(&mut self, poll: &mut Poll) -> ::std::io::Result<()> { - let token = Token(self.meta.connection_id.0); - - match self.state { - ConnectionState::TlsHandshaking(ref mut inner) => { - inner.reregister(poll, token, self.interest) - } - ConnectionState::WsHandshaking(ref mut inner) => { - inner.reregister(poll, token, self.interest) - } - ConnectionState::WsConnection(ref mut inner) => { - inner.reregister(poll, token, self.interest) - } - } - } - - pub fn deregister(self, poll: &mut Poll) -> Connection { - let state = match self.state { - ConnectionState::TlsHandshaking(inner) => { - ConnectionState::TlsHandshaking(inner.deregister(poll)) - } - ConnectionState::WsHandshaking(inner) => { - ConnectionState::WsHandshaking(inner.deregister(poll)) - } - ConnectionState::WsConnection(inner) => { - ConnectionState::WsConnection(inner.deregister(poll)) - } - }; - - Connection { - valid_until: self.valid_until, - meta: self.meta, - state, - message_queue: self.message_queue, - interest: self.interest, - phantom_data: PhantomData::default(), - } - } -} - -struct TlsHandshaking { - tls_conn: ServerConnection, - ws_config: WebSocketConfig, - tcp_stream: TcpStream, - phantom_data: PhantomData, -} - -impl TlsHandshaking { - fn new(tls_config: Arc, ws_config: WebSocketConfig, stream: TcpStream) -> Self { - Self { - tls_conn: ServerConnection::new(tls_config).unwrap(), - ws_config, - tcp_stream: stream, - phantom_data: PhantomData::default(), - } - } - - fn read(mut self) -> ConnectionReadResult> { - match self.tls_conn.read_tls(&mut self.tcp_stream) { - Ok(0) => { - return Err(::std::io::Error::new( - ErrorKind::ConnectionReset, - "Connection closed", - )) - } - Ok(_) => match self.tls_conn.process_new_packets() { - Ok(_) => { - while self.tls_conn.wants_write() { - self.tls_conn.write_tls(&mut self.tcp_stream)?; - } - - if self.tls_conn.is_handshaking() { - Ok(ConnectionReadStatus::WouldBlock( - ConnectionState::TlsHandshaking(self), - )) - } else { - let tls_stream = TlsStream::new(self.tls_conn, self.tcp_stream); - - WsHandshaking::handle_handshake_result(tungstenite::accept_with_config( - tls_stream, - Some(self.ws_config), - )) - } - } - Err(err) => { - let _ = self.tls_conn.write_tls(&mut self.tcp_stream); - - Err(::std::io::Error::new(ErrorKind::InvalidData, err)) - } - }, - Err(err) if err.kind() == ErrorKind::WouldBlock => { - return Ok(ConnectionReadStatus::WouldBlock( - ConnectionState::TlsHandshaking(self), - )) - } - Err(err) => return Err(err), - } - } - - fn register( - mut self, - poll: &mut Poll, - token: Token, - interest: Interest, - ) -> TlsHandshaking { - poll.registry() - .register(&mut self.tcp_stream, token, interest) - .unwrap(); - - TlsHandshaking { - tls_conn: self.tls_conn, - ws_config: self.ws_config, - tcp_stream: self.tcp_stream, - phantom_data: PhantomData::default(), - } - } - - fn close(self) { - ::log::debug!("closing connection (TlsHandshaking state)"); - - let _ = self.tcp_stream.shutdown(Shutdown::Both); - } -} - -impl TlsHandshaking { - fn deregister(mut self, poll: &mut Poll) -> TlsHandshaking { - poll.registry().deregister(&mut self.tcp_stream).unwrap(); - - TlsHandshaking { - tls_conn: self.tls_conn, - ws_config: self.ws_config, - tcp_stream: self.tcp_stream, - phantom_data: PhantomData::default(), - } - } - - fn reregister( - &mut self, - poll: &mut Poll, - token: Token, - interest: Interest, - ) -> std::io::Result<()> { - poll.registry() - .reregister(&mut self.tcp_stream, token, interest) - } -} - -struct WsHandshaking { - mid_handshake: MidHandshake>, - phantom_data: PhantomData, -} - -impl WsHandshaking { - fn read(self) -> ConnectionReadResult> { - Self::handle_handshake_result(self.mid_handshake.handshake()) - } - - fn handle_handshake_result( - handshake_result: WsHandshakeResult, - ) -> ConnectionReadResult> { - match handshake_result { - Ok(web_socket) => { - let conn = ConnectionState::WsConnection(WsConnection { - web_socket, - phantom_data: PhantomData::default(), - }); - - Ok(ConnectionReadStatus::Ok(conn)) - } - Err(HandshakeError::Interrupted(mid_handshake)) => { - let conn = ConnectionState::WsHandshaking(WsHandshaking { - mid_handshake, - phantom_data: PhantomData::default(), - }); - - Ok(ConnectionReadStatus::WouldBlock(conn)) - } - Err(HandshakeError::Failure(err)) => { - return Err(std::io::Error::new(ErrorKind::InvalidData, err)) - } - } - } - - fn register( - mut self, - poll: &mut Poll, - token: Token, - interest: Interest, - ) -> WsHandshaking { - let tcp_stream = &mut self.mid_handshake.get_mut().get_mut().sock; - - poll.registry() - .register(tcp_stream, token, interest) - .unwrap(); - - WsHandshaking { - mid_handshake: self.mid_handshake, - phantom_data: PhantomData::default(), - } - } - - fn close(mut self) { - ::log::debug!("closing connection (WsHandshaking state)"); - - let tcp_stream = &mut self.mid_handshake.get_mut().get_mut().sock; - - let _ = tcp_stream.shutdown(Shutdown::Both); - } -} - -impl WsHandshaking { - fn deregister(mut self, poll: &mut Poll) -> WsHandshaking { - let tcp_stream = &mut self.mid_handshake.get_mut().get_mut().sock; - - poll.registry().deregister(tcp_stream).unwrap(); - - WsHandshaking { - mid_handshake: self.mid_handshake, - phantom_data: PhantomData::default(), - } - } - - fn reregister( - &mut self, - poll: &mut Poll, - token: Token, - interest: Interest, - ) -> std::io::Result<()> { - let tcp_stream = &mut self.mid_handshake.get_mut().get_mut().sock; - - poll.registry().reregister(tcp_stream, token, interest) - } -} - -struct WsConnection { - web_socket: tungstenite::WebSocket, - phantom_data: PhantomData, -} - -impl WsConnection { - fn read(mut self) -> ConnectionReadResult> { - match self.web_socket.read_message() { - Ok( - message @ tungstenite::Message::Text(_) | message @ tungstenite::Message::Binary(_), - ) => match InMessage::from_ws_message(message) { - Ok(message) => { - ::log::debug!("received WebSocket message"); - - Ok(ConnectionReadStatus::Message( - ConnectionState::WsConnection(self), - message, - )) - } - Err(err) => Err(std::io::Error::new(ErrorKind::InvalidData, err)), - }, - Ok(message) => { - ::log::info!("received unexpected WebSocket message: {}", message); - - Err(std::io::Error::new( - ErrorKind::InvalidData, - "unexpected WebSocket message type", - )) - } - Err(tungstenite::Error::Io(err)) if err.kind() == ErrorKind::WouldBlock => { - let conn = ConnectionState::WsConnection(self); - - Ok(ConnectionReadStatus::WouldBlock(conn)) - } - Err(tungstenite::Error::Io(err)) => Err(err), - Err(err) => Err(std::io::Error::new(ErrorKind::InvalidData, err)), - } - } - - fn register( - mut self, - poll: &mut Poll, - token: Token, - interest: Interest, - ) -> WsConnection { - poll.registry() - .register(self.web_socket.get_mut().get_mut(), token, interest) - .unwrap(); - - WsConnection { - web_socket: self.web_socket, - phantom_data: PhantomData::default(), - } - } - - fn close(mut self) { - ::log::debug!("closing connection (WsConnection state)"); - - let _ = self.web_socket.close(None); - let _ = self.web_socket.write_pending(); - } -} - -impl WsConnection { - fn deregister(mut self, poll: &mut Poll) -> WsConnection { - poll.registry() - .deregister(self.web_socket.get_mut().get_mut()) - .unwrap(); - - WsConnection { - web_socket: self.web_socket, - phantom_data: PhantomData::default(), - } - } - - fn reregister( - &mut self, - poll: &mut Poll, - token: Token, - interest: Interest, - ) -> std::io::Result<()> { - poll.registry() - .reregister(self.web_socket.get_mut().get_mut(), token, interest) - } -} diff --git a/aquatic_ws/src/mio/socket/mod.rs b/aquatic_ws/src/mio/socket/mod.rs deleted file mode 100644 index 181c8f6..0000000 --- a/aquatic_ws/src/mio/socket/mod.rs +++ /dev/null @@ -1,403 +0,0 @@ -use std::io::ErrorKind; -use std::sync::Arc; -use std::time::{Duration, Instant}; - -use anyhow::Context; -use aquatic_common::access_list::AccessListQuery; -use aquatic_common::CanonicalSocketAddr; -use hashbrown::HashMap; -use mio::net::TcpListener; -use mio::{Events, Interest, Poll, Token}; -use socket2::{Domain, Protocol, Socket, Type}; -use tungstenite::protocol::WebSocketConfig; - -use aquatic_ws_protocol::*; - -use crate::common::*; -use crate::config::Config; - -pub mod connection; - -use super::common::*; - -use connection::{Connection, NotRegistered, Registered}; - -struct ConnectionMap { - token_counter: Token, - connections: HashMap>, -} - -impl Default for ConnectionMap { - fn default() -> Self { - Self { - token_counter: Token(2), - connections: Default::default(), - } - } -} - -impl ConnectionMap { - fn insert_and_register_new(&mut self, poll: &mut Poll, connection_creator: F) - where - F: FnOnce(Token) -> Connection, - { - self.token_counter.0 = self.token_counter.0.wrapping_add(1); - - // Don't assign LISTENER_TOKEN or CHANNEL_TOKEN - if self.token_counter.0 < 2 { - self.token_counter.0 = 2; - } - - let token = self.token_counter; - - // Remove, deregister and close any existing connection with this token. - // This shouldn't happen in practice. - if let Some(connection) = self.connections.remove(&token) { - ::log::warn!( - "removing existing connection {} because of token reuse", - token.0 - ); - - connection.deregister(poll).close(); - } - - let connection = connection_creator(token); - - self.insert_and_register(poll, token, connection); - } - - fn insert_and_register( - &mut self, - poll: &mut Poll, - key: Token, - conn: Connection, - ) { - self.connections.insert(key, conn.register(poll, key)); - } - - fn remove_and_deregister( - &mut self, - poll: &mut Poll, - key: &Token, - ) -> Option> { - if let Some(connection) = self.connections.remove(key) { - Some(connection.deregister(poll)) - } else { - None - } - } - - fn get_mut(&mut self, key: &Token) -> Option<&mut Connection> { - self.connections.get_mut(key) - } - - /// Close and remove inactive connections - fn clean(mut self, poll: &mut Poll) -> Self { - let now = Instant::now(); - - let mut retained_connections = HashMap::default(); - - for (token, connection) in self.connections.drain() { - if connection.valid_until.0 < now { - connection.deregister(poll).close(); - } else { - retained_connections.insert(token, connection); - } - } - - ConnectionMap { - connections: retained_connections, - ..self - } - } -} - -pub fn run_socket_worker( - config: Config, - state: State, - socket_worker_index: usize, - socket_worker_statuses: SocketWorkerStatuses, - poll: Poll, - in_message_sender: InMessageSender, - out_message_receiver: OutMessageReceiver, - tls_config: Arc, -) { - match create_listener(&config) { - Ok(listener) => { - socket_worker_statuses.lock()[socket_worker_index] = Some(Ok(())); - - run_poll_loop( - config, - &state, - socket_worker_index, - poll, - in_message_sender, - out_message_receiver, - listener, - tls_config, - ); - } - Err(err) => { - socket_worker_statuses.lock()[socket_worker_index] = - Some(Err(format!("Couldn't open socket: {:#}", err))); - } - } -} - -fn run_poll_loop( - config: Config, - state: &State, - socket_worker_index: usize, - mut poll: Poll, - in_message_sender: InMessageSender, - out_message_receiver: OutMessageReceiver, - listener: ::std::net::TcpListener, - tls_config: Arc, -) { - let poll_timeout = Duration::from_micros(config.network.poll_timeout_microseconds); - let ws_config = WebSocketConfig { - max_message_size: Some(config.network.websocket_max_message_size), - max_frame_size: Some(config.network.websocket_max_frame_size), - max_send_queue: Some(2), - ..Default::default() - }; - - let mut listener = TcpListener::from_std(listener); - let mut events = Events::with_capacity(config.network.poll_event_capacity); - - poll.registry() - .register(&mut listener, LISTENER_TOKEN, Interest::READABLE) - .unwrap(); - - let mut connections = ConnectionMap::default(); - let mut local_responses = Vec::new(); - - let mut iter_counter = 0usize; - - loop { - poll.poll(&mut events, Some(poll_timeout)) - .expect("failed polling"); - - let valid_until = ValidUntil::new(config.cleaning.max_connection_age); - - for event in events.iter() { - let token = event.token(); - - match token { - LISTENER_TOKEN => { - accept_new_streams( - &tls_config, - ws_config, - socket_worker_index, - &mut listener, - &mut poll, - &mut connections, - valid_until, - ); - } - CHANNEL_TOKEN => { - write_or_queue_messages( - &mut poll, - out_message_receiver - .try_iter() - .take(out_message_receiver.len()), - &mut connections, - ); - } - token => { - if event.is_writable() { - let mut remove_connection = false; - - if let Some(connection) = connections.get_mut(&token) { - if let Err(err) = connection.write(&mut poll) { - ::log::debug!("Connection::write error: {}", err); - - remove_connection = true; - } - } - - if remove_connection { - if let Some(connection) = - connections.remove_and_deregister(&mut poll, &token) - { - connection.close(); - } - } - } - if event.is_readable() { - handle_stream_read_event( - &config, - state, - &mut local_responses, - &in_message_sender, - &mut poll, - &mut connections, - token, - valid_until, - ); - } - } - } - - write_or_queue_messages(&mut poll, local_responses.drain(..), &mut connections); - } - - // Remove inactive connections, but not every iteration - if iter_counter % 128 == 0 { - connections = connections.clean(&mut poll); - } - - iter_counter = iter_counter.wrapping_add(1); - } -} - -fn accept_new_streams( - tls_config: &Arc, - ws_config: WebSocketConfig, - socket_worker_index: usize, - listener: &mut TcpListener, - poll: &mut Poll, - connections: &mut ConnectionMap, - valid_until: ValidUntil, -) { - loop { - match listener.accept() { - Ok((stream, _)) => { - let peer_addr = if let Ok(peer_addr) = stream.peer_addr() { - CanonicalSocketAddr::new(peer_addr) - } else { - continue; - }; - - connections.insert_and_register_new(poll, move |token| { - let meta = ConnectionMeta { - out_message_consumer_id: ConsumerId(socket_worker_index), - connection_id: ConnectionId(token.0), - peer_addr, - pending_scrape_id: None, // FIXME - }; - - Connection::new(tls_config.clone(), ws_config, stream, valid_until, meta) - }); - } - Err(err) if err.kind() == ErrorKind::WouldBlock => { - break; - } - Err(err) => { - ::log::info!("error while accepting streams: {}", err); - } - } - } -} - -fn handle_stream_read_event( - config: &Config, - state: &State, - local_responses: &mut Vec<(ConnectionMeta, OutMessage)>, - in_message_sender: &InMessageSender, - poll: &mut Poll, - connections: &mut ConnectionMap, - token: Token, - valid_until: ValidUntil, -) { - let access_list_mode = config.access_list.mode; - - if let Some(mut connection) = connections.remove_and_deregister(poll, &token) { - let message_handler = &mut |meta, message| match message { - InMessage::AnnounceRequest(ref request) - if !state - .access_list - .allows(access_list_mode, &request.info_hash.0) => - { - let out_message = OutMessage::ErrorResponse(ErrorResponse { - failure_reason: "Info hash not allowed".into(), - action: Some(ErrorResponseAction::Announce), - info_hash: Some(request.info_hash), - }); - - local_responses.push((meta, out_message)); - } - in_message => { - if let Err(err) = in_message_sender.send((meta, in_message)) { - ::log::info!("InMessageSender: couldn't send message: {:?}", err); - } - } - }; - - connection.valid_until = valid_until; - - match connection.read(message_handler) { - Ok(connection) => { - connections.insert_and_register(poll, token, connection); - } - Err(_) => {} - } - } -} - -fn write_or_queue_messages(poll: &mut Poll, responses: I, connections: &mut ConnectionMap) -where - I: Iterator, -{ - for (meta, out_message) in responses { - let token = Token(meta.connection_id.0); - - let mut remove_connection = false; - - if let Some(connection) = connections.get_mut(&token) { - if connection.get_meta().peer_addr != meta.peer_addr { - ::log::warn!( - "socket worker error: connection socket addr {} didn't match channel {}. Token: {}.", - connection.get_meta().peer_addr.get(), - meta.peer_addr.get(), - token.0 - ); - - remove_connection = true; - } else { - match connection.write_or_queue_message(poll, out_message) { - Ok(()) => {} - Err(err) => { - ::log::debug!("Connection::write_or_queue_message error: {}", err); - - remove_connection = true; - } - } - } - } - - if remove_connection { - connections.remove_and_deregister(poll, &token); - } - } -} - -pub fn create_listener(config: &Config) -> ::anyhow::Result<::std::net::TcpListener> { - let builder = if config.network.address.is_ipv4() { - Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP)) - } else { - Socket::new(Domain::IPV6, Type::STREAM, Some(Protocol::TCP)) - } - .context("Couldn't create socket2::Socket")?; - - if config.network.ipv6_only { - builder - .set_only_v6(true) - .context("Couldn't put socket in ipv6 only mode")? - } - - builder - .set_nonblocking(true) - .context("Couldn't put socket in non-blocking mode")?; - builder - .set_reuse_port(true) - .context("Couldn't put socket in reuse_port mode")?; - builder - .bind(&config.network.address.into()) - .with_context(|| format!("Couldn't bind socket to address {}", config.network.address))?; - builder - .listen(128) - .context("Couldn't listen for connections on socket")?; - - Ok(builder.into()) -} diff --git a/aquatic_ws/src/workers/mod.rs b/aquatic_ws/src/workers/mod.rs new file mode 100644 index 0000000..63fc0ec --- /dev/null +++ b/aquatic_ws/src/workers/mod.rs @@ -0,0 +1,2 @@ +pub mod request; +pub mod socket; diff --git a/aquatic_ws/src/common/handlers.rs b/aquatic_ws/src/workers/request.rs similarity index 59% rename from aquatic_ws/src/common/handlers.rs rename to aquatic_ws/src/workers/request.rs index 1d6ae0c..a5dd22f 100644 --- a/aquatic_ws/src/common/handlers.rs +++ b/aquatic_ws/src/workers/request.rs @@ -1,11 +1,130 @@ -use aquatic_common::extract_response_peers; -use hashbrown::HashMap; -use rand::rngs::SmallRng; +use std::cell::RefCell; +use std::rc::Rc; +use std::time::Duration; +use futures::StreamExt; +use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; +use glommio::enclose; +use glommio::prelude::*; +use glommio::timer::TimerActionRepeat; +use hashbrown::HashMap; +use rand::{rngs::SmallRng, SeedableRng}; + +use aquatic_common::extract_response_peers; use aquatic_ws_protocol::*; use crate::common::*; use crate::config::Config; +use crate::SHARED_IN_CHANNEL_SIZE; + +pub async fn run_request_worker( + config: Config, + state: State, + in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>, + out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>, +) { + let (_, mut in_message_receivers) = in_message_mesh_builder.join(Role::Consumer).await.unwrap(); + let (out_message_senders, _) = out_message_mesh_builder.join(Role::Producer).await.unwrap(); + + let out_message_senders = Rc::new(out_message_senders); + + let torrents = Rc::new(RefCell::new(TorrentMaps::default())); + let access_list = state.access_list; + + // Periodically clean torrents + TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || { + enclose!((config, torrents, access_list) move || async move { + torrents.borrow_mut().clean(&config, &access_list); + + Some(Duration::from_secs(config.cleaning.torrent_cleaning_interval)) + })() + })); + + let mut handles = Vec::new(); + + for (_, receiver) in in_message_receivers.streams() { + let handle = spawn_local(handle_request_stream( + config.clone(), + torrents.clone(), + out_message_senders.clone(), + receiver, + )) + .detach(); + + handles.push(handle); + } + + for handle in handles { + handle.await; + } +} + +async fn handle_request_stream( + config: Config, + torrents: Rc>, + out_message_senders: Rc>, + stream: S, +) where + S: futures_lite::Stream + ::std::marker::Unpin, +{ + let rng = Rc::new(RefCell::new(SmallRng::from_entropy())); + + let max_peer_age = config.cleaning.max_peer_age; + let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_peer_age))); + + TimerActionRepeat::repeat(enclose!((peer_valid_until) move || { + enclose!((peer_valid_until) move || async move { + *peer_valid_until.borrow_mut() = ValidUntil::new(max_peer_age); + + Some(Duration::from_secs(1)) + })() + })); + + let config = &config; + let torrents = &torrents; + let peer_valid_until = &peer_valid_until; + let rng = &rng; + let out_message_senders = &out_message_senders; + + stream + .for_each_concurrent( + SHARED_IN_CHANNEL_SIZE, + move |(meta, in_message)| async move { + let mut out_messages = Vec::new(); + + match in_message { + InMessage::AnnounceRequest(request) => handle_announce_request( + &config, + &mut rng.borrow_mut(), + &mut torrents.borrow_mut(), + &mut out_messages, + peer_valid_until.borrow().to_owned(), + meta, + request, + ), + InMessage::ScrapeRequest(request) => handle_scrape_request( + &config, + &mut torrents.borrow_mut(), + &mut out_messages, + meta, + request, + ), + }; + + for (meta, out_message) in out_messages.drain(..) { + ::log::info!("request worker trying to send OutMessage to socket worker"); + + out_message_senders + .send_to(meta.out_message_consumer_id.0, (meta, out_message)) + .await + .expect("failed sending out_message to socket worker"); + + ::log::info!("request worker sent OutMessage to socket worker"); + } + }, + ) + .await; +} pub fn handle_announce_request( config: &Config, diff --git a/aquatic_ws/src/glommio/socket.rs b/aquatic_ws/src/workers/socket.rs similarity index 99% rename from aquatic_ws/src/glommio/socket.rs rename to aquatic_ws/src/workers/socket.rs index b9dc6fe..1de5d46 100644 --- a/aquatic_ws/src/glommio/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -29,8 +29,6 @@ use crate::config::Config; use crate::common::*; -use super::common::*; - const LOCAL_CHANNEL_SIZE: usize = 16; struct PendingScrapeResponse { From b4c07e5005182781caf5a8249b5adc7f49fcd3c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 18 Mar 2022 15:18:13 +0100 Subject: [PATCH 2/9] README: update to account for removal of mio version of aquatic_ws --- README.md | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 5ef5e7b..11ee3d8 100644 --- a/README.md +++ b/README.md @@ -13,11 +13,11 @@ of sub-implementations for different protocols: [mio]: https://github.com/tokio-rs/mio [glommio]: https://github.com/DataDog/glommio -| Name | Protocol | OS requirements | -|--------------|--------------------------------------------|------------------------------------------------------------| -| aquatic_udp | [BitTorrent over UDP] | Unix-like | -| aquatic_http | [BitTorrent over HTTP] with TLS ([rustls]) | Linux 5.8+ | -| aquatic_ws | [WebTorrent] over TLS ([rustls]) | Unix-like with [mio] (default) / Linux 5.8+ with [glommio] | +| Name | Protocol | OS requirements | +|--------------|--------------------------------------------|------------------------------| +| aquatic_udp | [BitTorrent over UDP] | Unix-like (using [mio]) | +| aquatic_http | [BitTorrent over HTTP] with TLS ([rustls]) | Linux 5.8+ (using [glommio]) | +| aquatic_ws | [WebTorrent] over TLS ([rustls]) | Linux 5.8+ (using [glommio]) | ## Usage @@ -25,10 +25,9 @@ of sub-implementations for different protocols: - Install Rust with [rustup](https://rustup.rs/) (stable is recommended) - Install cmake with your package manager (e.g., `apt-get install cmake`) -- Unless you're planning to only run the cross-platform mio based - implementations, make sure locked memory limits are sufficient. - You can do this by adding the following lines to `/etc/security/limits.conf`, - and then logging out and back in: +- Unless you're planning to only run `aquatic_udp`, make sure locked memory + limits are sufficient. You can do this by adding the following lines to + `/etc/security/limits.conf`, and then logging out and back in: ``` * hard memlock 512 @@ -50,7 +49,6 @@ Compile the implementations that you are interested in: cargo build --release -p aquatic_udp cargo build --release -p aquatic_http cargo build --release -p aquatic_ws -cargo build --release -p aquatic_ws --features "with-glommio" --no-default-features ``` ### Running From fbd3ce17b7ce4a5445368a070f62f7c6fad8bc76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 18 Mar 2022 15:21:18 +0100 Subject: [PATCH 3/9] Run cargo update --- Cargo.lock | 117 ++++++++++++++++++++++++++--------------------------- 1 file changed, 58 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 46e4cf9..bd48ae4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -39,9 +39,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.54" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a99269dff3bc004caa411f38845c20303f1e393ca2bd6581576fa3a7f59577d" +checksum = "4361135be9122e0870de935d7c439aef945b9f9ddd4199a553b5270b49c82a27" [[package]] name = "aquatic" @@ -341,9 +341,9 @@ dependencies = [ [[package]] name = "async-tungstenite" -version = "0.17.0" +version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3fe458e5f0c283bd6315a884d30f90b654c612b2c16d676730ba71f23cf160a" +checksum = "7922abeade7dd8948c20dfa1f85dc48cc952d2e0791f7c42b8b1cbb07a57129d" dependencies = [ "futures-io", "futures-util", @@ -604,9 +604,9 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e54ea8bc3fb1ee042f5aace6e3c6e025d3874866da222930f70ce62aceba0bfa" +checksum = "fdbfe11fe19ff083c48923cf179540e8cd0535903dc35e178a1fdeeb59aef51f" dependencies = [ "cfg-if", "crossbeam-utils", @@ -625,10 +625,11 @@ dependencies = [ [[package]] name = "crossbeam-epoch" -version = "0.9.7" +version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c00d6d2ea26e8b151d99093005cb442fb9a37aeaca582a03ec70946f49ab5ed9" +checksum = "1145cf131a2c6ba0615079ab6a638f7e1973ac9c2634fcbeaaad6114246efe8c" dependencies = [ + "autocfg", "cfg-if", "crossbeam-utils", "lazy_static", @@ -638,9 +639,9 @@ dependencies = [ [[package]] name = "crossbeam-queue" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dd435b205a4842da59efd07628f921c096bc1cc0a156835b4fa0bcb9a19bcce" +checksum = "1f25d8400f4a7a5778f0e4e52384a48cbd9b5c495d110786187fc750075277a2" dependencies = [ "cfg-if", "crossbeam-utils", @@ -648,9 +649,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.7" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5e5bed1f1c269533fa816a0a5492b3545209a205ca1a54842be180eb63a16a6" +checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38" dependencies = [ "cfg-if", "lazy_static", @@ -789,9 +790,9 @@ dependencies = [ [[package]] name = "flume" -version = "0.10.11" +version = "0.10.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b279436a715a9de95dcd26b151db590a71961cc06e54918b24fe0dd5b7d3fc4" +checksum = "843c03199d0c0ca54bc1ea90ac0d507274c28abcc4f691ae8b4eaa375087c76a" dependencies = [ "futures-core", "futures-sink", @@ -943,14 +944,14 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c" +checksum = "d39cd93900197114fa1fcb7ae84ca742095eed9442088988ae74fa744e930e77" dependencies = [ "cfg-if", "js-sys", "libc", - "wasi", + "wasi 0.10.2+wasi-snapshot-preview1", "wasm-bindgen", ] @@ -1013,11 +1014,11 @@ checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" [[package]] name = "halfbrown" -version = "0.1.12" +version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ed39577259d319b81a15176a32673271be2786cb463889703c58c90fe83c825" +checksum = "49e26621a30b9fdb4f949b9c6a7fa42ce88112851c33ac4ca00bfa7848d26fb4" dependencies = [ - "hashbrown 0.11.2", + "hashbrown 0.12.0", "serde", ] @@ -1027,15 +1028,6 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" -[[package]] -name = "hashbrown" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" -dependencies = [ - "ahash", -] - [[package]] name = "hashbrown" version = "0.12.0" @@ -1193,9 +1185,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.118" +version = "0.2.120" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06e509672465a0504304aa87f9f176f2b2b716ed8fb105ebe5c02dc6dce96a94" +checksum = "ad5c14e80759d0939d013e6ca49930e59fc53dd8e5009132f76240c179380c09" [[package]] name = "libm" @@ -1302,14 +1294,15 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.0" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba272f85fa0b41fc91872be579b3bbe0f56b792aa361a380eb669469f68dafb2" +checksum = "52da4364ffb0e4fe33a9841a98a3f3014fb964045ce4f7a45a398243c8d6b0c9" dependencies = [ "libc", "log", "miow", "ntapi", + "wasi 0.11.0+wasi-snapshot-preview1", "winapi 0.3.9", ] @@ -1324,9 +1317,9 @@ dependencies = [ [[package]] name = "nanorand" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "729eb334247daa1803e0a094d0a5c55711b85571179f5ec6e53eccfdf7008958" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" dependencies = [ "getrandom", ] @@ -1438,9 +1431,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5" +checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" [[package]] name = "oorandom" @@ -1581,9 +1574,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.15" +version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "864d3e96a899863136fc6e99f3d7cae289dafe43bf2c5ac19b70df7210c0a145" +checksum = "b4af2ec4714533fcdf07e886f17025ace8b997b9ce51204ee69b6da831c3da57" dependencies = [ "proc-macro2", ] @@ -1655,9 +1648,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.5.4" +version = "1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" +checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286" dependencies = [ "aho-corasick", "memchr", @@ -1775,9 +1768,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0486718e92ec9a68fbed73bb5ef687d71103b142595b406835649bebd33f72c7" +checksum = "a4a3381e03edd24287172047536f20cabde766e2cd3e65e6b00fb3af51c4f38d" [[package]] name = "serde" @@ -1919,9 +1912,9 @@ checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" [[package]] name = "smartstring" -version = "0.2.9" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31aa6a31c0c2b21327ce875f7e8952322acfcfd0c27569a6e18a647281352c9b" +checksum = "e714dff2b33f2321fdcd475b71cec79781a692d846f37f415fb395a1d2bcd48e" dependencies = [ "static_assertions", ] @@ -1970,9 +1963,9 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "syn" -version = "1.0.86" +version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a65b3f4ffa0092e9887669db0eae07941f023991ab58ea44da8fe8e2d511c6b" +checksum = "ea297be220d52398dcc07ce15a209fce436d361735ac1db700cab3b6cdfb9f54" dependencies = [ "proc-macro2", "quote", @@ -1993,9 +1986,9 @@ dependencies = [ [[package]] name = "termcolor" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4" +checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" dependencies = [ "winapi-util", ] @@ -2085,9 +2078,9 @@ dependencies = [ [[package]] name = "tracing" -version = "0.1.31" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6c650a8ef0cd2dd93736f033d21cbd1224c5a967aa0c258d00fcf7dafef9b9f" +checksum = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f" dependencies = [ "cfg-if", "pin-project-lite", @@ -2097,9 +2090,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.19" +version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8276d9a4a3a558d7b7ad5303ad50b53d58264641b82914b7ada36bd762e7a716" +checksum = "2e65ce065b4b5c53e73bb28912318cb8c9e9ad3921f1d669eb0e68b4c8143a2b" dependencies = [ "proc-macro2", "quote", @@ -2108,18 +2101,18 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.22" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03cfcb51380632a72d3111cb8d3447a8d908e577d31beeac006f836383d29a23" +checksum = "aa31669fa42c09c34d94d8165dd2012e8ff3c66aca50f3bb226b68f216f2706c" dependencies = [ "lazy_static", ] [[package]] name = "tungstenite" -version = "0.17.1" +version = "0.17.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a5198d211a468fa9573edf4919aa88a17515723c766b3a6b3a10536eb7e1ee0" +checksum = "d96a2dea40e7570482f28eb57afbe42d97551905da6a9400acc5c328d24004f5" dependencies = [ "base64", "byteorder", @@ -2199,9 +2192,9 @@ checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" [[package]] name = "value-trait" -version = "0.2.9" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0393efdd7d82f856a927b0fcafa80bca45911f5c89ef6b9d80197bebc284f72e" +checksum = "0fe40a74a6f052b10668ef021c8c3ae56ab38269f9c0f401daa6ed36f96662fd" dependencies = [ "float-cmp", "halfbrown", @@ -2238,6 +2231,12 @@ version = "0.10.2+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "wasm-bindgen" version = "0.2.79" From cb563ee37e9d269d28b5b82b79bb92559e752961 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 18 Mar 2022 15:29:08 +0100 Subject: [PATCH 4/9] Adjust scripts/run-aquatic-ws.sh now that there is no mio version --- scripts/run-aquatic-ws.sh | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/scripts/run-aquatic-ws.sh b/scripts/run-aquatic-ws.sh index 5aadff6..40be253 100755 --- a/scripts/run-aquatic-ws.sh +++ b/scripts/run-aquatic-ws.sh @@ -1,14 +1,5 @@ -#!/bin/bash +#!/bin/sh . ./scripts/env-native-cpu-without-avx-512 -if [ "$1" != "mio" ] && [ "$1" != "glommio" ]; then - echo "Usage: $0 [mio|glommio] [ARGS]" -else - if [ "$1" = "mio" ]; then - cargo run --release --bin aquatic_ws -- "${@:2}" - else - cargo run --release --features "with-glommio" --no-default-features --bin aquatic_ws -- "${@:2}" - fi -fi - +cargo run --release --bin aquatic_ws -- $@ From 50be6acd221d76365fc8516f39ad63cbbd3c70dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 18 Mar 2022 15:34:40 +0100 Subject: [PATCH 5/9] Fix CI workflow --- .github/workflows/cargo-build-and-test.yml | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/.github/workflows/cargo-build-and-test.yml b/.github/workflows/cargo-build-and-test.yml index c1cd5a4..ca49e4e 100644 --- a/.github/workflows/cargo-build-and-test.yml +++ b/.github/workflows/cargo-build-and-test.yml @@ -20,11 +20,8 @@ jobs: - name: Build run: | cargo build --verbose -p aquatic_udp --features "cpu-pinning" - cargo build --verbose -p aquatic_http --features "cpu-pinning" - cargo build --verbose -p aquatic_ws --features "cpu-pinning" - cargo build --verbose -p aquatic_ws --features "with-glommio cpu-pinning" --no-default-features - name: Run tests run: cargo test --verbose --workspace --all-targets @@ -35,6 +32,4 @@ jobs: steps: - uses: actions/checkout@v2 - name: Build - run: | - cargo build --verbose -p aquatic_udp - cargo build --verbose -p aquatic_ws + run: cargo build --verbose -p aquatic_udp From 193ad1689fc53cc3f307f4869bbe3f7090e9677f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 18 Mar 2022 15:45:07 +0100 Subject: [PATCH 6/9] ws: move code from common.rs into lib.rs and request.rs --- aquatic_ws/src/common.rs | 148 +----------------------------- aquatic_ws/src/lib.rs | 57 +++++++++--- aquatic_ws/src/workers/request.rs | 114 ++++++++++++++++++++++- 3 files changed, 158 insertions(+), 161 deletions(-) diff --git a/aquatic_ws/src/common.rs b/aquatic_ws/src/common.rs index f7ced95..2d2f834 100644 --- a/aquatic_ws/src/common.rs +++ b/aquatic_ws/src/common.rs @@ -1,17 +1,10 @@ -use std::fs::File; -use std::io::BufReader; use std::sync::Arc; -use std::time::Instant; -use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; -use aquatic_common::{AHashIndexMap, CanonicalSocketAddr}; +use aquatic_common::access_list::AccessListArcSwap; +use aquatic_common::CanonicalSocketAddr; pub use aquatic_common::ValidUntil; -use aquatic_ws_protocol::*; - -use crate::config::Config; - pub type TlsConfig = futures_rustls::rustls::ServerConfig; #[derive(Default, Clone)] @@ -37,140 +30,3 @@ pub struct ConnectionMeta { pub peer_addr: CanonicalSocketAddr, pub pending_scrape_id: Option, } - -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -pub enum PeerStatus { - Seeding, - Leeching, - Stopped, -} - -impl PeerStatus { - /// Determine peer status from announce event and number of bytes left. - /// - /// Likely, the last branch will be taken most of the time. - #[inline] - pub fn from_event_and_bytes_left(event: AnnounceEvent, opt_bytes_left: Option) -> Self { - if let AnnounceEvent::Stopped = event { - Self::Stopped - } else if let Some(0) = opt_bytes_left { - Self::Seeding - } else { - Self::Leeching - } - } -} - -#[derive(Clone, Copy)] -pub struct Peer { - pub connection_meta: ConnectionMeta, - pub status: PeerStatus, - pub valid_until: ValidUntil, -} - -pub type PeerMap = AHashIndexMap; - -pub struct TorrentData { - pub peers: PeerMap, - pub num_seeders: usize, - pub num_leechers: usize, -} - -impl Default for TorrentData { - #[inline] - fn default() -> Self { - Self { - peers: Default::default(), - num_seeders: 0, - num_leechers: 0, - } - } -} - -pub type TorrentMap = AHashIndexMap; - -#[derive(Default)] -pub struct TorrentMaps { - pub ipv4: TorrentMap, - pub ipv6: TorrentMap, -} - -impl TorrentMaps { - pub fn clean(&mut self, config: &Config, access_list: &Arc) { - let mut access_list_cache = create_access_list_cache(access_list); - - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4); - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6); - } - - fn clean_torrent_map( - config: &Config, - access_list_cache: &mut AccessListCache, - torrent_map: &mut TorrentMap, - ) { - let now = Instant::now(); - - torrent_map.retain(|info_hash, torrent_data| { - if !access_list_cache - .load() - .allows(config.access_list.mode, &info_hash.0) - { - return false; - } - - let num_seeders = &mut torrent_data.num_seeders; - let num_leechers = &mut torrent_data.num_leechers; - - torrent_data.peers.retain(|_, peer| { - let keep = peer.valid_until.0 >= now; - - if !keep { - match peer.status { - PeerStatus::Seeding => { - *num_seeders -= 1; - } - PeerStatus::Leeching => { - *num_leechers -= 1; - } - _ => (), - }; - } - - keep - }); - - !torrent_data.peers.is_empty() - }); - - torrent_map.shrink_to_fit(); - } -} - -pub fn create_tls_config(config: &Config) -> anyhow::Result { - let certs = { - let f = File::open(&config.network.tls_certificate_path)?; - let mut f = BufReader::new(f); - - rustls_pemfile::certs(&mut f)? - .into_iter() - .map(|bytes| rustls::Certificate(bytes)) - .collect() - }; - - let private_key = { - let f = File::open(&config.network.tls_private_key_path)?; - let mut f = BufReader::new(f); - - rustls_pemfile::pkcs8_private_keys(&mut f)? - .first() - .map(|bytes| rustls::PrivateKey(bytes.clone())) - .ok_or(anyhow::anyhow!("No private keys in file"))? - }; - - let tls_config = rustls::ServerConfig::builder() - .with_safe_defaults() - .with_no_client_auth() - .with_single_cert(certs, private_key)?; - - Ok(tls_config) -} diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index a9a496c..0e8cd38 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -1,20 +1,22 @@ -use aquatic_common::access_list::update_access_list; -#[cfg(feature = "cpu-pinning")] -use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; -use common::State; -use signal_hook::{consts::SIGUSR1, iterator::Signals}; - -use std::sync::{atomic::AtomicUsize, Arc}; - -use crate::{common::create_tls_config, config::Config}; -use aquatic_common::privileges::drop_privileges_after_socket_binding; - -use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; - pub mod common; pub mod config; pub mod workers; +use std::fs::File; +use std::io::BufReader; +use std::sync::{atomic::AtomicUsize, Arc}; + +use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; +use signal_hook::{consts::SIGUSR1, iterator::Signals}; + +use aquatic_common::access_list::update_access_list; +#[cfg(feature = "cpu-pinning")] +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; +use aquatic_common::privileges::drop_privileges_after_socket_binding; + +use common::*; +use config::Config; + pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker"; pub const SHARED_IN_CHANNEL_SIZE: usize = 1024; @@ -147,3 +149,32 @@ pub fn run_workers(config: Config, state: State) -> anyhow::Result<()> { Ok(()) } + +pub fn create_tls_config(config: &Config) -> anyhow::Result { + let certs = { + let f = File::open(&config.network.tls_certificate_path)?; + let mut f = BufReader::new(f); + + rustls_pemfile::certs(&mut f)? + .into_iter() + .map(|bytes| rustls::Certificate(bytes)) + .collect() + }; + + let private_key = { + let f = File::open(&config.network.tls_private_key_path)?; + let mut f = BufReader::new(f); + + rustls_pemfile::pkcs8_private_keys(&mut f)? + .first() + .map(|bytes| rustls::PrivateKey(bytes.clone())) + .ok_or(anyhow::anyhow!("No private keys in file"))? + }; + + let tls_config = rustls::ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth() + .with_single_cert(certs, private_key)?; + + Ok(tls_config) +} diff --git a/aquatic_ws/src/workers/request.rs b/aquatic_ws/src/workers/request.rs index a5dd22f..6cf2ca0 100644 --- a/aquatic_ws/src/workers/request.rs +++ b/aquatic_ws/src/workers/request.rs @@ -1,7 +1,9 @@ use std::cell::RefCell; use std::rc::Rc; -use std::time::Duration; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use futures::StreamExt; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::enclose; @@ -10,13 +12,121 @@ use glommio::timer::TimerActionRepeat; use hashbrown::HashMap; use rand::{rngs::SmallRng, SeedableRng}; -use aquatic_common::extract_response_peers; +use aquatic_common::{extract_response_peers, AHashIndexMap}; use aquatic_ws_protocol::*; use crate::common::*; use crate::config::Config; use crate::SHARED_IN_CHANNEL_SIZE; +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub enum PeerStatus { + Seeding, + Leeching, + Stopped, +} + +impl PeerStatus { + /// Determine peer status from announce event and number of bytes left. + /// + /// Likely, the last branch will be taken most of the time. + #[inline] + pub fn from_event_and_bytes_left(event: AnnounceEvent, opt_bytes_left: Option) -> Self { + if let AnnounceEvent::Stopped = event { + Self::Stopped + } else if let Some(0) = opt_bytes_left { + Self::Seeding + } else { + Self::Leeching + } + } +} + +#[derive(Clone, Copy)] +pub struct Peer { + pub connection_meta: ConnectionMeta, + pub status: PeerStatus, + pub valid_until: ValidUntil, +} + +pub type PeerMap = AHashIndexMap; + +pub struct TorrentData { + pub peers: PeerMap, + pub num_seeders: usize, + pub num_leechers: usize, +} + +impl Default for TorrentData { + #[inline] + fn default() -> Self { + Self { + peers: Default::default(), + num_seeders: 0, + num_leechers: 0, + } + } +} + +pub type TorrentMap = AHashIndexMap; + +#[derive(Default)] +pub struct TorrentMaps { + pub ipv4: TorrentMap, + pub ipv6: TorrentMap, +} + +impl TorrentMaps { + pub fn clean(&mut self, config: &Config, access_list: &Arc) { + let mut access_list_cache = create_access_list_cache(access_list); + + Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4); + Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6); + } + + fn clean_torrent_map( + config: &Config, + access_list_cache: &mut AccessListCache, + torrent_map: &mut TorrentMap, + ) { + let now = Instant::now(); + + torrent_map.retain(|info_hash, torrent_data| { + if !access_list_cache + .load() + .allows(config.access_list.mode, &info_hash.0) + { + return false; + } + + let num_seeders = &mut torrent_data.num_seeders; + let num_leechers = &mut torrent_data.num_leechers; + + torrent_data.peers.retain(|_, peer| { + let keep = peer.valid_until.0 >= now; + + if !keep { + match peer.status { + PeerStatus::Seeding => { + *num_seeders -= 1; + } + PeerStatus::Leeching => { + *num_leechers -= 1; + } + _ => (), + }; + } + + keep + }); + + !torrent_data.peers.is_empty() + }); + + torrent_map.shrink_to_fit(); + } +} + pub async fn run_request_worker( config: Config, state: State, From 66232df6d2253449ce7510cf4e4228573dfb9641 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 18 Mar 2022 15:47:14 +0100 Subject: [PATCH 7/9] ws: remove pub visibility where not needed --- aquatic_ws/src/lib.rs | 4 ++-- aquatic_ws/src/workers/request.rs | 20 ++++++++++---------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index 0e8cd38..4b6c9ad 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -54,7 +54,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { Ok(()) } -pub fn run_workers(config: Config, state: State) -> anyhow::Result<()> { +fn run_workers(config: Config, state: State) -> anyhow::Result<()> { let num_peers = config.socket_workers + config.request_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE); @@ -150,7 +150,7 @@ pub fn run_workers(config: Config, state: State) -> anyhow::Result<()> { Ok(()) } -pub fn create_tls_config(config: &Config) -> anyhow::Result { +fn create_tls_config(config: &Config) -> anyhow::Result { let certs = { let f = File::open(&config.network.tls_certificate_path)?; let mut f = BufReader::new(f); diff --git a/aquatic_ws/src/workers/request.rs b/aquatic_ws/src/workers/request.rs index 6cf2ca0..c9adc36 100644 --- a/aquatic_ws/src/workers/request.rs +++ b/aquatic_ws/src/workers/request.rs @@ -20,7 +20,7 @@ use crate::config::Config; use crate::SHARED_IN_CHANNEL_SIZE; #[derive(PartialEq, Eq, Clone, Copy, Debug)] -pub enum PeerStatus { +enum PeerStatus { Seeding, Leeching, Stopped, @@ -31,7 +31,7 @@ impl PeerStatus { /// /// Likely, the last branch will be taken most of the time. #[inline] - pub fn from_event_and_bytes_left(event: AnnounceEvent, opt_bytes_left: Option) -> Self { + fn from_event_and_bytes_left(event: AnnounceEvent, opt_bytes_left: Option) -> Self { if let AnnounceEvent::Stopped = event { Self::Stopped } else if let Some(0) = opt_bytes_left { @@ -43,15 +43,15 @@ impl PeerStatus { } #[derive(Clone, Copy)] -pub struct Peer { +struct Peer { pub connection_meta: ConnectionMeta, pub status: PeerStatus, pub valid_until: ValidUntil, } -pub type PeerMap = AHashIndexMap; +type PeerMap = AHashIndexMap; -pub struct TorrentData { +struct TorrentData { pub peers: PeerMap, pub num_seeders: usize, pub num_leechers: usize, @@ -68,16 +68,16 @@ impl Default for TorrentData { } } -pub type TorrentMap = AHashIndexMap; +type TorrentMap = AHashIndexMap; #[derive(Default)] -pub struct TorrentMaps { +struct TorrentMaps { pub ipv4: TorrentMap, pub ipv6: TorrentMap, } impl TorrentMaps { - pub fn clean(&mut self, config: &Config, access_list: &Arc) { + fn clean(&mut self, config: &Config, access_list: &Arc) { let mut access_list_cache = create_access_list_cache(access_list); Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4); @@ -236,7 +236,7 @@ async fn handle_request_stream( .await; } -pub fn handle_announce_request( +fn handle_announce_request( config: &Config, rng: &mut SmallRng, torrent_maps: &mut TorrentMaps, @@ -374,7 +374,7 @@ pub fn handle_announce_request( out_messages.push((request_sender_meta, out_message)); } -pub fn handle_scrape_request( +fn handle_scrape_request( config: &Config, torrent_maps: &mut TorrentMaps, out_messages: &mut Vec<(ConnectionMeta, OutMessage)>, From 688d68105bc11ab607d7931955f11311de8c9608 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 18 Mar 2022 15:51:20 +0100 Subject: [PATCH 8/9] http, http_protocol: upgrade smartstring from 0.2 to 1.0 --- Cargo.lock | 4 ++-- aquatic_http/Cargo.toml | 2 +- aquatic_http_protocol/Cargo.toml | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bd48ae4..ef96f4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1912,9 +1912,9 @@ checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" [[package]] name = "smartstring" -version = "0.2.10" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e714dff2b33f2321fdcd475b71cec79781a692d846f37f415fb395a1d2bcd48e" +checksum = "ea958ad90cacc8ece7f238fde3671e1b350ee1741964edf2a22fd16f60224163" dependencies = [ "static_assertions", ] diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index 9bdae8c..8566590 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -39,7 +39,7 @@ rustls-pemfile = "0.3" serde = { version = "1", features = ["derive"] } signal-hook = { version = "0.3" } slab = "0.4" -smartstring = "0.2" +smartstring = "1" [dev-dependencies] quickcheck = "1" diff --git a/aquatic_http_protocol/Cargo.toml b/aquatic_http_protocol/Cargo.toml index ead5436..9bda1de 100644 --- a/aquatic_http_protocol/Cargo.toml +++ b/aquatic_http_protocol/Cargo.toml @@ -31,7 +31,7 @@ memchr = "2" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } serde_bencode = "0.2" -smartstring = "0.2" +smartstring = "1" urlencoding = "2" [dev-dependencies] From 07cc575e39988af00592d05f1afb1b91388423dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 18 Mar 2022 15:51:56 +0100 Subject: [PATCH 9/9] ws: sort dependencies alphabetically --- aquatic_ws/Cargo.toml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index ea08336..4f1222a 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -23,8 +23,13 @@ aquatic_toml_config = "0.1.0" aquatic_ws_protocol = "0.1.0" anyhow = "1" +async-tungstenite = "0.17" cfg-if = "1" either = "1" +futures = "0.3" +futures-lite = "1" +futures-rustls = "0.22" +glommio = "0.7" hashbrown = { version = "0.12", features = ["serde"] } log = "0.4" mimalloc = { version = "0.1", default-features = false } @@ -36,11 +41,6 @@ serde = { version = "1", features = ["derive"] } signal-hook = { version = "0.3" } slab = "0.4" tungstenite = "0.17" -async-tungstenite = "0.17" -futures-lite = "1" -futures = "0.3" -futures-rustls = "0.22" -glommio = "0.7" [dev-dependencies] quickcheck = "1"