From cb3ec8cbf1a6d92248bdb35798e49952997caf95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 2 Aug 2020 06:38:36 +0200 Subject: [PATCH] WIP: get http load test into better (but bad) state Is now able to maintain correspondence with aquatic_http and opentracker --- Cargo.lock | 1 + aquatic_http/src/lib/network/connection.rs | 11 ++ aquatic_http/src/lib/network/mod.rs | 2 + aquatic_http_load_test/Cargo.toml | 1 + aquatic_http_load_test/src/network.rs | 114 +++++++++++++-------- aquatic_http_protocol/src/request.rs | 4 +- 6 files changed, 89 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8f67588..a9ab0e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -90,6 +90,7 @@ dependencies = [ "anyhow", "aquatic_cli_helpers", "aquatic_http_protocol", + "hashbrown", "mimalloc", "mio", "quickcheck", diff --git a/aquatic_http/src/lib/network/connection.rs b/aquatic_http/src/lib/network/connection.rs index aa6e6a0..2a374d5 100644 --- a/aquatic_http/src/lib/network/connection.rs +++ b/aquatic_http/src/lib/network/connection.rs @@ -45,6 +45,17 @@ impl EstablishedConnection { } } + pub fn shutdown(&mut self){ + match self.stream { + Stream::TcpStream(ref mut s) => { + s.shutdown(::std::net::Shutdown::Both); + }, + Stream::TlsStream(ref mut s) => { + s.shutdown(); + }, + } + } + pub fn read_request(&mut self) -> Result { if (self.buf.len() - self.bytes_read < 512) & (self.buf.len() <= 3072){ self.buf.extend_from_slice(&[0; 1024]); diff --git a/aquatic_http/src/lib/network/mod.rs b/aquatic_http/src/lib/network/mod.rs index 471e930..78c4a70 100644 --- a/aquatic_http/src/lib/network/mod.rs +++ b/aquatic_http/src/lib/network/mod.rs @@ -326,6 +326,8 @@ pub fn send_responses( match established.send_response(&buffer.get_mut()[..bytes_written]){ Ok(()) => { debug!("sent response"); + + // established.shutdown(); }, Err(err) if err.kind() == ErrorKind::WouldBlock => { debug!("send response: would block"); diff --git a/aquatic_http_load_test/Cargo.toml b/aquatic_http_load_test/Cargo.toml index 0314652..eb1fa5f 100644 --- a/aquatic_http_load_test/Cargo.toml +++ b/aquatic_http_load_test/Cargo.toml @@ -12,6 +12,7 @@ name = "aquatic_http_load_test" anyhow = "1" aquatic_cli_helpers = { path = "../aquatic_cli_helpers" } aquatic_http_protocol = { path = "../aquatic_http_protocol" } +hashbrown = "0.8" mimalloc = { version = "0.1", default-features = false } mio = { version = "0.7", features = ["udp", "os-poll", "os-util"] } rand = { version = "0.7", features = ["small_rng"] } diff --git a/aquatic_http_load_test/src/network.rs b/aquatic_http_load_test/src/network.rs index 05a70bd..44ba8e9 100644 --- a/aquatic_http_load_test/src/network.rs +++ b/aquatic_http_load_test/src/network.rs @@ -2,9 +2,9 @@ use std::sync::atomic::Ordering; use std::time::Duration; use std::io::{Read, Write, ErrorKind, Cursor}; +use hashbrown::HashMap; use mio::{net::TcpStream, Events, Poll, Interest, Token}; use rand::{rngs::SmallRng, prelude::*}; -use slab::Slab; use crate::common::*; use crate::config::*; @@ -27,10 +27,6 @@ impl Connection { token_counter: &mut usize, ) -> anyhow::Result<()> { let mut stream = TcpStream::connect(config.server_address)?; - - let entry = connections.vacant_entry(); - - *token_counter = entry.key(); poll.registry() .register(&mut stream, Token(*token_counter), Interest::READABLE) @@ -43,34 +39,35 @@ impl Connection { can_send_initial: true, }; - entry.insert(connection); + connections.insert(*token_counter, connection); + + *token_counter = token_counter.wrapping_add(1); Ok(()) } - pub fn read_response_and_send_request( + pub fn read_response( &mut self, - config: &Config, state: &LoadTestState, - rng: &mut impl Rng, - request_buffer: &mut Cursor<&mut [u8]>, - ){ + ) -> bool { loop { match self.stream.read(&mut self.read_buffer[self.bytes_read..]){ Ok(bytes_read) => { self.bytes_read += bytes_read; + + break }, Err(err) if err.kind() == ErrorKind::WouldBlock => { - self.can_send_initial = false; + // self.can_send_initial = false; - break; + return false; }, Err(err) => { self.bytes_read = 0; eprintln!("handle_read_event error: {}", err); - return; + return false; } } }; @@ -84,12 +81,7 @@ impl Connection { Self::register_response_type(state, interesting_bytes); - self.send_request( - config, - state, - rng, - request_buffer, - ); + true } /// Ultra-crappy byte searches to determine response type with some degree @@ -119,6 +111,11 @@ impl Connection { return; } } + + eprintln!( + "couldn't determine response type: {}", + String::from_utf8_lossy(response_bytes) + ); } pub fn send_request( @@ -127,7 +124,7 @@ impl Connection { state: &LoadTestState, rng: &mut impl Rng, request_buffer: &mut Cursor<&mut [u8]>, - ){ + ) -> bool { let request = create_random_request( &config, &state, @@ -141,13 +138,17 @@ impl Connection { match self.send_request_inner(state, &request_buffer.get_mut()[..position]){ Ok(_) => { state.statistics.requests.fetch_add(1, Ordering::SeqCst); + + self.can_send_initial = false; + + true }, Err(err) => { - eprintln!("send request error: {}", err); + // eprintln!("send request error: {}", err); + + false } } - - self.can_send_initial = false; } fn send_request_inner( @@ -168,7 +169,7 @@ impl Connection { } -pub type ConnectionMap = Slab; +pub type ConnectionMap = HashMap; pub fn run_socket_thread( @@ -179,7 +180,7 @@ pub fn run_socket_thread( let timeout = Duration::from_micros(config.network.poll_timeout_microseconds); let create_conn_interval = 2 ^ config.network.connection_creation_interval; - let mut connections: ConnectionMap = Slab::with_capacity(config.num_connections); + let mut connections: ConnectionMap = HashMap::new(); let mut poll = Poll::new().expect("create poll"); let mut events = Events::with_capacity(config.network.poll_event_capacity); let mut rng = SmallRng::from_entropy(); @@ -199,6 +200,7 @@ pub fn run_socket_thread( let mut initial_sent = false; let mut iter_counter = 0usize; + let mut num_to_create = 0usize; loop { poll.poll(&mut events, Some(timeout)) @@ -208,13 +210,12 @@ pub fn run_socket_thread( if event.is_readable(){ let token = event.token(); - if let Some(connection) = connections.get_mut(token.0){ - connection.read_response_and_send_request( - config, - &state, - &mut rng, - &mut request_buffer - ); + if let Some(connection) = connections.get_mut(&token.0){ + if connection.read_response(&state){ + num_to_create += 1; + connections.remove(&token.0); + } + } else { eprintln!("connection not found: {:?}", token); } @@ -222,20 +223,29 @@ pub fn run_socket_thread( } if !initial_sent { - for (_, connection) in connections.iter_mut(){ - if connection.can_send_initial { - connection.send_request( - config, - &state, - &mut rng, - &mut request_buffer - ); + let mut drop_keys = Vec::new(); - initial_sent = true; + for (k, connection) in connections.iter_mut(){ + let success = connection.send_request( + config, + &state, + &mut rng, + &mut request_buffer + ); + + if !success { + drop_keys.push(*k); } + // initial_sent = true; + } + + for k in drop_keys { + connections.remove(&k); + num_to_create += 1; } } + /* // Slowly create new connections if token_counter < config.num_connections && iter_counter % create_conn_interval == 0 { Connection::create_and_register( @@ -247,6 +257,26 @@ pub fn run_socket_thread( initial_sent = false; } + */ + + num_to_create += 1; + let max_new = 8 - connections.len(); + let num_new = num_to_create.min(max_new); + + for _ in 0..num_new { + let err = Connection::create_and_register( + config, + &mut connections, + &mut poll, + &mut token_counter, + ).is_err(); + + if !err { + num_to_create -= 1; + } + + initial_sent = false; + } iter_counter = iter_counter.wrapping_add(1); } diff --git a/aquatic_http_protocol/src/request.rs b/aquatic_http_protocol/src/request.rs index 3007c7d..edbbaab 100644 --- a/aquatic_http_protocol/src/request.rs +++ b/aquatic_http_protocol/src/request.rs @@ -55,7 +55,7 @@ impl AnnounceRequest { output.write_all(key.as_str().as_bytes())?; } - output.write_all(b" HTTP/1.1\r\n\r\n")?; + output.write_all(b" HTTP/1.1\r\nConnection: keep-alive\r\n\r\n")?; Ok(()) } @@ -85,7 +85,7 @@ impl ScrapeRequest { first = false; } - output.write_all(b" HTTP/1.1\r\n\r\n")?; + output.write_all(b" HTTP/1.1\r\nConnection: keep-alive\r\n\r\n")?; Ok(()) }