From 839f516dcbc1b63a87512ea1620482b64208a7e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 2 Nov 2021 11:38:11 +0100 Subject: [PATCH] ws load test: rewrite with glommio and futures-rustls --- Cargo.lock | 7 +- aquatic_ws_load_test/Cargo.toml | 7 +- aquatic_ws_load_test/src/main.rs | 40 ++- aquatic_ws_load_test/src/network.rs | 415 ++++++++++------------------ 4 files changed, 197 insertions(+), 272 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7e415e3..0735770 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -267,16 +267,19 @@ dependencies = [ "anyhow", "aquatic_cli_helpers", "aquatic_ws_protocol", + "async-tungstenite", + "futures", + "futures-rustls", + "glommio", "hashbrown 0.11.2", "mimalloc", - "mio", "quickcheck", "quickcheck_macros", "rand", "rand_distr", + "rustls", "serde", "serde_json", - "slab", "tungstenite", ] diff --git a/aquatic_ws_load_test/Cargo.toml b/aquatic_ws_load_test/Cargo.toml index abf14ab..581f1de 100644 --- a/aquatic_ws_load_test/Cargo.toml +++ b/aquatic_ws_load_test/Cargo.toml @@ -11,16 +11,19 @@ name = "aquatic_ws_load_test" [dependencies] anyhow = "1" +async-tungstenite = "0.15" aquatic_cli_helpers = "0.1.0" aquatic_ws_protocol = "0.1.0" +futures = "0.3" +futures-rustls = "0.22" +glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5" } hashbrown = { version = "0.11.2", features = ["serde"] } mimalloc = { version = "0.1", default-features = false } -mio = { version = "0.7", features = ["udp", "os-poll", "os-util"] } rand = { version = "0.8", features = ["small_rng"] } rand_distr = "0.4" +rustls = { version = "0.20", features = ["dangerous_configuration"] } serde = { version = "1", features = ["derive"] } serde_json = "1" -slab = "0.4" tungstenite = "0.15" [dev-dependencies] diff --git a/aquatic_ws_load_test/src/main.rs b/aquatic_ws_load_test/src/main.rs index 27c51a8..241a136 100644 --- a/aquatic_ws_load_test/src/main.rs +++ b/aquatic_ws_load_test/src/main.rs @@ -2,6 +2,7 @@ use std::sync::{atomic::Ordering, Arc}; use std::thread; use std::time::{Duration, Instant}; +use glommio::LocalExecutorBuilder; use rand::prelude::*; use rand_distr::Pareto; @@ -48,11 +49,18 @@ fn run(config: Config) -> ::anyhow::Result<()> { pareto: Arc::new(pareto), }; + let tls_config = create_tls_config().unwrap(); + for _ in 0..config.num_workers { let config = config.clone(); + let tls_config = tls_config.clone(); let state = state.clone(); - thread::spawn(move || run_socket_thread(&config, state)); + LocalExecutorBuilder::default() + .spawn(|| async move { + run_socket_thread(config, tls_config, state).await.unwrap(); + }) + .unwrap(); } monitor_statistics(state, &config); @@ -60,6 +68,36 @@ fn run(config: Config) -> ::anyhow::Result<()> { Ok(()) } +struct FakeCertificateVerifier; + +impl rustls::client::ServerCertVerifier for FakeCertificateVerifier { + fn verify_server_cert( + &self, + _end_entity: &rustls::Certificate, + _intermediates: &[rustls::Certificate], + _server_name: &rustls::ServerName, + _scts: &mut dyn Iterator, + _ocsp_response: &[u8], + _now: std::time::SystemTime, + ) -> Result { + Ok(rustls::client::ServerCertVerified::assertion()) + } +} + +fn create_tls_config() -> anyhow::Result> { + let mut config = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(rustls::RootCertStore::empty()) + .with_no_client_auth(); + + config + .dangerous() + .set_certificate_verifier(Arc::new(FakeCertificateVerifier)); + + Ok(Arc::new(config)) +} + + fn monitor_statistics(state: LoadTestState, config: &Config) { let start_time = Instant::now(); let mut report_avg_response_vec: Vec = Vec::new(); diff --git a/aquatic_ws_load_test/src/network.rs b/aquatic_ws_load_test/src/network.rs index 0a6cf8c..6db1850 100644 --- a/aquatic_ws_load_test/src/network.rs +++ b/aquatic_ws_load_test/src/network.rs @@ -1,307 +1,188 @@ -use std::io::ErrorKind; -use std::sync::atomic::Ordering; -use std::time::Duration; +use std::{ + cell::RefCell, + convert::TryInto, + rc::Rc, + sync::{atomic::Ordering, Arc}, + time::Duration, +}; -use hashbrown::HashMap; -use mio::{net::TcpStream, Events, Interest, Poll, Token}; -use rand::{prelude::*, rngs::SmallRng}; -use tungstenite::{handshake::MidHandshake, ClientHandshake, HandshakeError, WebSocket}; +use aquatic_ws_protocol::{OfferId, OutMessage, PeerId}; +use async_tungstenite::{WebSocketStream, client_async}; +use futures::{StreamExt, SinkExt}; +use futures_rustls::{TlsConnector, client::TlsStream}; +use glommio::net::TcpStream; +use glommio::{prelude::*, timer::TimerActionRepeat}; +use rand::{Rng, SeedableRng, prelude::SmallRng}; -use crate::common::*; -use crate::config::*; -use crate::utils::create_random_request; +use crate::{common::LoadTestState, config::Config, utils::create_random_request}; -// Allow large enum variant WebSocket because it should be very common -#[allow(clippy::large_enum_variant)] -pub enum ConnectionState { - TcpStream(TcpStream), - WebSocket(WebSocket), - MidHandshake(MidHandshake>), +pub async fn run_socket_thread( + config: Config, + tls_config: Arc, + load_test_state: LoadTestState, +) -> anyhow::Result<()> { + let config = Rc::new(config); + let num_active_connections = Rc::new(RefCell::new(0usize)); + + TimerActionRepeat::repeat(move || { + periodically_open_connections( + config.clone(), + tls_config.clone(), + load_test_state.clone(), + num_active_connections.clone(), + ) + }); + + futures::future::pending::().await; + + Ok(()) } -impl ConnectionState { - fn advance(self, config: &Config) -> Option { - match self { - Self::TcpStream(stream) => { - let req = format!( - "ws://{}:{}", - config.server_address.ip(), - config.server_address.port() - ); - - match ::tungstenite::client(req, stream) { - Ok((ws, _)) => Some(ConnectionState::WebSocket(ws)), - Err(HandshakeError::Interrupted(handshake)) => { - Some(ConnectionState::MidHandshake(handshake)) - } - Err(HandshakeError::Failure(err)) => { - eprintln!("handshake error: {:?}", err); - - None - } - } +async fn periodically_open_connections( + config: Rc, + tls_config: Arc, + load_test_state: LoadTestState, + num_active_connections: Rc>, +) -> Option { + if *num_active_connections.borrow() < config.num_connections { + spawn_local(async move { + if let Err(err) = + Connection::run(config, tls_config, load_test_state, num_active_connections).await + { + eprintln!("connection creation error: {:?}", err); } - Self::MidHandshake(handshake) => match handshake.handshake() { - Ok((ws, _)) => Some(ConnectionState::WebSocket(ws)), - Err(HandshakeError::Interrupted(handshake)) => { - Some(ConnectionState::MidHandshake(handshake)) - } - Err(HandshakeError::Failure(err)) => { - eprintln!("handshake error: {:?}", err); - - None - } - }, - Self::WebSocket(ws) => Some(Self::WebSocket(ws)), - } + }) + .detach(); } + + Some(Duration::from_secs(1)) } -pub struct Connection { - stream: ConnectionState, - peer_id: PeerId, +struct Connection { + config: Rc, + load_test_state: LoadTestState, + rng: SmallRng, can_send: bool, + peer_id: PeerId, send_answer: Option<(PeerId, OfferId)>, + stream: WebSocketStream>, } impl Connection { - pub fn create_and_register( - config: &Config, - rng: &mut impl Rng, - connections: &mut ConnectionMap, - poll: &mut Poll, - token_counter: &mut usize, + async fn run( + config: Rc, + tls_config: Arc, + load_test_state: LoadTestState, + num_active_connections: Rc>, ) -> anyhow::Result<()> { - let mut stream = TcpStream::connect(config.server_address)?; + let mut rng = SmallRng::from_entropy(); + let peer_id = PeerId(rng.gen()); + let stream = TcpStream::connect(config.server_address) + .await + .map_err(|err| anyhow::anyhow!("connect: {:?}", err))?; + let stream = TlsConnector::from(tls_config).connect("example.com".try_into().unwrap(), stream).await?; + let request = format!( + "ws://{}:{}", + config.server_address.ip(), + config.server_address.port() + ); + let (stream, _) = client_async(request, stream).await?; - poll.registry() - .register( - &mut stream, - Token(*token_counter), - Interest::READABLE | Interest::WRITABLE, - ) - .unwrap(); - - let connection = Connection { - stream: ConnectionState::TcpStream(stream), - peer_id: PeerId(rng.gen()), - can_send: false, + let mut connection = Connection { + config, + load_test_state, + rng, + stream, + can_send: true, + peer_id, send_answer: None, }; - connections.insert(*token_counter, connection); + *num_active_connections.borrow_mut() += 1; - *token_counter += 1; + println!("run connection"); + + if let Err(err) = connection.run_connection_loop().await { + eprintln!("connection error: {:?}", err); + } + + *num_active_connections.borrow_mut() -= 1; Ok(()) } - pub fn advance(self, config: &Config) -> Option { - if let Some(stream) = self.stream.advance(config) { - let can_send = matches!(stream, ConnectionState::WebSocket(_)); + async fn run_connection_loop(&mut self) -> anyhow::Result<()> { + loop { + if self.can_send { + let request = + create_random_request(&self.config, &self.load_test_state, &mut self.rng, self.peer_id).to_ws_message(); - Some(Self { - stream, - peer_id: self.peer_id, - can_send, - send_answer: None, - }) - } else { - None + self.stream.send(request).await?; + self.stream.flush().await?; + + self.load_test_state + .statistics + .requests + .fetch_add(1, Ordering::SeqCst); + + self.can_send = false; + } + + self.read_message().await?; } } - pub fn read_responses(&mut self, state: &LoadTestState) -> bool { - // bool = drop connection - if let ConnectionState::WebSocket(ref mut ws) = self.stream { - loop { - match ws.read_message() { - Ok(message) => match OutMessage::from_ws_message(message) { - Ok(OutMessage::Offer(offer)) => { - state - .statistics - .responses_offer - .fetch_add(1, Ordering::SeqCst); + async fn read_message(&mut self) -> anyhow::Result<()> { + match OutMessage::from_ws_message(self.stream.next().await.unwrap()?) { + Ok(OutMessage::Offer(offer)) => { + self.load_test_state + .statistics + .responses_offer + .fetch_add(1, Ordering::SeqCst); - self.send_answer = Some((offer.peer_id, offer.offer_id)); + self.send_answer = Some((offer.peer_id, offer.offer_id)); - self.can_send = true; - } - Ok(OutMessage::Answer(_)) => { - state - .statistics - .responses_answer - .fetch_add(1, Ordering::SeqCst); + self.can_send = true; + } + Ok(OutMessage::Answer(_)) => { + self.load_test_state + .statistics + .responses_answer + .fetch_add(1, Ordering::SeqCst); - self.can_send = true; - } - Ok(OutMessage::AnnounceResponse(_)) => { - state - .statistics - .responses_announce - .fetch_add(1, Ordering::SeqCst); + self.can_send = true; + } + Ok(OutMessage::AnnounceResponse(_)) => { + self.load_test_state + .statistics + .responses_announce + .fetch_add(1, Ordering::SeqCst); - self.can_send = true; - } - Ok(OutMessage::ScrapeResponse(_)) => { - state - .statistics - .responses_scrape - .fetch_add(1, Ordering::SeqCst); + self.can_send = true; + } + Ok(OutMessage::ScrapeResponse(_)) => { + self.load_test_state + .statistics + .responses_scrape + .fetch_add(1, Ordering::SeqCst); - self.can_send = true; - } - Ok(OutMessage::ErrorResponse(response)) => { - state - .statistics - .responses_error - .fetch_add(1, Ordering::SeqCst); + self.can_send = true; + } + Ok(OutMessage::ErrorResponse(response)) => { + self.load_test_state + .statistics + .responses_error + .fetch_add(1, Ordering::SeqCst); - eprintln!("received error response: {:?}", response.failure_reason); + eprintln!("received error response: {:?}", response.failure_reason); - self.can_send = true; - } - Err(err) => { - eprintln!("error deserializing offer: {:?}", err); - } - }, - Err(tungstenite::Error::Io(err)) if err.kind() == ErrorKind::WouldBlock => { - return false; - } - Err(_) => { - return true; - } - } + self.can_send = true; + } + Err(err) => { + eprintln!("error deserializing offer: {:?}", err); } } - false - } - - pub fn send_request( - &mut self, - config: &Config, - state: &LoadTestState, - rng: &mut impl Rng, - ) -> bool { - // bool = remove connection - if !self.can_send { - return false; - } - - if let ConnectionState::WebSocket(ref mut ws) = self.stream { - let request = create_random_request(&config, &state, rng, self.peer_id); - - // If self.send_answer is set and request is announce request, make - // the request an offer answer - let request = if let InMessage::AnnounceRequest(mut r) = request { - if let Some((peer_id, offer_id)) = self.send_answer { - r.to_peer_id = Some(peer_id); - r.offer_id = Some(offer_id); - r.answer = Some(JsonValue(::serde_json::json!( - {"sdp": "abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-"} - ))); - r.event = None; - r.offers = None; - } - - self.send_answer = None; - - InMessage::AnnounceRequest(r) - } else { - request - }; - - match ws.write_message(request.to_ws_message()) { - Ok(()) => { - state.statistics.requests.fetch_add(1, Ordering::SeqCst); - - self.can_send = false; - - false - } - Err(tungstenite::Error::Io(err)) if err.kind() == ErrorKind::WouldBlock => false, - Err(_) => true, - } - } else { - println!("send request can't send to non-ws stream"); - - false - } - } -} - -pub type ConnectionMap = HashMap; - -pub fn run_socket_thread(config: &Config, state: LoadTestState) { - let timeout = Duration::from_micros(config.network.poll_timeout_microseconds); - let create_conn_interval = 2 ^ config.network.connection_creation_interval; - - let mut connections: ConnectionMap = HashMap::with_capacity(config.num_connections); - let mut poll = Poll::new().expect("create poll"); - let mut events = Events::with_capacity(config.network.poll_event_capacity); - let mut rng = SmallRng::from_entropy(); - - let mut token_counter = 0usize; - let mut iter_counter = 0usize; - - let mut drop_keys = Vec::new(); - - loop { - poll.poll(&mut events, Some(timeout)) - .expect("failed polling"); - - for event in events.iter() { - let token = event.token(); - - if event.is_readable() { - if let Some(connection) = connections.get_mut(&token.0) { - if let ConnectionState::WebSocket(_) = connection.stream { - let drop_connection = connection.read_responses(&state); - - if drop_connection { - connections.remove(&token.0); - } - - continue; - } - } - } - - if let Some(connection) = connections.remove(&token.0) { - if let Some(connection) = connection.advance(config) { - connections.insert(token.0, connection); - } - } - } - - for (k, connection) in connections.iter_mut() { - let drop_connection = connection.send_request(config, &state, &mut rng); - - if drop_connection { - drop_keys.push(*k) - } - } - - for k in drop_keys.drain(..) { - connections.remove(&k); - } - - // Slowly create new connections - if connections.len() < config.num_connections && iter_counter % create_conn_interval == 0 { - let res = Connection::create_and_register( - config, - &mut rng, - &mut connections, - &mut poll, - &mut token_counter, - ); - - if let Err(err) = res { - eprintln!("create connection error: {}", err); - } - } - - iter_counter = iter_counter.wrapping_add(1); + Ok(()) } }