From 7b6bb12c9ec1d416deb76956a962796a21736695 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 27 Dec 2023 19:08:00 +0100 Subject: [PATCH 1/2] Update README --- README.md | 11 ++++++----- crates/ws/README.md | 4 ++++ 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index e0c2ea0..7acffab 100644 --- a/README.md +++ b/README.md @@ -9,11 +9,11 @@ of sub-implementations for different protocols: [aquatic_http]: ./crates/http [aquatic_ws]: ./crates/ws -| Name | Protocol | OS requirements | -|----------------|-------------------------------------------|-----------------| -| [aquatic_udp] | BitTorrent over UDP | Unix-like | -| [aquatic_http] | BitTorrent over HTTP, optionally over TLS | Linux 5.8+ | -| [aquatic_ws] | WebTorrent, optionally over TLS | Linux 5.8+ | +| Name | Protocol | OS requirements | +|----------------|-------------------------------------------|--------------------| +| [aquatic_udp] | BitTorrent over UDP | Unix-like | +| [aquatic_http] | BitTorrent over HTTP, optionally over TLS | Linux 5.8 or later | +| [aquatic_ws] | WebTorrent, optionally over TLS | Linux 5.8 or later | Features at a glance: @@ -27,6 +27,7 @@ Features at a glance: Known users: - [explodie.org public tracker](https://explodie.org/opentracker.html) (`udp://explodie.org:6969`), typically [serving ~100,000 requests per second](https://explodie.org/tracker-stats.html) +- [tracker.webtorrent.dev](https://tracker.webtorrent.dev) (`wss://tracker.webtorrent.dev`) ## Performance of the UDP implementation diff --git a/crates/ws/README.md b/crates/ws/README.md index 3cf8238..19e61f2 100644 --- a/crates/ws/README.md +++ b/crates/ws/README.md @@ -13,6 +13,10 @@ Features at a glance: - Prometheus metrics - Automated CI testing of full file transfers +Known users: + +- [tracker.webtorrent.dev](https://tracker.webtorrent.dev) (`wss://tracker.webtorrent.dev`) + ## Performance ![WebTorrent tracker throughput comparison](../../documents/aquatic-ws-load-test-illustration-2023-01-25.png) From 0317053f806cb9a35b3e98bb7fa1beac6b931a49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 27 Dec 2023 21:22:58 +0100 Subject: [PATCH 2/2] load tester: refactor, add udp sets, improve docs, add command options --- crates/load_tester/Cargo.toml | 2 +- crates/load_tester/README.md | 3 + crates/load_tester/src/main.rs | 16 +- crates/load_tester/src/protocols/udp.rs | 224 +++++++++++++++++------- crates/load_tester/src/run.rs | 182 +++++++++++-------- crates/load_tester/src/set.rs | 123 ++++++------- 6 files changed, 354 insertions(+), 196 deletions(-) create mode 100644 crates/load_tester/README.md diff --git a/crates/load_tester/Cargo.toml b/crates/load_tester/Cargo.toml index 44c2ac5..10dead9 100644 --- a/crates/load_tester/Cargo.toml +++ b/crates/load_tester/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aquatic_load_tester" -description = "Load test runner for aquatic BitTorrent tracker" +description = "Automated load testing of aquatic and other BitTorrent trackers (Linux only)" keywords = ["peer-to-peer", "torrent", "bittorrent"] version.workspace = true authors.workspace = true diff --git a/crates/load_tester/README.md b/crates/load_tester/README.md new file mode 100644 index 0000000..edfeb62 --- /dev/null +++ b/crates/load_tester/README.md @@ -0,0 +1,3 @@ +# aquatic_load_tester + +Automated load testing of aquatic and other BitTorrent trackers. Linux only. \ No newline at end of file diff --git a/crates/load_tester/src/main.rs b/crates/load_tester/src/main.rs index 058b114..42f4e08 100644 --- a/crates/load_tester/src/main.rs +++ b/crates/load_tester/src/main.rs @@ -4,16 +4,28 @@ pub mod run; pub mod set; use clap::{Parser, Subcommand}; +use common::CpuMode; #[derive(Parser)] #[command(author, version, about)] struct Args { + /// How to choose which virtual CPUs to allow trackers and load test + /// executables on + #[arg(long, default_value_t = CpuMode::Split)] + cpu_mode: CpuMode, + /// Minimum number of tracker cpu cores to run load tests for + #[arg(long)] + min_cores: Option, + /// Maximum number of tracker cpu cores to run load tests for + #[arg(long)] + max_cores: Option, #[command(subcommand)] command: Command, } #[derive(Subcommand)] enum Command { + /// Benchmark UDP BitTorrent trackers aquatic_udp and opentracker #[cfg(feature = "udp")] Udp(protocols::udp::UdpCommand), } @@ -23,6 +35,8 @@ fn main() { match args.command { #[cfg(feature = "udp")] - Command::Udp(command) => command.run().unwrap(), + Command::Udp(command) => command + .run(args.cpu_mode, args.min_cores, args.max_cores) + .unwrap(), } } diff --git a/crates/load_tester/src/protocols/udp.rs b/crates/load_tester/src/protocols/udp.rs index 7c2cff6..5531ee7 100644 --- a/crates/load_tester/src/protocols/udp.rs +++ b/crates/load_tester/src/protocols/udp.rs @@ -12,16 +12,16 @@ use tempfile::NamedTempFile; use crate::{ common::{simple_load_test_runs, CpuMode, TaskSetCpuList}, run::ProcessRunner, - set::{run_sets, Server, SetConfig}, + set::{run_sets, SetConfig, Tracker}, }; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub enum UdpServer { +pub enum UdpTracker { Aquatic, OpenTracker, } -impl Server for UdpServer { +impl Tracker for UdpTracker { fn name(&self) -> String { match self { Self::Aquatic => "aquatic_udp".into(), @@ -32,84 +32,184 @@ impl Server for UdpServer { #[derive(Parser, Debug)] pub struct UdpCommand { - #[arg(long, default_value_t = CpuMode::Split)] - cpu_mode: CpuMode, + /// Path to aquatic_udp_load_test binary #[arg(long, default_value = "./target/release-debug/aquatic_udp_load_test")] load_test: PathBuf, + /// Path to aquatic_udp binary #[arg(long, default_value = "./target/release-debug/aquatic_udp")] aquatic: PathBuf, + /// Path to opentracker binary #[arg(long, default_value = "opentracker")] opentracker: PathBuf, } impl UdpCommand { - pub fn run(&self) -> anyhow::Result<()> { - run_sets(self, self.cpu_mode, self.sets(), |workers| { - Box::new(AquaticUdpLoadTestProcessConfig { workers }) + pub fn run( + &self, + cpu_mode: CpuMode, + min_cores: Option, + max_cores: Option, + ) -> anyhow::Result<()> { + let mut sets = self.sets(cpu_mode); + + if let Some(min_cores) = min_cores { + sets = sets.into_iter().filter(|(k, _)| *k >= min_cores).collect(); + } + if let Some(max_cores) = max_cores { + sets = sets.into_iter().filter(|(k, _)| *k <= max_cores).collect(); + } + + run_sets(self, cpu_mode, sets, |workers| { + Box::new(AquaticUdpLoadTestRunner { workers }) }); Ok(()) } - fn sets(&self) -> IndexMap> { + fn sets(&self, cpu_mode: CpuMode) -> IndexMap> { indexmap::indexmap! { 1 => SetConfig { implementations: indexmap! { - UdpServer::Aquatic => vec![ - Rc::new(AquaticUdpProcessConfig { - socket_workers: 1, - swarm_workers: 1, - }) as Rc>, + UdpTracker::Aquatic => vec![ + AquaticUdpRunner::new(1, 1), ], - /* - UdpServer::OpenTracker => vec![ - Rc::new(OpenTrackerUdpProcessConfig { - workers: 1, - }) as Rc>, - Rc::new(OpenTrackerUdpProcessConfig { - workers: 2, - }) as Rc>, + UdpTracker::OpenTracker => vec![ + OpenTrackerUdpRunner::new(0), // Handle requests within event loop + OpenTrackerUdpRunner::new(1), + OpenTrackerUdpRunner::new(2), ], - */ }, - load_test_runs: simple_load_test_runs(self.cpu_mode, &[1, 2, 4]), + load_test_runs: simple_load_test_runs(cpu_mode, &[1, 2, 4, 6]), }, 2 => SetConfig { implementations: indexmap! { - UdpServer::Aquatic => vec![ - Rc::new(AquaticUdpProcessConfig { - socket_workers: 1, - swarm_workers: 1, - }) as Rc>, - Rc::new(AquaticUdpProcessConfig { - socket_workers: 2, - swarm_workers: 1, - }) as Rc>, + UdpTracker::Aquatic => vec![ + AquaticUdpRunner::new(1, 1), + AquaticUdpRunner::new(2, 1), + AquaticUdpRunner::new(3, 1), ], - /* - UdpServer::OpenTracker => vec![ - Rc::new(OpenTrackerUdpProcessConfig { - workers: 2, - }) as Rc>, - Rc::new(OpenTrackerUdpProcessConfig { - workers: 4, - }) as Rc>, + UdpTracker::OpenTracker => vec![ + OpenTrackerUdpRunner::new(2), + OpenTrackerUdpRunner::new(4), ], - */ }, - load_test_runs: simple_load_test_runs(self.cpu_mode, &[1, 2, 4]), + load_test_runs: simple_load_test_runs(cpu_mode, &[1, 2, 4, 6]), + }, + 3 => SetConfig { + implementations: indexmap! { + UdpTracker::Aquatic => vec![ + AquaticUdpRunner::new(2, 1), + AquaticUdpRunner::new(3, 1), + ], + UdpTracker::OpenTracker => vec![ + OpenTrackerUdpRunner::new(3), + OpenTrackerUdpRunner::new(6), + ], + }, + load_test_runs: simple_load_test_runs(cpu_mode, &[4, 6, 8]), + }, + 4 => SetConfig { + implementations: indexmap! { + UdpTracker::Aquatic => vec![ + AquaticUdpRunner::new(3, 1), + AquaticUdpRunner::new(6, 1), + ], + UdpTracker::OpenTracker => vec![ + OpenTrackerUdpRunner::new(4), + OpenTrackerUdpRunner::new(8), + ], + }, + load_test_runs: simple_load_test_runs(cpu_mode, &[4, 6, 8]), + }, + 6 => SetConfig { + implementations: indexmap! { + UdpTracker::Aquatic => vec![ + AquaticUdpRunner::new(5, 1), + AquaticUdpRunner::new(10, 1), + AquaticUdpRunner::new(4, 2), + AquaticUdpRunner::new(8, 2), + ], + UdpTracker::OpenTracker => vec![ + OpenTrackerUdpRunner::new(6), + OpenTrackerUdpRunner::new(12), + ], + }, + load_test_runs: simple_load_test_runs(cpu_mode, &[4, 6, 8, 12]), + }, + 8 => SetConfig { + implementations: indexmap! { + UdpTracker::Aquatic => vec![ + AquaticUdpRunner::new(7, 1), + AquaticUdpRunner::new(14, 1), + AquaticUdpRunner::new(6, 2), + AquaticUdpRunner::new(12, 2), + ], + UdpTracker::OpenTracker => vec![ + OpenTrackerUdpRunner::new(8), + OpenTrackerUdpRunner::new(16), + ], + }, + load_test_runs: simple_load_test_runs(cpu_mode, &[4, 8, 12]), + }, + 12 => SetConfig { + implementations: indexmap! { + UdpTracker::Aquatic => vec![ + AquaticUdpRunner::new(11, 1), + AquaticUdpRunner::new(22, 1), + AquaticUdpRunner::new(10, 2), + AquaticUdpRunner::new(20, 2), + AquaticUdpRunner::new(9, 3), + AquaticUdpRunner::new(18, 3), + ], + UdpTracker::OpenTracker => vec![ + OpenTrackerUdpRunner::new(12), + OpenTrackerUdpRunner::new(24), + ], + }, + load_test_runs: simple_load_test_runs(cpu_mode, &[4, 8, 12, 16]), + }, + 16 => SetConfig { + implementations: indexmap! { + UdpTracker::Aquatic => vec![ + AquaticUdpRunner::new(15, 1), + AquaticUdpRunner::new(30, 1), + AquaticUdpRunner::new(15, 2), + AquaticUdpRunner::new(30, 2), + AquaticUdpRunner::new(13, 3), + AquaticUdpRunner::new(26, 3), + AquaticUdpRunner::new(12, 4), + AquaticUdpRunner::new(24, 4), + ], + UdpTracker::OpenTracker => vec![ + OpenTrackerUdpRunner::new(16), + OpenTrackerUdpRunner::new(32), + ], + }, + load_test_runs: simple_load_test_runs(cpu_mode, &[4, 8, 12, 16]), }, } } } #[derive(Debug, Clone)] -pub struct AquaticUdpProcessConfig { +struct AquaticUdpRunner { socket_workers: usize, swarm_workers: usize, } -impl ProcessRunner for AquaticUdpProcessConfig { +impl AquaticUdpRunner { + fn new( + socket_workers: usize, + swarm_workers: usize, + ) -> Rc> { + Rc::new(Self { + socket_workers, + swarm_workers, + }) + } +} + +impl ProcessRunner for AquaticUdpRunner { type Command = UdpCommand; fn run( @@ -138,12 +238,6 @@ impl ProcessRunner for AquaticUdpProcessConfig { .spawn()?) } - fn info(&self) -> String { - format!( - "socket workers: {}, swarm workers: {}", - self.socket_workers, self.swarm_workers - ) - } fn keys(&self) -> IndexMap { indexmap! { "socket workers".to_string() => self.socket_workers.to_string(), @@ -153,11 +247,17 @@ impl ProcessRunner for AquaticUdpProcessConfig { } #[derive(Debug, Clone)] -pub struct OpenTrackerUdpProcessConfig { +struct OpenTrackerUdpRunner { workers: usize, } -impl ProcessRunner for OpenTrackerUdpProcessConfig { +impl OpenTrackerUdpRunner { + fn new(workers: usize) -> Rc> { + Rc::new(Self { workers }) + } +} + +impl ProcessRunner for OpenTrackerUdpRunner { type Command = UdpCommand; fn run( @@ -166,7 +266,11 @@ impl ProcessRunner for OpenTrackerUdpProcessConfig { vcpus: &TaskSetCpuList, tmp_file: &mut NamedTempFile, ) -> anyhow::Result { - writeln!(tmp_file, "{}", self.workers)?; // FIXME + writeln!( + tmp_file, + "listen.udp.workers {}\nlisten.udp 0.0.0.0:3000", + self.workers + )?; Ok(Command::new("taskset") .arg("--cpu-list") @@ -179,10 +283,6 @@ impl ProcessRunner for OpenTrackerUdpProcessConfig { .spawn()?) } - fn info(&self) -> String { - format!("workers: {}", self.workers) - } - fn keys(&self) -> IndexMap { indexmap! { "workers".to_string() => self.workers.to_string(), @@ -191,11 +291,11 @@ impl ProcessRunner for OpenTrackerUdpProcessConfig { } #[derive(Debug, Clone)] -pub struct AquaticUdpLoadTestProcessConfig { +struct AquaticUdpLoadTestRunner { workers: usize, } -impl ProcessRunner for AquaticUdpLoadTestProcessConfig { +impl ProcessRunner for AquaticUdpLoadTestRunner { type Command = UdpCommand; fn run( @@ -224,10 +324,6 @@ impl ProcessRunner for AquaticUdpLoadTestProcessConfig { .spawn()?) } - fn info(&self) -> String { - format!("workers: {}", self.workers) - } - fn keys(&self) -> IndexMap { indexmap! { "workers".to_string() => self.workers.to_string(), diff --git a/crates/load_tester/src/run.rs b/crates/load_tester/src/run.rs index 2278f5a..3727f78 100644 --- a/crates/load_tester/src/run.rs +++ b/crates/load_tester/src/run.rs @@ -6,6 +6,7 @@ use std::{ }; use indexmap::IndexMap; +use itertools::Itertools; use nonblock::NonBlockingReader; use once_cell::sync::Lazy; use regex::Regex; @@ -22,30 +23,39 @@ pub trait ProcessRunner: ::std::fmt::Debug { vcpus: &TaskSetCpuList, tmp_file: &mut NamedTempFile, ) -> anyhow::Result; - fn info(&self) -> String; + fn keys(&self) -> IndexMap; + + fn info(&self) -> String { + self.keys() + .into_iter() + .map(|(k, v)| format!("{}: {}", k, v)) + .join(", ") + } } #[derive(Debug)] pub struct RunConfig { - pub server_runner: Rc>, - pub server_vcpus: TaskSetCpuList, + pub tracker_runner: Rc>, + pub tracker_vcpus: TaskSetCpuList, pub load_test_runner: Box>, pub load_test_vcpus: TaskSetCpuList, } impl RunConfig { - pub fn run(self, command: &C) -> Result, RunResults> { - let mut server_config_file = NamedTempFile::new().unwrap(); + pub fn run(self, command: &C) -> Result> { + let mut tracker_config_file = NamedTempFile::new().unwrap(); let mut load_test_config_file = NamedTempFile::new().unwrap(); - let server = + let tracker = match self - .server_runner - .run(command, &self.server_vcpus, &mut server_config_file) + .tracker_runner + .run(command, &self.tracker_vcpus, &mut tracker_config_file) { Ok(handle) => ChildWrapper(handle), - Err(err) => return Err(RunResults::new(self).set_error(err.into(), "run server")), + Err(err) => { + return Err(RunErrorResults::new(self).set_error(err.into(), "run tracker")) + } }; ::std::thread::sleep(Duration::from_secs(1)); @@ -57,123 +67,149 @@ impl RunConfig { ) { Ok(handle) => ChildWrapper(handle), Err(err) => { - return Err(RunResults::new(self) + return Err(RunErrorResults::new(self) .set_error(err.into(), "run load test") - .set_server(server)) + .set_tracker(tracker)) } }; ::std::thread::sleep(Duration::from_secs(59)); - let cpu_stats_res = Command::new("ps") + let tracker_process_stats_res = Command::new("ps") .arg("-p") - .arg(server.0.id().to_string()) + .arg(tracker.0.id().to_string()) .arg("-o") .arg("%cpu,rss") .arg("--noheader") .output(); - let server_process_stats = match cpu_stats_res { + let tracker_process_stats = match tracker_process_stats_res { Ok(output) if output.status.success() => { ProcessStats::from_str(&String::from_utf8_lossy(&output.stdout)).unwrap() } Ok(_) => { - return Err(RunResults::new(self) + return Err(RunErrorResults::new(self) .set_error_context("run ps") - .set_server(server) - .set_load_test(load_tester)); + .set_tracker(tracker) + .set_load_test_outputs(load_tester)); } Err(err) => { - return Err(RunResults::new(self) + return Err(RunErrorResults::new(self) .set_error(err.into(), "run ps") - .set_server(server) - .set_load_test(load_tester)); + .set_tracker(tracker) + .set_load_test_outputs(load_tester)); } }; ::std::thread::sleep(Duration::from_secs(5)); - let load_test_data = match load_tester.0.try_wait() { + let (load_test_stdout, load_test_stderr) = match load_tester.0.try_wait() { Ok(Some(status)) if status.success() => read_child_outputs(load_tester), Ok(Some(_)) => { - return Err(RunResults::new(self) + return Err(RunErrorResults::new(self) .set_error_context("wait for load tester") - .set_server(server) - .set_load_test(load_tester)) + .set_tracker(tracker) + .set_load_test_outputs(load_tester)) } Ok(None) => { if let Err(err) = load_tester.0.kill() { - return Err(RunResults::new(self) + return Err(RunErrorResults::new(self) .set_error(err.into(), "kill load tester") - .set_server(server) - .set_load_test(load_tester)); + .set_tracker(tracker) + .set_load_test_outputs(load_tester)); } ::std::thread::sleep(Duration::from_secs(1)); match load_tester.0.try_wait() { Ok(_) => { - return Err(RunResults::new(self) + return Err(RunErrorResults::new(self) .set_error_context("load tester didn't finish in time") - .set_load_test(load_tester)) + .set_load_test_outputs(load_tester)) } Err(err) => { - return Err(RunResults::new(self) + return Err(RunErrorResults::new(self) .set_error(err.into(), "wait for load tester after kill") - .set_server(server)); + .set_tracker(tracker)); } } } Err(err) => { - return Err(RunResults::new(self) + return Err(RunErrorResults::new(self) .set_error(err.into(), "wait for load tester") - .set_server(server) - .set_load_test(load_tester)) + .set_tracker(tracker) + .set_load_test_outputs(load_tester)) } }; - let mut results = RunResults::new(self); + let load_test_stdout = if let Some(load_test_stdout) = load_test_stdout { + load_test_stdout + } else { + return Err(RunErrorResults::new(self) + .set_error_context("couldn't read load tester stdout") + .set_tracker(tracker) + .set_load_test_stderr(load_test_stderr)); + }; - results.server_process_stats = Some(server_process_stats); - results.load_test_stdout = load_test_data.0; - results.load_test_stderr = load_test_data.1; + let avg_responses = { + static RE: Lazy = Lazy::new(|| { + Regex::new(r"Average responses per second: ([0-9]+\.?[0-9]+)").unwrap() + }); + + let opt_avg_responses = RE + .captures_iter(&load_test_stdout) + .next() + .map(|c| { + let (_, [avg_responses]) = c.extract(); + + avg_responses.to_string() + }) + .and_then(|v| v.parse::().ok()); + + if let Some(avg_responses) = opt_avg_responses { + avg_responses + } else { + return Err(RunErrorResults::new(self) + .set_error_context("couldn't extract avg_responses") + .set_tracker(tracker) + .set_load_test_stdout(Some(load_test_stdout)) + .set_load_test_stderr(load_test_stderr)); + } + }; + + let results = RunSuccessResults { + tracker_process_stats, + avg_responses, + }; Ok(results) } } +pub struct RunSuccessResults { + pub tracker_process_stats: ProcessStats, + pub avg_responses: f32, +} + #[derive(Debug)] -pub struct RunResults { +pub struct RunErrorResults { pub run_config: RunConfig, - pub server_process_stats: Option, - pub server_stdout: Option, - pub server_stderr: Option, + pub tracker_process_stats: Option, + pub tracker_stdout: Option, + pub tracker_stderr: Option, pub load_test_stdout: Option, pub load_test_stderr: Option, pub error: Option, pub error_context: Option, } -impl RunResults { - pub fn avg_responses(&self) -> Option { - static RE: Lazy = - Lazy::new(|| Regex::new(r"Average responses per second: ([0-9]+\.?[0-9]+)").unwrap()); - - self.load_test_stdout.as_ref().and_then(|stdout| { - RE.captures_iter(&stdout).next().map(|c| { - let (_, [avg_responses]) = c.extract(); - - avg_responses.to_string() - }) - }) - } - +impl RunErrorResults { fn new(run_config: RunConfig) -> Self { Self { run_config, - server_process_stats: Default::default(), - server_stdout: Default::default(), - server_stderr: Default::default(), + tracker_process_stats: Default::default(), + tracker_stdout: Default::default(), + tracker_stderr: Default::default(), load_test_stdout: Default::default(), load_test_stderr: Default::default(), error: Default::default(), @@ -181,16 +217,16 @@ impl RunResults { } } - fn set_server(mut self, server: ChildWrapper) -> Self { - let (stdout, stderr) = read_child_outputs(server); + fn set_tracker(mut self, tracker: ChildWrapper) -> Self { + let (stdout, stderr) = read_child_outputs(tracker); - self.server_stdout = stdout; - self.server_stderr = stderr; + self.tracker_stdout = stdout; + self.tracker_stderr = stderr; self } - fn set_load_test(mut self, load_test: ChildWrapper) -> Self { + fn set_load_test_outputs(mut self, load_test: ChildWrapper) -> Self { let (stdout, stderr) = read_child_outputs(load_test); self.load_test_stdout = stdout; @@ -199,6 +235,18 @@ impl RunResults { self } + fn set_load_test_stdout(mut self, stdout: Option) -> Self { + self.load_test_stdout = stdout; + + self + } + + fn set_load_test_stderr(mut self, stderr: Option) -> Self { + self.load_test_stderr = stderr; + + self + } + fn set_error(mut self, error: anyhow::Error, context: &str) -> Self { self.error = Some(error); self.error_context = Some(context.to_string()); @@ -244,12 +292,6 @@ impl Drop for ChildWrapper { } } -impl AsMut for ChildWrapper { - fn as_mut(&mut self) -> &mut Child { - &mut self.0 - } -} - fn read_child_outputs(mut child: ChildWrapper) -> (Option, Option) { let stdout = child.0.stdout.take().map(|stdout| { let mut buf = String::new(); diff --git a/crates/load_tester/src/set.rs b/crates/load_tester/src/set.rs index bfeb05d..dea4004 100644 --- a/crates/load_tester/src/set.rs +++ b/crates/load_tester/src/set.rs @@ -8,7 +8,7 @@ use crate::{ run::{ProcessRunner, ProcessStats, RunConfig}, }; -pub trait Server: ::std::fmt::Debug + Copy + Clone + ::std::hash::Hash { +pub trait Tracker: ::std::fmt::Debug + Copy + Clone + ::std::hash::Hash { fn name(&self) -> String; } @@ -24,30 +24,30 @@ pub fn run_sets( load_test_gen: F, ) where C: ::std::fmt::Debug, - I: Server, + I: Tracker, F: Fn(usize) -> Box>, { println!("# Load test report"); let results = set_configs .into_iter() - .map(|(server_core_count, set_config)| { - let server_vcpus = - TaskSetCpuList::new(cpu_mode, CpuDirection::Asc, server_core_count).unwrap(); + .map(|(tracker_core_count, set_config)| { + let tracker_vcpus = + TaskSetCpuList::new(cpu_mode, CpuDirection::Asc, tracker_core_count).unwrap(); println!( "## Tracker cores: {} (cpus: {})", - server_core_count, - server_vcpus.as_cpu_list() + tracker_core_count, + tracker_vcpus.as_cpu_list() ); - let server_results = set_config + let tracker_results = set_config .implementations .into_iter() - .map(|(implementation, server_runs)| { - let server_run_results = server_runs + .map(|(implementation, tracker_runs)| { + let tracker_run_results = tracker_runs .iter() - .map(|server_run| { + .map(|tracker_run| { let load_test_run_results = set_config .load_test_runs .clone() @@ -57,16 +57,15 @@ pub fn run_sets( command, &load_test_gen, implementation, - &server_run, - server_vcpus.clone(), + &tracker_run, + tracker_vcpus.clone(), workers, load_test_vcpus, ) }) .collect(); - ServerConfigurationResults { - config_keys: server_run.keys(), + TrackerConfigurationResults { load_tests: load_test_run_results, } }) @@ -74,14 +73,14 @@ pub fn run_sets( ImplementationResults { name: implementation.name(), - configurations: server_run_results, + configurations: tracker_run_results, } }) .collect(); - ServerCoreCountResults { - core_count: server_core_count, - implementations: server_results, + TrackerCoreCountResults { + core_count: tracker_core_count, + implementations: tracker_results, } }) .collect::>(); @@ -89,14 +88,14 @@ pub fn run_sets( html_summary(&results); } -pub struct ServerCoreCountResults { +pub struct TrackerCoreCountResults { core_count: usize, implementations: Vec, } pub struct ImplementationResults { name: String, - configurations: Vec, + configurations: Vec, } impl ImplementationResults { @@ -114,13 +113,11 @@ impl ImplementationResults { } } -pub struct ServerConfigurationResults { - config_keys: IndexMap, +pub struct TrackerConfigurationResults { load_tests: Vec, - // best_index: Option, } -impl ServerConfigurationResults { +impl TrackerConfigurationResults { fn best_result(&self) -> Option { self.load_tests .iter() @@ -148,57 +145,59 @@ impl LoadTestRunResults { command: &C, load_test_gen: &F, implementation: I, - server_process: &Rc>, - server_vcpus: TaskSetCpuList, + tracker_process: &Rc>, + tracker_vcpus: TaskSetCpuList, workers: usize, load_test_vcpus: TaskSetCpuList, ) -> Self where C: ::std::fmt::Debug, - I: Server, + I: Tracker, F: Fn(usize) -> Box>, { println!( "### {} run ({}) (load test workers: {}, cpus: {})", implementation.name(), - server_process.info(), + tracker_process.info(), workers, load_test_vcpus.as_cpu_list() ); let load_test_runner = load_test_gen(workers); - let load_test_keys = load_test_runner.keys(); + // let load_test_keys = load_test_runner.keys(); let run_config = RunConfig { - server_runner: server_process.clone(), - server_vcpus: server_vcpus.clone(), + tracker_runner: tracker_process.clone(), + tracker_vcpus: tracker_vcpus.clone(), load_test_runner, load_test_vcpus, }; match run_config.run(command) { - Ok(results) => { - let avg_responses = results.avg_responses().unwrap().parse::().unwrap(); - let server_process_stats = results.server_process_stats.unwrap(); - - println!("- Average responses per second: {}", avg_responses); + Ok(r) => { + println!("- Average responses per second: {}", r.avg_responses); println!( - "- Average server CPU utilization: {}%", - server_process_stats.avg_cpu_utilization, + "- Average tracker CPU utilization: {}%", + r.tracker_process_stats.avg_cpu_utilization, + ); + println!( + "- Peak tracker RSS: {} kB", + r.tracker_process_stats.peak_rss_kb ); - println!("- Peak server RSS: {} kB", server_process_stats.peak_rss_kb); LoadTestRunResults::Success(LoadTestRunResultsSuccess { - config_keys: load_test_keys, - average_responses: avg_responses, - server_process_stats, + average_responses: r.avg_responses, + // tracker_keys: tracker_process.keys(), + tracker_info: tracker_process.info(), + tracker_process_stats: r.tracker_process_stats, + // load_test_keys, }) } Err(results) => { - println!("\nRun failed:\n{:?}\n", results); + println!("\nRun failed:\n{:#?}\n", results); LoadTestRunResults::Failure(LoadTestRunResultsFailure { - config_keys: load_test_keys, + // load_test_keys }) } } @@ -207,16 +206,18 @@ impl LoadTestRunResults { #[derive(Clone)] pub struct LoadTestRunResultsSuccess { - config_keys: IndexMap, average_responses: f32, - server_process_stats: ProcessStats, + // tracker_keys: IndexMap, + tracker_info: String, + tracker_process_stats: ProcessStats, + // load_test_keys: IndexMap, } pub struct LoadTestRunResultsFailure { - config_keys: IndexMap, + // load_test_keys: IndexMap, } -pub fn html_summary(results: &[ServerCoreCountResults]) { +pub fn html_summary(results: &[TrackerCoreCountResults]) { let mut all_implementation_names = IndexSet::new(); for core_count_results in results { @@ -239,11 +240,7 @@ pub fn html_summary(results: &[ServerCoreCountResults]) { let best_results_for_all_implementations = all_implementation_names .iter() - .map(|name| { - best_results - .get(name) - .and_then(|r| r.as_ref().map(|r| r.average_responses)) - }) + .map(|name| best_results.get(name).cloned().flatten()) .collect::>(); let data_row = format!( @@ -256,12 +253,18 @@ pub fn html_summary(results: &[ServerCoreCountResults]) { core_count_results.core_count, best_results_for_all_implementations .into_iter() - .map(|result| format!( - "{}", - result - .map(|r| r.to_string()) - .unwrap_or_else(|| "-".to_string()) - )) + .map(|result| { + if let Some(r) = result { + format!( + r#"{}"#, + r.tracker_info, + r.tracker_process_stats.avg_cpu_utilization, + r.average_responses, + ) + } else { + "-".to_string() + } + }) .join("\n"), );