diff --git a/CHANGELOG.md b/CHANGELOG.md index aa9ff32..00908f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,10 @@ * Speed up parsing and serialization of requests and responses by using [zerocopy](https://crates.io/crates/zerocopy) +#### Fixed + +* Quit whole application if any worker thread quits + ### aquatic_http #### Added diff --git a/Cargo.lock b/Cargo.lock index b12caa0..7ac690c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -323,6 +323,7 @@ dependencies = [ "tempfile", "time", "tinytemplate", + "tokio", ] [[package]] diff --git a/TODO.md b/TODO.md index 72cd3ac..829af77 100644 --- a/TODO.md +++ b/TODO.md @@ -2,7 +2,7 @@ ## High priority -* general +* http and ws * add task to generate prometheus exports on regular interval to clean up data. this is important if peer_clients is activated @@ -12,7 +12,9 @@ In that case, a shorter duration (e.g., 30 seconds) would be a good idea. * general - * panic sentinel not working? at least seemingly not in http? + * Replace panic sentinel with checking threads like in udp implementation. + It seems to be broken + ## Medium priority @@ -25,14 +27,6 @@ * toml v0.7 * syn v2.0 -* quit whole program if any thread panics - * But it would be nice not to panic in workers, but to return errors instead. 
- Once JoinHandle::is_finished is available in stable Rust (#90470), an - option would be to - * Save JoinHandles - * When preparing to quit because of PanicSentinel sending SIGTERM, loop - through them, extract error and log it - * Run cargo-deny in CI * aquatic_ws diff --git a/crates/udp/Cargo.toml b/crates/udp/Cargo.toml index 430ab49..041a3cb 100644 --- a/crates/udp/Cargo.toml +++ b/crates/udp/Cargo.toml @@ -20,7 +20,7 @@ name = "aquatic_udp" [features] default = ["prometheus"] # Export prometheus metrics -prometheus = ["metrics", "metrics-util", "metrics-exporter-prometheus"] +prometheus = ["metrics", "metrics-util", "metrics-exporter-prometheus", "tokio"] # Experimental io_uring support (Linux 6.0 or later required) io-uring = ["dep:io-uring"] # Experimental CPU pinning support @@ -59,6 +59,7 @@ tinytemplate = "1" metrics = { version = "0.22", optional = true } metrics-util = { version = "0.16", optional = true } metrics-exporter-prometheus = { version = "0.13", optional = true, default-features = false, features = ["http-listener"] } +tokio = { version = "1", optional = true, features = ["rt", "net", "time"] } # io-uring feature io-uring = { version = "0.6", optional = true } diff --git a/crates/udp/src/lib.rs b/crates/udp/src/lib.rs index 50a0439..d26e8bb 100644 --- a/crates/udp/src/lib.rs +++ b/crates/udp/src/lib.rs @@ -3,19 +3,20 @@ pub mod config; pub mod workers; use std::collections::BTreeMap; -use std::thread::Builder; +use std::fmt::Display; +use std::thread::{sleep, Builder, JoinHandle}; use std::time::Duration; use anyhow::Context; use crossbeam_channel::{bounded, unbounded}; -use signal_hook::consts::{SIGTERM, SIGUSR1}; +use signal_hook::consts::SIGUSR1; use signal_hook::iterator::Signals; use aquatic_common::access_list::update_access_list; #[cfg(feature = "cpu-pinning")] use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::privileges::PrivilegeDropper; -use aquatic_common::{PanicSentinelWatcher, 
ServerStartInstant}; +use aquatic_common::ServerStartInstant; use common::{ ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State, SwarmWorkerIndex, @@ -28,12 +29,12 @@ pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); pub fn run(config: Config) -> ::anyhow::Result<()> { - let mut signals = Signals::new([SIGUSR1, SIGTERM])?; + let mut signals = Signals::new([SIGUSR1])?; let state = State::new(config.swarm_workers); let connection_validator = ConnectionValidator::new(&config)?; - let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); + let mut join_handles = Vec::new(); update_access_list(&config.access_list, &state.access_list)?; @@ -62,14 +63,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } for i in 0..config.swarm_workers { - let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); let request_receiver = request_receivers.remove(&i).unwrap().clone(); let response_sender = ConnectedResponseSender::new(response_senders.clone()); let statistics_sender = statistics_sender.clone(); - Builder::new() + let handle = Builder::new() .name(format!("swarm-{:02}", i + 1)) .spawn(move || { #[cfg(feature = "cpu-pinning")] @@ -81,7 +81,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ); let mut worker = SwarmWorker { - _sentinel: sentinel, config, state, server_start_instant, @@ -91,13 +90,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { worker_index: SwarmWorkerIndex(i), }; - worker.run(); + worker.run() }) .with_context(|| "spawn swarm worker")?; + + join_handles.push((WorkerType::Swarm(i), handle)); } for i in 0..config.socket_workers { - let sentinel = sentinel.clone(); let state = state.clone(); let config = config.clone(); let connection_validator = connection_validator.clone(); @@ -106,7 +106,7 @@ pub 
fn run(config: Config) -> ::anyhow::Result<()> { let response_receiver = response_receivers.remove(&i).unwrap(); let priv_dropper = priv_dropper.clone(); - Builder::new() + let handle = Builder::new() .name(format!("socket-{:02}", i + 1)) .spawn(move || { #[cfg(feature = "cpu-pinning")] @@ -118,7 +118,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ); workers::socket::run_socket_worker( - sentinel, state, config, connection_validator, @@ -126,37 +125,18 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { request_sender, response_receiver, priv_dropper, - ); + ) }) .with_context(|| "spawn socket worker")?; + + join_handles.push((WorkerType::Socket(i), handle)); } if config.statistics.active() { - let sentinel = sentinel.clone(); let state = state.clone(); let config = config.clone(); - #[cfg(feature = "prometheus")] - if config.statistics.run_prometheus_endpoint { - use metrics_exporter_prometheus::PrometheusBuilder; - use metrics_util::MetricKindMask; - - PrometheusBuilder::new() - .idle_timeout( - MetricKindMask::ALL, - Some(Duration::from_secs(config.statistics.interval * 2)), - ) - .with_http_listener(config.statistics.prometheus_endpoint_address) - .install() - .with_context(|| { - format!( - "Install prometheus endpoint on {}", - config.statistics.prometheus_endpoint_address - ) - })?; - } - - Builder::new() + let handle = Builder::new() .name("statistics".into()) .spawn(move || { #[cfg(feature = "cpu-pinning")] @@ -167,14 +147,95 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::Util, ); - workers::statistics::run_statistics_worker( - sentinel, - config, - state, - statistics_receiver, - ); + workers::statistics::run_statistics_worker(config, state, statistics_receiver) }) .with_context(|| "spawn statistics worker")?; + + join_handles.push((WorkerType::Statistics, handle)); + } + + #[cfg(feature = "prometheus")] + if config.statistics.active() && config.statistics.run_prometheus_endpoint { + let config = config.clone(); + + 
let handle = Builder::new() + .name("prometheus".into()) + .spawn(move || { + #[cfg(feature = "cpu-pinning")] + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + config.swarm_workers, + WorkerIndex::Util, + ); + + use metrics_exporter_prometheus::PrometheusBuilder; + use metrics_util::MetricKindMask; + + let rt = ::tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .context("build prometheus tokio runtime")?; + + rt.block_on(async { + let (recorder, exporter) = PrometheusBuilder::new() + .idle_timeout( + MetricKindMask::ALL, + Some(Duration::from_secs(config.statistics.interval * 2)), + ) + .with_http_listener(config.statistics.prometheus_endpoint_address) + .build() + .context("build prometheus recorder and exporter")?; + + ::tokio::spawn(async move { + let mut interval = ::tokio::time::interval(Duration::from_secs(5)); + + loop { + interval.tick().await; + + // Periodically render metrics to make sure + // idles are cleaned up + recorder.handle().render(); + } + }); + + exporter.await.context("run prometheus exporter") + }) + }) + .with_context(|| "spawn prometheus exporter worker")?; + + join_handles.push((WorkerType::Prometheus, handle)); + } + + // Spawn signal handler thread + { + let config = config.clone(); + + let handle: JoinHandle<anyhow::Result<()>> = Builder::new() + .name("signals".into()) + .spawn(move || { + #[cfg(feature = "cpu-pinning")] + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + config.swarm_workers, + WorkerIndex::Util, + ); + + for signal in &mut signals { + match signal { + SIGUSR1 => { + let _ = update_access_list(&config.access_list, &state.access_list); + } + _ => unreachable!(), + } + } + + Ok(()) + }) + .context("spawn signal worker")?; + + join_handles.push((WorkerType::Signals, handle)); } #[cfg(feature = "cpu-pinning")] @@ -185,21 +246,47 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::Util, ); - for signal in &mut signals { - match signal {
- SIGUSR1 => { - let _ = update_access_list(&config.access_list, &state.access_list); - } - SIGTERM => { - if sentinel_watcher.panic_was_triggered() { - return Err(anyhow::anyhow!("worker thread panicked")); - } + loop { + for (i, (_, handle)) in join_handles.iter().enumerate() { + if handle.is_finished() { + let (worker_type, handle) = join_handles.remove(i); - break; + match handle.join() { + Ok(Ok(())) => { + return Err(anyhow::anyhow!("{} stopped", worker_type)); + } + Ok(Err(err)) => { + return Err(err.context(format!("{} stopped", worker_type))); + } + Err(_) => { + return Err(anyhow::anyhow!("{} panicked", worker_type)); + } + } } - _ => unreachable!(), + } + + sleep(Duration::from_secs(5)); + } +} + +enum WorkerType { + Swarm(usize), + Socket(usize), + Statistics, + Signals, + #[cfg(feature = "prometheus")] + Prometheus, +} + +impl Display for WorkerType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Swarm(index) => f.write_fmt(format_args!("Swarm worker {}", index + 1)), + Self::Socket(index) => f.write_fmt(format_args!("Socket worker {}", index + 1)), + Self::Statistics => f.write_str("Statistics worker"), + Self::Signals => f.write_str("Signals worker"), + #[cfg(feature = "prometheus")] + Self::Prometheus => f.write_str("Prometheus worker"), } } - - Ok(()) } diff --git a/crates/udp/src/workers/socket/mio.rs b/crates/udp/src/workers/socket/mio.rs index 38a1865..05f90fa 100644 --- a/crates/udp/src/workers/socket/mio.rs +++ b/crates/udp/src/workers/socket/mio.rs @@ -2,6 +2,7 @@ use std::io::{Cursor, ErrorKind}; use std::sync::atomic::Ordering; use std::time::{Duration, Instant}; +use anyhow::Context; use aquatic_common::access_list::AccessListCache; use aquatic_common::ServerStartInstant; use mio::net::UdpSocket; @@ -9,7 +10,7 @@ use mio::{Events, Interest, Poll, Token}; use aquatic_common::{ access_list::create_access_list_cache, privileges::PrivilegeDropper, CanonicalSocketAddr, - PanicSentinel, ValidUntil, 
+ ValidUntil, }; use aquatic_udp_protocol::*; @@ -49,9 +50,7 @@ pub struct SocketWorker { } impl SocketWorker { - #[allow(clippy::too_many_arguments)] pub fn run( - _sentinel: PanicSentinel, shared_state: State, config: Config, validator: ConnectionValidator, @@ -59,9 +58,8 @@ impl SocketWorker { request_sender: ConnectedRequestSender, response_receiver: ConnectedResponseReceiver, priv_dropper: PrivilegeDropper, - ) { - let socket = - UdpSocket::from_std(create_socket(&config, priv_dropper).expect("create socket")); + ) -> anyhow::Result<()> { + let socket = UdpSocket::from_std(create_socket(&config, priv_dropper)?); let access_list_cache = create_access_list_cache(&shared_state.access_list); let opt_resend_buffer = (config.network.resend_buffer_max_len > 0).then_some(Vec::new()); @@ -81,16 +79,16 @@ impl SocketWorker { pending_requests: Default::default(), }; - worker.run_inner(); + worker.run_inner() } - pub fn run_inner(&mut self) { + pub fn run_inner(&mut self) -> anyhow::Result<()> { let mut events = Events::with_capacity(1); - let mut poll = Poll::new().expect("create poll"); + let mut poll = Poll::new().context("create poll")?; poll.registry() .register(&mut self.socket, Token(0), Interest::READABLE) - .expect("register poll"); + .context("register poll")?; let poll_timeout = Duration::from_millis(self.config.network.poll_timeout_ms); @@ -108,8 +106,7 @@ impl SocketWorker { loop { match self.polling_mode { PollMode::Regular => { - poll.poll(&mut events, Some(poll_timeout)) - .expect("failed polling"); + poll.poll(&mut events, Some(poll_timeout)).context("poll")?; for event in events.iter() { if event.is_readable() { diff --git a/crates/udp/src/workers/socket/mod.rs b/crates/udp/src/workers/socket/mod.rs index 6889c79..f8597cc 100644 --- a/crates/udp/src/workers/socket/mod.rs +++ b/crates/udp/src/workers/socket/mod.rs @@ -5,7 +5,7 @@ mod uring; mod validator; use anyhow::Context; -use aquatic_common::{privileges::PrivilegeDropper, PanicSentinel, 
ServerStartInstant}; +use aquatic_common::{privileges::PrivilegeDropper, ServerStartInstant}; use socket2::{Domain, Protocol, Socket, Type}; use crate::{ @@ -36,9 +36,7 @@ const EXTRA_PACKET_SIZE_IPV4: usize = 8 + 18 + 20 + 8; /// - 8 bit udp header const EXTRA_PACKET_SIZE_IPV6: usize = 8 + 18 + 40 + 8; -#[allow(clippy::too_many_arguments)] pub fn run_socket_worker( - sentinel: PanicSentinel, shared_state: State, config: Config, validator: ConnectionValidator, @@ -46,12 +44,11 @@ pub fn run_socket_worker( request_sender: ConnectedRequestSender, response_receiver: ConnectedResponseReceiver, priv_dropper: PrivilegeDropper, -) { +) -> anyhow::Result<()> { #[cfg(all(target_os = "linux", feature = "io-uring"))] match self::uring::supported_on_current_kernel() { Ok(()) => { - self::uring::SocketWorker::run( - sentinel, + return self::uring::SocketWorker::run( shared_state, config, validator, @@ -60,8 +57,6 @@ pub fn run_socket_worker( response_receiver, priv_dropper, ); - - return; } Err(err) => { ::log::warn!( @@ -72,7 +67,6 @@ pub fn run_socket_worker( } self::mio::SocketWorker::run( - sentinel, shared_state, config, validator, @@ -80,7 +74,7 @@ pub fn run_socket_worker( request_sender, response_receiver, priv_dropper, - ); + ) } fn create_socket( diff --git a/crates/udp/src/workers/socket/uring/mod.rs b/crates/udp/src/workers/socket/uring/mod.rs index 43c78c9..955a7b3 100644 --- a/crates/udp/src/workers/socket/uring/mod.rs +++ b/crates/udp/src/workers/socket/uring/mod.rs @@ -18,7 +18,7 @@ use io_uring::{IoUring, Probe}; use aquatic_common::{ access_list::create_access_list_cache, privileges::PrivilegeDropper, CanonicalSocketAddr, - PanicSentinel, ValidUntil, + ValidUntil, }; use aquatic_udp_protocol::*; @@ -96,9 +96,7 @@ pub struct SocketWorker { } impl SocketWorker { - #[allow(clippy::too_many_arguments)] pub fn run( - _sentinel: PanicSentinel, shared_state: State, config: Config, validator: ConnectionValidator, @@ -106,7 +104,7 @@ impl SocketWorker { request_sender: 
ConnectedRequestSender, response_receiver: ConnectedResponseReceiver, priv_dropper: PrivilegeDropper, - ) { + ) -> anyhow::Result<()> { let ring_entries = config.network.ring_size.next_power_of_two(); // Try to fill up the ring with send requests let send_buffer_entries = ring_entries; @@ -191,6 +189,8 @@ impl SocketWorker { }; CurrentRing::with(|ring| worker.run_inner(ring)); + + Ok(()) } fn run_inner(&mut self, ring: &mut IoUring) { diff --git a/crates/udp/src/workers/statistics/mod.rs b/crates/udp/src/workers/statistics/mod.rs index e9a8565..7600b33 100644 --- a/crates/udp/src/workers/statistics/mod.rs +++ b/crates/udp/src/workers/statistics/mod.rs @@ -5,7 +5,7 @@ use std::io::Write; use std::time::{Duration, Instant}; use anyhow::Context; -use aquatic_common::{IndexMap, PanicSentinel}; +use aquatic_common::IndexMap; use aquatic_udp_protocol::{PeerClient, PeerId}; use compact_str::CompactString; use crossbeam_channel::Receiver; @@ -42,11 +42,10 @@ struct TemplateData { } pub fn run_statistics_worker( - _sentinel: PanicSentinel, config: Config, shared_state: State, statistics_receiver: Receiver<StatisticsMessage>, -) { +) -> anyhow::Result<()> { let process_peer_client_data = { let mut collect = config.statistics.write_html_to_file; @@ -61,13 +60,10 @@ pub fn run_statistics_worker( let opt_tt = if config.statistics.write_html_to_file { let mut tt = TinyTemplate::new(); - if let Err(err) = tt.add_template(TEMPLATE_KEY, TEMPLATE_CONTENTS) { - ::log::error!("Couldn't parse statistics html template: {:#}", err); + tt.add_template(TEMPLATE_KEY, TEMPLATE_CONTENTS) + .context("parse statistics html template")?; - None - } else { - Some(tt) - } + Some(tt) } else { None }; diff --git a/crates/udp/src/workers/swarm/mod.rs b/crates/udp/src/workers/swarm/mod.rs index d989477..4474d71 100644 --- a/crates/udp/src/workers/swarm/mod.rs +++ b/crates/udp/src/workers/swarm/mod.rs @@ -10,7 +10,7 @@ use crossbeam_channel::Receiver; use crossbeam_channel::Sender; use rand::{rngs::SmallRng, SeedableRng};
-use aquatic_common::{CanonicalSocketAddr, PanicSentinel, ValidUntil}; +use aquatic_common::{CanonicalSocketAddr, ValidUntil}; use crate::common::*; use crate::config::Config; @@ -18,7 +18,6 @@ use crate::config::Config; use storage::TorrentMaps; pub struct SwarmWorker { - pub _sentinel: PanicSentinel, pub config: Config, pub state: State, pub server_start_instant: ServerStartInstant, @@ -29,7 +28,7 @@ pub struct SwarmWorker { } impl SwarmWorker { - pub fn run(&mut self) { + pub fn run(&mut self) -> anyhow::Result<()> { let mut torrents = TorrentMaps::default(); let mut rng = SmallRng::from_entropy();