From b70cbe63495025f7ffc49c128935296ffa25ab8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 28 Nov 2021 01:01:49 +0100 Subject: [PATCH] udp load test: use relaxed ordering, more accurate stats, improve code --- aquatic_udp_load_test/src/main.rs | 80 +++++++++++++++------------- aquatic_udp_load_test/src/network.rs | 12 ++--- 2 files changed, 48 insertions(+), 44 deletions(-) diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index 52eaa87..cb1131e 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -1,4 +1,5 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::sync::atomic::AtomicUsize; use std::sync::{atomic::Ordering, Arc}; use std::thread; use std::time::{Duration, Instant}; @@ -102,63 +103,62 @@ fn run(config: Config) -> ::anyhow::Result<()> { } fn monitor_statistics(state: LoadTestState, config: &Config) { - let start_time = Instant::now(); - let duration = Duration::from_secs(config.duration as u64); - let mut report_avg_connect: Vec = Vec::new(); let mut report_avg_announce: Vec = Vec::new(); let mut report_avg_scrape: Vec = Vec::new(); let mut report_avg_error: Vec = Vec::new(); let interval = 5; - let interval_f64 = interval as f64; + + let start_time = Instant::now(); + let duration = Duration::from_secs(config.duration as u64); + + let mut last = start_time; loop { thread::sleep(Duration::from_secs(interval)); - let statistics = state.statistics.as_ref(); + let requests = fetch_and_reset(&state.statistics.requests); + let response_peers = fetch_and_reset(&state.statistics.response_peers); + let responses_connect = fetch_and_reset(&state.statistics.responses_connect); + let responses_announce = fetch_and_reset(&state.statistics.responses_announce); + let responses_scrape = fetch_and_reset(&state.statistics.responses_scrape); + let responses_error = fetch_and_reset(&state.statistics.responses_error); - let responses_announce = - statistics.responses_announce.fetch_and(0, Ordering::SeqCst) as f64; - let response_peers = statistics.response_peers.fetch_and(0, Ordering::SeqCst) as f64; + let now = Instant::now(); - let requests_per_second = - statistics.requests.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; - let responses_connect_per_second = - statistics.responses_connect.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; - let responses_scrape_per_second = - statistics.responses_scrape.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; - let responses_error_per_second = - statistics.responses_error.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + let elapsed = (now - last).as_secs_f64(); - let responses_announce_per_second = responses_announce / interval_f64; + last = now; - let responses_per_second = responses_connect_per_second - + responses_announce_per_second - + responses_scrape_per_second - + responses_error_per_second; + let peers_per_announce_response = response_peers / responses_announce; - report_avg_connect.push(responses_connect_per_second); - report_avg_announce.push(responses_announce_per_second); - report_avg_scrape.push(responses_scrape_per_second); - report_avg_error.push(responses_error_per_second); + let avg_requests = requests / elapsed; + let avg_responses_connect = responses_connect / elapsed; + let avg_responses_announce = responses_announce / elapsed; + let avg_responses_scrape = responses_scrape / elapsed; + let avg_responses_error = responses_error / elapsed; + + let avg_responses = avg_responses_connect + + avg_responses_announce + + avg_responses_scrape + + avg_responses_error; + + report_avg_connect.push(avg_responses_connect); + report_avg_announce.push(avg_responses_announce); + report_avg_scrape.push(avg_responses_scrape); + report_avg_error.push(avg_responses_error); println!(); - println!("Requests out: {:.2}/second", requests_per_second); - println!("Responses in: {:.2}/second", responses_per_second); - println!( - " - Connect responses: {:.2}", - responses_connect_per_second - ); - println!( - " - Announce responses: {:.2}", - responses_announce_per_second - ); - println!(" - Scrape responses: {:.2}", responses_scrape_per_second); - println!(" - Error responses: {:.2}", responses_error_per_second); + println!("Requests out: {:.2}/second", avg_requests); + println!("Responses in: {:.2}/second", avg_responses); + println!(" - Connect responses: {:.2}", avg_responses_connect); + println!(" - Announce responses: {:.2}", avg_responses_announce); + println!(" - Scrape responses: {:.2}", avg_responses_scrape); + println!(" - Error responses: {:.2}", avg_responses_error); println!( "Peers per announce response: {:.2}", - response_peers / responses_announce + peers_per_announce_response ); let time_elapsed = start_time.elapsed(); @@ -192,3 +192,7 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { } } } + +fn fetch_and_reset(atomic_usize: &AtomicUsize) -> f64 { + atomic_usize.fetch_and(0, Ordering::Relaxed) as f64 +} diff --git a/aquatic_udp_load_test/src/network.rs b/aquatic_udp_load_test/src/network.rs index a6ee6cf..3a61804 100644 --- a/aquatic_udp_load_test/src/network.rs +++ b/aquatic_udp_load_test/src/network.rs @@ -177,27 +177,27 @@ fn update_shared_statistics(state: &LoadTestState, statistics: &mut SocketWorker state .statistics .requests - .fetch_add(statistics.requests, Ordering::SeqCst); + .fetch_add(statistics.requests, Ordering::Relaxed); state .statistics .responses_connect - .fetch_add(statistics.responses_connect, Ordering::SeqCst); + .fetch_add(statistics.responses_connect, Ordering::Relaxed); state .statistics .responses_announce - .fetch_add(statistics.responses_announce, Ordering::SeqCst); + .fetch_add(statistics.responses_announce, Ordering::Relaxed); state .statistics .responses_scrape - .fetch_add(statistics.responses_scrape, Ordering::SeqCst); + .fetch_add(statistics.responses_scrape, Ordering::Relaxed); state .statistics .responses_error - .fetch_add(statistics.responses_error, Ordering::SeqCst); + .fetch_add(statistics.responses_error, Ordering::Relaxed); state .statistics .response_peers - .fetch_add(statistics.response_peers, Ordering::SeqCst); + .fetch_add(statistics.response_peers, Ordering::Relaxed); *statistics = SocketWorkerLocalStatistics::default(); }