Merge pull request #33 from greatest-ape/udp-load-test-fixes

aquatic_udp_load_test improvements
This commit is contained in:
Joakim Frostegård 2021-11-28 11:43:34 +01:00 committed by GitHub
commit f81244287b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 92 additions and 66 deletions

View file

@@ -18,6 +18,9 @@ pub struct Config {
pub workers: u8, pub workers: u8,
/// Run duration (quit and generate report after this many seconds) /// Run duration (quit and generate report after this many seconds)
pub duration: usize, pub duration: usize,
/// Probability that an additional connect request will be sent for each
/// mio event
pub additional_request_probability: f32,
pub network: NetworkConfig, pub network: NetworkConfig,
pub handler: HandlerConfig, pub handler: HandlerConfig,
#[cfg(feature = "cpu-pinning")] #[cfg(feature = "cpu-pinning")]
@@ -88,6 +91,7 @@ impl Default for Config {
log_level: LogLevel::Error, log_level: LogLevel::Error,
workers: 1, workers: 1,
duration: 0, duration: 0,
additional_request_probability: 0.5,
network: NetworkConfig::default(), network: NetworkConfig::default(),
handler: HandlerConfig::default(), handler: HandlerConfig::default(),
#[cfg(feature = "cpu-pinning")] #[cfg(feature = "cpu-pinning")]

View file

@@ -1,4 +1,5 @@
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::atomic::AtomicUsize;
use std::sync::{atomic::Ordering, Arc}; use std::sync::{atomic::Ordering, Arc};
use std::thread; use std::thread;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@@ -102,77 +103,94 @@ fn run(config: Config) -> ::anyhow::Result<()> {
} }
fn monitor_statistics(state: LoadTestState, config: &Config) { fn monitor_statistics(state: LoadTestState, config: &Config) {
let start_time = Instant::now(); let mut report_avg_connect: Vec<f64> = Vec::new();
let mut report_avg_response_vec: Vec<f64> = Vec::new(); let mut report_avg_announce: Vec<f64> = Vec::new();
let mut report_avg_scrape: Vec<f64> = Vec::new();
let mut report_avg_error: Vec<f64> = Vec::new();
let interval = 5; let interval = 5;
let interval_f64 = interval as f64;
loop { let start_time = Instant::now();
let duration = Duration::from_secs(config.duration as u64);
let mut last = start_time;
let time_elapsed = loop {
thread::sleep(Duration::from_secs(interval)); thread::sleep(Duration::from_secs(interval));
let statistics = state.statistics.as_ref(); let requests = fetch_and_reset(&state.statistics.requests);
let response_peers = fetch_and_reset(&state.statistics.response_peers);
let responses_connect = fetch_and_reset(&state.statistics.responses_connect);
let responses_announce = fetch_and_reset(&state.statistics.responses_announce);
let responses_scrape = fetch_and_reset(&state.statistics.responses_scrape);
let responses_error = fetch_and_reset(&state.statistics.responses_error);
let responses_announce = let now = Instant::now();
statistics.responses_announce.fetch_and(0, Ordering::SeqCst) as f64;
let response_peers = statistics.response_peers.fetch_and(0, Ordering::SeqCst) as f64;
let requests_per_second = let elapsed = (now - last).as_secs_f64();
statistics.requests.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
let responses_connect_per_second =
statistics.responses_connect.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
let responses_scrape_per_second =
statistics.responses_scrape.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
let responses_error_per_second =
statistics.responses_error.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
let responses_announce_per_second = responses_announce / interval_f64; last = now;
let responses_per_second = responses_connect_per_second let peers_per_announce_response = response_peers / responses_announce;
+ responses_announce_per_second
+ responses_scrape_per_second
+ responses_error_per_second;
report_avg_response_vec.push(responses_per_second); let avg_requests = requests / elapsed;
let avg_responses_connect = responses_connect / elapsed;
let avg_responses_announce = responses_announce / elapsed;
let avg_responses_scrape = responses_scrape / elapsed;
let avg_responses_error = responses_error / elapsed;
let avg_responses = avg_responses_connect
+ avg_responses_announce
+ avg_responses_scrape
+ avg_responses_error;
report_avg_connect.push(avg_responses_connect);
report_avg_announce.push(avg_responses_announce);
report_avg_scrape.push(avg_responses_scrape);
report_avg_error.push(avg_responses_error);
println!(); println!();
println!("Requests out: {:.2}/second", requests_per_second); println!("Requests out: {:.2}/second", avg_requests);
println!("Responses in: {:.2}/second", responses_per_second); println!("Responses in: {:.2}/second", avg_responses);
println!( println!(" - Connect responses: {:.2}", avg_responses_connect);
" - Connect responses: {:.2}", println!(" - Announce responses: {:.2}", avg_responses_announce);
responses_connect_per_second println!(" - Scrape responses: {:.2}", avg_responses_scrape);
); println!(" - Error responses: {:.2}", avg_responses_error);
println!(
" - Announce responses: {:.2}",
responses_announce_per_second
);
println!(" - Scrape responses: {:.2}", responses_scrape_per_second);
println!(" - Error responses: {:.2}", responses_error_per_second);
println!( println!(
"Peers per announce response: {:.2}", "Peers per announce response: {:.2}",
response_peers / responses_announce peers_per_announce_response
); );
let time_elapsed = start_time.elapsed(); let time_elapsed = start_time.elapsed();
let duration = Duration::from_secs(config.duration as u64);
if config.duration != 0 && time_elapsed >= duration { if config.duration != 0 && time_elapsed >= duration {
let report_len = report_avg_response_vec.len() as f64; break time_elapsed;
let report_sum: f64 = report_avg_response_vec.into_iter().sum(); }
let report_avg: f64 = report_sum / report_len; };
println!( let len = report_avg_connect.len() as f64;
concat!(
"\n# aquatic load test report\n\n",
"Test ran for {} seconds.\n",
"Average responses per second: {:.2}\n\nConfig: {:#?}\n"
),
time_elapsed.as_secs(),
report_avg,
config
);
break; let avg_connect: f64 = report_avg_connect.into_iter().sum::<f64>() / len;
} let avg_announce: f64 = report_avg_announce.into_iter().sum::<f64>() / len;
let avg_scrape: f64 = report_avg_scrape.into_iter().sum::<f64>() / len;
let avg_error: f64 = report_avg_error.into_iter().sum::<f64>() / len;
let avg_total = avg_connect + avg_announce + avg_scrape + avg_error;
println!();
println!("# aquatic load test report");
println!();
println!("Test ran for {} seconds", time_elapsed.as_secs());
println!("Average responses per second: {:.2}", avg_total);
println!(" - Connect responses: {:.2}", avg_connect);
println!(" - Announce responses: {:.2}", avg_announce);
println!(" - Scrape responses: {:.2}", avg_scrape);
println!(" - Error responses: {:.2}", avg_error);
println!();
println!("Config: {:#?}", config);
println!();
} }
fn fetch_and_reset(atomic_usize: &AtomicUsize) -> f64 {
atomic_usize.fetch_and(0, Ordering::Relaxed) as f64
} }

View file

@@ -4,6 +4,7 @@ use std::sync::atomic::Ordering;
use std::time::Duration; use std::time::Duration;
use mio::{net::UdpSocket, Events, Interest, Poll, Token}; use mio::{net::UdpSocket, Events, Interest, Poll, Token};
use rand::Rng;
use rand::{prelude::SmallRng, thread_rng, SeedableRng}; use rand::{prelude::SmallRng, thread_rng, SeedableRng};
use rand_distr::Pareto; use rand_distr::Pareto;
use socket2::{Domain, Protocol, Socket, Type}; use socket2::{Domain, Protocol, Socket, Type};
@@ -126,7 +127,9 @@ pub fn run_worker_thread(
} }
} }
let additional_request = create_connect_request(generate_transaction_id(&mut rng)); if rng.gen::<f32>() <= config.additional_request_probability {
let additional_request =
create_connect_request(generate_transaction_id(&mut rng));
send_request( send_request(
&mut socket, &mut socket,
@@ -134,6 +137,7 @@ pub fn run_worker_thread(
&mut statistics, &mut statistics,
additional_request, additional_request,
); );
}
update_shared_statistics(&state, &mut statistics); update_shared_statistics(&state, &mut statistics);
} }
@@ -173,27 +177,27 @@ fn update_shared_statistics(state: &LoadTestState, statistics: &mut SocketWorker
state state
.statistics .statistics
.requests .requests
.fetch_add(statistics.requests, Ordering::SeqCst); .fetch_add(statistics.requests, Ordering::Relaxed);
state state
.statistics .statistics
.responses_connect .responses_connect
.fetch_add(statistics.responses_connect, Ordering::SeqCst); .fetch_add(statistics.responses_connect, Ordering::Relaxed);
state state
.statistics .statistics
.responses_announce .responses_announce
.fetch_add(statistics.responses_announce, Ordering::SeqCst); .fetch_add(statistics.responses_announce, Ordering::Relaxed);
state state
.statistics .statistics
.responses_scrape .responses_scrape
.fetch_add(statistics.responses_scrape, Ordering::SeqCst); .fetch_add(statistics.responses_scrape, Ordering::Relaxed);
state state
.statistics .statistics
.responses_error .responses_error
.fetch_add(statistics.responses_error, Ordering::SeqCst); .fetch_add(statistics.responses_error, Ordering::Relaxed);
state state
.statistics .statistics
.response_peers .response_peers
.fetch_add(statistics.response_peers, Ordering::SeqCst); .fetch_add(statistics.response_peers, Ordering::Relaxed);
*statistics = SocketWorkerLocalStatistics::default(); *statistics = SocketWorkerLocalStatistics::default();
} }