diff --git a/aquatic_ws_load_test/src/common.rs b/aquatic_ws_load_test/src/common.rs index 288f484..ceecacb 100644 --- a/aquatic_ws_load_test/src/common.rs +++ b/aquatic_ws_load_test/src/common.rs @@ -13,6 +13,7 @@ pub struct Statistics { pub responses_answer: AtomicUsize, pub responses_scrape: AtomicUsize, pub responses_error: AtomicUsize, + pub connections: AtomicUsize, } #[derive(Clone)] diff --git a/aquatic_ws_load_test/src/main.rs b/aquatic_ws_load_test/src/main.rs index 67a4a00..aef8648 100644 --- a/aquatic_ws_load_test/src/main.rs +++ b/aquatic_ws_load_test/src/main.rs @@ -128,24 +128,27 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { let statistics = state.statistics.as_ref(); - let responses_announce = - statistics.responses_announce.fetch_and(0, Ordering::SeqCst) as f64; + let responses_announce = statistics + .responses_announce + .fetch_and(0, Ordering::Relaxed) as f64; // let response_peers = statistics.response_peers - // .fetch_and(0, Ordering::SeqCst) as f64; + // .fetch_and(0, Ordering::Relaxed) as f64; let requests_per_second = - statistics.requests.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + statistics.requests.fetch_and(0, Ordering::Relaxed) as f64 / interval_f64; let responses_offer_per_second = - statistics.responses_offer.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + statistics.responses_offer.fetch_and(0, Ordering::Relaxed) as f64 / interval_f64; let responses_answer_per_second = - statistics.responses_answer.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + statistics.responses_answer.fetch_and(0, Ordering::Relaxed) as f64 / interval_f64; let responses_scrape_per_second = - statistics.responses_scrape.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + statistics.responses_scrape.fetch_and(0, Ordering::Relaxed) as f64 / interval_f64; let responses_error_per_second = - statistics.responses_error.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + statistics.responses_error.fetch_and(0, Ordering::Relaxed) as f64 / interval_f64; let responses_announce_per_second = responses_announce / interval_f64; + let connections = statistics.connections.load(Ordering::Relaxed); + let responses_per_second = responses_announce_per_second + responses_offer_per_second + responses_answer_per_second @@ -165,6 +168,7 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { println!(" - Answer responses: {:.2}", responses_answer_per_second); println!(" - Scrape responses: {:.2}", responses_scrape_per_second); println!(" - Error responses: {:.2}", responses_error_per_second); + println!("Active connections: {}", connections); let time_elapsed = start_time.elapsed(); let duration = Duration::from_secs(config.duration as u64); diff --git a/aquatic_ws_load_test/src/network.rs b/aquatic_ws_load_test/src/network.rs index b537eab..b8b4c8d 100644 --- a/aquatic_ws_load_test/src/network.rs +++ b/aquatic_ws_load_test/src/network.rs @@ -92,6 +92,8 @@ impl Connection { ); let (stream, _) = client_async(request, stream).await?; + let statistics = load_test_state.statistics.clone(); + let mut connection = Connection { config, load_test_state, @@ -103,12 +105,14 @@ impl Connection { }; *num_active_connections.borrow_mut() += 1; + statistics.connections.fetch_add(1, Ordering::Relaxed); if let Err(err) = connection.run_connection_loop().await { ::log::info!("connection error: {:#}", err); } *num_active_connections.borrow_mut() -= 1; + statistics.connections.fetch_sub(1, Ordering::Relaxed); Ok(()) } @@ -149,7 +153,7 @@ impl Connection { self.load_test_state .statistics .requests - .fetch_add(1, Ordering::SeqCst); + .fetch_add(1, Ordering::Relaxed); self.can_send = false; } @@ -183,7 +187,7 @@ impl Connection { self.load_test_state .statistics .responses_offer - .fetch_add(1, Ordering::SeqCst); + .fetch_add(1, Ordering::Relaxed); self.send_answer = Some((offer.peer_id, offer.offer_id)); @@ -193,7 +197,7 @@ impl Connection { self.load_test_state .statistics .responses_answer - .fetch_add(1, Ordering::SeqCst); + .fetch_add(1, Ordering::Relaxed); self.can_send = true; } @@ -201,7 +205,7 @@ impl Connection { self.load_test_state .statistics .responses_announce - .fetch_add(1, Ordering::SeqCst); + .fetch_add(1, Ordering::Relaxed); self.can_send = true; } @@ -209,7 +213,7 @@ impl Connection { self.load_test_state .statistics .responses_scrape - .fetch_add(1, Ordering::SeqCst); + .fetch_add(1, Ordering::Relaxed); self.can_send = true; } @@ -217,7 +221,7 @@ impl Connection { self.load_test_state .statistics .responses_error - .fetch_add(1, Ordering::SeqCst); + .fetch_add(1, Ordering::Relaxed); ::log::warn!("received error response: {:?}", response.failure_reason);