diff --git a/Cargo.lock b/Cargo.lock index ef35171..06c0ccc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -120,6 +120,7 @@ dependencies = [ "aquatic_http_protocol", "aquatic_toml_config", "futures-lite", + "futures-rustls", "glommio", "hashbrown 0.12.0", "log", diff --git a/aquatic_http_load_test/Cargo.toml b/aquatic_http_load_test/Cargo.toml index 8403959..5663acb 100644 --- a/aquatic_http_load_test/Cargo.toml +++ b/aquatic_http_load_test/Cargo.toml @@ -19,6 +19,7 @@ aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } anyhow = "1" futures-lite = "1" +futures-rustls = "0.22" hashbrown = "0.12" glommio = "0.7" log = "0.4" diff --git a/aquatic_http_load_test/src/network.rs b/aquatic_http_load_test/src/network.rs index a8fc57a..9b42ba6 100644 --- a/aquatic_http_load_test/src/network.rs +++ b/aquatic_http_load_test/src/network.rs @@ -1,7 +1,7 @@ use std::{ cell::RefCell, convert::TryInto, - io::{Cursor, ErrorKind, Read}, + io::Cursor, rc::Rc, sync::{atomic::Ordering, Arc}, time::Duration, @@ -9,10 +9,10 @@ use std::{ use aquatic_http_protocol::response::Response; use futures_lite::{AsyncReadExt, AsyncWriteExt}; +use futures_rustls::{client::TlsStream, TlsConnector}; use glommio::net::TcpStream; use glommio::{prelude::*, timer::TimerActionRepeat}; use rand::{prelude::SmallRng, SeedableRng}; -use rustls::ClientConnection; use crate::{common::LoadTestState, config::Config, utils::create_random_request}; @@ -23,6 +23,7 @@ pub async fn run_socket_thread( ) -> anyhow::Result<()> { let config = Rc::new(config); let num_active_connections = Rc::new(RefCell::new(0usize)); + let rng = Rc::new(RefCell::new(SmallRng::from_entropy())); let interval = config.connection_creation_interval_ms; @@ -34,6 +35,7 @@ pub async fn run_socket_thread( tls_config.clone(), load_test_state.clone(), num_active_connections.clone(), + rng.clone(), ) .await { @@ -51,6 +53,7 @@ pub async fn run_socket_thread( tls_config.clone(), load_test_state.clone(), num_active_connections.clone(), + rng.clone(), ) }); } @@ -66,11 +69,18 @@ async fn periodically_open_connections( tls_config: Arc, load_test_state: LoadTestState, num_active_connections: Rc>, + rng: Rc>, ) -> Option { if *num_active_connections.borrow() < config.num_connections { spawn_local(async move { - if let Err(err) = - Connection::run(config, tls_config, load_test_state, num_active_connections).await + if let Err(err) = Connection::run( + config, + tls_config, + load_test_state, + num_active_connections, + rng.clone(), + ) + .await { ::log::error!("connection creation error: {:?}", err); } @@ -84,13 +94,9 @@ async fn periodically_open_connections( struct Connection { config: Rc, load_test_state: LoadTestState, - rng: SmallRng, - stream: TcpStream, - tls: ClientConnection, - response_buffer: [u8; 2048], - response_buffer_position: usize, - send_new_request: bool, - queued_responses: usize, + rng: Rc>, + stream: TlsStream, + buffer: [u8; 2048], } impl Connection { @@ -99,23 +105,22 @@ impl Connection { tls_config: Arc, load_test_state: LoadTestState, num_active_connections: Rc>, + rng: Rc>, ) -> anyhow::Result<()> { let stream = TcpStream::connect(config.server_address) .await .map_err(|err| anyhow::anyhow!("connect: {:?}", err))?; - let tls = ClientConnection::new(tls_config, "example.com".try_into().unwrap()).unwrap(); - let rng = SmallRng::from_entropy(); + + let stream = TlsConnector::from(tls_config) + .connect("example.com".try_into().unwrap(), stream) + .await?; let mut connection = Connection { config, load_test_state, rng, stream, - tls, - response_buffer: [0; 2048], - response_buffer_position: 0, - send_new_request: true, - queued_responses: 0, + buffer: [0; 2048], }; *num_active_connections.borrow_mut() += 1; @@ -131,169 +136,108 @@ impl Connection { async fn run_connection_loop(&mut self) -> anyhow::Result<()> { loop { - if self.send_new_request { - let request = - create_random_request(&self.config, &self.load_test_state, &mut self.rng); - - request.write(&mut self.tls.writer(), self.config.url_suffix.as_bytes())?; - self.queued_responses += 1; - - self.send_new_request = false; - } - - self.write_tls().await?; - self.read_tls().await?; + self.send_request().await?; + self.read_response().await?; } } - async fn read_tls(&mut self) -> anyhow::Result<()> { - loop { - let mut buf = [0u8; 1024]; + async fn send_request(&mut self) -> anyhow::Result<()> { + let request = create_random_request( + &self.config, + &self.load_test_state, + &mut self.rng.borrow_mut(), + ); - let bytes_read = self.stream.read(&mut buf).await?; + let mut cursor = Cursor::new(&mut self.buffer[..]); - if bytes_read == 0 { - return Err(anyhow::anyhow!("Peer has closed connection")); - } + request.write(&mut cursor, self.config.url_suffix.as_bytes())?; - self.load_test_state - .statistics - .bytes_received - .fetch_add(bytes_read, Ordering::SeqCst); + let cursor_position = cursor.position() as usize; - let _ = self.tls.read_tls(&mut &buf[..bytes_read]).unwrap(); + let bytes_sent = self + .stream + .write(&cursor.into_inner()[..cursor_position]) + .await?; - let io_state = self.tls.process_new_packets()?; - - let mut added_plaintext = false; - - if io_state.plaintext_bytes_to_read() != 0 { - loop { - match self.tls.reader().read(&mut buf) { - Ok(0) => { - break; - } - Ok(amt) => { - let end = self.response_buffer_position + amt; - - if end > self.response_buffer.len() { - return Err(anyhow::anyhow!("response too large")); - } else { - let response_buffer_slice = - &mut self.response_buffer[self.response_buffer_position..end]; - - response_buffer_slice.copy_from_slice(&buf[..amt]); - - self.response_buffer_position = end; - - added_plaintext = true; - } - } - Err(err) if err.kind() == ErrorKind::WouldBlock => { - break; - } - Err(err) => { - panic!("tls.reader().read: {}", err); - } - } - } - } - - if added_plaintext { - let interesting_bytes = &self.response_buffer[..self.response_buffer_position]; - - let mut opt_body_start_index = None; - - for (i, chunk) in interesting_bytes.windows(4).enumerate() { - if chunk == b"\r\n\r\n" { - opt_body_start_index = Some(i + 4); - - break; - } - } - - if let Some(body_start_index) = opt_body_start_index { - match Response::from_bytes(&interesting_bytes[body_start_index..]) { - Ok(response) => { - match response { - Response::Announce(_) => { - self.load_test_state - .statistics - .responses_announce - .fetch_add(1, Ordering::SeqCst); - } - Response::Scrape(_) => { - self.load_test_state - .statistics - .responses_scrape - .fetch_add(1, Ordering::SeqCst); - } - Response::Failure(response) => { - self.load_test_state - .statistics - .responses_failure - .fetch_add(1, Ordering::SeqCst); - println!( - "failure response: reason: {}", - response.failure_reason - ); - } - } - - self.response_buffer_position = 0; - self.send_new_request = true; - - break; - } - Err(err) => { - eprintln!( - "deserialize response error with {} bytes read: {:?}, text: {}", - self.response_buffer_position, - err, - String::from_utf8_lossy(interesting_bytes) - ); - } - } - } - } - - if self.tls.wants_write() { - break; - } - } - - Ok(()) - } - - async fn write_tls(&mut self) -> anyhow::Result<()> { - if !self.tls.wants_write() { - return Ok(()); - } - - let mut buf = Vec::new(); - let mut buf = Cursor::new(&mut buf); - - while self.tls.wants_write() { - self.tls.write_tls(&mut buf).unwrap(); - } - - let len = buf.get_ref().len(); - - self.stream.write_all(&buf.into_inner()).await?; self.stream.flush().await?; self.load_test_state .statistics .bytes_sent - .fetch_add(len, Ordering::SeqCst); + .fetch_add(bytes_sent, Ordering::Relaxed); - if self.queued_responses != 0 { - self.load_test_state - .statistics - .requests - .fetch_add(self.queued_responses, Ordering::SeqCst); + self.load_test_state + .statistics + .requests + .fetch_add(1, Ordering::Relaxed); - self.queued_responses = 0; + Ok(()) + } + + async fn read_response(&mut self) -> anyhow::Result<()> { + let mut buffer_position = 0; + + loop { + let bytes_read = self + .stream + .read(&mut self.buffer[buffer_position..]) + .await?; + + if bytes_read == 0 { + break; + } + + buffer_position += bytes_read; + + let interesting_bytes = &self.buffer[..buffer_position]; + + let mut opt_body_start_index = None; + + for (i, chunk) in interesting_bytes.windows(4).enumerate() { + if chunk == b"\r\n\r\n" { + opt_body_start_index = Some(i + 4); + + break; + } + } + + if let Some(body_start_index) = opt_body_start_index { + match Response::from_bytes(&interesting_bytes[body_start_index..]) { + Ok(response) => { + match response { + Response::Announce(_) => { + self.load_test_state + .statistics + .responses_announce + .fetch_add(1, Ordering::Relaxed); + } + Response::Scrape(_) => { + self.load_test_state + .statistics + .responses_scrape + .fetch_add(1, Ordering::Relaxed); + } + Response::Failure(response) => { + self.load_test_state + .statistics + .responses_failure + .fetch_add(1, Ordering::Relaxed); + println!("failure response: reason: {}", response.failure_reason); + } + } + + break; + } + Err(err) => { + eprintln!( + "deserialize response error with {} bytes read: {:?}, text: {}", + buffer_position, + err, + String::from_utf8_lossy(interesting_bytes) + ); + } + } + } } Ok(()) diff --git a/aquatic_http_load_test/src/utils.rs b/aquatic_http_load_test/src/utils.rs index c6d9e54..313657a 100644 --- a/aquatic_http_load_test/src/utils.rs +++ b/aquatic_http_load_test/src/utils.rs @@ -10,7 +10,7 @@ use crate::config::*; pub fn create_random_request( config: &Config, state: &LoadTestState, - rng: &mut impl Rng, + rng: &mut SmallRng, ) -> Request { let weights = [ config.torrents.weight_announce as u32,