From aa332ab296aee221f9577ebcd2735cb3669066af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 7 Nov 2021 11:03:49 +0100 Subject: [PATCH 1/9] aquatic_common, udp, udp load test: improve cpu pinning --- Cargo.lock | 4 +- aquatic_common/Cargo.toml | 1 + aquatic_common/src/cpu_pinning.rs | 74 ++++++++++++++++++++++++++++- aquatic_udp/Cargo.toml | 1 - aquatic_udp/src/lib/glommio/mod.rs | 31 +++++++----- aquatic_udp/src/lib/mio/mod.rs | 51 ++++++++++---------- aquatic_udp_load_test/Cargo.toml | 2 +- aquatic_udp_load_test/src/common.rs | 20 ++------ aquatic_udp_load_test/src/main.rs | 36 +++++++------- 9 files changed, 142 insertions(+), 78 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4a7a0d9..73fc3bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -77,6 +77,7 @@ dependencies = [ "ahash 0.7.6", "anyhow", "arc-swap", + "core_affinity", "hashbrown 0.11.2", "hex", "indexmap-amortized", @@ -166,7 +167,6 @@ dependencies = [ "aquatic_common", "aquatic_udp_protocol", "cfg-if", - "core_affinity", "crossbeam-channel", "futures-lite", "glommio", @@ -206,8 +206,8 @@ version = "0.1.0" dependencies = [ "anyhow", "aquatic_cli_helpers", + "aquatic_common", "aquatic_udp_protocol", - "core_affinity", "crossbeam-channel", "hashbrown 0.11.2", "mimalloc", diff --git a/aquatic_common/Cargo.toml b/aquatic_common/Cargo.toml index 3d9d9b3..083047d 100644 --- a/aquatic_common/Cargo.toml +++ b/aquatic_common/Cargo.toml @@ -14,6 +14,7 @@ name = "aquatic_common" ahash = "0.7" anyhow = "1" arc-swap = "1" +core_affinity = "0.5" hashbrown = "0.11.2" hex = "0.4" indexmap-amortized = "1" diff --git a/aquatic_common/src/cpu_pinning.rs b/aquatic_common/src/cpu_pinning.rs index a13fc73..a7d508f 100644 --- a/aquatic_common/src/cpu_pinning.rs +++ b/aquatic_common/src/cpu_pinning.rs @@ -1,17 +1,89 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(default)] +#[serde(rename_all = "lowercase")] +pub enum CpuPinningMode { + Ascending, + Descending, +} + +impl Default for CpuPinningMode { + fn default() -> Self { + Self::Ascending + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct CpuPinningConfig { pub active: bool, + pub mode: CpuPinningMode, pub offset: usize, + pub multiple: usize, } impl Default for CpuPinningConfig { fn default() -> Self { Self { active: false, + mode: Default::default(), offset: 0, + multiple: 1, } } } + +impl CpuPinningConfig { + pub fn default_for_load_test() -> Self { + Self { + mode: CpuPinningMode::Descending, + ..Default::default() + } + } +} + +#[derive(Clone, Copy, Debug)] +pub enum WorkerIndex { + SocketWorker(usize), + RequestWorker(usize), + Other, +} + +impl WorkerIndex { + pub fn get_cpu_index(self, config: &CpuPinningConfig, socket_workers: usize) -> usize { + let index = match self { + Self::Other => config.offset, + Self::SocketWorker(index) => config.multiple * (config.offset + 1 + index), + Self::RequestWorker(index) => { + config.multiple * (config.offset + 1 + socket_workers + index) + } + }; + + let index = match config.mode { + CpuPinningMode::Ascending => index, + CpuPinningMode::Descending => { + let max = core_affinity::get_core_ids() + .map(|ids| ids.iter().map(|id| id.id).max()) + .flatten() + .unwrap_or(0); + + max - index + } + }; + + ::log::info!("Calculated CPU pin index {} for {:?}", index, self); + + index + } +} + +pub fn pin_current_if_configured_to( + config: &CpuPinningConfig, + socket_workers: usize, + worker_index: WorkerIndex, +) { + if config.active { + core_affinity::set_for_current(core_affinity::CoreId { + id: worker_index.get_cpu_index(config, socket_workers), + }); + } +} diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index b0ea491..0fe4479 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -25,7 +25,6 @@ aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" aquatic_udp_protocol = "0.1.0" cfg-if = "1" -core_affinity = "0.5" hex = "0.4" log = "0.4" mimalloc = { version = "0.1", default-features = false } diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 6836ed2..28e7842 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -1,6 +1,7 @@ use std::sync::{atomic::AtomicUsize, Arc}; use aquatic_common::access_list::update_access_list; +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::privileges::drop_privileges_after_socket_binding; use glommio::channels::channel_mesh::MeshBuilder; use glommio::prelude::*; @@ -18,11 +19,11 @@ pub mod network; pub const SHARED_CHANNEL_SIZE: usize = 4096; pub fn run(config: Config) -> ::anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); let state = State::default(); @@ -50,11 +51,11 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); let num_peers = config.socket_workers + config.request_workers; @@ -75,7 +76,10 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let mut builder = LocalExecutorBuilder::default(); if config.cpu_pinning.active { - builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + i); + builder = builder.pin_to_cpu( + WorkerIndex::SocketWorker(i) + .get_cpu_index(&config.cpu_pinning, config.socket_workers), + ); } let executor = builder.spawn(|| async move { @@ -101,7 +105,10 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let mut builder = LocalExecutorBuilder::default(); if config.cpu_pinning.active { - builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i); + builder = builder.pin_to_cpu( + WorkerIndex::RequestWorker(i) + .get_cpu_index(&config.cpu_pinning, config.socket_workers), + ); } let executor = builder.spawn(|| async move { diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index f21f9e7..1077878 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -3,6 +3,7 @@ use std::thread::Builder; use std::time::Duration; use anyhow::Context; +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::privileges::drop_privileges_after_socket_binding; use crossbeam_channel::unbounded; @@ -20,11 +21,11 @@ pub mod tasks; use common::State; pub fn run(config: Config) -> ::anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); let state = State::default(); @@ -52,11 +53,11 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); let num_bound_sockets = Arc::new(AtomicUsize::new(0)); @@ -72,11 +73,11 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { Builder::new() .name(format!("request-{:02}", i + 1)) .spawn(move || { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset + 1 + i, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::RequestWorker(i), + ); handlers::run_request_worker(state, config, request_receiver, response_sender) }) @@ -93,11 +94,11 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { Builder::new() .name(format!("socket-{:02}", i + 1)) .spawn(move || { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset + 1 + config.request_workers + i, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::SocketWorker(i), + ); network::run_socket_worker( state, @@ -118,11 +119,11 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { Builder::new() .name("statistics-collector".to_string()) .spawn(move || { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); loop { ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); diff --git a/aquatic_udp_load_test/Cargo.toml b/aquatic_udp_load_test/Cargo.toml index f437d60..fdef0ea 100644 --- a/aquatic_udp_load_test/Cargo.toml +++ b/aquatic_udp_load_test/Cargo.toml @@ -12,8 +12,8 @@ name = "aquatic_udp_load_test" [dependencies] anyhow = "1" aquatic_cli_helpers = "0.1.0" +aquatic_common = "0.1.0" aquatic_udp_protocol = "0.1.0" -core_affinity = "0.5" crossbeam-channel = "0.5" hashbrown = "0.11.2" mimalloc = { version = "0.1", default-features = false } diff --git a/aquatic_udp_load_test/src/common.rs b/aquatic_udp_load_test/src/common.rs index d3ec752..b22547c 100644 --- a/aquatic_udp_load_test/src/common.rs +++ b/aquatic_udp_load_test/src/common.rs @@ -1,6 +1,7 @@ use std::net::SocketAddr; use std::sync::{atomic::AtomicUsize, Arc}; +use aquatic_common::cpu_pinning::CpuPinningConfig; use hashbrown::HashMap; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; @@ -27,7 +28,7 @@ pub struct Config { pub duration: usize, pub network: NetworkConfig, pub handler: HandlerConfig, - pub core_affinity: CoreAffinityConfig, + pub cpu_pinning: CpuPinningConfig, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -97,13 +98,6 @@ pub struct HandlerConfig { pub additional_request_factor: f64, } -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(default)] -pub struct CoreAffinityConfig { - /// Set core affinities, descending from last core - pub set_affinities: bool, -} - impl Default for Config { fn default() -> Self { Self { @@ -113,7 +107,7 @@ impl Default for Config { duration: 0, network: NetworkConfig::default(), handler: HandlerConfig::default(), - core_affinity: CoreAffinityConfig::default(), + cpu_pinning: CpuPinningConfig::default_for_load_test(), } } } @@ -192,11 +186,3 @@ pub struct SocketWorkerLocalStatistics { pub responses_scrape: usize, pub responses_error: usize, } - -impl Default for CoreAffinityConfig { - fn default() -> Self { - Self { - set_affinities: false, - } - } -} diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index 92be3bb..ca246ce 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -3,6 +3,7 @@ use std::sync::{atomic::Ordering, Arc}; use std::thread; use std::time::{Duration, Instant}; +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use crossbeam_channel::unbounded; use hashbrown::HashMap; use parking_lot::Mutex; @@ -33,15 +34,6 @@ pub fn main() { impl aquatic_cli_helpers::Config for Config {} fn run(config: Config) -> ::anyhow::Result<()> { - let affinity_max = core_affinity::get_core_ids() - .map(|ids| ids.iter().map(|id| id.id).max()) - .flatten() - .unwrap_or(0); - - if config.core_affinity.set_affinities { - core_affinity::set_for_current(core_affinity::CoreId { id: affinity_max }); - } - if config.handler.weight_announce + config.handler.weight_connect + config.handler.weight_scrape == 0 { @@ -50,6 +42,12 @@ fn run(config: Config) -> ::anyhow::Result<()> { println!("Starting client with config: {:#?}", config); + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_socket_workers as usize, + WorkerIndex::Other, + ); + let mut info_hashes = Vec::with_capacity(config.handler.number_of_torrents); for _ in 0..config.handler.number_of_torrents { @@ -101,11 +99,11 @@ fn run(config: Config) -> ::anyhow::Result<()> { let state = state.clone(); thread::spawn(move || { - if config.core_affinity.set_affinities { - core_affinity::set_for_current(core_affinity::CoreId { - id: affinity_max - 1 - i as usize, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_socket_workers as usize, + WorkerIndex::SocketWorker(i as usize), + ); run_socket_thread(state, response_sender, receiver, &config, addr, thread_id) }); @@ -118,11 +116,11 @@ fn run(config: Config) -> ::anyhow::Result<()> { let response_receiver = response_receiver.clone(); thread::spawn(move || { - if config.core_affinity.set_affinities { - core_affinity::set_for_current(core_affinity::CoreId { - id: affinity_max - config.num_socket_workers as usize - 1 - i as usize, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_socket_workers as usize, + WorkerIndex::RequestWorker(i as usize), + ); run_handler_thread(&config, state, pareto, request_senders, response_receiver) }); } From 57896f4648c8559c4bb1e21541f96e0b34a7aa1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 7 Nov 2021 11:25:50 +0100 Subject: [PATCH 2/9] http, http load test: improve CPU pinning --- Cargo.lock | 2 +- aquatic_http/Cargo.toml | 1 - aquatic_http/src/lib/lib.rs | 34 +++++++++++++++++----------- aquatic_http_load_test/Cargo.toml | 1 + aquatic_http_load_test/src/config.rs | 14 ++++++++++-- aquatic_http_load_test/src/main.rs | 20 ++++++++++++++-- 6 files changed, 53 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 73fc3bd..b66fecb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -96,7 +96,6 @@ dependencies = [ "aquatic_common", "aquatic_http_protocol", "cfg-if", - "core_affinity", "either", "futures-lite", "futures-rustls", @@ -123,6 +122,7 @@ version = "0.1.0" dependencies = [ "anyhow", "aquatic_cli_helpers", + "aquatic_common", "aquatic_http_protocol", "futures-lite", "glommio", diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index f6ea2ee..fffdb5c 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -21,7 +21,6 @@ aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" aquatic_http_protocol = "0.1.0" cfg-if = "1" -core_affinity = "0.5" either = "1" futures-lite = "1" futures-rustls = "0.22" diff --git a/aquatic_http/src/lib/lib.rs b/aquatic_http/src/lib/lib.rs index ef584e5..9172caa 100644 --- a/aquatic_http/src/lib/lib.rs +++ b/aquatic_http/src/lib/lib.rs @@ -5,7 +5,9 @@ use std::{ }; use aquatic_common::{ - access_list::update_access_list, privileges::drop_privileges_after_socket_binding, + access_list::update_access_list, + cpu_pinning::{pin_current_if_configured_to, WorkerIndex}, + privileges::drop_privileges_after_socket_binding, }; use common::{State, TlsConfig}; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; @@ -23,11 +25,11 @@ pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker"; const SHARED_CHANNEL_SIZE: usize = 1024; pub fn run(config: Config) -> ::anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); let state = State::default(); @@ -55,11 +57,11 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); let num_peers = config.socket_workers + config.request_workers; @@ -83,7 +85,10 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let mut builder = LocalExecutorBuilder::default(); if config.cpu_pinning.active { - builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + i); + builder = builder.pin_to_cpu( + WorkerIndex::SocketWorker(i) + .get_cpu_index(&config.cpu_pinning, config.socket_workers), + ); } let executor = builder.spawn(|| async move { @@ -110,7 +115,10 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let mut builder = LocalExecutorBuilder::default(); if config.cpu_pinning.active { - builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i); + builder = builder.pin_to_cpu( + WorkerIndex::RequestWorker(i) + .get_cpu_index(&config.cpu_pinning, config.socket_workers), + ); } let executor = builder.spawn(|| async move { diff --git a/aquatic_http_load_test/Cargo.toml b/aquatic_http_load_test/Cargo.toml index 25baab6..83161b1 100644 --- a/aquatic_http_load_test/Cargo.toml +++ b/aquatic_http_load_test/Cargo.toml @@ -12,6 +12,7 @@ name = "aquatic_http_load_test" [dependencies] anyhow = "1" aquatic_cli_helpers = "0.1.0" +aquatic_common = "0.1.0" aquatic_http_protocol = "0.1.0" futures-lite = "1" hashbrown = "0.11.2" diff --git a/aquatic_http_load_test/src/config.rs b/aquatic_http_load_test/src/config.rs index 1c8456a..a3db7d4 100644 --- a/aquatic_http_load_test/src/config.rs +++ b/aquatic_http_load_test/src/config.rs @@ -1,18 +1,26 @@ use std::net::SocketAddr; +use aquatic_cli_helpers::LogLevel; +use aquatic_common::cpu_pinning::CpuPinningConfig; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct Config { pub server_address: SocketAddr, - pub num_workers: u8, + pub log_level: LogLevel, + pub num_workers: usize, pub num_connections: usize, pub duration: usize, pub torrents: TorrentConfig, + pub cpu_pinning: CpuPinningConfig, } -impl aquatic_cli_helpers::Config for Config {} +impl aquatic_cli_helpers::Config for Config { + fn get_log_level(&self) -> Option { + Some(self.log_level) + } +} #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] @@ -36,10 +44,12 @@ impl Default for Config { fn default() -> Self { Self { server_address: "127.0.0.1:3000".parse().unwrap(), + log_level: LogLevel::Error, num_workers: 1, num_connections: 8, duration: 0, torrents: TorrentConfig::default(), + cpu_pinning: CpuPinningConfig::default_for_load_test(), } } } diff --git a/aquatic_http_load_test/src/main.rs b/aquatic_http_load_test/src/main.rs index e719f77..2a1a4c5 100644 --- a/aquatic_http_load_test/src/main.rs +++ b/aquatic_http_load_test/src/main.rs @@ -3,6 +3,7 @@ use std::thread; use std::time::{Duration, Instant}; use ::glommio::LocalExecutorBuilder; +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use rand::prelude::*; use rand_distr::Pareto; @@ -36,6 +37,12 @@ fn run(config: Config) -> ::anyhow::Result<()> { println!("Starting client with config: {:#?}", config); + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_workers as usize, + WorkerIndex::Other, + ); + let mut info_hashes = Vec::with_capacity(config.torrents.number_of_torrents); let mut rng = SmallRng::from_entropy(); @@ -56,12 +63,21 @@ fn run(config: Config) -> ::anyhow::Result<()> { let tls_config = create_tls_config().unwrap(); - for _ in 0..config.num_workers { + for i in 0..config.num_workers { let config = config.clone(); let tls_config = tls_config.clone(); let state = state.clone(); - LocalExecutorBuilder::default() + let mut builder = LocalExecutorBuilder::default(); + + if config.cpu_pinning.active { + builder = builder.pin_to_cpu( + WorkerIndex::SocketWorker(i) + .get_cpu_index(&config.cpu_pinning, config.num_workers), + ); + } + + builder .spawn(|| async move { run_socket_thread(config, tls_config, state).await.unwrap(); }) From 6eaac536ba01d383feeefa1d186fc684071a5658 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 7 Nov 2021 11:29:15 +0100 Subject: [PATCH 3/9] udp load test: enable log crate logging --- aquatic_udp_load_test/src/common.rs | 3 +++ aquatic_udp_load_test/src/main.rs | 6 +++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/aquatic_udp_load_test/src/common.rs b/aquatic_udp_load_test/src/common.rs index b22547c..83a6a1b 100644 --- a/aquatic_udp_load_test/src/common.rs +++ b/aquatic_udp_load_test/src/common.rs @@ -1,6 +1,7 @@ use std::net::SocketAddr; use std::sync::{atomic::AtomicUsize, Arc}; +use aquatic_cli_helpers::LogLevel; use aquatic_common::cpu_pinning::CpuPinningConfig; use hashbrown::HashMap; use parking_lot::Mutex; @@ -16,6 +17,7 @@ pub struct ThreadId(pub u8); pub struct Config { /// Server address pub server_address: SocketAddr, + pub log_level: LogLevel, /// Number of sockets and socket worker threads /// /// Sockets will bind to one port each, and with @@ -102,6 +104,7 @@ impl Default for Config { fn default() -> Self { Self { server_address: "127.0.0.1:3000".parse().unwrap(), + log_level: LogLevel::Error, num_socket_workers: 1, num_request_workers: 1, duration: 0, diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index ca246ce..261cc5a 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -31,7 +31,11 @@ pub fn main() { ) } -impl aquatic_cli_helpers::Config for Config {} +impl aquatic_cli_helpers::Config for Config { + fn get_log_level(&self) -> Option { + Some(self.log_level) + } +} fn run(config: Config) -> ::anyhow::Result<()> { if config.handler.weight_announce + config.handler.weight_connect + config.handler.weight_scrape From b54694bbc04d4a2a107a1715775712085bb97149 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 7 Nov 2021 11:46:48 +0100 Subject: [PATCH 4/9] aquatic_ws: improve cpu pinning --- Cargo.lock | 1 - aquatic_ws/Cargo.toml | 1 - aquatic_ws/src/lib/glommio/mod.rs | 25 +++++++++++++++-------- aquatic_ws/src/lib/lib.rs | 15 ++++++++------ aquatic_ws/src/lib/mio/mod.rs | 33 ++++++++++++++++++++++++++++--- 5 files changed, 56 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b66fecb..48a85da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -241,7 +241,6 @@ dependencies = [ "aquatic_ws_protocol", "async-tungstenite", "cfg-if", - "core_affinity", "crossbeam-channel", "either", "futures", diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index 98844d5..b76299c 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -26,7 +26,6 @@ aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" aquatic_ws_protocol = "0.1.0" cfg-if = "1" -core_affinity = "0.5" either = "1" hashbrown = { version = "0.11.2", features = ["serde"] } log = "0.4" diff --git a/aquatic_ws/src/lib/glommio/mod.rs b/aquatic_ws/src/lib/glommio/mod.rs index 0c84499..dfa59a9 100644 --- a/aquatic_ws/src/lib/glommio/mod.rs +++ b/aquatic_ws/src/lib/glommio/mod.rs @@ -9,7 +9,10 @@ use std::{ }; use crate::config::Config; -use aquatic_common::privileges::drop_privileges_after_socket_binding; +use aquatic_common::{ + cpu_pinning::{pin_current_if_configured_to, WorkerIndex}, + privileges::drop_privileges_after_socket_binding, +}; use self::common::*; @@ -18,11 +21,11 @@ use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; const SHARED_CHANNEL_SIZE: usize = 1024; pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); let num_peers = config.socket_workers + config.request_workers; @@ -46,7 +49,10 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let mut builder = LocalExecutorBuilder::default(); if config.cpu_pinning.active { - builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + i); + builder = builder.pin_to_cpu( + WorkerIndex::SocketWorker(i) + .get_cpu_index(&config.cpu_pinning, config.socket_workers), + ); } let executor = builder.spawn(|| async move { @@ -73,7 +79,10 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let mut builder = LocalExecutorBuilder::default(); if config.cpu_pinning.active { - builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i); + builder = builder.pin_to_cpu( + WorkerIndex::RequestWorker(i) + .get_cpu_index(&config.cpu_pinning, config.socket_workers), + ); } let executor = builder.spawn(|| async move { diff --git a/aquatic_ws/src/lib/lib.rs b/aquatic_ws/src/lib/lib.rs index 57e10a7..a01f3de 100644 --- a/aquatic_ws/src/lib/lib.rs +++ b/aquatic_ws/src/lib/lib.rs @@ -1,4 +1,7 @@ -use aquatic_common::access_list::update_access_list; +use aquatic_common::{ + access_list::update_access_list, + cpu_pinning::{pin_current_if_configured_to, WorkerIndex}, +}; use cfg_if::cfg_if; use signal_hook::{consts::SIGUSR1, iterator::Signals}; @@ -14,11 +17,11 @@ pub mod mio; pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker"; pub fn run(config: Config) -> ::anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); cfg_if!( if #[cfg(feature = "with-glommio")] { diff --git a/aquatic_ws/src/lib/mio/mod.rs b/aquatic_ws/src/lib/mio/mod.rs index f793823..df791cb 100644 --- a/aquatic_ws/src/lib/mio/mod.rs +++ b/aquatic_ws/src/lib/mio/mod.rs @@ -5,6 +5,7 @@ use std::thread::Builder; use std::time::Duration; use anyhow::Context; +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use histogram::Histogram; use mio::{Poll, Waker}; use native_tls::{Identity, TlsAcceptor}; @@ -21,6 +22,12 @@ use common::*; pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker"; pub fn run(config: Config, state: State) -> anyhow::Result<()> { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + start_workers(config.clone(), state.clone()).expect("couldn't start workers"); // TODO: privdrop here instead @@ -69,6 +76,12 @@ pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> { Builder::new() .name(format!("socket-{:02}", i + 1)) .spawn(move || { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::SocketWorker(i), + ); + network::run_socket_worker( config, state, @@ -121,6 +134,12 @@ pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> { Builder::new() .name(format!("request-{:02}", i + 1)) .spawn(move || { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::RequestWorker(i), + ); + handlers::run_request_worker( config, state, @@ -137,10 +156,18 @@ pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> { Builder::new() .name("statistics".to_string()) - .spawn(move || loop { - ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); + .spawn(move || { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); - print_statistics(&state); + loop { + ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); + + print_statistics(&state); + } }) .expect("spawn statistics thread"); } From 1c8da337a1d8274592a0d7f59e3cb3998e745b06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 7 Nov 2021 11:51:23 +0100 Subject: [PATCH 5/9] scripts/run-aquatic-ws: support running either mio or glommio impl --- scripts/run-aquatic-ws.sh | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/scripts/run-aquatic-ws.sh b/scripts/run-aquatic-ws.sh index 40be253..3a7e8a5 100755 --- a/scripts/run-aquatic-ws.sh +++ b/scripts/run-aquatic-ws.sh @@ -1,5 +1,14 @@ -#!/bin/sh +#!/bin/bash . ./scripts/env-native-cpu-without-avx-512 -cargo run --release --bin aquatic_ws -- $@ +if [[ -z $1 ]]; then + echo "Usage: $0 [mio|glommio]" +else + if [ "$1" = "mio" ]; then + cargo run --release --bin aquatic_ws -- "${@:2}" + else + cargo run --release --features "with-glommio" --no-default-features --bin aquatic_ws -- "${@:2}" + fi +fi + From 3114f8692beb1f902eb50e40939e8525f7e0e96e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 7 Nov 2021 11:52:05 +0100 Subject: [PATCH 6/9] http load test: cleanup --- aquatic_http_load_test/src/main.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/aquatic_http_load_test/src/main.rs b/aquatic_http_load_test/src/main.rs index 2a1a4c5..8792603 100644 --- a/aquatic_http_load_test/src/main.rs +++ b/aquatic_http_load_test/src/main.rs @@ -72,8 +72,7 @@ fn run(config: Config) -> ::anyhow::Result<()> { if config.cpu_pinning.active { builder = builder.pin_to_cpu( - WorkerIndex::SocketWorker(i) - .get_cpu_index(&config.cpu_pinning, config.num_workers), + WorkerIndex::SocketWorker(i).get_cpu_index(&config.cpu_pinning, config.num_workers), ); } From b653e3e3ff1b83ae6351b949dd7334b3cb65bce1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 7 Nov 2021 12:00:36 +0100 Subject: [PATCH 7/9] ws load test: add cpu pinning and log crate logging --- Cargo.lock | 1 + aquatic_ws_load_test/Cargo.toml | 1 + aquatic_ws_load_test/src/config.rs | 14 ++++++++++++-- aquatic_ws_load_test/src/main.rs | 19 +++++++++++++++++-- 4 files changed, 31 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 48a85da..f968137 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -272,6 +272,7 @@ version = "0.1.0" dependencies = [ "anyhow", "aquatic_cli_helpers", + "aquatic_common", "aquatic_ws_protocol", "async-tungstenite", "futures", diff --git a/aquatic_ws_load_test/Cargo.toml b/aquatic_ws_load_test/Cargo.toml index 581f1de..67a02f2 100644 --- a/aquatic_ws_load_test/Cargo.toml +++ b/aquatic_ws_load_test/Cargo.toml @@ -13,6 +13,7 @@ name = "aquatic_ws_load_test" anyhow = "1" async-tungstenite = "0.15" aquatic_cli_helpers = "0.1.0" +aquatic_common = "0.1.0" aquatic_ws_protocol = "0.1.0" futures = "0.3" futures-rustls = "0.22" diff --git a/aquatic_ws_load_test/src/config.rs b/aquatic_ws_load_test/src/config.rs index 10d4bdc..3bef7bc 100644 --- a/aquatic_ws_load_test/src/config.rs +++ b/aquatic_ws_load_test/src/config.rs @@ -1,18 +1,26 @@ use std::net::SocketAddr; +use aquatic_cli_helpers::LogLevel; +use aquatic_common::cpu_pinning::CpuPinningConfig; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct Config { pub server_address: SocketAddr, - pub num_workers: u8, + pub log_level: LogLevel, + pub num_workers: usize, pub num_connections: usize, pub duration: usize, pub torrents: TorrentConfig, + pub cpu_pinning: CpuPinningConfig, } -impl aquatic_cli_helpers::Config for Config {} +impl aquatic_cli_helpers::Config for Config { + fn get_log_level(&self) -> Option { + Some(self.log_level) + } +} #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] @@ -37,10 +45,12 @@ impl Default for Config { fn default() -> Self { Self { server_address: "127.0.0.1:3000".parse().unwrap(), + log_level: LogLevel::Error, num_workers: 1, num_connections: 16, duration: 0, torrents: TorrentConfig::default(), + cpu_pinning: CpuPinningConfig::default_for_load_test(), } } } diff --git a/aquatic_ws_load_test/src/main.rs b/aquatic_ws_load_test/src/main.rs index 54f9208..1a8c342 100644 --- a/aquatic_ws_load_test/src/main.rs +++ b/aquatic_ws_load_test/src/main.rs @@ -2,6 +2,7 @@ use std::sync::{atomic::Ordering, Arc}; use std::thread; use std::time::{Duration, Instant}; +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use glommio::LocalExecutorBuilder; use rand::prelude::*; use rand_distr::Pareto; @@ -33,6 +34,12 @@ fn run(config: Config) -> ::anyhow::Result<()> { println!("Starting client with config: {:#?}", config); + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_workers as usize, + WorkerIndex::Other, + ); + let mut info_hashes = Vec::with_capacity(config.torrents.number_of_torrents); let mut rng = SmallRng::from_entropy(); @@ -51,12 +58,20 @@ fn run(config: Config) -> ::anyhow::Result<()> { let tls_config = create_tls_config().unwrap(); - for _ in 0..config.num_workers { + for i in 0..config.num_workers { let config = config.clone(); let tls_config = tls_config.clone(); let state = state.clone(); - LocalExecutorBuilder::default() + let mut builder = LocalExecutorBuilder::default(); + + if config.cpu_pinning.active { + builder = builder.pin_to_cpu( + WorkerIndex::SocketWorker(i).get_cpu_index(&config.cpu_pinning, config.num_workers), + ); + } + + builder .spawn(|| async move { run_socket_thread(config, tls_config, state).await.unwrap(); }) From 03192d2afbfe371750c7d21b38c7fa84f9976b3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 7 Nov 2021 13:30:42 +0100 Subject: [PATCH 8/9] scripts: improve run-aquatic-udp.sh and run-aquatic-ws.sh --- scripts/run-aquatic-udp.sh | 12 ++++++++++-- scripts/run-aquatic-ws.sh | 4 ++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/scripts/run-aquatic-udp.sh b/scripts/run-aquatic-udp.sh index 6ccfb86..db41e58 100755 --- a/scripts/run-aquatic-udp.sh +++ b/scripts/run-aquatic-udp.sh @@ -1,5 +1,13 @@ -#!/bin/sh +#!/bin/bash . ./scripts/env-native-cpu-without-avx-512 -cargo run --release --bin aquatic_udp -- $@ +if [ "$1" != "mio" ] && [ "$1" != "glommio" ]; then + echo "Usage: $0 [mio|glommio] [ARGS]" +else + if [ "$1" = "mio" ]; then + cargo run --release --bin aquatic_udp -- "${@:2}" + else + cargo run --release --features "with-glommio" --no-default-features --bin aquatic_udp -- "${@:2}" + fi +fi diff --git a/scripts/run-aquatic-ws.sh b/scripts/run-aquatic-ws.sh index 3a7e8a5..5aadff6 100755 --- a/scripts/run-aquatic-ws.sh +++ b/scripts/run-aquatic-ws.sh @@ -2,8 +2,8 @@ . ./scripts/env-native-cpu-without-avx-512 -if [[ -z $1 ]]; then - echo "Usage: $0 [mio|glommio]" +if [ "$1" != "mio" ] && [ "$1" != "glommio" ]; then + echo "Usage: $0 [mio|glommio] [ARGS]" else if [ "$1" = "mio" ]; then cargo run --release --bin aquatic_ws -- "${@:2}" From e86410291a440883596d0085ed8a52323a35aba3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 7 Nov 2021 13:31:03 +0100 Subject: [PATCH 9/9] cpu pinning: set affinity to multiple hyperthreads, fix issues --- Cargo.lock | 47 +++++++++++++++++------- aquatic_common/Cargo.toml | 2 +- aquatic_common/src/cpu_pinning.rs | 59 +++++++++++++++++++----------- aquatic_http/src/lib/lib.rs | 50 +++++++++++-------------- aquatic_http_load_test/src/main.rs | 28 +++++++------- aquatic_udp/src/lib/glommio/mod.rs | 50 +++++++++++-------------- aquatic_udp/src/lib/mio/mod.rs | 24 ++++++------ aquatic_ws/src/lib/glommio/mod.rs | 38 ++++++++----------- aquatic_ws/src/lib/lib.rs | 12 +++--- aquatic_ws/src/lib/mio/mod.rs | 8 ++-- aquatic_ws_load_test/src/main.rs | 28 +++++++------- 11 files changed, 181 insertions(+), 165 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f968137..c1aa85a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,18 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "affinity" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "763e484feceb7dd021b21c5c6f81aee06b1594a743455ec7efbf72e6355e447b" +dependencies = [ + "cfg-if", + "errno", + "libc", + "num_cpus", +] + [[package]] name = "ahash" version = "0.3.8" @@ -74,10 +86,10 @@ dependencies = [ name = "aquatic_common" version = "0.1.0" dependencies = [ + "affinity", "ahash 0.7.6", "anyhow", "arc-swap", - "core_affinity", "hashbrown 0.11.2", "hex", "indexmap-amortized", @@ -534,18 +546,6 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" -[[package]] -name = "core_affinity" -version = "0.5.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f8a03115cc34fb0d7c321dd154a3914b3ca082ccc5c11d91bf7117dbbe7171f" -dependencies = [ - "kernel32-sys", - "libc", - "num_cpus", - "winapi 0.2.8", -] - [[package]] name = "cpufeatures" version = "0.2.1" @@ -718,6 +718,27 @@ dependencies = [ "regex", ] +[[package]] +name = "errno" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f639046355ee4f37944e44f60642c6f3a7efa3cf6b78c78a0d989a8ce6c396a1" +dependencies = [ + "errno-dragonfly", + "libc", + "winapi 0.3.9", +] + +[[package]] +name = "errno-dragonfly" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "failure" version = "0.1.8" diff --git a/aquatic_common/Cargo.toml b/aquatic_common/Cargo.toml index 083047d..b10a3f2 100644 --- a/aquatic_common/Cargo.toml +++ b/aquatic_common/Cargo.toml @@ -14,7 +14,7 @@ name = "aquatic_common" ahash = "0.7" anyhow = "1" arc-swap = "1" -core_affinity = "0.5" +affinity = "0.1" hashbrown = "0.11.2" hex = "0.4" indexmap-amortized = "1" diff --git a/aquatic_common/src/cpu_pinning.rs b/aquatic_common/src/cpu_pinning.rs index a7d508f..ddfa833 100644 --- a/aquatic_common/src/cpu_pinning.rs +++ b/aquatic_common/src/cpu_pinning.rs @@ -17,8 +17,8 @@ impl Default for CpuPinningMode { pub struct CpuPinningConfig { pub active: bool, pub mode: CpuPinningMode, - pub offset: usize, - pub multiple: usize, + pub virtual_per_physical_cpu: usize, + pub offset_cpus: usize, } impl Default for CpuPinningConfig { @@ -26,8 +26,8 @@ impl Default for CpuPinningConfig { Self { active: false, mode: Default::default(), - offset: 0, - multiple: 1, + virtual_per_physical_cpu: 2, + offset_cpus: 0, } } } @@ -49,41 +49,58 @@ pub enum WorkerIndex { } impl WorkerIndex { - pub fn get_cpu_index(self, config: &CpuPinningConfig, socket_workers: usize) -> usize { - let index = match self { - Self::Other => config.offset, - Self::SocketWorker(index) => config.multiple * (config.offset + 1 + index), + fn get_cpu_indices(self, config: &CpuPinningConfig, socket_workers: usize) -> Vec { + let offset = match self { + Self::Other => config.virtual_per_physical_cpu * config.offset_cpus, + Self::SocketWorker(index) => { + config.virtual_per_physical_cpu * (config.offset_cpus + 1 + index) + } Self::RequestWorker(index) => { - config.multiple * (config.offset + 1 + socket_workers + index) + config.virtual_per_physical_cpu * (config.offset_cpus + 1 + socket_workers + index) } }; - let index = match config.mode { - CpuPinningMode::Ascending => index, + let virtual_cpus = (0..config.virtual_per_physical_cpu).map(|i| offset + i); + + let virtual_cpus: Vec = match config.mode { + CpuPinningMode::Ascending => virtual_cpus.collect(), CpuPinningMode::Descending => { - let max = core_affinity::get_core_ids() - .map(|ids| ids.iter().map(|id| id.id).max()) - .flatten() - .unwrap_or(0); + let max_index = affinity::get_core_num() - 1; - max - index + virtual_cpus + .map(|i| max_index.checked_sub(i).unwrap_or(0)) + .collect() } }; - ::log::info!("Calculated CPU pin index {} for {:?}", index, self); + ::log::info!( + "Calculated virtual CPU pin indices {:?} for {:?}", + virtual_cpus, + self + ); - index + virtual_cpus } } +/// Note: don't call this when affinities were already set in the current or in +/// a parent thread. Doing so limits the number of cores that are seen and +/// messes up setting affinities. pub fn pin_current_if_configured_to( config: &CpuPinningConfig, socket_workers: usize, worker_index: WorkerIndex, ) { if config.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: worker_index.get_cpu_index(config, socket_workers), - }); + let indices = worker_index.get_cpu_indices(config, socket_workers); + + if let Err(err) = affinity::set_thread_affinity(indices.clone()) { + ::log::error!( + "Failed setting thread affinities {:?} for {:?}: {:#?}", + indices, + worker_index, + err + ); + } } } diff --git a/aquatic_http/src/lib/lib.rs b/aquatic_http/src/lib/lib.rs index 9172caa..873ed83 100644 --- a/aquatic_http/src/lib/lib.rs +++ b/aquatic_http/src/lib/lib.rs @@ -25,12 +25,6 @@ pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker"; const SHARED_CHANNEL_SIZE: usize = 1024; pub fn run(config: Config) -> ::anyhow::Result<()> { - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - let state = State::default(); update_access_list(&config.access_list, &state.access_list)?; @@ -44,6 +38,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ::std::thread::spawn(move || run_inner(config, state)); } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + for signal in &mut signals { match signal { SIGUSR1 => { @@ -57,12 +57,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - let num_peers = config.socket_workers + config.request_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); @@ -82,16 +76,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let response_mesh_builder = response_mesh_builder.clone(); let num_bound_sockets = num_bound_sockets.clone(); - let mut builder = LocalExecutorBuilder::default(); - - if config.cpu_pinning.active { - builder = builder.pin_to_cpu( - WorkerIndex::SocketWorker(i) - .get_cpu_index(&config.cpu_pinning, config.socket_workers), + let executor = LocalExecutorBuilder::default().spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::SocketWorker(i), ); - } - let executor = builder.spawn(|| async move { network::run_socket_worker( config, state, @@ -112,16 +103,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); - let mut builder = LocalExecutorBuilder::default(); - - if config.cpu_pinning.active { - builder = builder.pin_to_cpu( - WorkerIndex::RequestWorker(i) - .get_cpu_index(&config.cpu_pinning, config.socket_workers), + let executor = LocalExecutorBuilder::default().spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::RequestWorker(i), ); - } - let executor = builder.spawn(|| async move { handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) .await }); @@ -136,6 +124,12 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { ) .unwrap(); + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + for executor in executors { executor .expect("failed to spawn local executor") diff --git a/aquatic_http_load_test/src/main.rs b/aquatic_http_load_test/src/main.rs index 8792603..0dd5b25 100644 --- a/aquatic_http_load_test/src/main.rs +++ b/aquatic_http_load_test/src/main.rs @@ -37,12 +37,6 @@ fn run(config: Config) -> ::anyhow::Result<()> { println!("Starting client with config: {:#?}", config); - pin_current_if_configured_to( - &config.cpu_pinning, - config.num_workers as usize, - WorkerIndex::Other, - ); - let mut info_hashes = Vec::with_capacity(config.torrents.number_of_torrents); let mut rng = SmallRng::from_entropy(); @@ -68,21 +62,25 @@ fn run(config: Config) -> ::anyhow::Result<()> { let tls_config = tls_config.clone(); let state = state.clone(); - let mut builder = LocalExecutorBuilder::default(); + LocalExecutorBuilder::default() + .spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_workers, + WorkerIndex::SocketWorker(i), + ); - if config.cpu_pinning.active { - builder = builder.pin_to_cpu( - WorkerIndex::SocketWorker(i).get_cpu_index(&config.cpu_pinning, config.num_workers), - ); - } - - builder - .spawn(|| async move { run_socket_thread(config, tls_config, state).await.unwrap(); }) .unwrap(); } + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_workers as usize, + WorkerIndex::Other, + ); + monitor_statistics(state, &config); Ok(()) diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 28e7842..058f5e9 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -19,12 +19,6 @@ pub mod network; pub const SHARED_CHANNEL_SIZE: usize = 4096; pub fn run(config: Config) -> ::anyhow::Result<()> { - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - let state = State::default(); update_access_list(&config.access_list, &state.access_list)?; @@ -38,6 +32,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ::std::thread::spawn(move || run_inner(config, state)); } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + for signal in &mut signals { match signal { SIGUSR1 => { @@ -51,12 +51,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - let num_peers = config.socket_workers + config.request_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); @@ -73,16 +67,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let response_mesh_builder = response_mesh_builder.clone(); let num_bound_sockets = num_bound_sockets.clone(); - let mut builder = LocalExecutorBuilder::default(); - - if config.cpu_pinning.active { - builder = builder.pin_to_cpu( - WorkerIndex::SocketWorker(i) - .get_cpu_index(&config.cpu_pinning, config.socket_workers), + let executor = LocalExecutorBuilder::default().spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::SocketWorker(i), ); - } - let executor = builder.spawn(|| async move { network::run_socket_worker( config, state, @@ -102,16 +93,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); - let mut builder = LocalExecutorBuilder::default(); - - if config.cpu_pinning.active { - builder = builder.pin_to_cpu( - WorkerIndex::RequestWorker(i) - .get_cpu_index(&config.cpu_pinning, config.socket_workers), + let executor = LocalExecutorBuilder::default().spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::RequestWorker(i), ); - } - let executor = builder.spawn(|| async move { handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) .await }); @@ -126,6 +114,12 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { ) .unwrap(); + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + for executor in executors { executor .expect("failed to spawn local executor") diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index 1077878..16ac519 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -21,12 +21,6 @@ pub mod tasks; use common::State; pub fn run(config: Config) -> ::anyhow::Result<()> { - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - let state = State::default(); update_access_list(&config.access_list, &state.access_list)?; @@ -40,6 +34,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ::std::thread::spawn(move || run_inner(config, state)); } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + for signal in &mut signals { match signal { SIGUSR1 => { @@ -53,12 +53,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - let num_bound_sockets = Arc::new(AtomicUsize::new(0)); let (request_sender, request_receiver) = unbounded(); @@ -141,6 +135,12 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { ) .unwrap(); + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + loop { ::std::thread::sleep(Duration::from_secs( config.cleaning.torrent_cleaning_interval, diff --git a/aquatic_ws/src/lib/glommio/mod.rs b/aquatic_ws/src/lib/glommio/mod.rs index dfa59a9..17c4e2b 100644 --- a/aquatic_ws/src/lib/glommio/mod.rs +++ b/aquatic_ws/src/lib/glommio/mod.rs @@ -21,12 +21,6 @@ use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; const SHARED_CHANNEL_SIZE: usize = 1024; pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - let num_peers = config.socket_workers + config.request_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); @@ -46,16 +40,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let response_mesh_builder = response_mesh_builder.clone(); let num_bound_sockets = num_bound_sockets.clone(); - let mut builder = LocalExecutorBuilder::default(); - - if config.cpu_pinning.active { - builder = builder.pin_to_cpu( - WorkerIndex::SocketWorker(i) - .get_cpu_index(&config.cpu_pinning, config.socket_workers), + let executor = LocalExecutorBuilder::default().spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::SocketWorker(i), ); - } - let executor = builder.spawn(|| async move { network::run_socket_worker( config, state, @@ -76,16 +67,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); - let mut builder = LocalExecutorBuilder::default(); - - if config.cpu_pinning.active { - builder = builder.pin_to_cpu( - WorkerIndex::RequestWorker(i) - .get_cpu_index(&config.cpu_pinning, config.socket_workers), + let executor = LocalExecutorBuilder::default().spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::RequestWorker(i), ); - } - let executor = builder.spawn(|| async move { handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) .await }); @@ -100,6 +88,12 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { ) .unwrap(); + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + for executor in executors { executor .expect("failed to spawn local executor") diff --git a/aquatic_ws/src/lib/lib.rs b/aquatic_ws/src/lib/lib.rs index a01f3de..b9542b4 100644 --- a/aquatic_ws/src/lib/lib.rs +++ b/aquatic_ws/src/lib/lib.rs @@ -17,12 +17,6 @@ pub mod mio; pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker"; pub fn run(config: Config) -> ::anyhow::Result<()> { - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - cfg_if!( if #[cfg(feature = "with-glommio")] { let state = glommio::common::State::default(); @@ -48,6 +42,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ); } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + for signal in &mut signals { match signal { SIGUSR1 => { diff --git a/aquatic_ws/src/lib/mio/mod.rs b/aquatic_ws/src/lib/mio/mod.rs index df791cb..0becdd8 100644 --- a/aquatic_ws/src/lib/mio/mod.rs +++ b/aquatic_ws/src/lib/mio/mod.rs @@ -22,16 +22,16 @@ use common::*; pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker"; pub fn run(config: Config, state: State) -> anyhow::Result<()> { + start_workers(config.clone(), state.clone()).expect("couldn't start workers"); + + // TODO: privdrop here instead + pin_current_if_configured_to( &config.cpu_pinning, config.socket_workers, WorkerIndex::Other, ); - start_workers(config.clone(), state.clone()).expect("couldn't start workers"); - - // TODO: privdrop here instead - loop { ::std::thread::sleep(Duration::from_secs( config.cleaning.torrent_cleaning_interval, diff --git a/aquatic_ws_load_test/src/main.rs b/aquatic_ws_load_test/src/main.rs index 1a8c342..05ed1a0 100644 --- a/aquatic_ws_load_test/src/main.rs +++ b/aquatic_ws_load_test/src/main.rs @@ -34,12 +34,6 @@ fn run(config: Config) -> ::anyhow::Result<()> { println!("Starting client with config: {:#?}", config); - pin_current_if_configured_to( - &config.cpu_pinning, - config.num_workers as usize, - WorkerIndex::Other, - ); - let mut info_hashes = Vec::with_capacity(config.torrents.number_of_torrents); let mut rng = SmallRng::from_entropy(); @@ -63,21 +57,25 @@ fn run(config: Config) -> ::anyhow::Result<()> { let tls_config = tls_config.clone(); let state = state.clone(); - let mut builder = LocalExecutorBuilder::default(); + LocalExecutorBuilder::default() + .spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_workers, + WorkerIndex::SocketWorker(i), + ); - if config.cpu_pinning.active { - builder = builder.pin_to_cpu( - WorkerIndex::SocketWorker(i).get_cpu_index(&config.cpu_pinning, config.num_workers), - ); - } - - builder - .spawn(|| async move { run_socket_thread(config, tls_config, state).await.unwrap(); }) .unwrap(); } + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_workers as usize, + WorkerIndex::Other, + ); + monitor_statistics(state, &config); Ok(())