From 3ac12b947f389f66bbea29a42d9e7eaf191560bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 17 Jan 2023 20:27:43 +0100 Subject: [PATCH 1/8] ws: add prometheus support (active connections, requests, responses) --- Cargo.lock | 117 ++++++++++++++++++++++++++++++- aquatic_ws/Cargo.toml | 6 ++ aquatic_ws/src/config.rs | 31 ++++++++ aquatic_ws/src/lib.rs | 15 ++++ aquatic_ws/src/workers/socket.rs | 36 +++++++++- scripts/run-aquatic-ws.sh | 2 +- 6 files changed, 201 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e4ca6a8..3b4b6b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -309,6 +309,8 @@ dependencies = [ "hashbrown 0.13.1", "httparse", "log", + "metrics", + "metrics-exporter-prometheus", "mimalloc", "privdrop", "quickcheck", @@ -1168,7 +1170,7 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "wasm-bindgen", ] @@ -1230,7 +1232,7 @@ dependencies = [ "scoped-tls", "scopeguard", "signal-hook", - "sketches-ddsketch", + "sketches-ddsketch 0.1.3", "smallvec", "socket2 0.3.19", "tracing", @@ -1507,6 +1509,12 @@ dependencies = [ "memoffset 0.5.6", ] +[[package]] +name = "ipnet" +version = "2.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30e22bd8629359895450b59ea7a776c850561b96a3b1d31321c1949d9e6c9146" + [[package]] name = "itertools" version = "0.10.5" @@ -1606,6 +1614,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "mach" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa" +dependencies = [ + "libc", +] + [[package]] name = "matchit" version = "0.5.0" @@ -1656,6 +1673,63 @@ dependencies = [ "autocfg", ] +[[package]] +name = "metrics" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b9b8653cec6897f73b519a43fba5ee3d50f62fe9af80b428accdcc093b4a849" +dependencies = [ + "ahash 0.7.6", + "metrics-macros", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-prometheus" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8603921e1f54ef386189335f288441af761e0fc61bcb552168d9cedfe63ebc70" +dependencies = [ + "hyper", + "indexmap", + "ipnet", + "metrics", + "metrics-util", + "parking_lot 0.12.1", + "portable-atomic", + "quanta", + "thiserror", + "tokio", +] + +[[package]] +name = "metrics-macros" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "731f8ecebd9f3a4aa847dfe75455e4757a45da40a7793d2f0b1f9b6ed18b23f3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "metrics-util" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d24dc2dbae22bff6f1f9326ffce828c9f07ef9cc1e8002e5279f845432a30a" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.12.3", + "metrics", + "num_cpus", + "parking_lot 0.12.1", + "portable-atomic", + "quanta", + "sketches-ddsketch 0.2.0", +] + [[package]] name = "mimalloc" version = "0.1.34" @@ -1694,7 +1768,7 @@ checksum = "e5d732bc30207a6423068df043e3d02e0735b155ad7ce1a6f76fe2baa5b158de" dependencies = [ "libc", "log", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys", ] @@ -2100,6 +2174,22 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quanta" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7e31331286705f455e56cca62e0e717158474ff02b7936c1fa596d983f4ae27" +dependencies = [ + "crossbeam-utils", + "libc", + "mach", + "once_cell", + "raw-cpuid", + "wasi 0.10.2+wasi-snapshot-preview1", + "web-sys", + "winapi 0.3.9", +] + [[package]] name = "quickcheck" version = "1.0.3" @@ -2171,6 +2261,15 @@ dependencies = [ "rand", ] +[[package]] +name = "raw-cpuid" +version = "10.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6823ea29436221176fe662da99998ad3b4db2c7f31e7b6f5fe43adccd6320bb" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "rayon" version = "1.6.1" @@ -2478,6 +2577,12 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04d2ecae5fcf33b122e2e6bd520a57ccf152d2dde3b38c71039df1a6867264ee" +[[package]] +name = "sketches-ddsketch" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ceb945e54128e09c43d8e4f1277851bd5044c6fc540bbaa2ad888f60b3da9ae7" + [[package]] name = "slab" version = "0.4.7" @@ -3080,6 +3185,12 @@ dependencies = [ "try-lock", ] +[[package]] +name = "wasi" +version = "0.10.2+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index de37067..a3da870 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -16,6 +16,10 @@ name = "aquatic_ws" [[bin]] name = "aquatic_ws" +[features] +prometheus = ["metrics", "metrics-exporter-prometheus"] +metrics = ["dep:metrics"] + [dependencies] aquatic_common = { workspace = true, features = ["rustls", "glommio"] } aquatic_toml_config.workspace = true @@ -31,6 +35,8 @@ glommio = "0.7" hashbrown = { version = "0.13", features = ["serde"] } httparse = "1" log = "0.4" +metrics = { version = "0.20", optional = true } +metrics-exporter-prometheus = { version = "0.11", optional = true, default-features = false, features = ["http-listener"] } mimalloc = { version = "0.1", default-features = false } privdrop = "0.5" rand = { version = "0.8", features = ["small_rng"] } diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index 2964459..e222d89 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -28,6 +28,8 @@ pub struct Config { pub cleaning: CleaningConfig, pub privileges: PrivilegeConfig, pub access_list: AccessListConfig, + #[cfg(feature = "metrics")] + pub metrics: MetricsConfig, pub cpu_pinning: CpuPinningConfigAsc, } @@ -42,6 +44,8 @@ impl Default for Config { cleaning: CleaningConfig::default(), privileges: PrivilegeConfig::default(), access_list: AccessListConfig::default(), + #[cfg(feature = "metrics")] + metrics: Default::default(), cpu_pinning: Default::default(), } } @@ -142,6 +146,33 @@ impl Default for CleaningConfig { } } +#[cfg(feature = "metrics")] +#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] +#[serde(default, deny_unknown_fields)] +pub struct MetricsConfig { + /// Run a prometheus endpoint + pub run_prometheus_endpoint: bool, + /// Address to run prometheus endpoint on + pub prometheus_endpoint_address: SocketAddr, +} + +#[cfg(feature = "metrics")] +impl Default for MetricsConfig { + fn default() -> Self { + Self { + run_prometheus_endpoint: false, + prometheus_endpoint_address: SocketAddr::from(([0, 0, 0, 0], 9000)), + } + } +} + +#[cfg(feature = "metrics")] +impl MetricsConfig { + pub fn active(&self) -> bool { + self.run_prometheus_endpoint + } +} + #[cfg(test)] mod tests { use super::Config; diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index a7c2e83..1c296c8 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -35,6 +35,21 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let mut signals = Signals::new([SIGUSR1, SIGTERM])?; + #[cfg(feature = "prometheus")] + if config.metrics.run_prometheus_endpoint { + use metrics_exporter_prometheus::PrometheusBuilder; + + PrometheusBuilder::new() + .with_http_listener(config.metrics.prometheus_endpoint_address) + .install() + .with_context(|| { + format!( + "Install prometheus endpoint on {}", + config.metrics.prometheus_endpoint_address + ) + })?; + } + let state = State::default(); update_access_list(&config.access_list, &state.access_list)?; diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index fc47cac..460d7a8 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -152,6 +152,9 @@ pub async fn run_socket_worker( ::log::trace!("accepting stream, assigning id {}", key); let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, opt_tls_config) async move { + #[cfg(feature = "metrics")] + ::metrics::increment_gauge!("aquatic_active_connections", 1.0); + if let Err(err) = run_connection( config.clone(), access_list, @@ -173,6 +176,9 @@ pub async fn run_socket_worker( // Clean up after closed connection + #[cfg(feature = "metrics")] + ::metrics::decrement_gauge!("aquatic_active_connections", 1.0); + // Remove reference in separate statement to avoid // multiple RefCell borrows let opt_reference = connection_slab.borrow_mut().try_remove(key); @@ -501,6 +507,9 @@ impl ConnectionReader { async fn handle_in_message(&mut self, in_message: InMessage) -> anyhow::Result<()> { match in_message { InMessage::AnnounceRequest(announce_request) => { + #[cfg(feature = "metrics")] + ::metrics::increment_counter!("aquatic_requests_total", "type" => "announce"); + let info_hash = announce_request.info_hash; if self @@ -571,6 +580,9 @@ impl ConnectionReader { } } InMessage::ScrapeRequest(ScrapeRequest { info_hashes, .. }) => { + #[cfg(feature = "metrics")] + ::metrics::increment_counter!("aquatic_requests_total", "type" => "scrape"); + let info_hashes = if let Some(info_hashes) = info_hashes { info_hashes } else { @@ -642,10 +654,18 @@ impl ConnectionReader { info_hash, }); - self.out_message_sender + let result = self + .out_message_sender .send((self.make_connection_meta(None).into(), out_message)) .await - .map_err(|err| anyhow::anyhow!("ConnectionReader::send_error_response failed: {}", err)) + .map_err(|err| { + anyhow::anyhow!("ConnectionReader::send_error_response failed: {}", err) + }); + + #[cfg(feature = "metrics")] + ::metrics::increment_counter!("aquatic_responses_total", "type" => "error"); + + result } fn make_connection_meta(&self, pending_scrape_id: Option) -> InMessageMeta { @@ -728,6 +748,18 @@ impl ConnectionWriter { match result { Ok(Ok(())) => { + #[cfg(feature = "metrics")] + let out_message_type = match &out_message { + OutMessage::Offer(_) => "offer", + OutMessage::Answer(_) => "offer_answer", + OutMessage::AnnounceResponse(_) => "announce", + OutMessage::ScrapeResponse(_) => "scrape", + OutMessage::ErrorResponse(_) => "error", + }; + + #[cfg(feature = "metrics")] + ::metrics::increment_counter!("aquatic_responses_total", "type" => out_message_type); + self.connection_slab .borrow_mut() .get_mut(self.connection_id.0) diff --git a/scripts/run-aquatic-ws.sh b/scripts/run-aquatic-ws.sh index 298a3d3..d6681ce 100755 --- a/scripts/run-aquatic-ws.sh +++ b/scripts/run-aquatic-ws.sh @@ -2,4 +2,4 @@ . ./scripts/env-native-cpu-without-avx-512 -cargo run --profile "release-debug" -p aquatic_ws -- $@ +cargo run --profile "release-debug" -p aquatic_ws --features "prometheus" -- $@ From 8b7c3c481c68a49d0c858737c7a26dfb778de764 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 17 Jan 2023 20:51:12 +0100 Subject: [PATCH 2/8] ws: add metrics for number of peers --- aquatic_ws/src/workers/swarm.rs | 59 +++++++++++++++++---------------- 1 file changed, 31 insertions(+), 28 deletions(-) diff --git a/aquatic_ws/src/workers/swarm.rs b/aquatic_ws/src/workers/swarm.rs index b180fb8..861413e 100644 --- a/aquatic_ws/src/workers/swarm.rs +++ b/aquatic_ws/src/workers/swarm.rs @@ -58,7 +58,6 @@ type PeerMap = IndexMap; struct TorrentData { pub peers: PeerMap, pub num_seeders: usize, - pub num_leechers: usize, } impl Default for TorrentData { @@ -67,7 +66,6 @@ impl Default for TorrentData { Self { peers: Default::default(), num_seeders: 0, - num_leechers: 0, } } } @@ -77,11 +75,13 @@ impl TorrentData { if let Some(peer) = self.peers.remove(&peer_id) { if peer.seeder { self.num_seeders -= 1; - } else { - self.num_leechers -= 1; } } } + + pub fn num_leechers(&self) -> usize { + self.peers.len() - self.num_seeders + } } type TorrentMap = AmortizedIndexMap; @@ -102,8 +102,8 @@ impl TorrentMaps { let mut access_list_cache = create_access_list_cache(access_list); let now = server_start_instant.seconds_elapsed(); - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4, now); - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6, now); + Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4, now, "4"); + Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6, now, "6"); } fn clean_torrent_map( @@ -111,7 +111,10 @@ impl TorrentMaps { access_list_cache: &mut AccessListCache, torrent_map: &mut TorrentMap, now: SecondsSinceServerStart, + ip_version: &'static str, ) { + let mut total_num_peers = 0u64; + torrent_map.retain(|info_hash, torrent_data| { if !access_list_cache .load() @@ -121,26 +124,28 @@ impl TorrentMaps { } let num_seeders = &mut torrent_data.num_seeders; - let num_leechers = &mut torrent_data.num_leechers; torrent_data.peers.retain(|_, peer| { let keep = peer.valid_until.valid(now); - if !keep { - if peer.seeder { - *num_seeders -= 1; - } else { - *num_leechers -= 1; - } + if (!keep) & peer.seeder { + *num_seeders -= 1; } keep }); + total_num_peers += torrent_data.peers.len() as u64; + !torrent_data.peers.is_empty() }); torrent_map.shrink_to_fit(); + + let total_num_peers = total_num_peers as f64; + + #[cfg(feature = "metrics")] + ::metrics::gauge!("aquatic_peers", total_num_peers, "ip_version" => ip_version); } } @@ -307,11 +312,12 @@ fn handle_announce_request( request_sender_meta: InMessageMeta, request: AnnounceRequest, ) { - let torrent_data: &mut TorrentData = if let IpVersion::V4 = request_sender_meta.ip_version { - torrent_maps.ipv4.entry(request.info_hash).or_default() - } else { - torrent_maps.ipv6.entry(request.info_hash).or_default() - }; + let (torrent_data, ip_version): (&mut TorrentData, &'static str) = + if let IpVersion::V4 = request_sender_meta.ip_version { + (torrent_maps.ipv4.entry(request.info_hash).or_default(), "4") + } else { + (torrent_maps.ipv6.entry(request.info_hash).or_default(), "6") + }; // If there is already a peer with this peer_id, check that connection id // is same as that of request sender. Otherwise, ignore request. Since @@ -334,8 +340,6 @@ fn handle_announce_request( let opt_removed_peer = match peer_status { PeerStatus::Leeching => { - torrent_data.num_leechers += 1; - let peer = Peer { connection_id: request_sender_meta.connection_id, consumer_id: request_sender_meta.out_message_consumer_id, @@ -360,14 +364,13 @@ fn handle_announce_request( PeerStatus::Stopped => torrent_data.peers.remove(&request.peer_id), }; - match opt_removed_peer.map(|peer| peer.seeder) { - Some(false) => { - torrent_data.num_leechers -= 1; - } - Some(true) => { + if let Some(removed_peer) = opt_removed_peer { + if removed_peer.seeder { torrent_data.num_seeders -= 1; } - _ => {} + } else { + #[cfg(feature = "metrics")] + ::metrics::increment_gauge!("aquatic_peers", 1.0, "ip_version" => ip_version); } } @@ -437,7 +440,7 @@ fn handle_announce_request( action: AnnounceAction, info_hash: request.info_hash, complete: torrent_data.num_seeders, - incomplete: torrent_data.num_leechers, + incomplete: torrent_data.num_leechers(), announce_interval: config.protocol.peer_announce_interval, }); @@ -475,7 +478,7 @@ fn handle_scrape_request( let stats = ScrapeStatistics { complete: torrent_data.num_seeders, downloaded: 0, // No implementation planned - incomplete: torrent_data.num_leechers, + incomplete: torrent_data.num_leechers(), }; out_message.files.insert(info_hash, stats); From 32253a37df6d10b8abbcde9f3b3dd62504798426 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 17 Jan 2023 21:06:34 +0100 Subject: [PATCH 3/8] ws: prometheus: store ip version (v4/v6) for connections and messages --- aquatic_ws/src/workers/socket.rs | 63 +++++++++++++++++++++++++------- 1 file changed, 49 insertions(+), 14 deletions(-) diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index 460d7a8..a63d7df 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -153,7 +153,11 @@ pub async fn run_socket_worker( let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, opt_tls_config) async move { #[cfg(feature = "metrics")] - ::metrics::increment_gauge!("aquatic_active_connections", 1.0); + ::metrics::increment_gauge!( + "aquatic_active_connections", + 1.0, + "ip_version" => ip_version_to_metrics_str(ip_version) + ); if let Err(err) = run_connection( config.clone(), @@ -177,7 +181,11 @@ pub async fn run_socket_worker( // Clean up after closed connection #[cfg(feature = "metrics")] - ::metrics::decrement_gauge!("aquatic_active_connections", 1.0); + ::metrics::decrement_gauge!( + "aquatic_active_connections", + 1.0, + "ip_version" => ip_version_to_metrics_str(ip_version) + ); // Remove reference in separate statement to avoid // multiple RefCell borrows @@ -425,6 +433,7 @@ async fn run_stream_agnostic_connection< pending_scrape_slab, connection_id, server_start_instant, + ip_version, }; let result = writer.run_out_message_loop().await; @@ -508,7 +517,11 @@ impl ConnectionReader { match in_message { InMessage::AnnounceRequest(announce_request) => { #[cfg(feature = "metrics")] - ::metrics::increment_counter!("aquatic_requests_total", "type" => "announce"); + ::metrics::increment_counter!( + "aquatic_requests_total", + "type" => "announce", + "ip_version" => ip_version_to_metrics_str(self.ip_version) + ); let info_hash = announce_request.info_hash; @@ -581,7 +594,11 @@ impl ConnectionReader { } InMessage::ScrapeRequest(ScrapeRequest { info_hashes, .. }) => { #[cfg(feature = "metrics")] - ::metrics::increment_counter!("aquatic_requests_total", "type" => "scrape"); + ::metrics::increment_counter!( + "aquatic_requests_total", + "type" => "scrape", + "ip_version" => ip_version_to_metrics_str(self.ip_version) + ); let info_hashes = if let Some(info_hashes) = info_hashes { info_hashes @@ -663,7 +680,11 @@ impl ConnectionReader { }); #[cfg(feature = "metrics")] - ::metrics::increment_counter!("aquatic_responses_total", "type" => "error"); + ::metrics::increment_counter!( + "aquatic_requests_total", + "type" => "error", + "ip_version" => ip_version_to_metrics_str(self.ip_version) + ); result } @@ -686,6 +707,7 @@ struct ConnectionWriter { pending_scrape_slab: Rc>>, server_start_instant: ServerStartInstant, connection_id: ConnectionId, + ip_version: IpVersion, } impl ConnectionWriter { @@ -749,16 +771,21 @@ impl ConnectionWriter { match result { Ok(Ok(())) => { #[cfg(feature = "metrics")] - let out_message_type = match &out_message { - OutMessage::Offer(_) => "offer", - OutMessage::Answer(_) => "offer_answer", - OutMessage::AnnounceResponse(_) => "announce", - OutMessage::ScrapeResponse(_) => "scrape", - OutMessage::ErrorResponse(_) => "error", - }; + { + let out_message_type = match &out_message { + OutMessage::Offer(_) => "offer", + OutMessage::Answer(_) => "offer_answer", + OutMessage::AnnounceResponse(_) => "announce", + OutMessage::ScrapeResponse(_) => "scrape", + OutMessage::ErrorResponse(_) => "error", + }; - #[cfg(feature = "metrics")] - ::metrics::increment_counter!("aquatic_responses_total", "type" => out_message_type); + ::metrics::increment_counter!( + "aquatic_responses_total", + "type" => out_message_type, + "ip_version" => ip_version_to_metrics_str(self.ip_version), + ); + } self.connection_slab .borrow_mut() @@ -839,3 +866,11 @@ fn create_tcp_listener( Ok(unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) }) } + +#[cfg(feature = "metrics")] +fn ip_version_to_metrics_str(ip_version: IpVersion) -> &'static str { + match ip_version { + IpVersion::V4 => "4", + IpVersion::V6 => "6", + } +} From 56fafda2346bc1e7257e3f80dcb34754c0e881f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 17 Jan 2023 21:20:46 +0100 Subject: [PATCH 4/8] ws: add torrent count metrics --- aquatic_ws/src/config.rs | 3 +++ aquatic_ws/src/workers/swarm.rs | 21 +++++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index e222d89..6c46e33 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -154,6 +154,8 @@ pub struct MetricsConfig { pub run_prometheus_endpoint: bool, /// Address to run prometheus endpoint on pub prometheus_endpoint_address: SocketAddr, + /// Update metrics for torrent count this often, in seconds + pub torrent_count_update_interval: u64, } #[cfg(feature = "metrics")] @@ -162,6 +164,7 @@ impl Default for MetricsConfig { Self { run_prometheus_endpoint: false, prometheus_endpoint_address: SocketAddr::from(([0, 0, 0, 0], 9000)), + torrent_count_update_interval: 10, } } } diff --git a/aquatic_ws/src/workers/swarm.rs b/aquatic_ws/src/workers/swarm.rs index 861413e..362acc8 100644 --- a/aquatic_ws/src/workers/swarm.rs +++ b/aquatic_ws/src/workers/swarm.rs @@ -180,6 +180,27 @@ pub async fn run_swarm_worker( })() })); + // Periodically update torrent count metrics + #[cfg(feature = "metrics")] + TimerActionRepeat::repeat(enclose!((config, torrents) move || { + enclose!((config, torrents) move || async move { + let torrents = torrents.borrow_mut(); + + ::metrics::gauge!( + "aquatic_torrents", + torrents.ipv4.len() as f64, + "ip_version" => "4" + ); + ::metrics::gauge!( + "aquatic_torrents", + torrents.ipv6.len() as f64, + "ip_version" => "6" + ); + + Some(Duration::from_secs(config.metrics.torrent_count_update_interval)) + })() + })); + let mut handles = Vec::new(); for (_, receiver) in control_message_receivers.streams() { From ac4f2ade5d17977b868b21d1b4d706a22f68560c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 17 Jan 2023 21:33:24 +0100 Subject: [PATCH 5/8] ws: remove unused MetricsConfig::active --- aquatic_ws/src/config.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index 6c46e33..482a2ec 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -169,13 +169,6 @@ impl Default for MetricsConfig { } } -#[cfg(feature = "metrics")] -impl MetricsConfig { - pub fn active(&self) -> bool { - self.run_prometheus_endpoint - } -} - #[cfg(test)] mod tests { use super::Config; From 20d1bd73eca2f7f05cb57019cecc48386087526d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 17 Jan 2023 21:33:59 +0100 Subject: [PATCH 6/8] ws: improve docs for MetricsConfig --- aquatic_ws/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index 482a2ec..42b34f7 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -154,7 +154,7 @@ pub struct MetricsConfig { pub run_prometheus_endpoint: bool, /// Address to run prometheus endpoint on pub prometheus_endpoint_address: SocketAddr, - /// Update metrics for torrent count this often, in seconds + /// Update metrics for torrent count this often (seconds) pub torrent_count_update_interval: u64, } From 9dd45fd74c031a1e5c7507bb153ea4a95c7abdb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 17 Jan 2023 21:57:21 +0100 Subject: [PATCH 7/8] README: document ws prometheus support --- README.md | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/README.md b/README.md index 5cf42dd..25dae04 100644 --- a/README.md +++ b/README.md @@ -232,6 +232,29 @@ proxied to IPv4 requests, and IPv6 requests to IPv6 requests. More details are available [here](./documents/aquatic-ws-load-test-2022-03-29.pdf). Please note that request workers have been renamed to swarm workers. +#### Prometheus + +`aquatic_ws` supports exporting [Prometheus](https://prometheus.io/) metrics. + +Pass the `prometheus` feature when building: + +```sh +. ./scripts/env-native-cpu-without-avx-512 +cargo build --release -p aquatic_ws --features "prometheus" +``` + +Then activate the prometheus endpoint in the configuration file: + +```toml +[metrics] +# Run a prometheus endpoint +run_prometheus_endpoint = true +# Address to run prometheus endpoint on +prometheus_endpoint_address = "0.0.0.0:9000" +# Update metrics for torrent count this often (seconds) +torrent_count_update_interval = 10 +``` + ## Load testing There are load test binaries for all protocols. They use a CLI structure From 9479e56d60ef15fe5b1717f3343239f1d0e4d86e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 17 Jan 2023 21:57:51 +0100 Subject: [PATCH 8/8] CI: build ws with prometheus feature --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d12ff7b..ed37403 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,7 +29,7 @@ jobs: run: | cargo build --verbose -p aquatic_udp --features "cpu-pinning" cargo build --verbose -p aquatic_http - cargo build --verbose -p aquatic_ws + cargo build --verbose -p aquatic_ws --features "prometheus" build-macos: runs-on: macos-latest