diff --git a/Cargo.lock b/Cargo.lock index 75e69e6..43d23c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -158,7 +158,6 @@ dependencies = [ "arc-swap", "duplicate", "git-testament", - "glommio", "hashbrown 0.14.3", "hex", "hwloc", diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index b4c624a..6e09b9a 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -15,6 +15,8 @@ name = "aquatic_common" [features] rustls = ["dep:rustls", "rustls-pemfile"] prometheus = ["dep:metrics", "dep:metrics-util", "dep:metrics-exporter-prometheus", "dep:tokio"] +# Experimental CPU pinning support. Requires hwloc (apt-get install libhwloc-dev) +cpu-pinning = ["dep:hwloc"] [dependencies] aquatic_toml_config.workspace = true @@ -45,6 +47,5 @@ metrics-util = { version = "0.16", optional = true } metrics-exporter-prometheus = { version = "0.13", optional = true, default-features = false, features = ["http-listener"] } tokio = { version = "1", optional = true, features = ["rt", "net", "time"] } -# other optional -glommio = { version = "0.8", optional = true } +# cpu pinning feature hwloc = { version = "0.5", optional = true } \ No newline at end of file diff --git a/crates/common/src/cpu_pinning.rs b/crates/common/src/cpu_pinning.rs index 529f15c..30c9918 100644 --- a/crates/common/src/cpu_pinning.rs +++ b/crates/common/src/cpu_pinning.rs @@ -16,27 +16,9 @@ impl Default for CpuPinningDirection { } } -#[cfg(feature = "glommio")] -#[derive(Clone, Copy, Debug, PartialEq, TomlConfig, Serialize, Deserialize)] -#[serde(rename_all = "kebab-case")] -pub enum HyperThreadMapping { - System, - Subsequent, - Split, -} - -#[cfg(feature = "glommio")] -impl Default for HyperThreadMapping { - fn default() -> Self { - Self::System - } -} - pub trait CpuPinningConfig { fn active(&self) -> bool; fn direction(&self) -> CpuPinningDirection; - #[cfg(feature = "glommio")] - fn hyperthread(&self) -> HyperThreadMapping; fn core_offset(&self) -> usize; } @@ -54,8 +36,6 @@ pub mod mod_name { pub struct struct_name { pub active: bool, pub direction: CpuPinningDirection, - #[cfg(feature = "glommio")] - pub hyperthread: HyperThreadMapping, pub core_offset: usize, } @@ -64,8 +44,6 @@ pub mod mod_name { Self { active: false, direction: cpu_pinning_direction, - #[cfg(feature = "glommio")] - hyperthread: Default::default(), core_offset: 0, } } @@ -77,10 +55,6 @@ pub mod mod_name { fn direction(&self) -> CpuPinningDirection { self.direction } - #[cfg(feature = "glommio")] - fn hyperthread(&self) -> HyperThreadMapping { - self.hyperthread - } fn core_offset(&self) -> usize { self.core_offset } @@ -119,158 +93,9 @@ impl WorkerIndex { } } -#[cfg(feature = "glommio")] -pub mod glommio { - use ::glommio::{CpuSet, Placement}; - - use super::*; - - fn get_cpu_set() -> anyhow::Result { - CpuSet::online().map_err(|err| anyhow::anyhow!("Couldn't get CPU set: {:#}", err)) - } - - fn get_num_cpu_cores() -> anyhow::Result { - get_cpu_set()? - .iter() - .map(|l| l.core) - .max() - .map(|index| index + 1) - .ok_or(anyhow::anyhow!("CpuSet is empty")) - } - - fn logical_cpus_string(cpu_set: &CpuSet) -> String { - let mut logical_cpus = cpu_set.iter().map(|l| l.cpu).collect::>(); - - logical_cpus.sort_unstable(); - - logical_cpus - .into_iter() - .map(|cpu| cpu.to_string()) - .collect::>() - .join(", ") - } - - fn get_worker_cpu_set( - config: &C, - socket_workers: usize, - swarm_workers: usize, - worker_index: WorkerIndex, - ) -> anyhow::Result { - let num_cpu_cores = get_num_cpu_cores()?; - - let core_index = - worker_index.get_core_index(config, socket_workers, swarm_workers, num_cpu_cores); - - let too_many_workers = match (&config.hyperthread(), &config.direction()) { - ( - HyperThreadMapping::Split | HyperThreadMapping::Subsequent, - CpuPinningDirection::Ascending, - ) => core_index >= num_cpu_cores / 2, - ( - HyperThreadMapping::Split | HyperThreadMapping::Subsequent, - CpuPinningDirection::Descending, - ) => core_index < num_cpu_cores / 2, - (_, _) => false, - }; - - if too_many_workers { - return Err(anyhow::anyhow!("CPU pinning: total number of workers (including the single utility worker) can not exceed number of virtual CPUs / 2 - core_offset in this hyperthread mapping mode")); - } - - let cpu_set = match config.hyperthread() { - HyperThreadMapping::System => get_cpu_set()?.filter(|l| l.core == core_index), - HyperThreadMapping::Split => match config.direction() { - CpuPinningDirection::Ascending => get_cpu_set()? - .filter(|l| l.cpu == core_index || l.cpu == core_index + num_cpu_cores / 2), - CpuPinningDirection::Descending => get_cpu_set()? - .filter(|l| l.cpu == core_index || l.cpu == core_index - num_cpu_cores / 2), - }, - HyperThreadMapping::Subsequent => { - let cpu_index_offset = match config.direction() { - // 0 -> 0 and 1 - // 1 -> 2 and 3 - // 2 -> 4 and 5 - CpuPinningDirection::Ascending => core_index * 2, - // 15 -> 14 and 15 - // 14 -> 12 and 13 - // 13 -> 10 and 11 - CpuPinningDirection::Descending => { - num_cpu_cores - 2 * (num_cpu_cores - core_index) - } - }; - - get_cpu_set()? - .filter(|l| l.cpu == cpu_index_offset || l.cpu == cpu_index_offset + 1) - } - }; - - if cpu_set.is_empty() { - Err(anyhow::anyhow!( - "CPU pinning: produced empty CPU set for {:?}. Try decreasing number of workers", - worker_index - )) - } else { - ::log::info!( - "Logical CPUs for {:?}: {}", - worker_index, - logical_cpus_string(&cpu_set) - ); - - Ok(cpu_set) - } - } - - pub fn get_worker_placement( - config: &C, - socket_workers: usize, - swarm_workers: usize, - worker_index: WorkerIndex, - ) -> anyhow::Result { - if config.active() { - let cpu_set = get_worker_cpu_set(config, socket_workers, swarm_workers, worker_index)?; - - Ok(Placement::Fenced(cpu_set)) - } else { - Ok(Placement::Unbound) - } - } - - pub fn set_affinity_for_util_worker( - config: &C, - socket_workers: usize, - swarm_workers: usize, - ) -> anyhow::Result<()> { - let worker_cpu_set = - get_worker_cpu_set(config, socket_workers, swarm_workers, WorkerIndex::Util)?; - - unsafe { - let mut set: libc::cpu_set_t = ::std::mem::zeroed(); - - for cpu_location in worker_cpu_set { - libc::CPU_SET(cpu_location.cpu, &mut set); - } - - let status = libc::pthread_setaffinity_np( - libc::pthread_self(), - ::std::mem::size_of::(), - &set, - ); - - if status != 0 { - return Err(anyhow::Error::new(::std::io::Error::from_raw_os_error( - status, - ))); - } - } - - Ok(()) - } -} - /// Pin current thread to a suitable core /// /// Requires hwloc (`apt-get install libhwloc-dev`) -#[cfg(feature = "hwloc")] pub fn pin_current_if_configured_to( config: &C, socket_workers: usize, diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 46453e2..375bd98 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -6,6 +6,7 @@ use ahash::RandomState; pub mod access_list; pub mod cli; +#[cfg(feature = "cpu-pinning")] pub mod cpu_pinning; pub mod privileges; #[cfg(feature = "rustls")] diff --git a/crates/http/src/config.rs b/crates/http/src/config.rs index fa4640a..68c0e3d 100644 --- a/crates/http/src/config.rs +++ b/crates/http/src/config.rs @@ -1,8 +1,6 @@ use std::{net::SocketAddr, path::PathBuf}; -use aquatic_common::{ - access_list::AccessListConfig, privileges::PrivilegeConfig, -}; +use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig}; use aquatic_toml_config::TomlConfig; use serde::{Deserialize, Serialize}; diff --git a/crates/http_load_test/Cargo.toml b/crates/http_load_test/Cargo.toml index 52c942f..b15c260 100644 --- a/crates/http_load_test/Cargo.toml +++ b/crates/http_load_test/Cargo.toml @@ -14,7 +14,7 @@ rust-version.workspace = true name = "aquatic_http_load_test" [dependencies] -aquatic_common = { workspace = true, features = ["glommio"] } +aquatic_common.workspace = true aquatic_http_protocol.workspace = true aquatic_toml_config.workspace = true diff --git a/crates/http_load_test/src/main.rs b/crates/http_load_test/src/main.rs index 21b881e..176fe66 100644 --- a/crates/http_load_test/src/main.rs +++ b/crates/http_load_test/src/main.rs @@ -3,8 +3,6 @@ use std::thread; use std::time::{Duration, Instant}; use ::glommio::LocalExecutorBuilder; -use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker}; -use aquatic_common::cpu_pinning::WorkerIndex; use rand::prelude::*; use rand_distr::Gamma; @@ -65,19 +63,12 @@ fn run(config: Config) -> ::anyhow::Result<()> { None }; - for i in 0..config.num_workers { + for _ in 0..config.num_workers { let config = config.clone(); let opt_tls_config = opt_tls_config.clone(); let state = state.clone(); - let placement = get_worker_placement( - &config.cpu_pinning, - config.num_workers, - 0, - WorkerIndex::SocketWorker(i), - )?; - - LocalExecutorBuilder::new(placement) + LocalExecutorBuilder::default() .name("load-test") .spawn(move || async move { run_socket_thread(config, opt_tls_config, state) @@ -87,10 +78,6 @@ fn run(config: Config) -> ::anyhow::Result<()> { .unwrap(); } - if config.cpu_pinning.active { - set_affinity_for_util_worker(&config.cpu_pinning, config.num_workers, 0)?; - } - monitor_statistics(state, &config); Ok(()) diff --git a/crates/udp/Cargo.toml b/crates/udp/Cargo.toml index ffc602c..72c1f92 100644 --- a/crates/udp/Cargo.toml +++ b/crates/udp/Cargo.toml @@ -24,7 +24,7 @@ prometheus = ["metrics", "aquatic_common/prometheus"] # Experimental io_uring support (Linux 6.0 or later required) io-uring = ["dep:io-uring"] # Experimental CPU pinning support -cpu-pinning = ["aquatic_common/hwloc"] +cpu-pinning = ["aquatic_common/cpu-pinning"] [dependencies] aquatic_common.workspace = true diff --git a/crates/udp_load_test/Cargo.toml b/crates/udp_load_test/Cargo.toml index e42aad3..2eedf92 100644 --- a/crates/udp_load_test/Cargo.toml +++ b/crates/udp_load_test/Cargo.toml @@ -11,7 +11,7 @@ readme.workspace = true rust-version.workspace = true [features] -cpu-pinning = ["aquatic_common/hwloc"] +cpu-pinning = ["aquatic_common/cpu-pinning"] [lib] name = "aquatic_udp_load_test" diff --git a/crates/ws_load_test/Cargo.toml b/crates/ws_load_test/Cargo.toml index 90dbe94..932aebf 100644 --- a/crates/ws_load_test/Cargo.toml +++ b/crates/ws_load_test/Cargo.toml @@ -14,7 +14,7 @@ rust-version.workspace = true name = "aquatic_ws_load_test" [dependencies] -aquatic_common = { workspace = true, features = ["glommio"] } +aquatic_common.workspace = true aquatic_toml_config.workspace = true aquatic_ws_protocol.workspace = true diff --git a/crates/ws_load_test/src/main.rs b/crates/ws_load_test/src/main.rs index 91cd8ff..94e91e8 100644 --- a/crates/ws_load_test/src/main.rs +++ b/crates/ws_load_test/src/main.rs @@ -2,8 +2,6 @@ use std::sync::{atomic::Ordering, Arc}; use std::thread; use std::time::{Duration, Instant}; -use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker}; -use aquatic_common::cpu_pinning::WorkerIndex; use aquatic_ws_protocol::common::InfoHash; use glommio::LocalExecutorBuilder; use rand::prelude::*; @@ -59,19 +57,12 @@ fn run(config: Config) -> ::anyhow::Result<()> { let tls_config = create_tls_config().unwrap(); - for i in 0..config.num_workers { + for _ in 0..config.num_workers { let config = config.clone(); let tls_config = tls_config.clone(); let state = state.clone(); - let placement = get_worker_placement( - &config.cpu_pinning, - config.num_workers, - 0, - WorkerIndex::SocketWorker(i), - )?; - - LocalExecutorBuilder::new(placement) + LocalExecutorBuilder::default() .name("load-test") .spawn(move || async move { run_socket_thread(config, tls_config, state).await.unwrap(); @@ -79,10 +70,6 @@ fn run(config: Config) -> ::anyhow::Result<()> { .unwrap(); } - if config.cpu_pinning.active { - set_affinity_for_util_worker(&config.cpu_pinning, config.num_workers, 0)?; - } - monitor_statistics(state, &config); Ok(())