diff --git a/aquatic_ws_load_test/src/main.rs b/aquatic_ws_load_test/src/main.rs index 3da139a..64d9685 100644 --- a/aquatic_ws_load_test/src/main.rs +++ b/aquatic_ws_load_test/src/main.rs @@ -53,18 +53,11 @@ fn run(config: Config) -> ::anyhow::Result<()> { pareto: Arc::new(pareto), }; - // Start socket workers - for _ in 0..config.num_workers { - let config = config.clone(); let state = state.clone(); - thread::spawn(move || run_socket_thread( - &config, - state, - 1 - )); + thread::spawn(move || run_socket_thread(&config, state,)); } monitor_statistics( diff --git a/aquatic_ws_load_test/src/network.rs b/aquatic_ws_load_test/src/network.rs index 3c434ec..47cc44f 100644 --- a/aquatic_ws_load_test/src/network.rs +++ b/aquatic_ws_load_test/src/network.rs @@ -63,14 +63,12 @@ impl ConnectionState { Self::WebSocket(ws) => Some(Self::WebSocket(ws)), } } - } pub struct Connection { stream: ConnectionState, - can_send_initial: bool, - marked_as_complete: bool, + can_send: bool, send_answer: Option<(PeerId, OfferId)>, } @@ -85,13 +83,12 @@ impl Connection { let mut stream = TcpStream::connect(config.server_address)?; poll.registry() - .register(&mut stream, Token(*token_counter), Interest::READABLE) + .register(&mut stream, Token(*token_counter), Interest::READABLE | Interest::WRITABLE) .unwrap(); let connection = Connection { stream: ConnectionState::TcpStream(stream), - can_send_initial: false, - marked_as_complete: false, + can_send: false, send_answer: None, }; @@ -104,26 +101,23 @@ impl Connection { pub fn advance(self, config: &Config) -> Option { if let Some(stream) = self.stream.advance(config){ + let can_send = matches!(stream, ConnectionState::WebSocket(_)); + Some(Self { stream, - can_send_initial: self.can_send_initial, - marked_as_complete: false, - send_answer: self.send_answer, + can_send, + send_answer: None, }) } else { None } } - pub fn read_response_and_send_request( + pub fn read_responses( &mut self, - config: &Config, state: &LoadTestState, - rng: &mut impl Rng, - ){ + ) -> bool { // bool = drop connection if let ConnectionState::WebSocket(ref mut ws) = self.stream { - let mut send_random_request = false; - loop { match ws.read_message(){ Ok(message) => { @@ -137,25 +131,25 @@ impl Connection { offer.offer_id )); - send_random_request = true; + self.can_send = true; }, Ok(OutMessage::Answer(_)) => { state.statistics.responses_answer .fetch_add(1, Ordering::SeqCst); - send_random_request = true; + self.can_send = true; }, Ok(OutMessage::AnnounceResponse(_)) => { state.statistics.responses_announce .fetch_add(1, Ordering::SeqCst); - send_random_request = true; + self.can_send = true; }, Ok(OutMessage::ScrapeResponse(_)) => { state.statistics.responses_scrape .fetch_add(1, Ordering::SeqCst); - send_random_request = true; + self.can_send = true; }, Err(err) => { eprintln!("error deserializing offer: {:?}", err); @@ -163,78 +157,75 @@ impl Connection { } }, Err(tungstenite::Error::Io(err)) if err.kind() == ErrorKind::WouldBlock => { - self.can_send_initial = false; - - break; + return false; }, Err(err) => { - eprintln!("handle_read_event error: {}", err); + // eprintln!("handle_read_event error: {}", err); - break; + return true; } } - }; - - if send_random_request { - self.send_random_request( - config, - state, - rng, - ); } } + + false } - pub fn send_random_request( + pub fn send_request( &mut self, config: &Config, state: &LoadTestState, rng: &mut impl Rng, - ){ - let request = create_random_request( - &config, - &state, - rng - ); + ) -> bool { // bool = remove connection + if !self.can_send { + return false; + } - // If self.send_answer is set and request is announce request, make - // the request an offer answer - let request = if let InMessage::AnnounceRequest(mut r) = request { - if let Some((peer_id, offer_id)) = self.send_answer { - r.to_peer_id = Some(peer_id); - r.offer_id = Some(offer_id); - r.answer = Some(JsonValue(::serde_json::Value::from(JSON_VALUE))); - r.offers = None; - } - - self.send_answer = None; - - InMessage::AnnounceRequest(r) - } else { - request - }; - - self.send_request(state, request) - } - - fn send_request( - &mut self, - state: &LoadTestState, - request: InMessage - ){ if let ConnectionState::WebSocket(ref mut ws) = self.stream { + let request = create_random_request( + &config, + &state, + rng + ); + + // If self.send_answer is set and request is announce request, make + // the request an offer answer + let request = if let InMessage::AnnounceRequest(mut r) = request { + if let Some((peer_id, offer_id)) = self.send_answer { + r.to_peer_id = Some(peer_id); + r.offer_id = Some(offer_id); + r.answer = Some(JsonValue(::serde_json::Value::from(JSON_VALUE))); + r.offers = None; + } + + self.send_answer = None; + + InMessage::AnnounceRequest(r) + } else { + request + }; + match ws.write_message(request.to_ws_message()){ - Ok(_) => { + Ok(()) => { state.statistics.requests.fetch_add(1, Ordering::SeqCst); + + self.can_send = false; + + false }, + Err(tungstenite::Error::Io(err)) if err.kind() == ErrorKind::WouldBlock => { + false + } Err(err) => { - eprintln!("send request error: {}", err); + // eprintln!("send request error: {:?}", err); + + true } } - - self.can_send_initial = false; } else { println!("send request can't send to non-ws stream"); + + false } } } @@ -246,7 +237,6 @@ pub type ConnectionMap = HashMap; pub fn run_socket_thread( config: &Config, state: LoadTestState, - num_initial_requests: usize, ) { let timeout = Duration::from_micros(config.network.poll_timeout_microseconds); let create_conn_interval = 2 ^ config.network.connection_creation_interval; @@ -257,98 +247,56 @@ pub fn run_socket_thread( let mut rng = SmallRng::from_entropy(); let mut token_counter = 0usize; - - for _ in 0..num_initial_requests { - Connection::create_and_register( - config, - &mut connections, - &mut poll, - &mut token_counter, - ).unwrap(); - } - - println!("num connections in map: {}", connections.len()); - - let mut initial_sent = false; let mut iter_counter = 0usize; - let mut num_completed = 0usize; + let mut drop_keys = Vec::new(); loop { poll.poll(&mut events, Some(timeout)) .expect("failed polling"); for event in events.iter(){ + let token = event.token(); + if event.is_readable(){ - let token = event.token(); - - let mut run_advance = false; - if let Some(connection) = connections.get_mut(&token.0){ if let ConnectionState::WebSocket(_) = connection.stream { - connection.read_response_and_send_request( - config, - &state, - &mut rng, - ); - } else { - run_advance = true; + let drop_connection = connection.read_responses(&state); - println!("set run_advance=true"); - } - } else { - eprintln!("connection not found: {:?}", token); - } - - if run_advance { - let connection = connections.remove(&token.0).unwrap(); - - if let Some(connection) = connection.advance(config){ - println!("advanced connection"); - connections.insert(token.0, connection); - } - } - } - } - - if num_completed != token_counter { - for k in 0..token_counter { - if let Some(mut connection) = connections.remove(&k){ - if let ConnectionState::WebSocket(_) = connection.stream { - if !connection.marked_as_complete { - connection.can_send_initial = true; - connection.marked_as_complete = true; - initial_sent = false; - num_completed += 1; + if drop_connection { + connections.remove(&token.0); } - connections.insert(k, connection); - } else if let Some(c) = connection.advance(config){ - connections.insert(k, c); + continue; } - } else { - // println!("connection not found for token {}", k); + } + } + + if let Some(connection) = connections.remove(&token.0){ + if let Some(connection) = connection.advance(config){ + connections.insert(token.0, connection); } } } - if !initial_sent { - for (_, connection) in connections.iter_mut(){ - if connection.can_send_initial { + for (k, connection) in connections.iter_mut(){ + let drop_connection = connection.send_request( + config, + &state, + &mut rng, + ); - connection.send_random_request( - config, - &state, - &mut rng, - ); - - initial_sent = true; - } + if drop_connection { + drop_keys.push(*k) } } + for k in drop_keys.drain(..){ + connections.remove(&k); + } + // Slowly create new connections - if token_counter < config.num_connections && iter_counter % create_conn_interval == 0 { + if connections.len() < config.num_connections && iter_counter % create_conn_interval == 0 { let res = Connection::create_and_register( config, &mut connections, @@ -359,8 +307,6 @@ pub fn run_socket_thread( if let Err(err) = res { eprintln!("create connection error: {}", err); } - - // initial_sent = false; } iter_counter = iter_counter.wrapping_add(1); diff --git a/aquatic_ws_protocol/src/lib.rs b/aquatic_ws_protocol/src/lib.rs index fec2172..839fdc6 100644 --- a/aquatic_ws_protocol/src/lib.rs +++ b/aquatic_ws_protocol/src/lib.rs @@ -260,7 +260,8 @@ impl InMessage { } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Deserialize)] +#[serde(untagged)] pub enum OutMessage { AnnounceResponse(AnnounceResponse), ScrapeResponse(ScrapeResponse),