Merge pull request #163 from greatest-ape/load-test-fixes

load tester: refactor, add udp sets, improve docs, add command options; update README
This commit is contained in:
Joakim Frostegård 2023-12-27 21:44:38 +01:00 committed by GitHub
commit 6f9b0fce7b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 364 additions and 201 deletions

View file

@ -9,11 +9,11 @@ of sub-implementations for different protocols:
[aquatic_http]: ./crates/http [aquatic_http]: ./crates/http
[aquatic_ws]: ./crates/ws [aquatic_ws]: ./crates/ws
| Name | Protocol | OS requirements | | Name | Protocol | OS requirements |
|----------------|-------------------------------------------|-----------------| |----------------|-------------------------------------------|--------------------|
| [aquatic_udp] | BitTorrent over UDP | Unix-like | | [aquatic_udp] | BitTorrent over UDP | Unix-like |
| [aquatic_http] | BitTorrent over HTTP, optionally over TLS | Linux 5.8+ | | [aquatic_http] | BitTorrent over HTTP, optionally over TLS | Linux 5.8 or later |
| [aquatic_ws] | WebTorrent, optionally over TLS | Linux 5.8+ | | [aquatic_ws] | WebTorrent, optionally over TLS | Linux 5.8 or later |
Features at a glance: Features at a glance:
@ -27,6 +27,7 @@ Features at a glance:
Known users: Known users:
- [explodie.org public tracker](https://explodie.org/opentracker.html) (`udp://explodie.org:6969`), typically [serving ~100,000 requests per second](https://explodie.org/tracker-stats.html) - [explodie.org public tracker](https://explodie.org/opentracker.html) (`udp://explodie.org:6969`), typically [serving ~100,000 requests per second](https://explodie.org/tracker-stats.html)
- [tracker.webtorrent.dev](https://tracker.webtorrent.dev) (`wss://tracker.webtorrent.dev`)
## Performance of the UDP implementation ## Performance of the UDP implementation

View file

@ -1,6 +1,6 @@
[package] [package]
name = "aquatic_load_tester" name = "aquatic_load_tester"
description = "Load test runner for aquatic BitTorrent tracker" description = "Automated load testing of aquatic and other BitTorrent trackers (Linux only)"
keywords = ["peer-to-peer", "torrent", "bittorrent"] keywords = ["peer-to-peer", "torrent", "bittorrent"]
version.workspace = true version.workspace = true
authors.workspace = true authors.workspace = true

View file

@ -0,0 +1,3 @@
# aquatic_load_tester
Automated load testing of aquatic and other BitTorrent trackers. Linux only.

View file

@ -4,16 +4,28 @@ pub mod run;
pub mod set; pub mod set;
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use common::CpuMode;
#[derive(Parser)] #[derive(Parser)]
#[command(author, version, about)] #[command(author, version, about)]
struct Args { struct Args {
/// How to choose which virtual CPUs to allow trackers and load test
/// executables on
#[arg(long, default_value_t = CpuMode::Split)]
cpu_mode: CpuMode,
/// Minimum number of tracker cpu cores to run load tests for
#[arg(long)]
min_cores: Option<usize>,
/// Maximum number of tracker cpu cores to run load tests for
#[arg(long)]
max_cores: Option<usize>,
#[command(subcommand)] #[command(subcommand)]
command: Command, command: Command,
} }
#[derive(Subcommand)] #[derive(Subcommand)]
enum Command { enum Command {
/// Benchmark UDP BitTorrent trackers aquatic_udp and opentracker
#[cfg(feature = "udp")] #[cfg(feature = "udp")]
Udp(protocols::udp::UdpCommand), Udp(protocols::udp::UdpCommand),
} }
@ -23,6 +35,8 @@ fn main() {
match args.command { match args.command {
#[cfg(feature = "udp")] #[cfg(feature = "udp")]
Command::Udp(command) => command.run().unwrap(), Command::Udp(command) => command
.run(args.cpu_mode, args.min_cores, args.max_cores)
.unwrap(),
} }
} }

View file

@ -12,16 +12,16 @@ use tempfile::NamedTempFile;
use crate::{ use crate::{
common::{simple_load_test_runs, CpuMode, TaskSetCpuList}, common::{simple_load_test_runs, CpuMode, TaskSetCpuList},
run::ProcessRunner, run::ProcessRunner,
set::{run_sets, Server, SetConfig}, set::{run_sets, SetConfig, Tracker},
}; };
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum UdpServer { pub enum UdpTracker {
Aquatic, Aquatic,
OpenTracker, OpenTracker,
} }
impl Server for UdpServer { impl Tracker for UdpTracker {
fn name(&self) -> String { fn name(&self) -> String {
match self { match self {
Self::Aquatic => "aquatic_udp".into(), Self::Aquatic => "aquatic_udp".into(),
@ -32,84 +32,184 @@ impl Server for UdpServer {
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
pub struct UdpCommand { pub struct UdpCommand {
#[arg(long, default_value_t = CpuMode::Split)] /// Path to aquatic_udp_load_test binary
cpu_mode: CpuMode,
#[arg(long, default_value = "./target/release-debug/aquatic_udp_load_test")] #[arg(long, default_value = "./target/release-debug/aquatic_udp_load_test")]
load_test: PathBuf, load_test: PathBuf,
/// Path to aquatic_udp binary
#[arg(long, default_value = "./target/release-debug/aquatic_udp")] #[arg(long, default_value = "./target/release-debug/aquatic_udp")]
aquatic: PathBuf, aquatic: PathBuf,
/// Path to opentracker binary
#[arg(long, default_value = "opentracker")] #[arg(long, default_value = "opentracker")]
opentracker: PathBuf, opentracker: PathBuf,
} }
impl UdpCommand { impl UdpCommand {
pub fn run(&self) -> anyhow::Result<()> { pub fn run(
run_sets(self, self.cpu_mode, self.sets(), |workers| { &self,
Box::new(AquaticUdpLoadTestProcessConfig { workers }) cpu_mode: CpuMode,
min_cores: Option<usize>,
max_cores: Option<usize>,
) -> anyhow::Result<()> {
let mut sets = self.sets(cpu_mode);
if let Some(min_cores) = min_cores {
sets = sets.into_iter().filter(|(k, _)| *k >= min_cores).collect();
}
if let Some(max_cores) = max_cores {
sets = sets.into_iter().filter(|(k, _)| *k <= max_cores).collect();
}
run_sets(self, cpu_mode, sets, |workers| {
Box::new(AquaticUdpLoadTestRunner { workers })
}); });
Ok(()) Ok(())
} }
fn sets(&self) -> IndexMap<usize, SetConfig<UdpCommand, UdpServer>> { fn sets(&self, cpu_mode: CpuMode) -> IndexMap<usize, SetConfig<UdpCommand, UdpTracker>> {
indexmap::indexmap! { indexmap::indexmap! {
1 => SetConfig { 1 => SetConfig {
implementations: indexmap! { implementations: indexmap! {
UdpServer::Aquatic => vec![ UdpTracker::Aquatic => vec![
Rc::new(AquaticUdpProcessConfig { AquaticUdpRunner::new(1, 1),
socket_workers: 1,
swarm_workers: 1,
}) as Rc<dyn ProcessRunner<Command = UdpCommand>>,
], ],
/* UdpTracker::OpenTracker => vec![
UdpServer::OpenTracker => vec![ OpenTrackerUdpRunner::new(0), // Handle requests within event loop
Rc::new(OpenTrackerUdpProcessConfig { OpenTrackerUdpRunner::new(1),
workers: 1, OpenTrackerUdpRunner::new(2),
}) as Rc<dyn RunProcess<Command = UdpCommand>>,
Rc::new(OpenTrackerUdpProcessConfig {
workers: 2,
}) as Rc<dyn RunProcess<Command = UdpCommand>>,
], ],
*/
}, },
load_test_runs: simple_load_test_runs(self.cpu_mode, &[1, 2, 4]), load_test_runs: simple_load_test_runs(cpu_mode, &[1, 2, 4, 6]),
}, },
2 => SetConfig { 2 => SetConfig {
implementations: indexmap! { implementations: indexmap! {
UdpServer::Aquatic => vec![ UdpTracker::Aquatic => vec![
Rc::new(AquaticUdpProcessConfig { AquaticUdpRunner::new(1, 1),
socket_workers: 1, AquaticUdpRunner::new(2, 1),
swarm_workers: 1, AquaticUdpRunner::new(3, 1),
}) as Rc<dyn ProcessRunner<Command = UdpCommand>>,
Rc::new(AquaticUdpProcessConfig {
socket_workers: 2,
swarm_workers: 1,
}) as Rc<dyn ProcessRunner<Command = UdpCommand>>,
], ],
/* UdpTracker::OpenTracker => vec![
UdpServer::OpenTracker => vec![ OpenTrackerUdpRunner::new(2),
Rc::new(OpenTrackerUdpProcessConfig { OpenTrackerUdpRunner::new(4),
workers: 2,
}) as Rc<dyn RunProcess<Command = UdpCommand>>,
Rc::new(OpenTrackerUdpProcessConfig {
workers: 4,
}) as Rc<dyn RunProcess<Command = UdpCommand>>,
], ],
*/
}, },
load_test_runs: simple_load_test_runs(self.cpu_mode, &[1, 2, 4]), load_test_runs: simple_load_test_runs(cpu_mode, &[1, 2, 4, 6]),
},
3 => SetConfig {
implementations: indexmap! {
UdpTracker::Aquatic => vec![
AquaticUdpRunner::new(2, 1),
AquaticUdpRunner::new(3, 1),
],
UdpTracker::OpenTracker => vec![
OpenTrackerUdpRunner::new(3),
OpenTrackerUdpRunner::new(6),
],
},
load_test_runs: simple_load_test_runs(cpu_mode, &[4, 6, 8]),
},
4 => SetConfig {
implementations: indexmap! {
UdpTracker::Aquatic => vec![
AquaticUdpRunner::new(3, 1),
AquaticUdpRunner::new(6, 1),
],
UdpTracker::OpenTracker => vec![
OpenTrackerUdpRunner::new(4),
OpenTrackerUdpRunner::new(8),
],
},
load_test_runs: simple_load_test_runs(cpu_mode, &[4, 6, 8]),
},
6 => SetConfig {
implementations: indexmap! {
UdpTracker::Aquatic => vec![
AquaticUdpRunner::new(5, 1),
AquaticUdpRunner::new(10, 1),
AquaticUdpRunner::new(4, 2),
AquaticUdpRunner::new(8, 2),
],
UdpTracker::OpenTracker => vec![
OpenTrackerUdpRunner::new(6),
OpenTrackerUdpRunner::new(12),
],
},
load_test_runs: simple_load_test_runs(cpu_mode, &[4, 6, 8, 12]),
},
8 => SetConfig {
implementations: indexmap! {
UdpTracker::Aquatic => vec![
AquaticUdpRunner::new(7, 1),
AquaticUdpRunner::new(14, 1),
AquaticUdpRunner::new(6, 2),
AquaticUdpRunner::new(12, 2),
],
UdpTracker::OpenTracker => vec![
OpenTrackerUdpRunner::new(8),
OpenTrackerUdpRunner::new(16),
],
},
load_test_runs: simple_load_test_runs(cpu_mode, &[4, 8, 12]),
},
12 => SetConfig {
implementations: indexmap! {
UdpTracker::Aquatic => vec![
AquaticUdpRunner::new(11, 1),
AquaticUdpRunner::new(22, 1),
AquaticUdpRunner::new(10, 2),
AquaticUdpRunner::new(20, 2),
AquaticUdpRunner::new(9, 3),
AquaticUdpRunner::new(18, 3),
],
UdpTracker::OpenTracker => vec![
OpenTrackerUdpRunner::new(12),
OpenTrackerUdpRunner::new(24),
],
},
load_test_runs: simple_load_test_runs(cpu_mode, &[4, 8, 12, 16]),
},
16 => SetConfig {
implementations: indexmap! {
UdpTracker::Aquatic => vec![
AquaticUdpRunner::new(15, 1),
AquaticUdpRunner::new(30, 1),
AquaticUdpRunner::new(15, 2),
AquaticUdpRunner::new(30, 2),
AquaticUdpRunner::new(13, 3),
AquaticUdpRunner::new(26, 3),
AquaticUdpRunner::new(12, 4),
AquaticUdpRunner::new(24, 4),
],
UdpTracker::OpenTracker => vec![
OpenTrackerUdpRunner::new(16),
OpenTrackerUdpRunner::new(32),
],
},
load_test_runs: simple_load_test_runs(cpu_mode, &[4, 8, 12, 16]),
}, },
} }
} }
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct AquaticUdpProcessConfig { struct AquaticUdpRunner {
socket_workers: usize, socket_workers: usize,
swarm_workers: usize, swarm_workers: usize,
} }
impl ProcessRunner for AquaticUdpProcessConfig { impl AquaticUdpRunner {
fn new(
socket_workers: usize,
swarm_workers: usize,
) -> Rc<dyn ProcessRunner<Command = UdpCommand>> {
Rc::new(Self {
socket_workers,
swarm_workers,
})
}
}
impl ProcessRunner for AquaticUdpRunner {
type Command = UdpCommand; type Command = UdpCommand;
fn run( fn run(
@ -138,12 +238,6 @@ impl ProcessRunner for AquaticUdpProcessConfig {
.spawn()?) .spawn()?)
} }
fn info(&self) -> String {
format!(
"socket workers: {}, swarm workers: {}",
self.socket_workers, self.swarm_workers
)
}
fn keys(&self) -> IndexMap<String, String> { fn keys(&self) -> IndexMap<String, String> {
indexmap! { indexmap! {
"socket workers".to_string() => self.socket_workers.to_string(), "socket workers".to_string() => self.socket_workers.to_string(),
@ -153,11 +247,17 @@ impl ProcessRunner for AquaticUdpProcessConfig {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct OpenTrackerUdpProcessConfig { struct OpenTrackerUdpRunner {
workers: usize, workers: usize,
} }
impl ProcessRunner for OpenTrackerUdpProcessConfig { impl OpenTrackerUdpRunner {
fn new(workers: usize) -> Rc<dyn ProcessRunner<Command = UdpCommand>> {
Rc::new(Self { workers })
}
}
impl ProcessRunner for OpenTrackerUdpRunner {
type Command = UdpCommand; type Command = UdpCommand;
fn run( fn run(
@ -166,7 +266,11 @@ impl ProcessRunner for OpenTrackerUdpProcessConfig {
vcpus: &TaskSetCpuList, vcpus: &TaskSetCpuList,
tmp_file: &mut NamedTempFile, tmp_file: &mut NamedTempFile,
) -> anyhow::Result<Child> { ) -> anyhow::Result<Child> {
writeln!(tmp_file, "{}", self.workers)?; // FIXME writeln!(
tmp_file,
"listen.udp.workers {}\nlisten.udp 0.0.0.0:3000",
self.workers
)?;
Ok(Command::new("taskset") Ok(Command::new("taskset")
.arg("--cpu-list") .arg("--cpu-list")
@ -179,10 +283,6 @@ impl ProcessRunner for OpenTrackerUdpProcessConfig {
.spawn()?) .spawn()?)
} }
fn info(&self) -> String {
format!("workers: {}", self.workers)
}
fn keys(&self) -> IndexMap<String, String> { fn keys(&self) -> IndexMap<String, String> {
indexmap! { indexmap! {
"workers".to_string() => self.workers.to_string(), "workers".to_string() => self.workers.to_string(),
@ -191,11 +291,11 @@ impl ProcessRunner for OpenTrackerUdpProcessConfig {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct AquaticUdpLoadTestProcessConfig { struct AquaticUdpLoadTestRunner {
workers: usize, workers: usize,
} }
impl ProcessRunner for AquaticUdpLoadTestProcessConfig { impl ProcessRunner for AquaticUdpLoadTestRunner {
type Command = UdpCommand; type Command = UdpCommand;
fn run( fn run(
@ -224,10 +324,6 @@ impl ProcessRunner for AquaticUdpLoadTestProcessConfig {
.spawn()?) .spawn()?)
} }
fn info(&self) -> String {
format!("workers: {}", self.workers)
}
fn keys(&self) -> IndexMap<String, String> { fn keys(&self) -> IndexMap<String, String> {
indexmap! { indexmap! {
"workers".to_string() => self.workers.to_string(), "workers".to_string() => self.workers.to_string(),

View file

@ -6,6 +6,7 @@ use std::{
}; };
use indexmap::IndexMap; use indexmap::IndexMap;
use itertools::Itertools;
use nonblock::NonBlockingReader; use nonblock::NonBlockingReader;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use regex::Regex; use regex::Regex;
@ -22,30 +23,39 @@ pub trait ProcessRunner: ::std::fmt::Debug {
vcpus: &TaskSetCpuList, vcpus: &TaskSetCpuList,
tmp_file: &mut NamedTempFile, tmp_file: &mut NamedTempFile,
) -> anyhow::Result<Child>; ) -> anyhow::Result<Child>;
fn info(&self) -> String;
fn keys(&self) -> IndexMap<String, String>; fn keys(&self) -> IndexMap<String, String>;
fn info(&self) -> String {
self.keys()
.into_iter()
.map(|(k, v)| format!("{}: {}", k, v))
.join(", ")
}
} }
#[derive(Debug)] #[derive(Debug)]
pub struct RunConfig<C> { pub struct RunConfig<C> {
pub server_runner: Rc<dyn ProcessRunner<Command = C>>, pub tracker_runner: Rc<dyn ProcessRunner<Command = C>>,
pub server_vcpus: TaskSetCpuList, pub tracker_vcpus: TaskSetCpuList,
pub load_test_runner: Box<dyn ProcessRunner<Command = C>>, pub load_test_runner: Box<dyn ProcessRunner<Command = C>>,
pub load_test_vcpus: TaskSetCpuList, pub load_test_vcpus: TaskSetCpuList,
} }
impl<C> RunConfig<C> { impl<C> RunConfig<C> {
pub fn run(self, command: &C) -> Result<RunResults<C>, RunResults<C>> { pub fn run(self, command: &C) -> Result<RunSuccessResults, RunErrorResults<C>> {
let mut server_config_file = NamedTempFile::new().unwrap(); let mut tracker_config_file = NamedTempFile::new().unwrap();
let mut load_test_config_file = NamedTempFile::new().unwrap(); let mut load_test_config_file = NamedTempFile::new().unwrap();
let server = let tracker =
match self match self
.server_runner .tracker_runner
.run(command, &self.server_vcpus, &mut server_config_file) .run(command, &self.tracker_vcpus, &mut tracker_config_file)
{ {
Ok(handle) => ChildWrapper(handle), Ok(handle) => ChildWrapper(handle),
Err(err) => return Err(RunResults::new(self).set_error(err.into(), "run server")), Err(err) => {
return Err(RunErrorResults::new(self).set_error(err.into(), "run tracker"))
}
}; };
::std::thread::sleep(Duration::from_secs(1)); ::std::thread::sleep(Duration::from_secs(1));
@ -57,123 +67,149 @@ impl<C> RunConfig<C> {
) { ) {
Ok(handle) => ChildWrapper(handle), Ok(handle) => ChildWrapper(handle),
Err(err) => { Err(err) => {
return Err(RunResults::new(self) return Err(RunErrorResults::new(self)
.set_error(err.into(), "run load test") .set_error(err.into(), "run load test")
.set_server(server)) .set_tracker(tracker))
} }
}; };
::std::thread::sleep(Duration::from_secs(59)); ::std::thread::sleep(Duration::from_secs(59));
let cpu_stats_res = Command::new("ps") let tracker_process_stats_res = Command::new("ps")
.arg("-p") .arg("-p")
.arg(server.0.id().to_string()) .arg(tracker.0.id().to_string())
.arg("-o") .arg("-o")
.arg("%cpu,rss") .arg("%cpu,rss")
.arg("--noheader") .arg("--noheader")
.output(); .output();
let server_process_stats = match cpu_stats_res { let tracker_process_stats = match tracker_process_stats_res {
Ok(output) if output.status.success() => { Ok(output) if output.status.success() => {
ProcessStats::from_str(&String::from_utf8_lossy(&output.stdout)).unwrap() ProcessStats::from_str(&String::from_utf8_lossy(&output.stdout)).unwrap()
} }
Ok(_) => { Ok(_) => {
return Err(RunResults::new(self) return Err(RunErrorResults::new(self)
.set_error_context("run ps") .set_error_context("run ps")
.set_server(server) .set_tracker(tracker)
.set_load_test(load_tester)); .set_load_test_outputs(load_tester));
} }
Err(err) => { Err(err) => {
return Err(RunResults::new(self) return Err(RunErrorResults::new(self)
.set_error(err.into(), "run ps") .set_error(err.into(), "run ps")
.set_server(server) .set_tracker(tracker)
.set_load_test(load_tester)); .set_load_test_outputs(load_tester));
} }
}; };
::std::thread::sleep(Duration::from_secs(5)); ::std::thread::sleep(Duration::from_secs(5));
let load_test_data = match load_tester.0.try_wait() { let (load_test_stdout, load_test_stderr) = match load_tester.0.try_wait() {
Ok(Some(status)) if status.success() => read_child_outputs(load_tester), Ok(Some(status)) if status.success() => read_child_outputs(load_tester),
Ok(Some(_)) => { Ok(Some(_)) => {
return Err(RunResults::new(self) return Err(RunErrorResults::new(self)
.set_error_context("wait for load tester") .set_error_context("wait for load tester")
.set_server(server) .set_tracker(tracker)
.set_load_test(load_tester)) .set_load_test_outputs(load_tester))
} }
Ok(None) => { Ok(None) => {
if let Err(err) = load_tester.0.kill() { if let Err(err) = load_tester.0.kill() {
return Err(RunResults::new(self) return Err(RunErrorResults::new(self)
.set_error(err.into(), "kill load tester") .set_error(err.into(), "kill load tester")
.set_server(server) .set_tracker(tracker)
.set_load_test(load_tester)); .set_load_test_outputs(load_tester));
} }
::std::thread::sleep(Duration::from_secs(1)); ::std::thread::sleep(Duration::from_secs(1));
match load_tester.0.try_wait() { match load_tester.0.try_wait() {
Ok(_) => { Ok(_) => {
return Err(RunResults::new(self) return Err(RunErrorResults::new(self)
.set_error_context("load tester didn't finish in time") .set_error_context("load tester didn't finish in time")
.set_load_test(load_tester)) .set_load_test_outputs(load_tester))
} }
Err(err) => { Err(err) => {
return Err(RunResults::new(self) return Err(RunErrorResults::new(self)
.set_error(err.into(), "wait for load tester after kill") .set_error(err.into(), "wait for load tester after kill")
.set_server(server)); .set_tracker(tracker));
} }
} }
} }
Err(err) => { Err(err) => {
return Err(RunResults::new(self) return Err(RunErrorResults::new(self)
.set_error(err.into(), "wait for load tester") .set_error(err.into(), "wait for load tester")
.set_server(server) .set_tracker(tracker)
.set_load_test(load_tester)) .set_load_test_outputs(load_tester))
} }
}; };
let mut results = RunResults::new(self); let load_test_stdout = if let Some(load_test_stdout) = load_test_stdout {
load_test_stdout
} else {
return Err(RunErrorResults::new(self)
.set_error_context("couldn't read load tester stdout")
.set_tracker(tracker)
.set_load_test_stderr(load_test_stderr));
};
results.server_process_stats = Some(server_process_stats); let avg_responses = {
results.load_test_stdout = load_test_data.0; static RE: Lazy<Regex> = Lazy::new(|| {
results.load_test_stderr = load_test_data.1; Regex::new(r"Average responses per second: ([0-9]+\.?[0-9]+)").unwrap()
});
let opt_avg_responses = RE
.captures_iter(&load_test_stdout)
.next()
.map(|c| {
let (_, [avg_responses]) = c.extract();
avg_responses.to_string()
})
.and_then(|v| v.parse::<f32>().ok());
if let Some(avg_responses) = opt_avg_responses {
avg_responses
} else {
return Err(RunErrorResults::new(self)
.set_error_context("couldn't extract avg_responses")
.set_tracker(tracker)
.set_load_test_stdout(Some(load_test_stdout))
.set_load_test_stderr(load_test_stderr));
}
};
let results = RunSuccessResults {
tracker_process_stats,
avg_responses,
};
Ok(results) Ok(results)
} }
} }
pub struct RunSuccessResults {
pub tracker_process_stats: ProcessStats,
pub avg_responses: f32,
}
#[derive(Debug)] #[derive(Debug)]
pub struct RunResults<C> { pub struct RunErrorResults<C> {
pub run_config: RunConfig<C>, pub run_config: RunConfig<C>,
pub server_process_stats: Option<ProcessStats>, pub tracker_process_stats: Option<ProcessStats>,
pub server_stdout: Option<String>, pub tracker_stdout: Option<String>,
pub server_stderr: Option<String>, pub tracker_stderr: Option<String>,
pub load_test_stdout: Option<String>, pub load_test_stdout: Option<String>,
pub load_test_stderr: Option<String>, pub load_test_stderr: Option<String>,
pub error: Option<anyhow::Error>, pub error: Option<anyhow::Error>,
pub error_context: Option<String>, pub error_context: Option<String>,
} }
impl<C> RunResults<C> { impl<C> RunErrorResults<C> {
pub fn avg_responses(&self) -> Option<String> {
static RE: Lazy<Regex> =
Lazy::new(|| Regex::new(r"Average responses per second: ([0-9]+\.?[0-9]+)").unwrap());
self.load_test_stdout.as_ref().and_then(|stdout| {
RE.captures_iter(&stdout).next().map(|c| {
let (_, [avg_responses]) = c.extract();
avg_responses.to_string()
})
})
}
fn new(run_config: RunConfig<C>) -> Self { fn new(run_config: RunConfig<C>) -> Self {
Self { Self {
run_config, run_config,
server_process_stats: Default::default(), tracker_process_stats: Default::default(),
server_stdout: Default::default(), tracker_stdout: Default::default(),
server_stderr: Default::default(), tracker_stderr: Default::default(),
load_test_stdout: Default::default(), load_test_stdout: Default::default(),
load_test_stderr: Default::default(), load_test_stderr: Default::default(),
error: Default::default(), error: Default::default(),
@ -181,16 +217,16 @@ impl<C> RunResults<C> {
} }
} }
fn set_server(mut self, server: ChildWrapper) -> Self { fn set_tracker(mut self, tracker: ChildWrapper) -> Self {
let (stdout, stderr) = read_child_outputs(server); let (stdout, stderr) = read_child_outputs(tracker);
self.server_stdout = stdout; self.tracker_stdout = stdout;
self.server_stderr = stderr; self.tracker_stderr = stderr;
self self
} }
fn set_load_test(mut self, load_test: ChildWrapper) -> Self { fn set_load_test_outputs(mut self, load_test: ChildWrapper) -> Self {
let (stdout, stderr) = read_child_outputs(load_test); let (stdout, stderr) = read_child_outputs(load_test);
self.load_test_stdout = stdout; self.load_test_stdout = stdout;
@ -199,6 +235,18 @@ impl<C> RunResults<C> {
self self
} }
fn set_load_test_stdout(mut self, stdout: Option<String>) -> Self {
self.load_test_stdout = stdout;
self
}
fn set_load_test_stderr(mut self, stderr: Option<String>) -> Self {
self.load_test_stderr = stderr;
self
}
fn set_error(mut self, error: anyhow::Error, context: &str) -> Self { fn set_error(mut self, error: anyhow::Error, context: &str) -> Self {
self.error = Some(error); self.error = Some(error);
self.error_context = Some(context.to_string()); self.error_context = Some(context.to_string());
@ -244,12 +292,6 @@ impl Drop for ChildWrapper {
} }
} }
impl AsMut<Child> for ChildWrapper {
fn as_mut(&mut self) -> &mut Child {
&mut self.0
}
}
fn read_child_outputs(mut child: ChildWrapper) -> (Option<String>, Option<String>) { fn read_child_outputs(mut child: ChildWrapper) -> (Option<String>, Option<String>) {
let stdout = child.0.stdout.take().map(|stdout| { let stdout = child.0.stdout.take().map(|stdout| {
let mut buf = String::new(); let mut buf = String::new();

View file

@ -8,7 +8,7 @@ use crate::{
run::{ProcessRunner, ProcessStats, RunConfig}, run::{ProcessRunner, ProcessStats, RunConfig},
}; };
pub trait Server: ::std::fmt::Debug + Copy + Clone + ::std::hash::Hash { pub trait Tracker: ::std::fmt::Debug + Copy + Clone + ::std::hash::Hash {
fn name(&self) -> String; fn name(&self) -> String;
} }
@ -24,30 +24,30 @@ pub fn run_sets<C, F, I>(
load_test_gen: F, load_test_gen: F,
) where ) where
C: ::std::fmt::Debug, C: ::std::fmt::Debug,
I: Server, I: Tracker,
F: Fn(usize) -> Box<dyn ProcessRunner<Command = C>>, F: Fn(usize) -> Box<dyn ProcessRunner<Command = C>>,
{ {
println!("# Load test report"); println!("# Load test report");
let results = set_configs let results = set_configs
.into_iter() .into_iter()
.map(|(server_core_count, set_config)| { .map(|(tracker_core_count, set_config)| {
let server_vcpus = let tracker_vcpus =
TaskSetCpuList::new(cpu_mode, CpuDirection::Asc, server_core_count).unwrap(); TaskSetCpuList::new(cpu_mode, CpuDirection::Asc, tracker_core_count).unwrap();
println!( println!(
"## Tracker cores: {} (cpus: {})", "## Tracker cores: {} (cpus: {})",
server_core_count, tracker_core_count,
server_vcpus.as_cpu_list() tracker_vcpus.as_cpu_list()
); );
let server_results = set_config let tracker_results = set_config
.implementations .implementations
.into_iter() .into_iter()
.map(|(implementation, server_runs)| { .map(|(implementation, tracker_runs)| {
let server_run_results = server_runs let tracker_run_results = tracker_runs
.iter() .iter()
.map(|server_run| { .map(|tracker_run| {
let load_test_run_results = set_config let load_test_run_results = set_config
.load_test_runs .load_test_runs
.clone() .clone()
@ -57,16 +57,15 @@ pub fn run_sets<C, F, I>(
command, command,
&load_test_gen, &load_test_gen,
implementation, implementation,
&server_run, &tracker_run,
server_vcpus.clone(), tracker_vcpus.clone(),
workers, workers,
load_test_vcpus, load_test_vcpus,
) )
}) })
.collect(); .collect();
ServerConfigurationResults { TrackerConfigurationResults {
config_keys: server_run.keys(),
load_tests: load_test_run_results, load_tests: load_test_run_results,
} }
}) })
@ -74,14 +73,14 @@ pub fn run_sets<C, F, I>(
ImplementationResults { ImplementationResults {
name: implementation.name(), name: implementation.name(),
configurations: server_run_results, configurations: tracker_run_results,
} }
}) })
.collect(); .collect();
ServerCoreCountResults { TrackerCoreCountResults {
core_count: server_core_count, core_count: tracker_core_count,
implementations: server_results, implementations: tracker_results,
} }
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@ -89,14 +88,14 @@ pub fn run_sets<C, F, I>(
html_summary(&results); html_summary(&results);
} }
pub struct ServerCoreCountResults { pub struct TrackerCoreCountResults {
core_count: usize, core_count: usize,
implementations: Vec<ImplementationResults>, implementations: Vec<ImplementationResults>,
} }
pub struct ImplementationResults { pub struct ImplementationResults {
name: String, name: String,
configurations: Vec<ServerConfigurationResults>, configurations: Vec<TrackerConfigurationResults>,
} }
impl ImplementationResults { impl ImplementationResults {
@ -114,13 +113,11 @@ impl ImplementationResults {
} }
} }
pub struct ServerConfigurationResults { pub struct TrackerConfigurationResults {
config_keys: IndexMap<String, String>,
load_tests: Vec<LoadTestRunResults>, load_tests: Vec<LoadTestRunResults>,
// best_index: Option<usize>,
} }
impl ServerConfigurationResults { impl TrackerConfigurationResults {
fn best_result(&self) -> Option<LoadTestRunResultsSuccess> { fn best_result(&self) -> Option<LoadTestRunResultsSuccess> {
self.load_tests self.load_tests
.iter() .iter()
@ -148,57 +145,59 @@ impl LoadTestRunResults {
command: &C, command: &C,
load_test_gen: &F, load_test_gen: &F,
implementation: I, implementation: I,
server_process: &Rc<dyn ProcessRunner<Command = C>>, tracker_process: &Rc<dyn ProcessRunner<Command = C>>,
server_vcpus: TaskSetCpuList, tracker_vcpus: TaskSetCpuList,
workers: usize, workers: usize,
load_test_vcpus: TaskSetCpuList, load_test_vcpus: TaskSetCpuList,
) -> Self ) -> Self
where where
C: ::std::fmt::Debug, C: ::std::fmt::Debug,
I: Server, I: Tracker,
F: Fn(usize) -> Box<dyn ProcessRunner<Command = C>>, F: Fn(usize) -> Box<dyn ProcessRunner<Command = C>>,
{ {
println!( println!(
"### {} run ({}) (load test workers: {}, cpus: {})", "### {} run ({}) (load test workers: {}, cpus: {})",
implementation.name(), implementation.name(),
server_process.info(), tracker_process.info(),
workers, workers,
load_test_vcpus.as_cpu_list() load_test_vcpus.as_cpu_list()
); );
let load_test_runner = load_test_gen(workers); let load_test_runner = load_test_gen(workers);
let load_test_keys = load_test_runner.keys(); // let load_test_keys = load_test_runner.keys();
let run_config = RunConfig { let run_config = RunConfig {
server_runner: server_process.clone(), tracker_runner: tracker_process.clone(),
server_vcpus: server_vcpus.clone(), tracker_vcpus: tracker_vcpus.clone(),
load_test_runner, load_test_runner,
load_test_vcpus, load_test_vcpus,
}; };
match run_config.run(command) { match run_config.run(command) {
Ok(results) => { Ok(r) => {
let avg_responses = results.avg_responses().unwrap().parse::<f32>().unwrap(); println!("- Average responses per second: {}", r.avg_responses);
let server_process_stats = results.server_process_stats.unwrap();
println!("- Average responses per second: {}", avg_responses);
println!( println!(
"- Average server CPU utilization: {}%", "- Average tracker CPU utilization: {}%",
server_process_stats.avg_cpu_utilization, r.tracker_process_stats.avg_cpu_utilization,
);
println!(
"- Peak tracker RSS: {} kB",
r.tracker_process_stats.peak_rss_kb
); );
println!("- Peak server RSS: {} kB", server_process_stats.peak_rss_kb);
LoadTestRunResults::Success(LoadTestRunResultsSuccess { LoadTestRunResults::Success(LoadTestRunResultsSuccess {
config_keys: load_test_keys, average_responses: r.avg_responses,
average_responses: avg_responses, // tracker_keys: tracker_process.keys(),
server_process_stats, tracker_info: tracker_process.info(),
tracker_process_stats: r.tracker_process_stats,
// load_test_keys,
}) })
} }
Err(results) => { Err(results) => {
println!("\nRun failed:\n{:?}\n", results); println!("\nRun failed:\n{:#?}\n", results);
LoadTestRunResults::Failure(LoadTestRunResultsFailure { LoadTestRunResults::Failure(LoadTestRunResultsFailure {
config_keys: load_test_keys, // load_test_keys
}) })
} }
} }
@ -207,16 +206,18 @@ impl LoadTestRunResults {
#[derive(Clone)] #[derive(Clone)]
pub struct LoadTestRunResultsSuccess { pub struct LoadTestRunResultsSuccess {
config_keys: IndexMap<String, String>,
average_responses: f32, average_responses: f32,
server_process_stats: ProcessStats, // tracker_keys: IndexMap<String, String>,
tracker_info: String,
tracker_process_stats: ProcessStats,
// load_test_keys: IndexMap<String, String>,
} }
pub struct LoadTestRunResultsFailure { pub struct LoadTestRunResultsFailure {
config_keys: IndexMap<String, String>, // load_test_keys: IndexMap<String, String>,
} }
pub fn html_summary(results: &[ServerCoreCountResults]) { pub fn html_summary(results: &[TrackerCoreCountResults]) {
let mut all_implementation_names = IndexSet::new(); let mut all_implementation_names = IndexSet::new();
for core_count_results in results { for core_count_results in results {
@ -239,11 +240,7 @@ pub fn html_summary(results: &[ServerCoreCountResults]) {
let best_results_for_all_implementations = all_implementation_names let best_results_for_all_implementations = all_implementation_names
.iter() .iter()
.map(|name| { .map(|name| best_results.get(name).cloned().flatten())
best_results
.get(name)
.and_then(|r| r.as_ref().map(|r| r.average_responses))
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let data_row = format!( let data_row = format!(
@ -256,12 +253,18 @@ pub fn html_summary(results: &[ServerCoreCountResults]) {
core_count_results.core_count, core_count_results.core_count,
best_results_for_all_implementations best_results_for_all_implementations
.into_iter() .into_iter()
.map(|result| format!( .map(|result| {
"<td>{}</td>", if let Some(r) = result {
result format!(
.map(|r| r.to_string()) r#"<td><span title="{}, avg cpu utilization: {}%">{}</span></td>"#,
.unwrap_or_else(|| "-".to_string()) r.tracker_info,
)) r.tracker_process_stats.avg_cpu_utilization,
r.average_responses,
)
} else {
"<td>-</td>".to_string()
}
})
.join("\n"), .join("\n"),
); );

View file

@ -13,6 +13,10 @@ Features at a glance:
- Prometheus metrics - Prometheus metrics
- Automated CI testing of full file transfers - Automated CI testing of full file transfers
Known users:
- [tracker.webtorrent.dev](https://tracker.webtorrent.dev) (`wss://tracker.webtorrent.dev`)
## Performance ## Performance
![WebTorrent tracker throughput comparison](../../documents/aquatic-ws-load-test-illustration-2023-01-25.png) ![WebTorrent tracker throughput comparison](../../documents/aquatic-ws-load-test-illustration-2023-01-25.png)