udp load test: display stats on announce responses per info hash

This commit is contained in:
Joakim Frostegård 2024-02-06 18:06:12 +01:00
parent 5cad19c12e
commit 84aa830e64
6 changed files with 100 additions and 17 deletions

1
Cargo.lock generated
View file

@ -335,6 +335,7 @@ dependencies = [
"aquatic_common",
"aquatic_toml_config",
"aquatic_udp_protocol",
"crossbeam-channel",
"hdrhistogram",
"mimalloc",
"quickcheck",

View file

@ -25,6 +25,7 @@ aquatic_toml_config.workspace = true
aquatic_udp_protocol.workspace = true
anyhow = "1"
crossbeam-channel = "0.5"
hdrhistogram = "7"
mimalloc = { version = "0.1", default-features = false }
rand_distr = "0.4"

View file

@ -1,5 +1,6 @@
use std::sync::{atomic::AtomicUsize, Arc};
use aquatic_common::IndexMap;
use aquatic_udp_protocol::*;
#[derive(Clone)]
@ -19,8 +20,13 @@ pub struct SharedStatistics {
}
pub struct Peer {
pub announce_info_hash_index: usize,
pub announce_info_hash: InfoHash,
pub announce_port: Port,
pub scrape_info_hash_indices: Box<[usize]>,
pub socket_index: u8,
}
pub enum StatisticsMessage {
ResponsesPerInfoHash(IndexMap<usize, u64>),
}

View file

@ -50,6 +50,12 @@ impl Default for Config {
}
}
impl aquatic_common::cli::Config for Config {
fn get_log_level(&self) -> Option<aquatic_common::cli::LogLevel> {
Some(self.log_level)
}
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct NetworkConfig {

View file

@ -9,6 +9,7 @@ use std::time::{Duration, Instant};
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
use aquatic_common::IndexMap;
use aquatic_udp_protocol::{InfoHash, Port};
use crossbeam_channel::{unbounded, Receiver};
use hdrhistogram::Histogram;
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
@ -22,11 +23,7 @@ use common::*;
use config::Config;
use worker::*;
impl aquatic_common::cli::Config for Config {
fn get_log_level(&self) -> Option<aquatic_common::cli::LogLevel> {
Some(self.log_level)
}
}
const PERCENTILES: &[f64] = &[10.0, 25.0, 50.0, 75.0, 90.0, 95.0, 99.0, 99.9, 100.0];
pub fn run(config: Config) -> ::anyhow::Result<()> {
if config.requests.weight_announce
@ -51,6 +48,8 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
statistics: Arc::new(SharedStatistics::default()),
};
let (statistics_sender, statistics_receiver) = unbounded();
// Start workers
for (i, peers) in (0..config.workers).zip(peers_by_worker) {
@ -65,6 +64,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let addr = SocketAddr::new(ip, 0);
let config = config.clone();
let state = state.clone();
let statistics_sender = statistics_sender.clone();
Builder::new().name("load-test".into()).spawn(move || {
#[cfg(feature = "cpu-pinning")]
@ -75,7 +75,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
WorkerIndex::SocketWorker(i as usize),
);
Worker::run(config, state, peers, addr)
Worker::run(config, state, statistics_sender, peers, addr)
})?;
}
@ -87,12 +87,16 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
WorkerIndex::Util,
);
monitor_statistics(state, &config);
monitor_statistics(state, &config, statistics_receiver);
Ok(())
}
fn monitor_statistics(state: LoadTestState, config: &Config) {
fn monitor_statistics(
state: LoadTestState,
config: &Config,
statistics_receiver: Receiver<StatisticsMessage>,
) {
let mut report_avg_connect: Vec<f64> = Vec::new();
let mut report_avg_announce: Vec<f64> = Vec::new();
let mut report_avg_scrape: Vec<f64> = Vec::new();
@ -108,6 +112,21 @@ fn monitor_statistics(state: LoadTestState, config: &Config) {
let time_elapsed = loop {
thread::sleep(Duration::from_secs(INTERVAL));
let mut opt_responses_per_info_hash: Option<IndexMap<usize, u64>> =
config.peer_histogram.then_some(Default::default());
for message in statistics_receiver.try_iter() {
match message {
StatisticsMessage::ResponsesPerInfoHash(data) => {
if let Some(responses_per_info_hash) = opt_responses_per_info_hash.as_mut() {
for (k, v) in data {
*responses_per_info_hash.entry(k).or_default() += v;
}
}
}
}
}
let requests = fetch_and_reset(&state.statistics.requests);
let response_peers = fetch_and_reset(&state.statistics.response_peers);
let responses_connect = fetch_and_reset(&state.statistics.responses_connect);
@ -151,6 +170,20 @@ fn monitor_statistics(state: LoadTestState, config: &Config) {
peers_per_announce_response
);
if let Some(responses_per_info_hash) = opt_responses_per_info_hash.as_ref() {
let mut histogram = Histogram::<u64>::new(2).unwrap();
for num_responses in responses_per_info_hash.values().copied() {
histogram.record(num_responses).unwrap();
}
println!("Announce responses per info hash:");
for p in PERCENTILES {
println!(" - p{}: {}", p, histogram.value_at_percentile(*p));
}
}
let time_elapsed = start_time.elapsed();
if config.duration != 0 && time_elapsed >= duration {
@ -225,6 +258,7 @@ fn create_peers(config: &Config, info_hash_dist: &InfoHashDist) -> Vec<Box<[Peer
}
Peer {
announce_info_hash_index,
announce_info_hash,
announce_port: Port::new(rng.gen()),
scrape_info_hash_indices,
@ -243,16 +277,10 @@ fn create_peers(config: &Config, info_hash_dist: &InfoHashDist) -> Vec<Box<[Peer
histogram.record(*num_peers).unwrap();
}
let percentiles = [
1.0, 10.0, 25.0, 50.0, 75.0, 85.0, 90.0, 95.0, 98.0, 99.9, 100.0,
];
println!("Peers per info hash:");
for p in percentiles {
let value = histogram.value_at_percentile(p);
println!(" - p{}: {}", p, value);
for p in PERCENTILES {
println!(" - p{}: {}", p, histogram.value_at_percentile(*p));
}
}

View file

@ -3,6 +3,8 @@ use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::Ordering;
use std::time::Duration;
use aquatic_common::IndexMap;
use crossbeam_channel::Sender;
use rand::Rng;
use rand::{prelude::SmallRng, SeedableRng};
use rand_distr::{Distribution, WeightedIndex};
@ -12,6 +14,7 @@ use aquatic_udp_protocol::*;
use crate::common::{LoadTestState, Peer};
use crate::config::Config;
use crate::StatisticsMessage;
const MAX_PACKET_SIZE: usize = 8192;
@ -25,10 +28,18 @@ pub struct Worker {
buffer: [u8; MAX_PACKET_SIZE],
rng: SmallRng,
statistics: LocalStatistics,
statistics_sender: Sender<StatisticsMessage>,
announce_responses_per_info_hash: IndexMap<usize, u64>,
}
impl Worker {
pub fn run(config: Config, shared_state: LoadTestState, peers: Box<[Peer]>, addr: SocketAddr) {
pub fn run(
config: Config,
shared_state: LoadTestState,
statistics_sender: Sender<StatisticsMessage>,
peers: Box<[Peer]>,
addr: SocketAddr,
) {
let mut sockets = Vec::new();
for _ in 0..config.network.sockets_per_worker {
@ -50,6 +61,8 @@ impl Worker {
buffer,
rng,
statistics,
statistics_sender,
announce_responses_per_info_hash: Default::default(),
};
instance.run_inner();
@ -267,10 +280,30 @@ impl Worker {
Response::AnnounceIpv4(r) => {
self.statistics.responses_announce += 1;
self.statistics.response_peers += r.peers.len();
let peer_index =
u32::from_ne_bytes(r.fixed.transaction_id.0.get().to_ne_bytes()) as usize;
if let Some(peer) = self.peers.get(peer_index) {
*self
.announce_responses_per_info_hash
.entry(peer.announce_info_hash_index)
.or_default() += 1;
}
}
Response::AnnounceIpv6(r) => {
self.statistics.responses_announce += 1;
self.statistics.response_peers += r.peers.len();
let peer_index =
u32::from_ne_bytes(r.fixed.transaction_id.0.get().to_ne_bytes()) as usize;
if let Some(peer) = self.peers.get(peer_index) {
*self
.announce_responses_per_info_hash
.entry(peer.announce_info_hash_index)
.or_default() += 1;
}
}
Response::Scrape(_) => {
self.statistics.responses_scrape += 1;
@ -303,6 +336,14 @@ impl Worker {
.response_peers
.fetch_add(self.statistics.response_peers, Ordering::Relaxed);
if self.config.peer_histogram {
let message = StatisticsMessage::ResponsesPerInfoHash(
self.announce_responses_per_info_hash.split_off(0),
);
self.statistics_sender.try_send(message).unwrap();
}
self.statistics = LocalStatistics::default();
}
}