From 6dec985d45e20b0f72960056e69eba724f142035 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 29 Jan 2024 22:09:20 +0100 Subject: [PATCH] udp: store prometheus exporter thread handle, periodically render --- Cargo.lock | 1 + crates/udp/Cargo.toml | 3 +- crates/udp/src/lib.rs | 132 +++++++++++++++++++++++++++--------------- 3 files changed, 89 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b12caa0..7ac690c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -323,6 +323,7 @@ dependencies = [ "tempfile", "time", "tinytemplate", + "tokio", ] [[package]] diff --git a/crates/udp/Cargo.toml b/crates/udp/Cargo.toml index 430ab49..041a3cb 100644 --- a/crates/udp/Cargo.toml +++ b/crates/udp/Cargo.toml @@ -20,7 +20,7 @@ name = "aquatic_udp" [features] default = ["prometheus"] # Export prometheus metrics -prometheus = ["metrics", "metrics-util", "metrics-exporter-prometheus"] +prometheus = ["metrics", "metrics-util", "metrics-exporter-prometheus", "tokio"] # Experimental io_uring support (Linux 6.0 or later required) io-uring = ["dep:io-uring"] # Experimental CPU pinning support @@ -59,6 +59,7 @@ tinytemplate = "1" metrics = { version = "0.22", optional = true } metrics-util = { version = "0.16", optional = true } metrics-exporter-prometheus = { version = "0.13", optional = true, default-features = false, features = ["http-listener"] } +tokio = { version = "1", optional = true, features = ["rt", "net", "time"] } # io-uring feature io-uring = { version = "0.6", optional = true } diff --git a/crates/udp/src/lib.rs b/crates/udp/src/lib.rs index 97fc16a..09145e0 100644 --- a/crates/udp/src/lib.rs +++ b/crates/udp/src/lib.rs @@ -136,26 +136,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let state = state.clone(); let config = config.clone(); - #[cfg(feature = "prometheus")] - if config.statistics.run_prometheus_endpoint { - use metrics_exporter_prometheus::PrometheusBuilder; - use metrics_util::MetricKindMask; - - PrometheusBuilder::new() - .idle_timeout( - MetricKindMask::ALL, - Some(Duration::from_secs(config.statistics.interval * 2)), - ) - .with_http_listener(config.statistics.prometheus_endpoint_address) - .install() - .with_context(|| { - format!( - "Install prometheus endpoint on {}", - config.statistics.prometheus_endpoint_address - ) - })?; - } - let handle = Builder::new() .name("statistics".into()) .spawn(move || { @@ -174,6 +154,90 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { join_handles.push((WorkerType::Statistics, handle)); } + #[cfg(feature = "prometheus")] + if config.statistics.active() && config.statistics.run_prometheus_endpoint { + let config = config.clone(); + + let handle = Builder::new() + .name("prometheus".into()) + .spawn(move || { + #[cfg(feature = "cpu-pinning")] + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + config.swarm_workers, + WorkerIndex::Util, + ); + + use metrics_exporter_prometheus::PrometheusBuilder; + use metrics_util::MetricKindMask; + + let rt = ::tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .context("build prometheus tokio runtime")?; + + rt.block_on(async { + let (recorder, exporter) = PrometheusBuilder::new() + .idle_timeout( + MetricKindMask::ALL, + Some(Duration::from_secs(config.statistics.interval * 2)), + ) + .with_http_listener(config.statistics.prometheus_endpoint_address) + .build() + .context("build prometheus recorder and exporter")?; + + ::tokio::spawn(async move { + let mut interval = ::tokio::time::interval(Duration::from_secs(5)); + + loop { + interval.tick().await; + + // Periodically render metrics to make sure + // idles are cleaned up + recorder.handle().render(); + } + }); + + exporter.await.context("run prometheus exporter") + }) + }) + .with_context(|| "spawn prometheus exporter worker")?; + + join_handles.push((WorkerType::Prometheus, handle)); + } + + // Spawn signal handler thread + { + let config = config.clone(); + + let handle: JoinHandle> = Builder::new() + .name("signals".into()) + .spawn(move || { + #[cfg(feature = "cpu-pinning")] + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + config.swarm_workers, + WorkerIndex::Util, + ); + + for signal in &mut signals { + match signal { + SIGUSR1 => { + let _ = update_access_list(&config.access_list, &state.access_list); + } + _ => unreachable!(), + } + } + + Ok(()) + }) + .context("spawn signal worker")?; + + join_handles.push((WorkerType::Signals, handle)); + } + #[cfg(feature = "cpu-pinning")] pin_current_if_configured_to( &config.cpu_pinning, @@ -182,32 +246,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::Util, ); - let handle: JoinHandle> = Builder::new() - .name("signals".into()) - .spawn(move || { - #[cfg(feature = "cpu-pinning")] - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - config.swarm_workers, - WorkerIndex::Util, - ); - - for signal in &mut signals { - match signal { - SIGUSR1 => { - let _ = update_access_list(&config.access_list, &state.access_list); - } - _ => unreachable!(), - } - } - - Ok(()) - }) - .context("spawn signal worker")?; - - join_handles.push((WorkerType::Signals, handle)); - loop { for (i, (_, handle)) in join_handles.iter().enumerate() { if handle.is_finished() { @@ -236,6 +274,7 @@ enum WorkerType { Socket(usize), Statistics, Signals, + #[cfg(feature = "prometheus")] Prometheus, } @@ -246,6 +285,7 @@ impl Display for WorkerType { Self::Socket(index) => f.write_fmt(format_args!("Socket worker {}", index)), Self::Statistics => f.write_str("Statistics worker"), Self::Signals => f.write_str("Signals worker"), + #[cfg(feature = "prometheus")] Self::Prometheus => f.write_str("Prometheus worker"), } }