diff --git a/aquatic_ws_load_test/src/common.rs b/aquatic_ws_load_test/src/common.rs index 282a31e..b053216 100644 --- a/aquatic_ws_load_test/src/common.rs +++ b/aquatic_ws_load_test/src/common.rs @@ -20,6 +20,7 @@ pub struct Statistics { pub response_peers: AtomicUsize, pub responses_announce: AtomicUsize, pub responses_offer: AtomicUsize, + pub responses_answer: AtomicUsize, pub responses_scrape: AtomicUsize, pub responses_failure: AtomicUsize, pub bytes_sent: AtomicUsize, diff --git a/aquatic_ws_load_test/src/main.rs b/aquatic_ws_load_test/src/main.rs index e35be1f..141f51d 100644 --- a/aquatic_ws_load_test/src/main.rs +++ b/aquatic_ws_load_test/src/main.rs @@ -104,6 +104,8 @@ fn monitor_statistics( .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; let responses_offer_per_second = statistics.responses_offer .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + let responses_answer_per_second = statistics.responses_answer + .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; let responses_scrape_per_second = statistics.responses_scrape .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; let responses_failure_per_second = statistics.responses_failure @@ -119,6 +121,7 @@ fn monitor_statistics( let responses_per_second = responses_announce_per_second + responses_offer_per_second + + responses_answer_per_second + responses_scrape_per_second + responses_failure_per_second; @@ -129,11 +132,12 @@ fn monitor_statistics( println!("Responses in: {:.2}/second", responses_per_second); println!(" - Announce responses: {:.2}", responses_announce_per_second); println!(" - Offer responses: {:.2}", responses_offer_per_second); + println!(" - Answer responses: {:.2}", responses_answer_per_second); println!(" - Scrape responses: {:.2}", responses_scrape_per_second); - println!(" - Failure responses: {:.2}", responses_failure_per_second); - //println!("Peers per announce response: {:.2}", response_peers / responses_announce); - println!("Bandwidth out: {:.2}Mbit/s", bytes_sent_per_second * MBITS_FACTOR); - println!("Bandwidth in: {:.2}Mbit/s", bytes_received_per_second * MBITS_FACTOR); + // println!(" - Failure responses: {:.2}", responses_failure_per_second); + // println!("Peers per announce response: {:.2}", response_peers / responses_announce); + // println!("Bandwidth out: {:.2}Mbit/s", bytes_sent_per_second * MBITS_FACTOR); + // println!("Bandwidth in: {:.2}Mbit/s", bytes_received_per_second * MBITS_FACTOR); let time_elapsed = start_time.elapsed(); let duration = Duration::from_secs(config.duration as u64); diff --git a/aquatic_ws_load_test/src/network.rs b/aquatic_ws_load_test/src/network.rs index e7d4167..b9b7e3c 100644 --- a/aquatic_ws_load_test/src/network.rs +++ b/aquatic_ws_load_test/src/network.rs @@ -68,7 +68,8 @@ impl ConnectionState { pub struct Connection { stream: ConnectionState, can_send_initial: bool, - marked_as_complete: bool + marked_as_complete: bool, + send_answer: Option<(PeerId, OfferId)>, } @@ -89,6 +90,7 @@ impl Connection { stream: ConnectionState::TcpStream(stream), can_send_initial: false, marked_as_complete: false, + send_answer: None, }; connections.insert(*token_counter, connection); @@ -104,6 +106,7 @@ impl Connection { stream, can_send_initial: self.can_send_initial, marked_as_complete: false, + send_answer: self.send_answer, }) } else { None @@ -117,12 +120,51 @@ impl Connection { rng: &mut impl Rng, ){ if let ConnectionState::WebSocket(ref mut ws) = self.stream { - let mut send_request = false; + let mut send_random_request = false; loop { match ws.read_message(){ Ok(message) => { - send_request |= Self::register_response_type(state, message); + if let ::tungstenite::Message::Text(text) = message { + if text.contains("answer"){ + state.statistics.responses_answer + .fetch_add(1, Ordering::SeqCst); + + send_random_request = true; + } else if text.contains("offer"){ + // If message is an offer, don't send random + // request in return, since that would cause + // exponential growth of number of requests. + // However, add an answer to next request. + + let res_offer: Result = ::serde_json::from_str(&text); + + match res_offer { + Ok(offer) => { + state.statistics.responses_offer + .fetch_add(1, Ordering::SeqCst); + + self.send_answer = Some(( + offer.peer_id, + offer.offer_id + )); + }, + Err(err) => { + eprintln!("error decoding offer: {:?}", err); + } + } + } else if text.contains("interval"){ + state.statistics.responses_announce + .fetch_add(1, Ordering::SeqCst); + + send_random_request = true; + } else if text.contains("scrape"){ + state.statistics.responses_scrape + .fetch_add(1, Ordering::SeqCst); + + send_random_request = true; + } + } }, Err(tungstenite::Error::Io(err)) if err.kind() == ErrorKind::WouldBlock => { self.can_send_initial = false; @@ -137,8 +179,8 @@ impl Connection { } }; - if send_request { - self.send_request( + if send_random_request { + self.send_random_request( config, state, rng, @@ -147,44 +189,43 @@ impl Connection { } } - fn register_response_type( - state: &LoadTestState, - message: ::tungstenite::Message, - ) -> bool { - if let ::tungstenite::Message::Text(text) = message { - if text.contains("offer"){ - state.statistics.responses_offer - .fetch_add(1, Ordering::SeqCst); - - return false; - } else if text.contains("announce"){ - state.statistics.responses_announce - .fetch_add(1, Ordering::SeqCst); - } else if text.contains("scrape"){ - state.statistics.responses_scrape - .fetch_add(1, Ordering::SeqCst); - } - } - - true - } - - pub fn send_request( + pub fn send_random_request( &mut self, config: &Config, state: &LoadTestState, rng: &mut impl Rng, + ){ + let request = create_random_request( + &config, + &state, + rng + ); + + // Add offer answer data if applicable + let request = if let InMessage::AnnounceRequest(mut r) = request { + if let Some((peer_id, offer_id)) = self.send_answer { + r.to_peer_id = Some(peer_id); + r.offer_id = Some(offer_id); + r.answer = Some(JsonValue(::serde_json::Value::from("{}"))); + } + + self.send_answer = None; + + InMessage::AnnounceRequest(r) + } else { + request + }; + + self.send_request(state, request) + } + + fn send_request( + &mut self, + state: &LoadTestState, + request: InMessage ){ if let ConnectionState::WebSocket(ref mut ws) = self.stream { - let request = create_random_request( - &config, - &state, - rng - ); - - let message = request.to_ws_message(); - - match ws.write_message(message){ + match ws.write_message(request.to_ws_message()){ Ok(_) => { state.statistics.requests.fetch_add(1, Ordering::SeqCst); }, @@ -302,7 +343,7 @@ pub fn run_socket_thread( for (_, connection) in connections.iter_mut(){ if connection.can_send_initial { - connection.send_request( + connection.send_random_request( config, &state, &mut rng, diff --git a/aquatic_ws_protocol/src/lib.rs b/aquatic_ws_protocol/src/lib.rs index b4870bc..082722c 100644 --- a/aquatic_ws_protocol/src/lib.rs +++ b/aquatic_ws_protocol/src/lib.rs @@ -65,7 +65,7 @@ impl Default for AnnounceEvent { /// Apparently, these are sent to a number of peers when they are set /// in an AnnounceRequest /// action = "announce" -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct MiddlemanOfferToPeer { /// Peer id of peer sending offer /// Note: if equal to client peer_id, client ignores offer