udp: store prometheus exporter thread handle, periodically render

This commit is contained in:
Joakim Frostegård 2024-01-29 22:09:20 +01:00
parent 8f838098aa
commit 6dec985d45
3 changed files with 89 additions and 47 deletions

View file

@ -20,7 +20,7 @@ name = "aquatic_udp"
[features]
default = ["prometheus"]
# Export prometheus metrics
prometheus = ["metrics", "metrics-util", "metrics-exporter-prometheus"]
prometheus = ["metrics", "metrics-util", "metrics-exporter-prometheus", "tokio"]
# Experimental io_uring support (Linux 6.0 or later required)
io-uring = ["dep:io-uring"]
# Experimental CPU pinning support
@ -59,6 +59,7 @@ tinytemplate = "1"
metrics = { version = "0.22", optional = true }
metrics-util = { version = "0.16", optional = true }
metrics-exporter-prometheus = { version = "0.13", optional = true, default-features = false, features = ["http-listener"] }
tokio = { version = "1", optional = true, features = ["rt", "net", "time"] }
# io-uring feature
io-uring = { version = "0.6", optional = true }

View file

@ -136,26 +136,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let state = state.clone();
let config = config.clone();
#[cfg(feature = "prometheus")]
if config.statistics.run_prometheus_endpoint {
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_util::MetricKindMask;
PrometheusBuilder::new()
.idle_timeout(
MetricKindMask::ALL,
Some(Duration::from_secs(config.statistics.interval * 2)),
)
.with_http_listener(config.statistics.prometheus_endpoint_address)
.install()
.with_context(|| {
format!(
"Install prometheus endpoint on {}",
config.statistics.prometheus_endpoint_address
)
})?;
}
let handle = Builder::new()
.name("statistics".into())
.spawn(move || {
@ -174,6 +154,90 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
join_handles.push((WorkerType::Statistics, handle));
}
#[cfg(feature = "prometheus")]
if config.statistics.active() && config.statistics.run_prometheus_endpoint {
let config = config.clone();
let handle = Builder::new()
.name("prometheus".into())
.spawn(move || {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
config.swarm_workers,
WorkerIndex::Util,
);
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_util::MetricKindMask;
let rt = ::tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.context("build prometheus tokio runtime")?;
rt.block_on(async {
let (recorder, exporter) = PrometheusBuilder::new()
.idle_timeout(
MetricKindMask::ALL,
Some(Duration::from_secs(config.statistics.interval * 2)),
)
.with_http_listener(config.statistics.prometheus_endpoint_address)
.build()
.context("build prometheus recorder and exporter")?;
::tokio::spawn(async move {
let mut interval = ::tokio::time::interval(Duration::from_secs(5));
loop {
interval.tick().await;
// Periodically render metrics to make sure
// idles are cleaned up
recorder.handle().render();
}
});
exporter.await.context("run prometheus exporter")
})
})
.with_context(|| "spawn prometheus exporter worker")?;
join_handles.push((WorkerType::Prometheus, handle));
}
// Spawn signal handler thread
{
let config = config.clone();
let handle: JoinHandle<anyhow::Result<()>> = Builder::new()
.name("signals".into())
.spawn(move || {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
config.swarm_workers,
WorkerIndex::Util,
);
for signal in &mut signals {
match signal {
SIGUSR1 => {
let _ = update_access_list(&config.access_list, &state.access_list);
}
_ => unreachable!(),
}
}
Ok(())
})
.context("spawn signal worker")?;
join_handles.push((WorkerType::Signals, handle));
}
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
@ -182,32 +246,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
WorkerIndex::Util,
);
let handle: JoinHandle<anyhow::Result<()>> = Builder::new()
.name("signals".into())
.spawn(move || {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
config.swarm_workers,
WorkerIndex::Util,
);
for signal in &mut signals {
match signal {
SIGUSR1 => {
let _ = update_access_list(&config.access_list, &state.access_list);
}
_ => unreachable!(),
}
}
Ok(())
})
.context("spawn signal worker")?;
join_handles.push((WorkerType::Signals, handle));
loop {
for (i, (_, handle)) in join_handles.iter().enumerate() {
if handle.is_finished() {
@ -236,6 +274,7 @@ enum WorkerType {
Socket(usize),
Statistics,
Signals,
#[cfg(feature = "prometheus")]
Prometheus,
}
@ -246,6 +285,7 @@ impl Display for WorkerType {
Self::Socket(index) => f.write_fmt(format_args!("Socket worker {}", index)),
Self::Statistics => f.write_str("Statistics worker"),
Self::Signals => f.write_str("Signals worker"),
#[cfg(feature = "prometheus")]
Self::Prometheus => f.write_str("Prometheus worker"),
}
}