From 49414e900667a132345ad09cbe1b880a79ef756e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 2 Aug 2020 10:04:24 +0200 Subject: [PATCH] aquatic http load test: parse requests properly, fix issues --- aquatic_http_load_test/src/network.rs | 170 ++++++++++++-------------- 1 file changed, 76 insertions(+), 94 deletions(-) diff --git a/aquatic_http_load_test/src/network.rs b/aquatic_http_load_test/src/network.rs index 44ba8e9..1416885 100644 --- a/aquatic_http_load_test/src/network.rs +++ b/aquatic_http_load_test/src/network.rs @@ -15,7 +15,6 @@ pub struct Connection { stream: TcpStream, read_buffer: [u8; 4096], bytes_read: usize, - can_send_initial: bool, } @@ -36,7 +35,6 @@ impl Connection { stream, read_buffer: [0; 4096], bytes_read: 0, - can_send_initial: true, }; connections.insert(*token_counter, connection); @@ -49,73 +47,74 @@ impl Connection { pub fn read_response( &mut self, state: &LoadTestState, - ) -> bool { + ) -> bool { // bool = remove connection loop { match self.stream.read(&mut self.read_buffer[self.bytes_read..]){ Ok(bytes_read) => { self.bytes_read += bytes_read; - break + let interesting_bytes = &self.read_buffer[..self.bytes_read]; + + let mut opt_body_start_index = None; + + for (i, chunk) in interesting_bytes.windows(4).enumerate(){ + if chunk == b"\r\n\r\n" { + opt_body_start_index = Some(i + 4); + + break; + } + } + + if let Some(body_start_index) = opt_body_start_index { + let interesting_bytes = &interesting_bytes[body_start_index..]; + + match Response::from_bytes(interesting_bytes){ + Ok(response) => { + state.statistics.bytes_received + .fetch_add(self.bytes_read, Ordering::SeqCst); + + self.bytes_read = 0; + + match response { + Response::Announce(_) => { + state.statistics.responses_announce + .fetch_add(1, Ordering::SeqCst); + }, + Response::Scrape(_) => { + state.statistics.responses_scrape + .fetch_add(1, Ordering::SeqCst); + }, + Response::Failure(_) => { + state.statistics.responses_failure + .fetch_add(1, Ordering::SeqCst); + }, + } + + break false; + }, + Err(err) => { + eprintln!( + "deserialize response error with {} bytes read: {:?}, text: {}", + self.bytes_read, + err, + String::from_utf8_lossy(interesting_bytes) + ); + } + } + } }, Err(err) if err.kind() == ErrorKind::WouldBlock => { - // self.can_send_initial = false; - - return false; + break false; }, Err(err) => { self.bytes_read = 0; - eprintln!("handle_read_event error: {}", err); + // eprintln!("handle_read_event error: {}", err); - return false; + break true; } } - }; - - state.statistics.bytes_received - .fetch_add(self.bytes_read, Ordering::SeqCst); - - let interesting_bytes = &self.read_buffer[..self.bytes_read]; - - self.bytes_read = 0; - - Self::register_response_type(state, interesting_bytes); - - true - } - - /// Ultra-crappy byte searches to determine response type with some degree - /// of certainty. - fn register_response_type( - state: &LoadTestState, - response_bytes: &[u8], - ){ - for chunk in response_bytes.windows(12){ - if chunk == b"e8:intervali" { - state.statistics.responses_announce.fetch_add(1, Ordering::SeqCst); - - return; - } } - for chunk in response_bytes.windows(9){ - if chunk == b"d5:filesd" { - state.statistics.responses_scrape.fetch_add(1, Ordering::SeqCst); - - return; - } - } - for chunk in response_bytes.windows(18){ - if chunk == b"d14:failure reason" { - state.statistics.responses_failure.fetch_add(1, Ordering::SeqCst); - - return; - } - } - - eprintln!( - "couldn't determine response type: {}", - String::from_utf8_lossy(response_bytes) - ); } pub fn send_request( @@ -124,7 +123,7 @@ impl Connection { state: &LoadTestState, rng: &mut impl Rng, request_buffer: &mut Cursor<&mut [u8]>, - ) -> bool { + ) -> bool { // bool = remove connection let request = create_random_request( &config, &state, @@ -139,14 +138,12 @@ impl Connection { Ok(_) => { state.statistics.requests.fetch_add(1, Ordering::SeqCst); - self.can_send_initial = false; - - true + false }, Err(err) => { // eprintln!("send request error: {}", err); - false + true } } } @@ -211,9 +208,11 @@ pub fn run_socket_thread( let token = event.token(); if let Some(connection) = connections.get_mut(&token.0){ - if connection.read_response(&state){ - num_to_create += 1; + let remove = connection.read_response(&state); + + if remove { connections.remove(&token.0); + num_to_create += 1; } } else { @@ -222,44 +221,27 @@ pub fn run_socket_thread( } } - if !initial_sent { - let mut drop_keys = Vec::new(); + let mut drop_keys = Vec::new(); - for (k, connection) in connections.iter_mut(){ - let success = connection.send_request( - config, - &state, - &mut rng, - &mut request_buffer - ); - - if !success { - drop_keys.push(*k); - } - // initial_sent = true; - } - - for k in drop_keys { - connections.remove(&k); - num_to_create += 1; - } - } - - /* - // Slowly create new connections - if token_counter < config.num_connections && iter_counter % create_conn_interval == 0 { - Connection::create_and_register( + for (k, connection) in connections.iter_mut(){ + let remove_connection = connection.send_request( config, - &mut connections, - &mut poll, - &mut token_counter, - ).unwrap(); + &state, + &mut rng, + &mut request_buffer + ); - initial_sent = false; + if remove_connection { + drop_keys.push(*k); + } } - */ - num_to_create += 1; + for k in drop_keys { + connections.remove(&k); + num_to_create += 1; + } + + // num_to_create += 1; let max_new = 8 - connections.len(); let num_new = num_to_create.min(max_new);