From 32884aae36f86a342dd84560adc6622dceeb2a66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 18 Jan 2023 21:04:25 +0100 Subject: [PATCH] http: add peer and torrent count metrics --- aquatic_http/src/config.rs | 27 +++++++ aquatic_http/src/workers/swarm.rs | 126 ++++++++++++++++++++++-------- 2 files changed, 120 insertions(+), 33 deletions(-) diff --git a/aquatic_http/src/config.rs b/aquatic_http/src/config.rs index 995d51c..7b66866 100644 --- a/aquatic_http/src/config.rs +++ b/aquatic_http/src/config.rs @@ -29,6 +29,8 @@ pub struct Config { pub privileges: PrivilegeConfig, pub access_list: AccessListConfig, pub cpu_pinning: CpuPinningConfigAsc, + #[cfg(feature = "metrics")] + pub metrics: MetricsConfig, } impl Default for Config { @@ -43,6 +45,8 @@ impl Default for Config { privileges: PrivilegeConfig::default(), access_list: AccessListConfig::default(), cpu_pinning: Default::default(), + #[cfg(feature = "metrics")] + metrics: Default::default(), } } } @@ -128,6 +132,29 @@ impl Default for CleaningConfig { } } +#[cfg(feature = "metrics")] +#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] +#[serde(default, deny_unknown_fields)] +pub struct MetricsConfig { + /// Run a prometheus endpoint + pub run_prometheus_endpoint: bool, + /// Address to run prometheus endpoint on + pub prometheus_endpoint_address: SocketAddr, + /// Update metrics for torrent count this often (seconds) + pub torrent_count_update_interval: u64, +} + +#[cfg(feature = "metrics")] +impl Default for MetricsConfig { + fn default() -> Self { + Self { + run_prometheus_endpoint: false, + prometheus_endpoint_address: SocketAddr::from(([0, 0, 0, 0], 9000)), + torrent_count_update_interval: 10, + } + } +} + #[cfg(test)] mod tests { use super::Config; diff --git a/aquatic_http/src/workers/swarm.rs b/aquatic_http/src/workers/swarm.rs index e1b056d..c2a2c6a 100644 --- a/aquatic_http/src/workers/swarm.rs +++ b/aquatic_http/src/workers/swarm.rs @@ -27,10 +27,23 @@ use aquatic_http_protocol::response::*; use crate::common::*; use crate::config::Config; -pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {} +pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash { + #[cfg(feature = "metrics")] + fn ip_version_str() -> &'static str; +} -impl Ip for Ipv4Addr {} -impl Ip for Ipv6Addr {} +impl Ip for Ipv4Addr { + #[cfg(feature = "metrics")] + fn ip_version_str() -> &'static str { + "4" + } +} +impl Ip for Ipv6Addr { + #[cfg(feature = "metrics")] + fn ip_version_str() -> &'static str { + "6" + } +} #[derive(PartialEq, Eq, Clone, Copy, Debug)] pub enum PeerStatus { @@ -59,8 +72,8 @@ impl PeerStatus { pub struct Peer { pub ip_address: I, pub port: u16, - pub status: PeerStatus, pub valid_until: ValidUntil, + pub seeder: bool, } impl Peer { @@ -83,7 +96,6 @@ pub type PeerMap = IndexMap, Peer>; pub struct TorrentData { pub peers: PeerMap, pub num_seeders: usize, - pub num_leechers: usize, } impl Default for TorrentData { @@ -92,11 +104,16 @@ impl Default for TorrentData { Self { peers: Default::default(), num_seeders: 0, - num_leechers: 0, } } } +impl TorrentData { + fn num_leechers(&self) -> usize { + self.peers.len() - self.num_seeders + } +} + pub type TorrentMap = AmortizedIndexMap>; #[derive(Default)] @@ -126,6 +143,8 @@ impl TorrentMaps { torrent_map: &mut TorrentMap, now: SecondsSinceServerStart, ) { + let mut total_num_peers = 0; + torrent_map.retain(|info_hash, torrent_data| { if !access_list_cache .load() @@ -135,29 +154,31 @@ impl TorrentMaps { } let num_seeders = &mut torrent_data.num_seeders; - let num_leechers = &mut torrent_data.num_leechers; torrent_data.peers.retain(|_, peer| { let keep = peer.valid_until.valid(now); - if !keep { - match peer.status { - PeerStatus::Seeding => { - *num_seeders -= 1; - } - PeerStatus::Leeching => { - *num_leechers -= 1; - } - _ => (), - }; + if (!keep) & peer.seeder { + *num_seeders -= 1; } keep }); + total_num_peers += torrent_data.peers.len() as u64; + !torrent_data.peers.is_empty() }); + let total_num_peers = total_num_peers as f64; + + #[cfg(feature = "metrics")] + ::metrics::gauge!( + "aquatic_peers", + total_num_peers, + "ip_version" => I::ip_version_str(), + ); + torrent_map.shrink_to_fit(); } } @@ -198,6 +219,27 @@ pub async fn run_swarm_worker( })() })); + // Periodically update torrent count metrics + #[cfg(feature = "metrics")] + TimerActionRepeat::repeat(enclose!((config, torrents) move || { + enclose!((config, torrents) move || async move { + let torrents = torrents.borrow_mut(); + + ::metrics::gauge!( + "aquatic_torrents", + torrents.ipv4.len() as f64, + "ip_version" => "4" + ); + ::metrics::gauge!( + "aquatic_torrents", + torrents.ipv6.len() as f64, + "ip_version" => "6" + ); + + Some(Duration::from_secs(config.metrics.torrent_count_update_interval)) + })() + })); + let mut handles = Vec::new(); for (_, receiver) in request_receivers.streams() { @@ -337,13 +379,6 @@ pub fn upsert_peer_and_get_response_peers( let peer_status = PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left)); - let peer = Peer { - ip_address: peer_ip_address, - port: request.port, - status: peer_status, - valid_until, - }; - let ip_or_key = request .key .map(Either::Right) @@ -356,24 +391,49 @@ pub fn upsert_peer_and_get_response_peers( let opt_removed_peer = match peer_status { PeerStatus::Leeching => { - torrent_data.num_leechers += 1; + let peer = Peer { + ip_address: peer_ip_address, + port: request.port, + valid_until, + seeder: false, + }; torrent_data.peers.insert(peer_map_key.clone(), peer) } PeerStatus::Seeding => { torrent_data.num_seeders += 1; + let peer = Peer { + ip_address: peer_ip_address, + port: request.port, + valid_until, + seeder: true, + }; + torrent_data.peers.insert(peer_map_key.clone(), peer) } PeerStatus::Stopped => torrent_data.peers.remove(&peer_map_key), }; - match opt_removed_peer.map(|peer| peer.status) { - Some(PeerStatus::Leeching) => { - torrent_data.num_leechers -= 1; + if let Some(&Peer { seeder: true, .. }) = opt_removed_peer.as_ref() { + torrent_data.num_seeders -= 1; + } + + #[cfg(feature = "metrics")] + match peer_status { + PeerStatus::Stopped if opt_removed_peer.is_some() => { + ::metrics::decrement_gauge!( + "aquatic_peers", + 1.0, + "ip_version" => I::ip_version_str() + ); } - Some(PeerStatus::Seeding) => { - torrent_data.num_seeders -= 1; + PeerStatus::Leeching | PeerStatus::Seeding if opt_removed_peer.is_none() => { + ::metrics::increment_gauge!( + "aquatic_peers", + 1.0, + "ip_version" => I::ip_version_str() + ); } _ => {} } @@ -397,7 +457,7 @@ pub fn upsert_peer_and_get_response_peers( ( torrent_data.num_seeders, - torrent_data.num_leechers, + torrent_data.num_leechers(), response_peers, ) } @@ -427,7 +487,7 @@ pub fn handle_scrape_request( let stats = ScrapeStatistics { complete: torrent_data.num_seeders, downloaded: 0, // No implementation planned - incomplete: torrent_data.num_leechers, + incomplete: torrent_data.num_leechers(), }; response.files.insert(info_hash, stats); @@ -439,7 +499,7 @@ pub fn handle_scrape_request( let stats = ScrapeStatistics { complete: torrent_data.num_seeders, downloaded: 0, // No implementation planned - incomplete: torrent_data.num_leechers, + incomplete: torrent_data.num_leechers(), }; response.files.insert(info_hash, stats);