diff --git a/aquatic_udp_load_test/src/config.rs b/aquatic_udp_load_test/src/config.rs index cbd1176..cc63900 100644 --- a/aquatic_udp_load_test/src/config.rs +++ b/aquatic_udp_load_test/src/config.rs @@ -18,6 +18,9 @@ pub struct Config { pub workers: u8, /// Run duration (quit and generate report after this many seconds) pub duration: usize, + /// Probability that an additional connect request will be sent for each + /// mio event + pub additional_request_probability: f32, pub network: NetworkConfig, pub handler: HandlerConfig, #[cfg(feature = "cpu-pinning")] @@ -88,6 +91,7 @@ impl Default for Config { log_level: LogLevel::Error, workers: 1, duration: 0, + additional_request_probability: 0.5, network: NetworkConfig::default(), handler: HandlerConfig::default(), #[cfg(feature = "cpu-pinning")] diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index 3a7651c..957fcbc 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -1,4 +1,5 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::sync::atomic::AtomicUsize; use std::sync::{atomic::Ordering, Arc}; use std::thread; use std::time::{Duration, Instant}; @@ -102,77 +103,94 @@ fn run(config: Config) -> ::anyhow::Result<()> { } fn monitor_statistics(state: LoadTestState, config: &Config) { - let start_time = Instant::now(); - let mut report_avg_response_vec: Vec = Vec::new(); + let mut report_avg_connect: Vec = Vec::new(); + let mut report_avg_announce: Vec = Vec::new(); + let mut report_avg_scrape: Vec = Vec::new(); + let mut report_avg_error: Vec = Vec::new(); let interval = 5; - let interval_f64 = interval as f64; - loop { + let start_time = Instant::now(); + let duration = Duration::from_secs(config.duration as u64); + + let mut last = start_time; + + let time_elapsed = loop { thread::sleep(Duration::from_secs(interval)); - let statistics = state.statistics.as_ref(); + let requests = fetch_and_reset(&state.statistics.requests); + let response_peers = fetch_and_reset(&state.statistics.response_peers); + let responses_connect = fetch_and_reset(&state.statistics.responses_connect); + let responses_announce = fetch_and_reset(&state.statistics.responses_announce); + let responses_scrape = fetch_and_reset(&state.statistics.responses_scrape); + let responses_error = fetch_and_reset(&state.statistics.responses_error); - let responses_announce = - statistics.responses_announce.fetch_and(0, Ordering::SeqCst) as f64; - let response_peers = statistics.response_peers.fetch_and(0, Ordering::SeqCst) as f64; + let now = Instant::now(); - let requests_per_second = - statistics.requests.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; - let responses_connect_per_second = - statistics.responses_connect.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; - let responses_scrape_per_second = - statistics.responses_scrape.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; - let responses_error_per_second = - statistics.responses_error.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + let elapsed = (now - last).as_secs_f64(); - let responses_announce_per_second = responses_announce / interval_f64; + last = now; - let responses_per_second = responses_connect_per_second - + responses_announce_per_second - + responses_scrape_per_second - + responses_error_per_second; + let peers_per_announce_response = response_peers / responses_announce; - report_avg_response_vec.push(responses_per_second); + let avg_requests = requests / elapsed; + let avg_responses_connect = responses_connect / elapsed; + let avg_responses_announce = responses_announce / elapsed; + let avg_responses_scrape = responses_scrape / elapsed; + let avg_responses_error = responses_error / elapsed; + + let avg_responses = avg_responses_connect + + avg_responses_announce + + avg_responses_scrape + + avg_responses_error; + + report_avg_connect.push(avg_responses_connect); + report_avg_announce.push(avg_responses_announce); + report_avg_scrape.push(avg_responses_scrape); + report_avg_error.push(avg_responses_error); println!(); - println!("Requests out: {:.2}/second", requests_per_second); - println!("Responses in: {:.2}/second", responses_per_second); - println!( - " - Connect responses: {:.2}", - responses_connect_per_second - ); - println!( - " - Announce responses: {:.2}", - responses_announce_per_second - ); - println!(" - Scrape responses: {:.2}", responses_scrape_per_second); - println!(" - Error responses: {:.2}", responses_error_per_second); + println!("Requests out: {:.2}/second", avg_requests); + println!("Responses in: {:.2}/second", avg_responses); + println!(" - Connect responses: {:.2}", avg_responses_connect); + println!(" - Announce responses: {:.2}", avg_responses_announce); + println!(" - Scrape responses: {:.2}", avg_responses_scrape); + println!(" - Error responses: {:.2}", avg_responses_error); println!( "Peers per announce response: {:.2}", - response_peers / responses_announce + peers_per_announce_response ); let time_elapsed = start_time.elapsed(); - let duration = Duration::from_secs(config.duration as u64); if config.duration != 0 && time_elapsed >= duration { - let report_len = report_avg_response_vec.len() as f64; - let report_sum: f64 = report_avg_response_vec.into_iter().sum(); - let report_avg: f64 = report_sum / report_len; - - println!( - concat!( - "\n# aquatic load test report\n\n", - "Test ran for {} seconds.\n", - "Average responses per second: {:.2}\n\nConfig: {:#?}\n" - ), - time_elapsed.as_secs(), - report_avg, - config - ); - - break; + break time_elapsed; } - } + }; + + let len = report_avg_connect.len() as f64; + + let avg_connect: f64 = report_avg_connect.into_iter().sum::() / len; + let avg_announce: f64 = report_avg_announce.into_iter().sum::() / len; + let avg_scrape: f64 = report_avg_scrape.into_iter().sum::() / len; + let avg_error: f64 = report_avg_error.into_iter().sum::() / len; + + let avg_total = avg_connect + avg_announce + avg_scrape + avg_error; + + println!(); + println!("# aquatic load test report"); + println!(); + println!("Test ran for {} seconds", time_elapsed.as_secs()); + println!("Average responses per second: {:.2}", avg_total); + println!(" - Connect responses: {:.2}", avg_connect); + println!(" - Announce responses: {:.2}", avg_announce); + println!(" - Scrape responses: {:.2}", avg_scrape); + println!(" - Error responses: {:.2}", avg_error); + println!(); + println!("Config: {:#?}", config); + println!(); +} + +fn fetch_and_reset(atomic_usize: &AtomicUsize) -> f64 { + atomic_usize.fetch_and(0, Ordering::Relaxed) as f64 } diff --git a/aquatic_udp_load_test/src/network.rs b/aquatic_udp_load_test/src/network.rs index f6b08dc..3a61804 100644 --- a/aquatic_udp_load_test/src/network.rs +++ b/aquatic_udp_load_test/src/network.rs @@ -4,6 +4,7 @@ use std::sync::atomic::Ordering; use std::time::Duration; use mio::{net::UdpSocket, Events, Interest, Poll, Token}; +use rand::Rng; use rand::{prelude::SmallRng, thread_rng, SeedableRng}; use rand_distr::Pareto; use socket2::{Domain, Protocol, Socket, Type}; @@ -126,14 +127,17 @@ pub fn run_worker_thread( } } - let additional_request = create_connect_request(generate_transaction_id(&mut rng)); + if rng.gen::() <= config.additional_request_probability { + let additional_request = + create_connect_request(generate_transaction_id(&mut rng)); - send_request( - &mut socket, - &mut buffer, - &mut statistics, - additional_request, - ); + send_request( + &mut socket, + &mut buffer, + &mut statistics, + additional_request, + ); + } update_shared_statistics(&state, &mut statistics); } @@ -173,27 +177,27 @@ fn update_shared_statistics(state: &LoadTestState, statistics: &mut SocketWorker state .statistics .requests - .fetch_add(statistics.requests, Ordering::SeqCst); + .fetch_add(statistics.requests, Ordering::Relaxed); state .statistics .responses_connect - .fetch_add(statistics.responses_connect, Ordering::SeqCst); + .fetch_add(statistics.responses_connect, Ordering::Relaxed); state .statistics .responses_announce - .fetch_add(statistics.responses_announce, Ordering::SeqCst); + .fetch_add(statistics.responses_announce, Ordering::Relaxed); state .statistics .responses_scrape - .fetch_add(statistics.responses_scrape, Ordering::SeqCst); + .fetch_add(statistics.responses_scrape, Ordering::Relaxed); state .statistics .responses_error - .fetch_add(statistics.responses_error, Ordering::SeqCst); + .fetch_add(statistics.responses_error, Ordering::Relaxed); state .statistics .response_peers - .fetch_add(statistics.response_peers, Ordering::SeqCst); + .fetch_add(statistics.response_peers, Ordering::Relaxed); *statistics = SocketWorkerLocalStatistics::default(); }