diff --git a/Cargo.lock b/Cargo.lock index 4a7a0d9..c1aa85a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,18 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "affinity" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "763e484feceb7dd021b21c5c6f81aee06b1594a743455ec7efbf72e6355e447b" +dependencies = [ + "cfg-if", + "errno", + "libc", + "num_cpus", +] + [[package]] name = "ahash" version = "0.3.8" @@ -74,6 +86,7 @@ dependencies = [ name = "aquatic_common" version = "0.1.0" dependencies = [ + "affinity", "ahash 0.7.6", "anyhow", "arc-swap", @@ -95,7 +108,6 @@ dependencies = [ "aquatic_common", "aquatic_http_protocol", "cfg-if", - "core_affinity", "either", "futures-lite", "futures-rustls", @@ -122,6 +134,7 @@ version = "0.1.0" dependencies = [ "anyhow", "aquatic_cli_helpers", + "aquatic_common", "aquatic_http_protocol", "futures-lite", "glommio", @@ -166,7 +179,6 @@ dependencies = [ "aquatic_common", "aquatic_udp_protocol", "cfg-if", - "core_affinity", "crossbeam-channel", "futures-lite", "glommio", @@ -206,8 +218,8 @@ version = "0.1.0" dependencies = [ "anyhow", "aquatic_cli_helpers", + "aquatic_common", "aquatic_udp_protocol", - "core_affinity", "crossbeam-channel", "hashbrown 0.11.2", "mimalloc", @@ -241,7 +253,6 @@ dependencies = [ "aquatic_ws_protocol", "async-tungstenite", "cfg-if", - "core_affinity", "crossbeam-channel", "either", "futures", @@ -273,6 +284,7 @@ version = "0.1.0" dependencies = [ "anyhow", "aquatic_cli_helpers", + "aquatic_common", "aquatic_ws_protocol", "async-tungstenite", "futures", @@ -534,18 +546,6 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" -[[package]] -name = "core_affinity" -version = "0.5.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f8a03115cc34fb0d7c321dd154a3914b3ca082ccc5c11d91bf7117dbbe7171f" -dependencies = [ - "kernel32-sys", - "libc", - "num_cpus", - "winapi 0.2.8", -] - [[package]] name = "cpufeatures" version = "0.2.1" @@ -718,6 +718,27 @@ dependencies = [ "regex", ] +[[package]] +name = "errno" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f639046355ee4f37944e44f60642c6f3a7efa3cf6b78c78a0d989a8ce6c396a1" +dependencies = [ + "errno-dragonfly", + "libc", + "winapi 0.3.9", +] + +[[package]] +name = "errno-dragonfly" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "failure" version = "0.1.8" diff --git a/aquatic_common/Cargo.toml b/aquatic_common/Cargo.toml index 3d9d9b3..b10a3f2 100644 --- a/aquatic_common/Cargo.toml +++ b/aquatic_common/Cargo.toml @@ -14,6 +14,7 @@ name = "aquatic_common" ahash = "0.7" anyhow = "1" arc-swap = "1" +affinity = "0.1" hashbrown = "0.11.2" hex = "0.4" indexmap-amortized = "1" diff --git a/aquatic_common/src/cpu_pinning.rs b/aquatic_common/src/cpu_pinning.rs index a13fc73..ddfa833 100644 --- a/aquatic_common/src/cpu_pinning.rs +++ b/aquatic_common/src/cpu_pinning.rs @@ -1,17 +1,106 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(default)] +#[serde(rename_all = "lowercase")] +pub enum CpuPinningMode { + Ascending, + Descending, +} + +impl Default for CpuPinningMode { + fn default() -> Self { + Self::Ascending + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct CpuPinningConfig { pub active: bool, - pub offset: usize, + pub mode: CpuPinningMode, + pub virtual_per_physical_cpu: usize, + pub offset_cpus: usize, } impl Default for CpuPinningConfig { fn default() -> Self { Self { active: false, - offset: 0, + mode: Default::default(), + virtual_per_physical_cpu: 2, + offset_cpus: 0, + } + } +} + +impl CpuPinningConfig { + pub fn default_for_load_test() -> Self { + Self { + mode: CpuPinningMode::Descending, + ..Default::default() + } + } +} + +#[derive(Clone, Copy, Debug)] +pub enum WorkerIndex { + SocketWorker(usize), + RequestWorker(usize), + Other, +} + +impl WorkerIndex { + fn get_cpu_indices(self, config: &CpuPinningConfig, socket_workers: usize) -> Vec { + let offset = match self { + Self::Other => config.virtual_per_physical_cpu * config.offset_cpus, + Self::SocketWorker(index) => { + config.virtual_per_physical_cpu * (config.offset_cpus + 1 + index) + } + Self::RequestWorker(index) => { + config.virtual_per_physical_cpu * (config.offset_cpus + 1 + socket_workers + index) + } + }; + + let virtual_cpus = (0..config.virtual_per_physical_cpu).map(|i| offset + i); + + let virtual_cpus: Vec = match config.mode { + CpuPinningMode::Ascending => virtual_cpus.collect(), + CpuPinningMode::Descending => { + let max_index = affinity::get_core_num() - 1; + + virtual_cpus + .map(|i| max_index.checked_sub(i).unwrap_or(0)) + .collect() + } + }; + + ::log::info!( + "Calculated virtual CPU pin indices {:?} for {:?}", + virtual_cpus, + self + ); + + virtual_cpus + } +} + +/// Note: don't call this when affinities were already set in the current or in +/// a parent thread. Doing so limits the number of cores that are seen and +/// messes up setting affinities. +pub fn pin_current_if_configured_to( + config: &CpuPinningConfig, + socket_workers: usize, + worker_index: WorkerIndex, +) { + if config.active { + let indices = worker_index.get_cpu_indices(config, socket_workers); + + if let Err(err) = affinity::set_thread_affinity(indices.clone()) { + ::log::error!( + "Failed setting thread affinities {:?} for {:?}: {:#?}", + indices, + worker_index, + err + ); } } } diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index f6ea2ee..fffdb5c 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -21,7 +21,6 @@ aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" aquatic_http_protocol = "0.1.0" cfg-if = "1" -core_affinity = "0.5" either = "1" futures-lite = "1" futures-rustls = "0.22" diff --git a/aquatic_http/src/lib/lib.rs b/aquatic_http/src/lib/lib.rs index ef584e5..873ed83 100644 --- a/aquatic_http/src/lib/lib.rs +++ b/aquatic_http/src/lib/lib.rs @@ -5,7 +5,9 @@ use std::{ }; use aquatic_common::{ - access_list::update_access_list, privileges::drop_privileges_after_socket_binding, + access_list::update_access_list, + cpu_pinning::{pin_current_if_configured_to, WorkerIndex}, + privileges::drop_privileges_after_socket_binding, }; use common::{State, TlsConfig}; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; @@ -23,12 +25,6 @@ pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker"; const SHARED_CHANNEL_SIZE: usize = 1024; pub fn run(config: Config) -> ::anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } - let state = State::default(); update_access_list(&config.access_list, &state.access_list)?; @@ -42,6 +38,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ::std::thread::spawn(move || run_inner(config, state)); } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + for signal in &mut signals { match signal { SIGUSR1 => { @@ -55,12 +57,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } - let num_peers = config.socket_workers + config.request_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); @@ -80,13 +76,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let response_mesh_builder = response_mesh_builder.clone(); let num_bound_sockets = num_bound_sockets.clone(); - let mut builder = LocalExecutorBuilder::default(); + let executor = LocalExecutorBuilder::default().spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::SocketWorker(i), + ); - if config.cpu_pinning.active { - builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + i); - } - - let executor = builder.spawn(|| async move { network::run_socket_worker( config, state, @@ -107,13 +103,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); - let mut builder = LocalExecutorBuilder::default(); + let executor = LocalExecutorBuilder::default().spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::RequestWorker(i), + ); - if config.cpu_pinning.active { - builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i); - } - - let executor = builder.spawn(|| async move { handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) .await }); @@ -128,6 +124,12 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { ) .unwrap(); + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + for executor in executors { executor .expect("failed to spawn local executor") diff --git a/aquatic_http_load_test/Cargo.toml b/aquatic_http_load_test/Cargo.toml index 25baab6..83161b1 100644 --- a/aquatic_http_load_test/Cargo.toml +++ b/aquatic_http_load_test/Cargo.toml @@ -12,6 +12,7 @@ name = "aquatic_http_load_test" [dependencies] anyhow = "1" aquatic_cli_helpers = "0.1.0" +aquatic_common = "0.1.0" aquatic_http_protocol = "0.1.0" futures-lite = "1" hashbrown = "0.11.2" diff --git a/aquatic_http_load_test/src/config.rs b/aquatic_http_load_test/src/config.rs index 1c8456a..a3db7d4 100644 --- a/aquatic_http_load_test/src/config.rs +++ b/aquatic_http_load_test/src/config.rs @@ -1,18 +1,26 @@ use std::net::SocketAddr; +use aquatic_cli_helpers::LogLevel; +use aquatic_common::cpu_pinning::CpuPinningConfig; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct Config { pub server_address: SocketAddr, - pub num_workers: u8, + pub log_level: LogLevel, + pub num_workers: usize, pub num_connections: usize, pub duration: usize, pub torrents: TorrentConfig, + pub cpu_pinning: CpuPinningConfig, } -impl aquatic_cli_helpers::Config for Config {} +impl aquatic_cli_helpers::Config for Config { + fn get_log_level(&self) -> Option { + Some(self.log_level) + } +} #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] @@ -36,10 +44,12 @@ impl Default for Config { fn default() -> Self { Self { server_address: "127.0.0.1:3000".parse().unwrap(), + log_level: LogLevel::Error, num_workers: 1, num_connections: 8, duration: 0, torrents: TorrentConfig::default(), + cpu_pinning: CpuPinningConfig::default_for_load_test(), } } } diff --git a/aquatic_http_load_test/src/main.rs b/aquatic_http_load_test/src/main.rs index e719f77..0dd5b25 100644 --- a/aquatic_http_load_test/src/main.rs +++ b/aquatic_http_load_test/src/main.rs @@ -3,6 +3,7 @@ use std::thread; use std::time::{Duration, Instant}; use ::glommio::LocalExecutorBuilder; +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use rand::prelude::*; use rand_distr::Pareto; @@ -56,18 +57,30 @@ fn run(config: Config) -> ::anyhow::Result<()> { let tls_config = create_tls_config().unwrap(); - for _ in 0..config.num_workers { + for i in 0..config.num_workers { let config = config.clone(); let tls_config = tls_config.clone(); let state = state.clone(); LocalExecutorBuilder::default() - .spawn(|| async move { + .spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_workers, + WorkerIndex::SocketWorker(i), + ); + run_socket_thread(config, tls_config, state).await.unwrap(); }) .unwrap(); } + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_workers as usize, + WorkerIndex::Other, + ); + monitor_statistics(state, &config); Ok(()) diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index b0ea491..0fe4479 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -25,7 +25,6 @@ aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" aquatic_udp_protocol = "0.1.0" cfg-if = "1" -core_affinity = "0.5" hex = "0.4" log = "0.4" mimalloc = { version = "0.1", default-features = false } diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 6836ed2..058f5e9 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -1,6 +1,7 @@ use std::sync::{atomic::AtomicUsize, Arc}; use aquatic_common::access_list::update_access_list; +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::privileges::drop_privileges_after_socket_binding; use glommio::channels::channel_mesh::MeshBuilder; use glommio::prelude::*; @@ -18,12 +19,6 @@ pub mod network; pub const SHARED_CHANNEL_SIZE: usize = 4096; pub fn run(config: Config) -> ::anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } - let state = State::default(); update_access_list(&config.access_list, &state.access_list)?; @@ -37,6 +32,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ::std::thread::spawn(move || run_inner(config, state)); } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + for signal in &mut signals { match signal { SIGUSR1 => { @@ -50,12 +51,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } - let num_peers = config.socket_workers + config.request_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); @@ -72,13 +67,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let response_mesh_builder = response_mesh_builder.clone(); let num_bound_sockets = num_bound_sockets.clone(); - let mut builder = LocalExecutorBuilder::default(); + let executor = LocalExecutorBuilder::default().spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::SocketWorker(i), + ); - if config.cpu_pinning.active { - builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + i); - } - - let executor = builder.spawn(|| async move { network::run_socket_worker( config, state, @@ -98,13 +93,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); - let mut builder = LocalExecutorBuilder::default(); + let executor = LocalExecutorBuilder::default().spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::RequestWorker(i), + ); - if config.cpu_pinning.active { - builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i); - } - - let executor = builder.spawn(|| async move { handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) .await }); @@ -119,6 +114,12 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { ) .unwrap(); + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + for executor in executors { executor .expect("failed to spawn local executor") diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index f21f9e7..16ac519 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -3,6 +3,7 @@ use std::thread::Builder; use std::time::Duration; use anyhow::Context; +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::privileges::drop_privileges_after_socket_binding; use crossbeam_channel::unbounded; @@ -20,12 +21,6 @@ pub mod tasks; use common::State; pub fn run(config: Config) -> ::anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } - let state = State::default(); update_access_list(&config.access_list, &state.access_list)?; @@ -39,6 +34,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ::std::thread::spawn(move || run_inner(config, state)); } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + for signal in &mut signals { match signal { SIGUSR1 => { @@ -52,12 +53,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } - let num_bound_sockets = Arc::new(AtomicUsize::new(0)); let (request_sender, request_receiver) = unbounded(); @@ -72,11 +67,11 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { Builder::new() .name(format!("request-{:02}", i + 1)) .spawn(move || { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset + 1 + i, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::RequestWorker(i), + ); handlers::run_request_worker(state, config, request_receiver, response_sender) }) @@ -93,11 +88,11 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { Builder::new() .name(format!("socket-{:02}", i + 1)) .spawn(move || { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset + 1 + config.request_workers + i, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::SocketWorker(i), + ); network::run_socket_worker( state, @@ -118,11 +113,11 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { Builder::new() .name("statistics-collector".to_string()) .spawn(move || { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); loop { ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); @@ -140,6 +135,12 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { ) .unwrap(); + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + loop { ::std::thread::sleep(Duration::from_secs( config.cleaning.torrent_cleaning_interval, diff --git a/aquatic_udp_load_test/Cargo.toml b/aquatic_udp_load_test/Cargo.toml index f437d60..fdef0ea 100644 --- a/aquatic_udp_load_test/Cargo.toml +++ b/aquatic_udp_load_test/Cargo.toml @@ -12,8 +12,8 @@ name = "aquatic_udp_load_test" [dependencies] anyhow = "1" aquatic_cli_helpers = "0.1.0" +aquatic_common = "0.1.0" aquatic_udp_protocol = "0.1.0" -core_affinity = "0.5" crossbeam-channel = "0.5" hashbrown = "0.11.2" mimalloc = { version = "0.1", default-features = false } diff --git a/aquatic_udp_load_test/src/common.rs b/aquatic_udp_load_test/src/common.rs index d3ec752..83a6a1b 100644 --- a/aquatic_udp_load_test/src/common.rs +++ b/aquatic_udp_load_test/src/common.rs @@ -1,6 +1,8 @@ use std::net::SocketAddr; use std::sync::{atomic::AtomicUsize, Arc}; +use aquatic_cli_helpers::LogLevel; +use aquatic_common::cpu_pinning::CpuPinningConfig; use hashbrown::HashMap; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; @@ -15,6 +17,7 @@ pub struct ThreadId(pub u8); pub struct Config { /// Server address pub server_address: SocketAddr, + pub log_level: LogLevel, /// Number of sockets and socket worker threads /// /// Sockets will bind to one port each, and with @@ -27,7 +30,7 @@ pub struct Config { pub duration: usize, pub network: NetworkConfig, pub handler: HandlerConfig, - pub core_affinity: CoreAffinityConfig, + pub cpu_pinning: CpuPinningConfig, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -97,23 +100,17 @@ pub struct HandlerConfig { pub additional_request_factor: f64, } -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(default)] -pub struct CoreAffinityConfig { - /// Set core affinities, descending from last core - pub set_affinities: bool, -} - impl Default for Config { fn default() -> Self { Self { server_address: "127.0.0.1:3000".parse().unwrap(), + log_level: LogLevel::Error, num_socket_workers: 1, num_request_workers: 1, duration: 0, network: NetworkConfig::default(), handler: HandlerConfig::default(), - core_affinity: CoreAffinityConfig::default(), + cpu_pinning: CpuPinningConfig::default_for_load_test(), } } } @@ -192,11 +189,3 @@ pub struct SocketWorkerLocalStatistics { pub responses_scrape: usize, pub responses_error: usize, } - -impl Default for CoreAffinityConfig { - fn default() -> Self { - Self { - set_affinities: false, - } - } -} diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index 92be3bb..261cc5a 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -3,6 +3,7 @@ use std::sync::{atomic::Ordering, Arc}; use std::thread; use std::time::{Duration, Instant}; +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use crossbeam_channel::unbounded; use hashbrown::HashMap; use parking_lot::Mutex; @@ -30,18 +31,13 @@ pub fn main() { ) } -impl aquatic_cli_helpers::Config for Config {} +impl aquatic_cli_helpers::Config for Config { + fn get_log_level(&self) -> Option { + Some(self.log_level) + } +} fn run(config: Config) -> ::anyhow::Result<()> { - let affinity_max = core_affinity::get_core_ids() - .map(|ids| ids.iter().map(|id| id.id).max()) - .flatten() - .unwrap_or(0); - - if config.core_affinity.set_affinities { - core_affinity::set_for_current(core_affinity::CoreId { id: affinity_max }); - } - if config.handler.weight_announce + config.handler.weight_connect + config.handler.weight_scrape == 0 { @@ -50,6 +46,12 @@ fn run(config: Config) -> ::anyhow::Result<()> { println!("Starting client with config: {:#?}", config); + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_socket_workers as usize, + WorkerIndex::Other, + ); + let mut info_hashes = Vec::with_capacity(config.handler.number_of_torrents); for _ in 0..config.handler.number_of_torrents { @@ -101,11 +103,11 @@ fn run(config: Config) -> ::anyhow::Result<()> { let state = state.clone(); thread::spawn(move || { - if config.core_affinity.set_affinities { - core_affinity::set_for_current(core_affinity::CoreId { - id: affinity_max - 1 - i as usize, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_socket_workers as usize, + WorkerIndex::SocketWorker(i as usize), + ); run_socket_thread(state, response_sender, receiver, &config, addr, thread_id) }); @@ -118,11 +120,11 @@ fn run(config: Config) -> ::anyhow::Result<()> { let response_receiver = response_receiver.clone(); thread::spawn(move || { - if config.core_affinity.set_affinities { - core_affinity::set_for_current(core_affinity::CoreId { - id: affinity_max - config.num_socket_workers as usize - 1 - i as usize, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_socket_workers as usize, + WorkerIndex::RequestWorker(i as usize), + ); run_handler_thread(&config, state, pareto, request_senders, response_receiver) }); } diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index 98844d5..b76299c 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -26,7 +26,6 @@ aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" aquatic_ws_protocol = "0.1.0" cfg-if = "1" -core_affinity = "0.5" either = "1" hashbrown = { version = "0.11.2", features = ["serde"] } log = "0.4" diff --git a/aquatic_ws/src/lib/glommio/mod.rs b/aquatic_ws/src/lib/glommio/mod.rs index 0c84499..17c4e2b 100644 --- a/aquatic_ws/src/lib/glommio/mod.rs +++ b/aquatic_ws/src/lib/glommio/mod.rs @@ -9,7 +9,10 @@ use std::{ }; use crate::config::Config; -use aquatic_common::privileges::drop_privileges_after_socket_binding; +use aquatic_common::{ + cpu_pinning::{pin_current_if_configured_to, WorkerIndex}, + privileges::drop_privileges_after_socket_binding, +}; use self::common::*; @@ -18,12 +21,6 @@ use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; const SHARED_CHANNEL_SIZE: usize = 1024; pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } - let num_peers = config.socket_workers + config.request_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); @@ -43,13 +40,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let response_mesh_builder = response_mesh_builder.clone(); let num_bound_sockets = num_bound_sockets.clone(); - let mut builder = LocalExecutorBuilder::default(); + let executor = LocalExecutorBuilder::default().spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::SocketWorker(i), + ); - if config.cpu_pinning.active { - builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + i); - } - - let executor = builder.spawn(|| async move { network::run_socket_worker( config, state, @@ -70,13 +67,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); - let mut builder = LocalExecutorBuilder::default(); + let executor = LocalExecutorBuilder::default().spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::RequestWorker(i), + ); - if config.cpu_pinning.active { - builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i); - } - - let executor = builder.spawn(|| async move { handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) .await }); @@ -91,6 +88,12 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { ) .unwrap(); + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + for executor in executors { executor .expect("failed to spawn local executor") diff --git a/aquatic_ws/src/lib/lib.rs b/aquatic_ws/src/lib/lib.rs index 57e10a7..b9542b4 100644 --- a/aquatic_ws/src/lib/lib.rs +++ b/aquatic_ws/src/lib/lib.rs @@ -1,4 +1,7 @@ -use aquatic_common::access_list::update_access_list; +use aquatic_common::{ + access_list::update_access_list, + cpu_pinning::{pin_current_if_configured_to, WorkerIndex}, +}; use cfg_if::cfg_if; use signal_hook::{consts::SIGUSR1, iterator::Signals}; @@ -14,12 +17,6 @@ pub mod mio; pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker"; pub fn run(config: Config) -> ::anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } - cfg_if!( if #[cfg(feature = "with-glommio")] { let state = glommio::common::State::default(); @@ -45,6 +42,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ); } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + for signal in &mut signals { match signal { SIGUSR1 => { diff --git a/aquatic_ws/src/lib/mio/mod.rs b/aquatic_ws/src/lib/mio/mod.rs index f793823..0becdd8 100644 --- a/aquatic_ws/src/lib/mio/mod.rs +++ b/aquatic_ws/src/lib/mio/mod.rs @@ -5,6 +5,7 @@ use std::thread::Builder; use std::time::Duration; use anyhow::Context; +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use histogram::Histogram; use mio::{Poll, Waker}; use native_tls::{Identity, TlsAcceptor}; @@ -25,6 +26,12 @@ pub fn run(config: Config, state: State) -> anyhow::Result<()> { // TODO: privdrop here instead + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + loop { ::std::thread::sleep(Duration::from_secs( config.cleaning.torrent_cleaning_interval, @@ -69,6 +76,12 @@ pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> { Builder::new() .name(format!("socket-{:02}", i + 1)) .spawn(move || { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::SocketWorker(i), + ); + network::run_socket_worker( config, state, @@ -121,6 +134,12 @@ pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> { Builder::new() .name(format!("request-{:02}", i + 1)) .spawn(move || { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::RequestWorker(i), + ); + handlers::run_request_worker( config, state, @@ -137,10 +156,18 @@ pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> { Builder::new() .name("statistics".to_string()) - .spawn(move || loop { - ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); + .spawn(move || { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); - print_statistics(&state); + loop { + ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); + + print_statistics(&state); + } }) .expect("spawn statistics thread"); } diff --git a/aquatic_ws_load_test/Cargo.toml b/aquatic_ws_load_test/Cargo.toml index 581f1de..67a02f2 100644 --- a/aquatic_ws_load_test/Cargo.toml +++ b/aquatic_ws_load_test/Cargo.toml @@ -13,6 +13,7 @@ name = "aquatic_ws_load_test" anyhow = "1" async-tungstenite = "0.15" aquatic_cli_helpers = "0.1.0" +aquatic_common = "0.1.0" aquatic_ws_protocol = "0.1.0" futures = "0.3" futures-rustls = "0.22" diff --git a/aquatic_ws_load_test/src/config.rs b/aquatic_ws_load_test/src/config.rs index 10d4bdc..3bef7bc 100644 --- a/aquatic_ws_load_test/src/config.rs +++ b/aquatic_ws_load_test/src/config.rs @@ -1,18 +1,26 @@ use std::net::SocketAddr; +use aquatic_cli_helpers::LogLevel; +use aquatic_common::cpu_pinning::CpuPinningConfig; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct Config { pub server_address: SocketAddr, - pub num_workers: u8, + pub log_level: LogLevel, + pub num_workers: usize, pub num_connections: usize, pub duration: usize, pub torrents: TorrentConfig, + pub cpu_pinning: CpuPinningConfig, } -impl aquatic_cli_helpers::Config for Config {} +impl aquatic_cli_helpers::Config for Config { + fn get_log_level(&self) -> Option { + Some(self.log_level) + } +} #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] @@ -37,10 +45,12 @@ impl Default for Config { fn default() -> Self { Self { server_address: "127.0.0.1:3000".parse().unwrap(), + log_level: LogLevel::Error, num_workers: 1, num_connections: 16, duration: 0, torrents: TorrentConfig::default(), + cpu_pinning: CpuPinningConfig::default_for_load_test(), } } } diff --git a/aquatic_ws_load_test/src/main.rs b/aquatic_ws_load_test/src/main.rs index 54f9208..05ed1a0 100644 --- a/aquatic_ws_load_test/src/main.rs +++ b/aquatic_ws_load_test/src/main.rs @@ -2,6 +2,7 @@ use std::sync::{atomic::Ordering, Arc}; use std::thread; use std::time::{Duration, Instant}; +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use glommio::LocalExecutorBuilder; use rand::prelude::*; use rand_distr::Pareto; @@ -51,18 +52,30 @@ fn run(config: Config) -> ::anyhow::Result<()> { let tls_config = create_tls_config().unwrap(); - for _ in 0..config.num_workers { + for i in 0..config.num_workers { let config = config.clone(); let tls_config = tls_config.clone(); let state = state.clone(); LocalExecutorBuilder::default() - .spawn(|| async move { + .spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_workers, + WorkerIndex::SocketWorker(i), + ); + run_socket_thread(config, tls_config, state).await.unwrap(); }) .unwrap(); } + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_workers as usize, + WorkerIndex::Other, + ); + monitor_statistics(state, &config); Ok(()) diff --git a/scripts/run-aquatic-udp.sh b/scripts/run-aquatic-udp.sh index 6ccfb86..db41e58 100755 --- a/scripts/run-aquatic-udp.sh +++ b/scripts/run-aquatic-udp.sh @@ -1,5 +1,13 @@ -#!/bin/sh +#!/bin/bash . ./scripts/env-native-cpu-without-avx-512 -cargo run --release --bin aquatic_udp -- $@ +if [ "$1" != "mio" ] && [ "$1" != "glommio" ]; then + echo "Usage: $0 [mio|glommio] [ARGS]" +else + if [ "$1" = "mio" ]; then + cargo run --release --bin aquatic_udp -- "${@:2}" + else + cargo run --release --features "with-glommio" --no-default-features --bin aquatic_udp -- "${@:2}" + fi +fi diff --git a/scripts/run-aquatic-ws.sh b/scripts/run-aquatic-ws.sh index 40be253..5aadff6 100755 --- a/scripts/run-aquatic-ws.sh +++ b/scripts/run-aquatic-ws.sh @@ -1,5 +1,14 @@ -#!/bin/sh +#!/bin/bash . ./scripts/env-native-cpu-without-avx-512 -cargo run --release --bin aquatic_ws -- $@ +if [ "$1" != "mio" ] && [ "$1" != "glommio" ]; then + echo "Usage: $0 [mio|glommio] [ARGS]" +else + if [ "$1" = "mio" ]; then + cargo run --release --bin aquatic_ws -- "${@:2}" + else + cargo run --release --features "with-glommio" --no-default-features --bin aquatic_ws -- "${@:2}" + fi +fi +