From f6a33dac8a8ed2207b95c631db02ba1e2b895269 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 28 Nov 2021 00:29:41 +0100 Subject: [PATCH 1/6] udp load test: enable tuning additional request frequency --- aquatic_udp_load_test/src/config.rs | 4 ++++ aquatic_udp_load_test/src/network.rs | 18 +++++++++++------- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/aquatic_udp_load_test/src/config.rs b/aquatic_udp_load_test/src/config.rs index cbd1176..456be0b 100644 --- a/aquatic_udp_load_test/src/config.rs +++ b/aquatic_udp_load_test/src/config.rs @@ -18,6 +18,9 @@ pub struct Config { pub workers: u8, /// Run duration (quit and generate report after this many seconds) pub duration: usize, + /// Probability that an additional connect request will be sent for each + /// mio event + pub additional_request_probability: f32, pub network: NetworkConfig, pub handler: HandlerConfig, #[cfg(feature = "cpu-pinning")] @@ -88,6 +91,7 @@ impl Default for Config { log_level: LogLevel::Error, workers: 1, duration: 0, + additional_request_probability: 0.1, network: NetworkConfig::default(), handler: HandlerConfig::default(), #[cfg(feature = "cpu-pinning")] diff --git a/aquatic_udp_load_test/src/network.rs b/aquatic_udp_load_test/src/network.rs index f6b08dc..a6ee6cf 100644 --- a/aquatic_udp_load_test/src/network.rs +++ b/aquatic_udp_load_test/src/network.rs @@ -4,6 +4,7 @@ use std::sync::atomic::Ordering; use std::time::Duration; use mio::{net::UdpSocket, Events, Interest, Poll, Token}; +use rand::Rng; use rand::{prelude::SmallRng, thread_rng, SeedableRng}; use rand_distr::Pareto; use socket2::{Domain, Protocol, Socket, Type}; @@ -126,14 +127,17 @@ pub fn run_worker_thread( } } - let additional_request = create_connect_request(generate_transaction_id(&mut rng)); + if rng.gen::() <= config.additional_request_probability { + let additional_request = + create_connect_request(generate_transaction_id(&mut rng)); - send_request( - &mut socket, - &mut buffer, - &mut statistics, - additional_request, - ); + send_request( + &mut socket, + &mut buffer, + &mut statistics, + additional_request, + ); + } update_shared_statistics(&state, &mut statistics); } From 039eeae160b13ca2eda8ec9c2663735931ddbb26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 28 Nov 2021 00:49:32 +0100 Subject: [PATCH 2/6] udo load test: report avg for all response types --- aquatic_udp_load_test/src/main.rs | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index 3a7651c..52eaa87 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -103,7 +103,12 @@ fn run(config: Config) -> ::anyhow::Result<()> { fn monitor_statistics(state: LoadTestState, config: &Config) { let start_time = Instant::now(); - let mut report_avg_response_vec: Vec = Vec::new(); + let duration = Duration::from_secs(config.duration as u64); + + let mut report_avg_connect: Vec = Vec::new(); + let mut report_avg_announce: Vec = Vec::new(); + let mut report_avg_scrape: Vec = Vec::new(); + let mut report_avg_error: Vec = Vec::new(); let interval = 5; let interval_f64 = interval as f64; @@ -133,7 +138,10 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { + responses_scrape_per_second + responses_error_per_second; - report_avg_response_vec.push(responses_per_second); + report_avg_connect.push(responses_connect_per_second); + report_avg_announce.push(responses_announce_per_second); + report_avg_scrape.push(responses_scrape_per_second); + report_avg_error.push(responses_error_per_second); println!(); println!("Requests out: {:.2}/second", requests_per_second); @@ -154,12 +162,16 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { ); let time_elapsed = start_time.elapsed(); - let duration = Duration::from_secs(config.duration as u64); if config.duration != 0 && time_elapsed >= duration { - let report_len = report_avg_response_vec.len() as f64; - let report_sum: f64 = report_avg_response_vec.into_iter().sum(); - let report_avg: f64 = report_sum / report_len; + let len = report_avg_connect.len() as f64; + + let avg_connect: f64 = report_avg_connect.into_iter().sum::() / len; + let avg_announce: f64 = report_avg_announce.into_iter().sum::() / len; + let avg_scrape: f64 = report_avg_scrape.into_iter().sum::() / len; + let avg_error: f64 = report_avg_error.into_iter().sum::() / len; + + let avg_total = avg_connect + avg_announce + avg_scrape + avg_error; println!( concat!( @@ -168,9 +180,13 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { "Average responses per second: {:.2}\n\nConfig: {:#?}\n" ), time_elapsed.as_secs(), - report_avg, + avg_total, config ); + println!(" - Connect responses: {:.2}", avg_connect); + println!(" - Announce responses: {:.2}", avg_announce); + println!(" - Scrape responses: {:.2}", avg_scrape); + println!(" - Error responses: {:.2}", avg_error); break; } From b70cbe63495025f7ffc49c128935296ffa25ab8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 28 Nov 2021 01:01:49 +0100 Subject: [PATCH 3/6] udp load test: use relaxed ordering, more accurate stats, improve code --- aquatic_udp_load_test/src/main.rs | 80 +++++++++++++++------------- aquatic_udp_load_test/src/network.rs | 12 ++--- 2 files changed, 48 insertions(+), 44 deletions(-) diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index 52eaa87..cb1131e 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -1,4 +1,5 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::sync::atomic::AtomicUsize; use std::sync::{atomic::Ordering, Arc}; use std::thread; use std::time::{Duration, Instant}; @@ -102,63 +103,62 @@ fn run(config: Config) -> ::anyhow::Result<()> { } fn monitor_statistics(state: LoadTestState, config: &Config) { - let start_time = Instant::now(); - let duration = Duration::from_secs(config.duration as u64); - let mut report_avg_connect: Vec = Vec::new(); let mut report_avg_announce: Vec = Vec::new(); let mut report_avg_scrape: Vec = Vec::new(); let mut report_avg_error: Vec = Vec::new(); let interval = 5; - let interval_f64 = interval as f64; + + let start_time = Instant::now(); + let duration = Duration::from_secs(config.duration as u64); + + let mut last = start_time; loop { thread::sleep(Duration::from_secs(interval)); - let statistics = state.statistics.as_ref(); + let requests = fetch_and_reset(&state.statistics.requests); + let response_peers = fetch_and_reset(&state.statistics.response_peers); + let responses_connect = fetch_and_reset(&state.statistics.responses_connect); + let responses_announce = fetch_and_reset(&state.statistics.responses_announce); + let responses_scrape = fetch_and_reset(&state.statistics.responses_scrape); + let responses_error = fetch_and_reset(&state.statistics.responses_error); - let responses_announce = - statistics.responses_announce.fetch_and(0, Ordering::SeqCst) as f64; - let response_peers = statistics.response_peers.fetch_and(0, Ordering::SeqCst) as f64; + let now = Instant::now(); - let requests_per_second = - statistics.requests.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; - let responses_connect_per_second = - statistics.responses_connect.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; - let responses_scrape_per_second = - statistics.responses_scrape.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; - let responses_error_per_second = - statistics.responses_error.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + let elapsed = (now - last).as_secs_f64(); - let responses_announce_per_second = responses_announce / interval_f64; + last = now; - let responses_per_second = responses_connect_per_second - + responses_announce_per_second - + responses_scrape_per_second - + responses_error_per_second; + let peers_per_announce_response = response_peers / responses_announce; - report_avg_connect.push(responses_connect_per_second); - report_avg_announce.push(responses_announce_per_second); - report_avg_scrape.push(responses_scrape_per_second); - report_avg_error.push(responses_error_per_second); + let avg_requests = requests / elapsed; + let avg_responses_connect = responses_connect / elapsed; + let avg_responses_announce = responses_announce / elapsed; + let avg_responses_scrape = responses_scrape / elapsed; + let avg_responses_error = responses_error / elapsed; + + let avg_responses = avg_responses_connect + + avg_responses_announce + + avg_responses_scrape + + avg_responses_error; + + report_avg_connect.push(avg_responses_connect); + report_avg_announce.push(avg_responses_announce); + report_avg_scrape.push(avg_responses_scrape); + report_avg_error.push(avg_responses_error); println!(); - println!("Requests out: {:.2}/second", requests_per_second); - println!("Responses in: {:.2}/second", responses_per_second); - println!( - " - Connect responses: {:.2}", - responses_connect_per_second - ); - println!( - " - Announce responses: {:.2}", - responses_announce_per_second - ); - println!(" - Scrape responses: {:.2}", responses_scrape_per_second); - println!(" - Error responses: {:.2}", responses_error_per_second); + println!("Requests out: {:.2}/second", avg_requests); + println!("Responses in: {:.2}/second", avg_responses); + println!(" - Connect responses: {:.2}", avg_responses_connect); + println!(" - Announce responses: {:.2}", avg_responses_announce); + println!(" - Scrape responses: {:.2}", avg_responses_scrape); + println!(" - Error responses: {:.2}", avg_responses_error); println!( "Peers per announce response: {:.2}", - response_peers / responses_announce + peers_per_announce_response ); let time_elapsed = start_time.elapsed(); @@ -192,3 +192,7 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { } } } + +fn fetch_and_reset(atomic_usize: &AtomicUsize) -> f64 { + atomic_usize.fetch_and(0, Ordering::Relaxed) as f64 +} diff --git a/aquatic_udp_load_test/src/network.rs b/aquatic_udp_load_test/src/network.rs index a6ee6cf..3a61804 100644 --- a/aquatic_udp_load_test/src/network.rs +++ b/aquatic_udp_load_test/src/network.rs @@ -177,27 +177,27 @@ fn update_shared_statistics(state: &LoadTestState, statistics: &mut SocketWorker state .statistics .requests - .fetch_add(statistics.requests, Ordering::SeqCst); + .fetch_add(statistics.requests, Ordering::Relaxed); state .statistics .responses_connect - .fetch_add(statistics.responses_connect, Ordering::SeqCst); + .fetch_add(statistics.responses_connect, Ordering::Relaxed); state .statistics .responses_announce - .fetch_add(statistics.responses_announce, Ordering::SeqCst); + .fetch_add(statistics.responses_announce, Ordering::Relaxed); state .statistics .responses_scrape - .fetch_add(statistics.responses_scrape, Ordering::SeqCst); + .fetch_add(statistics.responses_scrape, Ordering::Relaxed); state .statistics .responses_error - .fetch_add(statistics.responses_error, Ordering::SeqCst); + .fetch_add(statistics.responses_error, Ordering::Relaxed); state .statistics .response_peers - .fetch_add(statistics.response_peers, Ordering::SeqCst); + .fetch_add(statistics.response_peers, Ordering::Relaxed); *statistics = SocketWorkerLocalStatistics::default(); } From 6605055b3d34a240951538f76e2fd8d516305d7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 28 Nov 2021 01:05:52 +0100 Subject: [PATCH 4/6] udp load test: improve statistics code --- aquatic_udp_load_test/src/main.rs | 54 +++++++++++++++---------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index cb1131e..c677477 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -115,7 +115,7 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { let mut last = start_time; - loop { + let time_elapsed = loop { thread::sleep(Duration::from_secs(interval)); let requests = fetch_and_reset(&state.statistics.requests); @@ -164,33 +164,33 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { let time_elapsed = start_time.elapsed(); if config.duration != 0 && time_elapsed >= duration { - let len = report_avg_connect.len() as f64; - - let avg_connect: f64 = report_avg_connect.into_iter().sum::() / len; - let avg_announce: f64 = report_avg_announce.into_iter().sum::() / len; - let avg_scrape: f64 = report_avg_scrape.into_iter().sum::() / len; - let avg_error: f64 = report_avg_error.into_iter().sum::() / len; - - let avg_total = avg_connect + avg_announce + avg_scrape + avg_error; - - println!( - concat!( - "\n# aquatic load test report\n\n", - "Test ran for {} seconds.\n", - "Average responses per second: {:.2}\n\nConfig: {:#?}\n" - ), - time_elapsed.as_secs(), - avg_total, - config - ); - println!(" - Connect responses: {:.2}", avg_connect); - println!(" - Announce responses: {:.2}", avg_announce); - println!(" - Scrape responses: {:.2}", avg_scrape); - println!(" - Error responses: {:.2}", avg_error); - - break; + break time_elapsed; } - } + }; + + let len = report_avg_connect.len() as f64; + + let avg_connect: f64 = report_avg_connect.into_iter().sum::() / len; + let avg_announce: f64 = report_avg_announce.into_iter().sum::() / len; + let avg_scrape: f64 = report_avg_scrape.into_iter().sum::() / len; + let avg_error: f64 = report_avg_error.into_iter().sum::() / len; + + let avg_total = avg_connect + avg_announce + avg_scrape + avg_error; + + println!( + concat!( + "\n# aquatic load test report\n\n", + "Test ran for {} seconds.\n", + "Average responses per second: {:.2}\n\nConfig: {:#?}\n" + ), + time_elapsed.as_secs(), + avg_total, + config + ); + println!(" - Connect responses: {:.2}", avg_connect); + println!(" - Announce responses: {:.2}", avg_announce); + println!(" - Scrape responses: {:.2}", avg_scrape); + println!(" - Error responses: {:.2}", avg_error); } fn fetch_and_reset(atomic_usize: &AtomicUsize) -> f64 { From 14474fb5e3a7c26613c782b1b9cda684de87e6d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 28 Nov 2021 01:10:42 +0100 Subject: [PATCH 5/6] udp load test: improve statistics printing code --- aquatic_udp_load_test/src/main.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index c677477..957fcbc 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -177,20 +177,18 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { let avg_total = avg_connect + avg_announce + avg_scrape + avg_error; - println!( - concat!( - "\n# aquatic load test report\n\n", - "Test ran for {} seconds.\n", - "Average responses per second: {:.2}\n\nConfig: {:#?}\n" - ), - time_elapsed.as_secs(), - avg_total, - config - ); + println!(); + println!("# aquatic load test report"); + println!(); + println!("Test ran for {} seconds", time_elapsed.as_secs()); + println!("Average responses per second: {:.2}", avg_total); println!(" - Connect responses: {:.2}", avg_connect); println!(" - Announce responses: {:.2}", avg_announce); println!(" - Scrape responses: {:.2}", avg_scrape); println!(" - Error responses: {:.2}", avg_error); + println!(); + println!("Config: {:#?}", config); + println!(); } fn fetch_and_reset(atomic_usize: &AtomicUsize) -> f64 { From 53b46d24f22b949e79130be1aa7620731d51b37d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 28 Nov 2021 03:51:39 +0100 Subject: [PATCH 6/6] udp load test: set default additional_request_probability to 0.5 --- aquatic_udp_load_test/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquatic_udp_load_test/src/config.rs b/aquatic_udp_load_test/src/config.rs index 456be0b..cc63900 100644 --- a/aquatic_udp_load_test/src/config.rs +++ b/aquatic_udp_load_test/src/config.rs @@ -91,7 +91,7 @@ impl Default for Config { log_level: LogLevel::Error, workers: 1, duration: 0, - additional_request_probability: 0.1, + additional_request_probability: 0.5, network: NetworkConfig::default(), handler: HandlerConfig::default(), #[cfg(feature = "cpu-pinning")]