diff --git a/CHANGELOG.md b/CHANGELOG.md index 560f800..3134f9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,7 @@ #### Fixed * Fix bug where clean up after closing connections wasn't always done +* Quit whole application if any worker thread quits ### aquatic_ws diff --git a/Cargo.lock b/Cargo.lock index 3faac6e..75e69e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -200,7 +200,6 @@ dependencies = [ "log", "memchr", "metrics", - "metrics-exporter-prometheus", "mimalloc", "once_cell", "privdrop", diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 28bd7ee..033fffc 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -1,3 +1,4 @@ +use std::fmt::Display; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -198,3 +199,25 @@ pub fn spawn_prometheus_endpoint( Ok(handle) } + +pub enum WorkerType { + Swarm(usize), + Socket(usize), + Statistics, + Signals, + #[cfg(feature = "prometheus")] + Prometheus, +} + +impl Display for WorkerType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Swarm(index) => f.write_fmt(format_args!("Swarm worker {}", index + 1)), + Self::Socket(index) => f.write_fmt(format_args!("Socket worker {}", index + 1)), + Self::Statistics => f.write_str("Statistics worker"), + Self::Signals => f.write_str("Signals worker"), + #[cfg(feature = "prometheus")] + Self::Prometheus => f.write_str("Prometheus worker"), + } + } +} diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index 3967ce2..4cb25db 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -19,11 +19,11 @@ name = "aquatic_http" [features] default = ["prometheus"] -prometheus = ["metrics", "metrics-exporter-prometheus"] +prometheus = ["aquatic_common/prometheus", "metrics"] metrics = ["dep:metrics"] [dependencies] -aquatic_common = { workspace = true, features = ["rustls", "glommio"] } +aquatic_common = { workspace = true, features = ["rustls"] } aquatic_http_protocol.workspace = true aquatic_toml_config.workspace = true @@ -40,8 +40,6 @@ httparse = "1" itoa = "1" libc = "0.2" log = "0.4" -metrics = { version = "0.22", optional = true } -metrics-exporter-prometheus = { version = "0.13", optional = true, default-features = false, features = ["http-listener"] } mimalloc = { version = "0.1", default-features = false } memchr = "2" privdrop = "0.5" @@ -54,6 +52,9 @@ slotmap = "1" socket2 = { version = "0.5", features = ["all"] } thiserror = "1" +# metrics feature +metrics = { version = "0.22", optional = true } + [dev-dependencies] quickcheck = "1" quickcheck_macros = "1" diff --git a/crates/http/src/lib.rs b/crates/http/src/lib.rs index 3bb0484..f62d923 100644 --- a/crates/http/src/lib.rs +++ b/crates/http/src/lib.rs @@ -1,22 +1,17 @@ use anyhow::Context; use aquatic_common::{ - access_list::update_access_list, - cpu_pinning::{ - glommio::{get_worker_placement, set_affinity_for_util_worker}, - WorkerIndex, - }, - privileges::PrivilegeDropper, - rustls_config::create_rustls_config, - PanicSentinelWatcher, ServerStartInstant, + access_list::update_access_list, privileges::PrivilegeDropper, + rustls_config::create_rustls_config, ServerStartInstant, WorkerType, }; use arc_swap::ArcSwap; use common::State; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; -use signal_hook::{ - consts::{SIGTERM, SIGUSR1}, - iterator::Signals, +use signal_hook::{consts::SIGUSR1, iterator::Signals}; +use std::{ + sync::Arc, + thread::{sleep, Builder, JoinHandle}, + time::Duration, }; -use std::sync::Arc; use crate::config::Config; @@ -30,32 +25,16 @@ pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); const SHARED_CHANNEL_SIZE: usize = 1024; pub fn run(config: Config) -> ::anyhow::Result<()> { - let mut signals = Signals::new([SIGUSR1, SIGTERM])?; - - #[cfg(feature = "prometheus")] - if config.metrics.run_prometheus_endpoint { - use metrics_exporter_prometheus::PrometheusBuilder; - - PrometheusBuilder::new() - .with_http_listener(config.metrics.prometheus_endpoint_address) - .install() - .with_context(|| { - format!( - "Install prometheus endpoint on {}", - config.metrics.prometheus_endpoint_address - ) - })?; - } + let mut signals = Signals::new([SIGUSR1])?; let state = State::default(); update_access_list(&config.access_list, &state.access_list)?; - let num_peers = config.socket_workers + config.swarm_workers; - - let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); - - let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); + let request_mesh_builder = MeshBuilder::partial( + config.socket_workers + config.swarm_workers, + SHARED_CHANNEL_SIZE, + ); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); let opt_tls_config = if config.network.enable_tls { @@ -69,111 +48,128 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let server_start_instant = ServerStartInstant::new(); - let mut executors = Vec::new(); + let mut join_handles = Vec::new(); for i in 0..(config.socket_workers) { - let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); let opt_tls_config = opt_tls_config.clone(); let request_mesh_builder = request_mesh_builder.clone(); let priv_dropper = priv_dropper.clone(); - let placement = get_worker_placement( - &config.cpu_pinning, - config.socket_workers, - config.swarm_workers, - WorkerIndex::SocketWorker(i), - )?; - let builder = LocalExecutorBuilder::new(placement).name(&format!("socket-{:02}", i + 1)); - - let executor = builder - .spawn(move || async move { - workers::socket::run_socket_worker( - sentinel, - config, - state, - opt_tls_config, - request_mesh_builder, - priv_dropper, - server_start_instant, - i, - ) - .await + let handle = Builder::new() + .name(format!("socket-{:02}", i + 1)) + .spawn(move || { + LocalExecutorBuilder::default() + .make() + .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))? + .run(workers::socket::run_socket_worker( + config, + state, + opt_tls_config, + request_mesh_builder, + priv_dropper, + server_start_instant, + i, + )) }) - .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?; + .context("spawn socket worker")?; - executors.push(executor); + join_handles.push((WorkerType::Socket(i), handle)); } for i in 0..(config.swarm_workers) { - let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); let request_mesh_builder = request_mesh_builder.clone(); - let placement = get_worker_placement( - &config.cpu_pinning, - config.socket_workers, - config.swarm_workers, - WorkerIndex::SwarmWorker(i), - )?; - let builder = LocalExecutorBuilder::new(placement).name(&format!("swarm-{:02}", i + 1)); - - let executor = builder - .spawn(move || async move { - workers::swarm::run_swarm_worker( - sentinel, - config, - state, - request_mesh_builder, - server_start_instant, - i, - ) - .await + let handle = Builder::new() + .name(format!("swarm-{:02}", i + 1)) + .spawn(move || { + LocalExecutorBuilder::default() + .make() + .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))? + .run(workers::swarm::run_swarm_worker( + config, + state, + request_mesh_builder, + server_start_instant, + i, + )) }) - .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?; + .context("spawn swarm worker")?; - executors.push(executor); + join_handles.push((WorkerType::Swarm(i), handle)); } - if config.cpu_pinning.active { - set_affinity_for_util_worker( - &config.cpu_pinning, - config.socket_workers, - config.swarm_workers, + #[cfg(feature = "prometheus")] + if config.metrics.run_prometheus_endpoint { + let handle = aquatic_common::spawn_prometheus_endpoint( + config.metrics.prometheus_endpoint_address, + Some(Duration::from_secs( + config.cleaning.torrent_cleaning_interval * 2, + )), )?; + + join_handles.push((WorkerType::Prometheus, handle)); } - for signal in &mut signals { - match signal { - SIGUSR1 => { - let _ = update_access_list(&config.access_list, &state.access_list); + // Spawn signal handler thread + { + let handle: JoinHandle> = Builder::new() + .name("signals".into()) + .spawn(move || { + for signal in &mut signals { + match signal { + SIGUSR1 => { + let _ = update_access_list(&config.access_list, &state.access_list); - if let Some(tls_config) = opt_tls_config.as_ref() { - match create_rustls_config( - &config.network.tls_certificate_path, - &config.network.tls_private_key_path, - ) { - Ok(config) => { - tls_config.store(Arc::new(config)); + if let Some(tls_config) = opt_tls_config.as_ref() { + match create_rustls_config( + &config.network.tls_certificate_path, + &config.network.tls_private_key_path, + ) { + Ok(config) => { + tls_config.store(Arc::new(config)); - ::log::info!("successfully updated tls config"); + ::log::info!("successfully updated tls config"); + } + Err(err) => { + ::log::error!("could not update tls config: {:#}", err) + } + } + } } - Err(err) => ::log::error!("could not update tls config: {:#}", err), + _ => unreachable!(), + } + } + + Ok(()) + }) + .context("spawn signal worker")?; + + join_handles.push((WorkerType::Signals, handle)); + } + + loop { + for (i, (_, handle)) in join_handles.iter().enumerate() { + if handle.is_finished() { + let (worker_type, handle) = join_handles.remove(i); + + match handle.join() { + Ok(Ok(())) => { + return Err(anyhow::anyhow!("{} stopped", worker_type)); + } + Ok(Err(err)) => { + return Err(err.context(format!("{} stopped", worker_type))); + } + Err(_) => { + return Err(anyhow::anyhow!("{} panicked", worker_type)); } } } - SIGTERM => { - if sentinel_watcher.panic_was_triggered() { - return Err(anyhow::anyhow!("worker thread panicked")); - } else { - return Ok(()); - } - } - _ => unreachable!(), } - } - Ok(()) + sleep(Duration::from_secs(5)); + } } diff --git a/crates/http/src/workers/socket/mod.rs b/crates/http/src/workers/socket/mod.rs index a289607..0f2fd44 100644 --- a/crates/http/src/workers/socket/mod.rs +++ b/crates/http/src/workers/socket/mod.rs @@ -10,7 +10,7 @@ use std::time::Duration; use anyhow::Context; use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; -use aquatic_common::{CanonicalSocketAddr, PanicSentinel, ServerStartInstant}; +use aquatic_common::{CanonicalSocketAddr, ServerStartInstant}; use arc_swap::ArcSwap; use futures_lite::future::race; use futures_lite::StreamExt; @@ -32,7 +32,6 @@ struct ConnectionHandle { #[allow(clippy::too_many_arguments)] pub async fn run_socket_worker( - _sentinel: PanicSentinel, config: Config, state: State, opt_tls_config: Option>>, @@ -40,13 +39,16 @@ pub async fn run_socket_worker( priv_dropper: PrivilegeDropper, server_start_instant: ServerStartInstant, worker_index: usize, -) { +) -> anyhow::Result<()> { let config = Rc::new(config); let access_list = state.access_list; - let listener = create_tcp_listener(&config, priv_dropper).expect("create tcp listener"); + let listener = create_tcp_listener(&config, priv_dropper).context("create tcp listener")?; - let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); + let (request_senders, _) = request_mesh_builder + .join(Role::Producer) + .await + .map_err(|err| anyhow::anyhow!("join request mesh: {:#}", err))?; let request_senders = Rc::new(request_senders); let connection_handles = Rc::new(RefCell::new(HopSlotMap::with_key())); @@ -145,6 +147,8 @@ pub async fn run_socket_worker( } } } + + Ok(()) } async fn clean_connections( diff --git a/crates/http/src/workers/swarm/mod.rs b/crates/http/src/workers/swarm/mod.rs index d50797e..f8d7f34 100644 --- a/crates/http/src/workers/swarm/mod.rs +++ b/crates/http/src/workers/swarm/mod.rs @@ -11,7 +11,7 @@ use glommio::{enclose, prelude::*}; use rand::prelude::SmallRng; use rand::SeedableRng; -use aquatic_common::{PanicSentinel, ServerStartInstant, ValidUntil}; +use aquatic_common::{ServerStartInstant, ValidUntil}; use crate::common::*; use crate::config::Config; @@ -19,14 +19,16 @@ use crate::config::Config; use self::storage::TorrentMaps; pub async fn run_swarm_worker( - _sentinel: PanicSentinel, config: Config, state: State, request_mesh_builder: MeshBuilder, server_start_instant: ServerStartInstant, worker_index: usize, -) { - let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); +) -> anyhow::Result<()> { + let (_, mut request_receivers) = request_mesh_builder + .join(Role::Consumer) + .await + .map_err(|err| anyhow::anyhow!("join request mesh: {:#}", err))?; let torrents = Rc::new(RefCell::new(TorrentMaps::new(worker_index))); let access_list = state.access_list; @@ -82,6 +84,8 @@ pub async fn run_swarm_worker( for handle in handles { handle.await; } + + Ok(()) } async fn handle_request_stream( diff --git a/crates/udp/src/lib.rs b/crates/udp/src/lib.rs index 5e4c043..c216f65 100644 --- a/crates/udp/src/lib.rs +++ b/crates/udp/src/lib.rs @@ -3,11 +3,11 @@ pub mod config; pub mod workers; use std::collections::BTreeMap; -use std::fmt::Display; use std::thread::{sleep, Builder, JoinHandle}; use std::time::Duration; use anyhow::Context; +use aquatic_common::WorkerType; use crossbeam_channel::{bounded, unbounded}; use signal_hook::consts::SIGUSR1; use signal_hook::iterator::Signals; @@ -233,25 +233,3 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { sleep(Duration::from_secs(5)); } } - -enum WorkerType { - Swarm(usize), - Socket(usize), - Statistics, - Signals, - #[cfg(feature = "prometheus")] - Prometheus, -} - -impl Display for WorkerType { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Swarm(index) => f.write_fmt(format_args!("Swarm worker {}", index + 1)), - Self::Socket(index) => f.write_fmt(format_args!("Socket worker {}", index + 1)), - Self::Statistics => f.write_str("Statistics worker"), - Self::Signals => f.write_str("Signals worker"), - #[cfg(feature = "prometheus")] - Self::Prometheus => f.write_str("Prometheus worker"), - } - } -}