diff --git a/crates/ws_load_test/src/network.rs b/crates/ws_load_test/src/network.rs index 0ec7d37..7cdbb7b 100644 --- a/crates/ws_load_test/src/network.rs +++ b/crates/ws_load_test/src/network.rs @@ -6,7 +6,9 @@ use std::{ time::Duration, }; -use aquatic_ws_protocol::{InMessage, OfferId, OutMessage, PeerId, RtcAnswer, RtcAnswerType, InfoHash}; +use aquatic_ws_protocol::{ + InMessage, InfoHash, OfferId, OutMessage, PeerId, RtcAnswer, RtcAnswerType, +}; use async_tungstenite::{client_async, WebSocketStream}; use futures::{SinkExt, StreamExt}; use futures_rustls::{client::TlsStream, TlsConnector}; @@ -23,6 +25,8 @@ pub async fn run_socket_thread( ) -> anyhow::Result<()> { let config = Rc::new(config); let num_active_connections = Rc::new(RefCell::new(0usize)); + let connection_creation_interval = + Duration::from_millis(config.connection_creation_interval_ms); TimerActionRepeat::repeat(move || { periodically_open_connections( @@ -30,12 +34,12 @@ pub async fn run_socket_thread( tls_config.clone(), load_test_state.clone(), num_active_connections.clone(), + connection_creation_interval, ) - }); - - futures::future::pending::().await; - - Ok(()) + }) + .join() + .await + .ok_or_else(|| anyhow::anyhow!("connection opener timer cancelled")) } async fn periodically_open_connections( @@ -43,9 +47,8 @@ async fn periodically_open_connections( tls_config: Arc, load_test_state: LoadTestState, num_active_connections: Rc>, + connection_creation_interval: Duration, ) -> Option { - let wait = Duration::from_millis(config.connection_creation_interval_ms); - if *num_active_connections.borrow() < config.num_connections_per_worker { spawn_local(async move { if let Err(err) = @@ -57,16 +60,15 @@ async fn periodically_open_connections( .detach(); } - Some(wait) + Some(connection_creation_interval) } struct Connection { config: Rc, load_test_state: LoadTestState, rng: SmallRng, - can_send: bool, peer_id: PeerId, - send_answer: Option<(InfoHash, PeerId, OfferId)>, + can_send_answer: Option<(InfoHash, PeerId, OfferId)>, stream: WebSocketStream>, } @@ -99,9 +101,8 @@ impl Connection { load_test_state, rng, stream, - can_send: true, peer_id, - send_answer: None, + can_send_answer: None, }; *num_active_connections.borrow_mut() += 1; @@ -119,51 +120,52 @@ impl Connection { async fn run_connection_loop(&mut self) -> anyhow::Result<()> { loop { - if self.can_send { - let request = create_random_request( - &self.config, - &self.load_test_state, - &mut self.rng, - self.peer_id, - self.send_answer.is_none(), - ); - - // If self.send_answer is set and request is announce request, make - // the request an offer answer - let request = if let InMessage::AnnounceRequest(mut r) = request { - if let Some((info_hash, peer_id, offer_id)) = self.send_answer { - r.info_hash = info_hash; - r.answer_to_peer_id = Some(peer_id); - r.answer_offer_id = Some(offer_id); - r.answer = Some(RtcAnswer { - t: RtcAnswerType::Answer, - sdp: "abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-".into() - }); - r.event = None; - r.offers = None; - } - - InMessage::AnnounceRequest(r) - } else { - request - }; - - self.send_answer = None; - - self.stream.send(request.to_ws_message()).await?; - - self.load_test_state - .statistics - .requests - .fetch_add(1, Ordering::Relaxed); - - self.can_send = false; - } - + self.send_message().await?; self.read_message().await?; } } + async fn send_message(&mut self) -> anyhow::Result<()> { + let request = create_random_request( + &self.config, + &self.load_test_state, + &mut self.rng, + self.peer_id, + self.can_send_answer.is_none(), + ); + + // If self.send_answer is set and request is announce request, make + // the request an offer answer + let request = if let InMessage::AnnounceRequest(mut r) = request { + if let Some((info_hash, peer_id, offer_id)) = self.can_send_answer { + r.info_hash = info_hash; + r.answer_to_peer_id = Some(peer_id); + r.answer_offer_id = Some(offer_id); + r.answer = Some(RtcAnswer { + t: RtcAnswerType::Answer, + sdp: "abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-".into() + }); + r.event = None; + r.offers = None; + } + + self.can_send_answer = None; + + InMessage::AnnounceRequest(r) + } else { + request + }; + + self.stream.send(request.to_ws_message()).await?; + + self.load_test_state + .statistics + .requests + .fetch_add(1, Ordering::Relaxed); + + Ok(()) + } + async fn read_message(&mut self) -> anyhow::Result<()> { let message = match self .stream @@ -191,33 +193,25 @@ impl Connection { .responses_offer .fetch_add(1, Ordering::Relaxed); - self.send_answer = Some((offer.info_hash, offer.peer_id, offer.offer_id)); - - self.can_send = true; + self.can_send_answer = Some((offer.info_hash, offer.peer_id, offer.offer_id)); } Ok(OutMessage::AnswerOutMessage(_)) => { self.load_test_state .statistics .responses_answer .fetch_add(1, Ordering::Relaxed); - - self.can_send = true; } Ok(OutMessage::AnnounceResponse(_)) => { self.load_test_state .statistics .responses_announce .fetch_add(1, Ordering::Relaxed); - - self.can_send = true; } Ok(OutMessage::ScrapeResponse(_)) => { self.load_test_state .statistics .responses_scrape .fetch_add(1, Ordering::Relaxed); - - self.can_send = true; } Ok(OutMessage::ErrorResponse(response)) => { self.load_test_state @@ -226,8 +220,6 @@ impl Connection { .fetch_add(1, Ordering::Relaxed); ::log::warn!("received error response: {:?}", response.failure_reason); - - self.can_send = true; } Err(err) => { ::log::error!("error deserializing message: {:#}", err);