diff --git a/Cargo.lock b/Cargo.lock index 41caaab..bc446f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -78,9 +78,9 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2faccea4cc4ab4a667ce676a30e8ec13922a692c99bb8f5b11f1502c72e04220" +checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" [[package]] name = "anstyle-parse" @@ -145,7 +145,7 @@ dependencies = [ "regex", "serde", "tempfile", - "toml 0.8.9", + "toml 0.8.10", ] [[package]] @@ -335,9 +335,9 @@ dependencies = [ "aquatic_common", "aquatic_toml_config", "aquatic_udp_protocol", - "hashbrown 0.14.3", + "crossbeam-channel", + "hdrhistogram", "mimalloc", - "mio", "quickcheck", "quickcheck_macros", "rand", @@ -1349,9 +1349,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f" +checksum = "d0c62115964e08cb8039170eb33c1d0e2388a256930279edca206fff675f82c3" [[package]] name = "hex" @@ -1527,7 +1527,7 @@ version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455" dependencies = [ - "hermit-abi 0.3.4", + "hermit-abi 0.3.5", "rustix", "windows-sys 0.52.0", ] @@ -1774,7 +1774,7 @@ dependencies = [ "ordered-float", "quanta", "radix_trie", - "sketches-ddsketch 0.2.1", + "sketches-ddsketch 0.2.2", ] [[package]] @@ -1940,7 +1940,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi 0.3.4", + "hermit-abi 0.3.5", "libc", ] @@ -2269,15 +2269,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "redox_syscall" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" -dependencies = [ - "bitflags 1.3.2", -] - [[package]] name = "ref-cast" version = "1.0.22" @@ -2573,9 +2564,9 @@ checksum = "04d2ecae5fcf33b122e2e6bd520a57ccf152d2dde3b38c71039df1a6867264ee" [[package]] name = "sketches-ddsketch" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68a406c1882ed7f29cd5e248c9848a80e7cb6ae0fea82346d2746f2f941c07e1" +checksum = "85636c14b73d81f541e525f585c0a2109e6744e1565b5c1668e31c70c10ed65c" [[package]] name = "slab" @@ -2705,13 +2696,12 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.9.0" +version = "3.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01ce4141aa927a6d1bd34a041795abd0db1cccba5d5f24b009f694bdf3a1f3fa" +checksum = "a365e8cd18e44762ef95d87f284f4b5cd04107fec2ff3052bd6a3e6069669e67" dependencies = [ "cfg-if", "fastrand 2.0.1", - "redox_syscall", "rustix", "windows-sys 0.52.0", ] @@ -2834,9 +2824,9 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.9" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6a4b9e8023eb94392d3dca65d717c53abc5dad49c07cb65bb8fcd87115fa325" +checksum = "9a9aad4a3066010876e8dcf5a8a06e70a558751117a145c6ce2b82c2e2054290" dependencies = [ "serde", "serde_spanned", @@ -2855,9 +2845,9 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.21.1" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" +checksum = "0c9ffdf896f8daaabf9b66ba8e77ea1ed5ed0f72821b398aba62352e95062951" dependencies = [ "indexmap 2.2.2", "serde", diff --git a/crates/bencher/src/main.rs b/crates/bencher/src/main.rs index 70cbc44..4c2b305 100644 --- a/crates/bencher/src/main.rs +++ b/crates/bencher/src/main.rs @@ -25,7 +25,7 @@ struct Args { #[arg(long, default_value_t = Priority::Medium)] min_priority: Priority, /// How long to run each load test for - #[arg(long, default_value_t = 90)] + #[arg(long, default_value_t = 30)] duration: usize, /// Only include data for last N seconds of load test runs. /// @@ -33,7 +33,7 @@ struct Args { /// maximum throughput /// /// 0 = use data for whole run - #[arg(long, default_value_t = 30)] + #[arg(long, default_value_t = 0)] summarize_last: usize, #[command(subcommand)] command: Command, diff --git a/crates/bencher/src/protocols/udp.rs b/crates/bencher/src/protocols/udp.rs index f6cc780..9926abf 100644 --- a/crates/bencher/src/protocols/udp.rs +++ b/crates/bencher/src/protocols/udp.rs @@ -443,6 +443,8 @@ impl ProcessRunner for AquaticUdpLoadTestRunner { c.duration = self.parameters.duration; c.summarize_last = self.parameters.summarize_last; + c.extra_statistics = false; + c.requests.announce_peers_wanted = 30; c.requests.weight_connect = 0; c.requests.weight_announce = 100; diff --git a/crates/http/src/workers/swarm/storage.rs b/crates/http/src/workers/swarm/storage.rs index a6419fc..bc1246b 100644 --- a/crates/http/src/workers/swarm/storage.rs +++ b/crates/http/src/workers/swarm/storage.rs @@ -433,7 +433,7 @@ impl LargePeerMap { } fn remove_peer(&mut self, key: &ResponsePeer) -> Option { - let opt_removed_peer = self.peers.remove(key); + let opt_removed_peer = self.peers.swap_remove(key); if let Some(Peer { is_seeder: true, .. diff --git a/crates/udp/src/workers/statistics/mod.rs b/crates/udp/src/workers/statistics/mod.rs index 25fc157..beafa2d 100644 --- a/crates/udp/src/workers/statistics/mod.rs +++ b/crates/udp/src/workers/statistics/mod.rs @@ -97,7 +97,7 @@ pub fn run_statistics_worker( *count -= 1; if *count == 0 { - peers.remove(&peer_id); + peers.swap_remove(&peer_id); } } } diff --git a/crates/udp/src/workers/swarm/storage.rs b/crates/udp/src/workers/swarm/storage.rs index 4289370..3b042ea 100644 --- a/crates/udp/src/workers/swarm/storage.rs +++ b/crates/udp/src/workers/swarm/storage.rs @@ -407,7 +407,7 @@ impl LargePeerMap { } fn remove_peer(&mut self, key: &ResponsePeer) -> Option { - let opt_removed_peer = self.peers.remove(key); + let opt_removed_peer = self.peers.swap_remove(key); if let Some(Peer { is_seeder: true, .. diff --git a/crates/udp_load_test/Cargo.toml b/crates/udp_load_test/Cargo.toml index 2eedf92..8cce143 100644 --- a/crates/udp_load_test/Cargo.toml +++ b/crates/udp_load_test/Cargo.toml @@ -25,9 +25,9 @@ aquatic_toml_config.workspace = true aquatic_udp_protocol.workspace = true anyhow = "1" -hashbrown = "0.14" +crossbeam-channel = "0.5" +hdrhistogram = "7" mimalloc = { version = "0.1", default-features = false } -mio = { version = "0.8", features = ["net", "os-poll"] } rand_distr = "0.4" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } diff --git a/crates/udp_load_test/src/common.rs b/crates/udp_load_test/src/common.rs index 7cbffbd..847a24c 100644 --- a/crates/udp_load_test/src/common.rs +++ b/crates/udp_load_test/src/common.rs @@ -1,22 +1,16 @@ use std::sync::{atomic::AtomicUsize, Arc}; -use hashbrown::HashMap; - +use aquatic_common::IndexMap; use aquatic_udp_protocol::*; -#[derive(PartialEq, Eq, Clone)] -pub struct TorrentPeer { - pub info_hash: InfoHash, - pub scrape_hash_indices: Box<[usize]>, - pub connection_id: ConnectionId, - pub peer_id: PeerId, - pub port: Port, +#[derive(Clone)] +pub struct LoadTestState { + pub info_hashes: Arc<[InfoHash]>, + pub statistics: Arc, } -pub type TorrentPeerMap = HashMap; - #[derive(Default)] -pub struct Statistics { +pub struct SharedStatistics { pub requests: AtomicUsize, pub response_peers: AtomicUsize, pub responses_connect: AtomicUsize, @@ -25,25 +19,14 @@ pub struct Statistics { pub responses_error: AtomicUsize, } -#[derive(Clone)] -pub struct LoadTestState { - pub info_hashes: Arc<[InfoHash]>, - pub statistics: Arc, +pub struct Peer { + pub announce_info_hash_index: usize, + pub announce_info_hash: InfoHash, + pub announce_port: Port, + pub scrape_info_hash_indices: Box<[usize]>, + pub socket_index: u8, } -#[derive(PartialEq, Eq, Clone, Copy)] -pub enum RequestType { - Announce, - Connect, - Scrape, -} - -#[derive(Default)] -pub struct SocketWorkerLocalStatistics { - pub requests: usize, - pub response_peers: usize, - pub responses_connect: usize, - pub responses_announce: usize, - pub responses_scrape: usize, - pub responses_error: usize, +pub enum StatisticsMessage { + ResponsesPerInfoHash(IndexMap), } diff --git a/crates/udp_load_test/src/config.rs b/crates/udp_load_test/src/config.rs index 5571b11..27e7eb2 100644 --- a/crates/udp_load_test/src/config.rs +++ b/crates/udp_load_test/src/config.rs @@ -25,6 +25,8 @@ pub struct Config { /// /// 0 = include whole run pub summarize_last: usize, + /// Display extra statistics + pub extra_statistics: bool, pub network: NetworkConfig, pub requests: RequestConfig, #[cfg(feature = "cpu-pinning")] @@ -39,6 +41,7 @@ impl Default for Config { workers: 1, duration: 0, summarize_last: 0, + extra_statistics: true, network: NetworkConfig::default(), requests: RequestConfig::default(), #[cfg(feature = "cpu-pinning")] @@ -47,6 +50,12 @@ impl Default for Config { } } +impl aquatic_common::cli::Config for Config { + fn get_log_level(&self) -> Option { + Some(self.log_level) + } +} + #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)] #[serde(default, deny_unknown_fields)] pub struct NetworkConfig { @@ -57,10 +66,8 @@ pub struct NetworkConfig { /// /// Setting this to true can cause issues on macOS. pub multiple_client_ipv4s: bool, - /// Number of first client port - pub first_port: u16, - /// Socket worker poll timeout in microseconds - pub poll_timeout: u64, + /// Number of sockets to open per worker + pub sockets_per_worker: u8, /// Size of socket recv buffer. Use 0 for OS default. /// /// This setting can have a big impact on dropped packages. It might @@ -80,8 +87,7 @@ impl Default for NetworkConfig { fn default() -> Self { Self { multiple_client_ipv4s: true, - first_port: 45_000, - poll_timeout: 1, + sockets_per_worker: 4, recv_buffer: 8_000_000, } } @@ -92,6 +98,8 @@ impl Default for NetworkConfig { pub struct RequestConfig { /// Number of torrents to simulate pub number_of_torrents: usize, + /// Number of peers to simulate + pub number_of_peers: usize, /// Maximum number of torrents to ask about in scrape requests pub scrape_max_torrents: usize, /// Ask for this number of peers in announce requests @@ -105,30 +113,21 @@ pub struct RequestConfig { /// Probability that a generated request is a scrape request, as part /// of sum of the various weight arguments. pub weight_scrape: usize, - /// Peers choose torrents according to this Gamma distribution shape - pub torrent_gamma_shape: f64, - /// Peers choose torrents according to this Gamma distribution scale - pub torrent_gamma_scale: f64, /// Probability that a generated peer is a seeder pub peer_seeder_probability: f64, - /// Probability that an additional connect request will be sent for each - /// mio event - pub additional_request_probability: f32, } impl Default for RequestConfig { fn default() -> Self { Self { - number_of_torrents: 10_000, + number_of_torrents: 1_000_000, + number_of_peers: 2_000_000, scrape_max_torrents: 10, announce_peers_wanted: 30, - weight_connect: 0, + weight_connect: 1, weight_announce: 100, weight_scrape: 1, - torrent_gamma_shape: 0.2, - torrent_gamma_scale: 100.0, peer_seeder_probability: 0.75, - additional_request_probability: 0.5, } } } diff --git a/crates/udp_load_test/src/lib.rs b/crates/udp_load_test/src/lib.rs index 42a4ada..fced2ca 100644 --- a/crates/udp_load_test/src/lib.rs +++ b/crates/udp_load_test/src/lib.rs @@ -1,3 +1,4 @@ +use std::iter::repeat_with; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::atomic::AtomicUsize; use std::sync::{atomic::Ordering, Arc}; @@ -6,23 +7,23 @@ use std::time::{Duration, Instant}; #[cfg(feature = "cpu-pinning")] use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; -use rand_distr::Gamma; +use aquatic_common::IndexMap; +use aquatic_udp_protocol::{InfoHash, Port}; +use crossbeam_channel::{unbounded, Receiver}; +use hdrhistogram::Histogram; +use rand::rngs::SmallRng; +use rand::{Rng, SeedableRng}; +use rand_distr::{Distribution, WeightedAliasIndex}; -pub mod common; +mod common; pub mod config; -pub mod utils; -pub mod worker; +mod worker; use common::*; use config::Config; -use utils::*; use worker::*; -impl aquatic_common::cli::Config for Config { - fn get_log_level(&self) -> Option { - Some(self.log_level) - } -} +const PERCENTILES: &[f64] = &[10.0, 25.0, 50.0, 75.0, 90.0, 95.0, 99.0, 99.9, 100.0]; pub fn run(config: Config) -> ::anyhow::Result<()> { if config.requests.weight_announce @@ -37,30 +38,21 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { panic!("Error: report_last_seconds can't be larger than duration"); } - println!("Starting client with config: {:#?}", config); + println!("Starting client with config: {:#?}\n", config); - let mut info_hashes = Vec::with_capacity(config.requests.number_of_torrents); - - for _ in 0..config.requests.number_of_torrents { - info_hashes.push(generate_info_hash()); - } + let info_hash_dist = InfoHashDist::new(&config)?; + let peers_by_worker = create_peers(&config, &info_hash_dist); let state = LoadTestState { - info_hashes: Arc::from(info_hashes.into_boxed_slice()), - statistics: Arc::new(Statistics::default()), + info_hashes: info_hash_dist.into_arc_info_hashes(), + statistics: Arc::new(SharedStatistics::default()), }; - let gamma = Gamma::new( - config.requests.torrent_gamma_shape, - config.requests.torrent_gamma_scale, - ) - .unwrap(); + let (statistics_sender, statistics_receiver) = unbounded(); // Start workers - for i in 0..config.workers { - let port = config.network.first_port + (i as u16); - + for (i, peers) in (0..config.workers).zip(peers_by_worker) { let ip = if config.server_address.is_ipv6() { Ipv6Addr::LOCALHOST.into() } else if config.network.multiple_client_ipv4s { @@ -69,9 +61,10 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { Ipv4Addr::LOCALHOST.into() }; - let addr = SocketAddr::new(ip, port); + let addr = SocketAddr::new(ip, 0); let config = config.clone(); let state = state.clone(); + let statistics_sender = statistics_sender.clone(); Builder::new().name("load-test".into()).spawn(move || { #[cfg(feature = "cpu-pinning")] @@ -82,7 +75,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::SocketWorker(i as usize), ); - Worker::run(state, gamma, config, addr) + Worker::run(config, state, statistics_sender, peers, addr) })?; } @@ -94,12 +87,16 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::Util, ); - monitor_statistics(state, &config); + monitor_statistics(state, &config, statistics_receiver); Ok(()) } -fn monitor_statistics(state: LoadTestState, config: &Config) { +fn monitor_statistics( + state: LoadTestState, + config: &Config, + statistics_receiver: Receiver, +) { let mut report_avg_connect: Vec = Vec::new(); let mut report_avg_announce: Vec = Vec::new(); let mut report_avg_scrape: Vec = Vec::new(); @@ -115,6 +112,21 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { let time_elapsed = loop { thread::sleep(Duration::from_secs(INTERVAL)); + let mut opt_responses_per_info_hash: Option> = + config.extra_statistics.then_some(Default::default()); + + for message in statistics_receiver.try_iter() { + match message { + StatisticsMessage::ResponsesPerInfoHash(data) => { + if let Some(responses_per_info_hash) = opt_responses_per_info_hash.as_mut() { + for (k, v) in data { + *responses_per_info_hash.entry(k).or_default() += v; + } + } + } + } + } + let requests = fetch_and_reset(&state.statistics.requests); let response_peers = fetch_and_reset(&state.statistics.response_peers); let responses_connect = fetch_and_reset(&state.statistics.responses_connect); @@ -158,6 +170,20 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { peers_per_announce_response ); + if let Some(responses_per_info_hash) = opt_responses_per_info_hash.as_ref() { + let mut histogram = Histogram::::new(2).unwrap(); + + for num_responses in responses_per_info_hash.values().copied() { + histogram.record(num_responses).unwrap(); + } + + println!("Announce responses per info hash:"); + + for p in PERCENTILES { + println!(" - p{}: {}", p, histogram.value_at_percentile(*p)); + } + } + let time_elapsed = start_time.elapsed(); if config.duration != 0 && time_elapsed >= duration { @@ -208,3 +234,121 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { fn fetch_and_reset(atomic_usize: &AtomicUsize) -> f64 { atomic_usize.fetch_and(0, Ordering::Relaxed) as f64 } + +fn create_peers(config: &Config, info_hash_dist: &InfoHashDist) -> Vec> { + let mut rng = SmallRng::seed_from_u64(0xc3a58be617b3acce); + + let mut opt_peers_per_info_hash: Option> = + config.extra_statistics.then_some(IndexMap::default()); + + let mut all_peers = repeat_with(|| { + let num_scrape_indices = rng.gen_range(1..config.requests.scrape_max_torrents + 1); + + let scrape_info_hash_indices = repeat_with(|| info_hash_dist.get_random_index(&mut rng)) + .take(num_scrape_indices) + .collect::>() + .into_boxed_slice(); + + let (announce_info_hash_index, announce_info_hash) = info_hash_dist.get_random(&mut rng); + + if let Some(peers_per_info_hash) = opt_peers_per_info_hash.as_mut() { + *peers_per_info_hash + .entry(announce_info_hash_index) + .or_default() += 1; + } + + Peer { + announce_info_hash_index, + announce_info_hash, + announce_port: Port::new(rng.gen()), + scrape_info_hash_indices, + socket_index: rng.gen_range(0..config.network.sockets_per_worker), + } + }) + .take(config.requests.number_of_peers) + .collect::>(); + + if let Some(peers_per_info_hash) = opt_peers_per_info_hash { + println!("Number of info hashes: {}", peers_per_info_hash.len()); + + let mut histogram = Histogram::::new(2).unwrap(); + + for num_peers in peers_per_info_hash.values() { + histogram.record(*num_peers).unwrap(); + } + + println!("Peers per info hash:"); + + for p in PERCENTILES { + println!(" - p{}: {}", p, histogram.value_at_percentile(*p)); + } + } + + let mut peers_by_worker = Vec::new(); + + let num_peers_per_worker = all_peers.len() / config.workers as usize; + + for _ in 0..(config.workers as usize) { + peers_by_worker.push( + all_peers + .split_off(all_peers.len() - num_peers_per_worker) + .into_boxed_slice(), + ); + + all_peers.shrink_to_fit(); + } + + peers_by_worker +} + +struct InfoHashDist { + info_hashes: Box<[InfoHash]>, + dist: WeightedAliasIndex, +} + +impl InfoHashDist { + fn new(config: &Config) -> anyhow::Result { + let mut rng = SmallRng::seed_from_u64(0xc3aa8be617b3acce); + + let info_hashes = repeat_with(|| { + let mut bytes = [0u8; 20]; + + for byte in bytes.iter_mut() { + *byte = rng.gen(); + } + + InfoHash(bytes) + }) + .take(config.requests.number_of_torrents) + .collect::>() + .into_boxed_slice(); + + let num_torrents = config.requests.number_of_torrents as u32; + + let weights = (0..num_torrents) + .map(|i| { + let floor = num_torrents as f64 / config.requests.number_of_peers as f64; + + floor + (6.5f64 - ((500.0 * f64::from(i)) / f64::from(num_torrents))).exp() + }) + .collect(); + + let dist = WeightedAliasIndex::new(weights)?; + + Ok(Self { info_hashes, dist }) + } + + fn get_random(&self, rng: &mut impl Rng) -> (usize, InfoHash) { + let index = self.dist.sample(rng); + + (index, self.info_hashes[index]) + } + + fn get_random_index(&self, rng: &mut impl Rng) -> usize { + self.dist.sample(rng) + } + + fn into_arc_info_hashes(self) -> Arc<[InfoHash]> { + Arc::from(self.info_hashes) + } +} diff --git a/crates/udp_load_test/src/utils.rs b/crates/udp_load_test/src/utils.rs deleted file mode 100644 index 6b7f748..0000000 --- a/crates/udp_load_test/src/utils.rs +++ /dev/null @@ -1,36 +0,0 @@ -use rand::prelude::*; -use rand_distr::Gamma; - -use aquatic_udp_protocol::*; - -pub fn gamma_usize(rng: &mut impl Rng, gamma: Gamma, max: usize) -> usize { - let p: f64 = rng.sample(gamma); - let p = (p.min(101.0f64) - 1.0) / 100.0; - - (p * max as f64) as usize -} - -pub fn generate_peer_id() -> PeerId { - PeerId(random_20_bytes()) -} - -pub fn generate_info_hash() -> InfoHash { - InfoHash(random_20_bytes()) -} - -pub fn generate_transaction_id(rng: &mut impl Rng) -> TransactionId { - TransactionId::new(rng.gen()) -} - -pub fn create_connect_request(transaction_id: TransactionId) -> Request { - (ConnectRequest { transaction_id }).into() -} - -// Don't use SmallRng here for now -fn random_20_bytes() -> [u8; 20] { - let mut bytes = [0; 20]; - - thread_rng().fill_bytes(&mut bytes[..]); - - bytes -} diff --git a/crates/udp_load_test/src/worker.rs b/crates/udp_load_test/src/worker.rs new file mode 100644 index 0000000..570b1bb --- /dev/null +++ b/crates/udp_load_test/src/worker.rs @@ -0,0 +1,439 @@ +use std::io::{Cursor, ErrorKind}; +use std::net::{SocketAddr, UdpSocket}; +use std::sync::atomic::Ordering; +use std::time::Duration; + +use aquatic_common::IndexMap; +use crossbeam_channel::Sender; +use rand::Rng; +use rand::{prelude::SmallRng, SeedableRng}; +use rand_distr::{Distribution, WeightedIndex}; +use socket2::{Domain, Protocol, Socket, Type}; + +use aquatic_udp_protocol::*; + +use crate::common::{LoadTestState, Peer}; +use crate::config::Config; +use crate::StatisticsMessage; + +const MAX_PACKET_SIZE: usize = 8192; + +pub struct Worker { + config: Config, + shared_state: LoadTestState, + peers: Box<[Peer]>, + request_type_dist: RequestTypeDist, + addr: SocketAddr, + sockets: Vec, + buffer: [u8; MAX_PACKET_SIZE], + rng: SmallRng, + statistics: LocalStatistics, + statistics_sender: Sender, + announce_responses_per_info_hash: IndexMap, +} + +impl Worker { + pub fn run( + config: Config, + shared_state: LoadTestState, + statistics_sender: Sender, + peers: Box<[Peer]>, + addr: SocketAddr, + ) { + let mut sockets = Vec::new(); + + for _ in 0..config.network.sockets_per_worker { + sockets.push(create_socket(&config, addr)); + } + + let buffer = [0u8; MAX_PACKET_SIZE]; + let rng = SmallRng::seed_from_u64(0xc3aa8be617b3acce); + let statistics = LocalStatistics::default(); + let request_type_dist = RequestTypeDist::new(&config).unwrap(); + + let mut instance = Self { + config, + shared_state, + peers, + request_type_dist, + addr, + sockets, + buffer, + rng, + statistics, + statistics_sender, + announce_responses_per_info_hash: Default::default(), + }; + + instance.run_inner(); + } + + fn run_inner(&mut self) { + let mut connection_ids = Vec::new(); + + for _ in 0..self.config.network.sockets_per_worker { + connection_ids.push(self.acquire_connection_id()); + } + + let mut requests_sent = 0usize; + let mut responses_received = 0usize; + + let mut connect_socket_index = 0u8; + let mut peer_index = 0usize; + let mut loop_index = 0usize; + + loop { + let response_ratio = responses_received as f64 / requests_sent.max(1) as f64; + + if response_ratio >= 0.90 || requests_sent == 0 || self.rng.gen::() == 0 { + for _ in 0..self.sockets.len() { + match self.request_type_dist.sample(&mut self.rng) { + RequestType::Connect => { + self.send_connect_request( + connect_socket_index, + connect_socket_index.into(), + ); + + connect_socket_index = connect_socket_index.wrapping_add(1) + % self.config.network.sockets_per_worker; + } + RequestType::Announce => { + self.send_announce_request(&connection_ids, peer_index); + + peer_index = (peer_index + 1) % self.peers.len(); + } + RequestType::Scrape => { + self.send_scrape_request(&connection_ids, peer_index); + + peer_index = (peer_index + 1) % self.peers.len(); + } + } + + requests_sent += 1; + } + } + + for socket_index in 0..self.sockets.len() { + // Do this instead of iterating over Vec to fix borrow checker complaint + let socket = self.sockets.get(socket_index).unwrap(); + + match socket.recv(&mut self.buffer[..]) { + Ok(amt) => { + match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { + Ok(Response::Connect(r)) => { + // If we're sending connect requests, we might + // as well keep connection IDs valid + let connection_id_index = + u32::from_ne_bytes(r.transaction_id.0.get().to_ne_bytes()) + as usize; + connection_ids[connection_id_index] = r.connection_id; + + self.handle_response(Response::Connect(r)); + } + Ok(response) => { + self.handle_response(response); + } + Err(err) => { + eprintln!("Received invalid response: {:#?}", err); + } + } + + responses_received += 1; + } + Err(err) if err.kind() == ErrorKind::WouldBlock => (), + Err(err) => { + eprintln!("recv error: {:#}", err); + } + } + } + + if loop_index % 1024 == 0 { + self.update_shared_statistics(); + } + + loop_index = loop_index.wrapping_add(1); + } + } + + fn acquire_connection_id(&mut self) -> ConnectionId { + loop { + self.send_connect_request(0, u32::MAX); + + for _ in 0..100 { + match self.sockets[0].recv(&mut self.buffer[..]) { + Ok(amt) => { + match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { + Ok(Response::Connect(r)) => { + return r.connection_id; + } + Ok(r) => { + eprintln!("Received non-connect response: {:?}", r); + } + Err(err) => { + eprintln!("Received invalid response: {:#?}", err); + } + } + } + Err(err) if err.kind() == ErrorKind::WouldBlock => { + ::std::thread::sleep(Duration::from_millis(10)); + } + Err(err) => { + eprintln!("recv error: {:#}", err); + } + }; + } + } + } + + fn send_connect_request(&mut self, socket_index: u8, transaction_id: u32) { + let transaction_id = TransactionId::new(i32::from_ne_bytes(transaction_id.to_ne_bytes())); + + let request = ConnectRequest { transaction_id }; + + let mut cursor = Cursor::new(self.buffer); + + request.write_bytes(&mut cursor).unwrap(); + + let position = cursor.position() as usize; + + match self.sockets[socket_index as usize].send(&cursor.get_ref()[..position]) { + Ok(_) => { + self.statistics.requests += 1; + } + Err(err) => { + eprintln!("Couldn't send packet: {:?}", err); + } + } + } + + fn send_announce_request(&mut self, connection_ids: &[ConnectionId], peer_index: usize) { + let peer = self.peers.get(peer_index).unwrap(); + + let (event, bytes_left) = { + if self + .rng + .gen_bool(self.config.requests.peer_seeder_probability) + { + (AnnounceEvent::Completed, NumberOfBytes::new(0)) + } else { + (AnnounceEvent::Started, NumberOfBytes::new(50)) + } + }; + + let transaction_id = + TransactionId::new(i32::from_ne_bytes((peer_index as u32).to_ne_bytes())); + + let request = AnnounceRequest { + connection_id: connection_ids[peer.socket_index as usize], + action_placeholder: Default::default(), + transaction_id, + info_hash: peer.announce_info_hash, + peer_id: PeerId([0; 20]), + bytes_downloaded: NumberOfBytes::new(50), + bytes_uploaded: NumberOfBytes::new(50), + bytes_left, + event: event.into(), + ip_address: Ipv4AddrBytes([0; 4]), + key: PeerKey::new(0), + peers_wanted: NumberOfPeers::new(self.config.requests.announce_peers_wanted), + port: peer.announce_port, + }; + + let mut cursor = Cursor::new(self.buffer); + + request.write_bytes(&mut cursor).unwrap(); + + let position = cursor.position() as usize; + + match self.sockets[peer.socket_index as usize].send(&cursor.get_ref()[..position]) { + Ok(_) => { + self.statistics.requests += 1; + } + Err(err) => { + eprintln!("Couldn't send packet: {:?}", err); + } + } + } + + fn send_scrape_request(&mut self, connection_ids: &[ConnectionId], peer_index: usize) { + let peer = self.peers.get(peer_index).unwrap(); + + let transaction_id = + TransactionId::new(i32::from_ne_bytes((peer_index as u32).to_ne_bytes())); + + let mut info_hashes = Vec::with_capacity(peer.scrape_info_hash_indices.len()); + + for i in peer.scrape_info_hash_indices.iter() { + info_hashes.push(self.shared_state.info_hashes[*i].to_owned()) + } + + let request = ScrapeRequest { + connection_id: connection_ids[peer.socket_index as usize], + transaction_id, + info_hashes, + }; + + let mut cursor = Cursor::new(self.buffer); + + request.write_bytes(&mut cursor).unwrap(); + + let position = cursor.position() as usize; + + match self.sockets[peer.socket_index as usize].send(&cursor.get_ref()[..position]) { + Ok(_) => { + self.statistics.requests += 1; + } + Err(err) => { + eprintln!("Couldn't send packet: {:?}", err); + } + } + } + + fn handle_response(&mut self, response: Response) { + match response { + Response::Connect(_) => { + self.statistics.responses_connect += 1; + } + Response::AnnounceIpv4(r) => { + self.statistics.responses_announce += 1; + self.statistics.response_peers += r.peers.len(); + + let peer_index = + u32::from_ne_bytes(r.fixed.transaction_id.0.get().to_ne_bytes()) as usize; + + if let Some(peer) = self.peers.get(peer_index) { + *self + .announce_responses_per_info_hash + .entry(peer.announce_info_hash_index) + .or_default() += 1; + } + } + Response::AnnounceIpv6(r) => { + self.statistics.responses_announce += 1; + self.statistics.response_peers += r.peers.len(); + + let peer_index = + u32::from_ne_bytes(r.fixed.transaction_id.0.get().to_ne_bytes()) as usize; + + if let Some(peer) = self.peers.get(peer_index) { + *self + .announce_responses_per_info_hash + .entry(peer.announce_info_hash_index) + .or_default() += 1; + } + } + Response::Scrape(_) => { + self.statistics.responses_scrape += 1; + } + Response::Error(_) => { + self.statistics.responses_error += 1; + } + } + } + + fn update_shared_statistics(&mut self) { + let shared_statistics = &self.shared_state.statistics; + + shared_statistics + .requests + .fetch_add(self.statistics.requests, Ordering::Relaxed); + shared_statistics + .responses_connect + .fetch_add(self.statistics.responses_connect, Ordering::Relaxed); + shared_statistics + .responses_announce + .fetch_add(self.statistics.responses_announce, Ordering::Relaxed); + shared_statistics + .responses_scrape + .fetch_add(self.statistics.responses_scrape, Ordering::Relaxed); + shared_statistics + .responses_error + .fetch_add(self.statistics.responses_error, Ordering::Relaxed); + shared_statistics + .response_peers + .fetch_add(self.statistics.response_peers, Ordering::Relaxed); + + if self.config.extra_statistics { + let message = StatisticsMessage::ResponsesPerInfoHash( + self.announce_responses_per_info_hash.split_off(0), + ); + + self.statistics_sender.try_send(message).unwrap(); + } + + self.statistics = LocalStatistics::default(); + } +} + +fn create_socket(config: &Config, addr: SocketAddr) -> ::std::net::UdpSocket { + let socket = if addr.is_ipv4() { + Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) + } else { + Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) + } + .expect("create socket"); + + socket + .set_nonblocking(true) + .expect("socket: set nonblocking"); + + if config.network.recv_buffer != 0 { + if let Err(err) = socket.set_recv_buffer_size(config.network.recv_buffer) { + eprintln!( + "socket: failed setting recv buffer to {}: {:?}", + config.network.recv_buffer, err + ); + } + } + + socket + .bind(&addr.into()) + .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", addr, err)); + + socket + .connect(&config.server_address.into()) + .expect("socket: connect to server"); + + socket.into() +} + +#[derive(PartialEq, Eq, Clone, Copy)] +pub enum RequestType { + Announce, + Connect, + Scrape, +} + +pub struct RequestTypeDist(WeightedIndex); + +impl RequestTypeDist { + fn new(config: &Config) -> anyhow::Result { + let weights = [ + config.requests.weight_announce, + config.requests.weight_connect, + config.requests.weight_scrape, + ]; + + Ok(Self(WeightedIndex::new(weights)?)) + } + + fn sample(&self, rng: &mut impl Rng) -> RequestType { + const ITEMS: [RequestType; 3] = [ + RequestType::Announce, + RequestType::Connect, + RequestType::Scrape, + ]; + + ITEMS[self.0.sample(rng)] + } +} + +#[derive(Default)] +pub struct LocalStatistics { + pub requests: usize, + pub response_peers: usize, + pub responses_connect: usize, + pub responses_announce: usize, + pub responses_scrape: usize, + pub responses_error: usize, +} diff --git a/crates/udp_load_test/src/worker/mod.rs b/crates/udp_load_test/src/worker/mod.rs deleted file mode 100644 index 8c7c606..0000000 --- a/crates/udp_load_test/src/worker/mod.rs +++ /dev/null @@ -1,371 +0,0 @@ -use std::io::Cursor; -use std::net::SocketAddr; -use std::sync::atomic::Ordering; -use std::time::Duration; - -use mio::{net::UdpSocket, Events, Interest, Poll, Token}; -use rand::Rng; -use rand::{prelude::SmallRng, SeedableRng}; -use rand_distr::{Distribution, Gamma, WeightedIndex}; -use socket2::{Domain, Protocol, Socket, Type}; - -use aquatic_udp_protocol::*; - -use crate::config::Config; -use crate::{common::*, utils::*}; - -const MAX_PACKET_SIZE: usize = 8192; - -pub struct Worker { - config: Config, - shared_state: LoadTestState, - gamma: Gamma, - addr: SocketAddr, - socket: UdpSocket, - buffer: [u8; MAX_PACKET_SIZE], - rng: SmallRng, - torrent_peers: TorrentPeerMap, - statistics: SocketWorkerLocalStatistics, -} - -impl Worker { - pub fn run(shared_state: LoadTestState, gamma: Gamma, config: Config, addr: SocketAddr) { - let socket = UdpSocket::from_std(create_socket(&config, addr)); - let buffer = [0u8; MAX_PACKET_SIZE]; - let rng = SmallRng::seed_from_u64(0xc3aa8be617b3acce); - let torrent_peers = TorrentPeerMap::default(); - let statistics = SocketWorkerLocalStatistics::default(); - - let mut instance = Self { - config, - shared_state, - gamma, - addr, - socket, - buffer, - rng, - torrent_peers, - statistics, - }; - - instance.run_inner(); - } - - fn run_inner(&mut self) { - let mut poll = Poll::new().expect("create poll"); - let mut events = Events::with_capacity(1); - - poll.registry() - .register(&mut self.socket, Token(0), Interest::READABLE) - .unwrap(); - - // Bootstrap request cycle - let initial_request = create_connect_request(generate_transaction_id(&mut self.rng)); - self.send_request(initial_request); - - let timeout = Duration::from_micros(self.config.network.poll_timeout); - - loop { - poll.poll(&mut events, Some(timeout)) - .expect("failed polling"); - - for _ in events.iter() { - while let Ok(amt) = self.socket.recv(&mut self.buffer) { - match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { - Ok(response) => { - if let Some(request) = self.process_response(response) { - self.send_request(request); - } - } - Err(err) => { - eprintln!("Received invalid response: {:#?}", err); - } - } - } - - if self.rng.gen::() <= self.config.requests.additional_request_probability { - let additional_request = - create_connect_request(generate_transaction_id(&mut self.rng)); - - self.send_request(additional_request); - } - - self.update_shared_statistics(); - } - } - } - - fn process_response(&mut self, response: Response) -> Option { - match response { - Response::Connect(r) => { - self.statistics.responses_connect += 1; - - // Fetch the torrent peer or create it if is doesn't exists. Update - // the connection id if fetched. Create a request and move the - // torrent peer appropriately. - - let mut torrent_peer = self - .torrent_peers - .remove(&r.transaction_id) - .unwrap_or_else(|| self.create_torrent_peer(r.connection_id)); - - torrent_peer.connection_id = r.connection_id; - - let new_transaction_id = generate_transaction_id(&mut self.rng); - let request = self.create_random_request(new_transaction_id, &torrent_peer); - - self.torrent_peers.insert(new_transaction_id, torrent_peer); - - Some(request) - } - Response::AnnounceIpv4(r) => { - self.statistics.responses_announce += 1; - self.statistics.response_peers += r.peers.len(); - - self.if_torrent_peer_move_and_create_random_request(r.fixed.transaction_id) - } - Response::AnnounceIpv6(r) => { - self.statistics.responses_announce += 1; - self.statistics.response_peers += r.peers.len(); - - self.if_torrent_peer_move_and_create_random_request(r.fixed.transaction_id) - } - Response::Scrape(r) => { - self.statistics.responses_scrape += 1; - - self.if_torrent_peer_move_and_create_random_request(r.transaction_id) - } - Response::Error(r) => { - self.statistics.responses_error += 1; - - if !r.message.to_lowercase().contains("connection") { - eprintln!( - "Received error response which didn't contain the word 'connection': {}", - r.message - ); - } - - if let Some(torrent_peer) = self.torrent_peers.remove(&r.transaction_id) { - let new_transaction_id = generate_transaction_id(&mut self.rng); - - self.torrent_peers.insert(new_transaction_id, torrent_peer); - - Some(create_connect_request(new_transaction_id)) - } else { - Some(create_connect_request(generate_transaction_id( - &mut self.rng, - ))) - } - } - } - } - - fn if_torrent_peer_move_and_create_random_request( - &mut self, - transaction_id: TransactionId, - ) -> Option { - let torrent_peer = self.torrent_peers.remove(&transaction_id)?; - - let new_transaction_id = generate_transaction_id(&mut self.rng); - - let request = self.create_random_request(new_transaction_id, &torrent_peer); - - self.torrent_peers.insert(new_transaction_id, torrent_peer); - - Some(request) - } - - fn create_torrent_peer(&mut self, connection_id: ConnectionId) -> TorrentPeer { - let num_scrape_hashes = self - .rng - .gen_range(1..self.config.requests.scrape_max_torrents); - - let scrape_hash_indices = (0..num_scrape_hashes) - .map(|_| self.random_info_hash_index()) - .collect::>() - .into_boxed_slice(); - - let info_hash_index = self.random_info_hash_index(); - - TorrentPeer { - info_hash: self.shared_state.info_hashes[info_hash_index], - scrape_hash_indices, - connection_id, - peer_id: generate_peer_id(), - port: Port::new(self.rng.gen()), - } - } - - fn create_random_request( - &mut self, - transaction_id: TransactionId, - torrent_peer: &TorrentPeer, - ) -> Request { - const ITEMS: [RequestType; 3] = [ - RequestType::Announce, - RequestType::Connect, - RequestType::Scrape, - ]; - - let weights = [ - self.config.requests.weight_announce as u32, - self.config.requests.weight_connect as u32, - self.config.requests.weight_scrape as u32, - ]; - - let dist = WeightedIndex::new(weights).expect("random request weighted index"); - - match ITEMS[dist.sample(&mut self.rng)] { - RequestType::Announce => self.create_announce_request(torrent_peer, transaction_id), - RequestType::Connect => (ConnectRequest { transaction_id }).into(), - RequestType::Scrape => self.create_scrape_request(torrent_peer, transaction_id), - } - } - - fn create_announce_request( - &mut self, - torrent_peer: &TorrentPeer, - transaction_id: TransactionId, - ) -> Request { - let (event, bytes_left) = { - if self - .rng - .gen_bool(self.config.requests.peer_seeder_probability) - { - (AnnounceEvent::Completed, NumberOfBytes::new(0)) - } else { - (AnnounceEvent::Started, NumberOfBytes::new(50)) - } - }; - - (AnnounceRequest { - connection_id: torrent_peer.connection_id, - action_placeholder: Default::default(), - transaction_id, - info_hash: torrent_peer.info_hash, - peer_id: torrent_peer.peer_id, - bytes_downloaded: NumberOfBytes::new(50), - bytes_uploaded: NumberOfBytes::new(50), - bytes_left, - event: event.into(), - ip_address: Ipv4AddrBytes([0; 4]), - key: PeerKey::new(0), - peers_wanted: NumberOfPeers::new(self.config.requests.announce_peers_wanted), - port: torrent_peer.port, - }) - .into() - } - - fn create_scrape_request( - &self, - torrent_peer: &TorrentPeer, - transaction_id: TransactionId, - ) -> Request { - let indeces = &torrent_peer.scrape_hash_indices; - - let mut scape_hashes = Vec::with_capacity(indeces.len()); - - for i in indeces.iter() { - scape_hashes.push(self.shared_state.info_hashes[*i].to_owned()) - } - - (ScrapeRequest { - connection_id: torrent_peer.connection_id, - transaction_id, - info_hashes: scape_hashes, - }) - .into() - } - - fn random_info_hash_index(&mut self) -> usize { - gamma_usize( - &mut self.rng, - self.gamma, - &self.config.requests.number_of_torrents - 1, - ) - } - - fn send_request(&mut self, request: Request) { - let mut cursor = Cursor::new(self.buffer); - - match request.write_bytes(&mut cursor) { - Ok(()) => { - let position = cursor.position() as usize; - let inner = cursor.get_ref(); - - match self.socket.send(&inner[..position]) { - Ok(_) => { - self.statistics.requests += 1; - } - Err(err) => { - eprintln!("Couldn't send packet: {:?}", err); - } - } - } - Err(err) => { - eprintln!("request_to_bytes err: {}", err); - } - } - } - - fn update_shared_statistics(&mut self) { - self.shared_state - .statistics - .requests - .fetch_add(self.statistics.requests, Ordering::Relaxed); - self.shared_state - .statistics - .responses_connect - .fetch_add(self.statistics.responses_connect, Ordering::Relaxed); - self.shared_state - .statistics - .responses_announce - .fetch_add(self.statistics.responses_announce, Ordering::Relaxed); - self.shared_state - .statistics - .responses_scrape - .fetch_add(self.statistics.responses_scrape, Ordering::Relaxed); - self.shared_state - .statistics - .responses_error - .fetch_add(self.statistics.responses_error, Ordering::Relaxed); - self.shared_state - .statistics - .response_peers - .fetch_add(self.statistics.response_peers, Ordering::Relaxed); - - self.statistics = SocketWorkerLocalStatistics::default(); - } -} - -fn create_socket(config: &Config, addr: SocketAddr) -> ::std::net::UdpSocket { - let socket = if addr.is_ipv4() { - Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) - } else { - Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) - } - .expect("create socket"); - - socket - .set_nonblocking(true) - .expect("socket: set nonblocking"); - - if config.network.recv_buffer != 0 { - if let Err(err) = socket.set_recv_buffer_size(config.network.recv_buffer) { - eprintln!( - "socket: failed setting recv buffer to {}: {:?}", - config.network.recv_buffer, err - ); - } - } - - socket - .bind(&addr.into()) - .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", addr, err)); - - socket - .connect(&config.server_address.into()) - .expect("socket: connect to server"); - - socket.into() -} diff --git a/crates/ws/src/workers/swarm/storage.rs b/crates/ws/src/workers/swarm/storage.rs index 43820b4..1120305 100644 --- a/crates/ws/src/workers/swarm/storage.rs +++ b/crates/ws/src/workers/swarm/storage.rs @@ -356,7 +356,7 @@ impl TorrentData { peer.valid_until = valid_until; } PeerStatus::Stopped => { - let peer = entry.remove(); + let peer = entry.swap_remove(); if peer.seeder { self.num_seeders -= 1; @@ -477,7 +477,7 @@ impl TorrentData { if answer_receiver .expecting_answers - .remove(&expecting_answer) + .swap_remove(&expecting_answer) .is_some() { let answer_out_message = AnswerOutMessage { @@ -519,7 +519,7 @@ impl TorrentData { peer_id: PeerId, #[cfg(feature = "metrics")] peer_gauge: &::metrics::Gauge, ) { - if let Some(peer) = self.peers.remove(&peer_id) { + if let Some(peer) = self.peers.swap_remove(&peer_id) { if peer.seeder { self.num_seeders -= 1; }