From 13d18bbf03d0cb244e1f9bffabed0484d93b4f1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 28 Oct 2021 01:06:13 +0200 Subject: [PATCH] aquatic_http_load_test: add glommio implementation --- Cargo.lock | 4 + aquatic_http_load_test/Cargo.toml | 5 + aquatic_http_load_test/src/config.rs | 2 +- aquatic_http_load_test/src/glommio.rs | 260 ++++++++++++++++++++++++++ aquatic_http_load_test/src/main.rs | 38 +++- aquatic_http_load_test/src/network.rs | 44 ++++- 6 files changed, 347 insertions(+), 6 deletions(-) create mode 100644 aquatic_http_load_test/src/glommio.rs diff --git a/Cargo.lock b/Cargo.lock index 29bed3f..aa199b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -127,6 +127,8 @@ dependencies = [ "anyhow", "aquatic_cli_helpers", "aquatic_http_protocol", + "futures-lite", + "glommio", "hashbrown 0.11.2", "mimalloc", "mio", @@ -134,6 +136,8 @@ dependencies = [ "quickcheck_macros", "rand", "rand_distr", + "rustls", + "rustls-pemfile", "serde", ] diff --git a/aquatic_http_load_test/Cargo.toml b/aquatic_http_load_test/Cargo.toml index 7d29e52..0183b75 100644 --- a/aquatic_http_load_test/Cargo.toml +++ b/aquatic_http_load_test/Cargo.toml @@ -18,8 +18,13 @@ mimalloc = { version = "0.1", default-features = false } mio = { version = "0.7", features = ["udp", "os-poll", "os-util"] } rand = { version = "0.8", features = ["small_rng"] } rand_distr = "0.4" +rustls = { version = "0.20", features = ["dangerous_configuration"] } +rustls-pemfile = "0.2" serde = { version = "1", features = ["derive"] } +futures-lite = "1" +glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5" } + [dev-dependencies] quickcheck = "1.0" quickcheck_macros = "1.0" diff --git a/aquatic_http_load_test/src/config.rs b/aquatic_http_load_test/src/config.rs index b05655c..eb87bb9 100644 --- a/aquatic_http_load_test/src/config.rs +++ b/aquatic_http_load_test/src/config.rs @@ -1,4 +1,4 @@ -use std::net::SocketAddr; +use std::{net::SocketAddr, path::PathBuf}; use serde::{Deserialize, Serialize}; diff --git a/aquatic_http_load_test/src/glommio.rs b/aquatic_http_load_test/src/glommio.rs new file mode 100644 index 0000000..5783a85 --- /dev/null +++ b/aquatic_http_load_test/src/glommio.rs @@ -0,0 +1,260 @@ +use std::{cell::RefCell, convert::TryInto, io::{Cursor, ErrorKind, Read}, rc::Rc, sync::{Arc, atomic::Ordering}, time::Duration}; + +use aquatic_http_protocol::response::Response; +use futures_lite::{AsyncReadExt, AsyncWriteExt}; +use glommio::{enclose, prelude::*, timer::TimerActionRepeat}; +use glommio::net::TcpStream; +use rand::{SeedableRng, prelude::SmallRng}; +use rustls::ClientConnection; + +use crate::{common::LoadTestState, config::Config, utils::create_random_request}; + +pub async fn run_socket_thread( + config: Config, + tls_config: Arc, + load_test_state: LoadTestState, +) -> anyhow::Result<()> { + let config = Rc::new(config); + let num_active_connections = Rc::new(RefCell::new(0usize)); + + TimerActionRepeat::repeat(enclose!((config, tls_config, load_test_state, num_active_connections) move || { + enclose!((config, tls_config, load_test_state, num_active_connections) move || async move { + if *num_active_connections.borrow() < config.num_connections { + spawn_local(async move { + if let Err(err) = Connection::run(config, tls_config, load_test_state, num_active_connections).await { + eprintln!("connection creation error: {:?}", err); + } + }).detach(); + } + + Some(Duration::from_secs(1)) + })() + })); + + futures_lite::future::pending::().await; + + Ok(()) +} + +struct Connection { + config: Rc, + load_test_state: LoadTestState, + rng: SmallRng, + stream: TcpStream, + tls: ClientConnection, + response_buffer: [u8; 2048], + response_buffer_position: usize, + send_new_request: bool, + queued_responses: usize, +} + +impl Connection { + async fn run( + config: Rc, + tls_config: Arc, + load_test_state: LoadTestState, + num_active_connections: Rc>, + ) -> anyhow::Result<()> { + let stream = TcpStream::connect(config.server_address).await + .map_err(|err| anyhow::anyhow!("connect: {:?}", err))?; + let tls = ClientConnection::new(tls_config, "example.com".try_into().unwrap()).unwrap(); + let rng = SmallRng::from_entropy(); + + let mut connection = Connection { + config, + load_test_state, + rng, + stream, + tls, + response_buffer: [0; 2048], + response_buffer_position: 0, + send_new_request: true, + queued_responses: 0, + }; + + *num_active_connections.borrow_mut() += 1; + + println!("run connection"); + + if let Err(err) = connection.run_connection_loop().await { + eprintln!("connection error: {:?}", err); + } + + *num_active_connections.borrow_mut() -= 1; + + Ok(()) + } + + async fn run_connection_loop(&mut self) -> anyhow::Result<()> { + loop { + if self.send_new_request { + let request = create_random_request(&self.config, &self.load_test_state, &mut self.rng); + + request.write(&mut self.tls.writer())?; + self.queued_responses += 1; + + self.send_new_request = false; + } + + self.write_tls().await?; + self.read_tls().await?; + } + } + + async fn read_tls(&mut self) -> anyhow::Result<()> { + loop { + let mut buf = [0u8; 1024]; + + let bytes_read = self.stream.read(&mut buf).await?; + + if bytes_read == 0 { + return Err(anyhow::anyhow!("Peer has closed connection")); + } + + self + .load_test_state + .statistics + .bytes_received + .fetch_add(bytes_read, Ordering::SeqCst); + + let _ = self.tls.read_tls(&mut &buf[..bytes_read]).unwrap(); + + let io_state = self.tls.process_new_packets()?; + + let mut added_plaintext = false; + + if io_state.plaintext_bytes_to_read() != 0 { + loop { + match self.tls.reader().read(&mut buf) { + Ok(0) => { + break; + } + Ok(amt) => { + let end = self.response_buffer_position + amt; + + if end > self.response_buffer.len() { + return Err(anyhow::anyhow!("response too large")); + } else { + let response_buffer_slice = &mut self.response_buffer[self.response_buffer_position..end]; + + response_buffer_slice.copy_from_slice(&buf[..amt]); + + self.response_buffer_position = end; + + added_plaintext = true; + } + } + Err(err) if err.kind() == ErrorKind::WouldBlock => { + break; + } + Err(err) => { + break; + } + } + } + } + + if added_plaintext { + let interesting_bytes = &self.response_buffer[..self.response_buffer_position]; + + let mut opt_body_start_index = None; + + for (i, chunk) in interesting_bytes.windows(4).enumerate() { + if chunk == b"\r\n\r\n" { + opt_body_start_index = Some(i + 4); + + break; + } + } + + if let Some(body_start_index) = opt_body_start_index { + let interesting_bytes = &interesting_bytes[body_start_index..]; + + match Response::from_bytes(interesting_bytes) { + Ok(response) => { + + match response { + Response::Announce(_) => { + self + .load_test_state + .statistics + .responses_announce + .fetch_add(1, Ordering::SeqCst); + } + Response::Scrape(_) => { + self + .load_test_state + .statistics + .responses_scrape + .fetch_add(1, Ordering::SeqCst); + } + Response::Failure(response) => { + self + .load_test_state + .statistics + .responses_failure + .fetch_add(1, Ordering::SeqCst); + println!( + "failure response: reason: {}", + response.failure_reason + ); + } + } + + self.response_buffer_position = 0; + self.send_new_request = true; + + break; + } + Err(err) => { + eprintln!( + "deserialize response error with {} bytes read: {:?}, text: {}", + self.response_buffer_position, + err, + String::from_utf8_lossy(interesting_bytes) + ); + } + } + } + } + + if self.tls.wants_write() { + break; + } + } + + Ok(()) + } + + async fn write_tls(&mut self) -> anyhow::Result<()> { + if !self.tls.wants_write() { + return Ok(()); + } + + let mut buf = Vec::new(); + let mut buf = Cursor::new(&mut buf); + + while self.tls.wants_write() { + self.tls.write_tls(&mut buf).unwrap(); + } + + let len = buf.get_ref().len(); + + self.stream.write_all(&buf.into_inner()).await?; + self.stream.flush().await?; + + self + .load_test_state + .statistics + .bytes_sent + .fetch_add(len, Ordering::SeqCst); + + if self.queued_responses != 0 { + self.load_test_state.statistics.requests.fetch_add(self.queued_responses, Ordering::SeqCst); + + self.queued_responses = 0; + } + + Ok(()) + } +} \ No newline at end of file diff --git a/aquatic_http_load_test/src/main.rs b/aquatic_http_load_test/src/main.rs index a23be10..efe3f6b 100644 --- a/aquatic_http_load_test/src/main.rs +++ b/aquatic_http_load_test/src/main.rs @@ -1,12 +1,16 @@ +use std::fs::File; +use std::io::BufReader; use std::sync::{atomic::Ordering, Arc}; use std::thread; use std::time::{Duration, Instant}; +use ::glommio::LocalExecutorBuilder; use rand::prelude::*; use rand_distr::Pareto; mod common; mod config; +mod glommio; mod network; mod utils; @@ -53,11 +57,16 @@ fn run(config: Config) -> ::anyhow::Result<()> { // Start socket workers + let tls_config = create_tls_config().unwrap(); + for _ in 0..config.num_workers { let config = config.clone(); + let tls_config = tls_config.clone(); let state = state.clone(); - thread::spawn(move || run_socket_thread(&config, state, 1)); + LocalExecutorBuilder::default().spawn(|| async move { + glommio::run_socket_thread(config, tls_config, state).await.unwrap(); + }).unwrap(); } monitor_statistics(state, &config); @@ -65,6 +74,33 @@ fn run(config: Config) -> ::anyhow::Result<()> { Ok(()) } +struct FakeCertificateVerifier; + +impl rustls::client::ServerCertVerifier for FakeCertificateVerifier { + fn verify_server_cert( + &self, + _end_entity: &rustls::Certificate, + _intermediates: &[rustls::Certificate], + _server_name: &rustls::ServerName, + _scts: &mut dyn Iterator, + _ocsp_response: &[u8], + _now: std::time::SystemTime, + ) -> Result { + Ok(rustls::client::ServerCertVerified::assertion()) + } +} + +fn create_tls_config() -> anyhow::Result> { + let mut config = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(rustls::RootCertStore::empty()) + .with_no_client_auth(); + + config.dangerous().set_certificate_verifier(Arc::new(FakeCertificateVerifier)); + + Ok(Arc::new(config)) +} + fn monitor_statistics(state: LoadTestState, config: &Config) { let start_time = Instant::now(); let mut report_avg_response_vec: Vec = Vec::new(); diff --git a/aquatic_http_load_test/src/network.rs b/aquatic_http_load_test/src/network.rs index 6ddc19a..bbd9b6c 100644 --- a/aquatic_http_load_test/src/network.rs +++ b/aquatic_http_load_test/src/network.rs @@ -1,4 +1,6 @@ +use std::convert::TryInto; use std::io::{Cursor, ErrorKind, Read, Write}; +use std::sync::Arc; use std::sync::atomic::Ordering; use std::time::Duration; @@ -12,7 +14,9 @@ use crate::utils::create_random_request; pub struct Connection { stream: TcpStream, + tls: rustls::ClientConnection, read_buffer: [u8; 4096], + response_buffer: Vec, bytes_read: usize, can_send: bool, } @@ -20,11 +24,13 @@ pub struct Connection { impl Connection { pub fn create_and_register( config: &Config, + tls_config: Arc, connections: &mut ConnectionMap, poll: &mut Poll, token_counter: &mut usize, ) -> anyhow::Result<()> { let mut stream = TcpStream::connect(config.server_address)?; + let tls = rustls::ClientConnection::new(tls_config, "example.com".try_into().unwrap())?; poll.registry() .register(&mut stream, Token(*token_counter), Interest::READABLE) @@ -32,7 +38,9 @@ impl Connection { let connection = Connection { stream, + tls, read_buffer: [0; 4096], + response_buffer: Vec::new(), bytes_read: 0, can_send: true, }; @@ -58,7 +66,23 @@ impl Connection { Ok(bytes_read) => { self.bytes_read += bytes_read; - let interesting_bytes = &self.read_buffer[..self.bytes_read]; + let mut interesting_bytes = &self.read_buffer[..self.bytes_read]; + + self.tls.read_tls(&mut interesting_bytes as &mut dyn std::io::Read).unwrap(); + + let io_state = self.tls.process_new_packets().unwrap(); + + if io_state.plaintext_bytes_to_read() == 0 { + while self.tls.wants_write(){ + self.tls.write_tls(&mut self.stream).unwrap(); + } + + break false; + } + + self.tls.reader().read_to_end(&mut self.response_buffer).unwrap(); + + let interesting_bytes = &self.response_buffer[..]; let mut opt_body_start_index = None; @@ -166,7 +190,13 @@ impl Connection { state: &LoadTestState, request: &[u8], ) -> ::std::io::Result<()> { - let bytes_sent = self.stream.write(request)?; + self.tls.writer().write(request)?; + + let mut bytes_sent = 0; + + while self.tls.wants_write(){ + bytes_sent += self.tls.write_tls(&mut self.stream)?; + } state .statistics @@ -185,7 +215,12 @@ impl Connection { pub type ConnectionMap = HashMap; -pub fn run_socket_thread(config: &Config, state: LoadTestState, num_initial_requests: usize) { +pub fn run_socket_thread( + config: &Config, + tls_config: Arc, + state: LoadTestState, + num_initial_requests: usize +) { let timeout = Duration::from_micros(config.network.poll_timeout_microseconds); let create_conn_interval = 2 ^ config.network.connection_creation_interval; @@ -199,7 +234,7 @@ pub fn run_socket_thread(config: &Config, state: LoadTestState, num_initial_requ let mut token_counter = 0usize; for _ in 0..num_initial_requests { - Connection::create_and_register(config, &mut connections, &mut poll, &mut token_counter) + Connection::create_and_register(config, tls_config.clone(), &mut connections, &mut poll, &mut token_counter) .unwrap(); } @@ -256,6 +291,7 @@ pub fn run_socket_thread(config: &Config, state: LoadTestState, num_initial_requ for _ in 0..num_to_create { let ok = Connection::create_and_register( config, + tls_config.clone(), &mut connections, &mut poll, &mut token_counter,