udp load test: use relaxed ordering, more accurate stats, improve code

This commit is contained in:
Joakim Frostegård 2021-11-28 01:01:49 +01:00
parent 039eeae160
commit b70cbe6349
2 changed files with 48 additions and 44 deletions

View file

@ -1,4 +1,5 @@
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::atomic::AtomicUsize;
use std::sync::{atomic::Ordering, Arc};
use std::thread;
use std::time::{Duration, Instant};
@ -102,63 +103,62 @@ fn run(config: Config) -> ::anyhow::Result<()> {
}
fn monitor_statistics(state: LoadTestState, config: &Config) {
let start_time = Instant::now();
let duration = Duration::from_secs(config.duration as u64);
let mut report_avg_connect: Vec<f64> = Vec::new();
let mut report_avg_announce: Vec<f64> = Vec::new();
let mut report_avg_scrape: Vec<f64> = Vec::new();
let mut report_avg_error: Vec<f64> = Vec::new();
let interval = 5;
let interval_f64 = interval as f64;
let start_time = Instant::now();
let duration = Duration::from_secs(config.duration as u64);
let mut last = start_time;
loop {
thread::sleep(Duration::from_secs(interval));
let statistics = state.statistics.as_ref();
let requests = fetch_and_reset(&state.statistics.requests);
let response_peers = fetch_and_reset(&state.statistics.response_peers);
let responses_connect = fetch_and_reset(&state.statistics.responses_connect);
let responses_announce = fetch_and_reset(&state.statistics.responses_announce);
let responses_scrape = fetch_and_reset(&state.statistics.responses_scrape);
let responses_error = fetch_and_reset(&state.statistics.responses_error);
let responses_announce =
statistics.responses_announce.fetch_and(0, Ordering::SeqCst) as f64;
let response_peers = statistics.response_peers.fetch_and(0, Ordering::SeqCst) as f64;
let now = Instant::now();
let requests_per_second =
statistics.requests.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
let responses_connect_per_second =
statistics.responses_connect.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
let responses_scrape_per_second =
statistics.responses_scrape.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
let responses_error_per_second =
statistics.responses_error.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
let elapsed = (now - last).as_secs_f64();
let responses_announce_per_second = responses_announce / interval_f64;
last = now;
let responses_per_second = responses_connect_per_second
+ responses_announce_per_second
+ responses_scrape_per_second
+ responses_error_per_second;
let peers_per_announce_response = response_peers / responses_announce;
report_avg_connect.push(responses_connect_per_second);
report_avg_announce.push(responses_announce_per_second);
report_avg_scrape.push(responses_scrape_per_second);
report_avg_error.push(responses_error_per_second);
let avg_requests = requests / elapsed;
let avg_responses_connect = responses_connect / elapsed;
let avg_responses_announce = responses_announce / elapsed;
let avg_responses_scrape = responses_scrape / elapsed;
let avg_responses_error = responses_error / elapsed;
let avg_responses = avg_responses_connect
+ avg_responses_announce
+ avg_responses_scrape
+ avg_responses_error;
report_avg_connect.push(avg_responses_connect);
report_avg_announce.push(avg_responses_announce);
report_avg_scrape.push(avg_responses_scrape);
report_avg_error.push(avg_responses_error);
println!();
println!("Requests out: {:.2}/second", requests_per_second);
println!("Responses in: {:.2}/second", responses_per_second);
println!(
" - Connect responses: {:.2}",
responses_connect_per_second
);
println!(
" - Announce responses: {:.2}",
responses_announce_per_second
);
println!(" - Scrape responses: {:.2}", responses_scrape_per_second);
println!(" - Error responses: {:.2}", responses_error_per_second);
println!("Requests out: {:.2}/second", avg_requests);
println!("Responses in: {:.2}/second", avg_responses);
println!(" - Connect responses: {:.2}", avg_responses_connect);
println!(" - Announce responses: {:.2}", avg_responses_announce);
println!(" - Scrape responses: {:.2}", avg_responses_scrape);
println!(" - Error responses: {:.2}", avg_responses_error);
println!(
"Peers per announce response: {:.2}",
response_peers / responses_announce
peers_per_announce_response
);
let time_elapsed = start_time.elapsed();
@ -192,3 +192,7 @@ fn monitor_statistics(state: LoadTestState, config: &Config) {
}
}
}
fn fetch_and_reset(atomic_usize: &AtomicUsize) -> f64 {
atomic_usize.fetch_and(0, Ordering::Relaxed) as f64
}

View file

@ -177,27 +177,27 @@ fn update_shared_statistics(state: &LoadTestState, statistics: &mut SocketWorker
state
.statistics
.requests
.fetch_add(statistics.requests, Ordering::SeqCst);
.fetch_add(statistics.requests, Ordering::Relaxed);
state
.statistics
.responses_connect
.fetch_add(statistics.responses_connect, Ordering::SeqCst);
.fetch_add(statistics.responses_connect, Ordering::Relaxed);
state
.statistics
.responses_announce
.fetch_add(statistics.responses_announce, Ordering::SeqCst);
.fetch_add(statistics.responses_announce, Ordering::Relaxed);
state
.statistics
.responses_scrape
.fetch_add(statistics.responses_scrape, Ordering::SeqCst);
.fetch_add(statistics.responses_scrape, Ordering::Relaxed);
state
.statistics
.responses_error
.fetch_add(statistics.responses_error, Ordering::SeqCst);
.fetch_add(statistics.responses_error, Ordering::Relaxed);
state
.statistics
.response_peers
.fetch_add(statistics.response_peers, Ordering::SeqCst);
.fetch_add(statistics.response_peers, Ordering::Relaxed);
*statistics = SocketWorkerLocalStatistics::default();
}