diff --git a/Cargo.lock b/Cargo.lock index 7e83c12..f80d310 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -215,6 +215,7 @@ dependencies = [ "crossbeam-channel", "getrandom", "hashbrown 0.12.3", + "hdrhistogram", "hex", "libc", "log", @@ -702,6 +703,15 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d0165d2900ae6778e36e80bbc4da3b5eefccee9ba939761f9c2882a5d9af3ff" +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if", +] + [[package]] name = "criterion" version = "0.3.6" @@ -973,6 +983,16 @@ dependencies = [ "instant", ] +[[package]] +name = "flate2" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "float-cmp" version = "0.9.0" @@ -1275,6 +1295,20 @@ dependencies = [ "hashbrown 0.12.3", ] +[[package]] +name = "hdrhistogram" +version = "7.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31672b7011be2c4f7456c4ddbcb40e7e9a4a9fad8efe49a6ebaf5f307d0109c0" +dependencies = [ + "base64", + "byteorder", + "crossbeam-channel", + "flate2", + "nom", + "num-traits", +] + [[package]] name = "headers" version = "0.3.7" diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 51b2ef4..8e9e2a2 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -31,6 +31,7 @@ constant_time_eq = "0.2" crossbeam-channel = "0.5" getrandom = "0.2" hashbrown = { version = "0.12", default-features = false } +hdrhistogram = "7" hex = "0.4" libc = "0.2" log = "0.4" diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index 8880dfe..49aca5e 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -9,6 +9,7 @@ use crossbeam_channel::{Sender, TrySendError}; use aquatic_common::access_list::AccessListArcSwap; use aquatic_common::CanonicalSocketAddr; use aquatic_udp_protocol::*; +use hdrhistogram::Histogram; use crate::config::Config; @@ -132,6 +133,11 @@ impl PeerStatus { } } +pub enum StatisticsMessage { + Ipv4PeerHistogram(Histogram), + Ipv6PeerHistogram(Histogram), +} + pub struct Statistics { pub requests_received: AtomicUsize, pub responses_sent_connect: AtomicUsize, diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 06a6822..b37b106 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -142,6 +142,8 @@ impl Default for ProtocolConfig { pub struct StatisticsConfig { /// Collect and print/write statistics this often (seconds) pub interval: u64, + /// Enable extended statistics (on peers per torrent) + pub extended: bool, /// Print statistics to standard output pub print_to_stdout: bool, /// Save statistics as HTML to a file @@ -160,6 +162,7 @@ impl Default for StatisticsConfig { fn default() -> Self { Self { interval: 5, + extended: false, print_to_stdout: false, write_html_to_file: false, html_file_path: "tmp/statistics.html".into(), diff --git a/aquatic_udp/src/lib.rs b/aquatic_udp/src/lib.rs index 1259076..2a735b1 100644 --- a/aquatic_udp/src/lib.rs +++ b/aquatic_udp/src/lib.rs @@ -41,6 +41,8 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let mut response_senders = Vec::new(); let mut response_receivers = BTreeMap::new(); + let (statistics_sender, statistics_receiver) = unbounded(); + let server_start_instant = ServerStartInstant::new(); for i in 0..config.swarm_workers { @@ -71,6 +73,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let state = state.clone(); let request_receiver = request_receivers.remove(&i).unwrap().clone(); let response_sender = ConnectedResponseSender::new(response_senders.clone()); + let statistics_sender = statistics_sender.clone(); Builder::new() .name(format!("swarm-{:02}", i + 1)) @@ -90,6 +93,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { server_start_instant, request_receiver, response_sender, + statistics_sender, SwarmWorkerIndex(i), ) }) @@ -148,7 +152,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::Util, ); - workers::statistics::run_statistics_worker(sentinel, config, state); + workers::statistics::run_statistics_worker( + sentinel, + config, + state, + statistics_receiver, + ); }) .with_context(|| "spawn statistics worker")?; } diff --git a/aquatic_udp/src/workers/statistics.rs b/aquatic_udp/src/workers/statistics.rs index b54cbe8..74cd2cb 100644 --- a/aquatic_udp/src/workers/statistics.rs +++ b/aquatic_udp/src/workers/statistics.rs @@ -1,3 +1,4 @@ +use std::fmt::Display; use std::fs::File; use std::io::Write; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -6,6 +7,8 @@ use std::time::{Duration, Instant}; use anyhow::Context; use aquatic_common::PanicSentinel; +use crossbeam_channel::Receiver; +use hdrhistogram::Histogram; use num_format::{Locale, ToFormattedString}; use serde::Serialize; use time::format_description::well_known::Rfc2822; @@ -23,6 +26,49 @@ const STYLESHEET_CONTENTS: &str = concat!( "" ); +#[derive(Clone, Copy, Debug, Serialize)] +struct PeerHistogramStatistics { + p0: u64, + p10: u64, + p20: u64, + p30: u64, + p40: u64, + p50: u64, + p60: u64, + p70: u64, + p80: u64, + p90: u64, + p95: u64, + p99: u64, + p100: u64, +} + +impl PeerHistogramStatistics { + fn new(h: &Histogram) -> Self { + Self { + p0: h.value_at_percentile(0.0), + p10: h.value_at_percentile(10.0), + p20: h.value_at_percentile(20.0), + p30: h.value_at_percentile(30.0), + p40: h.value_at_percentile(40.0), + p50: h.value_at_percentile(50.0), + p60: h.value_at_percentile(60.0), + p70: h.value_at_percentile(70.0), + p80: h.value_at_percentile(80.0), + p90: h.value_at_percentile(90.0), + p95: h.value_at_percentile(95.0), + p99: h.value_at_percentile(99.0), + p100: h.value_at_percentile(100.0), + } + } +} + +impl Display for PeerHistogramStatistics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "p0: {}, p10: {}, p20: {}, p30: {}, p40: {}, p50: {}, p60: {}, p70: {}, p80: {}, p90: {}, p95: {}, p99: {}, p100: {}", self.p0, self.p10, self.p20, self.p30, self.p40, self.p50, self.p60, self.p70, self.p80, self.p90, self.p95, self.p99, self.p100) + } +} + #[derive(Clone, Copy, Debug)] struct CollectedStatistics { requests_per_second: f64, @@ -34,10 +80,15 @@ struct CollectedStatistics { bytes_sent_per_second: f64, num_torrents: usize, num_peers: usize, + peer_histogram: PeerHistogramStatistics, } impl CollectedStatistics { - fn from_shared(statistics: &Arc, last: &mut Instant) -> Self { + fn from_shared( + statistics: &Arc, + peer_histogram: &Histogram, + last: &mut Instant, + ) -> Self { let requests_received = statistics.requests_received.fetch_and(0, Ordering::Relaxed) as f64; let responses_sent_connect = statistics .responses_sent_connect @@ -56,6 +107,8 @@ impl CollectedStatistics { let num_torrents = Self::sum_atomic_usizes(&statistics.torrents); let num_peers = Self::sum_atomic_usizes(&statistics.peers); + let peer_histogram = PeerHistogramStatistics::new(peer_histogram); + let now = Instant::now(); let elapsed = (now - *last).as_secs_f64(); @@ -72,6 +125,7 @@ impl CollectedStatistics { bytes_sent_per_second: bytes_sent / elapsed, num_torrents, num_peers, + peer_histogram, } } @@ -107,6 +161,7 @@ impl Into for CollectedStatistics { tx_mbits: format!("{:.2}", tx_mbits), num_torrents: self.num_torrents.to_formatted_string(&Locale::en), num_peers: self.num_peers.to_formatted_string(&Locale::en), + peer_histogram: self.peer_histogram, } } } @@ -123,6 +178,7 @@ struct FormattedStatistics { tx_mbits: String, num_torrents: String, num_peers: String, + peer_histogram: PeerHistogramStatistics, } #[derive(Debug, Serialize)] @@ -130,13 +186,47 @@ struct TemplateData { stylesheet: String, ipv4_active: bool, ipv6_active: bool, + extended_active: bool, ipv4: FormattedStatistics, ipv6: FormattedStatistics, last_updated: String, peer_update_interval: String, } -pub fn run_statistics_worker(_sentinel: PanicSentinel, config: Config, state: State) { +struct PeerHistograms { + pending: Vec>, + last_complete: Histogram, +} + +impl Default for PeerHistograms { + fn default() -> Self { + Self { + pending: Vec::new(), + last_complete: Histogram::new(3).expect("create peer histogram"), + } + } +} + +impl PeerHistograms { + fn update(&mut self, config: &Config, histogram: Histogram) { + self.pending.push(histogram); + + if self.pending.len() == config.swarm_workers { + self.last_complete = self.pending.drain(..).sum(); + } + } + + fn current(&self) -> &Histogram { + &self.last_complete + } +} + +pub fn run_statistics_worker( + _sentinel: PanicSentinel, + config: Config, + state: State, + statistics_receiver: Receiver, +) { let tt = if config.statistics.write_html_to_file { let mut tt = TinyTemplate::new(); @@ -154,13 +244,31 @@ pub fn run_statistics_worker(_sentinel: PanicSentinel, config: Config, state: St let mut last_ipv4 = Instant::now(); let mut last_ipv6 = Instant::now(); + let mut peer_histograms_ipv4 = PeerHistograms::default(); + let mut peer_histograms_ipv6 = PeerHistograms::default(); + loop { ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); - let statistics_ipv4 = - CollectedStatistics::from_shared(&state.statistics_ipv4, &mut last_ipv4).into(); - let statistics_ipv6 = - CollectedStatistics::from_shared(&state.statistics_ipv6, &mut last_ipv6).into(); + for message in statistics_receiver.try_iter() { + match message { + StatisticsMessage::Ipv4PeerHistogram(h) => peer_histograms_ipv4.update(&config, h), + StatisticsMessage::Ipv6PeerHistogram(h) => peer_histograms_ipv6.update(&config, h), + } + } + + let statistics_ipv4 = CollectedStatistics::from_shared( + &state.statistics_ipv4, + peer_histograms_ipv4.current(), + &mut last_ipv4, + ) + .into(); + let statistics_ipv6 = CollectedStatistics::from_shared( + &state.statistics_ipv6, + peer_histograms_ipv6.current(), + &mut last_ipv6, + ) + .into(); if config.statistics.print_to_stdout { println!("General:"); @@ -183,6 +291,7 @@ pub fn run_statistics_worker(_sentinel: PanicSentinel, config: Config, state: St stylesheet: STYLESHEET_CONTENTS.to_string(), ipv4_active: config.network.ipv4_active(), ipv6_active: config.network.ipv6_active(), + extended_active: config.statistics.extended, ipv4: statistics_ipv4, ipv6: statistics_ipv6, last_updated: OffsetDateTime::now_utc() @@ -230,6 +339,26 @@ fn print_to_stdout(config: &Config, statistics: &FormattedStatistics) { " number of peers: {} (updated every {} seconds)", statistics.num_peers, config.cleaning.torrent_cleaning_interval ); + + if config.statistics.extended { + println!( + " peers per torrent (updated every {} seconds):", + config.cleaning.torrent_cleaning_interval + ); + println!(" min {:>10}", statistics.peer_histogram.p0); + println!(" p10 {:>10}", statistics.peer_histogram.p10); + println!(" p20 {:>10}", statistics.peer_histogram.p20); + println!(" p30 {:>10}", statistics.peer_histogram.p30); + println!(" p40 {:>10}", statistics.peer_histogram.p40); + println!(" p50 {:>10}", statistics.peer_histogram.p50); + println!(" p60 {:>10}", statistics.peer_histogram.p60); + println!(" p70 {:>10}", statistics.peer_histogram.p70); + println!(" p80 {:>10}", statistics.peer_histogram.p80); + println!(" p90 {:>10}", statistics.peer_histogram.p90); + println!(" p95 {:>10}", statistics.peer_histogram.p95); + println!(" p99 {:>10}", statistics.peer_histogram.p99); + println!(" max {:>10}", statistics.peer_histogram.p100); + } } fn save_html_to_file( diff --git a/aquatic_udp/src/workers/swarm/mod.rs b/aquatic_udp/src/workers/swarm/mod.rs index f80f192..dbf2b03 100644 --- a/aquatic_udp/src/workers/swarm/mod.rs +++ b/aquatic_udp/src/workers/swarm/mod.rs @@ -7,6 +7,7 @@ use std::time::Instant; use aquatic_common::ServerStartInstant; use crossbeam_channel::Receiver; +use crossbeam_channel::Sender; use rand::{rngs::SmallRng, SeedableRng}; use aquatic_common::{CanonicalSocketAddr, PanicSentinel, ValidUntil}; @@ -25,6 +26,7 @@ pub fn run_swarm_worker( server_start_instant: ServerStartInstant, request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, response_sender: ConnectedResponseSender, + statistics_sender: Sender, worker_index: SwarmWorkerIndex, ) { let mut torrents = TorrentMaps::default(); @@ -86,15 +88,26 @@ pub fn run_swarm_worker( peer_valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); if now > last_cleaning + cleaning_interval { - let (ipv4, ipv6) = torrents.clean_and_get_num_peers( + let (ipv4, ipv6) = torrents.clean_and_get_statistics( &config, &state.access_list, server_start_instant, ); if config.statistics.active() { - state.statistics_ipv4.peers[worker_index.0].store(ipv4, Ordering::Release); - state.statistics_ipv6.peers[worker_index.0].store(ipv6, Ordering::Release); + state.statistics_ipv4.peers[worker_index.0].store(ipv4.0, Ordering::Release); + state.statistics_ipv6.peers[worker_index.0].store(ipv6.0, Ordering::Release); + + if let Some(message) = ipv4.1.map(StatisticsMessage::Ipv4PeerHistogram) { + if let Err(err) = statistics_sender.try_send(message) { + ::log::error!("couldn't send statistics message: {:#}", err) + } + } + if let Some(message) = ipv6.1.map(StatisticsMessage::Ipv6PeerHistogram) { + if let Err(err) = statistics_sender.try_send(message) { + ::log::error!("couldn't send statistics message: {:#}", err) + } + } } last_cleaning = now; diff --git a/aquatic_udp/src/workers/swarm/storage.rs b/aquatic_udp/src/workers/swarm/storage.rs index 35d6392..5870ebe 100644 --- a/aquatic_udp/src/workers/swarm/storage.rs +++ b/aquatic_udp/src/workers/swarm/storage.rs @@ -10,6 +10,7 @@ use aquatic_common::{ }; use aquatic_udp_protocol::*; +use hdrhistogram::Histogram; use rand::prelude::SmallRng; use crate::common::*; @@ -140,14 +141,28 @@ pub struct TorrentMap(pub AmortizedIndexMap>); impl TorrentMap { /// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers - fn clean_and_get_num_peers( + fn clean_and_get_statistics( &mut self, + config: &Config, access_list_cache: &mut AccessListCache, access_list_mode: AccessListMode, now: SecondsSinceServerStart, - ) -> usize { + ) -> (usize, Option>) { let mut num_peers = 0; + let mut opt_histogram: Option> = if config.statistics.extended { + match Histogram::new(3) { + Ok(histogram) => Some(histogram), + Err(err) => { + ::log::error!("Couldn't create peer histogram: {:#}", err); + + None + } + } + } else { + None + }; + self.0.retain(|info_hash, torrent| { if !access_list_cache .load() @@ -160,12 +175,27 @@ impl TorrentMap { num_peers += torrent.peers.len(); + match opt_histogram { + Some(ref mut histogram) if torrent.peers.len() != 0 => { + let n = torrent + .peers + .len() + .try_into() + .expect("Couldn't fit usize into u64"); + + if let Err(err) = histogram.record(n) { + ::log::error!("Couldn't record {} to histogram: {:#}", n, err); + } + } + _ => (), + } + !torrent.peers.is_empty() }); self.0.shrink_to_fit(); - num_peers + (num_peers, opt_histogram) } pub fn num_torrents(&self) -> usize { @@ -189,18 +219,25 @@ impl Default for TorrentMaps { impl TorrentMaps { /// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers - pub fn clean_and_get_num_peers( + pub fn clean_and_get_statistics( &mut self, config: &Config, access_list: &Arc, server_start_instant: ServerStartInstant, - ) -> (usize, usize) { + ) -> ( + (usize, Option>), + (usize, Option>), + ) { let mut cache = create_access_list_cache(access_list); let mode = config.access_list.mode; let now = server_start_instant.seconds_elapsed(); - let ipv4 = self.ipv4.clean_and_get_num_peers(&mut cache, mode, now); - let ipv6 = self.ipv6.clean_and_get_num_peers(&mut cache, mode, now); + let ipv4 = self + .ipv4 + .clean_and_get_statistics(config, &mut cache, mode, now); + let ipv6 = self + .ipv6 + .clean_and_get_statistics(config, &mut cache, mode, now); (ipv4, ipv6) } diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index 66f3d9e..896e7b4 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -50,6 +50,7 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { let (response_sender, response_receiver) = unbounded(); let response_sender = ConnectedResponseSender::new(vec![response_sender]); + let (statistics_sender, _) = unbounded(); let server_start_instant = ServerStartInstant::new(); @@ -65,6 +66,7 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { server_start_instant, request_receiver, response_sender, + statistics_sender, SwarmWorkerIndex(0), ) });