diff --git a/Cargo.lock b/Cargo.lock
index 0dcb145..6764f9f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -231,6 +231,8 @@ dependencies = [
"hex",
"libc",
"log",
+ "metrics",
+ "metrics-exporter-prometheus",
"mimalloc",
"mio",
"num-format",
diff --git a/README.md b/README.md
index e99def3..abdcf8f 100644
--- a/README.md
+++ b/README.md
@@ -68,12 +68,13 @@ Make adjustments to the files. You will likely want to adjust `address`
(listening address) under the `network` section.
Note that both `aquatic_http` and `aquatic_ws` require configuring certificate
-and private key files to run over TLS (which is optional for `aquatic_ws`).
+and private key files to run over TLS. `aquatic_http` __only__ runs over TLS.
More details are available in the respective configuration files.
#### Workers
-To increase performance, number of worker threads can be increased. Recommended proportions based on number of available CPU cores:
+To increase performance, number of worker threads can be increased.
+Recommended proportions based on number of physical CPU cores:
@@ -125,17 +126,18 @@ in emitting of an info-level log message.
#### Prometheus
-`aquatic_http` and `aquatic_ws` support exporting [Prometheus](https://prometheus.io/) metrics.
+Exporting [Prometheus](https://prometheus.io/) metrics is supported. Activate
+the endpoint in the configuration file:
-Pass the `prometheus` feature when building:
+##### aquatic_udp
-```sh
-. ./scripts/env-native-cpu-without-avx-512
-cargo build --release -p aquatic_ws --features "prometheus"
-cargo build --release -p aquatic_http --features "prometheus"
+```toml
+[statistics]
+run_prometheus_endpoint = true
+prometheus_endpoint_address = "0.0.0.0:9000"
```
-Then activate the prometheus endpoint in the configuration file:
+##### aquatic_http and aquatic_ws
```toml
[metrics]
diff --git a/TODO.md b/TODO.md
index cdebd4d..e86bada 100644
--- a/TODO.md
+++ b/TODO.md
@@ -2,7 +2,6 @@
## High priority
-* udp prometheus metrics
* ws: wait for crates release of glommio with membarrier fix (PR #558)
* Release new version
* More non-CI integration tests?
diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml
index de3640c..89be7df 100644
--- a/aquatic_http/Cargo.toml
+++ b/aquatic_http/Cargo.toml
@@ -17,6 +17,7 @@ name = "aquatic_http"
name = "aquatic_http"
[features]
+default = ["prometheus"]
prometheus = ["metrics", "metrics-exporter-prometheus"]
metrics = ["dep:metrics"]
diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml
index c561bd9..505595a 100644
--- a/aquatic_udp/Cargo.toml
+++ b/aquatic_udp/Cargo.toml
@@ -17,7 +17,9 @@ name = "aquatic_udp"
name = "aquatic_udp"
[features]
+default = ["prometheus"]
cpu-pinning = ["aquatic_common/hwloc"]
+prometheus = ["metrics", "metrics-exporter-prometheus"]
[dependencies]
aquatic_common.workspace = true
@@ -35,6 +37,8 @@ hdrhistogram = "7"
hex = "0.4"
libc = "0.2"
log = "0.4"
+metrics = { version = "0.20", optional = true }
+metrics-exporter-prometheus = { version = "0.11", optional = true, default-features = false, features = ["http-listener"] }
mimalloc = { version = "0.1", default-features = false }
mio = { version = "0.8", features = ["net", "os-poll"] }
num-format = "0.4"
diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs
index f932395..cf310d9 100644
--- a/aquatic_udp/src/config.rs
+++ b/aquatic_udp/src/config.rs
@@ -1,6 +1,7 @@
use std::{net::SocketAddr, path::PathBuf};
use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig};
+use cfg_if::cfg_if;
use serde::Deserialize;
use aquatic_common::cli::LogLevel;
@@ -153,11 +154,26 @@ pub struct StatisticsConfig {
pub write_html_to_file: bool,
/// Path to save HTML file to
pub html_file_path: PathBuf,
+ /// Run a prometheus endpoint
+ #[cfg(feature = "prometheus")]
+ pub run_prometheus_endpoint: bool,
+ /// Address to run prometheus endpoint on
+ #[cfg(feature = "prometheus")]
+ pub prometheus_endpoint_address: SocketAddr,
}
impl StatisticsConfig {
- pub fn active(&self) -> bool {
- (self.interval != 0) & (self.print_to_stdout | self.write_html_to_file)
+ cfg_if! {
+ if #[cfg(feature = "prometheus")] {
+ pub fn active(&self) -> bool {
+ (self.interval != 0) &
+ (self.print_to_stdout | self.write_html_to_file | self.run_prometheus_endpoint)
+ }
+ } else {
+ pub fn active(&self) -> bool {
+ (self.interval != 0) & (self.print_to_stdout | self.write_html_to_file)
+ }
+ }
}
}
@@ -169,6 +185,10 @@ impl Default for StatisticsConfig {
print_to_stdout: false,
write_html_to_file: false,
html_file_path: "tmp/statistics.html".into(),
+ #[cfg(feature = "prometheus")]
+ run_prometheus_endpoint: false,
+ #[cfg(feature = "prometheus")]
+ prometheus_endpoint_address: SocketAddr::from(([0, 0, 0, 0], 9000)),
}
}
}
diff --git a/aquatic_udp/src/lib.rs b/aquatic_udp/src/lib.rs
index a6b22f8..10b65ef 100644
--- a/aquatic_udp/src/lib.rs
+++ b/aquatic_udp/src/lib.rs
@@ -141,6 +141,21 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let state = state.clone();
let config = config.clone();
+ #[cfg(feature = "prometheus")]
+ if config.statistics.run_prometheus_endpoint {
+ use metrics_exporter_prometheus::PrometheusBuilder;
+
+ PrometheusBuilder::new()
+ .with_http_listener(config.statistics.prometheus_endpoint_address)
+ .install()
+ .with_context(|| {
+ format!(
+ "Install prometheus endpoint on {}",
+ config.statistics.prometheus_endpoint_address
+ )
+ })?;
+ }
+
Builder::new()
.name("statistics".into())
.spawn(move || {
diff --git a/aquatic_udp/src/workers/statistics/collector.rs b/aquatic_udp/src/workers/statistics/collector.rs
index a741ebc..3e3ed97 100644
--- a/aquatic_udp/src/workers/statistics/collector.rs
+++ b/aquatic_udp/src/workers/statistics/collector.rs
@@ -14,15 +14,19 @@ pub struct StatisticsCollector {
last_update: Instant,
pending_histograms: Vec>,
last_complete_histogram: PeerHistogramStatistics,
+ #[cfg(feature = "prometheus")]
+ ip_version: String,
}
impl StatisticsCollector {
- pub fn new(shared: Arc) -> Self {
+ pub fn new(shared: Arc, #[cfg(feature = "prometheus")] ip_version: String) -> Self {
Self {
shared,
last_update: Instant::now(),
pending_histograms: Vec::new(),
last_complete_histogram: Default::default(),
+ #[cfg(feature = "prometheus")]
+ ip_version,
}
}
@@ -35,16 +39,31 @@ impl StatisticsCollector {
}
}
- pub fn collect_from_shared(&mut self) -> CollectedStatistics {
+ pub fn collect_from_shared(
+ &mut self,
+ #[cfg(feature = "prometheus")] config: &Config,
+ ) -> CollectedStatistics {
let requests_received = Self::fetch_and_reset(&self.shared.requests_received);
let responses_sent_connect = Self::fetch_and_reset(&self.shared.responses_sent_connect);
let responses_sent_announce = Self::fetch_and_reset(&self.shared.responses_sent_announce);
let responses_sent_scrape = Self::fetch_and_reset(&self.shared.responses_sent_scrape);
let responses_sent_error = Self::fetch_and_reset(&self.shared.responses_sent_error);
+
let bytes_received = Self::fetch_and_reset(&self.shared.bytes_received);
let bytes_sent = Self::fetch_and_reset(&self.shared.bytes_sent);
- let num_torrents = Self::sum_atomic_usizes(&self.shared.torrents);
- let num_peers = Self::sum_atomic_usizes(&self.shared.peers);
+
+ let num_torrents_by_worker: Vec = self
+ .shared
+ .torrents
+ .iter()
+ .map(|n| n.load(Ordering::Relaxed))
+ .collect();
+ let num_peers_by_worker: Vec = self
+ .shared
+ .peers
+ .iter()
+ .map(|n| n.load(Ordering::Relaxed))
+ .collect();
let elapsed = {
let now = Instant::now();
@@ -56,13 +75,81 @@ impl StatisticsCollector {
elapsed
};
- let requests_per_second = requests_received / elapsed;
- let responses_per_second_connect = responses_sent_connect / elapsed;
- let responses_per_second_announce = responses_sent_announce / elapsed;
- let responses_per_second_scrape = responses_sent_scrape / elapsed;
- let responses_per_second_error = responses_sent_error / elapsed;
- let bytes_received_per_second = bytes_received / elapsed;
- let bytes_sent_per_second = bytes_sent / elapsed;
+ #[cfg(feature = "prometheus")]
+ if config.statistics.run_prometheus_endpoint {
+ ::metrics::counter!(
+ "aquatic_requests_total",
+ requests_received.try_into().unwrap(),
+ "ip_version" => self.ip_version.clone(),
+ );
+ ::metrics::counter!(
+ "aquatic_responses_total",
+ responses_sent_connect.try_into().unwrap(),
+ "type" => "connect",
+ "ip_version" => self.ip_version.clone(),
+ );
+ ::metrics::counter!(
+ "aquatic_responses_total",
+ responses_sent_announce.try_into().unwrap(),
+ "type" => "announce",
+ "ip_version" => self.ip_version.clone(),
+ );
+ ::metrics::counter!(
+ "aquatic_responses_total",
+ responses_sent_scrape.try_into().unwrap(),
+ "type" => "scrape",
+ "ip_version" => self.ip_version.clone(),
+ );
+ ::metrics::counter!(
+ "aquatic_responses_total",
+ responses_sent_error.try_into().unwrap(),
+ "type" => "error",
+ "ip_version" => self.ip_version.clone(),
+ );
+ ::metrics::counter!(
+ "aquatic_rx_bytes",
+ bytes_received.try_into().unwrap(),
+ "ip_version" => self.ip_version.clone(),
+ );
+ ::metrics::counter!(
+ "aquatic_tx_bytes",
+ bytes_sent.try_into().unwrap(),
+ "ip_version" => self.ip_version.clone(),
+ );
+
+ for (worker_index, n) in num_torrents_by_worker.iter().copied().enumerate() {
+ ::metrics::gauge!(
+ "aquatic_torrents",
+ n as f64,
+ "ip_version" => self.ip_version.clone(),
+ "worker_index" => worker_index.to_string(),
+ );
+ }
+ for (worker_index, n) in num_peers_by_worker.iter().copied().enumerate() {
+ ::metrics::gauge!(
+ "aquatic_peers",
+ n as f64,
+ "ip_version" => self.ip_version.clone(),
+ "worker_index" => worker_index.to_string(),
+ );
+ }
+
+ if config.statistics.extended {
+ self.last_complete_histogram
+ .update_metrics(self.ip_version.clone());
+ }
+ }
+
+ let num_peers: usize = num_peers_by_worker.into_iter().sum();
+ let num_torrents: usize = num_torrents_by_worker.into_iter().sum();
+
+ let requests_per_second = requests_received as f64 / elapsed;
+ let responses_per_second_connect = responses_sent_connect as f64 / elapsed;
+ let responses_per_second_announce = responses_sent_announce as f64 / elapsed;
+ let responses_per_second_scrape = responses_sent_scrape as f64 / elapsed;
+ let responses_per_second_error = responses_sent_error as f64 / elapsed;
+ let bytes_received_per_second = bytes_received as f64 / elapsed;
+ let bytes_sent_per_second = bytes_sent as f64 / elapsed;
let responses_per_second_total = responses_per_second_connect
+ responses_per_second_announce
@@ -89,12 +176,8 @@ impl StatisticsCollector {
}
}
- fn sum_atomic_usizes(values: &[AtomicUsize]) -> usize {
- values.iter().map(|n| n.load(Ordering::Relaxed)).sum()
- }
-
- fn fetch_and_reset(atomic: &AtomicUsize) -> f64 {
- atomic.fetch_and(0, Ordering::Relaxed) as f64
+ fn fetch_and_reset(atomic: &AtomicUsize) -> usize {
+ atomic.fetch_and(0, Ordering::Relaxed)
}
}
@@ -115,7 +198,7 @@ pub struct CollectedStatistics {
#[derive(Clone, Debug, Serialize, Default)]
pub struct PeerHistogramStatistics {
- pub p0: u64,
+ pub min: u64,
pub p10: u64,
pub p20: u64,
pub p30: u64,
@@ -127,13 +210,14 @@ pub struct PeerHistogramStatistics {
pub p90: u64,
pub p95: u64,
pub p99: u64,
- pub p100: u64,
+ pub p999: u64,
+ pub max: u64,
}
impl PeerHistogramStatistics {
fn new(h: Histogram) -> Self {
Self {
- p0: h.value_at_percentile(0.0),
+ min: h.min(),
p10: h.value_at_percentile(10.0),
p20: h.value_at_percentile(20.0),
p30: h.value_at_percentile(30.0),
@@ -145,7 +229,90 @@ impl PeerHistogramStatistics {
p90: h.value_at_percentile(90.0),
p95: h.value_at_percentile(95.0),
p99: h.value_at_percentile(99.0),
- p100: h.value_at_percentile(100.0),
+ p999: h.value_at_percentile(99.9),
+ max: h.max(),
}
}
+
+ #[cfg(feature = "prometheus")]
+ fn update_metrics(&self, ip_version: String) {
+ ::metrics::gauge!(
+ "aquatic_peers_per_torrent",
+ self.min as f64,
+ "type" => "max",
+ "ip_version" => ip_version.clone(),
+ );
+ ::metrics::gauge!(
+ "aquatic_peers_per_torrent",
+ self.p10 as f64,
+ "type" => "p10",
+ "ip_version" => ip_version.clone(),
+ );
+ ::metrics::gauge!(
+ "aquatic_peers_per_torrent",
+ self.p20 as f64,
+ "type" => "p20",
+ "ip_version" => ip_version.clone(),
+ );
+ ::metrics::gauge!(
+ "aquatic_peers_per_torrent",
+ self.p30 as f64,
+ "type" => "p30",
+ "ip_version" => ip_version.clone(),
+ );
+ ::metrics::gauge!(
+ "aquatic_peers_per_torrent",
+ self.p40 as f64,
+ "type" => "p40",
+ "ip_version" => ip_version.clone(),
+ );
+ ::metrics::gauge!(
+ "aquatic_peers_per_torrent",
+ self.p50 as f64,
+ "type" => "p50",
+ "ip_version" => ip_version.clone(),
+ );
+ ::metrics::gauge!(
+ "aquatic_peers_per_torrent",
+ self.p60 as f64,
+ "type" => "p60",
+ "ip_version" => ip_version.clone(),
+ );
+ ::metrics::gauge!(
+ "aquatic_peers_per_torrent",
+ self.p70 as f64,
+ "type" => "p70",
+ "ip_version" => ip_version.clone(),
+ );
+ ::metrics::gauge!(
+ "aquatic_peers_per_torrent",
+ self.p80 as f64,
+ "type" => "p80",
+ "ip_version" => ip_version.clone(),
+ );
+ ::metrics::gauge!(
+ "aquatic_peers_per_torrent",
+ self.p90 as f64,
+ "type" => "p90",
+ "ip_version" => ip_version.clone(),
+ );
+ ::metrics::gauge!(
+ "aquatic_peers_per_torrent",
+ self.p99 as f64,
+ "type" => "p99",
+ "ip_version" => ip_version.clone(),
+ );
+ ::metrics::gauge!(
+ "aquatic_peers_per_torrent",
+ self.p999 as f64,
+ "type" => "p99.9",
+ "ip_version" => ip_version.clone(),
+ );
+ ::metrics::gauge!(
+ "aquatic_peers_per_torrent",
+ self.max as f64,
+ "type" => "max",
+ "ip_version" => ip_version.clone(),
+ );
+ }
}
diff --git a/aquatic_udp/src/workers/statistics/mod.rs b/aquatic_udp/src/workers/statistics/mod.rs
index e7b24a6..f54f08b 100644
--- a/aquatic_udp/src/workers/statistics/mod.rs
+++ b/aquatic_udp/src/workers/statistics/mod.rs
@@ -57,8 +57,16 @@ pub fn run_statistics_worker(
None
};
- let mut ipv4_collector = StatisticsCollector::new(shared_state.statistics_ipv4);
- let mut ipv6_collector = StatisticsCollector::new(shared_state.statistics_ipv6);
+ let mut ipv4_collector = StatisticsCollector::new(
+ shared_state.statistics_ipv4,
+ #[cfg(feature = "prometheus")]
+ "4".into(),
+ );
+ let mut ipv6_collector = StatisticsCollector::new(
+ shared_state.statistics_ipv6,
+ #[cfg(feature = "prometheus")]
+ "6".into(),
+ );
loop {
::std::thread::sleep(Duration::from_secs(config.statistics.interval));
@@ -70,8 +78,14 @@ pub fn run_statistics_worker(
}
}
- let statistics_ipv4 = ipv4_collector.collect_from_shared();
- let statistics_ipv6 = ipv6_collector.collect_from_shared();
+ let statistics_ipv4 = ipv4_collector.collect_from_shared(
+ #[cfg(feature = "prometheus")]
+ &config,
+ );
+ let statistics_ipv6 = ipv6_collector.collect_from_shared(
+ #[cfg(feature = "prometheus")]
+ &config,
+ );
if config.statistics.print_to_stdout {
println!("General:");
@@ -151,7 +165,7 @@ fn print_to_stdout(config: &Config, statistics: &CollectedStatistics) {
" peers per torrent (updated every {}s)",
config.cleaning.torrent_cleaning_interval
);
- println!(" min {:>10}", statistics.peer_histogram.p0);
+ println!(" min {:>10}", statistics.peer_histogram.min);
println!(" p10 {:>10}", statistics.peer_histogram.p10);
println!(" p20 {:>10}", statistics.peer_histogram.p20);
println!(" p30 {:>10}", statistics.peer_histogram.p30);
@@ -163,7 +177,8 @@ fn print_to_stdout(config: &Config, statistics: &CollectedStatistics) {
println!(" p90 {:>10}", statistics.peer_histogram.p90);
println!(" p95 {:>10}", statistics.peer_histogram.p95);
println!(" p99 {:>10}", statistics.peer_histogram.p99);
- println!(" max {:>10}", statistics.peer_histogram.p100);
+ println!(" p99.9 {:>10}", statistics.peer_histogram.p999);
+ println!(" max {:>10}", statistics.peer_histogram.max);
}
}
diff --git a/aquatic_udp/templates/statistics.html b/aquatic_udp/templates/statistics.html
index a62f42d..2007d01 100644
--- a/aquatic_udp/templates/statistics.html
+++ b/aquatic_udp/templates/statistics.html
@@ -76,7 +76,7 @@
Updated every { peer_update_interval } seconds
| Minimum |
- { ipv4.peer_histogram.p0 } |
+ { ipv4.peer_histogram.min } |
| 10th percentile |
@@ -122,9 +122,13 @@
99th percentile |
{ ipv4.peer_histogram.p99 } |
+
+ | 99.9th percentile |
+ { ipv4.peer_histogram.p999 } |
+
| Maximum |
- { ipv4.peer_histogram.p100 } |
+ { ipv4.peer_histogram.max } |
@@ -188,7 +192,7 @@
Updated every { peer_update_interval } seconds
| Minimum |
- { ipv6.peer_histogram.p0 } |
+ { ipv6.peer_histogram.min } |
| 10th percentile |
@@ -234,9 +238,13 @@
99th percentile |
{ ipv6.peer_histogram.p99 } |
+
+ | 99.9th percentile |
+ { ipv6.peer_histogram.p999 } |
+
| Maximum |
- { ipv6.peer_histogram.p100 } |
+ { ipv6.peer_histogram.max } |
diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml
index a3da870..6e35944 100644
--- a/aquatic_ws/Cargo.toml
+++ b/aquatic_ws/Cargo.toml
@@ -17,6 +17,7 @@ name = "aquatic_ws"
name = "aquatic_ws"
[features]
+default = ["prometheus"]
prometheus = ["metrics", "metrics-exporter-prometheus"]
metrics = ["dep:metrics"]