From 2fe57b9f67eba71da141688c591568fd7baee33d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 28 Nov 2021 21:59:11 +0100 Subject: [PATCH 1/7] udp: add config key worker_channel_size to enable bounded channels --- aquatic_udp/src/common.rs | 22 +++++++++++++++++----- aquatic_udp/src/config.rs | 5 +++++ aquatic_udp/src/lib.rs | 14 +++++++++++--- 3 files changed, 33 insertions(+), 8 deletions(-) diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index b0f9f2e..085a82a 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -5,7 +5,7 @@ use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::time::Instant; -use crossbeam_channel::Sender; +use crossbeam_channel::{Sender, TrySendError}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap}; use aquatic_common::AHashIndexMap; @@ -88,8 +88,14 @@ impl ConnectedRequestSender { request: ConnectedRequest, addr: SocketAddr, ) { - if let Err(err) = self.senders[index.0].try_send((self.index, request, addr)) { - ::log::warn!("request_sender.try_send failed: {:?}", err) + match self.senders[index.0].try_send((self.index, request, addr)) { + Ok(()) => {}, + Err(TrySendError::Full(_)) => { + ::log::error!("Request channel {} is full, dropping request. Try increasing number of request workers or raising config.worker_channel_size.", index.0) + } + Err(TrySendError::Disconnected(_)) => { + panic!("Request channel {} is disconnected", index.0); + } } } } @@ -109,8 +115,14 @@ impl ConnectedResponseSender { response: ConnectedResponse, addr: SocketAddr, ) { - if let Err(err) = self.senders[index.0].try_send((response, addr)) { - ::log::warn!("request_sender.try_send failed: {:?}", err) + match self.senders[index.0].try_send((response, addr)) { + Ok(()) => {}, + Err(TrySendError::Full(_)) => { + ::log::error!("Response channel {} is full, dropping response. Try increasing number of socket workers or raising config.worker_channel_size.", index.0) + } + Err(TrySendError::Disconnected(_)) => { + panic!("Response channel {} is disconnected", index.0); + } } } } diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 35f2e0a..721a43a 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -16,6 +16,10 @@ pub struct Config { /// generate responses and send them back to the socket workers. pub request_workers: usize, pub log_level: LogLevel, + /// Maximum number of items in each channel passing requests/responses + /// between workers. A value of zero means that the channel will be of + /// unbounded size. + pub worker_channel_size: usize, pub network: NetworkConfig, pub protocol: ProtocolConfig, pub handlers: HandlerConfig, @@ -33,6 +37,7 @@ impl Default for Config { socket_workers: 1, request_workers: 1, log_level: LogLevel::Error, + worker_channel_size: 0, network: NetworkConfig::default(), protocol: ProtocolConfig::default(), handlers: HandlerConfig::default(), diff --git a/aquatic_udp/src/lib.rs b/aquatic_udp/src/lib.rs index 1e7e604..3c77751 100644 --- a/aquatic_udp/src/lib.rs +++ b/aquatic_udp/src/lib.rs @@ -12,7 +12,7 @@ use anyhow::Context; #[cfg(feature = "cpu-pinning")] use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::privileges::drop_privileges_after_socket_binding; -use crossbeam_channel::unbounded; +use crossbeam_channel::{bounded, unbounded}; use aquatic_common::access_list::update_access_list; use signal_hook::consts::SIGUSR1; @@ -40,14 +40,22 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let mut response_receivers = BTreeMap::new(); for i in 0..config.request_workers { - let (request_sender, request_receiver) = unbounded(); + let (request_sender, request_receiver) = if config.worker_channel_size == 0 { + unbounded() + } else { + bounded(config.worker_channel_size) + }; request_senders.push(request_sender); request_receivers.insert(i, request_receiver); } for i in 0..config.socket_workers { - let (response_sender, response_receiver) = unbounded(); + let (response_sender, response_receiver) = if config.worker_channel_size == 0 { + unbounded() + } else { + bounded(config.worker_channel_size) + }; response_senders.push(response_sender); response_receivers.insert(i, response_receiver); From 125d3c49d609e0519e54b6c9395831f2e746e423 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 28 Nov 2021 22:10:29 +0100 Subject: [PATCH 2/7] udp: config: rename channel_recv_timeout_ms and move to root, add docs --- aquatic_udp/src/config.rs | 23 +++++++---------------- aquatic_udp/src/workers/request.rs | 2 +- 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 721a43a..279d6e8 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -20,9 +20,14 @@ pub struct Config { /// between workers. A value of zero means that the channel will be of /// unbounded size. pub worker_channel_size: usize, + /// How long to block waiting for requests in request workers. Higher + /// values means that with zero traffic, the worker will not unnecessarily + /// cause the CPU to wake up as often. However, high values (something like + /// larger than 1000) combined with very low traffic can cause delays + /// in torrent cleaning. + pub request_channel_recv_timeout_ms: u64, pub network: NetworkConfig, pub protocol: ProtocolConfig, - pub handlers: HandlerConfig, pub statistics: StatisticsConfig, pub cleaning: CleaningConfig, pub privileges: PrivilegeConfig, @@ -38,9 +43,9 @@ impl Default for Config { request_workers: 1, log_level: LogLevel::Error, worker_channel_size: 0, + request_channel_recv_timeout_ms: 100, network: NetworkConfig::default(), protocol: ProtocolConfig::default(), - handlers: HandlerConfig::default(), statistics: StatisticsConfig::default(), cleaning: CleaningConfig::default(), privileges: PrivilegeConfig::default(), @@ -115,20 +120,6 @@ impl Default for ProtocolConfig { } } -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(default)] -pub struct HandlerConfig { - pub channel_recv_timeout_ms: u64, -} - -impl Default for HandlerConfig { - fn default() -> Self { - Self { - channel_recv_timeout_ms: 100, - } - } -} - #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct StatisticsConfig { diff --git a/aquatic_udp/src/workers/request.rs b/aquatic_udp/src/workers/request.rs index dd0498b..ea81d29 100644 --- a/aquatic_udp/src/workers/request.rs +++ b/aquatic_udp/src/workers/request.rs @@ -90,7 +90,7 @@ pub fn run_request_worker( let mut torrents = TorrentMaps::default(); let mut small_rng = SmallRng::from_entropy(); - let timeout = Duration::from_millis(config.handlers.channel_recv_timeout_ms); + let timeout = Duration::from_millis(config.request_channel_recv_timeout_ms); let mut peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); let cleaning_interval = Duration::from_secs(config.cleaning.torrent_cleaning_interval); From dd573cdb3032de76e449b8966090bccd29d59abc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 28 Nov 2021 22:11:21 +0100 Subject: [PATCH 3/7] Run cargo fmt --- aquatic_http/src/lib.rs | 11 ++++++++--- aquatic_http/src/workers/mod.rs | 2 +- aquatic_udp/src/common.rs | 4 ++-- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/aquatic_http/src/lib.rs b/aquatic_http/src/lib.rs index 54ed266..1975aac 100644 --- a/aquatic_http/src/lib.rs +++ b/aquatic_http/src/lib.rs @@ -15,8 +15,8 @@ use std::{ use crate::config::Config; mod common; -mod workers; pub mod config; +mod workers; pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker"; @@ -115,8 +115,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { WorkerIndex::RequestWorker(i), ); - workers::request::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) - .await + workers::request::run_request_worker( + config, + state, + request_mesh_builder, + response_mesh_builder, + ) + .await }); executors.push(executor); diff --git a/aquatic_http/src/workers/mod.rs b/aquatic_http/src/workers/mod.rs index 5f41ab1..63fc0ec 100644 --- a/aquatic_http/src/workers/mod.rs +++ b/aquatic_http/src/workers/mod.rs @@ -1,2 +1,2 @@ pub mod request; -pub mod socket; \ No newline at end of file +pub mod socket; diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index 085a82a..06c1e75 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -89,7 +89,7 @@ impl ConnectedRequestSender { addr: SocketAddr, ) { match self.senders[index.0].try_send((self.index, request, addr)) { - Ok(()) => {}, + Ok(()) => {} Err(TrySendError::Full(_)) => { ::log::error!("Request channel {} is full, dropping request. Try increasing number of request workers or raising config.worker_channel_size.", index.0) } @@ -116,7 +116,7 @@ impl ConnectedResponseSender { addr: SocketAddr, ) { match self.senders[index.0].try_send((response, addr)) { - Ok(()) => {}, + Ok(()) => {} Err(TrySendError::Full(_)) => { ::log::error!("Response channel {} is full, dropping response. Try increasing number of socket workers or raising config.worker_channel_size.", index.0) } From 411716e333ed475f58c7c11888f69f41a939c4df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 28 Nov 2021 22:18:33 +0100 Subject: [PATCH 4/7] udo load test: config: rename "handler" to "requests", refactor --- aquatic_udp_load_test/src/config.rs | 68 ++++++++++++++-------------- aquatic_udp_load_test/src/handler.rs | 8 ++-- aquatic_udp_load_test/src/main.rs | 10 ++-- aquatic_udp_load_test/src/network.rs | 2 +- aquatic_udp_load_test/src/utils.rs | 4 +- 5 files changed, 47 insertions(+), 45 deletions(-) diff --git a/aquatic_udp_load_test/src/config.rs b/aquatic_udp_load_test/src/config.rs index cc63900..55ee0e2 100644 --- a/aquatic_udp_load_test/src/config.rs +++ b/aquatic_udp_load_test/src/config.rs @@ -18,15 +18,27 @@ pub struct Config { pub workers: u8, /// Run duration (quit and generate report after this many seconds) pub duration: usize, - /// Probability that an additional connect request will be sent for each - /// mio event - pub additional_request_probability: f32, pub network: NetworkConfig, - pub handler: HandlerConfig, + pub requests: RequestConfig, #[cfg(feature = "cpu-pinning")] pub cpu_pinning: CpuPinningConfig, } +impl Default for Config { + fn default() -> Self { + Self { + server_address: "127.0.0.1:3000".parse().unwrap(), + log_level: LogLevel::Error, + workers: 1, + duration: 0, + network: NetworkConfig::default(), + requests: RequestConfig::default(), + #[cfg(feature = "cpu-pinning")] + cpu_pinning: CpuPinningConfig::default_for_load_test(), + } + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct NetworkConfig { @@ -60,9 +72,21 @@ pub struct NetworkConfig { pub recv_buffer: usize, } +impl Default for NetworkConfig { + fn default() -> Self { + Self { + multiple_client_ipv4s: true, + first_port: 45_000, + poll_timeout: 276, + poll_event_capacity: 2_877, + recv_buffer: 6_000_000, + } + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] -pub struct HandlerConfig { +pub struct RequestConfig { /// Number of torrents to simulate pub number_of_torrents: usize, /// Maximum number of torrents to ask about in scrape requests @@ -82,37 +106,12 @@ pub struct HandlerConfig { pub torrent_selection_pareto_shape: f64, /// Probability that a generated peer is a seeder pub peer_seeder_probability: f64, + /// Probability that an additional connect request will be sent for each + /// mio event + pub additional_request_probability: f32, } -impl Default for Config { - fn default() -> Self { - Self { - server_address: "127.0.0.1:3000".parse().unwrap(), - log_level: LogLevel::Error, - workers: 1, - duration: 0, - additional_request_probability: 0.5, - network: NetworkConfig::default(), - handler: HandlerConfig::default(), - #[cfg(feature = "cpu-pinning")] - cpu_pinning: CpuPinningConfig::default_for_load_test(), - } - } -} - -impl Default for NetworkConfig { - fn default() -> Self { - Self { - multiple_client_ipv4s: true, - first_port: 45_000, - poll_timeout: 276, - poll_event_capacity: 2_877, - recv_buffer: 6_000_000, - } - } -} - -impl Default for HandlerConfig { +impl Default for RequestConfig { fn default() -> Self { Self { number_of_torrents: 10_000, @@ -122,6 +121,7 @@ impl Default for HandlerConfig { weight_announce: 5, weight_scrape: 1, torrent_selection_pareto_shape: 2.0, + additional_request_probability: 0.5, } } } diff --git a/aquatic_udp_load_test/src/handler.rs b/aquatic_udp_load_test/src/handler.rs index 2611341..b791c10 100644 --- a/aquatic_udp_load_test/src/handler.rs +++ b/aquatic_udp_load_test/src/handler.rs @@ -115,9 +115,9 @@ fn create_random_request( torrent_peer: &TorrentPeer, ) -> Request { let weights = vec![ - config.handler.weight_announce as u32, - config.handler.weight_connect as u32, - config.handler.weight_scrape as u32, + config.requests.weight_announce as u32, + config.requests.weight_connect as u32, + config.requests.weight_scrape as u32, ]; let items = vec![ @@ -142,7 +142,7 @@ fn create_announce_request( transaction_id: TransactionId, ) -> Request { let (event, bytes_left) = { - if rng.gen_bool(config.handler.peer_seeder_probability) { + if rng.gen_bool(config.requests.peer_seeder_probability) { (AnnounceEvent::Completed, NumberOfBytes(0)) } else { (AnnounceEvent::Started, NumberOfBytes(50)) diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index 957fcbc..b966a86 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -37,7 +37,9 @@ impl aquatic_cli_helpers::Config for Config { } fn run(config: Config) -> ::anyhow::Result<()> { - if config.handler.weight_announce + config.handler.weight_connect + config.handler.weight_scrape + if config.requests.weight_announce + + config.requests.weight_connect + + config.requests.weight_scrape == 0 { panic!("Error: at least one weight must be larger than zero."); @@ -45,9 +47,9 @@ fn run(config: Config) -> ::anyhow::Result<()> { println!("Starting client with config: {:#?}", config); - let mut info_hashes = Vec::with_capacity(config.handler.number_of_torrents); + let mut info_hashes = Vec::with_capacity(config.requests.number_of_torrents); - for _ in 0..config.handler.number_of_torrents { + for _ in 0..config.requests.number_of_torrents { info_hashes.push(generate_info_hash()); } @@ -56,7 +58,7 @@ fn run(config: Config) -> ::anyhow::Result<()> { statistics: Arc::new(Statistics::default()), }; - let pareto = Pareto::new(1.0, config.handler.torrent_selection_pareto_shape).unwrap(); + let pareto = Pareto::new(1.0, config.requests.torrent_selection_pareto_shape).unwrap(); // Start workers diff --git a/aquatic_udp_load_test/src/network.rs b/aquatic_udp_load_test/src/network.rs index 3a61804..bd4cf3a 100644 --- a/aquatic_udp_load_test/src/network.rs +++ b/aquatic_udp_load_test/src/network.rs @@ -127,7 +127,7 @@ pub fn run_worker_thread( } } - if rng.gen::() <= config.additional_request_probability { + if rng.gen::() <= config.requests.additional_request_probability { let additional_request = create_connect_request(generate_transaction_id(&mut rng)); diff --git a/aquatic_udp_load_test/src/utils.rs b/aquatic_udp_load_test/src/utils.rs index f88c211..aecbe5a 100644 --- a/aquatic_udp_load_test/src/utils.rs +++ b/aquatic_udp_load_test/src/utils.rs @@ -15,7 +15,7 @@ pub fn create_torrent_peer( info_hashes: &Arc>, connection_id: ConnectionId, ) -> TorrentPeer { - let num_scape_hashes = rng.gen_range(1..config.handler.scrape_max_torrents); + let num_scape_hashes = rng.gen_range(1..config.requests.scrape_max_torrents); let mut scrape_hash_indeces = Vec::new(); @@ -35,7 +35,7 @@ pub fn create_torrent_peer( } fn select_info_hash_index(config: &Config, rng: &mut impl Rng, pareto: Pareto) -> usize { - pareto_usize(rng, pareto, config.handler.number_of_torrents - 1) + pareto_usize(rng, pareto, config.requests.number_of_torrents - 1) } pub fn pareto_usize(rng: &mut impl Rng, pareto: Pareto, max: usize) -> usize { From e9729034519ddfa8992a99a33aed068786df94b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 28 Nov 2021 22:28:50 +0100 Subject: [PATCH 5/7] udp load test: move network.rs and handler.rs into new worker module --- aquatic_udp_load_test/src/main.rs | 5 +- aquatic_udp_load_test/src/utils.rs | 35 ---------- .../src/{network.rs => worker/mod.rs} | 69 ++++++++++--------- .../src/{handler.rs => worker/request_gen.rs} | 30 ++++++++ 4 files changed, 68 insertions(+), 71 deletions(-) rename aquatic_udp_load_test/src/{network.rs => worker/mod.rs} (97%) rename aquatic_udp_load_test/src/{handler.rs => worker/request_gen.rs} (86%) diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index b966a86..8591911 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -10,14 +10,13 @@ use rand_distr::Pareto; mod common; mod config; -mod handler; -mod network; mod utils; +mod worker; use common::*; use config::Config; -use network::*; use utils::*; +use worker::*; #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; diff --git a/aquatic_udp_load_test/src/utils.rs b/aquatic_udp_load_test/src/utils.rs index aecbe5a..bae1391 100644 --- a/aquatic_udp_load_test/src/utils.rs +++ b/aquatic_udp_load_test/src/utils.rs @@ -1,43 +1,8 @@ -use std::sync::Arc; - use rand::prelude::*; use rand_distr::Pareto; use aquatic_udp_protocol::*; -use crate::common::*; -use crate::config::Config; - -pub fn create_torrent_peer( - config: &Config, - rng: &mut impl Rng, - pareto: Pareto, - info_hashes: &Arc>, - connection_id: ConnectionId, -) -> TorrentPeer { - let num_scape_hashes = rng.gen_range(1..config.requests.scrape_max_torrents); - - let mut scrape_hash_indeces = Vec::new(); - - for _ in 0..num_scape_hashes { - scrape_hash_indeces.push(select_info_hash_index(config, rng, pareto)) - } - - let info_hash_index = select_info_hash_index(config, rng, pareto); - - TorrentPeer { - info_hash: info_hashes[info_hash_index], - scrape_hash_indeces, - connection_id, - peer_id: generate_peer_id(), - port: Port(rand::random()), - } -} - -fn select_info_hash_index(config: &Config, rng: &mut impl Rng, pareto: Pareto) -> usize { - pareto_usize(rng, pareto, config.requests.number_of_torrents - 1) -} - pub fn pareto_usize(rng: &mut impl Rng, pareto: Pareto, max: usize) -> usize { let p: f64 = rng.sample(pareto); let p = (p.min(101.0f64) - 1.0) / 100.0; diff --git a/aquatic_udp_load_test/src/network.rs b/aquatic_udp_load_test/src/worker/mod.rs similarity index 97% rename from aquatic_udp_load_test/src/network.rs rename to aquatic_udp_load_test/src/worker/mod.rs index bd4cf3a..26f5f5a 100644 --- a/aquatic_udp_load_test/src/network.rs +++ b/aquatic_udp_load_test/src/worker/mod.rs @@ -1,3 +1,5 @@ +mod request_gen; + use std::io::Cursor; use std::net::SocketAddr; use std::sync::atomic::Ordering; @@ -12,42 +14,11 @@ use socket2::{Domain, Protocol, Socket, Type}; use aquatic_udp_protocol::*; use crate::config::Config; -use crate::{common::*, handler::process_response, utils::*}; +use crate::{common::*, utils::*}; +use request_gen::process_response; const MAX_PACKET_SIZE: usize = 4096; -pub fn create_socket(config: &Config, addr: SocketAddr) -> ::std::net::UdpSocket { - let socket = if addr.is_ipv4() { - Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) - } else { - Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) - } - .expect("create socket"); - - socket - .set_nonblocking(true) - .expect("socket: set nonblocking"); - - if config.network.recv_buffer != 0 { - if let Err(err) = socket.set_recv_buffer_size(config.network.recv_buffer) { - eprintln!( - "socket: failed setting recv buffer to {}: {:?}", - config.network.recv_buffer, err - ); - } - } - - socket - .bind(&addr.into()) - .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", addr, err)); - - socket - .connect(&config.server_address.into()) - .expect("socket: connect to server"); - - socket.into() -} - pub fn run_worker_thread( state: LoadTestState, pareto: Pareto, @@ -201,3 +172,35 @@ fn update_shared_statistics(state: &LoadTestState, statistics: &mut SocketWorker *statistics = SocketWorkerLocalStatistics::default(); } + +fn create_socket(config: &Config, addr: SocketAddr) -> ::std::net::UdpSocket { + let socket = if addr.is_ipv4() { + Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) + } else { + Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) + } + .expect("create socket"); + + socket + .set_nonblocking(true) + .expect("socket: set nonblocking"); + + if config.network.recv_buffer != 0 { + if let Err(err) = socket.set_recv_buffer_size(config.network.recv_buffer) { + eprintln!( + "socket: failed setting recv buffer to {}: {:?}", + config.network.recv_buffer, err + ); + } + } + + socket + .bind(&addr.into()) + .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", addr, err)); + + socket + .connect(&config.server_address.into()) + .expect("socket: connect to server"); + + socket.into() +} diff --git a/aquatic_udp_load_test/src/handler.rs b/aquatic_udp_load_test/src/worker/request_gen.rs similarity index 86% rename from aquatic_udp_load_test/src/handler.rs rename to aquatic_udp_load_test/src/worker/request_gen.rs index b791c10..98091c8 100644 --- a/aquatic_udp_load_test/src/handler.rs +++ b/aquatic_udp_load_test/src/worker/request_gen.rs @@ -186,3 +186,33 @@ fn create_scrape_request( }) .into() } + +fn create_torrent_peer( + config: &Config, + rng: &mut impl Rng, + pareto: Pareto, + info_hashes: &Arc>, + connection_id: ConnectionId, +) -> TorrentPeer { + let num_scape_hashes = rng.gen_range(1..config.requests.scrape_max_torrents); + + let mut scrape_hash_indeces = Vec::new(); + + for _ in 0..num_scape_hashes { + scrape_hash_indeces.push(select_info_hash_index(config, rng, pareto)) + } + + let info_hash_index = select_info_hash_index(config, rng, pareto); + + TorrentPeer { + info_hash: info_hashes[info_hash_index], + scrape_hash_indeces, + connection_id, + peer_id: generate_peer_id(), + port: Port(rand::random()), + } +} + +fn select_info_hash_index(config: &Config, rng: &mut impl Rng, pareto: Pareto) -> usize { + pareto_usize(rng, pareto, config.requests.number_of_torrents - 1) +} From f068d86e0deafbe8c73f8423c4f788735123555f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 28 Nov 2021 22:30:22 +0100 Subject: [PATCH 6/7] udp load test: set MAX_PACKET_SIZE to 8192 --- aquatic_udp_load_test/src/worker/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquatic_udp_load_test/src/worker/mod.rs b/aquatic_udp_load_test/src/worker/mod.rs index 26f5f5a..1968b92 100644 --- a/aquatic_udp_load_test/src/worker/mod.rs +++ b/aquatic_udp_load_test/src/worker/mod.rs @@ -17,7 +17,7 @@ use crate::config::Config; use crate::{common::*, utils::*}; use request_gen::process_response; -const MAX_PACKET_SIZE: usize = 4096; +const MAX_PACKET_SIZE: usize = 8192; pub fn run_worker_thread( state: LoadTestState, From 9d8f9d8f7ca84a1b11095a2e6a6373c87aed59b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 28 Nov 2021 22:33:55 +0100 Subject: [PATCH 7/7] udp load test: remove ThreadId newtype, use poll token 0 in all workers --- aquatic_udp_load_test/src/common.rs | 3 --- aquatic_udp_load_test/src/main.rs | 3 +-- aquatic_udp_load_test/src/worker/mod.rs | 3 +-- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/aquatic_udp_load_test/src/common.rs b/aquatic_udp_load_test/src/common.rs index 276c324..6f40729 100644 --- a/aquatic_udp_load_test/src/common.rs +++ b/aquatic_udp_load_test/src/common.rs @@ -4,9 +4,6 @@ use hashbrown::HashMap; use aquatic_udp_protocol::*; -#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] -pub struct ThreadId(pub u8); - #[derive(PartialEq, Eq, Clone)] pub struct TorrentPeer { pub info_hash: InfoHash, diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index 8591911..2488659 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -62,7 +62,6 @@ fn run(config: Config) -> ::anyhow::Result<()> { // Start workers for i in 0..config.workers { - let thread_id = ThreadId(i); let port = config.network.first_port + (i as u16); let ip = if config.server_address.is_ipv6() { @@ -87,7 +86,7 @@ fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::SocketWorker(i as usize), ); - run_worker_thread(state, pareto, &config, addr, thread_id) + run_worker_thread(state, pareto, &config, addr) }); } diff --git a/aquatic_udp_load_test/src/worker/mod.rs b/aquatic_udp_load_test/src/worker/mod.rs index 1968b92..816dafc 100644 --- a/aquatic_udp_load_test/src/worker/mod.rs +++ b/aquatic_udp_load_test/src/worker/mod.rs @@ -24,7 +24,6 @@ pub fn run_worker_thread( pareto: Pareto, config: &Config, addr: SocketAddr, - thread_id: ThreadId, ) { let mut socket = UdpSocket::from_std(create_socket(config, addr)); let mut buffer = [0u8; MAX_PACKET_SIZE]; @@ -32,7 +31,7 @@ pub fn run_worker_thread( let mut rng = SmallRng::from_rng(thread_rng()).expect("create SmallRng from thread_rng()"); let mut torrent_peers = TorrentPeerMap::default(); - let token = Token(thread_id.0 as usize); + let token = Token(0); let interests = Interest::READABLE; let timeout = Duration::from_micros(config.network.poll_timeout);