diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs index 0a66014..79b91e3 100644 --- a/aquatic_common/src/lib.rs +++ b/aquatic_common/src/lib.rs @@ -1,4 +1,6 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::time::{Duration, Instant}; use ahash::RandomState; @@ -30,14 +32,31 @@ impl ValidUntil { } } +pub struct PanicSentinelWatcher(Arc); + +impl PanicSentinelWatcher { + pub fn create_with_sentinel() -> (Self, PanicSentinel) { + let triggered = Arc::new(AtomicBool::new(false)); + let sentinel = PanicSentinel(triggered.clone()); + + (Self(triggered), sentinel) + } + + pub fn panic_was_triggered(&self) -> bool { + self.0.load(Ordering::SeqCst) + } +} + /// Raises SIGTERM when dropped /// /// Pass to threads to have panics in them cause whole program to exit. #[derive(Clone)] -pub struct PanicSentinel; +pub struct PanicSentinel(Arc); impl Drop for PanicSentinel { fn drop(&mut self) { + self.0.store(true, Ordering::SeqCst); + if unsafe { libc::raise(15) } == -1 { panic!( "Could not raise SIGTERM: {:#}", diff --git a/aquatic_udp/src/lib.rs b/aquatic_udp/src/lib.rs index 663eba0..4b02340 100644 --- a/aquatic_udp/src/lib.rs +++ b/aquatic_udp/src/lib.rs @@ -2,7 +2,7 @@ pub mod common; pub mod config; pub mod workers; -use aquatic_common::PanicSentinel; +use aquatic_common::PanicSentinelWatcher; use config::Config; use std::collections::BTreeMap; @@ -32,6 +32,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let mut signals = Signals::new([SIGUSR1, SIGTERM])?; + let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); let mut request_senders = Vec::new(); @@ -63,6 +64,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } for i in 0..config.request_workers { + let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); let request_receiver = request_receivers.remove(&i).unwrap().clone(); @@ -80,7 +82,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ); workers::request::run_request_worker( - PanicSentinel, + sentinel, config, state, request_receiver, @@ -92,6 +94,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } for i in 0..config.socket_workers { + let sentinel = sentinel.clone(); let state = state.clone(); let config = config.clone(); let request_sender = @@ -111,7 +114,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ); workers::socket::run_socket_worker( - PanicSentinel, + sentinel, state, config, i, @@ -124,6 +127,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } if config.statistics.active() { + let sentinel = sentinel.clone(); let state = state.clone(); let config = config.clone(); @@ -138,7 +142,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::Util, ); - workers::statistics::run_statistics_worker(PanicSentinel, config, state); + workers::statistics::run_statistics_worker(sentinel, config, state); }) .with_context(|| "spawn statistics worker")?; } @@ -157,6 +161,10 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let _ = update_access_list(&config.access_list, &state.access_list); } SIGTERM => { + if sentinel_watcher.panic_was_triggered() { + return Err(anyhow::anyhow!("worker thread panicked")); + } + break; } _ => unreachable!(), diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index 046ab6d..7a83e5b 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -7,7 +7,7 @@ //! Scrape: 1 873 545 requests/second, 533.75 ns/request //! ``` -use aquatic_common::PanicSentinel; +use aquatic_common::PanicSentinelWatcher; use aquatic_udp::workers::request::run_request_worker; use crossbeam_channel::unbounded; use num_format::{Locale, ToFormattedString}; @@ -42,6 +42,7 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { // Setup common state, spawn request handlers let mut aquatic_config = Config::default(); + let (_, sentinel) = PanicSentinelWatcher::create_with_sentinel(); aquatic_config.cleaning.torrent_cleaning_interval = 60 * 60 * 24; @@ -56,7 +57,7 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { ::std::thread::spawn(move || { run_request_worker( - PanicSentinel, + sentinel, config, state, request_receiver,