aquatic_http_load_test: refactor timer code; panic on an error case

This commit is contained in:
Joakim Frostegård 2021-10-28 01:21:47 +02:00
parent e458cc54db
commit 80447d9d1b

View file

@ -2,7 +2,7 @@ use std::{cell::RefCell, convert::TryInto, io::{Cursor, ErrorKind, Read}, rc::Rc
use aquatic_http_protocol::response::Response;
use futures_lite::{AsyncReadExt, AsyncWriteExt};
use glommio::{enclose, prelude::*, timer::TimerActionRepeat};
use glommio::{prelude::*, timer::TimerActionRepeat};
use glommio::net::TcpStream;
use rand::{SeedableRng, prelude::SmallRng};
use rustls::ClientConnection;
@ -17,25 +17,35 @@ pub async fn run_socket_thread(
let config = Rc::new(config);
let num_active_connections = Rc::new(RefCell::new(0usize));
TimerActionRepeat::repeat(enclose!((config, tls_config, load_test_state, num_active_connections) move || {
enclose!((config, tls_config, load_test_state, num_active_connections) move || async move {
if *num_active_connections.borrow() < config.num_connections {
spawn_local(async move {
if let Err(err) = Connection::run(config, tls_config, load_test_state, num_active_connections).await {
eprintln!("connection creation error: {:?}", err);
}
}).detach();
}
Some(Duration::from_secs(1))
})()
}));
TimerActionRepeat::repeat(move || periodically_open_connections(
config.clone(),
tls_config.clone(),
load_test_state.clone(),
num_active_connections.clone())
);
futures_lite::future::pending::<bool>().await;
Ok(())
}
async fn periodically_open_connections(
config: Rc<Config>,
tls_config: Arc<rustls::ClientConfig>,
load_test_state: LoadTestState,
num_active_connections: Rc<RefCell<usize>>,
) -> Option<Duration> {
if *num_active_connections.borrow() < config.num_connections {
spawn_local(async move {
if let Err(err) = Connection::run(config, tls_config, load_test_state, num_active_connections).await {
eprintln!("connection creation error: {:?}", err);
}
}).detach();
}
Some(Duration::from_secs(1))
}
struct Connection {
config: Rc<Config>,
load_test_state: LoadTestState,
@ -148,7 +158,7 @@ impl Connection {
break;
}
Err(err) => {
break;
panic!("tls.reader().read: {}", err);
}
}
}