diff --git a/Cargo.lock b/Cargo.lock index 4c33496..ef35171 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -47,26 +47,13 @@ checksum = "4361135be9122e0870de935d7c439aef945b9f9ddd4199a553b5270b49c82a27" name = "aquatic" version = "0.2.0" dependencies = [ - "aquatic_cli_helpers", + "aquatic_common", "aquatic_http", "aquatic_udp", "aquatic_ws", "mimalloc", ] -[[package]] -name = "aquatic_cli_helpers" -version = "0.2.0" -dependencies = [ - "anyhow", - "aquatic_toml_config", - "git-testament", - "log", - "serde", - "simple_logger", - "toml", -] - [[package]] name = "aquatic_common" version = "0.2.0" @@ -76,6 +63,7 @@ dependencies = [ "aquatic_toml_config", "arc-swap", "duplicate", + "git-testament", "glommio", "hashbrown 0.12.0", "hex", @@ -88,6 +76,8 @@ dependencies = [ "rustls 0.20.4", "rustls-pemfile", "serde", + "simple_logger", + "toml", ] [[package]] @@ -95,7 +85,6 @@ name = "aquatic_http" version = "0.2.0" dependencies = [ "anyhow", - "aquatic_cli_helpers", "aquatic_common", "aquatic_http_protocol", "aquatic_toml_config", @@ -127,7 +116,6 @@ name = "aquatic_http_load_test" version = "0.2.0" dependencies = [ "anyhow", - "aquatic_cli_helpers", "aquatic_common", "aquatic_http_protocol", "aquatic_toml_config", @@ -149,7 +137,6 @@ name = "aquatic_http_private" version = "0.2.0" dependencies = [ "anyhow", - "aquatic_cli_helpers", "aquatic_common", "aquatic_http_protocol", "aquatic_toml_config", @@ -163,6 +150,7 @@ dependencies = [ "rand", "rustls 0.20.4", "serde", + "signal-hook", "socket2 0.4.4", "sqlx", "tokio", @@ -216,7 +204,6 @@ name = "aquatic_udp" version = "0.2.0" dependencies = [ "anyhow", - "aquatic_cli_helpers", "aquatic_common", "aquatic_toml_config", "aquatic_udp_protocol", @@ -243,7 +230,6 @@ name = "aquatic_udp_bench" version = "0.2.0" dependencies = [ "anyhow", - "aquatic_cli_helpers", "aquatic_common", "aquatic_toml_config", "aquatic_udp", @@ -262,7 +248,6 @@ name = "aquatic_udp_load_test" version = "0.2.0" dependencies = [ "anyhow", - "aquatic_cli_helpers", "aquatic_common", "aquatic_toml_config", "aquatic_udp_protocol", @@ -292,7 +277,6 @@ name = "aquatic_ws" version = "0.2.0" dependencies = [ "anyhow", - "aquatic_cli_helpers", "aquatic_common", "aquatic_toml_config", "aquatic_ws_protocol", @@ -324,7 +308,6 @@ name = "aquatic_ws_load_test" version = "0.2.0" dependencies = [ "anyhow", - "aquatic_cli_helpers", "aquatic_common", "aquatic_toml_config", "aquatic_ws_protocol", diff --git a/Cargo.toml b/Cargo.toml index 0ee9fc5..2092976 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,6 @@ members = [ "aquatic", - "aquatic_cli_helpers", "aquatic_common", "aquatic_http", "aquatic_http_load_test", diff --git a/TODO.md b/TODO.md index aeb510a..ef68315 100644 --- a/TODO.md +++ b/TODO.md @@ -6,12 +6,19 @@ * Consider not setting Content-type: text/plain for responses and send vec as default octet stream instead * stored procedure * test ip format + * check user token length * site will likely want num_seeders and num_leechers for all torrents.. ## Medium priority * rename request workers to swarm workers * quit whole program if any thread panics + * But it would be nice not to panic in workers, but to return errors instead. + Once JoinHandle::is_finished is available in stable Rust (#90470), an + option would be to + * Save JoinHandles + * When preparing to quit because of PanicSentinel sending SIGTERM, loop + through them, extract error and log it * config: fail on unrecognized keys? * Run cargo-deny in CI @@ -47,9 +54,6 @@ # Not important * aquatic_http: - * optimize? - * get_peer_addr only once (takes 1.2% of runtime) - * queue response: allocating takes 2.8% of runtime * consider better error type for request parsing, so that better error messages can be sent back (e.g., "full scrapes are not supported") * test torrent transfer with real clients @@ -58,15 +62,6 @@ positive number. * aquatic_ws - * mio - * shard torrent state. this could decrease dropped messages too, since - request handlers won't send large batches of them - * connection cleaning interval - * use access list cache - * use write event interest for handshakes too - * deregistering before closing is required by mio, but it hurts performance - * blocked on https://github.com/snapview/tungstenite-rs/issues/51 - * connection closing: send tls close message etc? * write new version of extract_response_peers which checks for equality with peer sending request??? diff --git a/aquatic/Cargo.toml b/aquatic/Cargo.toml index 2fbe263..c876fad 100644 --- a/aquatic/Cargo.toml +++ b/aquatic/Cargo.toml @@ -13,7 +13,7 @@ readme = "../README.md" name = "aquatic" [dependencies] -aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } +aquatic_common = { version = "0.2.0", path = "../aquatic_common" } aquatic_http = { version = "0.2.0", path = "../aquatic_http" } aquatic_udp = { version = "0.2.0", path = "../aquatic_udp" } aquatic_ws = { version = "0.2.0", path = "../aquatic_ws" } diff --git a/aquatic/src/main.rs b/aquatic/src/main.rs index c320e41..97fba0f 100644 --- a/aquatic/src/main.rs +++ b/aquatic/src/main.rs @@ -1,4 +1,4 @@ -use aquatic_cli_helpers::{print_help, run_app_with_cli_and_config, Options}; +use aquatic_common::cli::{print_help, run_app_with_cli_and_config, Options}; use aquatic_http::config::Config as HttpConfig; use aquatic_udp::config::Config as UdpConfig; use aquatic_ws::config::Config as WsConfig; diff --git a/aquatic_cli_helpers/Cargo.toml b/aquatic_cli_helpers/Cargo.toml deleted file mode 100644 index 7e93b6c..0000000 --- a/aquatic_cli_helpers/Cargo.toml +++ /dev/null @@ -1,19 +0,0 @@ -[package] -name = "aquatic_cli_helpers" -version = "0.2.0" -authors = ["Joakim Frostegård "] -edition = "2021" -license = "Apache-2.0" -description = "aquatic BitTorrent tracker CLI helpers" -repository = "https://github.com/greatest-ape/aquatic" -readme = "../README.md" - -[dependencies] -aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } - -anyhow = "1" -git-testament = "0.2" -log = "0.4" -serde = { version = "1", features = ["derive"] } -simple_logger = { version = "2", features = ["stderr"] } -toml = "0.5" diff --git a/aquatic_common/Cargo.toml b/aquatic_common/Cargo.toml index 1f6eb26..a487088 100644 --- a/aquatic_common/Cargo.toml +++ b/aquatic_common/Cargo.toml @@ -23,6 +23,7 @@ ahash = "0.7" anyhow = "1" arc-swap = "1" duplicate = "0.4" +git-testament = "0.2" hashbrown = "0.12" hex = "0.4" indexmap-amortized = "1" @@ -31,6 +32,8 @@ log = "0.4" privdrop = "0.5" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } +simple_logger = { version = "2", features = ["stderr"] } +toml = "0.5" # Optional glommio = { version = "0.7", optional = true } diff --git a/aquatic_cli_helpers/src/lib.rs b/aquatic_common/src/cli.rs similarity index 99% rename from aquatic_cli_helpers/src/lib.rs rename to aquatic_common/src/cli.rs index 8455856..4763092 100644 --- a/aquatic_cli_helpers/src/lib.rs +++ b/aquatic_common/src/cli.rs @@ -22,7 +22,7 @@ pub enum LogLevel { impl Default for LogLevel { fn default() -> Self { - Self::Error + Self::Warn } } diff --git a/aquatic_common/src/cpu_pinning.rs b/aquatic_common/src/cpu_pinning.rs index b82d5bb..0e03e52 100644 --- a/aquatic_common/src/cpu_pinning.rs +++ b/aquatic_common/src/cpu_pinning.rs @@ -194,7 +194,9 @@ pub mod glommio { // 15 -> 14 and 15 // 14 -> 12 and 13 // 13 -> 10 and 11 - CpuPinningDirection::Descending => num_cpu_cores - 2 * (num_cpu_cores - core_index), + CpuPinningDirection::Descending => { + num_cpu_cores - 2 * (num_cpu_cores - core_index) + } }; get_cpu_set()? diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs index b243cfd..6888649 100644 --- a/aquatic_common/src/lib.rs +++ b/aquatic_common/src/lib.rs @@ -1,10 +1,13 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::time::{Duration, Instant}; use ahash::RandomState; use rand::Rng; pub mod access_list; +pub mod cli; pub mod cpu_pinning; pub mod privileges; #[cfg(feature = "rustls-config")] @@ -30,6 +33,44 @@ impl ValidUntil { } } +pub struct PanicSentinelWatcher(Arc); + +impl PanicSentinelWatcher { + pub fn create_with_sentinel() -> (Self, PanicSentinel) { + let triggered = Arc::new(AtomicBool::new(false)); + let sentinel = PanicSentinel(triggered.clone()); + + (Self(triggered), sentinel) + } + + pub fn panic_was_triggered(&self) -> bool { + self.0.load(Ordering::SeqCst) + } +} + +/// Raises SIGTERM when dropped +/// +/// Pass to threads to have panics in them cause whole program to exit. +#[derive(Clone)] +pub struct PanicSentinel(Arc); + +impl Drop for PanicSentinel { + fn drop(&mut self) { + if ::std::thread::panicking() { + let already_triggered = self.0.fetch_or(true, Ordering::SeqCst); + + if !already_triggered { + if unsafe { libc::raise(15) } == -1 { + panic!( + "Could not raise SIGTERM: {:#}", + ::std::io::Error::last_os_error() + ) + } + } + } + } +} + /// Extract response peers /// /// If there are more peers in map than `max_num_peers_to_take`, do a diff --git a/aquatic_common/src/privileges.rs b/aquatic_common/src/privileges.rs index 0e4a627..1e18b9f 100644 --- a/aquatic_common/src/privileges.rs +++ b/aquatic_common/src/privileges.rs @@ -1,22 +1,23 @@ use std::{ - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - time::Duration, + path::PathBuf, + sync::{Arc, Barrier}, }; -use aquatic_toml_config::TomlConfig; +use anyhow::Context; use privdrop::PrivDrop; use serde::Deserialize; +use aquatic_toml_config::TomlConfig; + #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] #[serde(default)] pub struct PrivilegeConfig { - /// Chroot and switch user after binding to sockets + /// Chroot and switch group and user after binding to sockets pub drop_privileges: bool, /// Chroot to this path - pub chroot_path: String, + pub chroot_path: PathBuf, + /// Group to switch to after chrooting + pub group: String, /// User to switch to after chrooting pub user: String, } @@ -25,41 +26,39 @@ impl Default for PrivilegeConfig { fn default() -> Self { Self { drop_privileges: false, - chroot_path: ".".to_string(), + chroot_path: ".".into(), user: "nobody".to_string(), + group: "nogroup".to_string(), } } } -pub fn drop_privileges_after_socket_binding( - config: &PrivilegeConfig, - num_bound_sockets: Arc, - target_num: usize, -) -> anyhow::Result<()> { - if config.drop_privileges { - let mut counter = 0usize; +#[derive(Clone)] +pub struct PrivilegeDropper { + barrier: Arc, + config: Arc, +} - loop { - let num_bound = num_bound_sockets.load(Ordering::SeqCst); +impl PrivilegeDropper { + pub fn new(config: PrivilegeConfig, num_sockets: usize) -> Self { + Self { + barrier: Arc::new(Barrier::new(num_sockets)), + config: Arc::new(config), + } + } - if num_bound == target_num { + pub fn after_socket_creation(self) -> anyhow::Result<()> { + if self.config.drop_privileges { + if self.barrier.wait().is_leader() { PrivDrop::default() - .chroot(config.chroot_path.clone()) - .user(config.user.clone()) - .apply()?; - - break; - } - - ::std::thread::sleep(Duration::from_millis(10)); - - counter += 1; - - if counter == 500 { - panic!("Sockets didn't bind in time for privilege drop."); + .chroot(self.config.chroot_path.clone()) + .group(self.config.group.clone()) + .user(self.config.user.clone()) + .apply() + .with_context(|| "couldn't drop privileges after socket creation")?; } } - } - Ok(()) + Ok(()) + } } diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index b9b1152..5809cd7 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -16,7 +16,6 @@ name = "aquatic_http" name = "aquatic_http" [dependencies] -aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["rustls-config", "with-glommio"] } aquatic_http_protocol = { version = "0.2.0", path = "../aquatic_http_protocol" } aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } diff --git a/aquatic_http/src/config.rs b/aquatic_http/src/config.rs index c94cf1e..d3cceaa 100644 --- a/aquatic_http/src/config.rs +++ b/aquatic_http/src/config.rs @@ -1,10 +1,13 @@ use std::{net::SocketAddr, path::PathBuf}; -use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig, cpu_pinning::asc::CpuPinningConfigAsc}; +use aquatic_common::{ + access_list::AccessListConfig, cpu_pinning::asc::CpuPinningConfigAsc, + privileges::PrivilegeConfig, +}; use aquatic_toml_config::TomlConfig; use serde::Deserialize; -use aquatic_cli_helpers::LogLevel; +use aquatic_common::cli::LogLevel; /// aquatic_http configuration #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] @@ -42,7 +45,7 @@ impl Default for Config { } } -impl aquatic_cli_helpers::Config for Config { +impl aquatic_common::cli::Config for Config { fn get_log_level(&self) -> Option { Some(self.log_level) } diff --git a/aquatic_http/src/lib.rs b/aquatic_http/src/lib.rs index 28ca996..f1b8ca7 100644 --- a/aquatic_http/src/lib.rs +++ b/aquatic_http/src/lib.rs @@ -4,13 +4,17 @@ use aquatic_common::{ glommio::{get_worker_placement, set_affinity_for_util_worker}, WorkerIndex, }, - privileges::drop_privileges_after_socket_binding, + privileges::PrivilegeDropper, rustls_config::create_rustls_config, + PanicSentinelWatcher, }; use common::State; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; -use signal_hook::{consts::SIGUSR1, iterator::Signals}; -use std::sync::{atomic::AtomicUsize, Arc}; +use signal_hook::{ + consts::{SIGTERM, SIGUSR1}, + iterator::Signals, +}; +use std::sync::Arc; use crate::config::Config; @@ -24,17 +28,91 @@ pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); const SHARED_CHANNEL_SIZE: usize = 1024; pub fn run(config: Config) -> ::anyhow::Result<()> { + let mut signals = Signals::new([SIGUSR1, SIGTERM])?; + let state = State::default(); update_access_list(&config.access_list, &state.access_list)?; - let mut signals = Signals::new(::std::iter::once(SIGUSR1))?; + let num_peers = config.socket_workers + config.request_workers; - { + let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); + let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); + + let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); + let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); + + let tls_config = Arc::new(create_rustls_config( + &config.network.tls_certificate_path, + &config.network.tls_private_key_path, + )?); + + let mut executors = Vec::new(); + + for i in 0..(config.socket_workers) { + let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); + let tls_config = tls_config.clone(); + let request_mesh_builder = request_mesh_builder.clone(); + let response_mesh_builder = response_mesh_builder.clone(); + let priv_dropper = priv_dropper.clone(); - ::std::thread::spawn(move || run_inner(config, state)); + let placement = get_worker_placement( + &config.cpu_pinning, + config.socket_workers, + config.request_workers, + WorkerIndex::SocketWorker(i), + )?; + let builder = LocalExecutorBuilder::new(placement).name("socket"); + + let executor = builder + .spawn(move || async move { + workers::socket::run_socket_worker( + sentinel, + config, + state, + tls_config, + request_mesh_builder, + response_mesh_builder, + priv_dropper, + ) + .await + }) + .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?; + + executors.push(executor); + } + + for i in 0..(config.request_workers) { + let sentinel = sentinel.clone(); + let config = config.clone(); + let state = state.clone(); + let request_mesh_builder = request_mesh_builder.clone(); + let response_mesh_builder = response_mesh_builder.clone(); + + let placement = get_worker_placement( + &config.cpu_pinning, + config.socket_workers, + config.request_workers, + WorkerIndex::RequestWorker(i), + )?; + let builder = LocalExecutorBuilder::new(placement).name("request"); + + let executor = builder + .spawn(move || async move { + workers::request::run_request_worker( + sentinel, + config, + state, + request_mesh_builder, + response_mesh_builder, + ) + .await + }) + .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?; + + executors.push(executor); } if config.cpu_pinning.active { @@ -50,107 +128,16 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { SIGUSR1 => { let _ = update_access_list(&config.access_list, &state.access_list); } + SIGTERM => { + if sentinel_watcher.panic_was_triggered() { + return Err(anyhow::anyhow!("worker thread panicked")); + } else { + return Ok(()); + } + } _ => unreachable!(), } } Ok(()) } - -pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { - let num_peers = config.socket_workers + config.request_workers; - - let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); - let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); - - let num_bound_sockets = Arc::new(AtomicUsize::new(0)); - - let tls_config = Arc::new(create_rustls_config( - &config.network.tls_certificate_path, - &config.network.tls_private_key_path, - )?); - - let mut executors = Vec::new(); - - for i in 0..(config.socket_workers) { - let config = config.clone(); - let state = state.clone(); - let tls_config = tls_config.clone(); - let request_mesh_builder = request_mesh_builder.clone(); - let response_mesh_builder = response_mesh_builder.clone(); - let num_bound_sockets = num_bound_sockets.clone(); - - let placement = get_worker_placement( - &config.cpu_pinning, - config.socket_workers, - config.request_workers, - WorkerIndex::SocketWorker(i), - )?; - let builder = LocalExecutorBuilder::new(placement).name("socket"); - - let executor = builder.spawn(move || async move { - workers::socket::run_socket_worker( - config, - state, - tls_config, - request_mesh_builder, - response_mesh_builder, - num_bound_sockets, - ) - .await - }); - - executors.push(executor); - } - - for i in 0..(config.request_workers) { - let config = config.clone(); - let state = state.clone(); - let request_mesh_builder = request_mesh_builder.clone(); - let response_mesh_builder = response_mesh_builder.clone(); - - let placement = get_worker_placement( - &config.cpu_pinning, - config.socket_workers, - config.request_workers, - WorkerIndex::RequestWorker(i), - )?; - let builder = LocalExecutorBuilder::new(placement).name("request"); - - let executor = builder.spawn(move || async move { - workers::request::run_request_worker( - config, - state, - request_mesh_builder, - response_mesh_builder, - ) - .await - }); - - executors.push(executor); - } - - drop_privileges_after_socket_binding( - &config.privileges, - num_bound_sockets, - config.socket_workers, - ) - .unwrap(); - - if config.cpu_pinning.active { - set_affinity_for_util_worker( - &config.cpu_pinning, - config.socket_workers, - config.request_workers, - )?; - } - - for executor in executors { - executor - .expect("failed to spawn local executor") - .join() - .unwrap(); - } - - Ok(()) -} diff --git a/aquatic_http/src/main.rs b/aquatic_http/src/main.rs index 0d4b626..7f40c2c 100644 --- a/aquatic_http/src/main.rs +++ b/aquatic_http/src/main.rs @@ -1,4 +1,4 @@ -use aquatic_cli_helpers::run_app_with_cli_and_config; +use aquatic_common::cli::run_app_with_cli_and_config; use aquatic_http::config::Config; #[global_allocator] diff --git a/aquatic_http/src/workers/request.rs b/aquatic_http/src/workers/request.rs index 8ac677e..e664c4e 100644 --- a/aquatic_http/src/workers/request.rs +++ b/aquatic_http/src/workers/request.rs @@ -17,8 +17,8 @@ use rand::SeedableRng; use smartstring::{LazyCompact, SmartString}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; -use aquatic_common::extract_response_peers; use aquatic_common::ValidUntil; +use aquatic_common::{extract_response_peers, PanicSentinel}; use aquatic_common::{AmortizedIndexMap, CanonicalSocketAddr}; use aquatic_http_protocol::common::*; use aquatic_http_protocol::request::*; @@ -175,6 +175,7 @@ impl TorrentMaps { } pub async fn run_request_worker( + _sentinel: PanicSentinel, config: Config, state: State, request_mesh_builder: MeshBuilder, diff --git a/aquatic_http/src/workers/socket.rs b/aquatic_http/src/workers/socket.rs index 3992551..4b7a7f6 100644 --- a/aquatic_http/src/workers/socket.rs +++ b/aquatic_http/src/workers/socket.rs @@ -2,13 +2,14 @@ use std::cell::RefCell; use std::collections::BTreeMap; use std::os::unix::prelude::{FromRawFd, IntoRawFd}; use std::rc::Rc; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; +use anyhow::Context; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; +use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; -use aquatic_common::CanonicalSocketAddr; +use aquatic_common::{CanonicalSocketAddr, PanicSentinel}; use aquatic_http_protocol::common::InfoHash; use aquatic_http_protocol::request::{Request, RequestParseError, ScrapeRequest}; use aquatic_http_protocol::response::{ @@ -53,18 +54,18 @@ struct ConnectionReference { } pub async fn run_socket_worker( + _sentinel: PanicSentinel, config: Config, state: State, tls_config: Arc, request_mesh_builder: MeshBuilder, response_mesh_builder: MeshBuilder, - num_bound_sockets: Arc, + priv_dropper: PrivilegeDropper, ) { let config = Rc::new(config); let access_list = state.access_list; - let listener = create_tcp_listener(&config); - num_bound_sockets.fetch_add(1, Ordering::SeqCst); + let listener = create_tcp_listener(&config, priv_dropper).expect("create tcp listener"); let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); let request_senders = Rc::new(request_senders); @@ -485,29 +486,37 @@ fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usi (info_hash.0[0] as usize) % config.request_workers } -fn create_tcp_listener(config: &Config) -> TcpListener { +fn create_tcp_listener( + config: &Config, + priv_dropper: PrivilegeDropper, +) -> anyhow::Result { let domain = if config.network.address.is_ipv4() { socket2::Domain::IPV4 } else { socket2::Domain::IPV6 }; - let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP)) - .expect("create socket"); + let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?; if config.network.only_ipv6 { - socket.set_only_v6(true).expect("socket: set only ipv6"); + socket + .set_only_v6(true) + .with_context(|| "socket: set only ipv6")?; } - socket.set_reuse_port(true).expect("socket: set reuse port"); + socket + .set_reuse_port(true) + .with_context(|| "socket: set reuse port")?; socket .bind(&config.network.address.into()) - .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err)); + .with_context(|| format!("socket: bind to {}", config.network.address))?; socket .listen(config.network.tcp_backlog) - .unwrap_or_else(|err| panic!("socket: listen {}: {:?}", config.network.address, err)); + .with_context(|| format!("socket: listen on {}", config.network.address))?; - unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) } + priv_dropper.after_socket_creation()?; + + Ok(unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) }) } diff --git a/aquatic_http_load_test/Cargo.toml b/aquatic_http_load_test/Cargo.toml index 625a157..8403959 100644 --- a/aquatic_http_load_test/Cargo.toml +++ b/aquatic_http_load_test/Cargo.toml @@ -13,7 +13,6 @@ readme = "../README.md" name = "aquatic_http_load_test" [dependencies] -aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["with-glommio"] } aquatic_http_protocol = { version = "0.2.0", path = "../aquatic_http_protocol" } aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } diff --git a/aquatic_http_load_test/src/config.rs b/aquatic_http_load_test/src/config.rs index d8906df..d104c41 100644 --- a/aquatic_http_load_test/src/config.rs +++ b/aquatic_http_load_test/src/config.rs @@ -1,6 +1,6 @@ use std::net::SocketAddr; -use aquatic_cli_helpers::LogLevel; +use aquatic_common::cli::LogLevel; use aquatic_common::cpu_pinning::desc::CpuPinningConfigDesc; use aquatic_toml_config::TomlConfig; use serde::Deserialize; @@ -26,7 +26,7 @@ pub struct Config { pub cpu_pinning: CpuPinningConfigDesc, } -impl aquatic_cli_helpers::Config for Config { +impl aquatic_common::cli::Config for Config { fn get_log_level(&self) -> Option { Some(self.log_level) } diff --git a/aquatic_http_load_test/src/main.rs b/aquatic_http_load_test/src/main.rs index b7e7978..1955885 100644 --- a/aquatic_http_load_test/src/main.rs +++ b/aquatic_http_load_test/src/main.rs @@ -24,7 +24,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; const MBITS_FACTOR: f64 = 1.0 / ((1024.0 * 1024.0) / 8.0); pub fn main() { - aquatic_cli_helpers::run_app_with_cli_and_config::( + aquatic_common::cli::run_app_with_cli_and_config::( "aquatic_http_load_test: BitTorrent load tester", env!("CARGO_PKG_VERSION"), run, @@ -70,6 +70,7 @@ fn run(config: Config) -> ::anyhow::Result<()> { )?; LocalExecutorBuilder::new(placement) + .name("load-test") .spawn(move || async move { run_socket_thread(config, tls_config, state).await.unwrap(); }) diff --git a/aquatic_http_private/Cargo.toml b/aquatic_http_private/Cargo.toml index 0d59f6a..7d400f2 100644 --- a/aquatic_http_private/Cargo.toml +++ b/aquatic_http_private/Cargo.toml @@ -14,7 +14,6 @@ name = "aquatic_http_private" name = "aquatic_http_private" [dependencies] -aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["rustls-config"] } aquatic_http_protocol = { version = "0.2.0", path = "../aquatic_http_protocol", features = ["with-axum"] } aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } @@ -30,6 +29,7 @@ mimalloc = { version = "0.1", default-features = false } rand = { version = "0.8", features = ["small_rng"] } rustls = "0.20" serde = { version = "1", features = ["derive"] } +signal-hook = { version = "0.3" } socket2 = { version = "0.4", features = ["all"] } sqlx = { version = "0.5", features = [ "runtime-tokio-rustls" , "mysql" ] } tokio = { version = "1", features = ["full"] } diff --git a/aquatic_http_private/src/config.rs b/aquatic_http_private/src/config.rs index 094c15e..b800cc8 100644 --- a/aquatic_http_private/src/config.rs +++ b/aquatic_http_private/src/config.rs @@ -4,7 +4,7 @@ use aquatic_common::privileges::PrivilegeConfig; use aquatic_toml_config::TomlConfig; use serde::Deserialize; -use aquatic_cli_helpers::LogLevel; +use aquatic_common::cli::LogLevel; /// aquatic_http_private configuration #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] @@ -18,6 +18,7 @@ pub struct Config { /// generate responses and send them back to the socket workers. pub request_workers: usize, pub worker_channel_size: usize, + /// Number of database connections to establish in each socket worker pub db_connections_per_worker: u32, pub log_level: LogLevel, pub network: NetworkConfig, @@ -32,7 +33,7 @@ impl Default for Config { socket_workers: 1, request_workers: 1, worker_channel_size: 128, - db_connections_per_worker: 1, + db_connections_per_worker: 4, log_level: LogLevel::default(), network: NetworkConfig::default(), protocol: ProtocolConfig::default(), @@ -42,7 +43,7 @@ impl Default for Config { } } -impl aquatic_cli_helpers::Config for Config { +impl aquatic_common::cli::Config for Config { fn get_log_level(&self) -> Option { Some(self.log_level) } diff --git a/aquatic_http_private/src/lib.rs b/aquatic_http_private/src/lib.rs index 8e9d7a7..88609e1 100644 --- a/aquatic_http_private/src/lib.rs +++ b/aquatic_http_private/src/lib.rs @@ -4,9 +4,12 @@ mod workers; use std::{collections::VecDeque, sync::Arc}; -use aquatic_common::rustls_config::create_rustls_config; +use aquatic_common::{ + privileges::PrivilegeDropper, rustls_config::create_rustls_config, PanicSentinelWatcher, +}; use common::ChannelRequestSender; use dotenv::dotenv; +use signal_hook::{consts::SIGTERM, iterator::Signals}; use tokio::sync::mpsc::channel; use config::Config; @@ -15,6 +18,8 @@ pub const APP_NAME: &str = "aquatic_http_private: private HTTP/TLS BitTorrent tr pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); pub fn run(config: Config) -> anyhow::Result<()> { + let mut signals = Signals::new([SIGTERM])?; + dotenv().ok(); let tls_config = Arc::new(create_rustls_config( @@ -32,37 +37,58 @@ pub fn run(config: Config) -> anyhow::Result<()> { request_receivers.push_back(request_receiver); } + let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); + let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); + let mut handles = Vec::new(); for _ in 0..config.socket_workers { + let sentinel = sentinel.clone(); let config = config.clone(); let tls_config = tls_config.clone(); let request_sender = ChannelRequestSender::new(request_senders.clone()); + let priv_dropper = priv_dropper.clone(); let handle = ::std::thread::Builder::new() .name("socket".into()) .spawn(move || { - workers::socket::run_socket_worker(config, tls_config, request_sender) + workers::socket::run_socket_worker( + sentinel, + config, + tls_config, + request_sender, + priv_dropper, + ) })?; handles.push(handle); } for _ in 0..config.request_workers { + let sentinel = sentinel.clone(); let config = config.clone(); let request_receiver = request_receivers.pop_front().unwrap(); let handle = ::std::thread::Builder::new() .name("request".into()) - .spawn(move || workers::request::run_request_worker(config, request_receiver))?; + .spawn(move || { + workers::request::run_request_worker(sentinel, config, request_receiver) + })?; handles.push(handle); } - for handle in handles { - handle - .join() - .map_err(|err| anyhow::anyhow!("thread join error: {:?}", err))??; + for signal in &mut signals { + match signal { + SIGTERM => { + if sentinel_watcher.panic_was_triggered() { + return Err(anyhow::anyhow!("worker thread panicked")); + } else { + return Ok(()); + } + } + _ => unreachable!(), + } } Ok(()) diff --git a/aquatic_http_private/src/main.rs b/aquatic_http_private/src/main.rs index c26aaeb..caf3cbc 100644 --- a/aquatic_http_private/src/main.rs +++ b/aquatic_http_private/src/main.rs @@ -1,4 +1,4 @@ -use aquatic_cli_helpers::run_app_with_cli_and_config; +use aquatic_common::cli::run_app_with_cli_and_config; use aquatic_http_private::config::Config; #[global_allocator] diff --git a/aquatic_http_private/src/workers/request/mod.rs b/aquatic_http_private/src/workers/request/mod.rs index 358ead6..c684256 100644 --- a/aquatic_http_private/src/workers/request/mod.rs +++ b/aquatic_http_private/src/workers/request/mod.rs @@ -11,7 +11,7 @@ use tokio::sync::mpsc::Receiver; use tokio::task::LocalSet; use tokio::time; -use aquatic_common::{extract_response_peers, CanonicalSocketAddr, ValidUntil}; +use aquatic_common::{extract_response_peers, CanonicalSocketAddr, PanicSentinel, ValidUntil}; use aquatic_http_protocol::response::{ AnnounceResponse, Response, ResponsePeer, ResponsePeerListV4, ResponsePeerListV6, }; @@ -22,6 +22,7 @@ use crate::config::Config; use common::*; pub fn run_request_worker( + _sentinel: PanicSentinel, config: Config, request_receiver: Receiver, ) -> anyhow::Result<()> { diff --git a/aquatic_http_private/src/workers/socket/mod.rs b/aquatic_http_private/src/workers/socket/mod.rs index 2b142c7..24e561b 100644 --- a/aquatic_http_private/src/workers/socket/mod.rs +++ b/aquatic_http_private/src/workers/socket/mod.rs @@ -8,7 +8,7 @@ use std::{ }; use anyhow::Context; -use aquatic_common::rustls_config::RustlsConfig; +use aquatic_common::{privileges::PrivilegeDropper, rustls_config::RustlsConfig, PanicSentinel}; use axum::{extract::connect_info::Connected, routing::get, Extension, Router}; use hyper::server::conn::AddrIncoming; use sqlx::mysql::MySqlPoolOptions; @@ -23,11 +23,13 @@ impl<'a> Connected<&'a tls::TlsStream> for SocketAddr { } pub fn run_socket_worker( + _sentinel: PanicSentinel, config: Config, tls_config: Arc, request_sender: ChannelRequestSender, + priv_dropper: PrivilegeDropper, ) -> anyhow::Result<()> { - let tcp_listener = create_tcp_listener(config.network.address)?; + let tcp_listener = create_tcp_listener(config.network.address, priv_dropper)?; let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -71,7 +73,10 @@ async fn run_app( Ok(()) } -fn create_tcp_listener(addr: SocketAddr) -> anyhow::Result { +fn create_tcp_listener( + addr: SocketAddr, + priv_dropper: PrivilegeDropper, +) -> anyhow::Result { let domain = if addr.is_ipv4() { socket2::Domain::IPV4 } else { @@ -93,5 +98,7 @@ fn create_tcp_listener(addr: SocketAddr) -> anyhow::Result { .listen(1024) .with_context(|| format!("listen on {}", addr))?; + priv_dropper.after_socket_creation()?; + Ok(socket.into()) } diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 970b717..ed6686b 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -19,7 +19,6 @@ name = "aquatic_udp" cpu-pinning = ["aquatic_common/with-hwloc"] [dependencies] -aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } aquatic_common = { version = "0.2.0", path = "../aquatic_common" } aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } aquatic_udp_protocol = { version = "0.2.0", path = "../aquatic_udp_protocol" } diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 01a0917..eb3f3d1 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -3,7 +3,7 @@ use std::{net::SocketAddr, path::PathBuf}; use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig}; use serde::Deserialize; -use aquatic_cli_helpers::LogLevel; +use aquatic_common::cli::LogLevel; use aquatic_toml_config::TomlConfig; /// aquatic_udp configuration @@ -58,7 +58,7 @@ impl Default for Config { } } -impl aquatic_cli_helpers::Config for Config { +impl aquatic_common::cli::Config for Config { fn get_log_level(&self) -> Option { Some(self.log_level) } diff --git a/aquatic_udp/src/lib.rs b/aquatic_udp/src/lib.rs index d56d743..db644b9 100644 --- a/aquatic_udp/src/lib.rs +++ b/aquatic_udp/src/lib.rs @@ -2,20 +2,20 @@ pub mod common; pub mod config; pub mod workers; +use aquatic_common::PanicSentinelWatcher; use config::Config; use std::collections::BTreeMap; -use std::sync::{atomic::AtomicUsize, Arc}; use std::thread::Builder; use anyhow::Context; #[cfg(feature = "cpu-pinning")] use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; -use aquatic_common::privileges::drop_privileges_after_socket_binding; +use aquatic_common::privileges::PrivilegeDropper; use crossbeam_channel::{bounded, unbounded}; use aquatic_common::access_list::update_access_list; -use signal_hook::consts::SIGUSR1; +use signal_hook::consts::{SIGTERM, SIGUSR1}; use signal_hook::iterator::Signals; use common::{ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State}; @@ -30,9 +30,10 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { update_access_list(&config.access_list, &state.access_list)?; - let mut signals = Signals::new(::std::iter::once(SIGUSR1))?; + let mut signals = Signals::new([SIGUSR1, SIGTERM])?; - let num_bound_sockets = Arc::new(AtomicUsize::new(0)); + let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); + let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); let mut request_senders = Vec::new(); let mut request_receivers = BTreeMap::new(); @@ -63,6 +64,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } for i in 0..config.request_workers { + let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); let request_receiver = request_receivers.remove(&i).unwrap().clone(); @@ -80,6 +82,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ); workers::request::run_request_worker( + sentinel, config, state, request_receiver, @@ -91,12 +94,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } for i in 0..config.socket_workers { + let sentinel = sentinel.clone(); let state = state.clone(); let config = config.clone(); let request_sender = ConnectedRequestSender::new(SocketWorkerIndex(i), request_senders.clone()); let response_receiver = response_receivers.remove(&i).unwrap(); - let num_bound_sockets = num_bound_sockets.clone(); + let priv_dropper = priv_dropper.clone(); Builder::new() .name(format!("socket-{:02}", i + 1)) @@ -110,23 +114,25 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ); workers::socket::run_socket_worker( + sentinel, state, config, i, request_sender, response_receiver, - num_bound_sockets, + priv_dropper, ); }) .with_context(|| "spawn socket worker")?; } if config.statistics.active() { + let sentinel = sentinel.clone(); let state = state.clone(); let config = config.clone(); Builder::new() - .name("statistics-collector".to_string()) + .name("statistics".into()) .spawn(move || { #[cfg(feature = "cpu-pinning")] pin_current_if_configured_to( @@ -136,18 +142,11 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::Util, ); - workers::statistics::run_statistics_worker(config, state); + workers::statistics::run_statistics_worker(sentinel, config, state); }) .with_context(|| "spawn statistics worker")?; } - drop_privileges_after_socket_binding( - &config.privileges, - num_bound_sockets, - config.socket_workers, - ) - .unwrap(); - #[cfg(feature = "cpu-pinning")] pin_current_if_configured_to( &config.cpu_pinning, @@ -161,6 +160,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { SIGUSR1 => { let _ = update_access_list(&config.access_list, &state.access_list); } + SIGTERM => { + if sentinel_watcher.panic_was_triggered() { + return Err(anyhow::anyhow!("worker thread panicked")); + } + + break; + } _ => unreachable!(), } } diff --git a/aquatic_udp/src/main.rs b/aquatic_udp/src/main.rs index b6df27c..9ec75cc 100644 --- a/aquatic_udp/src/main.rs +++ b/aquatic_udp/src/main.rs @@ -2,7 +2,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; fn main() { - aquatic_cli_helpers::run_app_with_cli_and_config::( + aquatic_common::cli::run_app_with_cli_and_config::( aquatic_udp::APP_NAME, aquatic_udp::APP_VERSION, aquatic_udp::run, diff --git a/aquatic_udp/src/workers/request.rs b/aquatic_udp/src/workers/request.rs index 447fab4..12eb6b9 100644 --- a/aquatic_udp/src/workers/request.rs +++ b/aquatic_udp/src/workers/request.rs @@ -11,6 +11,7 @@ use aquatic_common::access_list::create_access_list_cache; use aquatic_common::access_list::AccessListArcSwap; use aquatic_common::AmortizedIndexMap; use aquatic_common::CanonicalSocketAddr; +use aquatic_common::PanicSentinel; use aquatic_common::ValidUntil; use crossbeam_channel::Receiver; use rand::{rngs::SmallRng, SeedableRng}; @@ -121,6 +122,7 @@ impl TorrentMaps { } pub fn run_request_worker( + _sentinel: PanicSentinel, config: Config, state: State, request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, diff --git a/aquatic_udp/src/workers/socket.rs b/aquatic_udp/src/workers/socket.rs index 6bf3487..c8b5e05 100644 --- a/aquatic_udp/src/workers/socket.rs +++ b/aquatic_udp/src/workers/socket.rs @@ -1,12 +1,11 @@ use std::collections::BTreeMap; use std::io::{Cursor, ErrorKind}; -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; +use std::sync::atomic::Ordering; use std::time::{Duration, Instant}; use std::vec::Drain; +use anyhow::Context; +use aquatic_common::privileges::PrivilegeDropper; use crossbeam_channel::Receiver; use mio::net::UdpSocket; use mio::{Events, Interest, Poll, Token}; @@ -15,8 +14,8 @@ use slab::Slab; use aquatic_common::access_list::create_access_list_cache; use aquatic_common::access_list::AccessListCache; -use aquatic_common::ValidUntil; use aquatic_common::{AmortizedIndexMap, CanonicalSocketAddr}; +use aquatic_common::{PanicSentinel, ValidUntil}; use aquatic_udp_protocol::*; use socket2::{Domain, Protocol, Socket, Type}; @@ -152,17 +151,19 @@ impl PendingScrapeResponseSlab { } pub fn run_socket_worker( + _sentinel: PanicSentinel, state: State, config: Config, token_num: usize, request_sender: ConnectedRequestSender, response_receiver: Receiver<(ConnectedResponse, CanonicalSocketAddr)>, - num_bound_sockets: Arc, + priv_dropper: PrivilegeDropper, ) { let mut rng = StdRng::from_entropy(); let mut buffer = [0u8; MAX_PACKET_SIZE]; - let mut socket = UdpSocket::from_std(create_socket(&config)); + let mut socket = + UdpSocket::from_std(create_socket(&config, priv_dropper).expect("create socket")); let mut poll = Poll::new().expect("create poll"); let interests = Interest::READABLE; @@ -171,8 +172,6 @@ pub fn run_socket_worker( .register(&mut socket, Token(token_num), interests) .unwrap(); - num_bound_sockets.fetch_add(1, Ordering::SeqCst); - let mut events = Events::with_capacity(config.network.poll_event_capacity); let mut connections = ConnectionMap::default(); let mut pending_scrape_responses = PendingScrapeResponseSlab::default(); @@ -520,27 +519,29 @@ fn send_response( } } -pub fn create_socket(config: &Config) -> ::std::net::UdpSocket { +pub fn create_socket( + config: &Config, + priv_dropper: PrivilegeDropper, +) -> anyhow::Result<::std::net::UdpSocket> { let socket = if config.network.address.is_ipv4() { - Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) + Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))? } else { - Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) - } - .expect("create socket"); + Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))? + }; if config.network.only_ipv6 { - socket.set_only_v6(true).expect("socket: set only ipv6"); + socket + .set_only_v6(true) + .with_context(|| "socket: set only ipv6")?; } - socket.set_reuse_port(true).expect("socket: set reuse port"); + socket + .set_reuse_port(true) + .with_context(|| "socket: set reuse port")?; socket .set_nonblocking(true) - .expect("socket: set nonblocking"); - - socket - .bind(&config.network.address.into()) - .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err)); + .with_context(|| "socket: set nonblocking")?; let recv_buffer_size = config.network.socket_recv_buffer_size; @@ -554,7 +555,13 @@ pub fn create_socket(config: &Config) -> ::std::net::UdpSocket { } } - socket.into() + socket + .bind(&config.network.address.into()) + .with_context(|| format!("socket: bind to {}", config.network.address))?; + + priv_dropper.after_socket_creation()?; + + Ok(socket.into()) } #[cfg(test)] diff --git a/aquatic_udp/src/workers/statistics.rs b/aquatic_udp/src/workers/statistics.rs index 20ec0e7..b54cbe8 100644 --- a/aquatic_udp/src/workers/statistics.rs +++ b/aquatic_udp/src/workers/statistics.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use anyhow::Context; +use aquatic_common::PanicSentinel; use num_format::{Locale, ToFormattedString}; use serde::Serialize; use time::format_description::well_known::Rfc2822; @@ -135,7 +136,7 @@ struct TemplateData { peer_update_interval: String, } -pub fn run_statistics_worker(config: Config, state: State) { +pub fn run_statistics_worker(_sentinel: PanicSentinel, config: Config, state: State) { let tt = if config.statistics.write_html_to_file { let mut tt = TinyTemplate::new(); diff --git a/aquatic_udp_bench/Cargo.toml b/aquatic_udp_bench/Cargo.toml index 6a6e7e3..d844f28 100644 --- a/aquatic_udp_bench/Cargo.toml +++ b/aquatic_udp_bench/Cargo.toml @@ -11,7 +11,6 @@ readme = "../README.md" name = "aquatic_udp_bench" [dependencies] -aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } aquatic_common = { version = "0.2.0", path = "../aquatic_common" } aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } aquatic_udp = { version = "0.2.0", path = "../aquatic_udp" } diff --git a/aquatic_udp_bench/src/config.rs b/aquatic_udp_bench/src/config.rs index 2b4b2f8..a1425d8 100644 --- a/aquatic_udp_bench/src/config.rs +++ b/aquatic_udp_bench/src/config.rs @@ -24,7 +24,7 @@ impl Default for BenchConfig { } } -impl aquatic_cli_helpers::Config for BenchConfig {} +impl aquatic_common::cli::Config for BenchConfig {} #[cfg(test)] mod tests { diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index 6f65bc9..89ca6da 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -7,13 +7,14 @@ //! Scrape: 1 873 545 requests/second, 533.75 ns/request //! ``` +use aquatic_common::PanicSentinelWatcher; use aquatic_udp::workers::request::run_request_worker; use crossbeam_channel::unbounded; use num_format::{Locale, ToFormattedString}; use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng}; use std::time::Duration; -use aquatic_cli_helpers::run_app_with_cli_and_config; +use aquatic_common::cli::run_app_with_cli_and_config; use aquatic_udp::common::*; use aquatic_udp::config::Config; use aquatic_udp_protocol::*; @@ -41,6 +42,7 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { // Setup common state, spawn request handlers let mut aquatic_config = Config::default(); + let (_, sentinel) = PanicSentinelWatcher::create_with_sentinel(); aquatic_config.cleaning.torrent_cleaning_interval = 60 * 60 * 24; @@ -55,6 +57,7 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { ::std::thread::spawn(move || { run_request_worker( + sentinel, config, state, request_receiver, diff --git a/aquatic_udp_load_test/Cargo.toml b/aquatic_udp_load_test/Cargo.toml index 31029b7..662676b 100644 --- a/aquatic_udp_load_test/Cargo.toml +++ b/aquatic_udp_load_test/Cargo.toml @@ -16,7 +16,6 @@ cpu-pinning = ["aquatic_common/with-hwloc"] name = "aquatic_udp_load_test" [dependencies] -aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } aquatic_common = { version = "0.2.0", path = "../aquatic_common" } aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } aquatic_udp_protocol = { version = "0.2.0", path = "../aquatic_udp_protocol" } diff --git a/aquatic_udp_load_test/src/config.rs b/aquatic_udp_load_test/src/config.rs index e803eb8..a189307 100644 --- a/aquatic_udp_load_test/src/config.rs +++ b/aquatic_udp_load_test/src/config.rs @@ -2,7 +2,7 @@ use std::net::SocketAddr; use serde::Deserialize; -use aquatic_cli_helpers::LogLevel; +use aquatic_common::cli::LogLevel; #[cfg(feature = "cpu-pinning")] use aquatic_common::cpu_pinning::desc::CpuPinningConfigDesc; use aquatic_toml_config::TomlConfig; diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index dd6ad57..7066b5f 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -1,7 +1,7 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::atomic::AtomicUsize; use std::sync::{atomic::Ordering, Arc}; -use std::thread; +use std::thread::{self, Builder}; use std::time::{Duration, Instant}; #[cfg(feature = "cpu-pinning")] @@ -22,7 +22,7 @@ use worker::*; static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; pub fn main() { - aquatic_cli_helpers::run_app_with_cli_and_config::( + aquatic_common::cli::run_app_with_cli_and_config::( "aquatic_udp_load_test: BitTorrent load tester", env!("CARGO_PKG_VERSION"), run, @@ -30,8 +30,8 @@ pub fn main() { ) } -impl aquatic_cli_helpers::Config for Config { - fn get_log_level(&self) -> Option { +impl aquatic_common::cli::Config for Config { + fn get_log_level(&self) -> Option { Some(self.log_level) } } @@ -79,7 +79,7 @@ fn run(config: Config) -> ::anyhow::Result<()> { let config = config.clone(); let state = state.clone(); - thread::spawn(move || { + Builder::new().name("load-test".into()).spawn(move || { #[cfg(feature = "cpu-pinning")] pin_current_if_configured_to( &config.cpu_pinning, @@ -89,7 +89,7 @@ fn run(config: Config) -> ::anyhow::Result<()> { ); run_worker_thread(state, pareto, &config, addr) - }); + })?; } #[cfg(feature = "cpu-pinning")] diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index 0add3fa..65491c7 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -16,7 +16,6 @@ name = "aquatic_ws" name = "aquatic_ws" [dependencies] -aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["rustls-config", "with-glommio"] } aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } aquatic_ws_protocol = { version = "0.2.0", path = "../aquatic_ws_protocol" } diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index b1ea961..fbfe8bf 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -5,7 +5,7 @@ use aquatic_common::cpu_pinning::asc::CpuPinningConfigAsc; use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig}; use serde::Deserialize; -use aquatic_cli_helpers::LogLevel; +use aquatic_common::cli::LogLevel; use aquatic_toml_config::TomlConfig; /// aquatic_ws configuration @@ -44,7 +44,7 @@ impl Default for Config { } } -impl aquatic_cli_helpers::Config for Config { +impl aquatic_common::cli::Config for Config { fn get_log_level(&self) -> Option { Some(self.log_level) } diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index 0ba6bbc..b6ffc22 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -2,16 +2,20 @@ pub mod common; pub mod config; pub mod workers; -use std::sync::{atomic::AtomicUsize, Arc}; +use std::sync::Arc; use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker}; use aquatic_common::cpu_pinning::WorkerIndex; use aquatic_common::rustls_config::create_rustls_config; +use aquatic_common::PanicSentinelWatcher; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; -use signal_hook::{consts::SIGUSR1, iterator::Signals}; +use signal_hook::{ + consts::{SIGTERM, SIGUSR1}, + iterator::Signals, +}; use aquatic_common::access_list::update_access_list; -use aquatic_common::privileges::drop_privileges_after_socket_binding; +use aquatic_common::privileges::PrivilegeDropper; use common::*; use config::Config; @@ -22,17 +26,91 @@ pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); pub const SHARED_IN_CHANNEL_SIZE: usize = 1024; pub fn run(config: Config) -> ::anyhow::Result<()> { + let mut signals = Signals::new([SIGUSR1, SIGTERM])?; + let state = State::default(); update_access_list(&config.access_list, &state.access_list)?; - let mut signals = Signals::new(::std::iter::once(SIGUSR1))?; + let num_peers = config.socket_workers + config.request_workers; - { + let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE); + let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE * 16); + + let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); + let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); + + let tls_config = Arc::new(create_rustls_config( + &config.network.tls_certificate_path, + &config.network.tls_private_key_path, + )?); + + let mut executors = Vec::new(); + + for i in 0..(config.socket_workers) { + let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); + let tls_config = tls_config.clone(); + let request_mesh_builder = request_mesh_builder.clone(); + let response_mesh_builder = response_mesh_builder.clone(); + let priv_dropper = priv_dropper.clone(); - ::std::thread::spawn(move || run_workers(config, state)); + let placement = get_worker_placement( + &config.cpu_pinning, + config.socket_workers, + config.request_workers, + WorkerIndex::SocketWorker(i), + )?; + let builder = LocalExecutorBuilder::new(placement).name("socket"); + + let executor = builder + .spawn(move || async move { + workers::socket::run_socket_worker( + sentinel, + config, + state, + tls_config, + request_mesh_builder, + response_mesh_builder, + priv_dropper, + ) + .await + }) + .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?; + + executors.push(executor); + } + + for i in 0..(config.request_workers) { + let sentinel = sentinel.clone(); + let config = config.clone(); + let state = state.clone(); + let request_mesh_builder = request_mesh_builder.clone(); + let response_mesh_builder = response_mesh_builder.clone(); + + let placement = get_worker_placement( + &config.cpu_pinning, + config.socket_workers, + config.request_workers, + WorkerIndex::RequestWorker(i), + )?; + let builder = LocalExecutorBuilder::new(placement).name("request"); + + let executor = builder + .spawn(move || async move { + workers::request::run_request_worker( + sentinel, + config, + state, + request_mesh_builder, + response_mesh_builder, + ) + .await + }) + .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?; + + executors.push(executor); } if config.cpu_pinning.active { @@ -48,107 +126,16 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { SIGUSR1 => { let _ = update_access_list(&config.access_list, &state.access_list); } + SIGTERM => { + if sentinel_watcher.panic_was_triggered() { + return Err(anyhow::anyhow!("worker thread panicked")); + } else { + return Ok(()); + } + } _ => unreachable!(), } } Ok(()) } - -fn run_workers(config: Config, state: State) -> anyhow::Result<()> { - let num_peers = config.socket_workers + config.request_workers; - - let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE); - let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE * 16); - - let num_bound_sockets = Arc::new(AtomicUsize::new(0)); - - let tls_config = Arc::new(create_rustls_config( - &config.network.tls_certificate_path, - &config.network.tls_private_key_path, - )?); - - let mut executors = Vec::new(); - - for i in 0..(config.socket_workers) { - let config = config.clone(); - let state = state.clone(); - let tls_config = tls_config.clone(); - let request_mesh_builder = request_mesh_builder.clone(); - let response_mesh_builder = response_mesh_builder.clone(); - let num_bound_sockets = num_bound_sockets.clone(); - - let placement = get_worker_placement( - &config.cpu_pinning, - config.socket_workers, - config.request_workers, - WorkerIndex::SocketWorker(i), - )?; - let builder = LocalExecutorBuilder::new(placement).name("socket"); - - let executor = builder.spawn(move || async move { - workers::socket::run_socket_worker( - config, - state, - tls_config, - request_mesh_builder, - response_mesh_builder, - num_bound_sockets, - ) - .await - }); - - executors.push(executor); - } - - for i in 0..(config.request_workers) { - let config = config.clone(); - let state = state.clone(); - let request_mesh_builder = request_mesh_builder.clone(); - let response_mesh_builder = response_mesh_builder.clone(); - - let placement = get_worker_placement( - &config.cpu_pinning, - config.socket_workers, - config.request_workers, - WorkerIndex::RequestWorker(i), - )?; - let builder = LocalExecutorBuilder::new(placement).name("request"); - - let executor = builder.spawn(move || async move { - workers::request::run_request_worker( - config, - state, - request_mesh_builder, - response_mesh_builder, - ) - .await - }); - - executors.push(executor); - } - - drop_privileges_after_socket_binding( - &config.privileges, - num_bound_sockets, - config.socket_workers, - ) - .unwrap(); - - if config.cpu_pinning.active { - set_affinity_for_util_worker( - &config.cpu_pinning, - config.socket_workers, - config.request_workers, - )?; - } - - for executor in executors { - executor - .expect("failed to spawn local executor") - .join() - .unwrap(); - } - - Ok(()) -} diff --git a/aquatic_ws/src/main.rs b/aquatic_ws/src/main.rs index bd241e3..cb8b58f 100644 --- a/aquatic_ws/src/main.rs +++ b/aquatic_ws/src/main.rs @@ -1,4 +1,4 @@ -use aquatic_cli_helpers::run_app_with_cli_and_config; +use aquatic_common::cli::run_app_with_cli_and_config; use aquatic_ws::config::Config; #[global_allocator] diff --git a/aquatic_ws/src/workers/request.rs b/aquatic_ws/src/workers/request.rs index 693ac7d..92f07ad 100644 --- a/aquatic_ws/src/workers/request.rs +++ b/aquatic_ws/src/workers/request.rs @@ -12,7 +12,7 @@ use glommio::timer::TimerActionRepeat; use hashbrown::HashMap; use rand::{rngs::SmallRng, SeedableRng}; -use aquatic_common::{extract_response_peers, AmortizedIndexMap}; +use aquatic_common::{extract_response_peers, AmortizedIndexMap, PanicSentinel}; use aquatic_ws_protocol::*; use crate::common::*; @@ -128,6 +128,7 @@ impl TorrentMaps { } pub async fn run_request_worker( + _sentinel: PanicSentinel, config: Config, state: State, in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>, diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index 7c121d4..cfda49a 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -3,13 +3,14 @@ use std::cell::RefCell; use std::collections::BTreeMap; use std::os::unix::prelude::{FromRawFd, IntoRawFd}; use std::rc::Rc; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; +use anyhow::Context; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; +use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; -use aquatic_common::CanonicalSocketAddr; +use aquatic_common::{CanonicalSocketAddr, PanicSentinel}; use aquatic_ws_protocol::*; use async_tungstenite::WebSocketStream; use futures::stream::{SplitSink, SplitStream}; @@ -48,19 +49,18 @@ struct ConnectionReference { } pub async fn run_socket_worker( + _sentinel: PanicSentinel, config: Config, state: State, tls_config: Arc, in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>, out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>, - num_bound_sockets: Arc, + priv_dropper: PrivilegeDropper, ) { let config = Rc::new(config); let access_list = state.access_list; - let listener = create_tcp_listener(&config); - - num_bound_sockets.fetch_add(1, Ordering::SeqCst); + let listener = create_tcp_listener(&config, priv_dropper).expect("create tcp listener"); let (in_message_senders, _) = in_message_mesh_builder.join(Role::Producer).await.unwrap(); let in_message_senders = Rc::new(in_message_senders); @@ -544,7 +544,10 @@ fn calculate_in_message_consumer_index(config: &Config, info_hash: InfoHash) -> (info_hash.0[0] as usize) % config.request_workers } -fn create_tcp_listener(config: &Config) -> TcpListener { +fn create_tcp_listener( + config: &Config, + priv_dropper: PrivilegeDropper, +) -> anyhow::Result { let domain = if config.network.address.is_ipv4() { socket2::Domain::IPV4 } else { @@ -552,21 +555,27 @@ fn create_tcp_listener(config: &Config) -> TcpListener { }; let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP)) - .expect("create socket"); + .with_context(|| "create socket")?; if config.network.only_ipv6 { - socket.set_only_v6(true).expect("socket: set only ipv6"); + socket + .set_only_v6(true) + .with_context(|| "socket: set only ipv6")?; } - socket.set_reuse_port(true).expect("socket: set reuse port"); + socket + .set_reuse_port(true) + .with_context(|| "socket: set reuse port")?; socket .bind(&config.network.address.into()) - .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err)); + .with_context(|| format!("socket: bind to {}", config.network.address))?; socket .listen(config.network.tcp_backlog) - .unwrap_or_else(|err| panic!("socket: listen {}: {:?}", config.network.address, err)); + .with_context(|| format!("socket: listen {}", config.network.address))?; - unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) } + priv_dropper.after_socket_creation()?; + + Ok(unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) }) } diff --git a/aquatic_ws_load_test/Cargo.toml b/aquatic_ws_load_test/Cargo.toml index 82496b7..457a4d2 100644 --- a/aquatic_ws_load_test/Cargo.toml +++ b/aquatic_ws_load_test/Cargo.toml @@ -13,7 +13,6 @@ readme = "../README.md" name = "aquatic_ws_load_test" [dependencies] -aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["with-glommio"] } aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } aquatic_ws_protocol = { version = "0.2.0", path = "../aquatic_ws_protocol" } diff --git a/aquatic_ws_load_test/src/config.rs b/aquatic_ws_load_test/src/config.rs index 9949c65..9af7baf 100644 --- a/aquatic_ws_load_test/src/config.rs +++ b/aquatic_ws_load_test/src/config.rs @@ -1,6 +1,6 @@ use std::net::SocketAddr; -use aquatic_cli_helpers::LogLevel; +use aquatic_common::cli::LogLevel; use aquatic_common::cpu_pinning::desc::CpuPinningConfigDesc; use aquatic_toml_config::TomlConfig; use serde::Deserialize; @@ -19,7 +19,7 @@ pub struct Config { pub cpu_pinning: CpuPinningConfigDesc, } -impl aquatic_cli_helpers::Config for Config { +impl aquatic_common::cli::Config for Config { fn get_log_level(&self) -> Option { Some(self.log_level) } diff --git a/aquatic_ws_load_test/src/main.rs b/aquatic_ws_load_test/src/main.rs index f4f16eb..c660267 100644 --- a/aquatic_ws_load_test/src/main.rs +++ b/aquatic_ws_load_test/src/main.rs @@ -21,7 +21,7 @@ use network::*; static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; pub fn main() { - aquatic_cli_helpers::run_app_with_cli_and_config::( + aquatic_common::cli::run_app_with_cli_and_config::( "aquatic_ws_load_test: WebTorrent load tester", env!("CARGO_PKG_VERSION"), run, @@ -67,6 +67,7 @@ fn run(config: Config) -> ::anyhow::Result<()> { )?; LocalExecutorBuilder::new(placement) + .name("load-test") .spawn(move || async move { run_socket_thread(config, tls_config, state).await.unwrap(); })