From b653e3e3ff1b83ae6351b949dd7334b3cb65bce1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 7 Nov 2021 12:00:36 +0100 Subject: [PATCH] ws load test: add cpu pinning and log crate logging --- Cargo.lock | 1 + aquatic_ws_load_test/Cargo.toml | 1 + aquatic_ws_load_test/src/config.rs | 14 ++++++++++++-- aquatic_ws_load_test/src/main.rs | 19 +++++++++++++++++-- 4 files changed, 31 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 48a85da..f968137 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -272,6 +272,7 @@ version = "0.1.0" dependencies = [ "anyhow", "aquatic_cli_helpers", + "aquatic_common", "aquatic_ws_protocol", "async-tungstenite", "futures", diff --git a/aquatic_ws_load_test/Cargo.toml b/aquatic_ws_load_test/Cargo.toml index 581f1de..67a02f2 100644 --- a/aquatic_ws_load_test/Cargo.toml +++ b/aquatic_ws_load_test/Cargo.toml @@ -13,6 +13,7 @@ name = "aquatic_ws_load_test" anyhow = "1" async-tungstenite = "0.15" aquatic_cli_helpers = "0.1.0" +aquatic_common = "0.1.0" aquatic_ws_protocol = "0.1.0" futures = "0.3" futures-rustls = "0.22" diff --git a/aquatic_ws_load_test/src/config.rs b/aquatic_ws_load_test/src/config.rs index 10d4bdc..3bef7bc 100644 --- a/aquatic_ws_load_test/src/config.rs +++ b/aquatic_ws_load_test/src/config.rs @@ -1,18 +1,26 @@ use std::net::SocketAddr; +use aquatic_cli_helpers::LogLevel; +use aquatic_common::cpu_pinning::CpuPinningConfig; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct Config { pub server_address: SocketAddr, - pub num_workers: u8, + pub log_level: LogLevel, + pub num_workers: usize, pub num_connections: usize, pub duration: usize, pub torrents: TorrentConfig, + pub cpu_pinning: CpuPinningConfig, } -impl aquatic_cli_helpers::Config for Config {} +impl aquatic_cli_helpers::Config for Config { + fn get_log_level(&self) -> Option { + Some(self.log_level) + } +} #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] @@ -37,10 +45,12 @@ impl Default for Config { fn default() -> Self { Self { server_address: "127.0.0.1:3000".parse().unwrap(), + log_level: LogLevel::Error, num_workers: 1, num_connections: 16, duration: 0, torrents: TorrentConfig::default(), + cpu_pinning: CpuPinningConfig::default_for_load_test(), } } } diff --git a/aquatic_ws_load_test/src/main.rs b/aquatic_ws_load_test/src/main.rs index 54f9208..1a8c342 100644 --- a/aquatic_ws_load_test/src/main.rs +++ b/aquatic_ws_load_test/src/main.rs @@ -2,6 +2,7 @@ use std::sync::{atomic::Ordering, Arc}; use std::thread; use std::time::{Duration, Instant}; +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use glommio::LocalExecutorBuilder; use rand::prelude::*; use rand_distr::Pareto; @@ -33,6 +34,12 @@ fn run(config: Config) -> ::anyhow::Result<()> { println!("Starting client with config: {:#?}", config); + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_workers as usize, + WorkerIndex::Other, + ); + let mut info_hashes = Vec::with_capacity(config.torrents.number_of_torrents); let mut rng = SmallRng::from_entropy(); @@ -51,12 +58,20 @@ fn run(config: Config) -> ::anyhow::Result<()> { let tls_config = create_tls_config().unwrap(); - for _ in 0..config.num_workers { + for i in 0..config.num_workers { let config = config.clone(); let tls_config = tls_config.clone(); let state = state.clone(); - LocalExecutorBuilder::default() + let mut builder = LocalExecutorBuilder::default(); + + if config.cpu_pinning.active { + builder = builder.pin_to_cpu( + WorkerIndex::SocketWorker(i).get_cpu_index(&config.cpu_pinning, config.num_workers), + ); + } + + builder .spawn(|| async move { run_socket_thread(config, tls_config, state).await.unwrap(); })