diff --git a/crates/ws_load_test/src/network.rs b/crates/ws_load_test/src/network.rs index 7cdbb7b..ee045fa 100644 --- a/crates/ws_load_test/src/network.rs +++ b/crates/ws_load_test/src/network.rs @@ -7,7 +7,9 @@ use std::{ }; use aquatic_ws_protocol::{ - InMessage, InfoHash, OfferId, OutMessage, PeerId, RtcAnswer, RtcAnswerType, + AnnounceAction, AnnounceEvent, AnnounceRequest, AnnounceRequestOffer, InMessage, InfoHash, + OfferId, OutMessage, PeerId, RtcAnswer, RtcAnswerType, RtcOffer, RtcOfferType, ScrapeAction, + ScrapeRequest, ScrapeRequestInfoHashes, }; use async_tungstenite::{client_async, WebSocketStream}; use futures::{SinkExt, StreamExt}; @@ -15,8 +17,13 @@ use futures_rustls::{client::TlsStream, TlsConnector}; use glommio::net::TcpStream; use glommio::{prelude::*, timer::TimerActionRepeat}; use rand::{prelude::SmallRng, Rng, SeedableRng}; +use rand_distr::{Distribution, WeightedIndex}; -use crate::{common::LoadTestState, config::Config, utils::create_random_request}; +use crate::{ + common::{LoadTestState, RequestType}, + config::Config, + utils::select_info_hash_index, +}; pub async fn run_socket_thread( config: Config, @@ -24,6 +31,7 @@ pub async fn run_socket_thread( load_test_state: LoadTestState, ) -> anyhow::Result<()> { let config = Rc::new(config); + let rng = Rc::new(RefCell::new(SmallRng::from_entropy())); let num_active_connections = Rc::new(RefCell::new(0usize)); let connection_creation_interval = Duration::from_millis(config.connection_creation_interval_ms); @@ -34,12 +42,14 @@ pub async fn run_socket_thread( tls_config.clone(), load_test_state.clone(), num_active_connections.clone(), + rng.clone(), connection_creation_interval, ) }) .join() - .await - .ok_or_else(|| anyhow::anyhow!("connection opener timer cancelled")) + .await; + + Ok(()) } async fn periodically_open_connections( @@ -47,12 +57,19 @@ async fn periodically_open_connections( tls_config: Arc, load_test_state: LoadTestState, num_active_connections: Rc>, + rng: Rc>, connection_creation_interval: Duration, ) -> Option { if *num_active_connections.borrow() < config.num_connections_per_worker { spawn_local(async move { - if let Err(err) = - Connection::run(config, tls_config, load_test_state, num_active_connections).await + if let Err(err) = Connection::run( + config, + tls_config, + load_test_state, + num_active_connections, + rng, + ) + .await { ::log::info!("connection creation error: {:#}", err); } @@ -66,7 +83,7 @@ async fn periodically_open_connections( struct Connection { config: Rc, load_test_state: LoadTestState, - rng: SmallRng, + rng: Rc>, peer_id: PeerId, can_send_answer: Option<(InfoHash, PeerId, OfferId)>, stream: WebSocketStream>, @@ -78,9 +95,9 @@ impl Connection { tls_config: Arc, load_test_state: LoadTestState, num_active_connections: Rc>, + rng: Rc>, ) -> anyhow::Result<()> { - let mut rng = SmallRng::from_entropy(); - let peer_id = PeerId(rng.gen()); + let peer_id = PeerId(rng.borrow_mut().gen()); let stream = TcpStream::connect(config.server_address) .await .map_err(|err| anyhow::anyhow!("connect: {:?}", err))?; @@ -126,36 +143,87 @@ impl Connection { } async fn send_message(&mut self) -> anyhow::Result<()> { - let request = create_random_request( - &self.config, - &self.load_test_state, - &mut self.rng, - self.peer_id, - self.can_send_answer.is_none(), - ); + let mut rng = self.rng.borrow_mut(); - // If self.send_answer is set and request is announce request, make - // the request an offer answer - let request = if let InMessage::AnnounceRequest(mut r) = request { - if let Some((info_hash, peer_id, offer_id)) = self.can_send_answer { - r.info_hash = info_hash; - r.answer_to_peer_id = Some(peer_id); - r.answer_offer_id = Some(offer_id); - r.answer = Some(RtcAnswer { - t: RtcAnswerType::Answer, - sdp: "abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-".into() - }); - r.event = None; - r.offers = None; + let request = match random_request_type(&self.config, &mut *rng) { + RequestType::Announce => { + let (event, bytes_left) = { + if rng.gen_bool(self.config.torrents.peer_seeder_probability) { + (AnnounceEvent::Completed, 0) + } else { + (AnnounceEvent::Started, 50) + } + }; + + const SDP: &str = "abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg"; + + if let Some((info_hash, peer_id, offer_id)) = self.can_send_answer { + InMessage::AnnounceRequest(AnnounceRequest { + info_hash, + answer_to_peer_id: Some(peer_id), + answer_offer_id: Some(offer_id), + answer: Some(RtcAnswer { + t: RtcAnswerType::Answer, + sdp: SDP.into(), + }), + event: None, + offers: None, + action: aquatic_ws_protocol::AnnounceAction::Announce, + peer_id: self.peer_id, + bytes_left: Some(bytes_left), + numwant: Some(0), + }) + } else { + let info_hash_index = + select_info_hash_index(&self.config, &self.load_test_state, &mut *rng); + + let mut offers = Vec::with_capacity(self.config.torrents.offers_per_request); + + for _ in 0..self.config.torrents.offers_per_request { + offers.push(AnnounceRequestOffer { + offer_id: OfferId(rng.gen()), + offer: RtcOffer { + t: RtcOfferType::Offer, + sdp: SDP.into(), + }, + }) + } + + InMessage::AnnounceRequest(AnnounceRequest { + action: AnnounceAction::Announce, + info_hash: self.load_test_state.info_hashes[info_hash_index], + peer_id: self.peer_id, + bytes_left: Some(bytes_left), + event: Some(event), + numwant: Some(offers.len()), + offers: Some(offers), + answer: None, + answer_to_peer_id: None, + answer_offer_id: None, + }) + } } + RequestType::Scrape => { + let mut scrape_hashes = Vec::with_capacity(5); - self.can_send_answer = None; + for _ in 0..5 { + let info_hash_index = + select_info_hash_index(&self.config, &self.load_test_state, &mut *rng); - InMessage::AnnounceRequest(r) - } else { - request + scrape_hashes.push(self.load_test_state.info_hashes[info_hash_index]); + } + + InMessage::ScrapeRequest(ScrapeRequest { + action: ScrapeAction::Scrape, + info_hashes: Some(ScrapeRequestInfoHashes::Multiple(scrape_hashes)), + }) + } }; + drop(rng); + + self.can_send_answer = None; + self.stream.send(request.to_ws_message()).await?; self.load_test_state @@ -229,3 +297,16 @@ impl Connection { Ok(()) } } + +pub fn random_request_type(config: &Config, rng: &mut impl Rng) -> RequestType { + let weights = [ + config.torrents.weight_announce as u32, + config.torrents.weight_scrape as u32, + ]; + + let items = [RequestType::Announce, RequestType::Scrape]; + + let dist = WeightedIndex::new(&weights).expect("random request weighted index"); + + items[dist.sample(rng)] +} diff --git a/crates/ws_load_test/src/utils.rs b/crates/ws_load_test/src/utils.rs index bb17443..1c5c582 100644 --- a/crates/ws_load_test/src/utils.rs +++ b/crates/ws_load_test/src/utils.rs @@ -1,104 +1,13 @@ use std::sync::Arc; -use rand::distributions::WeightedIndex; use rand::prelude::*; use rand_distr::Gamma; use crate::common::*; use crate::config::*; -pub fn create_random_request( - config: &Config, - state: &LoadTestState, - rng: &mut impl Rng, - peer_id: PeerId, - announce_gen_offers: bool, -) -> InMessage { - let weights = [ - config.torrents.weight_announce as u32, - config.torrents.weight_scrape as u32, - ]; - - let items = [RequestType::Announce, RequestType::Scrape]; - - let dist = WeightedIndex::new(&weights).expect("random request weighted index"); - - match items[dist.sample(rng)] { - RequestType::Announce => { - create_announce_request(config, state, rng, peer_id, announce_gen_offers) - } - RequestType::Scrape => create_scrape_request(config, state, rng), - } -} - #[inline] -fn create_announce_request( - config: &Config, - state: &LoadTestState, - rng: &mut impl Rng, - peer_id: PeerId, - gen_offers: bool, -) -> InMessage { - let (event, bytes_left) = { - if rng.gen_bool(config.torrents.peer_seeder_probability) { - (AnnounceEvent::Completed, 0) - } else { - (AnnounceEvent::Started, 50) - } - }; - - let info_hash_index = select_info_hash_index(config, &state, rng); - - let offers = if gen_offers { - let mut offers = Vec::with_capacity(config.torrents.offers_per_request); - - for _ in 0..config.torrents.offers_per_request { - offers.push(AnnounceRequestOffer { - offer_id: OfferId(rng.gen()), - offer: RtcOffer { - t: RtcOfferType::Offer, - sdp: "abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-".into() - }, - }) - } - - offers - } else { - Vec::new() - }; - - InMessage::AnnounceRequest(AnnounceRequest { - action: AnnounceAction::Announce, - info_hash: state.info_hashes[info_hash_index], - peer_id, - bytes_left: Some(bytes_left), - event: Some(event), - numwant: Some(offers.len()), - offers: Some(offers), - answer: None, - answer_to_peer_id: None, - answer_offer_id: None, - }) -} - -#[inline] -fn create_scrape_request(config: &Config, state: &LoadTestState, rng: &mut impl Rng) -> InMessage { - let mut scrape_hashes = Vec::with_capacity(5); - - for _ in 0..5 { - let info_hash_index = select_info_hash_index(config, &state, rng); - - scrape_hashes.push(state.info_hashes[info_hash_index]); - } - - InMessage::ScrapeRequest(ScrapeRequest { - action: ScrapeAction::Scrape, - info_hashes: Some(ScrapeRequestInfoHashes::Multiple(scrape_hashes)), - }) -} - -#[inline] -fn select_info_hash_index(config: &Config, state: &LoadTestState, rng: &mut impl Rng) -> usize { +pub fn select_info_hash_index(config: &Config, state: &LoadTestState, rng: &mut impl Rng) -> usize { gamma_usize(rng, &state.gamma, config.torrents.number_of_torrents - 1) }