From d922e5e6802e6aec13a6b81624fa0e841f544ff2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 2 Nov 2021 20:32:13 +0100 Subject: [PATCH 1/5] aquatic_udp: mio: update access list on SIGHUP instead of regularly --- Cargo.lock | 21 +++++++ aquatic_common/Cargo.toml | 1 + aquatic_common/src/access_list.rs | 24 ++++++-- aquatic_udp/Cargo.toml | 1 + aquatic_udp/src/lib/mio/mod.rs | 91 +++++++++++++++++-------------- 5 files changed, 92 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0735770..1f2308c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,6 +79,7 @@ dependencies = [ "hashbrown 0.11.2", "hex", "indexmap", + "log", "privdrop", "rand", "serde", @@ -181,6 +182,7 @@ dependencies = [ "quickcheck_macros", "rand", "serde", + "signal-hook", "socket2 0.4.2", ] @@ -1754,6 +1756,25 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "signal-hook" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c98891d737e271a2954825ef19e46bd16bdb98e2746f2eec4f7a4ef7946efd1" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + [[package]] name = "simd-json" version = "0.4.8" diff --git a/aquatic_common/Cargo.toml b/aquatic_common/Cargo.toml index e86e0d0..3f34d1f 100644 --- a/aquatic_common/Cargo.toml +++ b/aquatic_common/Cargo.toml @@ -16,6 +16,7 @@ arc-swap = "1" hashbrown = "0.11.2" hex = "0.4" indexmap = "1" +log = "0.4" privdrop = "0.5" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } diff --git a/aquatic_common/src/access_list.rs b/aquatic_common/src/access_list.rs index 69f856c..a2c3f31 100644 --- a/aquatic_common/src/access_list.rs +++ b/aquatic_common/src/access_list.rs @@ -75,17 +75,31 @@ impl AccessList { } pub trait AccessListQuery { - fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()>; + fn update(&self, config: &AccessListConfig); fn allows(&self, list_mode: AccessListMode, info_hash_bytes: &[u8; 20]) -> bool; } pub type AccessListArcSwap = ArcSwap; impl AccessListQuery for AccessListArcSwap { - fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()> { - self.store(Arc::new(AccessList::create_from_path(path)?)); - - Ok(()) + fn update(&self, config: &AccessListConfig) { + match config.mode { + AccessListMode::White | AccessListMode::Black => { + match AccessList::create_from_path(&config.path) { + Ok(new) => { + self.store(Arc::new(new)); + } + Err(err) => { + ::log::error!("Updating access list failed: {:?}", err); + } + } + } + AccessListMode::Off => { + ::log::error!( + "AccessListQuery::update_from_path called, but AccessListMode is Off" + ); + } + } } fn allows(&self, mode: AccessListMode, info_hash_bytes: &[u8; 20]) -> bool { diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index a7bf538..e418054 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -34,6 +34,7 @@ mimalloc = { version = "0.1", default-features = false } parking_lot = "0.11" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } +signal-hook = { version = "0.3" } # mio crossbeam-channel = { version = "0.5", optional = true } diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index 1b8e656..b247c8b 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -9,15 +9,17 @@ use anyhow::Context; use aquatic_common::privileges::drop_privileges_after_socket_binding; use crossbeam_channel::unbounded; +use aquatic_common::access_list::AccessListQuery; +use signal_hook::consts::SIGHUP; +use signal_hook::iterator::Signals; + +use crate::config::Config; + pub mod common; pub mod handlers; pub mod network; pub mod tasks; -use aquatic_common::access_list::{AccessListArcSwap, AccessListMode, AccessListQuery}; - -use crate::config::Config; - use common::State; pub fn run(config: Config) -> ::anyhow::Result<()> { @@ -27,38 +29,42 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { }); } + let mut signals = Signals::new(::std::iter::once(SIGHUP))?; + let state = State::default(); - update_access_list(&config, &state.access_list); + { + let config = config.clone(); + let state = state.clone(); + + ::std::thread::spawn(move || run_inner(config, state)); + } + + for signal in &mut signals { + match signal { + SIGHUP => { + ::log::info!("Updating access list"); + + state.access_list.update(&config.access_list); + } + _ => unreachable!(), + } + } + + Ok(()) +} + +pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { + if config.cpu_pinning.active { + core_affinity::set_for_current(core_affinity::CoreId { + id: config.cpu_pinning.offset, + }); + } + + state.access_list.update(&config.access_list); let num_bound_sockets = Arc::new(AtomicUsize::new(0)); - start_workers(config.clone(), state.clone(), num_bound_sockets.clone())?; - - drop_privileges_after_socket_binding( - &config.privileges, - num_bound_sockets, - config.socket_workers, - ) - .unwrap(); - - loop { - ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); - - update_access_list(&config, &state.access_list); - - state - .torrents - .lock() - .clean(&config, state.access_list.load_full().deref()); - } -} - -pub fn start_workers( - config: Config, - state: State, - num_bound_sockets: Arc, -) -> ::anyhow::Result<()> { let (request_sender, request_receiver) = unbounded(); let (response_sender, response_receiver) = unbounded(); @@ -132,16 +138,19 @@ pub fn start_workers( .with_context(|| "spawn statistics worker")?; } - Ok(()) -} + drop_privileges_after_socket_binding( + &config.privileges, + num_bound_sockets, + config.socket_workers, + ) + .unwrap(); -pub fn update_access_list(config: &Config, access_list: &Arc) { - match config.access_list.mode { - AccessListMode::White | AccessListMode::Black => { - if let Err(err) = access_list.update_from_path(&config.access_list.path) { - ::log::error!("Update access list from path: {:?}", err); - } - } - AccessListMode::Off => {} + loop { + ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); + + state + .torrents + .lock() + .clean(&config, state.access_list.load_full().deref()); } } From 78d29770f38976b2765173b95397984217436f31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 2 Nov 2021 20:33:14 +0100 Subject: [PATCH 2/5] aquatic_ws, aquatic_ws_load_test: cargo fmt --- aquatic_ws/src/lib/network.rs | 7 +++---- aquatic_ws_load_test/src/main.rs | 1 - aquatic_ws_load_test/src/network.rs | 20 +++++++++++++------- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/aquatic_ws/src/lib/network.rs b/aquatic_ws/src/lib/network.rs index fdbb561..d0e9679 100644 --- a/aquatic_ws/src/lib/network.rs +++ b/aquatic_ws/src/lib/network.rs @@ -17,7 +17,7 @@ use futures_lite::StreamExt; use futures_rustls::server::TlsStream; use futures_rustls::TlsAcceptor; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; -use glommio::channels::local_channel::{LocalReceiver, LocalSender, new_unbounded}; +use glommio::channels::local_channel::{new_unbounded, LocalReceiver, LocalSender}; use glommio::channels::shared_channel::ConnectedReceiver; use glommio::net::{TcpListener, TcpStream}; use glommio::timer::TimerActionRepeat; @@ -95,8 +95,7 @@ pub async fn run_socket_worker( while let Some(stream) = incoming.next().await { match stream { Ok(stream) => { - let (out_message_sender, out_message_receiver) = - new_unbounded(); + let (out_message_sender, out_message_receiver) = new_unbounded(); let out_message_sender = Rc::new(out_message_sender); let key = RefCell::borrow_mut(&connection_slab).insert(ConnectionReference { @@ -160,7 +159,7 @@ async fn receive_out_messages( .get(channel_out_message.0.connection_id.0) { match reference.out_message_sender.try_send(channel_out_message) { - Ok(()) | Err(GlommioError::Closed(_)) => {}, + Ok(()) | Err(GlommioError::Closed(_)) => {} Err(err) => { ::log::error!( "Couldn't send out_message from shared channel to local receiver: {:?}", diff --git a/aquatic_ws_load_test/src/main.rs b/aquatic_ws_load_test/src/main.rs index 241a136..54f9208 100644 --- a/aquatic_ws_load_test/src/main.rs +++ b/aquatic_ws_load_test/src/main.rs @@ -97,7 +97,6 @@ fn create_tls_config() -> anyhow::Result> { Ok(Arc::new(config)) } - fn monitor_statistics(state: LoadTestState, config: &Config) { let start_time = Instant::now(); let mut report_avg_response_vec: Vec = Vec::new(); diff --git a/aquatic_ws_load_test/src/network.rs b/aquatic_ws_load_test/src/network.rs index 267f077..b0f78ce 100644 --- a/aquatic_ws_load_test/src/network.rs +++ b/aquatic_ws_load_test/src/network.rs @@ -7,12 +7,12 @@ use std::{ }; use aquatic_ws_protocol::{InMessage, JsonValue, OfferId, OutMessage, PeerId}; -use async_tungstenite::{WebSocketStream, client_async}; -use futures::{StreamExt, SinkExt}; -use futures_rustls::{TlsConnector, client::TlsStream}; +use async_tungstenite::{client_async, WebSocketStream}; +use futures::{SinkExt, StreamExt}; +use futures_rustls::{client::TlsStream, TlsConnector}; use glommio::net::TcpStream; use glommio::{prelude::*, timer::TimerActionRepeat}; -use rand::{Rng, SeedableRng, prelude::SmallRng}; +use rand::{prelude::SmallRng, Rng, SeedableRng}; use crate::{common::LoadTestState, config::Config, utils::create_random_request}; @@ -80,7 +80,9 @@ impl Connection { let stream = TcpStream::connect(config.server_address) .await .map_err(|err| anyhow::anyhow!("connect: {:?}", err))?; - let stream = TlsConnector::from(tls_config).connect("example.com".try_into().unwrap(), stream).await?; + let stream = TlsConnector::from(tls_config) + .connect("example.com".try_into().unwrap(), stream) + .await?; let request = format!( "ws://{}:{}", config.server_address.ip(), @@ -114,8 +116,12 @@ impl Connection { async fn run_connection_loop(&mut self) -> anyhow::Result<()> { loop { if self.can_send { - let request = - create_random_request(&self.config, &self.load_test_state, &mut self.rng, self.peer_id); + let request = create_random_request( + &self.config, + &self.load_test_state, + &mut self.rng, + self.peer_id, + ); // If self.send_answer is set and request is announce request, make // the request an offer answer From 9b75b5080289ef1ee951ea0c758f285842e47e7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 2 Nov 2021 21:06:57 +0100 Subject: [PATCH 3/5] aquatic_udp: mio: access list: use SIGUSR1, improve errors --- aquatic_common/src/access_list.rs | 31 +++++++++++------------------ aquatic_udp/src/lib/mio/mod.rs | 33 ++++++++++++++++++++++--------- 2 files changed, 35 insertions(+), 29 deletions(-) diff --git a/aquatic_common/src/access_list.rs b/aquatic_common/src/access_list.rs index a2c3f31..f71f967 100644 --- a/aquatic_common/src/access_list.rs +++ b/aquatic_common/src/access_list.rs @@ -3,6 +3,7 @@ use std::io::{BufRead, BufReader}; use std::path::PathBuf; use std::sync::Arc; +use anyhow::Context; use arc_swap::ArcSwap; use hashbrown::HashSet; use serde::{Deserialize, Serialize}; @@ -59,7 +60,11 @@ impl AccessList { let mut new_list = Self::default(); for line in reader.lines() { - new_list.insert_from_line(&line?)?; + let line = line?; + + new_list + .insert_from_line(&line) + .with_context(|| format!("Invalid line in access list: {}", line))?; } Ok(new_list) @@ -75,31 +80,17 @@ impl AccessList { } pub trait AccessListQuery { - fn update(&self, config: &AccessListConfig); + fn update(&self, config: &AccessListConfig) -> anyhow::Result<()>; fn allows(&self, list_mode: AccessListMode, info_hash_bytes: &[u8; 20]) -> bool; } pub type AccessListArcSwap = ArcSwap; impl AccessListQuery for AccessListArcSwap { - fn update(&self, config: &AccessListConfig) { - match config.mode { - AccessListMode::White | AccessListMode::Black => { - match AccessList::create_from_path(&config.path) { - Ok(new) => { - self.store(Arc::new(new)); - } - Err(err) => { - ::log::error!("Updating access list failed: {:?}", err); - } - } - } - AccessListMode::Off => { - ::log::error!( - "AccessListQuery::update_from_path called, but AccessListMode is Off" - ); - } - } + fn update(&self, config: &AccessListConfig) -> anyhow::Result<()> { + self.store(Arc::new(AccessList::create_from_path(&config.path)?)); + + Ok(()) } fn allows(&self, mode: AccessListMode, info_hash_bytes: &[u8; 20]) -> bool { diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index b247c8b..4104d47 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -10,7 +10,7 @@ use aquatic_common::privileges::drop_privileges_after_socket_binding; use crossbeam_channel::unbounded; use aquatic_common::access_list::AccessListQuery; -use signal_hook::consts::SIGHUP; +use signal_hook::consts::SIGUSR1; use signal_hook::iterator::Signals; use crate::config::Config; @@ -29,10 +29,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { }); } - let mut signals = Signals::new(::std::iter::once(SIGHUP))?; - let state = State::default(); + update_access_list(&config, &state)?; + + let mut signals = Signals::new(::std::iter::once(SIGUSR1))?; + { let config = config.clone(); let state = state.clone(); @@ -42,10 +44,8 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { for signal in &mut signals { match signal { - SIGHUP => { - ::log::info!("Updating access list"); - - state.access_list.update(&config.access_list); + SIGUSR1 => { + let _ = update_access_list(&config, &state); } _ => unreachable!(), } @@ -61,8 +61,6 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { }); } - state.access_list.update(&config.access_list); - let num_bound_sockets = Arc::new(AtomicUsize::new(0)); let (request_sender, request_receiver) = unbounded(); @@ -154,3 +152,20 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { .clean(&config, state.access_list.load_full().deref()); } } + +fn update_access_list(config: &Config, state: &State) -> anyhow::Result<()> { + if config.access_list.mode.is_on() { + match state.access_list.update(&config.access_list) { + Ok(()) => { + ::log::info!("Access list updated") + } + Err(err) => { + ::log::error!("Updating access list failed: {:#}", err); + + return Err(err); + } + } + } + + Ok(()) +} From c265b6ae7f2e3123512780c09ef4fa166d2d1421 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 2 Nov 2021 21:09:32 +0100 Subject: [PATCH 4/5] Update TODO --- TODO.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/TODO.md b/TODO.md index e6b5ca3..a4b2b1c 100644 --- a/TODO.md +++ b/TODO.md @@ -3,6 +3,8 @@ * readme * document privilige dropping, cpu pinning +* socket_recv_size and ipv6_only in glommio implementations + * config: fail on unrecognized keys * access lists: @@ -13,14 +15,12 @@ * aquatic_udp * CI for both implementations * glommio - * ipv6 only flag * consider sending local responses immediately * consider adding ConnectedScrapeRequest::Scrape(PendingScrapeRequest) containing TransactionId and BTreeMap, and same for response * aquatic_http: - * ipv6 only flag * optimize? * get_peer_addr only once (takes 1.2% of runtime) * queue response: allocating takes 2.8% of runtime @@ -37,7 +37,6 @@ Relevant for mio implementation too. * aquatic_ws - * ipv6 only flag * load test cpu pinning * test with multiple socket and request workers * should it send back error on message parse error, or does that From f7b7005e9a92d79632da8594823a511a31630287 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 2 Nov 2021 21:18:12 +0100 Subject: [PATCH 5/5] Update README --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 97f7b28..ef7075e 100644 --- a/README.md +++ b/README.md @@ -171,8 +171,8 @@ paths in the configuration file, e.g.: ```toml [network] address = '0.0.0.0:3000' -tls_certificate_path = './cert.crt' -tls_private_key_path = './key.pk8' +tls_certificate_path = './cert.pem' +tls_private_key_path = './key.pem' ``` ### aquatic_ws: WebTorrent tracker