From 593a46452f6f741ecc7470e7738ccfd1b487f387 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 19 Nov 2021 00:17:14 +0100 Subject: [PATCH] aquatic_common: use hwloc for cpu pinning, for automatic core selection This might not work very well on virtualized hosts, however.. --- Cargo.lock | 69 +++++++++++++++++------- aquatic_common/Cargo.toml | 5 +- aquatic_common/src/cpu_pinning.rs | 89 +++++++++++++++---------------- 3 files changed, 97 insertions(+), 66 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c2e45b4..d95700c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,18 +17,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" -[[package]] -name = "affinity" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "763e484feceb7dd021b21c5c6f81aee06b1594a743455ec7efbf72e6355e447b" -dependencies = [ - "cfg-if", - "errno", - "libc", - "num_cpus", -] - [[package]] name = "ahash" version = "0.3.8" @@ -86,12 +74,12 @@ dependencies = [ name = "aquatic_common" version = "0.1.0" dependencies = [ - "affinity", "ahash 0.7.6", "anyhow", "arc-swap", "hashbrown 0.11.2", "hex", + "hwloc", "indexmap-amortized", "libc", "log", @@ -399,6 +387,12 @@ dependencies = [ "serde_bytes", ] +[[package]] +name = "bitflags" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d" + [[package]] name = "bitflags" version = "1.3.2" @@ -502,7 +496,7 @@ version = "2.33.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002" dependencies = [ - "bitflags", + "bitflags 1.3.2", "textwrap", "unicode-width", ] @@ -962,7 +956,7 @@ version = "0.6.0" source = "git+https://github.com/DataDog/glommio.git?rev=4e6b14772da2f4325271fbcf12d24cf91ed466e5#4e6b14772da2f4325271fbcf12d24cf91ed466e5" dependencies = [ "ahash 0.7.6", - "bitflags", + "bitflags 1.3.2", "bitmaps", "buddy-alloc", "cc", @@ -1076,6 +1070,21 @@ version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503" +[[package]] +name = "hwloc" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2934f84993b8b4bcae9b6a4e5f0aca638462dda9c7b4f26a570241494f21e0f4" +dependencies = [ + "bitflags 0.7.0", + "errno", + "kernel32-sys", + "libc", + "num", + "pkg-config", + "winapi 0.2.8", +] + [[package]] name = "idna" version = "0.2.3" @@ -1323,7 +1332,7 @@ version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f305c2c2e4c39a82f7bf0bf65fb557f9070ce06781d4f2454295cc34b1c43188" dependencies = [ - "bitflags", + "bitflags 1.3.2", "cc", "cfg-if", "libc", @@ -1345,6 +1354,17 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "num" +version = "0.1.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4703ad64153382334aa8db57c637364c322d3372e097840c72000dabdcf6156e" +dependencies = [ + "num-integer", + "num-iter", + "num-traits", +] + [[package]] name = "num-format" version = "0.4.0" @@ -1365,6 +1385,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-iter" +version = "0.1.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2021c8337a54d21aca0d59a92577a029af9431cb59b909b03252b9c164fad59" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.14" @@ -1424,7 +1455,7 @@ version = "0.10.38" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7ae222234c30df141154f159066c5093ff73b63204dcda7121eb082fc56a95" dependencies = [ - "bitflags", + "bitflags 1.3.2", "cfg-if", "foreign-types", "libc", @@ -1689,7 +1720,7 @@ version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff" dependencies = [ - "bitflags", + "bitflags 1.3.2", ] [[package]] @@ -1837,7 +1868,7 @@ version = "2.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "525bc1abfda2e1998d152c45cf13e696f76d0a4972310b22fac1658b05df7c87" dependencies = [ - "bitflags", + "bitflags 1.3.2", "core-foundation", "core-foundation-sys", "libc", diff --git a/aquatic_common/Cargo.toml b/aquatic_common/Cargo.toml index b076f77..951b65f 100644 --- a/aquatic_common/Cargo.toml +++ b/aquatic_common/Cargo.toml @@ -11,7 +11,7 @@ repository = "https://github.com/greatest-ape/aquatic" name = "aquatic_common" [features] -cpu-pinning = ["affinity", "libc"] +cpu-pinning = ["hwloc", "libc"] [dependencies] ahash = "0.7" @@ -25,5 +25,6 @@ privdrop = "0.5" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } -affinity = { version = "0.1", optional = true } +# cpu-pinning +hwloc = { version = "0.5", optional = true } libc = { version = "0.2", optional = true } \ No newline at end of file diff --git a/aquatic_common/src/cpu_pinning.rs b/aquatic_common/src/cpu_pinning.rs index 43f506b..a4065df 100644 --- a/aquatic_common/src/cpu_pinning.rs +++ b/aquatic_common/src/cpu_pinning.rs @@ -1,3 +1,4 @@ +use hwloc::{CpuSet, ObjectType, Topology, CPUBIND_THREAD}; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -17,8 +18,7 @@ impl Default for CpuPinningMode { pub struct CpuPinningConfig { pub active: bool, pub mode: CpuPinningMode, - pub virtual_per_physical_cpu: usize, - pub offset_cpus: usize, + pub core_offset: usize, } impl Default for CpuPinningConfig { @@ -26,8 +26,7 @@ impl Default for CpuPinningConfig { Self { active: false, mode: Default::default(), - virtual_per_physical_cpu: 2, - offset_cpus: 0, + core_offset: 0, } } } @@ -49,59 +48,59 @@ pub enum WorkerIndex { } impl WorkerIndex { - fn get_cpu_indices(self, config: &CpuPinningConfig, socket_workers: usize) -> Vec { - let offset = match self { - Self::Other => config.virtual_per_physical_cpu * config.offset_cpus, - Self::SocketWorker(index) => { - config.virtual_per_physical_cpu * (config.offset_cpus + 1 + index) - } - Self::RequestWorker(index) => { - config.virtual_per_physical_cpu * (config.offset_cpus + 1 + socket_workers + index) - } + fn get_core_index( + self, + config: &CpuPinningConfig, + socket_workers: usize, + core_count: usize, + ) -> usize { + let ascending_index = match self { + Self::Other => config.core_offset, + Self::SocketWorker(index) => config.core_offset + 1 + index, + Self::RequestWorker(index) => config.core_offset + 1 + socket_workers + index, }; - let virtual_cpus = (0..config.virtual_per_physical_cpu).map(|i| offset + i); - - let virtual_cpus: Vec = match config.mode { - CpuPinningMode::Ascending => virtual_cpus.collect(), - CpuPinningMode::Descending => { - let max_index = affinity::get_core_num() - 1; - - virtual_cpus - .map(|i| max_index.checked_sub(i).unwrap_or(0)) - .collect() - } - }; - - ::log::info!( - "Calculated virtual CPU pin indices {:?} for {:?}", - virtual_cpus, - self - ); - - virtual_cpus + match config.mode { + CpuPinningMode::Ascending => ascending_index, + CpuPinningMode::Descending => core_count - 1 - ascending_index, + } } } -/// Note: don't call this when affinities were already set in the current or in -/// a parent thread. Doing so limits the number of cores that are seen and -/// messes up setting affinities. +/// Pin current thread to a suitable core +/// +/// Requires hwloc (`apt-get install libhwloc-dev`) pub fn pin_current_if_configured_to( config: &CpuPinningConfig, socket_workers: usize, worker_index: WorkerIndex, ) { if config.active { - let indices = worker_index.get_cpu_indices(config, socket_workers); + let mut topology = Topology::new(); - if let Err(err) = affinity::set_thread_affinity(indices.clone()) { - ::log::error!( - "Failed setting thread affinities {:?} for {:?}: {:#?}", - indices, - worker_index, - err - ); - } + let core_cpu_sets: Vec = topology + .objects_with_type(&ObjectType::Core) + .expect("hwloc: list cores") + .into_iter() + .map(|core| core.allowed_cpuset().expect("hwloc: get core cpu set")) + .collect(); + + let core_index = worker_index.get_core_index(config, socket_workers, core_cpu_sets.len()); + + let cpu_set = core_cpu_sets + .get(core_index) + .expect(&format!("get cpu set for core {}", core_index)) + .to_owned(); + + topology + .set_cpubind(cpu_set, CPUBIND_THREAD) + .expect(&format!("bind thread to core {}", core_index)); + + ::log::info!( + "Pinned worker {:?} to cpu core {}", + worker_index, + core_index + ); } }