mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
Merge pull request #118 from greatest-ape/work-2023-01-25
ws load test: use relaxed atomics, provide stats on number of active connections
This commit is contained in:
commit
61ae6dd7e7
3 changed files with 23 additions and 14 deletions
|
|
@ -13,6 +13,7 @@ pub struct Statistics {
|
||||||
pub responses_answer: AtomicUsize,
|
pub responses_answer: AtomicUsize,
|
||||||
pub responses_scrape: AtomicUsize,
|
pub responses_scrape: AtomicUsize,
|
||||||
pub responses_error: AtomicUsize,
|
pub responses_error: AtomicUsize,
|
||||||
|
pub connections: AtomicUsize,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
|
|
||||||
|
|
@ -128,24 +128,27 @@ fn monitor_statistics(state: LoadTestState, config: &Config) {
|
||||||
|
|
||||||
let statistics = state.statistics.as_ref();
|
let statistics = state.statistics.as_ref();
|
||||||
|
|
||||||
let responses_announce =
|
let responses_announce = statistics
|
||||||
statistics.responses_announce.fetch_and(0, Ordering::SeqCst) as f64;
|
.responses_announce
|
||||||
|
.fetch_and(0, Ordering::Relaxed) as f64;
|
||||||
// let response_peers = statistics.response_peers
|
// let response_peers = statistics.response_peers
|
||||||
// .fetch_and(0, Ordering::SeqCst) as f64;
|
// .fetch_and(0, Ordering::Relaxed) as f64;
|
||||||
|
|
||||||
let requests_per_second =
|
let requests_per_second =
|
||||||
statistics.requests.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
|
statistics.requests.fetch_and(0, Ordering::Relaxed) as f64 / interval_f64;
|
||||||
let responses_offer_per_second =
|
let responses_offer_per_second =
|
||||||
statistics.responses_offer.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
|
statistics.responses_offer.fetch_and(0, Ordering::Relaxed) as f64 / interval_f64;
|
||||||
let responses_answer_per_second =
|
let responses_answer_per_second =
|
||||||
statistics.responses_answer.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
|
statistics.responses_answer.fetch_and(0, Ordering::Relaxed) as f64 / interval_f64;
|
||||||
let responses_scrape_per_second =
|
let responses_scrape_per_second =
|
||||||
statistics.responses_scrape.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
|
statistics.responses_scrape.fetch_and(0, Ordering::Relaxed) as f64 / interval_f64;
|
||||||
let responses_error_per_second =
|
let responses_error_per_second =
|
||||||
statistics.responses_error.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
|
statistics.responses_error.fetch_and(0, Ordering::Relaxed) as f64 / interval_f64;
|
||||||
|
|
||||||
let responses_announce_per_second = responses_announce / interval_f64;
|
let responses_announce_per_second = responses_announce / interval_f64;
|
||||||
|
|
||||||
|
let connections = statistics.connections.load(Ordering::Relaxed);
|
||||||
|
|
||||||
let responses_per_second = responses_announce_per_second
|
let responses_per_second = responses_announce_per_second
|
||||||
+ responses_offer_per_second
|
+ responses_offer_per_second
|
||||||
+ responses_answer_per_second
|
+ responses_answer_per_second
|
||||||
|
|
@ -165,6 +168,7 @@ fn monitor_statistics(state: LoadTestState, config: &Config) {
|
||||||
println!(" - Answer responses: {:.2}", responses_answer_per_second);
|
println!(" - Answer responses: {:.2}", responses_answer_per_second);
|
||||||
println!(" - Scrape responses: {:.2}", responses_scrape_per_second);
|
println!(" - Scrape responses: {:.2}", responses_scrape_per_second);
|
||||||
println!(" - Error responses: {:.2}", responses_error_per_second);
|
println!(" - Error responses: {:.2}", responses_error_per_second);
|
||||||
|
println!("Active connections: {}", connections);
|
||||||
|
|
||||||
let time_elapsed = start_time.elapsed();
|
let time_elapsed = start_time.elapsed();
|
||||||
let duration = Duration::from_secs(config.duration as u64);
|
let duration = Duration::from_secs(config.duration as u64);
|
||||||
|
|
|
||||||
|
|
@ -92,6 +92,8 @@ impl Connection {
|
||||||
);
|
);
|
||||||
let (stream, _) = client_async(request, stream).await?;
|
let (stream, _) = client_async(request, stream).await?;
|
||||||
|
|
||||||
|
let statistics = load_test_state.statistics.clone();
|
||||||
|
|
||||||
let mut connection = Connection {
|
let mut connection = Connection {
|
||||||
config,
|
config,
|
||||||
load_test_state,
|
load_test_state,
|
||||||
|
|
@ -103,12 +105,14 @@ impl Connection {
|
||||||
};
|
};
|
||||||
|
|
||||||
*num_active_connections.borrow_mut() += 1;
|
*num_active_connections.borrow_mut() += 1;
|
||||||
|
statistics.connections.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
if let Err(err) = connection.run_connection_loop().await {
|
if let Err(err) = connection.run_connection_loop().await {
|
||||||
::log::info!("connection error: {:#}", err);
|
::log::info!("connection error: {:#}", err);
|
||||||
}
|
}
|
||||||
|
|
||||||
*num_active_connections.borrow_mut() -= 1;
|
*num_active_connections.borrow_mut() -= 1;
|
||||||
|
statistics.connections.fetch_sub(1, Ordering::Relaxed);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
@ -149,7 +153,7 @@ impl Connection {
|
||||||
self.load_test_state
|
self.load_test_state
|
||||||
.statistics
|
.statistics
|
||||||
.requests
|
.requests
|
||||||
.fetch_add(1, Ordering::SeqCst);
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
self.can_send = false;
|
self.can_send = false;
|
||||||
}
|
}
|
||||||
|
|
@ -183,7 +187,7 @@ impl Connection {
|
||||||
self.load_test_state
|
self.load_test_state
|
||||||
.statistics
|
.statistics
|
||||||
.responses_offer
|
.responses_offer
|
||||||
.fetch_add(1, Ordering::SeqCst);
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
self.send_answer = Some((offer.peer_id, offer.offer_id));
|
self.send_answer = Some((offer.peer_id, offer.offer_id));
|
||||||
|
|
||||||
|
|
@ -193,7 +197,7 @@ impl Connection {
|
||||||
self.load_test_state
|
self.load_test_state
|
||||||
.statistics
|
.statistics
|
||||||
.responses_answer
|
.responses_answer
|
||||||
.fetch_add(1, Ordering::SeqCst);
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
self.can_send = true;
|
self.can_send = true;
|
||||||
}
|
}
|
||||||
|
|
@ -201,7 +205,7 @@ impl Connection {
|
||||||
self.load_test_state
|
self.load_test_state
|
||||||
.statistics
|
.statistics
|
||||||
.responses_announce
|
.responses_announce
|
||||||
.fetch_add(1, Ordering::SeqCst);
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
self.can_send = true;
|
self.can_send = true;
|
||||||
}
|
}
|
||||||
|
|
@ -209,7 +213,7 @@ impl Connection {
|
||||||
self.load_test_state
|
self.load_test_state
|
||||||
.statistics
|
.statistics
|
||||||
.responses_scrape
|
.responses_scrape
|
||||||
.fetch_add(1, Ordering::SeqCst);
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
self.can_send = true;
|
self.can_send = true;
|
||||||
}
|
}
|
||||||
|
|
@ -217,7 +221,7 @@ impl Connection {
|
||||||
self.load_test_state
|
self.load_test_state
|
||||||
.statistics
|
.statistics
|
||||||
.responses_error
|
.responses_error
|
||||||
.fetch_add(1, Ordering::SeqCst);
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
::log::warn!("received error response: {:?}", response.failure_reason);
|
::log::warn!("received error response: {:?}", response.failure_reason);
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue