Add aquatic_load_tester: multi-run multi-implementation load tests

- Work in progress
- Only UDP is currently implemented so far
- Also includes some changes to other crates, notably deriving
  serde Serialize for Config structs and making udp_load_test
  a lib and a binary
This commit is contained in:
Joakim Frostegård 2023-12-17 21:57:11 +01:00
parent c7997d5aed
commit afc3deb656
18 changed files with 1666 additions and 312 deletions

View file

@ -27,7 +27,7 @@ impl AccessListMode {
}
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct AccessListConfig {
pub mode: AccessListMode,

View file

@ -50,7 +50,7 @@ pub mod mod_name {
use super::*;
/// Experimental cpu pinning
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)]
pub struct struct_name {
pub active: bool,
pub direction: CpuPinningDirection,

View file

@ -5,11 +5,11 @@ use std::{
use anyhow::Context;
use privdrop::PrivDrop;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use aquatic_toml_config::TomlConfig;
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct PrivilegeConfig {
/// Chroot and switch group and user after binding to sockets

View file

@ -0,0 +1,35 @@
[package]
name = "aquatic_load_tester"
description = "Load test runner for aquatic BitTorrent tracker"
keywords = ["peer-to-peer", "torrent", "bittorrent"]
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
readme.workspace = true
[[bin]]
name = "aquatic_load_tester"
[features]
default = ["udp"]
udp = ["aquatic_udp", "aquatic_udp_load_test"]
[dependencies]
aquatic_udp = { optional = true, workspace = true }
aquatic_udp_load_test = { optional = true, workspace = true }
anyhow = "1"
clap = { version = "4", features = ["derive"] }
indexmap = "2"
itertools = "0.12"
nonblock = "0.2"
once_cell = "1"
regex = "1"
serde = "1"
tempfile = "3"
toml = "0.8"
[dev-dependencies]

View file

@ -0,0 +1,258 @@
use std::{fmt::Display, ops::Range, thread::available_parallelism};
use itertools::Itertools;
/// A list of virtual CPUs, convertible to a `taskset --cpu-list` argument.
#[derive(Debug, Clone)]
pub struct TaskSetCpuList(pub Vec<TaskSetCpuIndicator>);

impl TaskSetCpuList {
    /// Render as a `taskset --cpu-list` argument, e.g. "0-3,8".
    pub fn as_cpu_list(&self) -> String {
        let indicator = self.0.iter().map(|indicator| match indicator {
            TaskSetCpuIndicator::Single(i) => i.to_string(),
            TaskSetCpuIndicator::Range(range) => {
                // Ranges are non-empty by construction (see the TryFrom
                // impls), so the last contained CPU index is `end - 1`.
                // The previous `range.clone().into_iter().last().unwrap()`
                // walked the entire range to find the same value.
                format!("{}-{}", range.start, range.end - 1)
            }
        });

        Itertools::intersperse_with(indicator, || ",".to_string()).collect()
    }

    /// Build a CPU list for `requested_cpus` based on the machine's
    /// available parallelism.
    pub fn new(
        mode: CpuMode,
        direction: CpuDirection,
        requested_cpus: usize,
    ) -> anyhow::Result<Self> {
        let available_parallelism: usize = available_parallelism()?.into();

        Ok(Self::new_with_available_parallelism(
            available_parallelism,
            mode,
            direction,
            requested_cpus,
        ))
    }

    /// Deterministic core of `new`, parameterized on parallelism so it can
    /// be unit tested.
    ///
    /// NOTE(review): the `unwrap`s below panic if a computed range comes
    /// out empty, e.g. `CpuMode::Split` with `available_parallelism < 2`.
    fn new_with_available_parallelism(
        available_parallelism: usize,
        mode: CpuMode,
        direction: CpuDirection,
        requested_cpus: usize,
    ) -> Self {
        match direction {
            CpuDirection::Asc => match mode {
                CpuMode::Split => {
                    // Take up to `requested_cpus` from the start of each
                    // half of the machine
                    let middle = available_parallelism / 2;

                    let range_a = 0..(middle.min(requested_cpus));
                    let range_b = middle..(available_parallelism.min(middle + requested_cpus));

                    Self(vec![
                        range_a.try_into().unwrap(),
                        range_b.try_into().unwrap(),
                    ])
                }
                CpuMode::All => {
                    let range = 0..(available_parallelism.min(requested_cpus));

                    Self(vec![range.try_into().unwrap()])
                }
            },
            CpuDirection::Desc => match mode {
                CpuMode::Split => {
                    // Take up to `requested_cpus` from the end of each
                    // half of the machine
                    let middle = available_parallelism / 2;

                    let range_a = middle.saturating_sub(requested_cpus)..middle;
                    let range_b = available_parallelism
                        .saturating_sub(requested_cpus)
                        .max(middle)..available_parallelism;

                    Self(vec![
                        range_a.try_into().unwrap(),
                        range_b.try_into().unwrap(),
                    ])
                }
                CpuMode::All => {
                    let range =
                        available_parallelism.saturating_sub(requested_cpus)..available_parallelism;

                    Self(vec![range.try_into().unwrap()])
                }
            },
        }
    }
}
impl TryFrom<Vec<Range<usize>>> for TaskSetCpuList {
type Error = String;
fn try_from(value: Vec<Range<usize>>) -> Result<Self, Self::Error> {
let mut output = Vec::new();
for range in value {
output.push(range.try_into()?);
}
Ok(Self(output))
}
}
/// One element of a `taskset --cpu-list` argument: either a single
/// virtual CPU index or a non-empty range of indices.
#[derive(Debug, Clone)]
pub enum TaskSetCpuIndicator {
    Single(usize),
    Range(Range<usize>),
}

impl TryFrom<Range<usize>> for TaskSetCpuIndicator {
    type Error = String;

    /// Empty ranges are rejected; one-element ranges collapse to `Single`.
    fn try_from(value: Range<usize>) -> Result<Self, Self::Error> {
        if value.is_empty() {
            Err("Empty ranges not supported".into())
        } else if value.len() == 1 {
            Ok(Self::Single(value.start))
        } else {
            Ok(Self::Range(value))
        }
    }
}
// How requested CPUs are distributed over the machine.
// (Plain `//` comments on purpose: doc comments on ValueEnum items become
// clap help text and would change CLI output.)
#[derive(Debug, Clone, Copy, clap::ValueEnum)]
pub enum CpuMode {
    // Allocate from both halves of the available CPU indices — presumably
    // to spread across SMT siblings / NUMA nodes; TODO confirm intent
    Split,
    // Allocate from the full contiguous list of available CPU indices
    All,
}
impl Display for CpuMode {
    /// Lowercase names matching the clap value-enum spelling.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Split => "split",
            Self::All => "all",
        };

        f.write_str(label)
    }
}
/// Whether CPUs are taken from the low end (`Asc`) or the high end
/// (`Desc`) of the available index range.
#[derive(Debug, Clone, Copy)]
pub enum CpuDirection {
    Asc,
    Desc,
}
/// Build one load test run per worker count in `workers`.
///
/// Each run is pinned descending (`CpuDirection::Desc`), i.e. to the
/// highest-numbered CPUs, keeping load test processes away from server
/// processes, which are allocated ascending in `run_sets`.
///
/// # Panics
///
/// Panics if a CPU list cannot be constructed for a worker count.
pub fn simple_load_test_runs(cpu_mode: CpuMode, workers: &[usize]) -> Vec<(usize, TaskSetCpuList)> {
    // `.iter().copied()` instead of `.into_iter()` on a slice reference:
    // the latter yields references and is flagged by clippy
    workers
        .iter()
        .copied()
        .map(|worker_count| {
            (
                worker_count,
                TaskSetCpuList::new(cpu_mode, CpuDirection::Desc, worker_count).unwrap(),
            )
        })
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a list with a fixed parallelism and render it as a cpu list.
    fn render(parallelism: usize, mode: CpuMode, direction: CpuDirection, cpus: usize) -> String {
        TaskSetCpuList::new_with_available_parallelism(parallelism, mode, direction, cpus)
            .as_cpu_list()
    }

    #[test]
    fn test_task_set_cpu_list_split_asc() {
        for (requested, expected) in [
            (1, "0,4"),
            (2, "0-1,4-5"),
            (4, "0-3,4-7"),
            (8, "0-3,4-7"),
            (9, "0-3,4-7"),
        ] {
            assert_eq!(render(8, CpuMode::Split, CpuDirection::Asc, requested), expected);
        }
    }

    #[test]
    fn test_task_set_cpu_list_split_desc() {
        for (requested, expected) in [
            (1, "3,7"),
            (2, "2-3,6-7"),
            (4, "0-3,4-7"),
            (8, "0-3,4-7"),
            (9, "0-3,4-7"),
        ] {
            assert_eq!(render(8, CpuMode::Split, CpuDirection::Desc, requested), expected);
        }
    }

    #[test]
    fn test_task_set_cpu_list_all_asc() {
        for (requested, expected) in [(1, "0"), (2, "0-1"), (4, "0-3"), (8, "0-7"), (9, "0-7")] {
            assert_eq!(render(8, CpuMode::All, CpuDirection::Asc, requested), expected);
        }
    }

    #[test]
    fn test_task_set_cpu_list_all_desc() {
        for (requested, expected) in [(1, "7"), (2, "6-7"), (4, "4-7"), (8, "0-7"), (9, "0-7")] {
            assert_eq!(render(8, CpuMode::All, CpuDirection::Desc, requested), expected);
        }
    }
}

View file

@ -0,0 +1,28 @@
pub mod common;
pub mod protocols;
pub mod run;
pub mod set;
use clap::{Parser, Subcommand};
// Top-level CLI arguments: a single protocol subcommand.
// (Plain `//` comments on purpose: a doc comment here would become the
// clap "about" text and change the binary's --help output.)
#[derive(Parser)]
#[command(author, version, about)]
struct Args {
    #[command(subcommand)]
    command: Command,
}
// Available load test protocols, one subcommand per compiled-in feature.
// Only UDP is implemented so far.
#[derive(Subcommand)]
enum Command {
    #[cfg(feature = "udp")]
    Udp(protocols::udp::UdpCommand),
}
fn main() {
let args = Args::parse();
match args.command {
#[cfg(feature = "udp")]
Command::Udp(command) => command.run().unwrap(),
}
}

View file

@ -0,0 +1,2 @@
#[cfg(feature = "udp")]
pub mod udp;

View file

@ -0,0 +1,236 @@
use std::{
io::Write,
path::PathBuf,
process::{Child, Command, Stdio},
rc::Rc,
};
use clap::Parser;
use indexmap::{indexmap, IndexMap};
use tempfile::NamedTempFile;
use crate::{
common::{simple_load_test_runs, CpuMode, TaskSetCpuList},
run::ProcessRunner,
set::{run_sets, Server, SetConfig},
};
/// UDP tracker implementations known to the load tester.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum UdpServer {
    Aquatic,
    OpenTracker,
}

impl Server for UdpServer {
    /// Human-readable implementation name used in report headings.
    fn name(&self) -> String {
        let name = match self {
            Self::Aquatic => "aquatic_udp",
            Self::OpenTracker => "opentracker",
        };

        name.to_string()
    }
}
// CLI options for UDP tracker load testing.
// (Plain `//` comments on purpose: doc comments on clap-derived items
// become --help text and would change runtime output.)
#[derive(Parser, Debug)]
pub struct UdpCommand {
    // How to distribute CPUs between server and load test processes
    #[arg(long, default_value_t = CpuMode::Split)]
    cpu_mode: CpuMode,
    // Path to the aquatic_udp_load_test binary
    #[arg(long, default_value = "./target/release-debug/aquatic_udp_load_test")]
    load_test: PathBuf,
    // Path to the aquatic_udp binary
    #[arg(long, default_value = "./target/release-debug/aquatic_udp")]
    aquatic: PathBuf,
    // Path to the opentracker binary (default: resolve via PATH)
    #[arg(long, default_value = "opentracker")]
    opentracker: PathBuf,
}
impl UdpCommand {
pub fn run(&self) -> anyhow::Result<()> {
run_sets(self, self.cpu_mode, self.sets(), |workers| {
Box::new(AquaticUdpLoadTestProcessConfig { workers })
});
Ok(())
}
fn sets(&self) -> IndexMap<usize, SetConfig<UdpCommand, UdpServer>> {
indexmap::indexmap! {
1 => SetConfig {
implementations: indexmap! {
UdpServer::Aquatic => vec![
Rc::new(AquaticUdpProcessConfig {
socket_workers: 1,
swarm_workers: 1,
}) as Rc<dyn ProcessRunner<Command = UdpCommand>>,
],
/*
UdpServer::OpenTracker => vec![
Rc::new(OpenTrackerUdpProcessConfig {
workers: 1,
}) as Rc<dyn RunProcess<Command = UdpCommand>>,
Rc::new(OpenTrackerUdpProcessConfig {
workers: 2,
}) as Rc<dyn RunProcess<Command = UdpCommand>>,
],
*/
},
load_test_runs: simple_load_test_runs(self.cpu_mode, &[1, 2, 4]),
},
2 => SetConfig {
implementations: indexmap! {
UdpServer::Aquatic => vec![
Rc::new(AquaticUdpProcessConfig {
socket_workers: 1,
swarm_workers: 1,
}) as Rc<dyn ProcessRunner<Command = UdpCommand>>,
Rc::new(AquaticUdpProcessConfig {
socket_workers: 2,
swarm_workers: 1,
}) as Rc<dyn ProcessRunner<Command = UdpCommand>>,
],
/*
UdpServer::OpenTracker => vec![
Rc::new(OpenTrackerUdpProcessConfig {
workers: 2,
}) as Rc<dyn RunProcess<Command = UdpCommand>>,
Rc::new(OpenTrackerUdpProcessConfig {
workers: 4,
}) as Rc<dyn RunProcess<Command = UdpCommand>>,
],
*/
},
load_test_runs: simple_load_test_runs(self.cpu_mode, &[1, 2, 4]),
},
}
}
}
/// Launches an aquatic_udp server with the given worker counts.
#[derive(Debug, Clone)]
pub struct AquaticUdpProcessConfig {
    socket_workers: usize,
    swarm_workers: usize,
}

impl ProcessRunner for AquaticUdpProcessConfig {
    type Command = UdpCommand;

    /// Write a TOML config to `tmp_file` and spawn the server under
    /// `taskset`, with stdout/stderr piped for later collection.
    fn run(
        &self,
        command: &Self::Command,
        vcpus: &TaskSetCpuList,
        tmp_file: &mut NamedTempFile,
    ) -> anyhow::Result<Child> {
        let mut config = aquatic_udp::config::Config::default();

        config.socket_workers = self.socket_workers;
        config.swarm_workers = self.swarm_workers;

        let serialized = toml::to_string_pretty(&config)?;

        tmp_file.write_all(serialized.as_bytes())?;

        let child = Command::new("taskset")
            .arg("--cpu-list")
            .arg(vcpus.as_cpu_list())
            .arg(&command.aquatic)
            .arg("-c")
            .arg(tmp_file.path())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()?;

        Ok(child)
    }

    fn info(&self) -> String {
        format!(
            "socket workers: {}, swarm workers: {}",
            self.socket_workers, self.swarm_workers
        )
    }

    fn keys(&self) -> IndexMap<String, String> {
        indexmap! {
            "socket workers".to_string() => self.socket_workers.to_string(),
            "swarm workers".to_string() => self.swarm_workers.to_string(),
        }
    }
}
/// Launches an opentracker server with the given worker count.
///
/// Not yet usable: see the FIXME below. Correspondingly, the opentracker
/// entries in the benchmark matrix are still commented out.
#[derive(Debug, Clone)]
pub struct OpenTrackerUdpProcessConfig {
    workers: usize,
}

impl ProcessRunner for OpenTrackerUdpProcessConfig {
    type Command = UdpCommand;

    /// Write a config file and spawn opentracker under `taskset`.
    fn run(
        &self,
        command: &Self::Command,
        vcpus: &TaskSetCpuList,
        tmp_file: &mut NamedTempFile,
    ) -> anyhow::Result<Child> {
        // FIXME: writes only the bare worker count, which is presumably
        // not valid opentracker config syntax; needs a real config
        // template before this runner is enabled
        writeln!(tmp_file, "{}", self.workers)?; // FIXME

        Ok(Command::new("taskset")
            .arg("--cpu-list")
            .arg(vcpus.as_cpu_list())
            .arg(&command.opentracker)
            .arg("-f")
            .arg(tmp_file.path())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()?)
    }

    fn info(&self) -> String {
        format!("workers: {}", self.workers)
    }

    fn keys(&self) -> IndexMap<String, String> {
        indexmap! {
            "workers".to_string() => self.workers.to_string(),
        }
    }
}
/// Launches aquatic_udp_load_test with the given worker count.
#[derive(Debug, Clone)]
pub struct AquaticUdpLoadTestProcessConfig {
    workers: usize,
}

impl ProcessRunner for AquaticUdpLoadTestProcessConfig {
    type Command = UdpCommand;

    /// Write a TOML config to `tmp_file` and spawn the load tester under
    /// `taskset`, with stdout/stderr piped for later collection.
    fn run(
        &self,
        command: &Self::Command,
        vcpus: &TaskSetCpuList,
        tmp_file: &mut NamedTempFile,
    ) -> anyhow::Result<Child> {
        let mut config = aquatic_udp_load_test::config::Config::default();

        // NOTE(review): truncates silently if workers > 255; the current
        // benchmark matrix stays far below that
        config.workers = self.workers as u8;
        // Presumably kept in sync with the 1 + 59 + 5 second sleep
        // schedule in run.rs — TODO confirm
        config.duration = 60;

        let serialized = toml::to_string_pretty(&config)?;

        tmp_file.write_all(serialized.as_bytes())?;

        let child = Command::new("taskset")
            .arg("--cpu-list")
            .arg(vcpus.as_cpu_list())
            .arg(&command.load_test)
            .arg("-c")
            .arg(tmp_file.path())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()?;

        Ok(child)
    }

    fn info(&self) -> String {
        format!("workers: {}", self.workers)
    }

    fn keys(&self) -> IndexMap<String, String> {
        indexmap! {
            "workers".to_string() => self.workers.to_string(),
        }
    }
}

View file

@ -0,0 +1,274 @@
use std::{
process::{Child, Command},
rc::Rc,
str::FromStr,
time::Duration,
};
use indexmap::IndexMap;
use nonblock::NonBlockingReader;
use once_cell::sync::Lazy;
use regex::Regex;
use tempfile::NamedTempFile;
use crate::common::TaskSetCpuList;
/// A process (server or load tester) that can be launched for a run.
pub trait ProcessRunner: ::std::fmt::Debug {
    /// Per-protocol CLI command type, carrying e.g. binary paths.
    type Command;

    /// Spawn the process pinned to `vcpus`, writing any config it needs
    /// into `tmp_file` first. Implementations pipe the child's
    /// stdout/stderr so they can be collected afterwards.
    fn run(
        &self,
        command: &Self::Command,
        vcpus: &TaskSetCpuList,
        tmp_file: &mut NamedTempFile,
    ) -> anyhow::Result<Child>;

    /// One-line human-readable description of this configuration.
    fn info(&self) -> String;

    /// Configuration as key-value pairs, for report tables.
    fn keys(&self) -> IndexMap<String, String>;
}
/// Everything needed for a single benchmark run: a server process, a
/// load test process, and the CPU sets each is pinned to.
#[derive(Debug)]
pub struct RunConfig<C> {
    pub server_runner: Rc<dyn ProcessRunner<Command = C>>,
    pub server_vcpus: TaskSetCpuList,
    pub load_test_runner: Box<dyn ProcessRunner<Command = C>>,
    pub load_test_vcpus: TaskSetCpuList,
}
impl<C> RunConfig<C> {
    /// Execute one benchmark run: start the server, run the load test
    /// against it, sample server resource usage with `ps`, then collect
    /// process outputs.
    ///
    /// On failure, a `RunResults` describing how far the run got is
    /// returned as the `Err` variant.
    pub fn run(self, command: &C) -> Result<RunResults<C>, RunResults<C>> {
        let mut server_config_file = NamedTempFile::new().unwrap();
        let mut load_test_config_file = NamedTempFile::new().unwrap();

        let server =
            match self
                .server_runner
                .run(command, &self.server_vcpus, &mut server_config_file)
            {
                Ok(handle) => ChildWrapper(handle),
                Err(err) => return Err(RunResults::new(self).set_error(err.into(), "run server")),
            };

        // Give the server a moment to start before load begins
        ::std::thread::sleep(Duration::from_secs(1));

        let mut load_tester = match self.load_test_runner.run(
            command,
            &self.load_test_vcpus,
            &mut load_test_config_file,
        ) {
            Ok(handle) => ChildWrapper(handle),
            Err(err) => {
                return Err(RunResults::new(self)
                    .set_error(err.into(), "run load test")
                    .set_server(server))
            }
        };

        // Sample server CPU/memory shortly before the load test should
        // end. The 1 + 59 + 5 second schedule assumes a load test
        // duration of about 60 seconds -- TODO confirm against the
        // duration configured in the protocol module
        ::std::thread::sleep(Duration::from_secs(59));

        let cpu_stats_res = Command::new("ps")
            .arg("-p")
            .arg(server.0.id().to_string())
            .arg("-o")
            .arg("%cpu,rss")
            .arg("--noheader")
            .output();

        let server_process_stats = match cpu_stats_res {
            Ok(output) if output.status.success() => {
                ProcessStats::from_str(&String::from_utf8_lossy(&output.stdout)).unwrap()
            }
            Ok(_) => {
                return Err(RunResults::new(self)
                    .set_error_context("run ps")
                    .set_server(server)
                    .set_load_test(load_tester));
            }
            Err(err) => {
                return Err(RunResults::new(self)
                    .set_error(err.into(), "run ps")
                    .set_server(server)
                    .set_load_test(load_tester));
            }
        };

        // Allow the load tester to finish and print its final report
        ::std::thread::sleep(Duration::from_secs(5));

        let load_test_data = match load_tester.0.try_wait() {
            Ok(Some(status)) if status.success() => read_child_outputs(load_tester),
            Ok(Some(_)) => {
                return Err(RunResults::new(self)
                    .set_error_context("wait for load tester")
                    .set_server(server)
                    .set_load_test(load_tester))
            }
            Ok(None) => {
                // Load tester still running past its deadline: kill it
                // and report the run as failed
                if let Err(err) = load_tester.0.kill() {
                    return Err(RunResults::new(self)
                        .set_error(err.into(), "kill load tester")
                        .set_server(server)
                        .set_load_test(load_tester));
                }

                ::std::thread::sleep(Duration::from_secs(1));

                match load_tester.0.try_wait() {
                    Ok(_) => {
                        // NOTE(review): unlike the other error paths,
                        // this one drops the server's output (no
                        // .set_server(server) call) — confirm intent
                        return Err(RunResults::new(self)
                            .set_error_context("load tester didn't finish in time")
                            .set_load_test(load_tester))
                    }
                    Err(err) => {
                        return Err(RunResults::new(self)
                            .set_error(err.into(), "wait for load tester after kill")
                            .set_server(server));
                    }
                }
            }
            Err(err) => {
                return Err(RunResults::new(self)
                    .set_error(err.into(), "wait for load tester")
                    .set_server(server)
                    .set_load_test(load_tester))
            }
        };

        let mut results = RunResults::new(self);

        results.server_process_stats = Some(server_process_stats);
        results.load_test_stdout = load_test_data.0;
        results.load_test_stderr = load_test_data.1;

        Ok(results)
    }
}
/// Outcome of a single benchmark run: collected process outputs and
/// stats, plus error information when the run failed partway.
#[derive(Debug)]
pub struct RunResults<C> {
    pub run_config: RunConfig<C>,
    // `ps` sample of the server taken near the end of the load test
    pub server_process_stats: Option<ProcessStats>,
    pub server_stdout: Option<String>,
    pub server_stderr: Option<String>,
    pub load_test_stdout: Option<String>,
    pub load_test_stderr: Option<String>,
    // Underlying error, if any
    pub error: Option<anyhow::Error>,
    // Short description of the phase in which the run failed
    pub error_context: Option<String>,
}
impl<C> RunResults<C> {
    /// Extract the overall average responses/second figure from the load
    /// tester's stdout report, if present.
    pub fn avg_responses(&self) -> Option<String> {
        // `[0-9]+(?:\.[0-9]+)?` accepts both "1234.56" and plain integers
        // such as "5". The previous pattern `[0-9]+\.?[0-9]+` required at
        // least two digits, so single-digit integer output failed to
        // match. The inner group is non-capturing so `extract` still sees
        // exactly one capture group.
        static RE: Lazy<Regex> = Lazy::new(|| {
            Regex::new(r"Average responses per second: ([0-9]+(?:\.[0-9]+)?)").unwrap()
        });

        self.load_test_stdout.as_ref().and_then(|stdout| {
            RE.captures_iter(stdout).next().map(|c| {
                let (_, [avg_responses]) = c.extract();

                avg_responses.to_string()
            })
        })
    }

    /// Empty results shell for a run that has produced no data yet.
    fn new(run_config: RunConfig<C>) -> Self {
        Self {
            run_config,
            server_process_stats: Default::default(),
            server_stdout: Default::default(),
            server_stderr: Default::default(),
            load_test_stdout: Default::default(),
            load_test_stderr: Default::default(),
            error: Default::default(),
            error_context: Default::default(),
        }
    }

    /// Consume the server child and record its stdout/stderr.
    fn set_server(mut self, server: ChildWrapper) -> Self {
        let (stdout, stderr) = read_child_outputs(server);

        self.server_stdout = stdout;
        self.server_stderr = stderr;

        self
    }

    /// Consume the load test child and record its stdout/stderr.
    fn set_load_test(mut self, load_test: ChildWrapper) -> Self {
        let (stdout, stderr) = read_child_outputs(load_test);

        self.load_test_stdout = stdout;
        self.load_test_stderr = stderr;

        self
    }

    /// Record an error together with the phase in which it occurred.
    fn set_error(mut self, error: anyhow::Error, context: &str) -> Self {
        self.error = Some(error);
        self.error_context = Some(context.to_string());

        self
    }

    /// Record a failure phase without an underlying error value.
    fn set_error_context(mut self, context: &str) -> Self {
        self.error_context = Some(context.to_string());

        self
    }
}
/// Parsed `ps -o %cpu,rss` output for a process.
#[derive(Debug, Clone, Copy)]
pub struct ProcessStats {
    pub avg_cpu_utilization: f32,
    pub peak_rss_kb: f32,
}

impl FromStr for ProcessStats {
    type Err = ();

    /// Parse two whitespace-separated numbers: CPU percentage and RSS in
    /// kB. Extra trailing fields are ignored.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // split_whitespace skips leading/trailing whitespace itself
        let mut fields = s.split_whitespace();

        let avg_cpu_utilization = fields.next().ok_or(())?.parse().map_err(|_| ())?;
        let peak_rss_kb = fields.next().ok_or(())?.parse().map_err(|_| ())?;

        Ok(Self {
            avg_cpu_utilization,
            peak_rss_kb,
        })
    }
}
/// Owns a child process and kills it on drop, so failed runs don't leave
/// server or load test processes behind.
struct ChildWrapper(Child);

impl Drop for ChildWrapper {
    fn drop(&mut self) {
        // Best-effort: the process may already have exited
        let _ = self.0.kill();

        ::std::thread::sleep(Duration::from_secs(1));

        // Reap the process so it doesn't linger as a zombie
        let _ = self.0.try_wait();
    }
}

impl AsMut<Child> for ChildWrapper {
    fn as_mut(&mut self) -> &mut Child {
        &mut self.0
    }
}
/// Collect whatever the child has written to its piped stdout/stderr so
/// far, without blocking. Returns `None` for streams that weren't piped.
///
/// Consumes the child; its `Drop` impl then kills and reaps the process.
fn read_child_outputs(mut child: ChildWrapper) -> (Option<String>, Option<String>) {
    let stdout = child.0.stdout.take().map(|stdout| {
        let mut buf = String::new();

        // Non-blocking read: take only what is currently available
        let mut reader = NonBlockingReader::from_fd(stdout).unwrap();
        reader.read_available_to_string(&mut buf).unwrap();

        buf
    });
    let stderr = child.0.stderr.take().map(|stderr| {
        let mut buf = String::new();

        let mut reader = NonBlockingReader::from_fd(stderr).unwrap();
        reader.read_available_to_string(&mut buf).unwrap();

        buf
    });

    (stdout, stderr)
}

View file

@ -0,0 +1,291 @@
use std::rc::Rc;
use indexmap::{IndexMap, IndexSet};
use itertools::Itertools;
use crate::{
common::{CpuDirection, CpuMode, TaskSetCpuList},
run::{ProcessRunner, ProcessStats, RunConfig},
};
/// A tracker implementation that can be benchmarked. Used as the key
/// type in result tables.
pub trait Server: ::std::fmt::Debug + Copy + Clone + ::std::hash::Hash {
    /// Human-readable implementation name for report headings/columns.
    fn name(&self) -> String;
}
/// One benchmark set: per implementation, the server configurations to
/// try, each crossed with every load test run.
pub struct SetConfig<C, I> {
    pub implementations: IndexMap<I, Vec<Rc<dyn ProcessRunner<Command = C>>>>,
    /// (load test worker count, CPUs to pin the load test to) pairs
    pub load_test_runs: Vec<(usize, TaskSetCpuList)>,
}
/// Run every benchmark set, printing a report to stdout while running,
/// and finish with an HTML summary table.
///
/// `load_test_gen` builds a load test process runner for a given number
/// of load test workers.
pub fn run_sets<C, F, I>(
    command: &C,
    cpu_mode: CpuMode,
    set_configs: IndexMap<usize, SetConfig<C, I>>,
    load_test_gen: F,
) where
    C: ::std::fmt::Debug,
    I: Server,
    F: Fn(usize) -> Box<dyn ProcessRunner<Command = C>>,
{
    println!("# Load test report");

    let results = set_configs
        .into_iter()
        .map(|(server_core_count, set_config)| {
            // Servers take CPUs ascending; load tests take them
            // descending (see simple_load_test_runs), keeping them apart
            let server_vcpus =
                TaskSetCpuList::new(cpu_mode, CpuDirection::Asc, server_core_count).unwrap();

            println!(
                "## Tracker cores: {} (cpus: {})",
                server_core_count,
                server_vcpus.as_cpu_list()
            );

            let server_results = set_config
                .implementations
                .into_iter()
                .map(|(implementation, server_runs)| {
                    let server_run_results = server_runs
                        .iter()
                        .map(|server_run| {
                            // Cross this server configuration with every
                            // load test run in the set
                            let load_test_run_results = set_config
                                .load_test_runs
                                .clone()
                                .into_iter()
                                .map(|(workers, load_test_vcpus)| {
                                    LoadTestRunResults::produce(
                                        command,
                                        &load_test_gen,
                                        implementation,
                                        &server_run,
                                        server_vcpus.clone(),
                                        workers,
                                        load_test_vcpus,
                                    )
                                })
                                .collect();

                            ServerConfigurationResults {
                                config_keys: server_run.keys(),
                                load_tests: load_test_run_results,
                            }
                        })
                        .collect();

                    ImplementationResults {
                        name: implementation.name(),
                        configurations: server_run_results,
                    }
                })
                .collect();

            ServerCoreCountResults {
                core_count: server_core_count,
                implementations: server_results,
            }
        })
        .collect::<Vec<_>>();

    html_summary(&results);
}
/// Results for all implementations at one server core count.
pub struct ServerCoreCountResults {
    core_count: usize,
    implementations: Vec<ImplementationResults>,
}
/// Results for one implementation: an entry per server configuration.
pub struct ImplementationResults {
    name: String,
    configurations: Vec<ServerConfigurationResults>,
}

impl ImplementationResults {
    /// The best successful run across all configurations, judged by
    /// average responses per second; `None` if every run failed.
    fn best_result(&self) -> Option<LoadTestRunResultsSuccess> {
        self.configurations
            .iter()
            .filter_map(|configuration| configuration.best_result())
            .reduce(|best, candidate| {
                if candidate.average_responses > best.average_responses {
                    candidate
                } else {
                    best
                }
            })
    }
}
/// Results for one server configuration: an entry per load test run.
pub struct ServerConfigurationResults {
    config_keys: IndexMap<String, String>,
    load_tests: Vec<LoadTestRunResults>,
    // best_index: Option<usize>,
}

impl ServerConfigurationResults {
    /// The best successful load test run for this configuration, judged
    /// by average responses per second; `None` if every run failed.
    fn best_result(&self) -> Option<LoadTestRunResultsSuccess> {
        self.load_tests
            .iter()
            .filter_map(|run| match run {
                LoadTestRunResults::Success(success) => Some(success.clone()),
                LoadTestRunResults::Failure(_) => None,
            })
            .reduce(|best, candidate| {
                if candidate.average_responses > best.average_responses {
                    candidate
                } else {
                    best
                }
            })
    }
}
/// Outcome of a single (server configuration, load test run) pairing.
pub enum LoadTestRunResults {
    Success(LoadTestRunResultsSuccess),
    Failure(LoadTestRunResultsFailure),
}

impl LoadTestRunResults {
    /// Run one load test against one server configuration, printing a
    /// report section to stdout along the way.
    pub fn produce<C, F, I>(
        command: &C,
        load_test_gen: &F,
        implementation: I,
        server_process: &Rc<dyn ProcessRunner<Command = C>>,
        server_vcpus: TaskSetCpuList,
        workers: usize,
        load_test_vcpus: TaskSetCpuList,
    ) -> Self
    where
        C: ::std::fmt::Debug,
        I: Server,
        F: Fn(usize) -> Box<dyn ProcessRunner<Command = C>>,
    {
        println!(
            "### {} run ({}) (load test workers: {}, cpus: {})",
            implementation.name(),
            server_process.info(),
            workers,
            load_test_vcpus.as_cpu_list()
        );

        let load_test_runner = load_test_gen(workers);
        let load_test_keys = load_test_runner.keys();

        let run_config = RunConfig {
            server_runner: server_process.clone(),
            server_vcpus: server_vcpus.clone(),
            load_test_runner,
            load_test_vcpus,
        };

        match run_config.run(command) {
            Ok(results) => {
                // NOTE(review): unwraps assume a successful run always
                // produced an "Average responses per second" line and
                // server stats; a success without them would panic here
                let avg_responses = results.avg_responses().unwrap().parse::<f32>().unwrap();
                let server_process_stats = results.server_process_stats.unwrap();

                println!("- Average responses per second: {}", avg_responses);
                println!(
                    "- Average server CPU utilization: {}%",
                    server_process_stats.avg_cpu_utilization,
                );
                println!("- Peak server RSS: {} kB", server_process_stats.peak_rss_kb);

                LoadTestRunResults::Success(LoadTestRunResultsSuccess {
                    config_keys: load_test_keys,
                    average_responses: avg_responses,
                    server_process_stats,
                })
            }
            Err(results) => {
                println!("\nRun failed:\n{:?}\n", results);

                LoadTestRunResults::Failure(LoadTestRunResultsFailure {
                    config_keys: load_test_keys,
                })
            }
        }
    }
}
/// A successful load test run: its config keys plus measured numbers.
#[derive(Clone)]
pub struct LoadTestRunResultsSuccess {
    config_keys: IndexMap<String, String>,
    average_responses: f32,
    server_process_stats: ProcessStats,
}

/// A failed load test run: only the config keys are retained.
pub struct LoadTestRunResultsFailure {
    config_keys: IndexMap<String, String>,
}
/// Print an HTML table to stdout summarizing, per server core count, the
/// best responses-per-second result achieved by each implementation.
pub fn html_summary(results: &[ServerCoreCountResults]) {
    // Collect implementation names across all core counts so every row
    // gets a cell for every column, even when an implementation is
    // missing from some set
    let mut all_implementation_names = IndexSet::new();

    for core_count_results in results {
        all_implementation_names.extend(
            core_count_results
                .implementations
                .iter()
                .map(|r| r.name.clone()),
        );
    }

    let mut data_rows = Vec::new();

    for core_count_results in results {
        let best_results = core_count_results
            .implementations
            .iter()
            .map(|implementation| (implementation.name.clone(), implementation.best_result()))
            .collect::<IndexMap<_, _>>();

        // None here means "no successful run", rendered as "-" below
        let best_results_for_all_implementations = all_implementation_names
            .iter()
            .map(|name| {
                best_results
                    .get(name)
                    .and_then(|r| r.as_ref().map(|r| r.average_responses))
            })
            .collect::<Vec<_>>();

        let data_row = format!(
            "
            <tr>
                <th>{}</th>
                {}
            </tr>
            ",
            core_count_results.core_count,
            best_results_for_all_implementations
                .into_iter()
                .map(|result| format!(
                    "<td>{}</td>",
                    result
                        .map(|r| r.to_string())
                        .unwrap_or_else(|| "-".to_string())
                ))
                .join("\n"),
        );

        data_rows.push(data_row);
    }

    println!(
        "
        <table>
            <thead>
                <tr>
                    <th>CPU cores</th>
                    {}
                </tr>
            </thead>
            <tbody>
                {}
            </tbody>
        </table>
        ",
        all_implementation_names
            .iter()
            .map(|name| format!("<th>{name}</th>"))
            .join("\n"),
        data_rows.join("\n")
    )
}

View file

@ -2,13 +2,13 @@ use std::{net::SocketAddr, path::PathBuf};
use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig};
use cfg_if::cfg_if;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use aquatic_common::cli::LogLevel;
use aquatic_toml_config::TomlConfig;
/// aquatic_udp configuration
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct Config {
/// Number of socket workers. One per physical core is recommended.
@ -78,7 +78,7 @@ impl aquatic_common::cli::Config for Config {
}
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct NetworkConfig {
/// Bind to this address
@ -138,7 +138,7 @@ impl Default for NetworkConfig {
}
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct ProtocolConfig {
/// Maximum number of torrents to allow in scrape request
@ -159,7 +159,7 @@ impl Default for ProtocolConfig {
}
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct StatisticsConfig {
/// Collect and print/write statistics this often (seconds)
@ -231,7 +231,7 @@ impl Default for StatisticsConfig {
}
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct CleaningConfig {
/// Clean torrents this often (seconds)

View file

@ -13,6 +13,9 @@ rust-version.workspace = true
[features]
cpu-pinning = ["aquatic_common/hwloc"]
[lib]
name = "aquatic_udp_load_test"
[[bin]]
name = "aquatic_udp_load_test"

View file

@ -1,6 +1,6 @@
use std::net::SocketAddr;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use aquatic_common::cli::LogLevel;
#[cfg(feature = "cpu-pinning")]
@ -8,7 +8,7 @@ use aquatic_common::cpu_pinning::desc::CpuPinningConfigDesc;
use aquatic_toml_config::TomlConfig;
/// aquatic_udp_load_test configuration
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct Config {
/// Server address
@ -42,7 +42,7 @@ impl Default for Config {
}
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct NetworkConfig {
/// True means bind to one localhost IP per socket.
@ -56,8 +56,6 @@ pub struct NetworkConfig {
pub first_port: u16,
/// Socket worker poll timeout in microseconds
pub poll_timeout: u64,
/// Socket worker polling event number
pub poll_event_capacity: usize,
/// Size of socket recv buffer. Use 0 for OS default.
///
/// This setting can have a big impact on dropped packages. It might
@ -79,13 +77,12 @@ impl Default for NetworkConfig {
multiple_client_ipv4s: true,
first_port: 45_000,
poll_timeout: 276,
poll_event_capacity: 2_877,
recv_buffer: 6_000_000,
}
}
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct RequestConfig {
/// Number of torrents to simulate

View file

@ -0,0 +1,191 @@
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::atomic::AtomicUsize;
use std::sync::{atomic::Ordering, Arc};
use std::thread::{self, Builder};
use std::time::{Duration, Instant};
#[cfg(feature = "cpu-pinning")]
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
use rand_distr::Gamma;
pub mod common;
pub mod config;
pub mod utils;
pub mod worker;
use common::*;
use config::Config;
use utils::*;
use worker::*;
impl aquatic_common::cli::Config for Config {
    /// Expose the configured log level to the shared CLI setup code.
    fn get_log_level(&self) -> Option<aquatic_common::cli::LogLevel> {
        Some(self.log_level)
    }
}
/// Run the load test client: spawn one thread per configured worker,
/// then block in the statistics monitor until the configured duration
/// (if any) has elapsed.
///
/// # Errors
///
/// Returns an error if all request weights are zero, if the gamma
/// distribution parameters are invalid, or if spawning a worker thread
/// fails. (These previously panicked; returning `Err` lets the binary
/// report them cleanly, and the function already returns a `Result`.)
pub fn run(config: Config) -> ::anyhow::Result<()> {
    if config.requests.weight_announce
        + config.requests.weight_connect
        + config.requests.weight_scrape
        == 0
    {
        return Err(::anyhow::anyhow!(
            "Error: at least one weight must be larger than zero."
        ));
    }

    println!("Starting client with config: {:#?}", config);

    let mut info_hashes = Vec::with_capacity(config.requests.number_of_torrents);

    for _ in 0..config.requests.number_of_torrents {
        info_hashes.push(generate_info_hash());
    }

    let state = LoadTestState {
        info_hashes: Arc::new(info_hashes),
        statistics: Arc::new(Statistics::default()),
    };

    // Gamma distribution controlling which torrents requests target
    let gamma = Gamma::new(
        config.requests.torrent_gamma_shape,
        config.requests.torrent_gamma_scale,
    )
    .map_err(|err| ::anyhow::anyhow!("invalid torrent gamma parameters: {:?}", err))?;

    // Start workers
    for i in 0..config.workers {
        let port = config.network.first_port + (i as u16);
        // Optionally give each worker its own localhost source address.
        // NOTE(review): `1 + i` overflows u8 for workers > 254 -- fine
        // for realistic worker counts, but worth a bound check eventually
        let ip = if config.server_address.is_ipv6() {
            Ipv6Addr::LOCALHOST.into()
        } else if config.network.multiple_client_ipv4s {
            Ipv4Addr::new(127, 0, 0, 1 + i).into()
        } else {
            Ipv4Addr::LOCALHOST.into()
        };

        let addr = SocketAddr::new(ip, port);

        let config = config.clone();
        let state = state.clone();

        Builder::new().name("load-test".into()).spawn(move || {
            #[cfg(feature = "cpu-pinning")]
            pin_current_if_configured_to(
                &config.cpu_pinning,
                config.workers as usize,
                0,
                WorkerIndex::SocketWorker(i as usize),
            );

            run_worker_thread(state, gamma, &config, addr)
        })?;
    }

    #[cfg(feature = "cpu-pinning")]
    pin_current_if_configured_to(
        &config.cpu_pinning,
        config.workers as usize,
        0,
        WorkerIndex::Util,
    );

    monitor_statistics(state, &config);

    Ok(())
}
/// Print per-interval statistics every five seconds until the configured
/// duration has elapsed, then print a final summary report.
///
/// Counters are fetched-and-reset each interval, so each printed figure
/// covers only the last interval; the final report averages the
/// per-interval rates.
fn monitor_statistics(state: LoadTestState, config: &Config) {
    let mut report_avg_connect: Vec<f64> = Vec::new();
    let mut report_avg_announce: Vec<f64> = Vec::new();
    let mut report_avg_scrape: Vec<f64> = Vec::new();
    let mut report_avg_error: Vec<f64> = Vec::new();

    // Seconds between statistics printouts
    let interval = 5;

    let start_time = Instant::now();
    let duration = Duration::from_secs(config.duration as u64);

    let mut last = start_time;

    let time_elapsed = loop {
        thread::sleep(Duration::from_secs(interval));

        let requests = fetch_and_reset(&state.statistics.requests);
        let response_peers = fetch_and_reset(&state.statistics.response_peers);
        let responses_connect = fetch_and_reset(&state.statistics.responses_connect);
        let responses_announce = fetch_and_reset(&state.statistics.responses_announce);
        let responses_scrape = fetch_and_reset(&state.statistics.responses_scrape);
        let responses_error = fetch_and_reset(&state.statistics.responses_error);

        // Use measured elapsed time rather than assuming `interval`
        let now = Instant::now();
        let elapsed = (now - last).as_secs_f64();
        last = now;

        // NOTE(review): 0.0 / 0.0 when there were no announce responses
        // in the interval, which prints as NaN
        let peers_per_announce_response = response_peers / responses_announce;

        let avg_requests = requests / elapsed;
        let avg_responses_connect = responses_connect / elapsed;
        let avg_responses_announce = responses_announce / elapsed;
        let avg_responses_scrape = responses_scrape / elapsed;
        let avg_responses_error = responses_error / elapsed;
        let avg_responses = avg_responses_connect
            + avg_responses_announce
            + avg_responses_scrape
            + avg_responses_error;

        report_avg_connect.push(avg_responses_connect);
        report_avg_announce.push(avg_responses_announce);
        report_avg_scrape.push(avg_responses_scrape);
        report_avg_error.push(avg_responses_error);

        println!();
        println!("Requests out: {:.2}/second", avg_requests);
        println!("Responses in: {:.2}/second", avg_responses);
        println!(" - Connect responses: {:.2}", avg_responses_connect);
        println!(" - Announce responses: {:.2}", avg_responses_announce);
        println!(" - Scrape responses: {:.2}", avg_responses_scrape);
        println!(" - Error responses: {:.2}", avg_responses_error);
        println!(
            "Peers per announce response: {:.2}",
            peers_per_announce_response
        );

        let time_elapsed = start_time.elapsed();

        // duration == 0 means run until externally stopped
        if config.duration != 0 && time_elapsed >= duration {
            break time_elapsed;
        }
    };

    let len = report_avg_connect.len() as f64;

    // Average of per-interval averages; intervals have roughly equal
    // length, so this approximates the overall rate
    let avg_connect: f64 = report_avg_connect.into_iter().sum::<f64>() / len;
    let avg_announce: f64 = report_avg_announce.into_iter().sum::<f64>() / len;
    let avg_scrape: f64 = report_avg_scrape.into_iter().sum::<f64>() / len;
    let avg_error: f64 = report_avg_error.into_iter().sum::<f64>() / len;

    let avg_total = avg_connect + avg_announce + avg_scrape + avg_error;

    println!();
    println!("# aquatic load test report");
    println!();
    println!("Test ran for {} seconds", time_elapsed.as_secs());
    // This line is parsed by aquatic_load_tester (RunResults::avg_responses)
    println!("Average responses per second: {:.2}", avg_total);
    println!(" - Connect responses: {:.2}", avg_connect);
    println!(" - Announce responses: {:.2}", avg_announce);
    println!(" - Scrape responses: {:.2}", avg_scrape);
    println!(" - Error responses: {:.2}", avg_error);
    println!();
    println!("Config: {:#?}", config);
    println!();
}
/// Atomically take the current value of a statistics counter, resetting
/// it to zero, and return the previous value as an f64.
fn fetch_and_reset(atomic_usize: &AtomicUsize) -> f64 {
    let previous = atomic_usize.swap(0, Ordering::Relaxed);

    previous as f64
}

View file

@ -1,22 +1,4 @@
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::atomic::AtomicUsize;
use std::sync::{atomic::Ordering, Arc};
use std::thread::{self, Builder};
use std::time::{Duration, Instant};
#[cfg(feature = "cpu-pinning")]
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
use rand_distr::Gamma;
mod common;
mod config;
mod utils;
mod worker;
use common::*;
use config::Config;
use utils::*;
use worker::*;
use aquatic_udp_load_test::config::Config;
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
@ -25,179 +7,7 @@ pub fn main() {
aquatic_common::cli::run_app_with_cli_and_config::<Config>(
"aquatic_udp_load_test: BitTorrent load tester",
env!("CARGO_PKG_VERSION"),
run,
aquatic_udp_load_test::run,
None,
)
}
/// Hooks this crate's `Config` into the shared aquatic CLI machinery so
/// the configured log level is applied on startup.
impl aquatic_common::cli::Config for Config {
    fn get_log_level(&self) -> Option<aquatic_common::cli::LogLevel> {
        Some(self.log_level)
    }
}
/// Run the UDP load test: validate request weights, pre-generate info
/// hashes, spawn one socket worker thread per configured worker, then
/// monitor and print statistics on the current thread until the test
/// duration elapses.
///
/// Panics if all request-type weights are zero, since no requests could
/// ever be generated in that case.
fn run(config: Config) -> ::anyhow::Result<()> {
    if config.requests.weight_announce
        + config.requests.weight_connect
        + config.requests.weight_scrape
        == 0
    {
        panic!("Error: at least one weight must be larger than zero.");
    }

    println!("Starting client with config: {:#?}", config);

    // Pre-generate the pool of info hashes that workers announce/scrape
    let mut info_hashes = Vec::with_capacity(config.requests.number_of_torrents);

    for _ in 0..config.requests.number_of_torrents {
        info_hashes.push(generate_info_hash());
    }

    // Shared between all worker threads and the statistics monitor
    let state = LoadTestState {
        info_hashes: Arc::new(info_hashes),
        statistics: Arc::new(Statistics::default()),
    };

    // Gamma distribution used to skew torrent selection across workers
    let gamma = Gamma::new(
        config.requests.torrent_gamma_shape,
        config.requests.torrent_gamma_scale,
    )
    .unwrap();

    // Start workers
    for i in 0..config.workers {
        // Each worker binds its own consecutive port
        let port = config.network.first_port + (i as u16);
        let ip = if config.server_address.is_ipv6() {
            Ipv6Addr::LOCALHOST.into()
        } else {
            // Optionally give each worker a distinct 127.0.0.x address
            // (presumably to spread client source addresses — confirm)
            if config.network.multiple_client_ipv4s {
                Ipv4Addr::new(127, 0, 0, 1 + i).into()
            } else {
                Ipv4Addr::LOCALHOST.into()
            }
        };
        let addr = SocketAddr::new(ip, port);
        let config = config.clone();
        let state = state.clone();

        Builder::new().name("load-test".into()).spawn(move || {
            #[cfg(feature = "cpu-pinning")]
            pin_current_if_configured_to(
                &config.cpu_pinning,
                config.workers as usize,
                0,
                WorkerIndex::SocketWorker(i as usize),
            );

            run_worker_thread(state, gamma, &config, addr)
        })?;
    }

    // Pin the current (statistics/util) thread as well, if configured
    #[cfg(feature = "cpu-pinning")]
    pin_current_if_configured_to(
        &config.cpu_pinning,
        config.workers as usize,
        0,
        WorkerIndex::Util,
    );

    // Blocks until the configured duration has elapsed (forever if 0)
    monitor_statistics(state, &config);

    Ok(())
}
/// Print per-window throughput statistics until the configured test
/// duration has elapsed, then print a summary report.
///
/// Every `interval` seconds the counters in `state.statistics` are
/// atomically fetched and reset, converted to per-second rates for that
/// window, and printed. Each window's rates are recorded so overall
/// averages can be reported at the end. If `config.duration` is zero the
/// loop never exits and no final report is printed.
fn monitor_statistics(state: LoadTestState, config: &Config) {
    // Per-window response rates, accumulated for the final report
    let mut report_avg_connect: Vec<f64> = Vec::new();
    let mut report_avg_announce: Vec<f64> = Vec::new();
    let mut report_avg_scrape: Vec<f64> = Vec::new();
    let mut report_avg_error: Vec<f64> = Vec::new();

    let interval = 5;

    let start_time = Instant::now();
    let duration = Duration::from_secs(config.duration as u64);
    let mut last = start_time;

    let time_elapsed = loop {
        thread::sleep(Duration::from_secs(interval));

        // Snapshot and reset all counters for this window
        let requests = fetch_and_reset(&state.statistics.requests);
        let response_peers = fetch_and_reset(&state.statistics.response_peers);
        let responses_connect = fetch_and_reset(&state.statistics.responses_connect);
        let responses_announce = fetch_and_reset(&state.statistics.responses_announce);
        let responses_scrape = fetch_and_reset(&state.statistics.responses_scrape);
        let responses_error = fetch_and_reset(&state.statistics.responses_error);

        // Measure the actual window length: the sleep is not exact
        let now = Instant::now();
        let elapsed = (now - last).as_secs_f64();
        last = now;

        // NOTE(review): yields NaN when responses_announce is 0.0 in a
        // window (f64 division); the printed value will show as NaN
        let peers_per_announce_response = response_peers / responses_announce;

        let avg_requests = requests / elapsed;
        let avg_responses_connect = responses_connect / elapsed;
        let avg_responses_announce = responses_announce / elapsed;
        let avg_responses_scrape = responses_scrape / elapsed;
        let avg_responses_error = responses_error / elapsed;
        let avg_responses = avg_responses_connect
            + avg_responses_announce
            + avg_responses_scrape
            + avg_responses_error;

        report_avg_connect.push(avg_responses_connect);
        report_avg_announce.push(avg_responses_announce);
        report_avg_scrape.push(avg_responses_scrape);
        report_avg_error.push(avg_responses_error);

        println!();
        println!("Requests out: {:.2}/second", avg_requests);
        println!("Responses in: {:.2}/second", avg_responses);
        println!(" - Connect responses: {:.2}", avg_responses_connect);
        println!(" - Announce responses: {:.2}", avg_responses_announce);
        println!(" - Scrape responses: {:.2}", avg_responses_scrape);
        println!(" - Error responses: {:.2}", avg_responses_error);
        println!(
            "Peers per announce response: {:.2}",
            peers_per_announce_response
        );

        // Stop once the configured duration has passed (0 = run forever)
        let time_elapsed = start_time.elapsed();

        if config.duration != 0 && time_elapsed >= duration {
            break time_elapsed;
        }
    };

    // Overall averages across all windows. At least one window always
    // runs before the loop can break, so `len` is never zero here.
    let len = report_avg_connect.len() as f64;

    let avg_connect: f64 = report_avg_connect.into_iter().sum::<f64>() / len;
    let avg_announce: f64 = report_avg_announce.into_iter().sum::<f64>() / len;
    let avg_scrape: f64 = report_avg_scrape.into_iter().sum::<f64>() / len;
    let avg_error: f64 = report_avg_error.into_iter().sum::<f64>() / len;
    let avg_total = avg_connect + avg_announce + avg_scrape + avg_error;

    println!();
    println!("# aquatic load test report");
    println!();
    println!("Test ran for {} seconds", time_elapsed.as_secs());
    println!("Average responses per second: {:.2}", avg_total);
    println!(" - Connect responses: {:.2}", avg_connect);
    println!(" - Announce responses: {:.2}", avg_announce);
    println!(" - Scrape responses: {:.2}", avg_scrape);
    println!(" - Error responses: {:.2}", avg_error);
    println!();
    println!("Config: {:#?}", config);
    println!();
}
/// Atomically take the current value of a statistics counter, resetting
/// it to zero, and return the previous value as an f64 (AND-ing with 0
/// both clears the counter and yields its old value).
fn fetch_and_reset(atomic_usize: &AtomicUsize) -> f64 {
    atomic_usize.fetch_and(0, Ordering::Relaxed) as f64
}

View file

@ -41,7 +41,7 @@ pub fn run_worker_thread(
.register(&mut socket, token, interests)
.unwrap();
let mut events = Events::with_capacity(config.network.poll_event_capacity);
let mut events = Events::with_capacity(1);
let mut statistics = SocketWorkerLocalStatistics::default();