diff --git a/Cargo.lock b/Cargo.lock index acd4ec9..2fc0003 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,6 +76,7 @@ dependencies = [ "crossbeam-channel", "either", "hashbrown 0.8.2", + "histogram", "indexmap", "itoa", "log", @@ -211,6 +212,7 @@ dependencies = [ "crossbeam-channel", "either", "hashbrown 0.8.2", + "histogram", "indexmap", "log", "mimalloc", diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index 0a77fa9..dfc7cc2 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -21,6 +21,7 @@ aquatic_http_protocol = { path = "../aquatic_http_protocol" } crossbeam-channel = "0.4" either = "1" hashbrown = "0.8" +histogram = "0.6" indexmap = "1" itoa = "0.4" log = "0.4" diff --git a/aquatic_http/src/lib/config.rs b/aquatic_http/src/lib/config.rs index 8120f41..5285cb4 100644 --- a/aquatic_http/src/lib/config.rs +++ b/aquatic_http/src/lib/config.rs @@ -20,6 +20,7 @@ pub struct Config { pub protocol: ProtocolConfig, pub handlers: HandlerConfig, pub cleaning: CleaningConfig, + pub statistics: StatisticsConfig, pub privileges: PrivilegeConfig, } @@ -66,7 +67,6 @@ pub struct ProtocolConfig { } - #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct HandlerConfig { @@ -89,6 +89,14 @@ pub struct CleaningConfig { } +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct StatisticsConfig { + /// Print statistics this often (seconds). Don't print when set to zero. + pub interval: u64, +} + + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct PrivilegeConfig { @@ -111,6 +119,7 @@ impl Default for Config { protocol: ProtocolConfig::default(), handlers: HandlerConfig::default(), cleaning: CleaningConfig::default(), + statistics: StatisticsConfig::default(), privileges: PrivilegeConfig::default(), } } @@ -163,6 +172,15 @@ impl Default for CleaningConfig { } +impl Default for StatisticsConfig { + fn default() -> Self { + Self { + interval: 0, + } + } +} + + impl Default for PrivilegeConfig { fn default() -> Self { Self { diff --git a/aquatic_http/src/lib/lib.rs b/aquatic_http/src/lib/lib.rs index 203c7ed..f8a52df 100644 --- a/aquatic_http/src/lib/lib.rs +++ b/aquatic_http/src/lib/lib.rs @@ -125,6 +125,21 @@ pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> { })?; } + if config.statistics.interval != 0 { + let state = state.clone(); + let config = config.clone(); + + Builder::new().name("statistics".to_string()).spawn(move || + loop { + ::std::thread::sleep(Duration::from_secs( + config.statistics.interval + )); + + tasks::print_statistics(&state); + } + ).expect("spawn statistics thread"); + } + Ok(()) } diff --git a/aquatic_http/src/lib/tasks.rs b/aquatic_http/src/lib/tasks.rs index 6c04be6..cabc4a0 100644 --- a/aquatic_http/src/lib/tasks.rs +++ b/aquatic_http/src/lib/tasks.rs @@ -1,5 +1,7 @@ use std::time::Instant; +use histogram::Histogram; + use crate::common::*; @@ -26,4 +28,41 @@ fn clean_torrent_map( }); torrent_map.shrink_to_fit(); +} + + +pub fn print_statistics(state: &State){ + let mut peers_per_torrent = Histogram::new(); + + { + let torrents = &mut state.torrent_maps.lock(); + + for torrent in torrents.ipv4.values(){ + let num_peers = (torrent.num_seeders + torrent.num_leechers) as u64; + + if let Err(err) = peers_per_torrent.increment(num_peers){ + eprintln!("error incrementing peers_per_torrent histogram: {}", err) + } + } + for torrent in torrents.ipv6.values(){ + let num_peers = (torrent.num_seeders + torrent.num_leechers) as u64; + + if let Err(err) = peers_per_torrent.increment(num_peers){ + eprintln!("error incrementing peers_per_torrent histogram: {}", err) + } + } + } + + if peers_per_torrent.entries() != 0 { + println!( + "peers per torrent: min: {}, p50: {}, p75: {}, p90: {}, p99: {}, p999: {}, max: {}", + peers_per_torrent.minimum().unwrap(), + peers_per_torrent.percentile(50.0).unwrap(), + peers_per_torrent.percentile(75.0).unwrap(), + peers_per_torrent.percentile(90.0).unwrap(), + peers_per_torrent.percentile(99.0).unwrap(), + peers_per_torrent.percentile(99.9).unwrap(), + peers_per_torrent.maximum().unwrap(), + ); + } } \ No newline at end of file diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index 36594da..1114c57 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -21,6 +21,7 @@ aquatic_ws_protocol = { path = "../aquatic_ws_protocol" } crossbeam-channel = "0.4" either = "1" hashbrown = { version = "0.8", features = ["serde"] } +histogram = "0.6" indexmap = "1" log = "0.4" mimalloc = { version = "0.1", default-features = false } diff --git a/aquatic_ws/src/lib/config.rs b/aquatic_ws/src/lib/config.rs index a499652..b37bfe6 100644 --- a/aquatic_ws/src/lib/config.rs +++ b/aquatic_ws/src/lib/config.rs @@ -20,6 +20,7 @@ pub struct Config { pub protocol: ProtocolConfig, pub handlers: HandlerConfig, pub cleaning: CleaningConfig, + pub statistics: StatisticsConfig, pub privileges: PrivilegeConfig, } @@ -81,6 +82,14 @@ pub struct CleaningConfig { } +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct StatisticsConfig { + /// Print statistics this often (seconds). Don't print when set to zero. + pub interval: u64, +} + + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct PrivilegeConfig { @@ -103,6 +112,7 @@ impl Default for Config { protocol: ProtocolConfig::default(), handlers: HandlerConfig::default(), cleaning: CleaningConfig::default(), + statistics: StatisticsConfig::default(), privileges: PrivilegeConfig::default(), } } @@ -158,6 +168,15 @@ impl Default for CleaningConfig { } +impl Default for StatisticsConfig { + fn default() -> Self { + Self { + interval: 0, + } + } +} + + impl Default for PrivilegeConfig { fn default() -> Self { Self { diff --git a/aquatic_ws/src/lib/lib.rs b/aquatic_ws/src/lib/lib.rs index ee53da7..b5a29eb 100644 --- a/aquatic_ws/src/lib/lib.rs +++ b/aquatic_ws/src/lib/lib.rs @@ -127,6 +127,21 @@ pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> { })?; } + if config.statistics.interval != 0 { + let state = state.clone(); + let config = config.clone(); + + Builder::new().name("statistics".to_string()).spawn(move || + loop { + ::std::thread::sleep(Duration::from_secs( + config.statistics.interval + )); + + tasks::print_statistics(&state); + } + ).expect("spawn statistics thread"); + } + Ok(()) } diff --git a/aquatic_ws/src/lib/tasks.rs b/aquatic_ws/src/lib/tasks.rs index e91e5e4..159477f 100644 --- a/aquatic_ws/src/lib/tasks.rs +++ b/aquatic_ws/src/lib/tasks.rs @@ -1,5 +1,7 @@ use std::time::Instant; +use histogram::Histogram; + use crate::common::*; @@ -24,4 +26,41 @@ pub fn clean_torrents(state: &State){ clean_torrent_map(&mut torrent_maps.ipv4); clean_torrent_map(&mut torrent_maps.ipv6); +} + + +pub fn print_statistics(state: &State){ + let mut peers_per_torrent = Histogram::new(); + + { + let torrents = &mut state.torrent_maps.lock(); + + for torrent in torrents.ipv4.values(){ + let num_peers = (torrent.num_seeders + torrent.num_leechers) as u64; + + if let Err(err) = peers_per_torrent.increment(num_peers){ + eprintln!("error incrementing peers_per_torrent histogram: {}", err) + } + } + for torrent in torrents.ipv6.values(){ + let num_peers = (torrent.num_seeders + torrent.num_leechers) as u64; + + if let Err(err) = peers_per_torrent.increment(num_peers){ + eprintln!("error incrementing peers_per_torrent histogram: {}", err) + } + } + } + + if peers_per_torrent.entries() != 0 { + println!( + "peers per torrent: min: {}, p50: {}, p75: {}, p90: {}, p99: {}, p999: {}, max: {}", + peers_per_torrent.minimum().unwrap(), + peers_per_torrent.percentile(50.0).unwrap(), + peers_per_torrent.percentile(75.0).unwrap(), + peers_per_torrent.percentile(90.0).unwrap(), + peers_per_torrent.percentile(99.0).unwrap(), + peers_per_torrent.percentile(99.9).unwrap(), + peers_per_torrent.maximum().unwrap(), + ); + } } \ No newline at end of file