From 0e61744443bb25f156ec1e316f7a7145f2f955cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 11 Nov 2021 17:50:52 +0100 Subject: [PATCH 1/6] http load test: connection open interval setting, other improvements --- Cargo.lock | 1 + aquatic_http_load_test/Cargo.toml | 1 + aquatic_http_load_test/src/config.rs | 9 ++++- aquatic_http_load_test/src/network.rs | 47 +++++++++++++++++++-------- 4 files changed, 44 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c1aa85a..a27992b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -139,6 +139,7 @@ dependencies = [ "futures-lite", "glommio", "hashbrown 0.11.2", + "log", "mimalloc", "quickcheck", "quickcheck_macros", diff --git a/aquatic_http_load_test/Cargo.toml b/aquatic_http_load_test/Cargo.toml index a33fdeb..889de16 100644 --- a/aquatic_http_load_test/Cargo.toml +++ b/aquatic_http_load_test/Cargo.toml @@ -20,6 +20,7 @@ aquatic_http_protocol = "0.1.0" futures-lite = "1" hashbrown = "0.11.2" glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5" } +log = "0.4" mimalloc = { version = "0.1", default-features = false } rand = { version = "0.8", features = ["small_rng"] } rand_distr = "0.4" diff --git a/aquatic_http_load_test/src/config.rs b/aquatic_http_load_test/src/config.rs index 78b3615..3352957 100644 --- a/aquatic_http_load_test/src/config.rs +++ b/aquatic_http_load_test/src/config.rs @@ -9,7 +9,13 @@ pub struct Config { pub server_address: SocketAddr, pub log_level: LogLevel, pub num_workers: usize, + /// Maximum number of connections to keep open pub num_connections: usize, + /// How often to check if num_connections connections are open, and + /// open a new one otherwise. A value of 0 means that connections are + /// opened as quickly as possible, which is useful when the tracker + /// doesn't keep connections alive. + pub connection_creation_interval_ms: u64, pub duration: usize, pub torrents: TorrentConfig, #[cfg(feature = "cpu-pinning")] @@ -46,7 +52,8 @@ impl Default for Config { server_address: "127.0.0.1:3000".parse().unwrap(), log_level: LogLevel::Error, num_workers: 1, - num_connections: 8, + num_connections: 128, + connection_creation_interval_ms: 10, duration: 0, torrents: TorrentConfig::default(), #[cfg(feature = "cpu-pinning")] diff --git a/aquatic_http_load_test/src/network.rs b/aquatic_http_load_test/src/network.rs index aaa15c0..fdae866 100644 --- a/aquatic_http_load_test/src/network.rs +++ b/aquatic_http_load_test/src/network.rs @@ -24,14 +24,36 @@ pub async fn run_socket_thread( let config = Rc::new(config); let num_active_connections = Rc::new(RefCell::new(0usize)); - TimerActionRepeat::repeat(move || { - periodically_open_connections( - config.clone(), - tls_config.clone(), - load_test_state.clone(), - num_active_connections.clone(), - ) - }); + let interval = config.connection_creation_interval_ms; + + if interval == 0 { + loop { + if *num_active_connections.borrow() < config.num_connections { + if let Err(err) = Connection::run( + config.clone(), + tls_config.clone(), + load_test_state.clone(), + num_active_connections.clone(), + ) + .await + { + ::log::error!("connection creation error: {:?}", err); + } + } + } + } else { + let interval = Duration::from_millis(interval); + + TimerActionRepeat::repeat(move || { + periodically_open_connections( + config.clone(), + interval, + tls_config.clone(), + load_test_state.clone(), + num_active_connections.clone(), + ) + }); + } futures_lite::future::pending::().await; @@ -40,6 +62,7 @@ pub async fn run_socket_thread( async fn periodically_open_connections( config: Rc, + interval: Duration, tls_config: Arc, load_test_state: LoadTestState, num_active_connections: Rc>, @@ -49,13 +72,13 @@ async fn periodically_open_connections( if let Err(err) = Connection::run(config, tls_config, load_test_state, num_active_connections).await { - eprintln!("connection creation error: {:?}", err); + ::log::error!("connection creation error: {:?}", err); } }) .detach(); } - Some(Duration::from_secs(1)) + Some(interval) } struct Connection { @@ -97,10 +120,8 @@ impl Connection { *num_active_connections.borrow_mut() += 1; - println!("run connection"); - if let Err(err) = connection.run_connection_loop().await { - eprintln!("connection error: {:?}", err); + ::log::info!("connection error: {:?}", err); } *num_active_connections.borrow_mut() -= 1; From 40df1b587fa6addc20d2b1d8d276be00cf8d0da3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 11 Nov 2021 17:53:02 +0100 Subject: [PATCH 2/6] run cargo fmt --- aquatic_http/src/lib/lib.rs | 17 ++++++++--------- aquatic_ws/src/lib/lib.rs | 2 +- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/aquatic_http/src/lib/lib.rs b/aquatic_http/src/lib/lib.rs index cb27f9f..4c1f478 100644 --- a/aquatic_http/src/lib/lib.rs +++ b/aquatic_http/src/lib/lib.rs @@ -1,17 +1,16 @@ +#[cfg(feature = "cpu-pinning")] +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; +use aquatic_common::{ + access_list::update_access_list, privileges::drop_privileges_after_socket_binding, +}; +use common::{State, TlsConfig}; +use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; +use signal_hook::{consts::SIGUSR1, iterator::Signals}; use std::{ fs::File, io::BufReader, sync::{atomic::AtomicUsize, Arc}, }; -use aquatic_common::{ - access_list::update_access_list, - privileges::drop_privileges_after_socket_binding, -}; -#[cfg(feature = "cpu-pinning")] -use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; -use common::{State, TlsConfig}; -use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; -use signal_hook::{consts::SIGUSR1, iterator::Signals}; use crate::config::Config; diff --git a/aquatic_ws/src/lib/lib.rs b/aquatic_ws/src/lib/lib.rs index 614ad95..3ae27b6 100644 --- a/aquatic_ws/src/lib/lib.rs +++ b/aquatic_ws/src/lib/lib.rs @@ -1,6 +1,6 @@ +use aquatic_common::access_list::update_access_list; #[cfg(feature = "cpu-pinning")] use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; -use aquatic_common::access_list::update_access_list; use cfg_if::cfg_if; use signal_hook::{consts::SIGUSR1, iterator::Signals}; From 5368dce9f0d1d1ec22aa048c8cbc647a2943f99a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 11 Nov 2021 17:59:31 +0100 Subject: [PATCH 3/6] http: name worker threads --- aquatic_http/src/lib/lib.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/aquatic_http/src/lib/lib.rs b/aquatic_http/src/lib/lib.rs index 4c1f478..48ade1c 100644 --- a/aquatic_http/src/lib/lib.rs +++ b/aquatic_http/src/lib/lib.rs @@ -76,7 +76,9 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let response_mesh_builder = response_mesh_builder.clone(); let num_bound_sockets = num_bound_sockets.clone(); - let executor = LocalExecutorBuilder::default().spawn(move || async move { + let builder = LocalExecutorBuilder::default().name("socket"); + + let executor = builder.spawn(move || async move { #[cfg(feature = "cpu-pinning")] pin_current_if_configured_to( &config.cpu_pinning, @@ -104,7 +106,9 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); - let executor = LocalExecutorBuilder::default().spawn(move || async move { + let builder = LocalExecutorBuilder::default().name("request"); + + let executor = builder.spawn(move || async move { #[cfg(feature = "cpu-pinning")] pin_current_if_configured_to( &config.cpu_pinning, From 321add8455e5d1634e0eb6503c99b32857450697 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 11 Nov 2021 19:59:21 +0100 Subject: [PATCH 4/6] udp glommio, ws: give worker threads names --- aquatic_udp/src/lib/glommio/mod.rs | 8 ++++++-- aquatic_ws/src/lib/glommio/mod.rs | 8 ++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 058f5e9..8cc8d27 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -67,7 +67,9 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let response_mesh_builder = response_mesh_builder.clone(); let num_bound_sockets = num_bound_sockets.clone(); - let executor = LocalExecutorBuilder::default().spawn(move || async move { + let builder = LocalExecutorBuilder::default().name("socket"); + + let executor = builder.spawn(move || async move { pin_current_if_configured_to( &config.cpu_pinning, config.socket_workers, @@ -93,7 +95,9 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); - let executor = LocalExecutorBuilder::default().spawn(move || async move { + let builder = LocalExecutorBuilder::default().name("request"); + + let executor = builder.spawn(move || async move { pin_current_if_configured_to( &config.cpu_pinning, config.socket_workers, diff --git a/aquatic_ws/src/lib/glommio/mod.rs b/aquatic_ws/src/lib/glommio/mod.rs index 3d74042..ed6c495 100644 --- a/aquatic_ws/src/lib/glommio/mod.rs +++ b/aquatic_ws/src/lib/glommio/mod.rs @@ -39,7 +39,9 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let response_mesh_builder = response_mesh_builder.clone(); let num_bound_sockets = num_bound_sockets.clone(); - let executor = LocalExecutorBuilder::default().spawn(move || async move { + let builder = LocalExecutorBuilder::default().name("socket"); + + let executor = builder.spawn(move || async move { #[cfg(feature = "cpu-pinning")] pin_current_if_configured_to( &config.cpu_pinning, @@ -67,7 +69,9 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); - let executor = LocalExecutorBuilder::default().spawn(move || async move { + let builder = LocalExecutorBuilder::default().name("request"); + + let executor = builder.spawn(move || async move { #[cfg(feature = "cpu-pinning")] pin_current_if_configured_to( &config.cpu_pinning, From af8b7769373e6fd2c1148c7e76e85d748e14fae5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 11 Nov 2021 20:54:40 +0100 Subject: [PATCH 5/6] udp, ws: auto-enable cpu-pinning feature when running with glommio --- aquatic_udp/Cargo.toml | 2 +- aquatic_ws/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 0ddf68b..7fc5164 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -17,7 +17,7 @@ name = "aquatic_udp" [features] default = ["with-mio"] cpu-pinning = ["aquatic_common/cpu-pinning"] -with-glommio = ["glommio", "futures-lite"] +with-glommio = ["cpu-pinning", "glommio", "futures-lite"] with-mio = ["crossbeam-channel", "histogram", "mio", "socket2"] [dependencies] diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index b12716a..2fc27cf 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -18,7 +18,7 @@ path = "src/bin/main.rs" [features] default = ["with-mio"] cpu-pinning = ["aquatic_common/cpu-pinning"] -with-glommio = ["async-tungstenite", "futures-lite", "futures", "futures-rustls", "glommio", "rustls-pemfile"] +with-glommio = ["cpu-pinning", "async-tungstenite", "futures-lite", "futures", "futures-rustls", "glommio", "rustls-pemfile"] with-mio = ["crossbeam-channel", "histogram", "mio", "native-tls", "parking_lot", "socket2"] [dependencies] From f93db6a9f2bf437e4f63d0a23427cdee3cd23fdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 12 Nov 2021 11:31:24 +0100 Subject: [PATCH 6/6] Update TODO --- TODO.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/TODO.md b/TODO.md index b910bca..6f04a76 100644 --- a/TODO.md +++ b/TODO.md @@ -39,8 +39,9 @@ * consider better error type for request parsing, so that better error messages can be sent back (e.g., "full scrapes are not supported") -* http and ws load tests - * add config key 'connection_open_interval_ms', default to 1000 +* aquatic_ws + * glommio + * fix memory leak / huge growth ## Less important