diff --git a/Cargo.lock b/Cargo.lock index a61454d..114c02b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -335,6 +335,7 @@ dependencies = [ "aquatic_common", "aquatic_toml_config", "aquatic_udp_protocol", + "crossbeam-channel", "hdrhistogram", "mimalloc", "quickcheck", diff --git a/crates/udp_load_test/Cargo.toml b/crates/udp_load_test/Cargo.toml index 076810d..8cce143 100644 --- a/crates/udp_load_test/Cargo.toml +++ b/crates/udp_load_test/Cargo.toml @@ -25,6 +25,7 @@ aquatic_toml_config.workspace = true aquatic_udp_protocol.workspace = true anyhow = "1" +crossbeam-channel = "0.5" hdrhistogram = "7" mimalloc = { version = "0.1", default-features = false } rand_distr = "0.4" diff --git a/crates/udp_load_test/src/common.rs b/crates/udp_load_test/src/common.rs index 7a17374..847a24c 100644 --- a/crates/udp_load_test/src/common.rs +++ b/crates/udp_load_test/src/common.rs @@ -1,5 +1,6 @@ use std::sync::{atomic::AtomicUsize, Arc}; +use aquatic_common::IndexMap; use aquatic_udp_protocol::*; #[derive(Clone)] @@ -19,8 +20,13 @@ pub struct SharedStatistics { } pub struct Peer { + pub announce_info_hash_index: usize, pub announce_info_hash: InfoHash, pub announce_port: Port, pub scrape_info_hash_indices: Box<[usize]>, pub socket_index: u8, } + +pub enum StatisticsMessage { + ResponsesPerInfoHash(IndexMap), +} diff --git a/crates/udp_load_test/src/config.rs b/crates/udp_load_test/src/config.rs index 4ea4144..5dbbd33 100644 --- a/crates/udp_load_test/src/config.rs +++ b/crates/udp_load_test/src/config.rs @@ -50,6 +50,12 @@ impl Default for Config { } } +impl aquatic_common::cli::Config for Config { + fn get_log_level(&self) -> Option { + Some(self.log_level) + } +} + #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)] #[serde(default, deny_unknown_fields)] pub struct NetworkConfig { diff --git a/crates/udp_load_test/src/lib.rs b/crates/udp_load_test/src/lib.rs index bc24709..cd0d2bf 100644 --- a/crates/udp_load_test/src/lib.rs +++ b/crates/udp_load_test/src/lib.rs @@ -9,6 +9,7 @@ use std::time::{Duration, Instant}; use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::IndexMap; use aquatic_udp_protocol::{InfoHash, Port}; +use crossbeam_channel::{unbounded, Receiver}; use hdrhistogram::Histogram; use rand::rngs::SmallRng; use rand::{Rng, SeedableRng}; @@ -22,11 +23,7 @@ use common::*; use config::Config; use worker::*; -impl aquatic_common::cli::Config for Config { - fn get_log_level(&self) -> Option { - Some(self.log_level) - } -} +const PERCENTILES: &[f64] = &[10.0, 25.0, 50.0, 75.0, 90.0, 95.0, 99.0, 99.9, 100.0]; pub fn run(config: Config) -> ::anyhow::Result<()> { if config.requests.weight_announce @@ -51,6 +48,8 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { statistics: Arc::new(SharedStatistics::default()), }; + let (statistics_sender, statistics_receiver) = unbounded(); + // Start workers for (i, peers) in (0..config.workers).zip(peers_by_worker) { @@ -65,6 +64,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let addr = SocketAddr::new(ip, 0); let config = config.clone(); let state = state.clone(); + let statistics_sender = statistics_sender.clone(); Builder::new().name("load-test".into()).spawn(move || { #[cfg(feature = "cpu-pinning")] @@ -75,7 +75,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::SocketWorker(i as usize), ); - Worker::run(config, state, peers, addr) + Worker::run(config, state, statistics_sender, peers, addr) })?; } @@ -87,12 +87,16 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::Util, ); - monitor_statistics(state, &config); + monitor_statistics(state, &config, statistics_receiver); Ok(()) } -fn monitor_statistics(state: LoadTestState, config: &Config) { +fn monitor_statistics( + state: LoadTestState, + config: &Config, + statistics_receiver: Receiver, +) { let mut report_avg_connect: Vec = Vec::new(); let mut report_avg_announce: Vec = Vec::new(); let mut report_avg_scrape: Vec = Vec::new(); @@ -108,6 +112,21 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { let time_elapsed = loop { thread::sleep(Duration::from_secs(INTERVAL)); + let mut opt_responses_per_info_hash: Option> = + config.peer_histogram.then_some(Default::default()); + + for message in statistics_receiver.try_iter() { + match message { + StatisticsMessage::ResponsesPerInfoHash(data) => { + if let Some(responses_per_info_hash) = opt_responses_per_info_hash.as_mut() { + for (k, v) in data { + *responses_per_info_hash.entry(k).or_default() += v; + } + } + } + } + } + let requests = fetch_and_reset(&state.statistics.requests); let response_peers = fetch_and_reset(&state.statistics.response_peers); let responses_connect = fetch_and_reset(&state.statistics.responses_connect); @@ -151,6 +170,20 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { peers_per_announce_response ); + if let Some(responses_per_info_hash) = opt_responses_per_info_hash.as_ref() { + let mut histogram = Histogram::::new(2).unwrap(); + + for num_responses in responses_per_info_hash.values().copied() { + histogram.record(num_responses).unwrap(); + } + + println!("Announce responses per info hash:"); + + for p in PERCENTILES { + println!(" - p{}: {}", p, histogram.value_at_percentile(*p)); + } + } + let time_elapsed = start_time.elapsed(); if config.duration != 0 && time_elapsed >= duration { @@ -225,6 +258,7 @@ fn create_peers(config: &Config, info_hash_dist: &InfoHashDist) -> Vec Vec, + announce_responses_per_info_hash: IndexMap, } impl Worker { - pub fn run(config: Config, shared_state: LoadTestState, peers: Box<[Peer]>, addr: SocketAddr) { + pub fn run( + config: Config, + shared_state: LoadTestState, + statistics_sender: Sender, + peers: Box<[Peer]>, + addr: SocketAddr, + ) { let mut sockets = Vec::new(); for _ in 0..config.network.sockets_per_worker { @@ -50,6 +61,8 @@ impl Worker { buffer, rng, statistics, + statistics_sender, + announce_responses_per_info_hash: Default::default(), }; instance.run_inner(); @@ -267,10 +280,30 @@ impl Worker { Response::AnnounceIpv4(r) => { self.statistics.responses_announce += 1; self.statistics.response_peers += r.peers.len(); + + let peer_index = + u32::from_ne_bytes(r.fixed.transaction_id.0.get().to_ne_bytes()) as usize; + + if let Some(peer) = self.peers.get(peer_index) { + *self + .announce_responses_per_info_hash + .entry(peer.announce_info_hash_index) + .or_default() += 1; + } } Response::AnnounceIpv6(r) => { self.statistics.responses_announce += 1; self.statistics.response_peers += r.peers.len(); + + let peer_index = + u32::from_ne_bytes(r.fixed.transaction_id.0.get().to_ne_bytes()) as usize; + + if let Some(peer) = self.peers.get(peer_index) { + *self + .announce_responses_per_info_hash + .entry(peer.announce_info_hash_index) + .or_default() += 1; + } } Response::Scrape(_) => { self.statistics.responses_scrape += 1; @@ -303,6 +336,14 @@ impl Worker { .response_peers .fetch_add(self.statistics.response_peers, Ordering::Relaxed); + if self.config.peer_histogram { + let message = StatisticsMessage::ResponsesPerInfoHash( + self.announce_responses_per_info_hash.split_off(0), + ); + + self.statistics_sender.try_send(message).unwrap(); + } + self.statistics = LocalStatistics::default(); } }