Improve aquatic_ws glommio implementation, fixing memory leak (#37)

* ws: upgrade tungstenite and async-tungstenite to 0.16

* ws: use latest glommio

* ws: glommio: add config key connection_cleaning_interval

* ws: glommio: limit socket worker local channel size

Quick fix for memory leak

* ws: glommio: use prioritized task queue for sending responses

This is a start on trying to fix the large amount of responses
being dropped due to local response channel being full

* scripts/watch-threads.sh: don't highlight changes

* ws: glommio: await when sending responses from shared to local

* ws: glommio: limit ws message queue; add lots of ::log::warn for debug

* ws: glommio: add timeout to send_out_message, maybe pinpointing issue

* ws: glommio: clean up, tweak channel sizes; update TODO

* ws: glommio: set send timeout to 10s, add backpressure, accept drops

* ws: glommio: yield if needed in ConnectionReader backoff loop

* ws load test: add config key connection_creation_interval_ms

* ws load test: don't print "run connection" when opening connection

* ws load test: rename config num_connection to num_connections_per_worker

* ws load test config: improve order of code in file

* ws: glommio: info-level log for socket send timeout

* ws glommio: clean idle connections, improve cleaning code

* ws: glommio: make ConnectionWriter::send_error_response fallible; fmt

* ws: glommio: store JoinHandle in conn reference, cancel task on clean

* add scripts/heaptrack.sh

* Update TODO
This commit is contained in:
Joakim Frostegård 2021-12-11 18:50:05 +01:00 committed by GitHub
parent e1bffae42c
commit 222fac0e09
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 334 additions and 165 deletions

View file

@ -14,13 +14,13 @@ cpu-pinning = ["aquatic_common/cpu-pinning"]
[dependencies]
anyhow = "1"
async-tungstenite = "0.15"
async-tungstenite = "0.16"
aquatic_cli_helpers = "0.1.0"
aquatic_common = "0.1.0"
aquatic_ws_protocol = "0.1.0"
futures = "0.3"
futures-rustls = "0.22"
glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5" }
glommio = { git = "https://github.com/DataDog/glommio.git", rev = "2efe2f2a08f54394a435b674e8e0125057cbff03" }
hashbrown = { version = "0.11", features = ["serde"] }
mimalloc = { version = "0.1", default-features = false }
rand = { version = "0.8", features = ["small_rng"] }
@ -28,7 +28,7 @@ rand_distr = "0.4"
rustls = { version = "0.20", features = ["dangerous_configuration"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tungstenite = "0.15"
tungstenite = "0.16"
[dev-dependencies]
quickcheck = "1"

View file

@ -11,7 +11,8 @@ pub struct Config {
pub server_address: SocketAddr,
pub log_level: LogLevel,
pub num_workers: usize,
pub num_connections: usize,
pub num_connections_per_worker: usize,
pub connection_creation_interval_ms: u64,
pub duration: usize,
pub torrents: TorrentConfig,
#[cfg(feature = "cpu-pinning")]
@ -24,6 +25,22 @@ impl aquatic_cli_helpers::Config for Config {
}
}
impl Default for Config {
fn default() -> Self {
Self {
server_address: "127.0.0.1:3000".parse().unwrap(),
log_level: LogLevel::Error,
num_workers: 1,
num_connections_per_worker: 16,
connection_creation_interval_ms: 10,
duration: 0,
torrents: TorrentConfig::default(),
#[cfg(feature = "cpu-pinning")]
cpu_pinning: CpuPinningConfig::default_for_load_test(),
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct TorrentConfig {
@ -43,21 +60,6 @@ pub struct TorrentConfig {
pub weight_scrape: usize,
}
impl Default for Config {
fn default() -> Self {
Self {
server_address: "127.0.0.1:3000".parse().unwrap(),
log_level: LogLevel::Error,
num_workers: 1,
num_connections: 16,
duration: 0,
torrents: TorrentConfig::default(),
#[cfg(feature = "cpu-pinning")]
cpu_pinning: CpuPinningConfig::default_for_load_test(),
}
}
}
impl Default for TorrentConfig {
fn default() -> Self {
Self {

View file

@ -44,7 +44,9 @@ async fn periodically_open_connections(
load_test_state: LoadTestState,
num_active_connections: Rc<RefCell<usize>>,
) -> Option<Duration> {
if *num_active_connections.borrow() < config.num_connections {
let wait = Duration::from_millis(config.connection_creation_interval_ms);
if *num_active_connections.borrow() < config.num_connections_per_worker {
spawn_local(async move {
if let Err(err) =
Connection::run(config, tls_config, load_test_state, num_active_connections).await
@ -55,7 +57,7 @@ async fn periodically_open_connections(
.detach();
}
Some(Duration::from_secs(1))
Some(wait)
}
struct Connection {
@ -102,8 +104,6 @@ impl Connection {
*num_active_connections.borrow_mut() += 1;
println!("run connection");
if let Err(err) = connection.run_connection_loop().await {
eprintln!("connection error: {:?}", err);
}