diff --git a/crates/udp/src/lib.rs b/crates/udp/src/lib.rs index 50a0439..97fc16a 100644 --- a/crates/udp/src/lib.rs +++ b/crates/udp/src/lib.rs @@ -3,19 +3,20 @@ pub mod config; pub mod workers; use std::collections::BTreeMap; -use std::thread::Builder; +use std::fmt::Display; +use std::thread::{sleep, Builder, JoinHandle}; use std::time::Duration; use anyhow::Context; use crossbeam_channel::{bounded, unbounded}; -use signal_hook::consts::{SIGTERM, SIGUSR1}; +use signal_hook::consts::SIGUSR1; use signal_hook::iterator::Signals; use aquatic_common::access_list::update_access_list; #[cfg(feature = "cpu-pinning")] use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::privileges::PrivilegeDropper; -use aquatic_common::{PanicSentinelWatcher, ServerStartInstant}; +use aquatic_common::ServerStartInstant; use common::{ ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State, SwarmWorkerIndex, @@ -28,12 +29,12 @@ pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); pub fn run(config: Config) -> ::anyhow::Result<()> { - let mut signals = Signals::new([SIGUSR1, SIGTERM])?; + let mut signals = Signals::new([SIGUSR1])?; let state = State::new(config.swarm_workers); let connection_validator = ConnectionValidator::new(&config)?; - let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); + let mut join_handles = Vec::new(); update_access_list(&config.access_list, &state.access_list)?; @@ -62,14 +63,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } for i in 0..config.swarm_workers { - let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); let request_receiver = request_receivers.remove(&i).unwrap().clone(); let response_sender = ConnectedResponseSender::new(response_senders.clone()); let statistics_sender = statistics_sender.clone(); - Builder::new() + let handle = Builder::new() .name(format!("swarm-{:02}", i + 1)) .spawn(move || { #[cfg(feature = "cpu-pinning")] @@ -81,7 +81,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ); let mut worker = SwarmWorker { - _sentinel: sentinel, config, state, server_start_instant, @@ -91,13 +90,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { worker_index: SwarmWorkerIndex(i), }; - worker.run(); + worker.run() }) .with_context(|| "spawn swarm worker")?; + + join_handles.push((WorkerType::Swarm(i), handle)); } for i in 0..config.socket_workers { - let sentinel = sentinel.clone(); let state = state.clone(); let config = config.clone(); let connection_validator = connection_validator.clone(); @@ -106,7 +106,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let response_receiver = response_receivers.remove(&i).unwrap(); let priv_dropper = priv_dropper.clone(); - Builder::new() + let handle = Builder::new() .name(format!("socket-{:02}", i + 1)) .spawn(move || { #[cfg(feature = "cpu-pinning")] @@ -118,7 +118,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ); workers::socket::run_socket_worker( - sentinel, state, config, connection_validator, @@ -126,13 +125,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { request_sender, response_receiver, priv_dropper, - ); + ) }) .with_context(|| "spawn socket worker")?; + + join_handles.push((WorkerType::Socket(i), handle)); } if config.statistics.active() { - let sentinel = sentinel.clone(); let state = state.clone(); let config = config.clone(); @@ -156,7 +156,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { })?; } - Builder::new() + let handle = Builder::new() .name("statistics".into()) .spawn(move || { #[cfg(feature = "cpu-pinning")] @@ -167,14 +167,11 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::Util, ); - workers::statistics::run_statistics_worker( - sentinel, - config, - state, - statistics_receiver, - ); + workers::statistics::run_statistics_worker(config, state, statistics_receiver) }) .with_context(|| "spawn statistics worker")?; + + join_handles.push((WorkerType::Statistics, handle)); } #[cfg(feature = "cpu-pinning")] @@ -185,21 +182,71 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::Util, ); - for signal in &mut signals { - match signal { - SIGUSR1 => { - let _ = update_access_list(&config.access_list, &state.access_list); - } - SIGTERM => { - if sentinel_watcher.panic_was_triggered() { - return Err(anyhow::anyhow!("worker thread panicked")); - } + let handle: JoinHandle> = Builder::new() + .name("signals".into()) + .spawn(move || { + #[cfg(feature = "cpu-pinning")] + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + config.swarm_workers, + WorkerIndex::Util, + ); - break; + for signal in &mut signals { + match signal { + SIGUSR1 => { + let _ = update_access_list(&config.access_list, &state.access_list); + } + _ => unreachable!(), + } } - _ => unreachable!(), + + Ok(()) + }) + .context("spawn signal worker")?; + + join_handles.push((WorkerType::Signals, handle)); + + loop { + for (i, (_, handle)) in join_handles.iter().enumerate() { + if handle.is_finished() { + let (worker_type, handle) = join_handles.remove(i); + + match handle.join() { + Ok(Ok(())) => { + return Err(anyhow::anyhow!("{} stopped", worker_type)); + } + Ok(Err(err)) => { + return Err(err.context(format!("{} stopped", worker_type))); + } + Err(_) => { + return Err(anyhow::anyhow!("{} panicked", worker_type)); + } + } + } + } + + sleep(Duration::from_secs(5)); + } +} + +enum WorkerType { + Swarm(usize), + Socket(usize), + Statistics, + Signals, + Prometheus, +} + +impl Display for WorkerType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Swarm(index) => f.write_fmt(format_args!("Swarm worker {}", index)), + Self::Socket(index) => f.write_fmt(format_args!("Socket worker {}", index)), + Self::Statistics => f.write_str("Statistics worker"), + Self::Signals => f.write_str("Signals worker"), + Self::Prometheus => f.write_str("Prometheus worker"), } } - - Ok(()) } diff --git a/crates/udp/src/workers/socket/mio.rs b/crates/udp/src/workers/socket/mio.rs index 38a1865..b24ab94 100644 --- a/crates/udp/src/workers/socket/mio.rs +++ b/crates/udp/src/workers/socket/mio.rs @@ -9,7 +9,7 @@ use mio::{Events, Interest, Poll, Token}; use aquatic_common::{ access_list::create_access_list_cache, privileges::PrivilegeDropper, CanonicalSocketAddr, - PanicSentinel, ValidUntil, + ValidUntil, }; use aquatic_udp_protocol::*; @@ -49,9 +49,7 @@ pub struct SocketWorker { } impl SocketWorker { - #[allow(clippy::too_many_arguments)] pub fn run( - _sentinel: PanicSentinel, shared_state: State, config: Config, validator: ConnectionValidator, @@ -59,7 +57,7 @@ impl SocketWorker { request_sender: ConnectedRequestSender, response_receiver: ConnectedResponseReceiver, priv_dropper: PrivilegeDropper, - ) { + ) -> anyhow::Result<()> { let socket = UdpSocket::from_std(create_socket(&config, priv_dropper).expect("create socket")); let access_list_cache = create_access_list_cache(&shared_state.access_list); @@ -82,6 +80,8 @@ impl SocketWorker { }; worker.run_inner(); + + Ok(()) } pub fn run_inner(&mut self) { diff --git a/crates/udp/src/workers/socket/mod.rs b/crates/udp/src/workers/socket/mod.rs index 6889c79..857ef33 100644 --- a/crates/udp/src/workers/socket/mod.rs +++ b/crates/udp/src/workers/socket/mod.rs @@ -5,7 +5,7 @@ mod uring; mod validator; use anyhow::Context; -use aquatic_common::{privileges::PrivilegeDropper, PanicSentinel, ServerStartInstant}; +use aquatic_common::{privileges::PrivilegeDropper, ServerStartInstant}; use socket2::{Domain, Protocol, Socket, Type}; use crate::{ @@ -36,9 +36,7 @@ const EXTRA_PACKET_SIZE_IPV4: usize = 8 + 18 + 20 + 8; /// - 8 bit udp header const EXTRA_PACKET_SIZE_IPV6: usize = 8 + 18 + 40 + 8; -#[allow(clippy::too_many_arguments)] pub fn run_socket_worker( - sentinel: PanicSentinel, shared_state: State, config: Config, validator: ConnectionValidator, @@ -46,12 +44,11 @@ pub fn run_socket_worker( request_sender: ConnectedRequestSender, response_receiver: ConnectedResponseReceiver, priv_dropper: PrivilegeDropper, -) { +) -> anyhow::Result<()> { #[cfg(all(target_os = "linux", feature = "io-uring"))] match self::uring::supported_on_current_kernel() { Ok(()) => { - self::uring::SocketWorker::run( - sentinel, + return self::uring::SocketWorker::run( shared_state, config, validator, @@ -60,8 +57,6 @@ pub fn run_socket_worker( response_receiver, priv_dropper, ); - - return; } Err(err) => { ::log::warn!( @@ -71,8 +66,7 @@ pub fn run_socket_worker( } } - self::mio::SocketWorker::run( - sentinel, + return self::mio::SocketWorker::run( shared_state, config, validator, diff --git a/crates/udp/src/workers/socket/uring/mod.rs b/crates/udp/src/workers/socket/uring/mod.rs index 43c78c9..955a7b3 100644 --- a/crates/udp/src/workers/socket/uring/mod.rs +++ b/crates/udp/src/workers/socket/uring/mod.rs @@ -18,7 +18,7 @@ use io_uring::{IoUring, Probe}; use aquatic_common::{ access_list::create_access_list_cache, privileges::PrivilegeDropper, CanonicalSocketAddr, - PanicSentinel, ValidUntil, + ValidUntil, }; use aquatic_udp_protocol::*; @@ -96,9 +96,7 @@ pub struct SocketWorker { } impl SocketWorker { - #[allow(clippy::too_many_arguments)] pub fn run( - _sentinel: PanicSentinel, shared_state: State, config: Config, validator: ConnectionValidator, @@ -106,7 +104,7 @@ impl SocketWorker { request_sender: ConnectedRequestSender, response_receiver: ConnectedResponseReceiver, priv_dropper: PrivilegeDropper, - ) { + ) -> anyhow::Result<()> { let ring_entries = config.network.ring_size.next_power_of_two(); // Try to fill up the ring with send requests let send_buffer_entries = ring_entries; @@ -191,6 +189,8 @@ impl SocketWorker { }; CurrentRing::with(|ring| worker.run_inner(ring)); + + Ok(()) } fn run_inner(&mut self, ring: &mut IoUring) { diff --git a/crates/udp/src/workers/statistics/mod.rs b/crates/udp/src/workers/statistics/mod.rs index e9a8565..c4f98b4 100644 --- a/crates/udp/src/workers/statistics/mod.rs +++ b/crates/udp/src/workers/statistics/mod.rs @@ -5,7 +5,7 @@ use std::io::Write; use std::time::{Duration, Instant}; use anyhow::Context; -use aquatic_common::{IndexMap, PanicSentinel}; +use aquatic_common::IndexMap; use aquatic_udp_protocol::{PeerClient, PeerId}; use compact_str::CompactString; use crossbeam_channel::Receiver; @@ -42,11 +42,10 @@ struct TemplateData { } pub fn run_statistics_worker( - _sentinel: PanicSentinel, config: Config, shared_state: State, statistics_receiver: Receiver, -) { +) -> anyhow::Result<()> { let process_peer_client_data = { let mut collect = config.statistics.write_html_to_file; diff --git a/crates/udp/src/workers/swarm/mod.rs b/crates/udp/src/workers/swarm/mod.rs index d989477..4474d71 100644 --- a/crates/udp/src/workers/swarm/mod.rs +++ b/crates/udp/src/workers/swarm/mod.rs @@ -10,7 +10,7 @@ use crossbeam_channel::Receiver; use crossbeam_channel::Sender; use rand::{rngs::SmallRng, SeedableRng}; -use aquatic_common::{CanonicalSocketAddr, PanicSentinel, ValidUntil}; +use aquatic_common::{CanonicalSocketAddr, ValidUntil}; use crate::common::*; use crate::config::Config; @@ -18,7 +18,6 @@ use crate::config::Config; use storage::TorrentMaps; pub struct SwarmWorker { - pub _sentinel: PanicSentinel, pub config: Config, pub state: State, pub server_start_instant: ServerStartInstant, @@ -29,7 +28,7 @@ pub struct SwarmWorker { } impl SwarmWorker { - pub fn run(&mut self) { + pub fn run(&mut self) -> anyhow::Result<()> { let mut torrents = TorrentMaps::default(); let mut rng = SmallRng::from_entropy();