From 93a7ad034428a7aaec9a0ffb04c74908f55fb0d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 2 Aug 2020 10:24:27 +0200 Subject: [PATCH] aquatic http load test: only send request when appropriate, other fixes --- aquatic_http_load_test/src/config.rs | 2 +- aquatic_http_load_test/src/network.rs | 36 +++++++++++++++++---------- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/aquatic_http_load_test/src/config.rs b/aquatic_http_load_test/src/config.rs index f871bc4..482fe2a 100644 --- a/aquatic_http_load_test/src/config.rs +++ b/aquatic_http_load_test/src/config.rs @@ -48,7 +48,7 @@ impl Default for Config { Self { server_address: "127.0.0.1:3000".parse().unwrap(), num_workers: 1, - num_connections: 128, + num_connections: 8, duration: 0, network: NetworkConfig::default(), torrents: TorrentConfig::default(), diff --git a/aquatic_http_load_test/src/network.rs b/aquatic_http_load_test/src/network.rs index 1416885..747d3b6 100644 --- a/aquatic_http_load_test/src/network.rs +++ b/aquatic_http_load_test/src/network.rs @@ -15,6 +15,7 @@ pub struct Connection { stream: TcpStream, read_buffer: [u8; 4096], bytes_read: usize, + can_send: bool, } @@ -35,6 +36,7 @@ impl Connection { stream, read_buffer: [0; 4096], bytes_read: 0, + can_send: true, }; connections.insert(*token_counter, connection); @@ -73,8 +75,6 @@ impl Connection { state.statistics.bytes_received .fetch_add(self.bytes_read, Ordering::SeqCst); - self.bytes_read = 0; - match response { Response::Announce(_) => { state.statistics.responses_announce @@ -90,6 +90,9 @@ impl Connection { }, } + self.bytes_read = 0; + self.can_send = true; + break false; }, Err(err) => { @@ -124,6 +127,10 @@ impl Connection { rng: &mut impl Rng, request_buffer: &mut Cursor<&mut [u8]>, ) -> bool { // bool = remove connection + if !self.can_send { + return false; + } + let request = create_random_request( &config, &state, @@ -138,6 +145,8 @@ impl Connection { Ok(_) => { state.statistics.requests.fetch_add(1, Ordering::SeqCst); + self.can_send = false; + false }, Err(err) => { @@ -195,10 +204,11 @@ pub fn run_socket_thread( ).unwrap(); } - let mut initial_sent = false; let mut iter_counter = 0usize; let mut num_to_create = 0usize; + let mut drop_connections = Vec::with_capacity(config.num_connections); + loop { poll.poll(&mut events, Some(timeout)) .expect("failed polling"); @@ -208,9 +218,9 @@ pub fn run_socket_thread( let token = event.token(); if let Some(connection) = connections.get_mut(&token.0){ - let remove = connection.read_response(&state); + let remove_connection = connection.read_response(&state); - if remove { + if remove_connection { connections.remove(&token.0); num_to_create += 1; } @@ -221,8 +231,6 @@ pub fn run_socket_thread( } } - let mut drop_keys = Vec::new(); - for (k, connection) in connections.iter_mut(){ let remove_connection = connection.send_request( config, @@ -232,17 +240,21 @@ pub fn run_socket_thread( ); if remove_connection { - drop_keys.push(*k); + drop_connections.push(*k); } } - for k in drop_keys { + for k in drop_connections.drain(..) { connections.remove(&k); num_to_create += 1; } - // num_to_create += 1; - let max_new = 8 - connections.len(); + let max_new = config.num_connections - connections.len(); + + if max_new != 0 && iter_counter % create_conn_interval == 0 { + num_to_create += 1; + } + let num_new = num_to_create.min(max_new); for _ in 0..num_new { @@ -256,8 +268,6 @@ pub fn run_socket_thread( if !err { num_to_create -= 1; } - - initial_sent = false; } iter_counter = iter_counter.wrapping_add(1);