diff --git a/Cargo.lock b/Cargo.lock index 8c6a434..cea9d22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -40,6 +40,15 @@ dependencies = [ "version_check", ] +[[package]] +name = "aho-corasick" +version = "0.7.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac" +dependencies = [ + "memchr", +] + [[package]] name = "aho-corasick" version = "1.0.1" @@ -220,6 +229,7 @@ dependencies = [ "aquatic_udp_protocol", "blake3", "cfg-if", + "compact_str", "constant_time_eq", "crossbeam-channel", "getrandom", @@ -231,6 +241,7 @@ dependencies = [ "log", "metrics", "metrics-exporter-prometheus", + "metrics-util", "mimalloc", "mio", "num-format", @@ -885,6 +896,12 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + [[package]] name = "env_logger" version = "0.8.4" @@ -1672,12 +1689,16 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "111cb375987443c3de8d503580b536f77dc8416d32db62d9456db5d93bd7ac47" dependencies = [ + "aho-corasick 0.7.20", "crossbeam-epoch", "crossbeam-utils", "hashbrown 0.13.2", + "indexmap", "metrics", "num_cpus", + "ordered-float", "quanta", + "radix_trie", "sketches-ddsketch 0.2.1", ] @@ -1735,6 +1756,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "nix" version = "0.23.2" @@ -1876,6 +1906,15 @@ version = "11.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" +[[package]] +name = "ordered-float" +version = "3.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fc2dbde8f8a79f2102cc474ceb0ad68e3b80b85289ea62389b60e66777e4213" +dependencies = [ + "num-traits", +] + [[package]] name = "os_str_bytes" version = "6.5.0" @@ -2068,6 +2107,16 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.8.5" @@ -2145,7 +2194,7 @@ version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81ca098a9821bd52d6b24fd8b10bd081f47d39c22778cafaa75a2857a62c6390" dependencies = [ - "aho-corasick", + "aho-corasick 1.0.1", "memchr", "regex-syntax", ] diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 88bc11b..7f779bc 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -19,7 +19,7 @@ name = "aquatic_udp" [features] default = ["prometheus"] cpu-pinning = ["aquatic_common/hwloc"] -prometheus = ["metrics", "metrics-exporter-prometheus"] +prometheus = ["metrics", "metrics-util", "metrics-exporter-prometheus"] io-uring = ["dep:io-uring"] [dependencies] @@ -30,6 +30,7 @@ aquatic_udp_protocol.workspace = true anyhow = "1" blake3 = "1" cfg-if = "1" +compact_str = "0.7" constant_time_eq = "0.2" crossbeam-channel = "0.5" getrandom = "0.2" @@ -40,6 +41,7 @@ io-uring = { version = "0.6", optional = true } libc = "0.2" log = "0.4" metrics = { version = "0.21", optional = true } +metrics-util = { version = "0.15", optional = true } metrics-exporter-prometheus = { version = "0.12", optional = true, default-features = false, features = ["http-listener"] } mimalloc = { version = "0.1", default-features = false } mio = { version = "0.8", features = ["net", "os-poll"] } diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 1eba011..2be8729 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -161,11 +161,16 @@ impl Default for ProtocolConfig { pub struct StatisticsConfig { /// Collect and print/write statistics this often (seconds) pub interval: u64, - /// Enable extended statistics (on peers per torrent and on peer clients). - /// Also, see `prometheus_peer_clients`. + /// Collect statistics on number of peers per torrent /// - /// Will increase time taken for request handling and torrent cleaning. - pub extended: bool, + /// Will increase time taken for torrent cleaning. + pub torrent_peer_histograms: bool, + /// Collect statistics on peer clients. + /// + /// Also, see `prometheus_peer_id_prefixes`. + /// + /// Quite costly when it comes to CPU and RAM. + pub peer_clients: bool, /// Print statistics to standard output pub print_to_stdout: bool, /// Save statistics as HTML to a file @@ -178,14 +183,13 @@ pub struct StatisticsConfig { /// Address to run prometheus endpoint on #[cfg(feature = "prometheus")] pub prometheus_endpoint_address: SocketAddr, - /// Serve information on all peer clients on the prometheus endpoint. - /// Requires extended statistics to be activated. + /// Serve information on all peer id prefixes on the prometheus endpoint. + /// Requires `peer_clients` to be activated. /// - /// NOT RECOMMENDED. May consume lots of CPU and RAM since data on every - /// single peer client will be kept around by the endpoint, even those - /// which are no longer in the swarm. + /// May consume quite a bit of CPU and RAM, since data on every single peer + /// client will be reported continuously on the endpoint #[cfg(feature = "prometheus")] - pub prometheus_peer_clients: bool, + pub prometheus_peer_id_prefixes: bool, } impl StatisticsConfig { @@ -207,7 +211,8 @@ impl Default for StatisticsConfig { fn default() -> Self { Self { interval: 5, - extended: false, + torrent_peer_histograms: false, + peer_clients: false, print_to_stdout: false, write_html_to_file: false, html_file_path: "tmp/statistics.html".into(), @@ -216,7 +221,7 @@ impl Default for StatisticsConfig { #[cfg(feature = "prometheus")] prometheus_endpoint_address: SocketAddr::from(([0, 0, 0, 0], 9000)), #[cfg(feature = "prometheus")] - prometheus_peer_clients: false, + prometheus_peer_id_prefixes: false, } } } diff --git a/aquatic_udp/src/lib.rs b/aquatic_udp/src/lib.rs index a869c4f..6f8ffe6 100644 --- a/aquatic_udp/src/lib.rs +++ b/aquatic_udp/src/lib.rs @@ -4,6 +4,7 @@ pub mod workers; use std::collections::BTreeMap; use std::thread::Builder; +use std::time::Duration; use anyhow::Context; use crossbeam_channel::{bounded, unbounded}; @@ -143,8 +144,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { #[cfg(feature = "prometheus")] if config.statistics.run_prometheus_endpoint { use metrics_exporter_prometheus::PrometheusBuilder; + use metrics_util::MetricKindMask; PrometheusBuilder::new() + .idle_timeout( + MetricKindMask::ALL, + Some(Duration::from_secs(config.statistics.interval * 2)), + ) .with_http_listener(config.statistics.prometheus_endpoint_address) .install() .with_context(|| { diff --git a/aquatic_udp/src/workers/statistics/collector.rs b/aquatic_udp/src/workers/statistics/collector.rs index 3e3ed97..33c0a4e 100644 --- a/aquatic_udp/src/workers/statistics/collector.rs +++ b/aquatic_udp/src/workers/statistics/collector.rs @@ -134,7 +134,7 @@ impl StatisticsCollector { ); } - if config.statistics.extended { + if config.statistics.torrent_peer_histograms { self.last_complete_histogram .update_metrics(self.ip_version.clone()); } diff --git a/aquatic_udp/src/workers/statistics/mod.rs b/aquatic_udp/src/workers/statistics/mod.rs index 90a730b..6d1ee2c 100644 --- a/aquatic_udp/src/workers/statistics/mod.rs +++ b/aquatic_udp/src/workers/statistics/mod.rs @@ -6,7 +6,8 @@ use std::time::{Duration, Instant}; use anyhow::Context; use aquatic_common::{IndexMap, PanicSentinel}; -use aquatic_udp_protocol::PeerClient; +use aquatic_udp_protocol::{PeerClient, PeerId}; +use compact_str::CompactString; use crossbeam_channel::Receiver; use num_format::{Locale, ToFormattedString}; use serde::Serialize; @@ -46,6 +47,17 @@ pub fn run_statistics_worker( shared_state: State, statistics_receiver: Receiver, ) { + let process_peer_client_data = { + let mut collect = config.statistics.write_html_to_file; + + #[cfg(feature = "prometheus")] + { + collect |= config.statistics.run_prometheus_endpoint; + } + + collect & config.statistics.peer_clients + }; + let opt_tt = if config.statistics.write_html_to_file { let mut tt = TinyTemplate::new(); @@ -71,7 +83,7 @@ pub fn run_statistics_worker( "6".into(), ); - let mut peer_clients: IndexMap = IndexMap::default(); + let mut peers: IndexMap = IndexMap::default(); loop { let start_time = Instant::now(); @@ -81,45 +93,16 @@ pub fn run_statistics_worker( StatisticsMessage::Ipv4PeerHistogram(h) => ipv4_collector.add_histogram(&config, h), StatisticsMessage::Ipv6PeerHistogram(h) => ipv6_collector.add_histogram(&config, h), StatisticsMessage::PeerAdded(peer_id) => { - let peer_client = peer_id.client(); + if process_peer_client_data { + let peer_client = peer_id.client(); + let prefix = peer_id.first_8_bytes_hex(); - #[cfg(feature = "prometheus")] - if config.statistics.run_prometheus_endpoint - && config.statistics.prometheus_peer_clients - { - ::metrics::increment_gauge!( - "aquatic_peer_clients", - 1.0, - "client" => peer_client.to_string(), - "peer_id_prefix_hex" => peer_id.first_8_bytes_hex().to_string(), - ); + peers.insert(peer_id, (peer_client, prefix)); } - - *peer_clients.entry(peer_client).or_insert(0) += 1; } StatisticsMessage::PeerRemoved(peer_id) => { - let peer_client = peer_id.client(); - - #[cfg(feature = "prometheus")] - if config.statistics.run_prometheus_endpoint - && config.statistics.prometheus_peer_clients - { - ::metrics::decrement_gauge!( - "aquatic_peer_clients", - 1.0, - "client" => peer_client.to_string(), - "peer_id_prefix_hex" => peer_id.first_8_bytes_hex().to_string(), - ); - } - - if let Some(count) = peer_clients.get_mut(&peer_client) { - if *count == 1 { - drop(count); - - peer_clients.remove(&peer_client); - } else { - *count -= 1; - } + if process_peer_client_data { + peers.remove(&peer_id); } } } @@ -134,6 +117,60 @@ pub fn run_statistics_worker( &config, ); + let peer_clients = if process_peer_client_data { + let mut clients: IndexMap = IndexMap::default(); + + #[cfg(feature = "prometheus")] + let mut prefixes: IndexMap = IndexMap::default(); + + for (peer_client, prefix) in peers.values() { + *clients.entry(peer_client.to_owned()).or_insert(0) += 1; + + #[cfg(feature = "prometheus")] + if config.statistics.run_prometheus_endpoint + && config.statistics.prometheus_peer_id_prefixes + { + *prefixes.entry(prefix.to_owned()).or_insert(0) += 1; + } + } + + clients.sort_unstable_by(|_, a, _, b| b.cmp(a)); + + #[cfg(feature = "prometheus")] + if config.statistics.run_prometheus_endpoint + && config.statistics.prometheus_peer_id_prefixes + { + for (prefix, count) in prefixes { + ::metrics::gauge!( + "aquatic_peer_id_prefixes", + count as f64, + "prefix_hex" => prefix.to_string(), + ); + } + } + + let mut client_vec = Vec::with_capacity(clients.len()); + + for (client, count) in clients { + if config.statistics.write_html_to_file { + client_vec.push((client.to_string(), count.to_formatted_string(&Locale::en))); + } + + #[cfg(feature = "prometheus")] + if config.statistics.run_prometheus_endpoint { + ::metrics::gauge!( + "aquatic_peer_clients", + count as f64, + "client" => client.to_string(), + ); + } + } + + client_vec + } else { + Vec::new() + }; + if config.statistics.print_to_stdout { println!("General:"); println!( @@ -154,29 +191,11 @@ pub fn run_statistics_worker( } if let Some(tt) = opt_tt.as_ref() { - let mut peer_clients = if config.statistics.extended { - peer_clients.iter().collect() - } else { - Vec::new() - }; - - peer_clients.sort_unstable_by(|a, b| b.1.cmp(a.1)); - - let peer_clients = peer_clients - .into_iter() - .map(|(peer_client, count)| { - ( - peer_client.to_string(), - count.to_formatted_string(&Locale::en), - ) - }) - .collect(); - let template_data = TemplateData { stylesheet: STYLESHEET_CONTENTS.to_string(), ipv4_active: config.network.ipv4_active(), ipv6_active: config.network.ipv6_active(), - extended_active: config.statistics.extended, + extended_active: config.statistics.torrent_peer_histograms, ipv4: statistics_ipv4, ipv6: statistics_ipv6, last_updated: OffsetDateTime::now_utc() @@ -191,6 +210,8 @@ pub fn run_statistics_worker( } } + peers.shrink_to_fit(); + if let Some(time_remaining) = Duration::from_secs(config.statistics.interval).checked_sub(start_time.elapsed()) { @@ -236,7 +257,7 @@ fn print_to_stdout(config: &Config, statistics: &CollectedStatistics) { statistics.num_peers, config.cleaning.torrent_cleaning_interval ); - if config.statistics.extended { + if config.statistics.torrent_peer_histograms { println!( " peers per torrent (updated every {}s)", config.cleaning.torrent_cleaning_interval diff --git a/aquatic_udp/src/workers/swarm/storage.rs b/aquatic_udp/src/workers/swarm/storage.rs index 2c864e8..777dba4 100644 --- a/aquatic_udp/src/workers/swarm/storage.rs +++ b/aquatic_udp/src/workers/swarm/storage.rs @@ -82,7 +82,7 @@ impl TorrentData { PeerStatus::Stopped => self.peers.remove(&peer_id), }; - if config.statistics.extended { + if config.statistics.peer_clients { match (status, opt_removed_peer.is_some()) { // We added a new peer (PeerStatus::Leeching | PeerStatus::Seeding, false) => { @@ -158,7 +158,7 @@ impl TorrentData { if peer.is_seeder { self.num_seeders -= 1; } - if config.statistics.extended { + if config.statistics.peer_clients { if let Err(_) = statistics_sender.try_send(StatisticsMessage::PeerRemoved(*peer_id)) { @@ -201,7 +201,8 @@ impl TorrentMap { ) -> (usize, Option>) { let mut num_peers = 0; - let mut opt_histogram: Option> = if config.statistics.extended { + let mut opt_histogram: Option> = if config.statistics.torrent_peer_histograms + { match Histogram::new(3) { Ok(histogram) => Some(histogram), Err(err) => {