From 6745eba2de27fa718319a0fb43920e8943a8c9f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 5 Feb 2024 22:33:55 +0100 Subject: [PATCH 01/12] Rewrite udp load tester - Less wobbly traffic patterns - More consistent info hash peer distribution --- Cargo.lock | 3 +- crates/udp_load_test/Cargo.toml | 3 +- crates/udp_load_test/src/common.rs | 42 +-- crates/udp_load_test/src/config.rs | 19 +- crates/udp_load_test/src/lib.rs | 153 ++++++++-- crates/udp_load_test/src/utils.rs | 36 --- crates/udp_load_test/src/worker.rs | 365 ++++++++++++++++++++++++ crates/udp_load_test/src/worker/mod.rs | 371 ------------------------- 8 files changed, 515 insertions(+), 477 deletions(-) delete mode 100644 crates/udp_load_test/src/utils.rs create mode 100644 crates/udp_load_test/src/worker.rs delete mode 100644 crates/udp_load_test/src/worker/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 41caaab..a61454d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -335,9 +335,8 @@ dependencies = [ "aquatic_common", "aquatic_toml_config", "aquatic_udp_protocol", - "hashbrown 0.14.3", + "hdrhistogram", "mimalloc", - "mio", "quickcheck", "quickcheck_macros", "rand", diff --git a/crates/udp_load_test/Cargo.toml b/crates/udp_load_test/Cargo.toml index 2eedf92..076810d 100644 --- a/crates/udp_load_test/Cargo.toml +++ b/crates/udp_load_test/Cargo.toml @@ -25,9 +25,8 @@ aquatic_toml_config.workspace = true aquatic_udp_protocol.workspace = true anyhow = "1" -hashbrown = "0.14" +hdrhistogram = "7" mimalloc = { version = "0.1", default-features = false } -mio = { version = "0.8", features = ["net", "os-poll"] } rand_distr = "0.4" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } diff --git a/crates/udp_load_test/src/common.rs b/crates/udp_load_test/src/common.rs index 7cbffbd..7470830 100644 --- a/crates/udp_load_test/src/common.rs +++ b/crates/udp_load_test/src/common.rs @@ -1,22 +1,15 @@ use std::sync::{atomic::AtomicUsize, Arc}; -use hashbrown::HashMap; - use aquatic_udp_protocol::*; -#[derive(PartialEq, Eq, Clone)] -pub struct TorrentPeer { - pub info_hash: InfoHash, - pub scrape_hash_indices: Box<[usize]>, - pub connection_id: ConnectionId, - pub peer_id: PeerId, - pub port: Port, +#[derive(Clone)] +pub struct LoadTestState { + pub info_hashes: Arc<[InfoHash]>, + pub statistics: Arc, } -pub type TorrentPeerMap = HashMap; - #[derive(Default)] -pub struct Statistics { +pub struct SharedStatistics { pub requests: AtomicUsize, pub response_peers: AtomicUsize, pub responses_connect: AtomicUsize, @@ -25,25 +18,8 @@ pub struct Statistics { pub responses_error: AtomicUsize, } -#[derive(Clone)] -pub struct LoadTestState { - pub info_hashes: Arc<[InfoHash]>, - pub statistics: Arc, -} - -#[derive(PartialEq, Eq, Clone, Copy)] -pub enum RequestType { - Announce, - Connect, - Scrape, -} - -#[derive(Default)] -pub struct SocketWorkerLocalStatistics { - pub requests: usize, - pub response_peers: usize, - pub responses_connect: usize, - pub responses_announce: usize, - pub responses_scrape: usize, - pub responses_error: usize, +pub struct Peer { + pub announce_info_hash: InfoHash, + pub announce_port: Port, + pub scrape_info_hash_indices: Box<[usize]>, } diff --git a/crates/udp_load_test/src/config.rs b/crates/udp_load_test/src/config.rs index 5571b11..0f0b701 100644 --- a/crates/udp_load_test/src/config.rs +++ b/crates/udp_load_test/src/config.rs @@ -25,6 +25,8 @@ pub struct Config { /// /// 0 = include whole run pub summarize_last: usize, + /// Display data on number of peers per info hash + pub peer_histogram: bool, pub network: NetworkConfig, pub requests: RequestConfig, #[cfg(feature = "cpu-pinning")] @@ -39,6 +41,7 @@ impl Default for Config { workers: 1, duration: 0, summarize_last: 0, + peer_histogram: true, network: NetworkConfig::default(), requests: RequestConfig::default(), #[cfg(feature = "cpu-pinning")] @@ -59,8 +62,6 @@ pub struct NetworkConfig { pub multiple_client_ipv4s: bool, /// Number of first client port pub first_port: u16, - /// Socket worker poll timeout in microseconds - pub poll_timeout: u64, /// Size of socket recv buffer. Use 0 for OS default. /// /// This setting can have a big impact on dropped packages. It might @@ -81,7 +82,6 @@ impl Default for NetworkConfig { Self { multiple_client_ipv4s: true, first_port: 45_000, - poll_timeout: 1, recv_buffer: 8_000_000, } } @@ -92,6 +92,8 @@ impl Default for NetworkConfig { pub struct RequestConfig { /// Number of torrents to simulate pub number_of_torrents: usize, + /// Number of peers to simulate + pub number_of_peers: usize, /// Maximum number of torrents to ask about in scrape requests pub scrape_max_torrents: usize, /// Ask for this number of peers in announce requests @@ -105,30 +107,21 @@ pub struct RequestConfig { /// Probability that a generated request is a scrape request, as part /// of sum of the various weight arguments. pub weight_scrape: usize, - /// Peers choose torrents according to this Gamma distribution shape - pub torrent_gamma_shape: f64, - /// Peers choose torrents according to this Gamma distribution scale - pub torrent_gamma_scale: f64, /// Probability that a generated peer is a seeder pub peer_seeder_probability: f64, - /// Probability that an additional connect request will be sent for each - /// mio event - pub additional_request_probability: f32, } impl Default for RequestConfig { fn default() -> Self { Self { number_of_torrents: 10_000, + number_of_peers: 100_000, scrape_max_torrents: 10, announce_peers_wanted: 30, weight_connect: 0, weight_announce: 100, weight_scrape: 1, - torrent_gamma_shape: 0.2, - torrent_gamma_scale: 100.0, peer_seeder_probability: 0.75, - additional_request_probability: 0.5, } } } diff --git a/crates/udp_load_test/src/lib.rs b/crates/udp_load_test/src/lib.rs index 42a4ada..61dfbd5 100644 --- a/crates/udp_load_test/src/lib.rs +++ b/crates/udp_load_test/src/lib.rs @@ -1,3 +1,4 @@ +use std::iter::repeat_with; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::atomic::AtomicUsize; use std::sync::{atomic::Ordering, Arc}; @@ -6,16 +7,19 @@ use std::time::{Duration, Instant}; #[cfg(feature = "cpu-pinning")] use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; -use rand_distr::Gamma; +use aquatic_common::IndexMap; +use aquatic_udp_protocol::{InfoHash, Port}; +use hdrhistogram::Histogram; +use rand::rngs::SmallRng; +use rand::{Rng, SeedableRng}; +use rand_distr::{Distribution, WeightedAliasIndex}; -pub mod common; +mod common; pub mod config; -pub mod utils; -pub mod worker; +mod worker; use common::*; use config::Config; -use utils::*; use worker::*; impl aquatic_common::cli::Config for Config { @@ -39,26 +43,17 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { println!("Starting client with config: {:#?}", config); - let mut info_hashes = Vec::with_capacity(config.requests.number_of_torrents); - - for _ in 0..config.requests.number_of_torrents { - info_hashes.push(generate_info_hash()); - } + let info_hash_dist = InfoHashDist::new(&config)?; + let peers_by_worker = create_peers(&config, &info_hash_dist); let state = LoadTestState { - info_hashes: Arc::from(info_hashes.into_boxed_slice()), - statistics: Arc::new(Statistics::default()), + info_hashes: info_hash_dist.into_arc_info_hashes(), + statistics: Arc::new(SharedStatistics::default()), }; - let gamma = Gamma::new( - config.requests.torrent_gamma_shape, - config.requests.torrent_gamma_scale, - ) - .unwrap(); - // Start workers - for i in 0..config.workers { + for (i, peers) in (0..config.workers).zip(peers_by_worker) { let port = config.network.first_port + (i as u16); let ip = if config.server_address.is_ipv6() { @@ -82,7 +77,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::SocketWorker(i as usize), ); - Worker::run(state, gamma, config, addr) + Worker::run(config, state, peers, addr) })?; } @@ -208,3 +203,121 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { fn fetch_and_reset(atomic_usize: &AtomicUsize) -> f64 { atomic_usize.fetch_and(0, Ordering::Relaxed) as f64 } + +fn create_peers(config: &Config, info_hash_dist: &InfoHashDist) -> Vec> { + let mut rng = SmallRng::seed_from_u64(0xc3a58be617b3acce); + + let mut opt_peers_per_info_hash: Option> = + config.peer_histogram.then_some(IndexMap::default()); + + let mut all_peers = repeat_with(|| { + let num_scrape_indices = rng.gen_range(1..config.requests.scrape_max_torrents + 1); + + let scrape_info_hash_indices = repeat_with(|| info_hash_dist.get_random_index(&mut rng)) + .take(num_scrape_indices) + .collect::>() + .into_boxed_slice(); + + let (announce_info_hash_index, announce_info_hash) = info_hash_dist.get_random(&mut rng); + + if let Some(peers_per_info_hash) = opt_peers_per_info_hash.as_mut() { + *peers_per_info_hash + .entry(announce_info_hash_index) + .or_default() += 1; + } + + Peer { + announce_info_hash, + announce_port: Port::new(rng.gen()), + scrape_info_hash_indices, + } + }) + .take(config.requests.number_of_peers) + .collect::>(); + + if let Some(peers_per_info_hash) = opt_peers_per_info_hash { + let mut histogram = Histogram::::new(2).unwrap(); + + for num_peers in peers_per_info_hash.values() { + histogram.record(*num_peers).unwrap(); + } + + let percentiles = [ + 1.0, 10.0, 25.0, 50.0, 75.0, 85.0, 90.0, 95.0, 98.0, 99.9, 100.0, + ]; + + for p in percentiles { + let value = histogram.value_at_percentile(p); + + println!("Peers at info hash percentile {}: {}", p, value); + } + } + + let mut peers_by_worker = Vec::new(); + + let num_peers_per_worker = all_peers.len() / config.workers as usize; + + for _ in 0..(config.workers as usize) { + peers_by_worker.push( + all_peers + .split_off(all_peers.len() - num_peers_per_worker) + .into_boxed_slice(), + ); + + all_peers.shrink_to_fit(); + } + + peers_by_worker +} + +struct InfoHashDist { + info_hashes: Box<[InfoHash]>, + dist: WeightedAliasIndex, +} + +impl InfoHashDist { + fn new(config: &Config) -> anyhow::Result { + let mut rng = SmallRng::seed_from_u64(0xc3aa8be617b3acce); + + let info_hashes = repeat_with(|| { + let mut bytes = [0u8; 20]; + + for byte in bytes.iter_mut() { + *byte = rng.gen(); + } + + InfoHash(bytes) + }) + .take(config.requests.number_of_torrents) + .collect::>() + .into_boxed_slice(); + + let num_torrents = config.requests.number_of_torrents as u32; + + let weights = (0..num_torrents) + .map(|i| { + let floor = num_torrents as f64 / config.requests.number_of_peers as f64; + + floor + (7.0f64 - ((300.0 * f64::from(i)) / f64::from(num_torrents))).exp() + }) + .collect(); + + let dist = WeightedAliasIndex::new(weights)?; + + Ok(Self { info_hashes, dist }) + } + + fn get_random(&self, rng: &mut impl Rng) -> (usize, InfoHash) { + let index = self.dist.sample(rng); + + (index, self.info_hashes[index]) + } + + fn get_random_index(&self, rng: &mut impl Rng) -> usize { + self.dist.sample(rng) + } + + fn into_arc_info_hashes(self) -> Arc<[InfoHash]> { + Arc::from(self.info_hashes) + } +} diff --git a/crates/udp_load_test/src/utils.rs b/crates/udp_load_test/src/utils.rs deleted file mode 100644 index 6b7f748..0000000 --- a/crates/udp_load_test/src/utils.rs +++ /dev/null @@ -1,36 +0,0 @@ -use rand::prelude::*; -use rand_distr::Gamma; - -use aquatic_udp_protocol::*; - -pub fn gamma_usize(rng: &mut impl Rng, gamma: Gamma, max: usize) -> usize { - let p: f64 = rng.sample(gamma); - let p = (p.min(101.0f64) - 1.0) / 100.0; - - (p * max as f64) as usize -} - -pub fn generate_peer_id() -> PeerId { - PeerId(random_20_bytes()) -} - -pub fn generate_info_hash() -> InfoHash { - InfoHash(random_20_bytes()) -} - -pub fn generate_transaction_id(rng: &mut impl Rng) -> TransactionId { - TransactionId::new(rng.gen()) -} - -pub fn create_connect_request(transaction_id: TransactionId) -> Request { - (ConnectRequest { transaction_id }).into() -} - -// Don't use SmallRng here for now -fn random_20_bytes() -> [u8; 20] { - let mut bytes = [0; 20]; - - thread_rng().fill_bytes(&mut bytes[..]); - - bytes -} diff --git a/crates/udp_load_test/src/worker.rs b/crates/udp_load_test/src/worker.rs new file mode 100644 index 0000000..102875c --- /dev/null +++ b/crates/udp_load_test/src/worker.rs @@ -0,0 +1,365 @@ +use std::io::{Cursor, ErrorKind}; +use std::net::{SocketAddr, UdpSocket}; +use std::sync::atomic::Ordering; +use std::time::Duration; + +use rand::Rng; +use rand::{prelude::SmallRng, SeedableRng}; +use rand_distr::{Distribution, WeightedIndex}; +use socket2::{Domain, Protocol, Socket, Type}; + +use aquatic_udp_protocol::*; + +use crate::common::{LoadTestState, Peer}; +use crate::config::Config; + +const MAX_PACKET_SIZE: usize = 8192; + +pub struct Worker { + config: Config, + shared_state: LoadTestState, + peers: Box<[Peer]>, + request_type_dist: RequestTypeDist, + addr: SocketAddr, + socket: UdpSocket, + buffer: [u8; MAX_PACKET_SIZE], + rng: SmallRng, + statistics: LocalStatistics, +} + +impl Worker { + pub fn run(config: Config, shared_state: LoadTestState, peers: Box<[Peer]>, addr: SocketAddr) { + let socket = create_socket(&config, addr); + let buffer = [0u8; MAX_PACKET_SIZE]; + let rng = SmallRng::seed_from_u64(0xc3aa8be617b3acce); + let statistics = LocalStatistics::default(); + let request_type_dist = RequestTypeDist::new(&config).unwrap(); + + let mut instance = Self { + config, + shared_state, + peers, + request_type_dist, + addr, + socket, + buffer, + rng, + statistics, + }; + + instance.run_inner(); + } + + fn run_inner(&mut self) { + let connection_id = self.aquire_connection_id(); + + let mut requests_sent = 0usize; + let mut responses_received = 0usize; + + let mut peer_index = 0usize; + let mut loop_index = 0usize; + + loop { + let response_ratio = responses_received as f64 / requests_sent.max(1) as f64; + + if response_ratio >= 0.95 || requests_sent == 0 || self.rng.gen::() == 0 { + match self.request_type_dist.sample(&mut self.rng) { + RequestType::Connect => { + self.send_connect_request(u32::MAX - 1); + } + RequestType::Announce => { + self.send_announce_request(connection_id, peer_index); + + peer_index = (peer_index + 1) % self.peers.len(); + } + RequestType::Scrape => { + self.send_scrape_request(connection_id, peer_index); + + peer_index = (peer_index + 1) % self.peers.len(); + } + } + + requests_sent += 1; + } + + match self.socket.recv(&mut self.buffer[..]) { + Ok(amt) => { + match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { + Ok(response) => { + self.handle_response(response); + } + Err(err) => { + eprintln!("Received invalid response: {:#?}", err); + } + } + + responses_received += 1; + } + Err(err) if err.kind() == ErrorKind::WouldBlock => (), + Err(err) => { + eprintln!("recv error: {:#}", err); + } + } + + if loop_index % 1024 == 0 { + self.update_shared_statistics(); + } + + loop_index = loop_index.wrapping_add(1); + } + } + + fn aquire_connection_id(&mut self) -> ConnectionId { + loop { + self.send_connect_request(u32::MAX); + + for _ in 0..100 { + match self.socket.recv(&mut self.buffer[..]) { + Ok(amt) => { + match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { + Ok(Response::Connect(r)) => { + return r.connection_id; + } + Ok(r) => { + eprintln!("Received non-connect response: {:?}", r); + } + Err(err) => { + eprintln!("Received invalid response: {:#?}", err); + } + } + } + Err(err) if err.kind() == ErrorKind::WouldBlock => { + ::std::thread::sleep(Duration::from_millis(10)); + } + Err(err) => { + eprintln!("recv error: {:#}", err); + } + }; + } + } + } + + fn send_connect_request(&mut self, transaction_id: u32) { + let transaction_id = TransactionId::new(i32::from_ne_bytes(transaction_id.to_ne_bytes())); + + let request = ConnectRequest { transaction_id }; + + let mut cursor = Cursor::new(self.buffer); + + request.write_bytes(&mut cursor).unwrap(); + + let position = cursor.position() as usize; + + match self.socket.send(&cursor.get_ref()[..position]) { + Ok(_) => { + self.statistics.requests += 1; + } + Err(err) => { + eprintln!("Couldn't send packet: {:?}", err); + } + } + } + + fn send_announce_request(&mut self, connection_id: ConnectionId, peer_index: usize) { + let peer = self.peers.get(peer_index).unwrap(); + + let (event, bytes_left) = { + if self + .rng + .gen_bool(self.config.requests.peer_seeder_probability) + { + (AnnounceEvent::Completed, NumberOfBytes::new(0)) + } else { + (AnnounceEvent::Started, NumberOfBytes::new(50)) + } + }; + + let transaction_id = + TransactionId::new(i32::from_ne_bytes((peer_index as u32).to_ne_bytes())); + + let request = AnnounceRequest { + connection_id, + action_placeholder: Default::default(), + transaction_id, + info_hash: peer.announce_info_hash, + peer_id: PeerId([0; 20]), + bytes_downloaded: NumberOfBytes::new(50), + bytes_uploaded: NumberOfBytes::new(50), + bytes_left, + event: event.into(), + ip_address: Ipv4AddrBytes([0; 4]), + key: PeerKey::new(0), + peers_wanted: NumberOfPeers::new(self.config.requests.announce_peers_wanted), + port: peer.announce_port, + }; + + let mut cursor = Cursor::new(self.buffer); + + request.write_bytes(&mut cursor).unwrap(); + + let position = cursor.position() as usize; + + match self.socket.send(&cursor.get_ref()[..position]) { + Ok(_) => { + self.statistics.requests += 1; + } + Err(err) => { + eprintln!("Couldn't send packet: {:?}", err); + } + } + } + + fn send_scrape_request(&mut self, connection_id: ConnectionId, peer_index: usize) { + let peer = self.peers.get(peer_index).unwrap(); + + let transaction_id = + TransactionId::new(i32::from_ne_bytes((peer_index as u32).to_ne_bytes())); + + let mut info_hashes = Vec::with_capacity(peer.scrape_info_hash_indices.len()); + + for i in peer.scrape_info_hash_indices.iter() { + info_hashes.push(self.shared_state.info_hashes[*i].to_owned()) + } + + let request = ScrapeRequest { + connection_id, + transaction_id, + info_hashes, + }; + + let mut cursor = Cursor::new(self.buffer); + + request.write_bytes(&mut cursor).unwrap(); + + let position = cursor.position() as usize; + + match self.socket.send(&cursor.get_ref()[..position]) { + Ok(_) => { + self.statistics.requests += 1; + } + Err(err) => { + eprintln!("Couldn't send packet: {:?}", err); + } + } + } + + fn handle_response(&mut self, response: Response) { + match response { + Response::Connect(_) => { + self.statistics.responses_connect += 1; + } + Response::AnnounceIpv4(r) => { + self.statistics.responses_announce += 1; + self.statistics.response_peers += r.peers.len(); + } + Response::AnnounceIpv6(r) => { + self.statistics.responses_announce += 1; + self.statistics.response_peers += r.peers.len(); + } + Response::Scrape(_) => { + self.statistics.responses_scrape += 1; + } + Response::Error(_) => { + self.statistics.responses_error += 1; + } + } + } + + fn update_shared_statistics(&mut self) { + let shared_statistics = &self.shared_state.statistics; + + shared_statistics + .requests + .fetch_add(self.statistics.requests, Ordering::Relaxed); + shared_statistics + .responses_connect + .fetch_add(self.statistics.responses_connect, Ordering::Relaxed); + shared_statistics + .responses_announce + .fetch_add(self.statistics.responses_announce, Ordering::Relaxed); + shared_statistics + .responses_scrape + .fetch_add(self.statistics.responses_scrape, Ordering::Relaxed); + shared_statistics + .responses_error + .fetch_add(self.statistics.responses_error, Ordering::Relaxed); + shared_statistics + .response_peers + .fetch_add(self.statistics.response_peers, Ordering::Relaxed); + + self.statistics = LocalStatistics::default(); + } +} + +fn create_socket(config: &Config, addr: SocketAddr) -> ::std::net::UdpSocket { + let socket = if addr.is_ipv4() { + Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) + } else { + Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) + } + .expect("create socket"); + + socket + .set_nonblocking(true) + .expect("socket: set nonblocking"); + + if config.network.recv_buffer != 0 { + if let Err(err) = socket.set_recv_buffer_size(config.network.recv_buffer) { + eprintln!( + "socket: failed setting recv buffer to {}: {:?}", + config.network.recv_buffer, err + ); + } + } + + socket + .bind(&addr.into()) + .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", addr, err)); + + socket + .connect(&config.server_address.into()) + .expect("socket: connect to server"); + + socket.into() +} + +#[derive(PartialEq, Eq, Clone, Copy)] +pub enum RequestType { + Announce, + Connect, + Scrape, +} + +pub struct RequestTypeDist(WeightedIndex); + +impl RequestTypeDist { + fn new(config: &Config) -> anyhow::Result { + let weights = [ + config.requests.weight_announce, + config.requests.weight_connect, + config.requests.weight_scrape, + ]; + + Ok(Self(WeightedIndex::new(weights)?)) + } + + fn sample(&self, rng: &mut impl Rng) -> RequestType { + const ITEMS: [RequestType; 3] = [ + RequestType::Announce, + RequestType::Connect, + RequestType::Scrape, + ]; + + ITEMS[self.0.sample(rng)] + } +} + +#[derive(Default)] +pub struct LocalStatistics { + pub requests: usize, + pub response_peers: usize, + pub responses_connect: usize, + pub responses_announce: usize, + pub responses_scrape: usize, + pub responses_error: usize, +} diff --git a/crates/udp_load_test/src/worker/mod.rs b/crates/udp_load_test/src/worker/mod.rs deleted file mode 100644 index 8c7c606..0000000 --- a/crates/udp_load_test/src/worker/mod.rs +++ /dev/null @@ -1,371 +0,0 @@ -use std::io::Cursor; -use std::net::SocketAddr; -use std::sync::atomic::Ordering; -use std::time::Duration; - -use mio::{net::UdpSocket, Events, Interest, Poll, Token}; -use rand::Rng; -use rand::{prelude::SmallRng, SeedableRng}; -use rand_distr::{Distribution, Gamma, WeightedIndex}; -use socket2::{Domain, Protocol, Socket, Type}; - -use aquatic_udp_protocol::*; - -use crate::config::Config; -use crate::{common::*, utils::*}; - -const MAX_PACKET_SIZE: usize = 8192; - -pub struct Worker { - config: Config, - shared_state: LoadTestState, - gamma: Gamma, - addr: SocketAddr, - socket: UdpSocket, - buffer: [u8; MAX_PACKET_SIZE], - rng: SmallRng, - torrent_peers: TorrentPeerMap, - statistics: SocketWorkerLocalStatistics, -} - -impl Worker { - pub fn run(shared_state: LoadTestState, gamma: Gamma, config: Config, addr: SocketAddr) { - let socket = UdpSocket::from_std(create_socket(&config, addr)); - let buffer = [0u8; MAX_PACKET_SIZE]; - let rng = SmallRng::seed_from_u64(0xc3aa8be617b3acce); - let torrent_peers = TorrentPeerMap::default(); - let statistics = SocketWorkerLocalStatistics::default(); - - let mut instance = Self { - config, - shared_state, - gamma, - addr, - socket, - buffer, - rng, - torrent_peers, - statistics, - }; - - instance.run_inner(); - } - - fn run_inner(&mut self) { - let mut poll = Poll::new().expect("create poll"); - let mut events = Events::with_capacity(1); - - poll.registry() - .register(&mut self.socket, Token(0), Interest::READABLE) - .unwrap(); - - // Bootstrap request cycle - let initial_request = create_connect_request(generate_transaction_id(&mut self.rng)); - self.send_request(initial_request); - - let timeout = Duration::from_micros(self.config.network.poll_timeout); - - loop { - poll.poll(&mut events, Some(timeout)) - .expect("failed polling"); - - for _ in events.iter() { - while let Ok(amt) = self.socket.recv(&mut self.buffer) { - match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { - Ok(response) => { - if let Some(request) = self.process_response(response) { - self.send_request(request); - } - } - Err(err) => { - eprintln!("Received invalid response: {:#?}", err); - } - } - } - - if self.rng.gen::() <= self.config.requests.additional_request_probability { - let additional_request = - create_connect_request(generate_transaction_id(&mut self.rng)); - - self.send_request(additional_request); - } - - self.update_shared_statistics(); - } - } - } - - fn process_response(&mut self, response: Response) -> Option { - match response { - Response::Connect(r) => { - self.statistics.responses_connect += 1; - - // Fetch the torrent peer or create it if is doesn't exists. Update - // the connection id if fetched. Create a request and move the - // torrent peer appropriately. - - let mut torrent_peer = self - .torrent_peers - .remove(&r.transaction_id) - .unwrap_or_else(|| self.create_torrent_peer(r.connection_id)); - - torrent_peer.connection_id = r.connection_id; - - let new_transaction_id = generate_transaction_id(&mut self.rng); - let request = self.create_random_request(new_transaction_id, &torrent_peer); - - self.torrent_peers.insert(new_transaction_id, torrent_peer); - - Some(request) - } - Response::AnnounceIpv4(r) => { - self.statistics.responses_announce += 1; - self.statistics.response_peers += r.peers.len(); - - self.if_torrent_peer_move_and_create_random_request(r.fixed.transaction_id) - } - Response::AnnounceIpv6(r) => { - self.statistics.responses_announce += 1; - self.statistics.response_peers += r.peers.len(); - - self.if_torrent_peer_move_and_create_random_request(r.fixed.transaction_id) - } - Response::Scrape(r) => { - self.statistics.responses_scrape += 1; - - self.if_torrent_peer_move_and_create_random_request(r.transaction_id) - } - Response::Error(r) => { - self.statistics.responses_error += 1; - - if !r.message.to_lowercase().contains("connection") { - eprintln!( - "Received error response which didn't contain the word 'connection': {}", - r.message - ); - } - - if let Some(torrent_peer) = self.torrent_peers.remove(&r.transaction_id) { - let new_transaction_id = generate_transaction_id(&mut self.rng); - - self.torrent_peers.insert(new_transaction_id, torrent_peer); - - Some(create_connect_request(new_transaction_id)) - } else { - Some(create_connect_request(generate_transaction_id( - &mut self.rng, - ))) - } - } - } - } - - fn if_torrent_peer_move_and_create_random_request( - &mut self, - transaction_id: TransactionId, - ) -> Option { - let torrent_peer = self.torrent_peers.remove(&transaction_id)?; - - let new_transaction_id = generate_transaction_id(&mut self.rng); - - let request = self.create_random_request(new_transaction_id, &torrent_peer); - - self.torrent_peers.insert(new_transaction_id, torrent_peer); - - Some(request) - } - - fn create_torrent_peer(&mut self, connection_id: ConnectionId) -> TorrentPeer { - let num_scrape_hashes = self - .rng - .gen_range(1..self.config.requests.scrape_max_torrents); - - let scrape_hash_indices = (0..num_scrape_hashes) - .map(|_| self.random_info_hash_index()) - .collect::>() - .into_boxed_slice(); - - let info_hash_index = self.random_info_hash_index(); - - TorrentPeer { - info_hash: self.shared_state.info_hashes[info_hash_index], - scrape_hash_indices, - connection_id, - peer_id: generate_peer_id(), - port: Port::new(self.rng.gen()), - } - } - - fn create_random_request( - &mut self, - transaction_id: TransactionId, - torrent_peer: &TorrentPeer, - ) -> Request { - const ITEMS: [RequestType; 3] = [ - RequestType::Announce, - RequestType::Connect, - RequestType::Scrape, - ]; - - let weights = [ - self.config.requests.weight_announce as u32, - self.config.requests.weight_connect as u32, - self.config.requests.weight_scrape as u32, - ]; - - let dist = WeightedIndex::new(weights).expect("random request weighted index"); - - match ITEMS[dist.sample(&mut self.rng)] { - RequestType::Announce => self.create_announce_request(torrent_peer, transaction_id), - RequestType::Connect => (ConnectRequest { transaction_id }).into(), - RequestType::Scrape => self.create_scrape_request(torrent_peer, transaction_id), - } - } - - fn create_announce_request( - &mut self, - torrent_peer: &TorrentPeer, - transaction_id: TransactionId, - ) -> Request { - let (event, bytes_left) = { - if self - .rng - .gen_bool(self.config.requests.peer_seeder_probability) - { - (AnnounceEvent::Completed, NumberOfBytes::new(0)) - } else { - (AnnounceEvent::Started, NumberOfBytes::new(50)) - } - }; - - (AnnounceRequest { - connection_id: torrent_peer.connection_id, - action_placeholder: Default::default(), - transaction_id, - info_hash: torrent_peer.info_hash, - peer_id: torrent_peer.peer_id, - bytes_downloaded: NumberOfBytes::new(50), - bytes_uploaded: NumberOfBytes::new(50), - bytes_left, - event: event.into(), - ip_address: Ipv4AddrBytes([0; 4]), - key: PeerKey::new(0), - peers_wanted: NumberOfPeers::new(self.config.requests.announce_peers_wanted), - port: torrent_peer.port, - }) - .into() - } - - fn create_scrape_request( - &self, - torrent_peer: &TorrentPeer, - transaction_id: TransactionId, - ) -> Request { - let indeces = &torrent_peer.scrape_hash_indices; - - let mut scape_hashes = Vec::with_capacity(indeces.len()); - - for i in indeces.iter() { - scape_hashes.push(self.shared_state.info_hashes[*i].to_owned()) - } - - (ScrapeRequest { - connection_id: torrent_peer.connection_id, - transaction_id, - info_hashes: scape_hashes, - }) - .into() - } - - fn random_info_hash_index(&mut self) -> usize { - gamma_usize( - &mut self.rng, - self.gamma, - &self.config.requests.number_of_torrents - 1, - ) - } - - fn send_request(&mut self, request: Request) { - let mut cursor = Cursor::new(self.buffer); - - match request.write_bytes(&mut cursor) { - Ok(()) => { - let position = cursor.position() as usize; - let inner = cursor.get_ref(); - - match self.socket.send(&inner[..position]) { - Ok(_) => { - self.statistics.requests += 1; - } - Err(err) => { - eprintln!("Couldn't send packet: {:?}", err); - } - } - } - Err(err) => { - eprintln!("request_to_bytes err: {}", err); - } - } - } - - fn update_shared_statistics(&mut self) { - self.shared_state - .statistics - .requests - .fetch_add(self.statistics.requests, Ordering::Relaxed); - self.shared_state - .statistics - .responses_connect - .fetch_add(self.statistics.responses_connect, Ordering::Relaxed); - self.shared_state - .statistics - .responses_announce - .fetch_add(self.statistics.responses_announce, Ordering::Relaxed); - self.shared_state - .statistics - .responses_scrape - .fetch_add(self.statistics.responses_scrape, Ordering::Relaxed); - self.shared_state - .statistics - .responses_error - .fetch_add(self.statistics.responses_error, Ordering::Relaxed); - self.shared_state - .statistics - .response_peers - .fetch_add(self.statistics.response_peers, Ordering::Relaxed); - - self.statistics = SocketWorkerLocalStatistics::default(); - } -} - -fn create_socket(config: &Config, addr: SocketAddr) -> ::std::net::UdpSocket { - let socket = if addr.is_ipv4() { - Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) - } else { - Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) - } - .expect("create socket"); - - socket - .set_nonblocking(true) - .expect("socket: set nonblocking"); - - if config.network.recv_buffer != 0 { - if let Err(err) = socket.set_recv_buffer_size(config.network.recv_buffer) { - eprintln!( - "socket: failed setting recv buffer to {}: {:?}", - config.network.recv_buffer, err - ); - } - } - - socket - .bind(&addr.into()) - .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", addr, err)); - - socket - .connect(&config.server_address.into()) - .expect("socket: connect to server"); - - socket.into() -} From d8bdcfcf0af1d2b759d85365af4e90c3f15a4283 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 5 Feb 2024 23:44:34 +0100 Subject: [PATCH 02/12] udp load tester: open multiple sockets per worker; minor other fixes --- crates/udp_load_test/src/common.rs | 1 + crates/udp_load_test/src/config.rs | 6 +- crates/udp_load_test/src/lib.rs | 5 +- crates/udp_load_test/src/worker.rs | 92 ++++++++++++++++++------------ 4 files changed, 60 insertions(+), 44 deletions(-) diff --git a/crates/udp_load_test/src/common.rs b/crates/udp_load_test/src/common.rs index 7470830..7a17374 100644 --- a/crates/udp_load_test/src/common.rs +++ b/crates/udp_load_test/src/common.rs @@ -22,4 +22,5 @@ pub struct Peer { pub announce_info_hash: InfoHash, pub announce_port: Port, pub scrape_info_hash_indices: Box<[usize]>, + pub socket_index: u8, } diff --git a/crates/udp_load_test/src/config.rs b/crates/udp_load_test/src/config.rs index 0f0b701..7513d41 100644 --- a/crates/udp_load_test/src/config.rs +++ b/crates/udp_load_test/src/config.rs @@ -60,8 +60,8 @@ pub struct NetworkConfig { /// /// Setting this to true can cause issues on macOS. pub multiple_client_ipv4s: bool, - /// Number of first client port - pub first_port: u16, + /// Number of sockets to open per worker + pub sockets_per_worker: u8, /// Size of socket recv buffer. Use 0 for OS default. /// /// This setting can have a big impact on dropped packages. It might @@ -81,7 +81,7 @@ impl Default for NetworkConfig { fn default() -> Self { Self { multiple_client_ipv4s: true, - first_port: 45_000, + sockets_per_worker: 4, recv_buffer: 8_000_000, } } diff --git a/crates/udp_load_test/src/lib.rs b/crates/udp_load_test/src/lib.rs index 61dfbd5..ee6b658 100644 --- a/crates/udp_load_test/src/lib.rs +++ b/crates/udp_load_test/src/lib.rs @@ -54,8 +54,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { // Start workers for (i, peers) in (0..config.workers).zip(peers_by_worker) { - let port = config.network.first_port + (i as u16); - let ip = if config.server_address.is_ipv6() { Ipv6Addr::LOCALHOST.into() } else if config.network.multiple_client_ipv4s { @@ -64,7 +62,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { Ipv4Addr::LOCALHOST.into() }; - let addr = SocketAddr::new(ip, port); + let addr = SocketAddr::new(ip, 0); let config = config.clone(); let state = state.clone(); @@ -230,6 +228,7 @@ fn create_peers(config: &Config, info_hash_dist: &InfoHashDist) -> Vec, request_type_dist: RequestTypeDist, addr: SocketAddr, - socket: UdpSocket, + sockets: Vec, buffer: [u8; MAX_PACKET_SIZE], rng: SmallRng, statistics: LocalStatistics, @@ -29,7 +29,12 @@ pub struct Worker { impl Worker { pub fn run(config: Config, shared_state: LoadTestState, peers: Box<[Peer]>, addr: SocketAddr) { - let socket = create_socket(&config, addr); + let mut sockets = Vec::new(); + + for _ in 0..config.network.sockets_per_worker { + sockets.push(create_socket(&config, addr)); + } + let buffer = [0u8; MAX_PACKET_SIZE]; let rng = SmallRng::seed_from_u64(0xc3aa8be617b3acce); let statistics = LocalStatistics::default(); @@ -41,7 +46,7 @@ impl Worker { peers, request_type_dist, addr, - socket, + sockets, buffer, rng, statistics, @@ -56,48 +61,59 @@ impl Worker { let mut requests_sent = 0usize; let mut responses_received = 0usize; + let mut connect_socket_index = 0u8; let mut peer_index = 0usize; let mut loop_index = 0usize; loop { let response_ratio = responses_received as f64 / requests_sent.max(1) as f64; - if response_ratio >= 0.95 || requests_sent == 0 || self.rng.gen::() == 0 { - match self.request_type_dist.sample(&mut self.rng) { - RequestType::Connect => { - self.send_connect_request(u32::MAX - 1); - } - RequestType::Announce => { - self.send_announce_request(connection_id, peer_index); + if response_ratio >= 0.90 || requests_sent == 0 || self.rng.gen::() == 0 { + for _ in 0..self.sockets.len() { + match self.request_type_dist.sample(&mut self.rng) { + RequestType::Connect => { + self.send_connect_request(connect_socket_index, u32::MAX - 1); - peer_index = (peer_index + 1) % self.peers.len(); - } - RequestType::Scrape => { - self.send_scrape_request(connection_id, peer_index); + connect_socket_index = connect_socket_index.wrapping_add(1) + % self.config.network.sockets_per_worker; + } + RequestType::Announce => { + self.send_announce_request(connection_id, peer_index); - peer_index = (peer_index + 1) % self.peers.len(); + peer_index = (peer_index + 1) % self.peers.len(); + } + RequestType::Scrape => { + self.send_scrape_request(connection_id, peer_index); + + peer_index = (peer_index + 1) % self.peers.len(); + } } + + requests_sent += 1; } - - requests_sent += 1; } - match self.socket.recv(&mut self.buffer[..]) { - Ok(amt) => { - match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { - Ok(response) => { - self.handle_response(response); - } - Err(err) => { - eprintln!("Received invalid response: {:#?}", err); - } - } + for socket_index in 0..self.sockets.len() { + // Do this instead of iterating over Vec to fix borrow checker complaint + let socket = self.sockets.get(socket_index).unwrap(); - responses_received += 1; - } - Err(err) if err.kind() == ErrorKind::WouldBlock => (), - Err(err) => { - eprintln!("recv error: {:#}", err); + match socket.recv(&mut self.buffer[..]) { + Ok(amt) => { + match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { + Ok(response) => { + self.handle_response(response); + } + Err(err) => { + eprintln!("Received invalid response: {:#?}", err); + } + } + + responses_received += 1; + } + Err(err) if err.kind() == ErrorKind::WouldBlock => (), + Err(err) => { + eprintln!("recv error: {:#}", err); + } } } @@ -111,10 +127,10 @@ impl Worker { fn aquire_connection_id(&mut self) -> ConnectionId { loop { - self.send_connect_request(u32::MAX); + self.send_connect_request(0, u32::MAX); for _ in 0..100 { - match self.socket.recv(&mut self.buffer[..]) { + match self.sockets[0].recv(&mut self.buffer[..]) { Ok(amt) => { match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { Ok(Response::Connect(r)) => { @@ -139,7 +155,7 @@ impl Worker { } } - fn send_connect_request(&mut self, transaction_id: u32) { + fn send_connect_request(&mut self, socket_index: u8, transaction_id: u32) { let transaction_id = TransactionId::new(i32::from_ne_bytes(transaction_id.to_ne_bytes())); let request = ConnectRequest { transaction_id }; @@ -150,7 +166,7 @@ impl Worker { let position = cursor.position() as usize; - match self.socket.send(&cursor.get_ref()[..position]) { + match self.sockets[socket_index as usize].send(&cursor.get_ref()[..position]) { Ok(_) => { self.statistics.requests += 1; } @@ -199,7 +215,7 @@ impl Worker { let position = cursor.position() as usize; - match self.socket.send(&cursor.get_ref()[..position]) { + match self.sockets[peer.socket_index as usize].send(&cursor.get_ref()[..position]) { Ok(_) => { self.statistics.requests += 1; } @@ -233,7 +249,7 @@ impl Worker { let position = cursor.position() as usize; - match self.socket.send(&cursor.get_ref()[..position]) { + match self.sockets[peer.socket_index as usize].send(&cursor.get_ref()[..position]) { Ok(_) => { self.statistics.requests += 1; } From 6eb3195d628c1ddd7544e6946fb5ad4ad22dd58b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 5 Feb 2024 23:54:44 +0100 Subject: [PATCH 03/12] udp load test: fix typo --- crates/udp_load_test/src/worker.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/udp_load_test/src/worker.rs b/crates/udp_load_test/src/worker.rs index efb078c..e37cfc1 100644 --- a/crates/udp_load_test/src/worker.rs +++ b/crates/udp_load_test/src/worker.rs @@ -56,7 +56,7 @@ impl Worker { } fn run_inner(&mut self) { - let connection_id = self.aquire_connection_id(); + let connection_id = self.acquire_connection_id(); let mut requests_sent = 0usize; let mut responses_received = 0usize; @@ -125,7 +125,7 @@ impl Worker { } } - fn aquire_connection_id(&mut self) -> ConnectionId { + fn acquire_connection_id(&mut self) -> ConnectionId { loop { self.send_connect_request(0, u32::MAX); From 5cad19c12e67c230a397cc075e6e34fca8645794 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 6 Feb 2024 17:05:09 +0100 Subject: [PATCH 04/12] udp load test: tweak defaults and peer distribution algorithm --- crates/udp_load_test/src/config.rs | 4 ++-- crates/udp_load_test/src/lib.rs | 10 +++++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/crates/udp_load_test/src/config.rs b/crates/udp_load_test/src/config.rs index 7513d41..4ea4144 100644 --- a/crates/udp_load_test/src/config.rs +++ b/crates/udp_load_test/src/config.rs @@ -114,8 +114,8 @@ pub struct RequestConfig { impl Default for RequestConfig { fn default() -> Self { Self { - number_of_torrents: 10_000, - number_of_peers: 100_000, + number_of_torrents: 1_000_000, + number_of_peers: 2_000_000, scrape_max_torrents: 10, announce_peers_wanted: 30, weight_connect: 0, diff --git a/crates/udp_load_test/src/lib.rs b/crates/udp_load_test/src/lib.rs index ee6b658..bc24709 100644 --- a/crates/udp_load_test/src/lib.rs +++ b/crates/udp_load_test/src/lib.rs @@ -41,7 +41,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { panic!("Error: report_last_seconds can't be larger than duration"); } - println!("Starting client with config: {:#?}", config); + println!("Starting client with config: {:#?}\n", config); let info_hash_dist = InfoHashDist::new(&config)?; let peers_by_worker = create_peers(&config, &info_hash_dist); @@ -235,6 +235,8 @@ fn create_peers(config: &Config, info_hash_dist: &InfoHashDist) -> Vec>(); if let Some(peers_per_info_hash) = opt_peers_per_info_hash { + println!("Number of info hashes: {}", peers_per_info_hash.len()); + let mut histogram = Histogram::::new(2).unwrap(); for num_peers in peers_per_info_hash.values() { @@ -245,10 +247,12 @@ fn create_peers(config: &Config, info_hash_dist: &InfoHashDist) -> Vec Date: Tue, 6 Feb 2024 18:06:12 +0100 Subject: [PATCH 05/12] udp load test: display stats on announce responses per info hash --- Cargo.lock | 1 + crates/udp_load_test/Cargo.toml | 1 + crates/udp_load_test/src/common.rs | 6 +++ crates/udp_load_test/src/config.rs | 6 +++ crates/udp_load_test/src/lib.rs | 60 ++++++++++++++++++++++-------- crates/udp_load_test/src/worker.rs | 43 ++++++++++++++++++++- 6 files changed, 100 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a61454d..114c02b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -335,6 +335,7 @@ dependencies = [ "aquatic_common", "aquatic_toml_config", "aquatic_udp_protocol", + "crossbeam-channel", "hdrhistogram", "mimalloc", "quickcheck", diff --git a/crates/udp_load_test/Cargo.toml b/crates/udp_load_test/Cargo.toml index 076810d..8cce143 100644 --- a/crates/udp_load_test/Cargo.toml +++ b/crates/udp_load_test/Cargo.toml @@ -25,6 +25,7 @@ aquatic_toml_config.workspace = true aquatic_udp_protocol.workspace = true anyhow = "1" +crossbeam-channel = "0.5" hdrhistogram = "7" mimalloc = { version = "0.1", default-features = false } rand_distr = "0.4" diff --git a/crates/udp_load_test/src/common.rs b/crates/udp_load_test/src/common.rs index 7a17374..847a24c 100644 --- a/crates/udp_load_test/src/common.rs +++ b/crates/udp_load_test/src/common.rs @@ -1,5 +1,6 @@ use std::sync::{atomic::AtomicUsize, Arc}; +use aquatic_common::IndexMap; use aquatic_udp_protocol::*; #[derive(Clone)] @@ -19,8 +20,13 @@ pub struct SharedStatistics { } pub struct Peer { + pub announce_info_hash_index: usize, pub announce_info_hash: InfoHash, pub announce_port: Port, pub scrape_info_hash_indices: Box<[usize]>, pub socket_index: u8, } + +pub enum StatisticsMessage { + ResponsesPerInfoHash(IndexMap), +} diff --git a/crates/udp_load_test/src/config.rs b/crates/udp_load_test/src/config.rs index 4ea4144..5dbbd33 100644 --- a/crates/udp_load_test/src/config.rs +++ b/crates/udp_load_test/src/config.rs @@ -50,6 +50,12 @@ impl Default for Config { } } +impl aquatic_common::cli::Config for Config { + fn get_log_level(&self) -> Option { + Some(self.log_level) + } +} + #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)] #[serde(default, deny_unknown_fields)] pub struct NetworkConfig { diff --git a/crates/udp_load_test/src/lib.rs b/crates/udp_load_test/src/lib.rs index bc24709..cd0d2bf 100644 --- a/crates/udp_load_test/src/lib.rs +++ b/crates/udp_load_test/src/lib.rs @@ -9,6 +9,7 @@ use std::time::{Duration, Instant}; use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::IndexMap; use aquatic_udp_protocol::{InfoHash, Port}; +use crossbeam_channel::{unbounded, Receiver}; use hdrhistogram::Histogram; use rand::rngs::SmallRng; use rand::{Rng, SeedableRng}; @@ -22,11 +23,7 @@ use common::*; use config::Config; use worker::*; -impl aquatic_common::cli::Config for Config { - fn get_log_level(&self) -> Option { - Some(self.log_level) - } -} +const PERCENTILES: &[f64] = &[10.0, 25.0, 50.0, 75.0, 90.0, 95.0, 99.0, 99.9, 100.0]; pub fn run(config: Config) -> ::anyhow::Result<()> { if config.requests.weight_announce @@ -51,6 +48,8 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { statistics: Arc::new(SharedStatistics::default()), }; + let (statistics_sender, statistics_receiver) = unbounded(); + // Start workers for (i, peers) in (0..config.workers).zip(peers_by_worker) { @@ -65,6 +64,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let addr = SocketAddr::new(ip, 0); let config = config.clone(); let state = state.clone(); + let statistics_sender = statistics_sender.clone(); Builder::new().name("load-test".into()).spawn(move || { #[cfg(feature = "cpu-pinning")] @@ -75,7 +75,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::SocketWorker(i as usize), ); - Worker::run(config, state, peers, addr) + Worker::run(config, state, statistics_sender, peers, addr) })?; } @@ -87,12 +87,16 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::Util, ); - monitor_statistics(state, &config); + monitor_statistics(state, &config, statistics_receiver); Ok(()) } -fn monitor_statistics(state: LoadTestState, config: &Config) { +fn monitor_statistics( + state: LoadTestState, + config: &Config, + statistics_receiver: Receiver, +) { let mut report_avg_connect: Vec = Vec::new(); let mut report_avg_announce: Vec = Vec::new(); let mut report_avg_scrape: Vec = Vec::new(); @@ -108,6 +112,21 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { let time_elapsed = loop { thread::sleep(Duration::from_secs(INTERVAL)); + let mut opt_responses_per_info_hash: Option> = + config.peer_histogram.then_some(Default::default()); + + for message in statistics_receiver.try_iter() { + match message { + StatisticsMessage::ResponsesPerInfoHash(data) => { + if let Some(responses_per_info_hash) = opt_responses_per_info_hash.as_mut() { + for (k, v) in data { + *responses_per_info_hash.entry(k).or_default() += v; + } + } + } + } + } + let requests = fetch_and_reset(&state.statistics.requests); let response_peers = fetch_and_reset(&state.statistics.response_peers); let responses_connect = fetch_and_reset(&state.statistics.responses_connect); @@ -151,6 +170,20 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { peers_per_announce_response ); + if let Some(responses_per_info_hash) = opt_responses_per_info_hash.as_ref() { + let mut histogram = Histogram::::new(2).unwrap(); + + for num_responses in responses_per_info_hash.values().copied() { + histogram.record(num_responses).unwrap(); + } + + println!("Announce responses per info hash:"); + + for p in PERCENTILES { + println!(" - p{}: {}", p, histogram.value_at_percentile(*p)); + } + } + let time_elapsed = start_time.elapsed(); if config.duration != 0 && time_elapsed >= duration { @@ -225,6 +258,7 @@ fn create_peers(config: &Config, info_hash_dist: &InfoHashDist) -> Vec Vec, + announce_responses_per_info_hash: IndexMap, } impl Worker { - pub fn run(config: Config, shared_state: LoadTestState, peers: Box<[Peer]>, addr: SocketAddr) { + pub fn run( + config: Config, + shared_state: LoadTestState, + statistics_sender: Sender, + peers: Box<[Peer]>, + addr: SocketAddr, + ) { let mut sockets = Vec::new(); for _ in 0..config.network.sockets_per_worker { @@ -50,6 +61,8 @@ impl Worker { buffer, rng, statistics, + statistics_sender, + announce_responses_per_info_hash: Default::default(), }; instance.run_inner(); @@ -267,10 +280,30 @@ impl Worker { Response::AnnounceIpv4(r) => { self.statistics.responses_announce += 1; self.statistics.response_peers += r.peers.len(); + + let peer_index = + u32::from_ne_bytes(r.fixed.transaction_id.0.get().to_ne_bytes()) as usize; + + if let Some(peer) = self.peers.get(peer_index) { + *self + .announce_responses_per_info_hash + .entry(peer.announce_info_hash_index) + .or_default() += 1; + } } Response::AnnounceIpv6(r) => { self.statistics.responses_announce += 1; self.statistics.response_peers += r.peers.len(); + + let peer_index = + u32::from_ne_bytes(r.fixed.transaction_id.0.get().to_ne_bytes()) as usize; + + if let Some(peer) = self.peers.get(peer_index) { + *self + .announce_responses_per_info_hash + .entry(peer.announce_info_hash_index) + .or_default() += 1; + } } Response::Scrape(_) => { self.statistics.responses_scrape += 1; @@ -303,6 +336,14 @@ impl Worker { .response_peers .fetch_add(self.statistics.response_peers, Ordering::Relaxed); + if self.config.peer_histogram { + let message = StatisticsMessage::ResponsesPerInfoHash( + self.announce_responses_per_info_hash.split_off(0), + ); + + self.statistics_sender.try_send(message).unwrap(); + } + self.statistics = LocalStatistics::default(); } } From efa79303d21831b0c09f3cfdb5480670adf32559 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 6 Feb 2024 18:23:30 +0100 Subject: [PATCH 06/12] udp load test: acquire a connection id per socket --- crates/udp_load_test/src/worker.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/crates/udp_load_test/src/worker.rs b/crates/udp_load_test/src/worker.rs index b91cd56..a090a18 100644 --- a/crates/udp_load_test/src/worker.rs +++ b/crates/udp_load_test/src/worker.rs @@ -69,7 +69,11 @@ impl Worker { } fn run_inner(&mut self) { - let connection_id = self.acquire_connection_id(); + let mut connection_ids = Vec::new(); + + for _ in 0..self.config.network.sockets_per_worker { + connection_ids.push(self.acquire_connection_id()); + } let mut requests_sent = 0usize; let mut responses_received = 0usize; @@ -91,12 +95,12 @@ impl Worker { % self.config.network.sockets_per_worker; } RequestType::Announce => { - self.send_announce_request(connection_id, peer_index); + self.send_announce_request(&connection_ids, peer_index); peer_index = (peer_index + 1) % self.peers.len(); } RequestType::Scrape => { - self.send_scrape_request(connection_id, peer_index); + self.send_scrape_request(&connection_ids, peer_index); peer_index = (peer_index + 1) % self.peers.len(); } @@ -189,7 +193,7 @@ impl Worker { } } - fn send_announce_request(&mut self, connection_id: ConnectionId, peer_index: usize) { + fn send_announce_request(&mut self, connection_ids: &[ConnectionId], peer_index: usize) { let peer = self.peers.get(peer_index).unwrap(); let (event, bytes_left) = { @@ -207,7 +211,7 @@ impl Worker { TransactionId::new(i32::from_ne_bytes((peer_index as u32).to_ne_bytes())); let request = AnnounceRequest { - connection_id, + connection_id: connection_ids[peer.socket_index as usize], action_placeholder: Default::default(), transaction_id, info_hash: peer.announce_info_hash, @@ -238,7 +242,7 @@ impl Worker { } } - fn send_scrape_request(&mut self, connection_id: ConnectionId, peer_index: usize) { + fn send_scrape_request(&mut self, connection_ids: &[ConnectionId], peer_index: usize) { let peer = self.peers.get(peer_index).unwrap(); let transaction_id = @@ -251,7 +255,7 @@ impl Worker { } let request = ScrapeRequest { - connection_id, + connection_id: connection_ids[peer.socket_index as usize], transaction_id, info_hashes, }; From 0b6a02e1a772e485590d92417f926a9bdcf51e60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 6 Feb 2024 18:32:26 +0100 Subject: [PATCH 07/12] udp load test: use connection IDs from responses in requests --- crates/udp_load_test/src/config.rs | 2 +- crates/udp_load_test/src/worker.rs | 15 ++++++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/crates/udp_load_test/src/config.rs b/crates/udp_load_test/src/config.rs index 5dbbd33..5f87318 100644 --- a/crates/udp_load_test/src/config.rs +++ b/crates/udp_load_test/src/config.rs @@ -124,7 +124,7 @@ impl Default for RequestConfig { number_of_peers: 2_000_000, scrape_max_torrents: 10, announce_peers_wanted: 30, - weight_connect: 0, + weight_connect: 1, weight_announce: 100, weight_scrape: 1, peer_seeder_probability: 0.75, diff --git a/crates/udp_load_test/src/worker.rs b/crates/udp_load_test/src/worker.rs index a090a18..0d8037c 100644 --- a/crates/udp_load_test/src/worker.rs +++ b/crates/udp_load_test/src/worker.rs @@ -89,7 +89,10 @@ impl Worker { for _ in 0..self.sockets.len() { match self.request_type_dist.sample(&mut self.rng) { RequestType::Connect => { - self.send_connect_request(connect_socket_index, u32::MAX - 1); + self.send_connect_request( + connect_socket_index, + connect_socket_index.into(), + ); connect_socket_index = connect_socket_index.wrapping_add(1) % self.config.network.sockets_per_worker; @@ -117,6 +120,16 @@ impl Worker { match socket.recv(&mut self.buffer[..]) { Ok(amt) => { match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { + Ok(Response::Connect(r)) => { + // If we're sending connect requests, we might + // as well keep connection IDs valid + let connection_id_index = + u32::from_ne_bytes(r.transaction_id.0.get().to_ne_bytes()) + as usize; + connection_ids[connection_id_index] = r.connection_id; + + self.handle_response(Response::Connect(r)); + } Ok(response) => { self.handle_response(response); } From e705c03981955ce2f06d35fe68586cadd161156e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 6 Feb 2024 18:34:17 +0100 Subject: [PATCH 08/12] Explicity use IndexMap::swap_remove to silence warnings --- crates/http/src/workers/swarm/storage.rs | 2 +- crates/udp/src/workers/statistics/mod.rs | 2 +- crates/udp/src/workers/swarm/storage.rs | 2 +- crates/ws/src/workers/swarm/storage.rs | 6 +++--- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/http/src/workers/swarm/storage.rs b/crates/http/src/workers/swarm/storage.rs index a6419fc..bc1246b 100644 --- a/crates/http/src/workers/swarm/storage.rs +++ b/crates/http/src/workers/swarm/storage.rs @@ -433,7 +433,7 @@ impl LargePeerMap { } fn remove_peer(&mut self, key: &ResponsePeer) -> Option { - let opt_removed_peer = self.peers.remove(key); + let opt_removed_peer = self.peers.swap_remove(key); if let Some(Peer { is_seeder: true, .. diff --git a/crates/udp/src/workers/statistics/mod.rs b/crates/udp/src/workers/statistics/mod.rs index 25fc157..beafa2d 100644 --- a/crates/udp/src/workers/statistics/mod.rs +++ b/crates/udp/src/workers/statistics/mod.rs @@ -97,7 +97,7 @@ pub fn run_statistics_worker( *count -= 1; if *count == 0 { - peers.remove(&peer_id); + peers.swap_remove(&peer_id); } } } diff --git a/crates/udp/src/workers/swarm/storage.rs b/crates/udp/src/workers/swarm/storage.rs index 4289370..3b042ea 100644 --- a/crates/udp/src/workers/swarm/storage.rs +++ b/crates/udp/src/workers/swarm/storage.rs @@ -407,7 +407,7 @@ impl LargePeerMap { } fn remove_peer(&mut self, key: &ResponsePeer) -> Option { - let opt_removed_peer = self.peers.remove(key); + let opt_removed_peer = self.peers.swap_remove(key); if let Some(Peer { is_seeder: true, .. diff --git a/crates/ws/src/workers/swarm/storage.rs b/crates/ws/src/workers/swarm/storage.rs index 43820b4..1120305 100644 --- a/crates/ws/src/workers/swarm/storage.rs +++ b/crates/ws/src/workers/swarm/storage.rs @@ -356,7 +356,7 @@ impl TorrentData { peer.valid_until = valid_until; } PeerStatus::Stopped => { - let peer = entry.remove(); + let peer = entry.swap_remove(); if peer.seeder { self.num_seeders -= 1; @@ -477,7 +477,7 @@ impl TorrentData { if answer_receiver .expecting_answers - .remove(&expecting_answer) + .swap_remove(&expecting_answer) .is_some() { let answer_out_message = AnswerOutMessage { @@ -519,7 +519,7 @@ impl TorrentData { peer_id: PeerId, #[cfg(feature = "metrics")] peer_gauge: &::metrics::Gauge, ) { - if let Some(peer) = self.peers.remove(&peer_id) { + if let Some(peer) = self.peers.swap_remove(&peer_id) { if peer.seeder { self.num_seeders -= 1; } From 83acaf51f4483648e6cb04077d4ed7cdd622fdbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 6 Feb 2024 18:48:39 +0100 Subject: [PATCH 09/12] bencher: change default durations --- crates/bencher/src/main.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/bencher/src/main.rs b/crates/bencher/src/main.rs index 70cbc44..4c2b305 100644 --- a/crates/bencher/src/main.rs +++ b/crates/bencher/src/main.rs @@ -25,7 +25,7 @@ struct Args { #[arg(long, default_value_t = Priority::Medium)] min_priority: Priority, /// How long to run each load test for - #[arg(long, default_value_t = 90)] + #[arg(long, default_value_t = 30)] duration: usize, /// Only include data for last N seconds of load test runs. /// @@ -33,7 +33,7 @@ struct Args { /// maximum throughput /// /// 0 = use data for whole run - #[arg(long, default_value_t = 30)] + #[arg(long, default_value_t = 0)] summarize_last: usize, #[command(subcommand)] command: Command, From c97a3a799678e6e85c429f2bca7804890cbc6009 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 6 Feb 2024 18:48:59 +0100 Subject: [PATCH 10/12] Run cargo update Updating anstyle v1.0.5 -> v1.0.6 Updating hermit-abi v0.3.4 -> v0.3.5 Removing redox_syscall v0.4.1 Updating sketches-ddsketch v0.2.1 -> v0.2.2 Updating tempfile v3.9.0 -> v3.10.0 Updating toml v0.8.9 -> v0.8.10 Updating toml_edit v0.21.1 -> v0.22.4 --- Cargo.lock | 42 ++++++++++++++++-------------------------- 1 file changed, 16 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 114c02b..bc446f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -78,9 +78,9 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2faccea4cc4ab4a667ce676a30e8ec13922a692c99bb8f5b11f1502c72e04220" +checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" [[package]] name = "anstyle-parse" @@ -145,7 +145,7 @@ dependencies = [ "regex", "serde", "tempfile", - "toml 0.8.9", + "toml 0.8.10", ] [[package]] @@ -1349,9 +1349,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f" +checksum = "d0c62115964e08cb8039170eb33c1d0e2388a256930279edca206fff675f82c3" [[package]] name = "hex" @@ -1527,7 +1527,7 @@ version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455" dependencies = [ - "hermit-abi 0.3.4", + "hermit-abi 0.3.5", "rustix", "windows-sys 0.52.0", ] @@ -1774,7 +1774,7 @@ dependencies = [ "ordered-float", "quanta", "radix_trie", - "sketches-ddsketch 0.2.1", + "sketches-ddsketch 0.2.2", ] [[package]] @@ -1940,7 +1940,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi 0.3.4", + "hermit-abi 0.3.5", "libc", ] @@ -2269,15 +2269,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "redox_syscall" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" -dependencies = [ - "bitflags 1.3.2", -] - [[package]] name = "ref-cast" version = "1.0.22" @@ -2573,9 +2564,9 @@ checksum = "04d2ecae5fcf33b122e2e6bd520a57ccf152d2dde3b38c71039df1a6867264ee" [[package]] name = "sketches-ddsketch" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68a406c1882ed7f29cd5e248c9848a80e7cb6ae0fea82346d2746f2f941c07e1" +checksum = "85636c14b73d81f541e525f585c0a2109e6744e1565b5c1668e31c70c10ed65c" [[package]] name = "slab" @@ -2705,13 +2696,12 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.9.0" +version = "3.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01ce4141aa927a6d1bd34a041795abd0db1cccba5d5f24b009f694bdf3a1f3fa" +checksum = "a365e8cd18e44762ef95d87f284f4b5cd04107fec2ff3052bd6a3e6069669e67" dependencies = [ "cfg-if", "fastrand 2.0.1", - "redox_syscall", "rustix", "windows-sys 0.52.0", ] @@ -2834,9 +2824,9 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.9" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6a4b9e8023eb94392d3dca65d717c53abc5dad49c07cb65bb8fcd87115fa325" +checksum = "9a9aad4a3066010876e8dcf5a8a06e70a558751117a145c6ce2b82c2e2054290" dependencies = [ "serde", "serde_spanned", @@ -2855,9 +2845,9 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.21.1" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" +checksum = "0c9ffdf896f8daaabf9b66ba8e77ea1ed5ed0f72821b398aba62352e95062951" dependencies = [ "indexmap 2.2.2", "serde", From b16ab82699f71e71a33811b73bac3ca426eee02c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 6 Feb 2024 18:52:12 +0100 Subject: [PATCH 11/12] udp load test: rename config key peer_histogram to extra_statistics --- crates/udp_load_test/src/config.rs | 6 +++--- crates/udp_load_test/src/lib.rs | 4 ++-- crates/udp_load_test/src/worker.rs | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/udp_load_test/src/config.rs b/crates/udp_load_test/src/config.rs index 5f87318..27e7eb2 100644 --- a/crates/udp_load_test/src/config.rs +++ b/crates/udp_load_test/src/config.rs @@ -25,8 +25,8 @@ pub struct Config { /// /// 0 = include whole run pub summarize_last: usize, - /// Display data on number of peers per info hash - pub peer_histogram: bool, + /// Display extra statistics + pub extra_statistics: bool, pub network: NetworkConfig, pub requests: RequestConfig, #[cfg(feature = "cpu-pinning")] @@ -41,7 +41,7 @@ impl Default for Config { workers: 1, duration: 0, summarize_last: 0, - peer_histogram: true, + extra_statistics: true, network: NetworkConfig::default(), requests: RequestConfig::default(), #[cfg(feature = "cpu-pinning")] diff --git a/crates/udp_load_test/src/lib.rs b/crates/udp_load_test/src/lib.rs index cd0d2bf..fced2ca 100644 --- a/crates/udp_load_test/src/lib.rs +++ b/crates/udp_load_test/src/lib.rs @@ -113,7 +113,7 @@ fn monitor_statistics( thread::sleep(Duration::from_secs(INTERVAL)); let mut opt_responses_per_info_hash: Option> = - config.peer_histogram.then_some(Default::default()); + config.extra_statistics.then_some(Default::default()); for message in statistics_receiver.try_iter() { match message { @@ -239,7 +239,7 @@ fn create_peers(config: &Config, info_hash_dist: &InfoHashDist) -> Vec> = - config.peer_histogram.then_some(IndexMap::default()); + config.extra_statistics.then_some(IndexMap::default()); let mut all_peers = repeat_with(|| { let num_scrape_indices = rng.gen_range(1..config.requests.scrape_max_torrents + 1); diff --git a/crates/udp_load_test/src/worker.rs b/crates/udp_load_test/src/worker.rs index 0d8037c..570b1bb 100644 --- a/crates/udp_load_test/src/worker.rs +++ b/crates/udp_load_test/src/worker.rs @@ -353,7 +353,7 @@ impl Worker { .response_peers .fetch_add(self.statistics.response_peers, Ordering::Relaxed); - if self.config.peer_histogram { + if self.config.extra_statistics { let message = StatisticsMessage::ResponsesPerInfoHash( self.announce_responses_per_info_hash.split_off(0), ); From 983d88734c6f1d0b0848d923a33e07b8a2789117 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 6 Feb 2024 18:53:51 +0100 Subject: [PATCH 12/12] bencher: disable udp load test extra statistics --- crates/bencher/src/protocols/udp.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/bencher/src/protocols/udp.rs b/crates/bencher/src/protocols/udp.rs index f6cc780..9926abf 100644 --- a/crates/bencher/src/protocols/udp.rs +++ b/crates/bencher/src/protocols/udp.rs @@ -443,6 +443,8 @@ impl ProcessRunner for AquaticUdpLoadTestRunner { c.duration = self.parameters.duration; c.summarize_last = self.parameters.summarize_last; + c.extra_statistics = false; + c.requests.announce_peers_wanted = 30; c.requests.weight_connect = 0; c.requests.weight_announce = 100;