diff --git a/Cargo.lock b/Cargo.lock index 59407d7..3faac6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -165,12 +165,16 @@ dependencies = [ "indexmap 2.1.0", "libc", "log", + "metrics", + "metrics-exporter-prometheus", + "metrics-util", "privdrop", "rand", "rustls", "rustls-pemfile", "serde", "simple_logger", + "tokio", "toml 0.5.11", ] @@ -309,8 +313,6 @@ dependencies = [ "libc", "log", "metrics", - "metrics-exporter-prometheus", - "metrics-util", "mimalloc", "mio", "num-format", @@ -324,7 +326,6 @@ dependencies = [ "tempfile", "time", "tinytemplate", - "tokio", ] [[package]] diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 2dafac8..b4c624a 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -14,6 +14,7 @@ name = "aquatic_common" [features] rustls = ["dep:rustls", "rustls-pemfile"] +prometheus = ["dep:metrics", "dep:metrics-util", "dep:metrics-exporter-prometheus", "dep:tokio"] [dependencies] aquatic_toml_config.workspace = true @@ -34,8 +35,16 @@ serde = { version = "1", features = ["derive"] } simple_logger = { version = "4", features = ["stderr"] } toml = "0.5" -# Optional -glommio = { version = "0.8", optional = true } -hwloc = { version = "0.5", optional = true } +# rustls feature rustls = { version = "0.22", optional = true } rustls-pemfile = { version = "2", optional = true } + +# prometheus feature +metrics = { version = "0.22", optional = true } +metrics-util = { version = "0.16", optional = true } +metrics-exporter-prometheus = { version = "0.13", optional = true, default-features = false, features = ["http-listener"] } +tokio = { version = "1", optional = true, features = ["rt", "net", "time"] } + +# other optional +glommio = { version = "0.8", optional = true } +hwloc = { version = "0.5", optional = true } \ No newline at end of file diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 6ef4240..28bd7ee 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -138,3 +138,63 @@ impl CanonicalSocketAddr { self.0.is_ipv4() } } + +#[cfg(feature = "prometheus")] +pub fn spawn_prometheus_endpoint( + addr: SocketAddr, + timeout: Option<::std::time::Duration>, +) -> anyhow::Result<::std::thread::JoinHandle>> { + use std::thread::Builder; + use std::time::Duration; + + use anyhow::Context; + + let handle = Builder::new() + .name("prometheus".into()) + .spawn(move || { + #[cfg(feature = "cpu-pinning")] + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + config.swarm_workers, + WorkerIndex::Util, + ); + + use metrics_exporter_prometheus::PrometheusBuilder; + use metrics_util::MetricKindMask; + + let rt = ::tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .context("build prometheus tokio runtime")?; + + rt.block_on(async { + let (recorder, exporter) = PrometheusBuilder::new() + .idle_timeout(MetricKindMask::ALL, timeout) + .with_http_listener(addr) + .build() + .context("build prometheus recorder and exporter")?; + + let recorder_handle = recorder.handle(); + + ::metrics::set_global_recorder(recorder).context("set global metrics recorder")?; + + ::tokio::spawn(async move { + let mut interval = ::tokio::time::interval(Duration::from_secs(5)); + + loop { + interval.tick().await; + + // Periodically render metrics to make sure + // idles are cleaned up + recorder_handle.render(); + } + }); + + exporter.await.context("run prometheus exporter") + }) + }) + .context("spawn prometheus endpoint")?; + + Ok(handle) +} diff --git a/crates/udp/Cargo.toml b/crates/udp/Cargo.toml index 3828132..ffc602c 100644 --- a/crates/udp/Cargo.toml +++ b/crates/udp/Cargo.toml @@ -20,7 +20,7 @@ name = "aquatic_udp" [features] default = ["prometheus"] # Export prometheus metrics -prometheus = ["metrics", "metrics-util", "metrics-exporter-prometheus", "tokio"] +prometheus = ["metrics", "aquatic_common/prometheus"] # Experimental io_uring support (Linux 6.0 or later required) io-uring = ["dep:io-uring"] # Experimental CPU pinning support @@ -58,9 +58,6 @@ tinytemplate = "1" # prometheus feature metrics = { version = "0.22", optional = true } -metrics-util = { version = "0.16", optional = true } -metrics-exporter-prometheus = { version = "0.13", optional = true, default-features = false, features = ["http-listener"] } -tokio = { version = "1", optional = true, features = ["rt", "net", "time"] } # io-uring feature io-uring = { version = "0.6", optional = true } diff --git a/crates/udp/src/lib.rs b/crates/udp/src/lib.rs index 639e2a9..5e4c043 100644 --- a/crates/udp/src/lib.rs +++ b/crates/udp/src/lib.rs @@ -162,58 +162,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { #[cfg(feature = "prometheus")] if config.statistics.active() && config.statistics.run_prometheus_endpoint { - let config = config.clone(); - - let handle = Builder::new() - .name("prometheus".into()) - .spawn(move || { - #[cfg(feature = "cpu-pinning")] - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - config.swarm_workers, - WorkerIndex::Util, - ); - - use metrics_exporter_prometheus::PrometheusBuilder; - use metrics_util::MetricKindMask; - - let rt = ::tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .context("build prometheus tokio runtime")?; - - rt.block_on(async { - let (recorder, exporter) = PrometheusBuilder::new() - .idle_timeout( - MetricKindMask::ALL, - Some(Duration::from_secs(config.statistics.interval * 2)), - ) - .with_http_listener(config.statistics.prometheus_endpoint_address) - .build() - .context("build prometheus recorder and exporter")?; - - let recorder_handle = recorder.handle(); - - ::metrics::set_global_recorder(recorder) - .context("set global metrics recorder")?; - - ::tokio::spawn(async move { - let mut interval = ::tokio::time::interval(Duration::from_secs(5)); - - loop { - interval.tick().await; - - // Periodically render metrics to make sure - // idles are cleaned up - recorder_handle.render(); - } - }); - - exporter.await.context("run prometheus exporter") - }) - }) - .with_context(|| "spawn prometheus exporter worker")?; + let handle = aquatic_common::spawn_prometheus_endpoint( + config.statistics.prometheus_endpoint_address, + Some(Duration::from_secs( + config.cleaning.torrent_cleaning_interval * 2, + )), + )?; join_handles.push((WorkerType::Prometheus, handle)); }