ws load test: rewrite with glommio and futures-rustls

This commit is contained in:
Joakim Frostegård 2021-11-02 11:38:11 +01:00
parent eb2c294300
commit 839f516dcb
4 changed files with 197 additions and 272 deletions

View file

@ -11,16 +11,19 @@ name = "aquatic_ws_load_test"
[dependencies]
anyhow = "1"
async-tungstenite = "0.15"
aquatic_cli_helpers = "0.1.0"
aquatic_ws_protocol = "0.1.0"
futures = "0.3"
futures-rustls = "0.22"
glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5" }
hashbrown = { version = "0.11.2", features = ["serde"] }
mimalloc = { version = "0.1", default-features = false }
mio = { version = "0.7", features = ["udp", "os-poll", "os-util"] }
rand = { version = "0.8", features = ["small_rng"] }
rand_distr = "0.4"
rustls = { version = "0.20", features = ["dangerous_configuration"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
slab = "0.4"
tungstenite = "0.15"
[dev-dependencies]

View file

@ -2,6 +2,7 @@ use std::sync::{atomic::Ordering, Arc};
use std::thread;
use std::time::{Duration, Instant};
use glommio::LocalExecutorBuilder;
use rand::prelude::*;
use rand_distr::Pareto;
@ -48,11 +49,18 @@ fn run(config: Config) -> ::anyhow::Result<()> {
pareto: Arc::new(pareto),
};
let tls_config = create_tls_config().unwrap();
for _ in 0..config.num_workers {
let config = config.clone();
let tls_config = tls_config.clone();
let state = state.clone();
thread::spawn(move || run_socket_thread(&config, state));
LocalExecutorBuilder::default()
.spawn(|| async move {
run_socket_thread(config, tls_config, state).await.unwrap();
})
.unwrap();
}
monitor_statistics(state, &config);
@ -60,6 +68,36 @@ fn run(config: Config) -> ::anyhow::Result<()> {
Ok(())
}
struct FakeCertificateVerifier;
impl rustls::client::ServerCertVerifier for FakeCertificateVerifier {
fn verify_server_cert(
&self,
_end_entity: &rustls::Certificate,
_intermediates: &[rustls::Certificate],
_server_name: &rustls::ServerName,
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp_response: &[u8],
_now: std::time::SystemTime,
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
Ok(rustls::client::ServerCertVerified::assertion())
}
}
fn create_tls_config() -> anyhow::Result<Arc<rustls::ClientConfig>> {
let mut config = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(rustls::RootCertStore::empty())
.with_no_client_auth();
config
.dangerous()
.set_certificate_verifier(Arc::new(FakeCertificateVerifier));
Ok(Arc::new(config))
}
fn monitor_statistics(state: LoadTestState, config: &Config) {
let start_time = Instant::now();
let mut report_avg_response_vec: Vec<f64> = Vec::new();

View file

@ -1,307 +1,188 @@
use std::io::ErrorKind;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::{
cell::RefCell,
convert::TryInto,
rc::Rc,
sync::{atomic::Ordering, Arc},
time::Duration,
};
use hashbrown::HashMap;
use mio::{net::TcpStream, Events, Interest, Poll, Token};
use rand::{prelude::*, rngs::SmallRng};
use tungstenite::{handshake::MidHandshake, ClientHandshake, HandshakeError, WebSocket};
use aquatic_ws_protocol::{OfferId, OutMessage, PeerId};
use async_tungstenite::{WebSocketStream, client_async};
use futures::{StreamExt, SinkExt};
use futures_rustls::{TlsConnector, client::TlsStream};
use glommio::net::TcpStream;
use glommio::{prelude::*, timer::TimerActionRepeat};
use rand::{Rng, SeedableRng, prelude::SmallRng};
use crate::common::*;
use crate::config::*;
use crate::utils::create_random_request;
use crate::{common::LoadTestState, config::Config, utils::create_random_request};
// Allow large enum variant WebSocket because it should be very common
#[allow(clippy::large_enum_variant)]
pub enum ConnectionState {
TcpStream(TcpStream),
WebSocket(WebSocket<TcpStream>),
MidHandshake(MidHandshake<ClientHandshake<TcpStream>>),
pub async fn run_socket_thread(
config: Config,
tls_config: Arc<rustls::ClientConfig>,
load_test_state: LoadTestState,
) -> anyhow::Result<()> {
let config = Rc::new(config);
let num_active_connections = Rc::new(RefCell::new(0usize));
TimerActionRepeat::repeat(move || {
periodically_open_connections(
config.clone(),
tls_config.clone(),
load_test_state.clone(),
num_active_connections.clone(),
)
});
futures::future::pending::<bool>().await;
Ok(())
}
impl ConnectionState {
fn advance(self, config: &Config) -> Option<Self> {
match self {
Self::TcpStream(stream) => {
let req = format!(
"ws://{}:{}",
config.server_address.ip(),
config.server_address.port()
);
match ::tungstenite::client(req, stream) {
Ok((ws, _)) => Some(ConnectionState::WebSocket(ws)),
Err(HandshakeError::Interrupted(handshake)) => {
Some(ConnectionState::MidHandshake(handshake))
}
Err(HandshakeError::Failure(err)) => {
eprintln!("handshake error: {:?}", err);
None
}
}
async fn periodically_open_connections(
config: Rc<Config>,
tls_config: Arc<rustls::ClientConfig>,
load_test_state: LoadTestState,
num_active_connections: Rc<RefCell<usize>>,
) -> Option<Duration> {
if *num_active_connections.borrow() < config.num_connections {
spawn_local(async move {
if let Err(err) =
Connection::run(config, tls_config, load_test_state, num_active_connections).await
{
eprintln!("connection creation error: {:?}", err);
}
Self::MidHandshake(handshake) => match handshake.handshake() {
Ok((ws, _)) => Some(ConnectionState::WebSocket(ws)),
Err(HandshakeError::Interrupted(handshake)) => {
Some(ConnectionState::MidHandshake(handshake))
}
Err(HandshakeError::Failure(err)) => {
eprintln!("handshake error: {:?}", err);
None
}
},
Self::WebSocket(ws) => Some(Self::WebSocket(ws)),
}
})
.detach();
}
Some(Duration::from_secs(1))
}
pub struct Connection {
stream: ConnectionState,
peer_id: PeerId,
struct Connection {
config: Rc<Config>,
load_test_state: LoadTestState,
rng: SmallRng,
can_send: bool,
peer_id: PeerId,
send_answer: Option<(PeerId, OfferId)>,
stream: WebSocketStream<TlsStream<TcpStream>>,
}
impl Connection {
pub fn create_and_register(
config: &Config,
rng: &mut impl Rng,
connections: &mut ConnectionMap,
poll: &mut Poll,
token_counter: &mut usize,
async fn run(
config: Rc<Config>,
tls_config: Arc<rustls::ClientConfig>,
load_test_state: LoadTestState,
num_active_connections: Rc<RefCell<usize>>,
) -> anyhow::Result<()> {
let mut stream = TcpStream::connect(config.server_address)?;
let mut rng = SmallRng::from_entropy();
let peer_id = PeerId(rng.gen());
let stream = TcpStream::connect(config.server_address)
.await
.map_err(|err| anyhow::anyhow!("connect: {:?}", err))?;
let stream = TlsConnector::from(tls_config).connect("example.com".try_into().unwrap(), stream).await?;
let request = format!(
"ws://{}:{}",
config.server_address.ip(),
config.server_address.port()
);
let (stream, _) = client_async(request, stream).await?;
poll.registry()
.register(
&mut stream,
Token(*token_counter),
Interest::READABLE | Interest::WRITABLE,
)
.unwrap();
let connection = Connection {
stream: ConnectionState::TcpStream(stream),
peer_id: PeerId(rng.gen()),
can_send: false,
let mut connection = Connection {
config,
load_test_state,
rng,
stream,
can_send: true,
peer_id,
send_answer: None,
};
connections.insert(*token_counter, connection);
*num_active_connections.borrow_mut() += 1;
*token_counter += 1;
println!("run connection");
if let Err(err) = connection.run_connection_loop().await {
eprintln!("connection error: {:?}", err);
}
*num_active_connections.borrow_mut() -= 1;
Ok(())
}
pub fn advance(self, config: &Config) -> Option<Self> {
if let Some(stream) = self.stream.advance(config) {
let can_send = matches!(stream, ConnectionState::WebSocket(_));
async fn run_connection_loop(&mut self) -> anyhow::Result<()> {
loop {
if self.can_send {
let request =
create_random_request(&self.config, &self.load_test_state, &mut self.rng, self.peer_id).to_ws_message();
Some(Self {
stream,
peer_id: self.peer_id,
can_send,
send_answer: None,
})
} else {
None
self.stream.send(request).await?;
self.stream.flush().await?;
self.load_test_state
.statistics
.requests
.fetch_add(1, Ordering::SeqCst);
self.can_send = false;
}
self.read_message().await?;
}
}
pub fn read_responses(&mut self, state: &LoadTestState) -> bool {
// bool = drop connection
if let ConnectionState::WebSocket(ref mut ws) = self.stream {
loop {
match ws.read_message() {
Ok(message) => match OutMessage::from_ws_message(message) {
Ok(OutMessage::Offer(offer)) => {
state
.statistics
.responses_offer
.fetch_add(1, Ordering::SeqCst);
async fn read_message(&mut self) -> anyhow::Result<()> {
match OutMessage::from_ws_message(self.stream.next().await.unwrap()?) {
Ok(OutMessage::Offer(offer)) => {
self.load_test_state
.statistics
.responses_offer
.fetch_add(1, Ordering::SeqCst);
self.send_answer = Some((offer.peer_id, offer.offer_id));
self.send_answer = Some((offer.peer_id, offer.offer_id));
self.can_send = true;
}
Ok(OutMessage::Answer(_)) => {
state
.statistics
.responses_answer
.fetch_add(1, Ordering::SeqCst);
self.can_send = true;
}
Ok(OutMessage::Answer(_)) => {
self.load_test_state
.statistics
.responses_answer
.fetch_add(1, Ordering::SeqCst);
self.can_send = true;
}
Ok(OutMessage::AnnounceResponse(_)) => {
state
.statistics
.responses_announce
.fetch_add(1, Ordering::SeqCst);
self.can_send = true;
}
Ok(OutMessage::AnnounceResponse(_)) => {
self.load_test_state
.statistics
.responses_announce
.fetch_add(1, Ordering::SeqCst);
self.can_send = true;
}
Ok(OutMessage::ScrapeResponse(_)) => {
state
.statistics
.responses_scrape
.fetch_add(1, Ordering::SeqCst);
self.can_send = true;
}
Ok(OutMessage::ScrapeResponse(_)) => {
self.load_test_state
.statistics
.responses_scrape
.fetch_add(1, Ordering::SeqCst);
self.can_send = true;
}
Ok(OutMessage::ErrorResponse(response)) => {
state
.statistics
.responses_error
.fetch_add(1, Ordering::SeqCst);
self.can_send = true;
}
Ok(OutMessage::ErrorResponse(response)) => {
self.load_test_state
.statistics
.responses_error
.fetch_add(1, Ordering::SeqCst);
eprintln!("received error response: {:?}", response.failure_reason);
eprintln!("received error response: {:?}", response.failure_reason);
self.can_send = true;
}
Err(err) => {
eprintln!("error deserializing offer: {:?}", err);
}
},
Err(tungstenite::Error::Io(err)) if err.kind() == ErrorKind::WouldBlock => {
return false;
}
Err(_) => {
return true;
}
}
self.can_send = true;
}
Err(err) => {
eprintln!("error deserializing offer: {:?}", err);
}
}
false
}
pub fn send_request(
&mut self,
config: &Config,
state: &LoadTestState,
rng: &mut impl Rng,
) -> bool {
// bool = remove connection
if !self.can_send {
return false;
}
if let ConnectionState::WebSocket(ref mut ws) = self.stream {
let request = create_random_request(&config, &state, rng, self.peer_id);
// If self.send_answer is set and request is announce request, make
// the request an offer answer
let request = if let InMessage::AnnounceRequest(mut r) = request {
if let Some((peer_id, offer_id)) = self.send_answer {
r.to_peer_id = Some(peer_id);
r.offer_id = Some(offer_id);
r.answer = Some(JsonValue(::serde_json::json!(
{"sdp": "abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-"}
)));
r.event = None;
r.offers = None;
}
self.send_answer = None;
InMessage::AnnounceRequest(r)
} else {
request
};
match ws.write_message(request.to_ws_message()) {
Ok(()) => {
state.statistics.requests.fetch_add(1, Ordering::SeqCst);
self.can_send = false;
false
}
Err(tungstenite::Error::Io(err)) if err.kind() == ErrorKind::WouldBlock => false,
Err(_) => true,
}
} else {
println!("send request can't send to non-ws stream");
false
}
}
}
pub type ConnectionMap = HashMap<usize, Connection>;
pub fn run_socket_thread(config: &Config, state: LoadTestState) {
let timeout = Duration::from_micros(config.network.poll_timeout_microseconds);
let create_conn_interval = 2 ^ config.network.connection_creation_interval;
let mut connections: ConnectionMap = HashMap::with_capacity(config.num_connections);
let mut poll = Poll::new().expect("create poll");
let mut events = Events::with_capacity(config.network.poll_event_capacity);
let mut rng = SmallRng::from_entropy();
let mut token_counter = 0usize;
let mut iter_counter = 0usize;
let mut drop_keys = Vec::new();
loop {
poll.poll(&mut events, Some(timeout))
.expect("failed polling");
for event in events.iter() {
let token = event.token();
if event.is_readable() {
if let Some(connection) = connections.get_mut(&token.0) {
if let ConnectionState::WebSocket(_) = connection.stream {
let drop_connection = connection.read_responses(&state);
if drop_connection {
connections.remove(&token.0);
}
continue;
}
}
}
if let Some(connection) = connections.remove(&token.0) {
if let Some(connection) = connection.advance(config) {
connections.insert(token.0, connection);
}
}
}
for (k, connection) in connections.iter_mut() {
let drop_connection = connection.send_request(config, &state, &mut rng);
if drop_connection {
drop_keys.push(*k)
}
}
for k in drop_keys.drain(..) {
connections.remove(&k);
}
// Slowly create new connections
if connections.len() < config.num_connections && iter_counter % create_conn_interval == 0 {
let res = Connection::create_and_register(
config,
&mut rng,
&mut connections,
&mut poll,
&mut token_counter,
);
if let Err(err) = res {
eprintln!("create connection error: {}", err);
}
}
iter_counter = iter_counter.wrapping_add(1);
Ok(())
}
}