From db561a1101f68fc232e7c7b5a47f159ec75f4813 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 26 Oct 2022 19:49:30 +0200 Subject: [PATCH] Use gamma distribution for all load testers --- aquatic_http_load_test/src/common.rs | 4 +- aquatic_http_load_test/src/config.rs | 39 ++++++++++--------- aquatic_http_load_test/src/main.rs | 10 +++-- aquatic_http_load_test/src/utils.rs | 8 ++-- aquatic_udp_load_test/src/main.rs | 4 +- aquatic_udp_load_test/src/utils.rs | 4 +- aquatic_udp_load_test/src/worker/mod.rs | 4 +- .../src/worker/request_gen.rs | 14 +++---- aquatic_ws_load_test/src/common.rs | 4 +- aquatic_ws_load_test/src/config.rs | 11 +++--- aquatic_ws_load_test/src/main.rs | 10 +++-- aquatic_ws_load_test/src/utils.rs | 8 ++-- 12 files changed, 65 insertions(+), 55 deletions(-) diff --git a/aquatic_http_load_test/src/common.rs b/aquatic_http_load_test/src/common.rs index ad5ad6b..f187ad5 100644 --- a/aquatic_http_load_test/src/common.rs +++ b/aquatic_http_load_test/src/common.rs @@ -1,6 +1,6 @@ use std::sync::{atomic::AtomicUsize, Arc}; -use rand_distr::Pareto; +use rand_distr::Gamma; pub use aquatic_http_protocol::common::*; pub use aquatic_http_protocol::request::*; @@ -29,7 +29,7 @@ pub struct Statistics { pub struct LoadTestState { pub info_hashes: Arc>, pub statistics: Arc, - pub pareto: Arc>, + pub gamma: Arc>, } #[derive(PartialEq, Eq, Clone, Copy)] diff --git a/aquatic_http_load_test/src/config.rs b/aquatic_http_load_test/src/config.rs index 5c1a35d..060b401 100644 --- a/aquatic_http_load_test/src/config.rs +++ b/aquatic_http_load_test/src/config.rs @@ -33,24 +33,6 @@ impl aquatic_common::cli::Config for Config { } } -#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default, deny_unknown_fields)] -pub struct TorrentConfig { - pub number_of_torrents: usize, - /// Pareto shape - /// - /// Fake peers choose torrents according to Pareto distribution. - pub torrent_selection_pareto_shape: f64, - /// Probability that a generated peer is a seeder - pub peer_seeder_probability: f64, - /// Probability that a generated request is a announce request, as part - /// of sum of the various weight arguments. - pub weight_announce: usize, - /// Probability that a generated request is a scrape request, as part - /// of sum of the various weight arguments. - pub weight_scrape: usize, -} - impl Default for Config { fn default() -> Self { Self { @@ -68,14 +50,33 @@ impl Default for Config { } } +#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] +#[serde(default, deny_unknown_fields)] +pub struct TorrentConfig { + pub number_of_torrents: usize, + /// Probability that a generated peer is a seeder + pub peer_seeder_probability: f64, + /// Probability that a generated request is a announce request, as part + /// of sum of the various weight arguments. + pub weight_announce: usize, + /// Probability that a generated request is a scrape request, as part + /// of sum of the various weight arguments. + pub weight_scrape: usize, + /// Peers choose torrents according to this Gamma distribution shape + pub torrent_gamma_shape: f64, + /// Peers choose torrents according to this Gamma distribution scale + pub torrent_gamma_scale: f64, +} + impl Default for TorrentConfig { fn default() -> Self { Self { number_of_torrents: 10_000, peer_seeder_probability: 0.25, - torrent_selection_pareto_shape: 2.0, weight_announce: 5, weight_scrape: 0, + torrent_gamma_shape: 0.2, + torrent_gamma_scale: 100.0, } } } diff --git a/aquatic_http_load_test/src/main.rs b/aquatic_http_load_test/src/main.rs index 1955885..5dfc0d0 100644 --- a/aquatic_http_load_test/src/main.rs +++ b/aquatic_http_load_test/src/main.rs @@ -6,7 +6,7 @@ use ::glommio::LocalExecutorBuilder; use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker}; use aquatic_common::cpu_pinning::WorkerIndex; use rand::prelude::*; -use rand_distr::Pareto; +use rand_distr::Gamma; mod common; mod config; @@ -47,12 +47,16 @@ fn run(config: Config) -> ::anyhow::Result<()> { info_hashes.push(InfoHash(rng.gen())); } - let pareto = Pareto::new(1.0, config.torrents.torrent_selection_pareto_shape).unwrap(); + let gamma = Gamma::new( + config.torrents.torrent_gamma_shape, + config.torrents.torrent_gamma_scale, + ) + .unwrap(); let state = LoadTestState { info_hashes: Arc::new(info_hashes), statistics: Arc::new(Statistics::default()), - pareto: Arc::new(pareto), + gamma: Arc::new(gamma), }; let tls_config = create_tls_config().unwrap(); diff --git a/aquatic_http_load_test/src/utils.rs b/aquatic_http_load_test/src/utils.rs index 313657a..c3dab09 100644 --- a/aquatic_http_load_test/src/utils.rs +++ b/aquatic_http_load_test/src/utils.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use rand::distributions::WeightedIndex; use rand::prelude::*; -use rand_distr::Pareto; +use rand_distr::Gamma; use crate::common::*; use crate::config::*; @@ -69,12 +69,12 @@ fn create_scrape_request(config: &Config, state: &LoadTestState, rng: &mut impl #[inline] fn select_info_hash_index(config: &Config, state: &LoadTestState, rng: &mut impl Rng) -> usize { - pareto_usize(rng, &state.pareto, config.torrents.number_of_torrents - 1) + gamma_usize(rng, &state.gamma, config.torrents.number_of_torrents - 1) } #[inline] -fn pareto_usize(rng: &mut impl Rng, pareto: &Arc>, max: usize) -> usize { - let p: f64 = pareto.sample(rng); +fn gamma_usize(rng: &mut impl Rng, gamma: &Arc>, max: usize) -> usize { + let p: f64 = gamma.sample(rng); let p = (p.min(101.0f64) - 1.0) / 100.0; (p * max as f64) as usize diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index 30d70d5..48af77b 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -58,7 +58,7 @@ fn run(config: Config) -> ::anyhow::Result<()> { statistics: Arc::new(Statistics::default()), }; - let pareto = Gamma::new( + let gamma = Gamma::new( config.requests.torrent_gamma_shape, config.requests.torrent_gamma_scale, ) @@ -92,7 +92,7 @@ fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::SocketWorker(i as usize), ); - run_worker_thread(state, pareto, &config, addr) + run_worker_thread(state, gamma, &config, addr) })?; } diff --git a/aquatic_udp_load_test/src/utils.rs b/aquatic_udp_load_test/src/utils.rs index d436233..fd32443 100644 --- a/aquatic_udp_load_test/src/utils.rs +++ b/aquatic_udp_load_test/src/utils.rs @@ -3,8 +3,8 @@ use rand_distr::Gamma; use aquatic_udp_protocol::*; -pub fn pareto_usize(rng: &mut impl Rng, pareto: Gamma, max: usize) -> usize { - let p: f64 = rng.sample(pareto); +pub fn gamma_usize(rng: &mut impl Rng, gamma: Gamma, max: usize) -> usize { + let p: f64 = rng.sample(gamma); let p = (p.min(101.0f64) - 1.0) / 100.0; (p * max as f64) as usize diff --git a/aquatic_udp_load_test/src/worker/mod.rs b/aquatic_udp_load_test/src/worker/mod.rs index 63c2fd4..b863660 100644 --- a/aquatic_udp_load_test/src/worker/mod.rs +++ b/aquatic_udp_load_test/src/worker/mod.rs @@ -21,7 +21,7 @@ const MAX_PACKET_SIZE: usize = 8192; pub fn run_worker_thread( state: LoadTestState, - pareto: Gamma, + gamma: Gamma, config: &Config, addr: SocketAddr, ) { @@ -80,7 +80,7 @@ pub fn run_worker_thread( let opt_request = process_response( &mut rng, - pareto, + gamma, &state.info_hashes, &config, &mut torrent_peers, diff --git a/aquatic_udp_load_test/src/worker/request_gen.rs b/aquatic_udp_load_test/src/worker/request_gen.rs index 665ed32..8f504fd 100644 --- a/aquatic_udp_load_test/src/worker/request_gen.rs +++ b/aquatic_udp_load_test/src/worker/request_gen.rs @@ -12,7 +12,7 @@ use crate::utils::*; pub fn process_response( rng: &mut impl Rng, - pareto: Gamma, + gamma: Gamma, info_hashes: &Arc>, config: &Config, torrent_peers: &mut TorrentPeerMap, @@ -32,7 +32,7 @@ pub fn process_response( torrent_peer }) .unwrap_or_else(|| { - create_torrent_peer(config, rng, pareto, info_hashes, r.connection_id) + create_torrent_peer(config, rng, gamma, info_hashes, r.connection_id) }); let new_transaction_id = generate_transaction_id(rng); @@ -190,7 +190,7 @@ fn create_scrape_request( fn create_torrent_peer( config: &Config, rng: &mut impl Rng, - pareto: Gamma, + gamma: Gamma, info_hashes: &Arc>, connection_id: ConnectionId, ) -> TorrentPeer { @@ -199,10 +199,10 @@ fn create_torrent_peer( let mut scrape_hash_indeces = Vec::new(); for _ in 0..num_scape_hashes { - scrape_hash_indeces.push(select_info_hash_index(config, rng, pareto)) + scrape_hash_indeces.push(select_info_hash_index(config, rng, gamma)) } - let info_hash_index = select_info_hash_index(config, rng, pareto); + let info_hash_index = select_info_hash_index(config, rng, gamma); TorrentPeer { info_hash: info_hashes[info_hash_index], @@ -213,6 +213,6 @@ fn create_torrent_peer( } } -fn select_info_hash_index(config: &Config, rng: &mut impl Rng, pareto: Gamma) -> usize { - pareto_usize(rng, pareto, config.requests.number_of_torrents - 1) +fn select_info_hash_index(config: &Config, rng: &mut impl Rng, gamma: Gamma) -> usize { + gamma_usize(rng, gamma, config.requests.number_of_torrents - 1) } diff --git a/aquatic_ws_load_test/src/common.rs b/aquatic_ws_load_test/src/common.rs index 200d04f..288f484 100644 --- a/aquatic_ws_load_test/src/common.rs +++ b/aquatic_ws_load_test/src/common.rs @@ -1,6 +1,6 @@ use std::sync::{atomic::AtomicUsize, Arc}; -use rand_distr::Pareto; +use rand_distr::Gamma; pub use aquatic_ws_protocol::*; @@ -19,7 +19,7 @@ pub struct Statistics { pub struct LoadTestState { pub info_hashes: Arc>, pub statistics: Arc, - pub pareto: Arc>, + pub gamma: Arc>, } #[derive(PartialEq, Eq, Clone, Copy)] diff --git a/aquatic_ws_load_test/src/config.rs b/aquatic_ws_load_test/src/config.rs index 370769d..eff0ae5 100644 --- a/aquatic_ws_load_test/src/config.rs +++ b/aquatic_ws_load_test/src/config.rs @@ -45,10 +45,6 @@ impl Default for Config { pub struct TorrentConfig { pub offers_per_request: usize, pub number_of_torrents: usize, - /// Pareto shape - /// - /// Fake peers choose torrents according to Pareto distribution. - pub torrent_selection_pareto_shape: f64, /// Probability that a generated peer is a seeder pub peer_seeder_probability: f64, /// Probability that a generated request is a announce request, as part @@ -57,6 +53,10 @@ pub struct TorrentConfig { /// Probability that a generated request is a scrape request, as part /// of sum of the various weight arguments. pub weight_scrape: usize, + /// Peers choose torrents according to this Gamma distribution shape + pub torrent_gamma_shape: f64, + /// Peers choose torrents according to this Gamma distribution scale + pub torrent_gamma_scale: f64, } impl Default for TorrentConfig { @@ -65,9 +65,10 @@ impl Default for TorrentConfig { offers_per_request: 10, number_of_torrents: 10_000, peer_seeder_probability: 0.25, - torrent_selection_pareto_shape: 2.0, weight_announce: 5, weight_scrape: 0, + torrent_gamma_shape: 0.2, + torrent_gamma_scale: 100.0, } } } diff --git a/aquatic_ws_load_test/src/main.rs b/aquatic_ws_load_test/src/main.rs index c660267..67a4a00 100644 --- a/aquatic_ws_load_test/src/main.rs +++ b/aquatic_ws_load_test/src/main.rs @@ -6,7 +6,7 @@ use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_fo use aquatic_common::cpu_pinning::WorkerIndex; use glommio::LocalExecutorBuilder; use rand::prelude::*; -use rand_distr::Pareto; +use rand_distr::Gamma; mod common; mod config; @@ -44,12 +44,16 @@ fn run(config: Config) -> ::anyhow::Result<()> { info_hashes.push(InfoHash(rng.gen())); } - let pareto = Pareto::new(1.0, config.torrents.torrent_selection_pareto_shape).unwrap(); + let gamma = Gamma::new( + config.torrents.torrent_gamma_shape, + config.torrents.torrent_gamma_scale, + ) + .unwrap(); let state = LoadTestState { info_hashes: Arc::new(info_hashes), statistics: Arc::new(Statistics::default()), - pareto: Arc::new(pareto), + gamma: Arc::new(gamma), }; let tls_config = create_tls_config().unwrap(); diff --git a/aquatic_ws_load_test/src/utils.rs b/aquatic_ws_load_test/src/utils.rs index a7e568a..74646b1 100644 --- a/aquatic_ws_load_test/src/utils.rs +++ b/aquatic_ws_load_test/src/utils.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use rand::distributions::WeightedIndex; use rand::prelude::*; -use rand_distr::Pareto; +use rand_distr::Gamma; use crate::common::*; use crate::config::*; @@ -88,12 +88,12 @@ fn create_scrape_request(config: &Config, state: &LoadTestState, rng: &mut impl #[inline] fn select_info_hash_index(config: &Config, state: &LoadTestState, rng: &mut impl Rng) -> usize { - pareto_usize(rng, &state.pareto, config.torrents.number_of_torrents - 1) + gamma_usize(rng, &state.gamma, config.torrents.number_of_torrents - 1) } #[inline] -fn pareto_usize(rng: &mut impl Rng, pareto: &Arc>, max: usize) -> usize { - let p: f64 = pareto.sample(rng); +fn gamma_usize(rng: &mut impl Rng, gamma: &Arc>, max: usize) -> usize { + let p: f64 = gamma.sample(rng); let p = (p.min(101.0f64) - 1.0) / 100.0; (p * max as f64) as usize