diff --git a/Cargo.lock b/Cargo.lock index 4a7a0d9..73fc3bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -77,6 +77,7 @@ dependencies = [ "ahash 0.7.6", "anyhow", "arc-swap", + "core_affinity", "hashbrown 0.11.2", "hex", "indexmap-amortized", @@ -166,7 +167,6 @@ dependencies = [ "aquatic_common", "aquatic_udp_protocol", "cfg-if", - "core_affinity", "crossbeam-channel", "futures-lite", "glommio", @@ -206,8 +206,8 @@ version = "0.1.0" dependencies = [ "anyhow", "aquatic_cli_helpers", + "aquatic_common", "aquatic_udp_protocol", - "core_affinity", "crossbeam-channel", "hashbrown 0.11.2", "mimalloc", diff --git a/aquatic_common/Cargo.toml b/aquatic_common/Cargo.toml index 3d9d9b3..083047d 100644 --- a/aquatic_common/Cargo.toml +++ b/aquatic_common/Cargo.toml @@ -14,6 +14,7 @@ name = "aquatic_common" ahash = "0.7" anyhow = "1" arc-swap = "1" +core_affinity = "0.5" hashbrown = "0.11.2" hex = "0.4" indexmap-amortized = "1" diff --git a/aquatic_common/src/cpu_pinning.rs b/aquatic_common/src/cpu_pinning.rs index a13fc73..a7d508f 100644 --- a/aquatic_common/src/cpu_pinning.rs +++ b/aquatic_common/src/cpu_pinning.rs @@ -1,17 +1,89 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(default)] +#[serde(rename_all = "lowercase")] +pub enum CpuPinningMode { + Ascending, + Descending, +} + +impl Default for CpuPinningMode { + fn default() -> Self { + Self::Ascending + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct CpuPinningConfig { pub active: bool, + pub mode: CpuPinningMode, pub offset: usize, + pub multiple: usize, } impl Default for CpuPinningConfig { fn default() -> Self { Self { active: false, + mode: Default::default(), offset: 0, + multiple: 1, } } } + +impl CpuPinningConfig { + pub fn default_for_load_test() -> Self { + Self { + mode: CpuPinningMode::Descending, + ..Default::default() + } + } +} + +#[derive(Clone, Copy, Debug)] +pub enum WorkerIndex { + SocketWorker(usize), + RequestWorker(usize), + Other, +} + +impl WorkerIndex { + pub fn get_cpu_index(self, config: &CpuPinningConfig, socket_workers: usize) -> usize { + let index = match self { + Self::Other => config.offset, + Self::SocketWorker(index) => config.multiple * (config.offset + 1 + index), + Self::RequestWorker(index) => { + config.multiple * (config.offset + 1 + socket_workers + index) + } + }; + + let index = match config.mode { + CpuPinningMode::Ascending => index, + CpuPinningMode::Descending => { + let max = core_affinity::get_core_ids() + .map(|ids| ids.iter().map(|id| id.id).max()) + .flatten() + .unwrap_or(0); + + max - index + } + }; + + ::log::info!("Calculated CPU pin index {} for {:?}", index, self); + + index + } +} + +pub fn pin_current_if_configured_to( + config: &CpuPinningConfig, + socket_workers: usize, + worker_index: WorkerIndex, +) { + if config.active { + core_affinity::set_for_current(core_affinity::CoreId { + id: worker_index.get_cpu_index(config, socket_workers), + }); + } +} diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index b0ea491..0fe4479 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -25,7 +25,6 @@ aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" aquatic_udp_protocol = "0.1.0" cfg-if = "1" -core_affinity = "0.5" hex = "0.4" log = "0.4" mimalloc = { version = "0.1", default-features = false } diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 6836ed2..28e7842 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -1,6 +1,7 @@ use std::sync::{atomic::AtomicUsize, Arc}; use aquatic_common::access_list::update_access_list; +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::privileges::drop_privileges_after_socket_binding; use glommio::channels::channel_mesh::MeshBuilder; use glommio::prelude::*; @@ -18,11 +19,11 @@ pub mod network; pub const SHARED_CHANNEL_SIZE: usize = 4096; pub fn run(config: Config) -> ::anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); let state = State::default(); @@ -50,11 +51,11 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); let num_peers = config.socket_workers + config.request_workers; @@ -75,7 +76,10 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let mut builder = LocalExecutorBuilder::default(); if config.cpu_pinning.active { - builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + i); + builder = builder.pin_to_cpu( + WorkerIndex::SocketWorker(i) + .get_cpu_index(&config.cpu_pinning, config.socket_workers), + ); } let executor = builder.spawn(|| async move { @@ -101,7 +105,10 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let mut builder = LocalExecutorBuilder::default(); if config.cpu_pinning.active { - builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i); + builder = builder.pin_to_cpu( + WorkerIndex::RequestWorker(i) + .get_cpu_index(&config.cpu_pinning, config.socket_workers), + ); } let executor = builder.spawn(|| async move { diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index f21f9e7..1077878 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -3,6 +3,7 @@ use std::thread::Builder; use std::time::Duration; use anyhow::Context; +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::privileges::drop_privileges_after_socket_binding; use crossbeam_channel::unbounded; @@ -20,11 +21,11 @@ pub mod tasks; use common::State; pub fn run(config: Config) -> ::anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); let state = State::default(); @@ -52,11 +53,11 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); let num_bound_sockets = Arc::new(AtomicUsize::new(0)); @@ -72,11 +73,11 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { Builder::new() .name(format!("request-{:02}", i + 1)) .spawn(move || { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset + 1 + i, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::RequestWorker(i), + ); handlers::run_request_worker(state, config, request_receiver, response_sender) }) @@ -93,11 +94,11 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { Builder::new() .name(format!("socket-{:02}", i + 1)) .spawn(move || { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset + 1 + config.request_workers + i, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::SocketWorker(i), + ); network::run_socket_worker( state, @@ -118,11 +119,11 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { Builder::new() .name("statistics-collector".to_string()) .spawn(move || { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); loop { ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); diff --git a/aquatic_udp_load_test/Cargo.toml b/aquatic_udp_load_test/Cargo.toml index f437d60..fdef0ea 100644 --- a/aquatic_udp_load_test/Cargo.toml +++ b/aquatic_udp_load_test/Cargo.toml @@ -12,8 +12,8 @@ name = "aquatic_udp_load_test" [dependencies] anyhow = "1" aquatic_cli_helpers = "0.1.0" +aquatic_common = "0.1.0" aquatic_udp_protocol = "0.1.0" -core_affinity = "0.5" crossbeam-channel = "0.5" hashbrown = "0.11.2" mimalloc = { version = "0.1", default-features = false } diff --git a/aquatic_udp_load_test/src/common.rs b/aquatic_udp_load_test/src/common.rs index d3ec752..b22547c 100644 --- a/aquatic_udp_load_test/src/common.rs +++ b/aquatic_udp_load_test/src/common.rs @@ -1,6 +1,7 @@ use std::net::SocketAddr; use std::sync::{atomic::AtomicUsize, Arc}; +use aquatic_common::cpu_pinning::CpuPinningConfig; use hashbrown::HashMap; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; @@ -27,7 +28,7 @@ pub struct Config { pub duration: usize, pub network: NetworkConfig, pub handler: HandlerConfig, - pub core_affinity: CoreAffinityConfig, + pub cpu_pinning: CpuPinningConfig, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -97,13 +98,6 @@ pub struct HandlerConfig { pub additional_request_factor: f64, } -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(default)] -pub struct CoreAffinityConfig { - /// Set core affinities, descending from last core - pub set_affinities: bool, -} - impl Default for Config { fn default() -> Self { Self { @@ -113,7 +107,7 @@ impl Default for Config { duration: 0, network: NetworkConfig::default(), handler: HandlerConfig::default(), - core_affinity: CoreAffinityConfig::default(), + cpu_pinning: CpuPinningConfig::default_for_load_test(), } } } @@ -192,11 +186,3 @@ pub struct SocketWorkerLocalStatistics { pub responses_scrape: usize, pub responses_error: usize, } - -impl Default for CoreAffinityConfig { - fn default() -> Self { - Self { - set_affinities: false, - } - } -} diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index 92be3bb..ca246ce 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -3,6 +3,7 @@ use std::sync::{atomic::Ordering, Arc}; use std::thread; use std::time::{Duration, Instant}; +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use crossbeam_channel::unbounded; use hashbrown::HashMap; use parking_lot::Mutex; @@ -33,15 +34,6 @@ pub fn main() { impl aquatic_cli_helpers::Config for Config {} fn run(config: Config) -> ::anyhow::Result<()> { - let affinity_max = core_affinity::get_core_ids() - .map(|ids| ids.iter().map(|id| id.id).max()) - .flatten() - .unwrap_or(0); - - if config.core_affinity.set_affinities { - core_affinity::set_for_current(core_affinity::CoreId { id: affinity_max }); - } - if config.handler.weight_announce + config.handler.weight_connect + config.handler.weight_scrape == 0 { @@ -50,6 +42,12 @@ fn run(config: Config) -> ::anyhow::Result<()> { println!("Starting client with config: {:#?}", config); + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_socket_workers as usize, + WorkerIndex::Other, + ); + let mut info_hashes = Vec::with_capacity(config.handler.number_of_torrents); for _ in 0..config.handler.number_of_torrents { @@ -101,11 +99,11 @@ fn run(config: Config) -> ::anyhow::Result<()> { let state = state.clone(); thread::spawn(move || { - if config.core_affinity.set_affinities { - core_affinity::set_for_current(core_affinity::CoreId { - id: affinity_max - 1 - i as usize, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_socket_workers as usize, + WorkerIndex::SocketWorker(i as usize), + ); run_socket_thread(state, response_sender, receiver, &config, addr, thread_id) }); @@ -118,11 +116,11 @@ fn run(config: Config) -> ::anyhow::Result<()> { let response_receiver = response_receiver.clone(); thread::spawn(move || { - if config.core_affinity.set_affinities { - core_affinity::set_for_current(core_affinity::CoreId { - id: affinity_max - config.num_socket_workers as usize - 1 - i as usize, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_socket_workers as usize, + WorkerIndex::RequestWorker(i as usize), + ); run_handler_thread(&config, state, pareto, request_senders, response_receiver) }); }