From 0c500ec4f0a895a88b7b6b9712400c61e4190b74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 28 Dec 2023 20:02:46 +0100 Subject: [PATCH] bencher: udp: support chihaya, improve error handling, update sets --- Cargo.lock | 7 ++ crates/bencher/Cargo.toml | 1 + crates/bencher/src/main.rs | 2 +- crates/bencher/src/protocols/udp.rs | 115 +++++++++++++++++++++++++++- crates/bencher/src/run.rs | 86 ++++++++++++++++----- crates/bencher/src/set.rs | 25 +++++- 6 files changed, 211 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bde4793..65a00cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -136,6 +136,7 @@ dependencies = [ "aquatic_udp_load_test", "clap 4.4.11", "indexmap 2.1.0", + "indoc", "itertools 0.12.0", "nonblock", "once_cell", @@ -1464,6 +1465,12 @@ dependencies = [ "hashbrown 0.14.3", ] +[[package]] +name = "indoc" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e186cfbae8084e513daff4240b4797e342f988cecda4fb6c939150f96315fd8" + [[package]] name = "instant" version = "0.1.12" diff --git a/crates/bencher/Cargo.toml b/crates/bencher/Cargo.toml index 493d44b..bdcdaf6 100644 --- a/crates/bencher/Cargo.toml +++ b/crates/bencher/Cargo.toml @@ -25,6 +25,7 @@ aquatic_udp_load_test = { optional = true, workspace = true } anyhow = "1" clap = { version = "4", features = ["derive"] } indexmap = "2" +indoc = "2" itertools = "0.12" nonblock = "0.2" once_cell = "1" diff --git a/crates/bencher/src/main.rs b/crates/bencher/src/main.rs index 522f7bb..2ed2be8 100644 --- a/crates/bencher/src/main.rs +++ b/crates/bencher/src/main.rs @@ -25,7 +25,7 @@ struct Args { #[derive(Subcommand)] enum Command { - /// Benchmark UDP BitTorrent trackers aquatic_udp and opentracker + /// Benchmark UDP BitTorrent trackers aquatic_udp, opentracker and chihaya #[cfg(feature = "udp")] Udp(protocols::udp::UdpCommand), } diff --git a/crates/bencher/src/protocols/udp.rs b/crates/bencher/src/protocols/udp.rs index 5531ee7..5ad3f60 100644 --- a/crates/bencher/src/protocols/udp.rs +++ b/crates/bencher/src/protocols/udp.rs @@ -7,6 +7,7 @@ use std::{ use clap::Parser; use indexmap::{indexmap, IndexMap}; +use indoc::writedoc; use tempfile::NamedTempFile; use crate::{ @@ -19,6 +20,7 @@ use crate::{ pub enum UdpTracker { Aquatic, OpenTracker, + Chihaya, } impl Tracker for UdpTracker { @@ -26,6 +28,7 @@ impl Tracker for UdpTracker { match self { Self::Aquatic => "aquatic_udp".into(), Self::OpenTracker => "opentracker".into(), + Self::Chihaya => "chihaya".into(), } } } @@ -41,6 +44,9 @@ pub struct UdpCommand { /// Path to opentracker binary #[arg(long, default_value = "opentracker")] opentracker: PathBuf, + /// Path to chihaya binary + #[arg(long, default_value = "chihaya")] + chihaya: PathBuf, } impl UdpCommand { @@ -72,12 +78,17 @@ impl UdpCommand { implementations: indexmap! { UdpTracker::Aquatic => vec![ AquaticUdpRunner::new(1, 1), + AquaticUdpRunner::new(2, 1), ], UdpTracker::OpenTracker => vec![ OpenTrackerUdpRunner::new(0), // Handle requests within event loop OpenTrackerUdpRunner::new(1), OpenTrackerUdpRunner::new(2), ], + UdpTracker::Chihaya => vec![ + ChihayaUdpRunner::new(None), + ChihayaUdpRunner::new(Some(2)), + ], }, load_test_runs: simple_load_test_runs(cpu_mode, &[1, 2, 4, 6]), }, @@ -92,6 +103,10 @@ impl UdpCommand { OpenTrackerUdpRunner::new(2), OpenTrackerUdpRunner::new(4), ], + UdpTracker::Chihaya => vec![ + ChihayaUdpRunner::new(None), + ChihayaUdpRunner::new(Some(4)), + ], }, load_test_runs: simple_load_test_runs(cpu_mode, &[1, 2, 4, 6]), }, @@ -100,11 +115,16 @@ impl UdpCommand { UdpTracker::Aquatic => vec![ AquaticUdpRunner::new(2, 1), AquaticUdpRunner::new(3, 1), + AquaticUdpRunner::new(5, 1), ], UdpTracker::OpenTracker => vec![ OpenTrackerUdpRunner::new(3), OpenTrackerUdpRunner::new(6), ], + UdpTracker::Chihaya => vec![ + ChihayaUdpRunner::new(None), + ChihayaUdpRunner::new(Some(6)), + ], }, load_test_runs: simple_load_test_runs(cpu_mode, &[4, 6, 8]), }, @@ -112,12 +132,17 @@ impl UdpCommand { implementations: indexmap! { UdpTracker::Aquatic => vec![ AquaticUdpRunner::new(3, 1), - AquaticUdpRunner::new(6, 1), + AquaticUdpRunner::new(4, 1), + AquaticUdpRunner::new(7, 1), ], UdpTracker::OpenTracker => vec![ OpenTrackerUdpRunner::new(4), OpenTrackerUdpRunner::new(8), ], + UdpTracker::Chihaya => vec![ + ChihayaUdpRunner::new(None), + ChihayaUdpRunner::new(Some(8)), + ], }, load_test_runs: simple_load_test_runs(cpu_mode, &[4, 6, 8]), }, @@ -125,7 +150,8 @@ impl UdpCommand { implementations: indexmap! { UdpTracker::Aquatic => vec![ AquaticUdpRunner::new(5, 1), - AquaticUdpRunner::new(10, 1), + AquaticUdpRunner::new(6, 1), + AquaticUdpRunner::new(9, 1), AquaticUdpRunner::new(4, 2), AquaticUdpRunner::new(8, 2), ], @@ -133,6 +159,10 @@ impl UdpCommand { OpenTrackerUdpRunner::new(6), OpenTrackerUdpRunner::new(12), ], + UdpTracker::Chihaya => vec![ + ChihayaUdpRunner::new(None), + ChihayaUdpRunner::new(Some(12)), + ], }, load_test_runs: simple_load_test_runs(cpu_mode, &[4, 6, 8, 12]), }, @@ -140,6 +170,7 @@ impl UdpCommand { implementations: indexmap! { UdpTracker::Aquatic => vec![ AquaticUdpRunner::new(7, 1), + AquaticUdpRunner::new(8, 1), AquaticUdpRunner::new(14, 1), AquaticUdpRunner::new(6, 2), AquaticUdpRunner::new(12, 2), @@ -148,6 +179,10 @@ impl UdpCommand { OpenTrackerUdpRunner::new(8), OpenTrackerUdpRunner::new(16), ], + UdpTracker::Chihaya => vec![ + ChihayaUdpRunner::new(None), + ChihayaUdpRunner::new(Some(16)), + ], }, load_test_runs: simple_load_test_runs(cpu_mode, &[4, 8, 12]), }, @@ -155,6 +190,7 @@ impl UdpCommand { implementations: indexmap! { UdpTracker::Aquatic => vec![ AquaticUdpRunner::new(11, 1), + AquaticUdpRunner::new(12, 1), AquaticUdpRunner::new(22, 1), AquaticUdpRunner::new(10, 2), AquaticUdpRunner::new(20, 2), @@ -165,6 +201,10 @@ impl UdpCommand { OpenTrackerUdpRunner::new(12), OpenTrackerUdpRunner::new(24), ], + UdpTracker::Chihaya => vec![ + ChihayaUdpRunner::new(None), + ChihayaUdpRunner::new(Some(24)), + ], }, load_test_runs: simple_load_test_runs(cpu_mode, &[4, 8, 12, 16]), }, @@ -173,8 +213,8 @@ impl UdpCommand { UdpTracker::Aquatic => vec![ AquaticUdpRunner::new(15, 1), AquaticUdpRunner::new(30, 1), - AquaticUdpRunner::new(15, 2), - AquaticUdpRunner::new(30, 2), + AquaticUdpRunner::new(14, 2), + AquaticUdpRunner::new(28, 2), AquaticUdpRunner::new(13, 3), AquaticUdpRunner::new(26, 3), AquaticUdpRunner::new(12, 4), @@ -184,6 +224,10 @@ impl UdpCommand { OpenTrackerUdpRunner::new(16), OpenTrackerUdpRunner::new(32), ], + UdpTracker::Chihaya => vec![ + ChihayaUdpRunner::new(None), + ChihayaUdpRunner::new(Some(32)), + ], }, load_test_runs: simple_load_test_runs(cpu_mode, &[4, 8, 12, 16]), }, @@ -290,6 +334,65 @@ impl ProcessRunner for OpenTrackerUdpRunner { } } +#[derive(Debug, Clone)] +struct ChihayaUdpRunner { + gomaxprocs: Option, +} + +impl ChihayaUdpRunner { + fn new(gomaxprocs: Option) -> Rc> { + Rc::new(Self { gomaxprocs }) + } +} + +impl ProcessRunner for ChihayaUdpRunner { + type Command = UdpCommand; + + fn run( + &self, + command: &Self::Command, + vcpus: &TaskSetCpuList, + tmp_file: &mut NamedTempFile, + ) -> anyhow::Result { + writedoc!( + tmp_file, + r#" + --- + chihaya: + metrics_addr: "127.0.0.1:0" + udp: + addr: "127.0.0.1:3000" + private_key: "abcdefghijklmnopqrst" + storage: + name: "memory" + "#, + )?; + + let mut c = Command::new("taskset"); + + let mut c = c + .arg("--cpu-list") + .arg(vcpus.as_cpu_list()) + .arg(&command.chihaya) + .arg("--config") + .arg(tmp_file.path()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + + if let Some(gomaxprocs) = self.gomaxprocs { + c = c.env("GOMAXPROCS", gomaxprocs.to_string()); + } + + Ok(c.spawn()?) + } + + fn keys(&self) -> IndexMap { + indexmap! { + "GOMAXPROCS".to_string() => format!("{:?}", self.gomaxprocs), + } + } +} + #[derive(Debug, Clone)] struct AquaticUdpLoadTestRunner { workers: usize, @@ -309,6 +412,10 @@ impl ProcessRunner for AquaticUdpLoadTestRunner { c.workers = self.workers as u8; c.duration = 60; + c.requests.weight_connect = 100; + c.requests.weight_announce = 100; + c.requests.weight_scrape = 1; + let c = toml::to_string_pretty(&c)?; tmp_file.write_all(c.as_bytes())?; diff --git a/crates/bencher/src/run.rs b/crates/bencher/src/run.rs index 3727f78..d556e41 100644 --- a/crates/bencher/src/run.rs +++ b/crates/bencher/src/run.rs @@ -47,7 +47,7 @@ impl RunConfig { let mut tracker_config_file = NamedTempFile::new().unwrap(); let mut load_test_config_file = NamedTempFile::new().unwrap(); - let tracker = + let mut tracker = match self .tracker_runner .run(command, &self.tracker_vcpus, &mut tracker_config_file) @@ -69,11 +69,20 @@ impl RunConfig { Err(err) => { return Err(RunErrorResults::new(self) .set_error(err.into(), "run load test") - .set_tracker(tracker)) + .set_tracker_outputs(tracker)) } }; - ::std::thread::sleep(Duration::from_secs(59)); + for _ in 0..59 { + if let Ok(Some(status)) = tracker.0.try_wait() { + return Err(RunErrorResults::new(self) + .set_tracker_outputs(tracker) + .set_load_test_outputs(load_tester) + .set_error_context(&format!("tracker exited with {}", status))); + } + + ::std::thread::sleep(Duration::from_secs(1)); + } let tracker_process_stats_res = Command::new("ps") .arg("-p") @@ -90,13 +99,13 @@ impl RunConfig { Ok(_) => { return Err(RunErrorResults::new(self) .set_error_context("run ps") - .set_tracker(tracker) + .set_tracker_outputs(tracker) .set_load_test_outputs(load_tester)); } Err(err) => { return Err(RunErrorResults::new(self) .set_error(err.into(), "run ps") - .set_tracker(tracker) + .set_tracker_outputs(tracker) .set_load_test_outputs(load_tester)); } }; @@ -108,14 +117,14 @@ impl RunConfig { Ok(Some(_)) => { return Err(RunErrorResults::new(self) .set_error_context("wait for load tester") - .set_tracker(tracker) + .set_tracker_outputs(tracker) .set_load_test_outputs(load_tester)) } Ok(None) => { if let Err(err) = load_tester.0.kill() { return Err(RunErrorResults::new(self) .set_error(err.into(), "kill load tester") - .set_tracker(tracker) + .set_tracker_outputs(tracker) .set_load_test_outputs(load_tester)); } @@ -130,14 +139,14 @@ impl RunConfig { Err(err) => { return Err(RunErrorResults::new(self) .set_error(err.into(), "wait for load tester after kill") - .set_tracker(tracker)); + .set_tracker_outputs(tracker)); } } } Err(err) => { return Err(RunErrorResults::new(self) .set_error(err.into(), "wait for load tester") - .set_tracker(tracker) + .set_tracker_outputs(tracker) .set_load_test_outputs(load_tester)) } }; @@ -147,7 +156,7 @@ impl RunConfig { } else { return Err(RunErrorResults::new(self) .set_error_context("couldn't read load tester stdout") - .set_tracker(tracker) + .set_tracker_outputs(tracker) .set_load_test_stderr(load_test_stderr)); }; @@ -171,7 +180,7 @@ impl RunConfig { } else { return Err(RunErrorResults::new(self) .set_error_context("couldn't extract avg_responses") - .set_tracker(tracker) + .set_tracker_outputs(tracker) .set_load_test_stdout(Some(load_test_stdout)) .set_load_test_stderr(load_test_stderr)); } @@ -194,7 +203,6 @@ pub struct RunSuccessResults { #[derive(Debug)] pub struct RunErrorResults { pub run_config: RunConfig, - pub tracker_process_stats: Option, pub tracker_stdout: Option, pub tracker_stderr: Option, pub load_test_stdout: Option, @@ -207,7 +215,6 @@ impl RunErrorResults { fn new(run_config: RunConfig) -> Self { Self { run_config, - tracker_process_stats: Default::default(), tracker_stdout: Default::default(), tracker_stderr: Default::default(), load_test_stdout: Default::default(), @@ -217,7 +224,7 @@ impl RunErrorResults { } } - fn set_tracker(mut self, tracker: ChildWrapper) -> Self { + fn set_tracker_outputs(mut self, tracker: ChildWrapper) -> Self { let (stdout, stderr) = read_child_outputs(tracker); self.tracker_stdout = stdout; @@ -261,6 +268,49 @@ impl RunErrorResults { } } +impl std::fmt::Display for RunErrorResults { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some(t) = self.error_context.as_ref() { + writeln!(f, "- {}", t)?; + } + if let Some(err) = self.error.as_ref() { + writeln!(f, "- {:#}", err)?; + } + + writeln!(f, "- tracker_runner: {:?}", self.run_config.tracker_runner)?; + writeln!( + f, + "- load_test_runner: {:?}", + self.run_config.load_test_runner + )?; + writeln!( + f, + "- tracker_vcpus: {}", + self.run_config.tracker_vcpus.as_cpu_list() + )?; + writeln!( + f, + "- load_test_vcpus: {}", + self.run_config.load_test_vcpus.as_cpu_list() + )?; + + if let Some(t) = self.tracker_stdout.as_ref() { + writeln!(f, "- tracker stdout:\n```\n{}\n```", t)?; + } + if let Some(t) = self.tracker_stderr.as_ref() { + writeln!(f, "- tracker stderr:\n```\n{}\n```", t)?; + } + if let Some(t) = self.load_test_stdout.as_ref() { + writeln!(f, "- load test stdout:\n```\n{}\n```", t)?; + } + if let Some(t) = self.load_test_stderr.as_ref() { + writeln!(f, "- load test stderr:\n```\n{}\n```", t)?; + } + + Ok(()) + } +} + #[derive(Debug, Clone, Copy)] pub struct ProcessStats { pub avg_cpu_utilization: f32, @@ -293,23 +343,23 @@ impl Drop for ChildWrapper { } fn read_child_outputs(mut child: ChildWrapper) -> (Option, Option) { - let stdout = child.0.stdout.take().map(|stdout| { + let stdout = child.0.stdout.take().and_then(|stdout| { let mut buf = String::new(); let mut reader = NonBlockingReader::from_fd(stdout).unwrap(); reader.read_available_to_string(&mut buf).unwrap(); - buf + (!buf.is_empty()).then_some(buf) }); - let stderr = child.0.stderr.take().map(|stderr| { + let stderr = child.0.stderr.take().and_then(|stderr| { let mut buf = String::new(); let mut reader = NonBlockingReader::from_fd(stderr).unwrap(); reader.read_available_to_string(&mut buf).unwrap(); - buf + (!buf.is_empty()).then_some(buf) }); (stdout, stderr) diff --git a/crates/bencher/src/set.rs b/crates/bencher/src/set.rs index dea4004..597b8fa 100644 --- a/crates/bencher/src/set.rs +++ b/crates/bencher/src/set.rs @@ -27,7 +27,28 @@ pub fn run_sets( I: Tracker, F: Fn(usize) -> Box>, { - println!("# Load test report"); + println!("# Benchmark report"); + + let total_num_runs = set_configs + .values() + .map(|set| { + set.implementations.values().map(Vec::len).sum::() * set.load_test_runs.len() + }) + .sum::(); + + let (estimated_hours, estimated_minutes) = { + let minutes = (total_num_runs * 67) / 60; + + (minutes / 60, minutes % 60) + }; + + println!(""); + println!("Total number of load test runs: {}", total_num_runs); + println!( + "Estimated duration: {} hours, {} minutes", + estimated_hours, estimated_minutes + ); + println!(""); let results = set_configs .into_iter() @@ -194,7 +215,7 @@ impl LoadTestRunResults { }) } Err(results) => { - println!("\nRun failed:\n{:#?}\n", results); + println!("\nRun failed:\n{:#}\n", results); LoadTestRunResults::Failure(LoadTestRunResultsFailure { // load_test_keys