Use gamma distribution for all load testers

This commit is contained in:
Joakim Frostegård 2022-10-26 19:49:30 +02:00
parent 2b9db63984
commit db561a1101
12 changed files with 65 additions and 55 deletions

View file

@ -1,6 +1,6 @@
use std::sync::{atomic::AtomicUsize, Arc}; use std::sync::{atomic::AtomicUsize, Arc};
use rand_distr::Pareto; use rand_distr::Gamma;
pub use aquatic_http_protocol::common::*; pub use aquatic_http_protocol::common::*;
pub use aquatic_http_protocol::request::*; pub use aquatic_http_protocol::request::*;
@ -29,7 +29,7 @@ pub struct Statistics {
pub struct LoadTestState { pub struct LoadTestState {
pub info_hashes: Arc<Vec<InfoHash>>, pub info_hashes: Arc<Vec<InfoHash>>,
pub statistics: Arc<Statistics>, pub statistics: Arc<Statistics>,
pub pareto: Arc<Pareto<f64>>, pub gamma: Arc<Gamma<f64>>,
} }
#[derive(PartialEq, Eq, Clone, Copy)] #[derive(PartialEq, Eq, Clone, Copy)]

View file

@ -33,24 +33,6 @@ impl aquatic_common::cli::Config for Config {
} }
} }
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct TorrentConfig {
pub number_of_torrents: usize,
/// Pareto shape
///
/// Fake peers choose torrents according to Pareto distribution.
pub torrent_selection_pareto_shape: f64,
/// Probability that a generated peer is a seeder
pub peer_seeder_probability: f64,
/// Probability that a generated request is a announce request, as part
/// of sum of the various weight arguments.
pub weight_announce: usize,
/// Probability that a generated request is a scrape request, as part
/// of sum of the various weight arguments.
pub weight_scrape: usize,
}
impl Default for Config { impl Default for Config {
fn default() -> Self { fn default() -> Self {
Self { Self {
@ -68,14 +50,33 @@ impl Default for Config {
} }
} }
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct TorrentConfig {
pub number_of_torrents: usize,
/// Probability that a generated peer is a seeder
pub peer_seeder_probability: f64,
/// Probability that a generated request is a announce request, as part
/// of sum of the various weight arguments.
pub weight_announce: usize,
/// Probability that a generated request is a scrape request, as part
/// of sum of the various weight arguments.
pub weight_scrape: usize,
/// Peers choose torrents according to this Gamma distribution shape
pub torrent_gamma_shape: f64,
/// Peers choose torrents according to this Gamma distribution scale
pub torrent_gamma_scale: f64,
}
impl Default for TorrentConfig { impl Default for TorrentConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
number_of_torrents: 10_000, number_of_torrents: 10_000,
peer_seeder_probability: 0.25, peer_seeder_probability: 0.25,
torrent_selection_pareto_shape: 2.0,
weight_announce: 5, weight_announce: 5,
weight_scrape: 0, weight_scrape: 0,
torrent_gamma_shape: 0.2,
torrent_gamma_scale: 100.0,
} }
} }
} }

View file

@ -6,7 +6,7 @@ use ::glommio::LocalExecutorBuilder;
use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker}; use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker};
use aquatic_common::cpu_pinning::WorkerIndex; use aquatic_common::cpu_pinning::WorkerIndex;
use rand::prelude::*; use rand::prelude::*;
use rand_distr::Pareto; use rand_distr::Gamma;
mod common; mod common;
mod config; mod config;
@ -47,12 +47,16 @@ fn run(config: Config) -> ::anyhow::Result<()> {
info_hashes.push(InfoHash(rng.gen())); info_hashes.push(InfoHash(rng.gen()));
} }
let pareto = Pareto::new(1.0, config.torrents.torrent_selection_pareto_shape).unwrap(); let gamma = Gamma::new(
config.torrents.torrent_gamma_shape,
config.torrents.torrent_gamma_scale,
)
.unwrap();
let state = LoadTestState { let state = LoadTestState {
info_hashes: Arc::new(info_hashes), info_hashes: Arc::new(info_hashes),
statistics: Arc::new(Statistics::default()), statistics: Arc::new(Statistics::default()),
pareto: Arc::new(pareto), gamma: Arc::new(gamma),
}; };
let tls_config = create_tls_config().unwrap(); let tls_config = create_tls_config().unwrap();

View file

@ -2,7 +2,7 @@ use std::sync::Arc;
use rand::distributions::WeightedIndex; use rand::distributions::WeightedIndex;
use rand::prelude::*; use rand::prelude::*;
use rand_distr::Pareto; use rand_distr::Gamma;
use crate::common::*; use crate::common::*;
use crate::config::*; use crate::config::*;
@ -69,12 +69,12 @@ fn create_scrape_request(config: &Config, state: &LoadTestState, rng: &mut impl
#[inline] #[inline]
fn select_info_hash_index(config: &Config, state: &LoadTestState, rng: &mut impl Rng) -> usize { fn select_info_hash_index(config: &Config, state: &LoadTestState, rng: &mut impl Rng) -> usize {
pareto_usize(rng, &state.pareto, config.torrents.number_of_torrents - 1) gamma_usize(rng, &state.gamma, config.torrents.number_of_torrents - 1)
} }
#[inline] #[inline]
fn pareto_usize(rng: &mut impl Rng, pareto: &Arc<Pareto<f64>>, max: usize) -> usize { fn gamma_usize(rng: &mut impl Rng, gamma: &Arc<Gamma<f64>>, max: usize) -> usize {
let p: f64 = pareto.sample(rng); let p: f64 = gamma.sample(rng);
let p = (p.min(101.0f64) - 1.0) / 100.0; let p = (p.min(101.0f64) - 1.0) / 100.0;
(p * max as f64) as usize (p * max as f64) as usize

View file

@ -58,7 +58,7 @@ fn run(config: Config) -> ::anyhow::Result<()> {
statistics: Arc::new(Statistics::default()), statistics: Arc::new(Statistics::default()),
}; };
let pareto = Gamma::new( let gamma = Gamma::new(
config.requests.torrent_gamma_shape, config.requests.torrent_gamma_shape,
config.requests.torrent_gamma_scale, config.requests.torrent_gamma_scale,
) )
@ -92,7 +92,7 @@ fn run(config: Config) -> ::anyhow::Result<()> {
WorkerIndex::SocketWorker(i as usize), WorkerIndex::SocketWorker(i as usize),
); );
run_worker_thread(state, pareto, &config, addr) run_worker_thread(state, gamma, &config, addr)
})?; })?;
} }

View file

@ -3,8 +3,8 @@ use rand_distr::Gamma;
use aquatic_udp_protocol::*; use aquatic_udp_protocol::*;
pub fn pareto_usize(rng: &mut impl Rng, pareto: Gamma<f64>, max: usize) -> usize { pub fn gamma_usize(rng: &mut impl Rng, gamma: Gamma<f64>, max: usize) -> usize {
let p: f64 = rng.sample(pareto); let p: f64 = rng.sample(gamma);
let p = (p.min(101.0f64) - 1.0) / 100.0; let p = (p.min(101.0f64) - 1.0) / 100.0;
(p * max as f64) as usize (p * max as f64) as usize

View file

@ -21,7 +21,7 @@ const MAX_PACKET_SIZE: usize = 8192;
pub fn run_worker_thread( pub fn run_worker_thread(
state: LoadTestState, state: LoadTestState,
pareto: Gamma<f64>, gamma: Gamma<f64>,
config: &Config, config: &Config,
addr: SocketAddr, addr: SocketAddr,
) { ) {
@ -80,7 +80,7 @@ pub fn run_worker_thread(
let opt_request = process_response( let opt_request = process_response(
&mut rng, &mut rng,
pareto, gamma,
&state.info_hashes, &state.info_hashes,
&config, &config,
&mut torrent_peers, &mut torrent_peers,

View file

@ -12,7 +12,7 @@ use crate::utils::*;
pub fn process_response( pub fn process_response(
rng: &mut impl Rng, rng: &mut impl Rng,
pareto: Gamma<f64>, gamma: Gamma<f64>,
info_hashes: &Arc<Vec<InfoHash>>, info_hashes: &Arc<Vec<InfoHash>>,
config: &Config, config: &Config,
torrent_peers: &mut TorrentPeerMap, torrent_peers: &mut TorrentPeerMap,
@ -32,7 +32,7 @@ pub fn process_response(
torrent_peer torrent_peer
}) })
.unwrap_or_else(|| { .unwrap_or_else(|| {
create_torrent_peer(config, rng, pareto, info_hashes, r.connection_id) create_torrent_peer(config, rng, gamma, info_hashes, r.connection_id)
}); });
let new_transaction_id = generate_transaction_id(rng); let new_transaction_id = generate_transaction_id(rng);
@ -190,7 +190,7 @@ fn create_scrape_request(
fn create_torrent_peer( fn create_torrent_peer(
config: &Config, config: &Config,
rng: &mut impl Rng, rng: &mut impl Rng,
pareto: Gamma<f64>, gamma: Gamma<f64>,
info_hashes: &Arc<Vec<InfoHash>>, info_hashes: &Arc<Vec<InfoHash>>,
connection_id: ConnectionId, connection_id: ConnectionId,
) -> TorrentPeer { ) -> TorrentPeer {
@ -199,10 +199,10 @@ fn create_torrent_peer(
let mut scrape_hash_indeces = Vec::new(); let mut scrape_hash_indeces = Vec::new();
for _ in 0..num_scape_hashes { for _ in 0..num_scape_hashes {
scrape_hash_indeces.push(select_info_hash_index(config, rng, pareto)) scrape_hash_indeces.push(select_info_hash_index(config, rng, gamma))
} }
let info_hash_index = select_info_hash_index(config, rng, pareto); let info_hash_index = select_info_hash_index(config, rng, gamma);
TorrentPeer { TorrentPeer {
info_hash: info_hashes[info_hash_index], info_hash: info_hashes[info_hash_index],
@ -213,6 +213,6 @@ fn create_torrent_peer(
} }
} }
fn select_info_hash_index(config: &Config, rng: &mut impl Rng, pareto: Gamma<f64>) -> usize { fn select_info_hash_index(config: &Config, rng: &mut impl Rng, gamma: Gamma<f64>) -> usize {
pareto_usize(rng, pareto, config.requests.number_of_torrents - 1) gamma_usize(rng, gamma, config.requests.number_of_torrents - 1)
} }

View file

@ -1,6 +1,6 @@
use std::sync::{atomic::AtomicUsize, Arc}; use std::sync::{atomic::AtomicUsize, Arc};
use rand_distr::Pareto; use rand_distr::Gamma;
pub use aquatic_ws_protocol::*; pub use aquatic_ws_protocol::*;
@ -19,7 +19,7 @@ pub struct Statistics {
pub struct LoadTestState { pub struct LoadTestState {
pub info_hashes: Arc<Vec<InfoHash>>, pub info_hashes: Arc<Vec<InfoHash>>,
pub statistics: Arc<Statistics>, pub statistics: Arc<Statistics>,
pub pareto: Arc<Pareto<f64>>, pub gamma: Arc<Gamma<f64>>,
} }
#[derive(PartialEq, Eq, Clone, Copy)] #[derive(PartialEq, Eq, Clone, Copy)]

View file

@ -45,10 +45,6 @@ impl Default for Config {
pub struct TorrentConfig { pub struct TorrentConfig {
pub offers_per_request: usize, pub offers_per_request: usize,
pub number_of_torrents: usize, pub number_of_torrents: usize,
/// Pareto shape
///
/// Fake peers choose torrents according to Pareto distribution.
pub torrent_selection_pareto_shape: f64,
/// Probability that a generated peer is a seeder /// Probability that a generated peer is a seeder
pub peer_seeder_probability: f64, pub peer_seeder_probability: f64,
/// Probability that a generated request is a announce request, as part /// Probability that a generated request is a announce request, as part
@ -57,6 +53,10 @@ pub struct TorrentConfig {
/// Probability that a generated request is a scrape request, as part /// Probability that a generated request is a scrape request, as part
/// of sum of the various weight arguments. /// of sum of the various weight arguments.
pub weight_scrape: usize, pub weight_scrape: usize,
/// Peers choose torrents according to this Gamma distribution shape
pub torrent_gamma_shape: f64,
/// Peers choose torrents according to this Gamma distribution scale
pub torrent_gamma_scale: f64,
} }
impl Default for TorrentConfig { impl Default for TorrentConfig {
@ -65,9 +65,10 @@ impl Default for TorrentConfig {
offers_per_request: 10, offers_per_request: 10,
number_of_torrents: 10_000, number_of_torrents: 10_000,
peer_seeder_probability: 0.25, peer_seeder_probability: 0.25,
torrent_selection_pareto_shape: 2.0,
weight_announce: 5, weight_announce: 5,
weight_scrape: 0, weight_scrape: 0,
torrent_gamma_shape: 0.2,
torrent_gamma_scale: 100.0,
} }
} }
} }

View file

@ -6,7 +6,7 @@ use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_fo
use aquatic_common::cpu_pinning::WorkerIndex; use aquatic_common::cpu_pinning::WorkerIndex;
use glommio::LocalExecutorBuilder; use glommio::LocalExecutorBuilder;
use rand::prelude::*; use rand::prelude::*;
use rand_distr::Pareto; use rand_distr::Gamma;
mod common; mod common;
mod config; mod config;
@ -44,12 +44,16 @@ fn run(config: Config) -> ::anyhow::Result<()> {
info_hashes.push(InfoHash(rng.gen())); info_hashes.push(InfoHash(rng.gen()));
} }
let pareto = Pareto::new(1.0, config.torrents.torrent_selection_pareto_shape).unwrap(); let gamma = Gamma::new(
config.torrents.torrent_gamma_shape,
config.torrents.torrent_gamma_scale,
)
.unwrap();
let state = LoadTestState { let state = LoadTestState {
info_hashes: Arc::new(info_hashes), info_hashes: Arc::new(info_hashes),
statistics: Arc::new(Statistics::default()), statistics: Arc::new(Statistics::default()),
pareto: Arc::new(pareto), gamma: Arc::new(gamma),
}; };
let tls_config = create_tls_config().unwrap(); let tls_config = create_tls_config().unwrap();

View file

@ -2,7 +2,7 @@ use std::sync::Arc;
use rand::distributions::WeightedIndex; use rand::distributions::WeightedIndex;
use rand::prelude::*; use rand::prelude::*;
use rand_distr::Pareto; use rand_distr::Gamma;
use crate::common::*; use crate::common::*;
use crate::config::*; use crate::config::*;
@ -88,12 +88,12 @@ fn create_scrape_request(config: &Config, state: &LoadTestState, rng: &mut impl
#[inline] #[inline]
fn select_info_hash_index(config: &Config, state: &LoadTestState, rng: &mut impl Rng) -> usize { fn select_info_hash_index(config: &Config, state: &LoadTestState, rng: &mut impl Rng) -> usize {
pareto_usize(rng, &state.pareto, config.torrents.number_of_torrents - 1) gamma_usize(rng, &state.gamma, config.torrents.number_of_torrents - 1)
} }
#[inline] #[inline]
fn pareto_usize(rng: &mut impl Rng, pareto: &Arc<Pareto<f64>>, max: usize) -> usize { fn gamma_usize(rng: &mut impl Rng, gamma: &Arc<Gamma<f64>>, max: usize) -> usize {
let p: f64 = pareto.sample(rng); let p: f64 = gamma.sample(rng);
let p = (p.min(101.0f64) - 1.0) / 100.0; let p = (p.min(101.0f64) - 1.0) / 100.0;
(p * max as f64) as usize (p * max as f64) as usize