From 8f838098aa9ed1968b7c9ebd16bf17c7b076c218 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 29 Jan 2024 21:43:49 +0100 Subject: [PATCH 1/7] udp: replace PanicSentinel with loop over JoinHandles --- crates/udp/src/lib.rs | 115 +++++++++++++++------ crates/udp/src/workers/socket/mio.rs | 8 +- crates/udp/src/workers/socket/mod.rs | 14 +-- crates/udp/src/workers/socket/uring/mod.rs | 8 +- crates/udp/src/workers/statistics/mod.rs | 5 +- crates/udp/src/workers/swarm/mod.rs | 5 +- 6 files changed, 97 insertions(+), 58 deletions(-) diff --git a/crates/udp/src/lib.rs b/crates/udp/src/lib.rs index 50a0439..97fc16a 100644 --- a/crates/udp/src/lib.rs +++ b/crates/udp/src/lib.rs @@ -3,19 +3,20 @@ pub mod config; pub mod workers; use std::collections::BTreeMap; -use std::thread::Builder; +use std::fmt::Display; +use std::thread::{sleep, Builder, JoinHandle}; use std::time::Duration; use anyhow::Context; use crossbeam_channel::{bounded, unbounded}; -use signal_hook::consts::{SIGTERM, SIGUSR1}; +use signal_hook::consts::SIGUSR1; use signal_hook::iterator::Signals; use aquatic_common::access_list::update_access_list; #[cfg(feature = "cpu-pinning")] use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::privileges::PrivilegeDropper; -use aquatic_common::{PanicSentinelWatcher, ServerStartInstant}; +use aquatic_common::ServerStartInstant; use common::{ ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State, SwarmWorkerIndex, @@ -28,12 +29,12 @@ pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); pub fn run(config: Config) -> ::anyhow::Result<()> { - let mut signals = Signals::new([SIGUSR1, SIGTERM])?; + let mut signals = Signals::new([SIGUSR1])?; let state = State::new(config.swarm_workers); let connection_validator = ConnectionValidator::new(&config)?; - let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); + let mut join_handles = Vec::new(); update_access_list(&config.access_list, &state.access_list)?; @@ -62,14 +63,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } for i in 0..config.swarm_workers { - let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); let request_receiver = request_receivers.remove(&i).unwrap().clone(); let response_sender = ConnectedResponseSender::new(response_senders.clone()); let statistics_sender = statistics_sender.clone(); - Builder::new() + let handle = Builder::new() .name(format!("swarm-{:02}", i + 1)) .spawn(move || { #[cfg(feature = "cpu-pinning")] @@ -81,7 +81,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ); let mut worker = SwarmWorker { - _sentinel: sentinel, config, state, server_start_instant, @@ -91,13 +90,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { worker_index: SwarmWorkerIndex(i), }; - worker.run(); + worker.run() }) .with_context(|| "spawn swarm worker")?; + + join_handles.push((WorkerType::Swarm(i), handle)); } for i in 0..config.socket_workers { - let sentinel = sentinel.clone(); let state = state.clone(); let config = config.clone(); let connection_validator = connection_validator.clone(); @@ -106,7 +106,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let response_receiver = response_receivers.remove(&i).unwrap(); let priv_dropper = priv_dropper.clone(); - Builder::new() + let handle = Builder::new() .name(format!("socket-{:02}", i + 1)) .spawn(move || { #[cfg(feature = "cpu-pinning")] @@ -118,7 +118,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ); workers::socket::run_socket_worker( - sentinel, state, config, connection_validator, @@ -126,13 +125,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { request_sender, response_receiver, priv_dropper, - ); + ) }) .with_context(|| "spawn socket worker")?; + + join_handles.push((WorkerType::Socket(i), handle)); } if config.statistics.active() { - let sentinel = sentinel.clone(); let state = state.clone(); let config = config.clone(); @@ -156,7 +156,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { })?; } - Builder::new() + let handle = Builder::new() .name("statistics".into()) .spawn(move || { #[cfg(feature = "cpu-pinning")] @@ -167,14 +167,11 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::Util, ); - workers::statistics::run_statistics_worker( - sentinel, - config, - state, - statistics_receiver, - ); + workers::statistics::run_statistics_worker(config, state, statistics_receiver) }) .with_context(|| "spawn statistics worker")?; + + join_handles.push((WorkerType::Statistics, handle)); } #[cfg(feature = "cpu-pinning")] @@ -185,21 +182,71 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::Util, ); - for signal in &mut signals { - match signal { - SIGUSR1 => { - let _ = update_access_list(&config.access_list, &state.access_list); - } - SIGTERM => { - if sentinel_watcher.panic_was_triggered() { - return Err(anyhow::anyhow!("worker thread panicked")); - } + let handle: JoinHandle> = Builder::new() + .name("signals".into()) + .spawn(move || { + #[cfg(feature = "cpu-pinning")] + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + config.swarm_workers, + WorkerIndex::Util, + ); - break; + for signal in &mut signals { + match signal { + SIGUSR1 => { + let _ = update_access_list(&config.access_list, &state.access_list); + } + _ => unreachable!(), + } } - _ => unreachable!(), + + Ok(()) + }) + .context("spawn signal worker")?; + + join_handles.push((WorkerType::Signals, handle)); + + loop { + for (i, (_, handle)) in join_handles.iter().enumerate() { + if handle.is_finished() { + let (worker_type, handle) = join_handles.remove(i); + + match handle.join() { + Ok(Ok(())) => { + return Err(anyhow::anyhow!("{} stopped", worker_type)); + } + Ok(Err(err)) => { + return Err(err.context(format!("{} stopped", worker_type))); + } + Err(_) => { + return Err(anyhow::anyhow!("{} panicked", worker_type)); + } + } + } + } + + sleep(Duration::from_secs(5)); + } +} + +enum WorkerType { + Swarm(usize), + Socket(usize), + Statistics, + Signals, + Prometheus, +} + +impl Display for WorkerType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Swarm(index) => f.write_fmt(format_args!("Swarm worker {}", index)), + Self::Socket(index) => f.write_fmt(format_args!("Socket worker {}", index)), + Self::Statistics => f.write_str("Statistics worker"), + Self::Signals => f.write_str("Signals worker"), + Self::Prometheus => f.write_str("Prometheus worker"), } } - - Ok(()) } diff --git a/crates/udp/src/workers/socket/mio.rs b/crates/udp/src/workers/socket/mio.rs index 38a1865..b24ab94 100644 --- a/crates/udp/src/workers/socket/mio.rs +++ b/crates/udp/src/workers/socket/mio.rs @@ -9,7 +9,7 @@ use mio::{Events, Interest, Poll, Token}; use aquatic_common::{ access_list::create_access_list_cache, privileges::PrivilegeDropper, CanonicalSocketAddr, - PanicSentinel, ValidUntil, + ValidUntil, }; use aquatic_udp_protocol::*; @@ -49,9 +49,7 @@ pub struct SocketWorker { } impl SocketWorker { - #[allow(clippy::too_many_arguments)] pub fn run( - _sentinel: PanicSentinel, shared_state: State, config: Config, validator: ConnectionValidator, @@ -59,7 +57,7 @@ impl SocketWorker { request_sender: ConnectedRequestSender, response_receiver: ConnectedResponseReceiver, priv_dropper: PrivilegeDropper, - ) { + ) -> anyhow::Result<()> { let socket = UdpSocket::from_std(create_socket(&config, priv_dropper).expect("create socket")); let access_list_cache = create_access_list_cache(&shared_state.access_list); @@ -82,6 +80,8 @@ impl SocketWorker { }; worker.run_inner(); + + Ok(()) } pub fn run_inner(&mut self) { diff --git a/crates/udp/src/workers/socket/mod.rs b/crates/udp/src/workers/socket/mod.rs index 6889c79..857ef33 100644 --- a/crates/udp/src/workers/socket/mod.rs +++ b/crates/udp/src/workers/socket/mod.rs @@ -5,7 +5,7 @@ mod uring; mod validator; use anyhow::Context; -use aquatic_common::{privileges::PrivilegeDropper, PanicSentinel, ServerStartInstant}; +use aquatic_common::{privileges::PrivilegeDropper, ServerStartInstant}; use socket2::{Domain, Protocol, Socket, Type}; use crate::{ @@ -36,9 +36,7 @@ const EXTRA_PACKET_SIZE_IPV4: usize = 8 + 18 + 20 + 8; /// - 8 bit udp header const EXTRA_PACKET_SIZE_IPV6: usize = 8 + 18 + 40 + 8; -#[allow(clippy::too_many_arguments)] pub fn run_socket_worker( - sentinel: PanicSentinel, shared_state: State, config: Config, validator: ConnectionValidator, @@ -46,12 +44,11 @@ pub fn run_socket_worker( request_sender: ConnectedRequestSender, response_receiver: ConnectedResponseReceiver, priv_dropper: PrivilegeDropper, -) { +) -> anyhow::Result<()> { #[cfg(all(target_os = "linux", feature = "io-uring"))] match self::uring::supported_on_current_kernel() { Ok(()) => { - self::uring::SocketWorker::run( - sentinel, + return self::uring::SocketWorker::run( shared_state, config, validator, @@ -60,8 +57,6 @@ pub fn run_socket_worker( response_receiver, priv_dropper, ); - - return; } Err(err) => { ::log::warn!( @@ -71,8 +66,7 @@ pub fn run_socket_worker( } } - self::mio::SocketWorker::run( - sentinel, + return self::mio::SocketWorker::run( shared_state, config, validator, diff --git a/crates/udp/src/workers/socket/uring/mod.rs b/crates/udp/src/workers/socket/uring/mod.rs index 43c78c9..955a7b3 100644 --- a/crates/udp/src/workers/socket/uring/mod.rs +++ b/crates/udp/src/workers/socket/uring/mod.rs @@ -18,7 +18,7 @@ use io_uring::{IoUring, Probe}; use aquatic_common::{ access_list::create_access_list_cache, privileges::PrivilegeDropper, CanonicalSocketAddr, - PanicSentinel, ValidUntil, + ValidUntil, }; use aquatic_udp_protocol::*; @@ -96,9 +96,7 @@ pub struct SocketWorker { } impl SocketWorker { - #[allow(clippy::too_many_arguments)] pub fn run( - _sentinel: PanicSentinel, shared_state: State, config: Config, validator: ConnectionValidator, @@ -106,7 +104,7 @@ impl SocketWorker { request_sender: ConnectedRequestSender, response_receiver: ConnectedResponseReceiver, priv_dropper: PrivilegeDropper, - ) { + ) -> anyhow::Result<()> { let ring_entries = config.network.ring_size.next_power_of_two(); // Try to fill up the ring with send requests let send_buffer_entries = ring_entries; @@ -191,6 +189,8 @@ impl SocketWorker { }; CurrentRing::with(|ring| worker.run_inner(ring)); + + Ok(()) } fn run_inner(&mut self, ring: &mut IoUring) { diff --git a/crates/udp/src/workers/statistics/mod.rs b/crates/udp/src/workers/statistics/mod.rs index e9a8565..c4f98b4 100644 --- a/crates/udp/src/workers/statistics/mod.rs +++ b/crates/udp/src/workers/statistics/mod.rs @@ -5,7 +5,7 @@ use std::io::Write; use std::time::{Duration, Instant}; use anyhow::Context; -use aquatic_common::{IndexMap, PanicSentinel}; +use aquatic_common::IndexMap; use aquatic_udp_protocol::{PeerClient, PeerId}; use compact_str::CompactString; use crossbeam_channel::Receiver; @@ -42,11 +42,10 @@ struct TemplateData { } pub fn run_statistics_worker( - _sentinel: PanicSentinel, config: Config, shared_state: State, statistics_receiver: Receiver, -) { +) -> anyhow::Result<()> { let process_peer_client_data = { let mut collect = config.statistics.write_html_to_file; diff --git a/crates/udp/src/workers/swarm/mod.rs b/crates/udp/src/workers/swarm/mod.rs index d989477..4474d71 100644 --- a/crates/udp/src/workers/swarm/mod.rs +++ b/crates/udp/src/workers/swarm/mod.rs @@ -10,7 +10,7 @@ use crossbeam_channel::Receiver; use crossbeam_channel::Sender; use rand::{rngs::SmallRng, SeedableRng}; -use aquatic_common::{CanonicalSocketAddr, PanicSentinel, ValidUntil}; +use aquatic_common::{CanonicalSocketAddr, ValidUntil}; use crate::common::*; use crate::config::Config; @@ -18,7 +18,6 @@ use crate::config::Config; use storage::TorrentMaps; pub struct SwarmWorker { - pub _sentinel: PanicSentinel, pub config: Config, pub state: State, pub server_start_instant: ServerStartInstant, @@ -29,7 +28,7 @@ pub struct SwarmWorker { } impl SwarmWorker { - pub fn run(&mut self) { + pub fn run(&mut self) -> anyhow::Result<()> { let mut torrents = TorrentMaps::default(); let mut rng = SmallRng::from_entropy(); From 6dec985d45e20b0f72960056e69eba724f142035 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 29 Jan 2024 22:09:20 +0100 Subject: [PATCH 2/7] udp: store prometheus exporter thread handle, periodically render --- Cargo.lock | 1 + crates/udp/Cargo.toml | 3 +- crates/udp/src/lib.rs | 132 +++++++++++++++++++++++++++--------------- 3 files changed, 89 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b12caa0..7ac690c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -323,6 +323,7 @@ dependencies = [ "tempfile", "time", "tinytemplate", + "tokio", ] [[package]] diff --git a/crates/udp/Cargo.toml b/crates/udp/Cargo.toml index 430ab49..041a3cb 100644 --- a/crates/udp/Cargo.toml +++ b/crates/udp/Cargo.toml @@ -20,7 +20,7 @@ name = "aquatic_udp" [features] default = ["prometheus"] # Export prometheus metrics -prometheus = ["metrics", "metrics-util", "metrics-exporter-prometheus"] +prometheus = ["metrics", "metrics-util", "metrics-exporter-prometheus", "tokio"] # Experimental io_uring support (Linux 6.0 or later required) io-uring = ["dep:io-uring"] # Experimental CPU pinning support @@ -59,6 +59,7 @@ tinytemplate = "1" metrics = { version = "0.22", optional = true } metrics-util = { version = "0.16", optional = true } metrics-exporter-prometheus = { version = "0.13", optional = true, default-features = false, features = ["http-listener"] } +tokio = { version = "1", optional = true, features = ["rt", "net", "time"] } # io-uring feature io-uring = { version = "0.6", optional = true } diff --git a/crates/udp/src/lib.rs b/crates/udp/src/lib.rs index 97fc16a..09145e0 100644 --- a/crates/udp/src/lib.rs +++ b/crates/udp/src/lib.rs @@ -136,26 +136,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let state = state.clone(); let config = config.clone(); - #[cfg(feature = "prometheus")] - if config.statistics.run_prometheus_endpoint { - use metrics_exporter_prometheus::PrometheusBuilder; - use metrics_util::MetricKindMask; - - PrometheusBuilder::new() - .idle_timeout( - MetricKindMask::ALL, - Some(Duration::from_secs(config.statistics.interval * 2)), - ) - .with_http_listener(config.statistics.prometheus_endpoint_address) - .install() - .with_context(|| { - format!( - "Install prometheus endpoint on {}", - config.statistics.prometheus_endpoint_address - ) - })?; - } - let handle = Builder::new() .name("statistics".into()) .spawn(move || { @@ -174,6 +154,90 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { join_handles.push((WorkerType::Statistics, handle)); } + #[cfg(feature = "prometheus")] + if config.statistics.active() && config.statistics.run_prometheus_endpoint { + let config = config.clone(); + + let handle = Builder::new() + .name("prometheus".into()) + .spawn(move || { + #[cfg(feature = "cpu-pinning")] + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + config.swarm_workers, + WorkerIndex::Util, + ); + + use metrics_exporter_prometheus::PrometheusBuilder; + use metrics_util::MetricKindMask; + + let rt = ::tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .context("build prometheus tokio runtime")?; + + rt.block_on(async { + let (recorder, exporter) = PrometheusBuilder::new() + .idle_timeout( + MetricKindMask::ALL, + Some(Duration::from_secs(config.statistics.interval * 2)), + ) + .with_http_listener(config.statistics.prometheus_endpoint_address) + .build() + .context("build prometheus recorder and exporter")?; + + ::tokio::spawn(async move { + let mut interval = ::tokio::time::interval(Duration::from_secs(5)); + + loop { + interval.tick().await; + + // Periodically render metrics to make sure + // idles are cleaned up + recorder.handle().render(); + } + }); + + exporter.await.context("run prometheus exporter") + }) + }) + .with_context(|| "spawn prometheus exporter worker")?; + + join_handles.push((WorkerType::Prometheus, handle)); + } + + // Spawn signal handler thread + { + let config = config.clone(); + + let handle: JoinHandle> = Builder::new() + .name("signals".into()) + .spawn(move || { + #[cfg(feature = "cpu-pinning")] + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + config.swarm_workers, + WorkerIndex::Util, + ); + + for signal in &mut signals { + match signal { + SIGUSR1 => { + let _ = update_access_list(&config.access_list, &state.access_list); + } + _ => unreachable!(), + } + } + + Ok(()) + }) + .context("spawn signal worker")?; + + join_handles.push((WorkerType::Signals, handle)); + } + #[cfg(feature = "cpu-pinning")] pin_current_if_configured_to( &config.cpu_pinning, @@ -182,32 +246,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::Util, ); - let handle: JoinHandle> = Builder::new() - .name("signals".into()) - .spawn(move || { - #[cfg(feature = "cpu-pinning")] - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - config.swarm_workers, - WorkerIndex::Util, - ); - - for signal in &mut signals { - match signal { - SIGUSR1 => { - let _ = update_access_list(&config.access_list, &state.access_list); - } - _ => unreachable!(), - } - } - - Ok(()) - }) - .context("spawn signal worker")?; - - join_handles.push((WorkerType::Signals, handle)); - loop { for (i, (_, handle)) in join_handles.iter().enumerate() { if handle.is_finished() { @@ -236,6 +274,7 @@ enum WorkerType { Socket(usize), Statistics, Signals, + #[cfg(feature = "prometheus")] Prometheus, } @@ -246,6 +285,7 @@ impl Display for WorkerType { Self::Socket(index) => f.write_fmt(format_args!("Socket worker {}", index)), Self::Statistics => f.write_str("Statistics worker"), Self::Signals => f.write_str("Signals worker"), + #[cfg(feature = "prometheus")] Self::Prometheus => f.write_str("Prometheus worker"), } } From ae75d0cbe47e85714c377affb06a0ace70645a64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 29 Jan 2024 22:12:08 +0100 Subject: [PATCH 3/7] udp socket worker: fix warning --- crates/udp/src/workers/socket/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/udp/src/workers/socket/mod.rs b/crates/udp/src/workers/socket/mod.rs index 857ef33..f8597cc 100644 --- a/crates/udp/src/workers/socket/mod.rs +++ b/crates/udp/src/workers/socket/mod.rs @@ -66,7 +66,7 @@ pub fn run_socket_worker( } } - return self::mio::SocketWorker::run( + self::mio::SocketWorker::run( shared_state, config, validator, @@ -74,7 +74,7 @@ pub fn run_socket_worker( request_sender, response_receiver, priv_dropper, - ); + ) } fn create_socket( From 1967d8aa3e8d08499d3dbefe9b085c12555cdb4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 29 Jan 2024 22:17:30 +0100 Subject: [PATCH 4/7] udp: return errors from threads instead of panicking in some cases --- crates/udp/src/workers/socket/mio.rs | 17 +++++++---------- crates/udp/src/workers/statistics/mod.rs | 9 +++------ 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/crates/udp/src/workers/socket/mio.rs b/crates/udp/src/workers/socket/mio.rs index b24ab94..05f90fa 100644 --- a/crates/udp/src/workers/socket/mio.rs +++ b/crates/udp/src/workers/socket/mio.rs @@ -2,6 +2,7 @@ use std::io::{Cursor, ErrorKind}; use std::sync::atomic::Ordering; use std::time::{Duration, Instant}; +use anyhow::Context; use aquatic_common::access_list::AccessListCache; use aquatic_common::ServerStartInstant; use mio::net::UdpSocket; @@ -58,8 +59,7 @@ impl SocketWorker { response_receiver: ConnectedResponseReceiver, priv_dropper: PrivilegeDropper, ) -> anyhow::Result<()> { - let socket = - UdpSocket::from_std(create_socket(&config, priv_dropper).expect("create socket")); + let socket = UdpSocket::from_std(create_socket(&config, priv_dropper)?); let access_list_cache = create_access_list_cache(&shared_state.access_list); let opt_resend_buffer = (config.network.resend_buffer_max_len > 0).then_some(Vec::new()); @@ -79,18 +79,16 @@ impl SocketWorker { pending_requests: Default::default(), }; - worker.run_inner(); - - Ok(()) + worker.run_inner() } - pub fn run_inner(&mut self) { + pub fn run_inner(&mut self) -> anyhow::Result<()> { let mut events = Events::with_capacity(1); - let mut poll = Poll::new().expect("create poll"); + let mut poll = Poll::new().context("create poll")?; poll.registry() .register(&mut self.socket, Token(0), Interest::READABLE) - .expect("register poll"); + .context("register poll")?; let poll_timeout = Duration::from_millis(self.config.network.poll_timeout_ms); @@ -108,8 +106,7 @@ impl SocketWorker { loop { match self.polling_mode { PollMode::Regular => { - poll.poll(&mut events, Some(poll_timeout)) - .expect("failed polling"); + poll.poll(&mut events, Some(poll_timeout)).context("poll")?; for event in events.iter() { if event.is_readable() { diff --git a/crates/udp/src/workers/statistics/mod.rs b/crates/udp/src/workers/statistics/mod.rs index c4f98b4..7600b33 100644 --- a/crates/udp/src/workers/statistics/mod.rs +++ b/crates/udp/src/workers/statistics/mod.rs @@ -60,13 +60,10 @@ pub fn run_statistics_worker( let opt_tt = if config.statistics.write_html_to_file { let mut tt = TinyTemplate::new(); - if let Err(err) = tt.add_template(TEMPLATE_KEY, TEMPLATE_CONTENTS) { - ::log::error!("Couldn't parse statistics html template: {:#}", err); + tt.add_template(TEMPLATE_KEY, TEMPLATE_CONTENTS) + .context("parse statistics html template")?; - None - } else { - Some(tt) - } + Some(tt) } else { None }; From 9d8aca8f86589b62c822968f766300a235f29a9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 29 Jan 2024 22:19:20 +0100 Subject: [PATCH 5/7] Update TODO --- TODO.md | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/TODO.md b/TODO.md index 72cd3ac..829af77 100644 --- a/TODO.md +++ b/TODO.md @@ -2,7 +2,7 @@ ## High priority -* general +* http and ws * add task to generate prometheus exports on regular interval to clean up data. this is important if peer_clients is activated @@ -12,7 +12,9 @@ In that case, a shorter duration (e.g., 30 seconds) would be a good idea. * general - * panic sentinel not working? at least seemingly not in http? + * Replace panic sentinel with checking threads like in udp implementation. + It seems to be broken + ## Medium priority @@ -25,14 +27,6 @@ * toml v0.7 * syn v2.0 -* quit whole program if any thread panics - * But it would be nice not to panic in workers, but to return errors instead. - Once JoinHandle::is_finished is available in stable Rust (#90470), an - option would be to - * Save JoinHandles - * When preparing to quit because of PanicSentinel sending SIGTERM, loop - through them, extract error and log it - * Run cargo-deny in CI * aquatic_ws From 216bb9308889996553a69a9fa29afdf9f11f043b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 29 Jan 2024 22:27:03 +0100 Subject: [PATCH 6/7] udp: improve WorkerType Display implementation --- crates/udp/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/udp/src/lib.rs b/crates/udp/src/lib.rs index 09145e0..d26e8bb 100644 --- a/crates/udp/src/lib.rs +++ b/crates/udp/src/lib.rs @@ -281,8 +281,8 @@ enum WorkerType { impl Display for WorkerType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::Swarm(index) => f.write_fmt(format_args!("Swarm worker {}", index)), - Self::Socket(index) => f.write_fmt(format_args!("Socket worker {}", index)), + Self::Swarm(index) => f.write_fmt(format_args!("Swarm worker {}", index + 1)), + Self::Socket(index) => f.write_fmt(format_args!("Socket worker {}", index + 1)), Self::Statistics => f.write_str("Statistics worker"), Self::Signals => f.write_str("Signals worker"), #[cfg(feature = "prometheus")] From 5c2cd9a71937f414c5841d379c5a8fdafc8106af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 29 Jan 2024 22:29:28 +0100 Subject: [PATCH 7/7] Update CHANGELOG --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index aa9ff32..00908f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,10 @@ * Speed up parsing and serialization of requests and responses by using [zerocopy](https://crates.io/crates/zerocopy) +#### Fixed + +* Quit whole application if any worker thread quits + ### aquatic_http #### Added