diff --git a/Cargo.lock b/Cargo.lock index 0735770..1f2308c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,6 +79,7 @@ dependencies = [ "hashbrown 0.11.2", "hex", "indexmap", + "log", "privdrop", "rand", "serde", @@ -181,6 +182,7 @@ dependencies = [ "quickcheck_macros", "rand", "serde", + "signal-hook", "socket2 0.4.2", ] @@ -1754,6 +1756,25 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "signal-hook" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c98891d737e271a2954825ef19e46bd16bdb98e2746f2eec4f7a4ef7946efd1" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + [[package]] name = "simd-json" version = "0.4.8" diff --git a/README.md b/README.md index 97f7b28..ef7075e 100644 --- a/README.md +++ b/README.md @@ -171,8 +171,8 @@ paths in the configuration file, e.g.: ```toml [network] address = '0.0.0.0:3000' -tls_certificate_path = './cert.crt' -tls_private_key_path = './key.pk8' +tls_certificate_path = './cert.pem' +tls_private_key_path = './key.pem' ``` ### aquatic_ws: WebTorrent tracker diff --git a/TODO.md b/TODO.md index e6b5ca3..a4b2b1c 100644 --- a/TODO.md +++ b/TODO.md @@ -3,6 +3,8 @@ * readme * document privilige dropping, cpu pinning +* socket_recv_size and ipv6_only in glommio implementations + * config: fail on unrecognized keys * access lists: @@ -13,14 +15,12 @@ * aquatic_udp * CI for both implementations * glommio - * ipv6 only flag * consider sending local responses immediately * consider adding ConnectedScrapeRequest::Scrape(PendingScrapeRequest) containing TransactionId and BTreeMap, and same for response * aquatic_http: - * ipv6 only flag * optimize? * get_peer_addr only once (takes 1.2% of runtime) * queue response: allocating takes 2.8% of runtime @@ -37,7 +37,6 @@ Relevant for mio implementation too. * aquatic_ws - * ipv6 only flag * load test cpu pinning * test with multiple socket and request workers * should it send back error on message parse error, or does that diff --git a/aquatic_common/Cargo.toml b/aquatic_common/Cargo.toml index e86e0d0..3f34d1f 100644 --- a/aquatic_common/Cargo.toml +++ b/aquatic_common/Cargo.toml @@ -16,6 +16,7 @@ arc-swap = "1" hashbrown = "0.11.2" hex = "0.4" indexmap = "1" +log = "0.4" privdrop = "0.5" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } diff --git a/aquatic_common/src/access_list.rs b/aquatic_common/src/access_list.rs index 69f856c..f71f967 100644 --- a/aquatic_common/src/access_list.rs +++ b/aquatic_common/src/access_list.rs @@ -3,6 +3,7 @@ use std::io::{BufRead, BufReader}; use std::path::PathBuf; use std::sync::Arc; +use anyhow::Context; use arc_swap::ArcSwap; use hashbrown::HashSet; use serde::{Deserialize, Serialize}; @@ -59,7 +60,11 @@ impl AccessList { let mut new_list = Self::default(); for line in reader.lines() { - new_list.insert_from_line(&line?)?; + let line = line?; + + new_list + .insert_from_line(&line) + .with_context(|| format!("Invalid line in access list: {}", line))?; } Ok(new_list) @@ -75,15 +80,15 @@ impl AccessList { } pub trait AccessListQuery { - fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()>; + fn update(&self, config: &AccessListConfig) -> anyhow::Result<()>; fn allows(&self, list_mode: AccessListMode, info_hash_bytes: &[u8; 20]) -> bool; } pub type AccessListArcSwap = ArcSwap; impl AccessListQuery for AccessListArcSwap { - fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()> { - self.store(Arc::new(AccessList::create_from_path(path)?)); + fn update(&self, config: &AccessListConfig) -> anyhow::Result<()> { + self.store(Arc::new(AccessList::create_from_path(&config.path)?)); Ok(()) } diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index a7bf538..e418054 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -34,6 +34,7 @@ mimalloc = { version = "0.1", default-features = false } parking_lot = "0.11" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } +signal-hook = { version = "0.3" } # mio crossbeam-channel = { version = "0.5", optional = true } diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index 1b8e656..4104d47 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -9,15 +9,17 @@ use anyhow::Context; use aquatic_common::privileges::drop_privileges_after_socket_binding; use crossbeam_channel::unbounded; +use aquatic_common::access_list::AccessListQuery; +use signal_hook::consts::SIGUSR1; +use signal_hook::iterator::Signals; + +use crate::config::Config; + pub mod common; pub mod handlers; pub mod network; pub mod tasks; -use aquatic_common::access_list::{AccessListArcSwap, AccessListMode, AccessListQuery}; - -use crate::config::Config; - use common::State; pub fn run(config: Config) -> ::anyhow::Result<()> { @@ -29,36 +31,38 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let state = State::default(); - update_access_list(&config, &state.access_list); + update_access_list(&config, &state)?; + + let mut signals = Signals::new(::std::iter::once(SIGUSR1))?; + + { + let config = config.clone(); + let state = state.clone(); + + ::std::thread::spawn(move || run_inner(config, state)); + } + + for signal in &mut signals { + match signal { + SIGUSR1 => { + let _ = update_access_list(&config, &state); + } + _ => unreachable!(), + } + } + + Ok(()) +} + +pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { + if config.cpu_pinning.active { + core_affinity::set_for_current(core_affinity::CoreId { + id: config.cpu_pinning.offset, + }); + } let num_bound_sockets = Arc::new(AtomicUsize::new(0)); - start_workers(config.clone(), state.clone(), num_bound_sockets.clone())?; - - drop_privileges_after_socket_binding( - &config.privileges, - num_bound_sockets, - config.socket_workers, - ) - .unwrap(); - - loop { - ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); - - update_access_list(&config, &state.access_list); - - state - .torrents - .lock() - .clean(&config, state.access_list.load_full().deref()); - } -} - -pub fn start_workers( - config: Config, - state: State, - num_bound_sockets: Arc, -) -> ::anyhow::Result<()> { let (request_sender, request_receiver) = unbounded(); let (response_sender, response_receiver) = unbounded(); @@ -132,16 +136,36 @@ pub fn start_workers( .with_context(|| "spawn statistics worker")?; } - Ok(()) -} + drop_privileges_after_socket_binding( + &config.privileges, + num_bound_sockets, + config.socket_workers, + ) + .unwrap(); -pub fn update_access_list(config: &Config, access_list: &Arc) { - match config.access_list.mode { - AccessListMode::White | AccessListMode::Black => { - if let Err(err) = access_list.update_from_path(&config.access_list.path) { - ::log::error!("Update access list from path: {:?}", err); - } - } - AccessListMode::Off => {} + loop { + ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); + + state + .torrents + .lock() + .clean(&config, state.access_list.load_full().deref()); } } + +fn update_access_list(config: &Config, state: &State) -> anyhow::Result<()> { + if config.access_list.mode.is_on() { + match state.access_list.update(&config.access_list) { + Ok(()) => { + ::log::info!("Access list updated") + } + Err(err) => { + ::log::error!("Updating access list failed: {:#}", err); + + return Err(err); + } + } + } + + Ok(()) +} diff --git a/aquatic_ws/src/lib/network.rs b/aquatic_ws/src/lib/network.rs index fdbb561..d0e9679 100644 --- a/aquatic_ws/src/lib/network.rs +++ b/aquatic_ws/src/lib/network.rs @@ -17,7 +17,7 @@ use futures_lite::StreamExt; use futures_rustls::server::TlsStream; use futures_rustls::TlsAcceptor; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; -use glommio::channels::local_channel::{LocalReceiver, LocalSender, new_unbounded}; +use glommio::channels::local_channel::{new_unbounded, LocalReceiver, LocalSender}; use glommio::channels::shared_channel::ConnectedReceiver; use glommio::net::{TcpListener, TcpStream}; use glommio::timer::TimerActionRepeat; @@ -95,8 +95,7 @@ pub async fn run_socket_worker( while let Some(stream) = incoming.next().await { match stream { Ok(stream) => { - let (out_message_sender, out_message_receiver) = - new_unbounded(); + let (out_message_sender, out_message_receiver) = new_unbounded(); let out_message_sender = Rc::new(out_message_sender); let key = RefCell::borrow_mut(&connection_slab).insert(ConnectionReference { @@ -160,7 +159,7 @@ async fn receive_out_messages( .get(channel_out_message.0.connection_id.0) { match reference.out_message_sender.try_send(channel_out_message) { - Ok(()) | Err(GlommioError::Closed(_)) => {}, + Ok(()) | Err(GlommioError::Closed(_)) => {} Err(err) => { ::log::error!( "Couldn't send out_message from shared channel to local receiver: {:?}", diff --git a/aquatic_ws_load_test/src/main.rs b/aquatic_ws_load_test/src/main.rs index 241a136..54f9208 100644 --- a/aquatic_ws_load_test/src/main.rs +++ b/aquatic_ws_load_test/src/main.rs @@ -97,7 +97,6 @@ fn create_tls_config() -> anyhow::Result> { Ok(Arc::new(config)) } - fn monitor_statistics(state: LoadTestState, config: &Config) { let start_time = Instant::now(); let mut report_avg_response_vec: Vec = Vec::new(); diff --git a/aquatic_ws_load_test/src/network.rs b/aquatic_ws_load_test/src/network.rs index 267f077..b0f78ce 100644 --- a/aquatic_ws_load_test/src/network.rs +++ b/aquatic_ws_load_test/src/network.rs @@ -7,12 +7,12 @@ use std::{ }; use aquatic_ws_protocol::{InMessage, JsonValue, OfferId, OutMessage, PeerId}; -use async_tungstenite::{WebSocketStream, client_async}; -use futures::{StreamExt, SinkExt}; -use futures_rustls::{TlsConnector, client::TlsStream}; +use async_tungstenite::{client_async, WebSocketStream}; +use futures::{SinkExt, StreamExt}; +use futures_rustls::{client::TlsStream, TlsConnector}; use glommio::net::TcpStream; use glommio::{prelude::*, timer::TimerActionRepeat}; -use rand::{Rng, SeedableRng, prelude::SmallRng}; +use rand::{prelude::SmallRng, Rng, SeedableRng}; use crate::{common::LoadTestState, config::Config, utils::create_random_request}; @@ -80,7 +80,9 @@ impl Connection { let stream = TcpStream::connect(config.server_address) .await .map_err(|err| anyhow::anyhow!("connect: {:?}", err))?; - let stream = TlsConnector::from(tls_config).connect("example.com".try_into().unwrap(), stream).await?; + let stream = TlsConnector::from(tls_config) + .connect("example.com".try_into().unwrap(), stream) + .await?; let request = format!( "ws://{}:{}", config.server_address.ip(), @@ -114,8 +116,12 @@ impl Connection { async fn run_connection_loop(&mut self) -> anyhow::Result<()> { loop { if self.can_send { - let request = - create_random_request(&self.config, &self.load_test_state, &mut self.rng, self.peer_id); + let request = create_random_request( + &self.config, + &self.load_test_state, + &mut self.rng, + self.peer_id, + ); // If self.send_answer is set and request is announce request, make // the request an offer answer