mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
ws load test: clean up, slight code refactor
This commit is contained in:
parent
2279e8390e
commit
64926ba46a
1 changed files with 57 additions and 65 deletions
|
|
@ -6,7 +6,9 @@ use std::{
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use aquatic_ws_protocol::{InMessage, OfferId, OutMessage, PeerId, RtcAnswer, RtcAnswerType, InfoHash};
|
use aquatic_ws_protocol::{
|
||||||
|
InMessage, InfoHash, OfferId, OutMessage, PeerId, RtcAnswer, RtcAnswerType,
|
||||||
|
};
|
||||||
use async_tungstenite::{client_async, WebSocketStream};
|
use async_tungstenite::{client_async, WebSocketStream};
|
||||||
use futures::{SinkExt, StreamExt};
|
use futures::{SinkExt, StreamExt};
|
||||||
use futures_rustls::{client::TlsStream, TlsConnector};
|
use futures_rustls::{client::TlsStream, TlsConnector};
|
||||||
|
|
@ -23,6 +25,8 @@ pub async fn run_socket_thread(
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let config = Rc::new(config);
|
let config = Rc::new(config);
|
||||||
let num_active_connections = Rc::new(RefCell::new(0usize));
|
let num_active_connections = Rc::new(RefCell::new(0usize));
|
||||||
|
let connection_creation_interval =
|
||||||
|
Duration::from_millis(config.connection_creation_interval_ms);
|
||||||
|
|
||||||
TimerActionRepeat::repeat(move || {
|
TimerActionRepeat::repeat(move || {
|
||||||
periodically_open_connections(
|
periodically_open_connections(
|
||||||
|
|
@ -30,12 +34,12 @@ pub async fn run_socket_thread(
|
||||||
tls_config.clone(),
|
tls_config.clone(),
|
||||||
load_test_state.clone(),
|
load_test_state.clone(),
|
||||||
num_active_connections.clone(),
|
num_active_connections.clone(),
|
||||||
|
connection_creation_interval,
|
||||||
)
|
)
|
||||||
});
|
})
|
||||||
|
.join()
|
||||||
futures::future::pending::<bool>().await;
|
.await
|
||||||
|
.ok_or_else(|| anyhow::anyhow!("connection opener timer cancelled"))
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn periodically_open_connections(
|
async fn periodically_open_connections(
|
||||||
|
|
@ -43,9 +47,8 @@ async fn periodically_open_connections(
|
||||||
tls_config: Arc<rustls::ClientConfig>,
|
tls_config: Arc<rustls::ClientConfig>,
|
||||||
load_test_state: LoadTestState,
|
load_test_state: LoadTestState,
|
||||||
num_active_connections: Rc<RefCell<usize>>,
|
num_active_connections: Rc<RefCell<usize>>,
|
||||||
|
connection_creation_interval: Duration,
|
||||||
) -> Option<Duration> {
|
) -> Option<Duration> {
|
||||||
let wait = Duration::from_millis(config.connection_creation_interval_ms);
|
|
||||||
|
|
||||||
if *num_active_connections.borrow() < config.num_connections_per_worker {
|
if *num_active_connections.borrow() < config.num_connections_per_worker {
|
||||||
spawn_local(async move {
|
spawn_local(async move {
|
||||||
if let Err(err) =
|
if let Err(err) =
|
||||||
|
|
@ -57,16 +60,15 @@ async fn periodically_open_connections(
|
||||||
.detach();
|
.detach();
|
||||||
}
|
}
|
||||||
|
|
||||||
Some(wait)
|
Some(connection_creation_interval)
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Connection {
|
struct Connection {
|
||||||
config: Rc<Config>,
|
config: Rc<Config>,
|
||||||
load_test_state: LoadTestState,
|
load_test_state: LoadTestState,
|
||||||
rng: SmallRng,
|
rng: SmallRng,
|
||||||
can_send: bool,
|
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
send_answer: Option<(InfoHash, PeerId, OfferId)>,
|
can_send_answer: Option<(InfoHash, PeerId, OfferId)>,
|
||||||
stream: WebSocketStream<TlsStream<TcpStream>>,
|
stream: WebSocketStream<TlsStream<TcpStream>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -99,9 +101,8 @@ impl Connection {
|
||||||
load_test_state,
|
load_test_state,
|
||||||
rng,
|
rng,
|
||||||
stream,
|
stream,
|
||||||
can_send: true,
|
|
||||||
peer_id,
|
peer_id,
|
||||||
send_answer: None,
|
can_send_answer: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
*num_active_connections.borrow_mut() += 1;
|
*num_active_connections.borrow_mut() += 1;
|
||||||
|
|
@ -119,19 +120,24 @@ impl Connection {
|
||||||
|
|
||||||
async fn run_connection_loop(&mut self) -> anyhow::Result<()> {
|
async fn run_connection_loop(&mut self) -> anyhow::Result<()> {
|
||||||
loop {
|
loop {
|
||||||
if self.can_send {
|
self.send_message().await?;
|
||||||
|
self.read_message().await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_message(&mut self) -> anyhow::Result<()> {
|
||||||
let request = create_random_request(
|
let request = create_random_request(
|
||||||
&self.config,
|
&self.config,
|
||||||
&self.load_test_state,
|
&self.load_test_state,
|
||||||
&mut self.rng,
|
&mut self.rng,
|
||||||
self.peer_id,
|
self.peer_id,
|
||||||
self.send_answer.is_none(),
|
self.can_send_answer.is_none(),
|
||||||
);
|
);
|
||||||
|
|
||||||
// If self.send_answer is set and request is announce request, make
|
// If self.send_answer is set and request is announce request, make
|
||||||
// the request an offer answer
|
// the request an offer answer
|
||||||
let request = if let InMessage::AnnounceRequest(mut r) = request {
|
let request = if let InMessage::AnnounceRequest(mut r) = request {
|
||||||
if let Some((info_hash, peer_id, offer_id)) = self.send_answer {
|
if let Some((info_hash, peer_id, offer_id)) = self.can_send_answer {
|
||||||
r.info_hash = info_hash;
|
r.info_hash = info_hash;
|
||||||
r.answer_to_peer_id = Some(peer_id);
|
r.answer_to_peer_id = Some(peer_id);
|
||||||
r.answer_offer_id = Some(offer_id);
|
r.answer_offer_id = Some(offer_id);
|
||||||
|
|
@ -143,13 +149,13 @@ impl Connection {
|
||||||
r.offers = None;
|
r.offers = None;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
self.can_send_answer = None;
|
||||||
|
|
||||||
InMessage::AnnounceRequest(r)
|
InMessage::AnnounceRequest(r)
|
||||||
} else {
|
} else {
|
||||||
request
|
request
|
||||||
};
|
};
|
||||||
|
|
||||||
self.send_answer = None;
|
|
||||||
|
|
||||||
self.stream.send(request.to_ws_message()).await?;
|
self.stream.send(request.to_ws_message()).await?;
|
||||||
|
|
||||||
self.load_test_state
|
self.load_test_state
|
||||||
|
|
@ -157,11 +163,7 @@ impl Connection {
|
||||||
.requests
|
.requests
|
||||||
.fetch_add(1, Ordering::Relaxed);
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
self.can_send = false;
|
Ok(())
|
||||||
}
|
|
||||||
|
|
||||||
self.read_message().await?;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn read_message(&mut self) -> anyhow::Result<()> {
|
async fn read_message(&mut self) -> anyhow::Result<()> {
|
||||||
|
|
@ -191,33 +193,25 @@ impl Connection {
|
||||||
.responses_offer
|
.responses_offer
|
||||||
.fetch_add(1, Ordering::Relaxed);
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
self.send_answer = Some((offer.info_hash, offer.peer_id, offer.offer_id));
|
self.can_send_answer = Some((offer.info_hash, offer.peer_id, offer.offer_id));
|
||||||
|
|
||||||
self.can_send = true;
|
|
||||||
}
|
}
|
||||||
Ok(OutMessage::AnswerOutMessage(_)) => {
|
Ok(OutMessage::AnswerOutMessage(_)) => {
|
||||||
self.load_test_state
|
self.load_test_state
|
||||||
.statistics
|
.statistics
|
||||||
.responses_answer
|
.responses_answer
|
||||||
.fetch_add(1, Ordering::Relaxed);
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
self.can_send = true;
|
|
||||||
}
|
}
|
||||||
Ok(OutMessage::AnnounceResponse(_)) => {
|
Ok(OutMessage::AnnounceResponse(_)) => {
|
||||||
self.load_test_state
|
self.load_test_state
|
||||||
.statistics
|
.statistics
|
||||||
.responses_announce
|
.responses_announce
|
||||||
.fetch_add(1, Ordering::Relaxed);
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
self.can_send = true;
|
|
||||||
}
|
}
|
||||||
Ok(OutMessage::ScrapeResponse(_)) => {
|
Ok(OutMessage::ScrapeResponse(_)) => {
|
||||||
self.load_test_state
|
self.load_test_state
|
||||||
.statistics
|
.statistics
|
||||||
.responses_scrape
|
.responses_scrape
|
||||||
.fetch_add(1, Ordering::Relaxed);
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
self.can_send = true;
|
|
||||||
}
|
}
|
||||||
Ok(OutMessage::ErrorResponse(response)) => {
|
Ok(OutMessage::ErrorResponse(response)) => {
|
||||||
self.load_test_state
|
self.load_test_state
|
||||||
|
|
@ -226,8 +220,6 @@ impl Connection {
|
||||||
.fetch_add(1, Ordering::Relaxed);
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
::log::warn!("received error response: {:?}", response.failure_reason);
|
::log::warn!("received error response: {:?}", response.failure_reason);
|
||||||
|
|
||||||
self.can_send = true;
|
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
::log::error!("error deserializing message: {:#}", err);
|
::log::error!("error deserializing message: {:#}", err);
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue