From 6675126d08ddf3fe100a8f82898609602f3d457e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 14 Jun 2023 11:09:23 +0200 Subject: [PATCH] ws: add prometheus peer client metrics --- CHANGELOG.md | 6 +++ Cargo.lock | 2 + aquatic_ws/Cargo.toml | 4 +- aquatic_ws/src/config.rs | 5 +++ aquatic_ws/src/lib.rs | 12 ++++++ aquatic_ws/src/workers/socket.rs | 67 ++++++++++++++++++++++++++++++++ 6 files changed, 95 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a05ce30..3d6e8f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,12 @@ * Add support for reporting peer client information +### aquatic_ws + +#### Added + +* Add support for reporting peer client information + ## 0.8.0 - 2023-03-17 ### General diff --git a/Cargo.lock b/Cargo.lock index cea9d22..5136f89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -310,6 +310,7 @@ version = "0.8.0" dependencies = [ "anyhow", "aquatic_common", + "aquatic_peer_id", "aquatic_toml_config", "aquatic_ws_protocol", "async-tungstenite", @@ -323,6 +324,7 @@ dependencies = [ "log", "metrics", "metrics-exporter-prometheus", + "metrics-util", "mimalloc", "privdrop", "quickcheck", diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index 7a65872..90b656c 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -19,10 +19,11 @@ name = "aquatic_ws" [features] default = ["prometheus"] prometheus = ["metrics", "metrics-exporter-prometheus"] -metrics = ["dep:metrics"] +metrics = ["dep:metrics", "metrics-util"] [dependencies] aquatic_common = { workspace = true, features = ["rustls", "glommio"] } +aquatic_peer_id.workspace = true aquatic_toml_config.workspace = true aquatic_ws_protocol.workspace = true @@ -37,6 +38,7 @@ hashbrown = { version = "0.13", features = ["serde"] } httparse = "1" log = "0.4" metrics = { version = "0.21", optional = true } +metrics-util = { version = "0.15", optional = true } metrics-exporter-prometheus = { version = "0.12", optional = true, default-features = false, features = ["http-listener"] } mimalloc = { version = "0.1", default-features = false } privdrop = "0.5" diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index 42b34f7..36bb828 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -156,6 +156,10 @@ pub struct MetricsConfig { pub prometheus_endpoint_address: SocketAddr, /// Update metrics for torrent count this often (seconds) pub torrent_count_update_interval: u64, + /// Collect data on peer clients. + /// + /// Expect a certain CPU hit + pub peer_clients: bool, } #[cfg(feature = "metrics")] @@ -165,6 +169,7 @@ impl Default for MetricsConfig { run_prometheus_endpoint: false, prometheus_endpoint_address: SocketAddr::from(([0, 0, 0, 0], 9000)), torrent_count_update_interval: 10, + peer_clients: false, } } } diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index 1263175..423b006 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -3,6 +3,7 @@ pub mod config; pub mod workers; use std::sync::Arc; +use std::time::Duration; use anyhow::Context; use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker}; @@ -39,7 +40,18 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { if config.metrics.run_prometheus_endpoint { use metrics_exporter_prometheus::PrometheusBuilder; + let idle_timeout = config + .cleaning + .connection_cleaning_interval + .max(config.cleaning.torrent_cleaning_interval) + .max(config.metrics.torrent_count_update_interval) + * 2; + PrometheusBuilder::new() + .idle_timeout( + metrics_util::MetricKindMask::GAUGE, + Some(Duration::from_secs(idle_timeout)), + ) .with_http_listener(config.metrics.prometheus_endpoint_address) .install() .with_context(|| { diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index f0c76ae..8559612 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -11,6 +11,7 @@ use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, A use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; use aquatic_common::{PanicSentinel, ServerStartInstant}; +use aquatic_peer_id::PeerClient; use aquatic_ws_protocol::*; use async_tungstenite::WebSocketStream; use futures::stream::{SplitSink, SplitStream}; @@ -51,6 +52,7 @@ struct ConnectionReference { valid_until: ValidUntil, announced_info_hashes: HashMap, ip_version: IpVersion, + opt_peer_client: Option, } pub async fn run_socket_worker( @@ -154,6 +156,7 @@ pub async fn run_socket_worker( ), announced_info_hashes: Default::default(), ip_version, + opt_peer_client: None, }); ::log::trace!("accepting stream, assigning id {}", key); @@ -221,6 +224,15 @@ pub async fn run_socket_worker( .await .unwrap(); } + + #[cfg(feature = "prometheus")] + if let Some(peer_client) = reference.opt_peer_client { + ::metrics::decrement_gauge!( + "aquatic_peer_clients", + 1.0, + "client" => peer_client.to_string(), + ); + } } }), tq_regular) .unwrap() @@ -246,6 +258,18 @@ async fn clean_connections( connection_slab.borrow_mut().retain(|_, reference| { if reference.valid_until.valid(now) { + #[cfg(feature = "prometheus")] + if let Some(peer_client) = &reference.opt_peer_client { + // As long as connection is still alive, increment peer client + // gauge by zero to prevent it from being removed due to + // idleness + ::metrics::increment_gauge!( + "aquatic_peer_clients", + 0.0, + "client" => peer_client.to_string(), + ); + } + true } else { if let Some(ref handle) = reference.task_handle { @@ -258,6 +282,31 @@ async fn clean_connections( connection_slab.borrow_mut().shrink_to_fit(); + #[cfg(feature = "metrics")] + { + // Increment gauges by zero to prevent them from being removed due to + // idleness + + let worker_index = WORKER_INDEX.with(|index| index.get()).to_string(); + + if config.network.address.is_ipv4() || !config.network.only_ipv6 { + ::metrics::increment_gauge!( + "aquatic_active_connections", + 0.0, + "ip_version" => "4", + "worker_index" => worker_index.clone(), + ); + } + if config.network.address.is_ipv6() { + ::metrics::increment_gauge!( + "aquatic_active_connections", + 0.0, + "ip_version" => "6", + "worker_index" => worker_index, + ); + } + } + Some(Duration::from_secs( config.cleaning.connection_cleaning_interval, )) @@ -575,6 +624,24 @@ impl ConnectionReader { } } Entry::Vacant(entry) => { + #[cfg(feature = "prometheus")] + if self.config.metrics.run_prometheus_endpoint + && self.config.metrics.peer_clients + && connection_reference.opt_peer_client.is_none() + { + let client = + aquatic_peer_id::PeerId(announce_request.peer_id.0) + .client(); + + ::metrics::increment_gauge!( + "aquatic_peer_clients", + 1.0, + "client" => client.to_string(), + ); + + connection_reference.opt_peer_client = Some(client); + }; + entry.insert(announce_request.peer_id); } }