mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
aquatic_common: use hwloc for cpu pinning, for automatic core selection
This might not work very well on virtualized hosts, however..
This commit is contained in:
parent
c1f2d036c0
commit
593a46452f
3 changed files with 97 additions and 66 deletions
69
Cargo.lock
generated
69
Cargo.lock
generated
|
|
@ -17,18 +17,6 @@ version = "1.0.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
|
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "affinity"
|
|
||||||
version = "0.1.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "763e484feceb7dd021b21c5c6f81aee06b1594a743455ec7efbf72e6355e447b"
|
|
||||||
dependencies = [
|
|
||||||
"cfg-if",
|
|
||||||
"errno",
|
|
||||||
"libc",
|
|
||||||
"num_cpus",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ahash"
|
name = "ahash"
|
||||||
version = "0.3.8"
|
version = "0.3.8"
|
||||||
|
|
@ -86,12 +74,12 @@ dependencies = [
|
||||||
name = "aquatic_common"
|
name = "aquatic_common"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"affinity",
|
|
||||||
"ahash 0.7.6",
|
"ahash 0.7.6",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"arc-swap",
|
"arc-swap",
|
||||||
"hashbrown 0.11.2",
|
"hashbrown 0.11.2",
|
||||||
"hex",
|
"hex",
|
||||||
|
"hwloc",
|
||||||
"indexmap-amortized",
|
"indexmap-amortized",
|
||||||
"libc",
|
"libc",
|
||||||
"log",
|
"log",
|
||||||
|
|
@ -399,6 +387,12 @@ dependencies = [
|
||||||
"serde_bytes",
|
"serde_bytes",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "bitflags"
|
||||||
|
version = "0.7.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bitflags"
|
name = "bitflags"
|
||||||
version = "1.3.2"
|
version = "1.3.2"
|
||||||
|
|
@ -502,7 +496,7 @@ version = "2.33.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002"
|
checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bitflags",
|
"bitflags 1.3.2",
|
||||||
"textwrap",
|
"textwrap",
|
||||||
"unicode-width",
|
"unicode-width",
|
||||||
]
|
]
|
||||||
|
|
@ -962,7 +956,7 @@ version = "0.6.0"
|
||||||
source = "git+https://github.com/DataDog/glommio.git?rev=4e6b14772da2f4325271fbcf12d24cf91ed466e5#4e6b14772da2f4325271fbcf12d24cf91ed466e5"
|
source = "git+https://github.com/DataDog/glommio.git?rev=4e6b14772da2f4325271fbcf12d24cf91ed466e5#4e6b14772da2f4325271fbcf12d24cf91ed466e5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"ahash 0.7.6",
|
"ahash 0.7.6",
|
||||||
"bitflags",
|
"bitflags 1.3.2",
|
||||||
"bitmaps",
|
"bitmaps",
|
||||||
"buddy-alloc",
|
"buddy-alloc",
|
||||||
"cc",
|
"cc",
|
||||||
|
|
@ -1076,6 +1070,21 @@ version = "1.5.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503"
|
checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "hwloc"
|
||||||
|
version = "0.5.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "2934f84993b8b4bcae9b6a4e5f0aca638462dda9c7b4f26a570241494f21e0f4"
|
||||||
|
dependencies = [
|
||||||
|
"bitflags 0.7.0",
|
||||||
|
"errno",
|
||||||
|
"kernel32-sys",
|
||||||
|
"libc",
|
||||||
|
"num",
|
||||||
|
"pkg-config",
|
||||||
|
"winapi 0.2.8",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "idna"
|
name = "idna"
|
||||||
version = "0.2.3"
|
version = "0.2.3"
|
||||||
|
|
@ -1323,7 +1332,7 @@ version = "0.23.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "f305c2c2e4c39a82f7bf0bf65fb557f9070ce06781d4f2454295cc34b1c43188"
|
checksum = "f305c2c2e4c39a82f7bf0bf65fb557f9070ce06781d4f2454295cc34b1c43188"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bitflags",
|
"bitflags 1.3.2",
|
||||||
"cc",
|
"cc",
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
"libc",
|
"libc",
|
||||||
|
|
@ -1345,6 +1354,17 @@ dependencies = [
|
||||||
"winapi 0.3.9",
|
"winapi 0.3.9",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "num"
|
||||||
|
version = "0.1.42"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "4703ad64153382334aa8db57c637364c322d3372e097840c72000dabdcf6156e"
|
||||||
|
dependencies = [
|
||||||
|
"num-integer",
|
||||||
|
"num-iter",
|
||||||
|
"num-traits",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "num-format"
|
name = "num-format"
|
||||||
version = "0.4.0"
|
version = "0.4.0"
|
||||||
|
|
@ -1365,6 +1385,17 @@ dependencies = [
|
||||||
"num-traits",
|
"num-traits",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "num-iter"
|
||||||
|
version = "0.1.42"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "b2021c8337a54d21aca0d59a92577a029af9431cb59b909b03252b9c164fad59"
|
||||||
|
dependencies = [
|
||||||
|
"autocfg",
|
||||||
|
"num-integer",
|
||||||
|
"num-traits",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "num-traits"
|
name = "num-traits"
|
||||||
version = "0.2.14"
|
version = "0.2.14"
|
||||||
|
|
@ -1424,7 +1455,7 @@ version = "0.10.38"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "0c7ae222234c30df141154f159066c5093ff73b63204dcda7121eb082fc56a95"
|
checksum = "0c7ae222234c30df141154f159066c5093ff73b63204dcda7121eb082fc56a95"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bitflags",
|
"bitflags 1.3.2",
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
"foreign-types",
|
"foreign-types",
|
||||||
"libc",
|
"libc",
|
||||||
|
|
@ -1689,7 +1720,7 @@ version = "0.2.10"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff"
|
checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bitflags",
|
"bitflags 1.3.2",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|
@ -1837,7 +1868,7 @@ version = "2.4.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "525bc1abfda2e1998d152c45cf13e696f76d0a4972310b22fac1658b05df7c87"
|
checksum = "525bc1abfda2e1998d152c45cf13e696f76d0a4972310b22fac1658b05df7c87"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bitflags",
|
"bitflags 1.3.2",
|
||||||
"core-foundation",
|
"core-foundation",
|
||||||
"core-foundation-sys",
|
"core-foundation-sys",
|
||||||
"libc",
|
"libc",
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ repository = "https://github.com/greatest-ape/aquatic"
|
||||||
name = "aquatic_common"
|
name = "aquatic_common"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
cpu-pinning = ["affinity", "libc"]
|
cpu-pinning = ["hwloc", "libc"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
ahash = "0.7"
|
ahash = "0.7"
|
||||||
|
|
@ -25,5 +25,6 @@ privdrop = "0.5"
|
||||||
rand = { version = "0.8", features = ["small_rng"] }
|
rand = { version = "0.8", features = ["small_rng"] }
|
||||||
serde = { version = "1", features = ["derive"] }
|
serde = { version = "1", features = ["derive"] }
|
||||||
|
|
||||||
affinity = { version = "0.1", optional = true }
|
# cpu-pinning
|
||||||
|
hwloc = { version = "0.5", optional = true }
|
||||||
libc = { version = "0.2", optional = true }
|
libc = { version = "0.2", optional = true }
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
use hwloc::{CpuSet, ObjectType, Topology, CPUBIND_THREAD};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
|
@ -17,8 +18,7 @@ impl Default for CpuPinningMode {
|
||||||
pub struct CpuPinningConfig {
|
pub struct CpuPinningConfig {
|
||||||
pub active: bool,
|
pub active: bool,
|
||||||
pub mode: CpuPinningMode,
|
pub mode: CpuPinningMode,
|
||||||
pub virtual_per_physical_cpu: usize,
|
pub core_offset: usize,
|
||||||
pub offset_cpus: usize,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for CpuPinningConfig {
|
impl Default for CpuPinningConfig {
|
||||||
|
|
@ -26,8 +26,7 @@ impl Default for CpuPinningConfig {
|
||||||
Self {
|
Self {
|
||||||
active: false,
|
active: false,
|
||||||
mode: Default::default(),
|
mode: Default::default(),
|
||||||
virtual_per_physical_cpu: 2,
|
core_offset: 0,
|
||||||
offset_cpus: 0,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -49,61 +48,61 @@ pub enum WorkerIndex {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WorkerIndex {
|
impl WorkerIndex {
|
||||||
fn get_cpu_indices(self, config: &CpuPinningConfig, socket_workers: usize) -> Vec<usize> {
|
fn get_core_index(
|
||||||
let offset = match self {
|
self,
|
||||||
Self::Other => config.virtual_per_physical_cpu * config.offset_cpus,
|
config: &CpuPinningConfig,
|
||||||
Self::SocketWorker(index) => {
|
socket_workers: usize,
|
||||||
config.virtual_per_physical_cpu * (config.offset_cpus + 1 + index)
|
core_count: usize,
|
||||||
}
|
) -> usize {
|
||||||
Self::RequestWorker(index) => {
|
let ascending_index = match self {
|
||||||
config.virtual_per_physical_cpu * (config.offset_cpus + 1 + socket_workers + index)
|
Self::Other => config.core_offset,
|
||||||
}
|
Self::SocketWorker(index) => config.core_offset + 1 + index,
|
||||||
|
Self::RequestWorker(index) => config.core_offset + 1 + socket_workers + index,
|
||||||
};
|
};
|
||||||
|
|
||||||
let virtual_cpus = (0..config.virtual_per_physical_cpu).map(|i| offset + i);
|
match config.mode {
|
||||||
|
CpuPinningMode::Ascending => ascending_index,
|
||||||
let virtual_cpus: Vec<usize> = match config.mode {
|
CpuPinningMode::Descending => core_count - 1 - ascending_index,
|
||||||
CpuPinningMode::Ascending => virtual_cpus.collect(),
|
|
||||||
CpuPinningMode::Descending => {
|
|
||||||
let max_index = affinity::get_core_num() - 1;
|
|
||||||
|
|
||||||
virtual_cpus
|
|
||||||
.map(|i| max_index.checked_sub(i).unwrap_or(0))
|
|
||||||
.collect()
|
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
::log::info!(
|
|
||||||
"Calculated virtual CPU pin indices {:?} for {:?}",
|
|
||||||
virtual_cpus,
|
|
||||||
self
|
|
||||||
);
|
|
||||||
|
|
||||||
virtual_cpus
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Note: don't call this when affinities were already set in the current or in
|
/// Pin current thread to a suitable core
|
||||||
/// a parent thread. Doing so limits the number of cores that are seen and
|
///
|
||||||
/// messes up setting affinities.
|
/// Requires hwloc (`apt-get install libhwloc-dev`)
|
||||||
pub fn pin_current_if_configured_to(
|
pub fn pin_current_if_configured_to(
|
||||||
config: &CpuPinningConfig,
|
config: &CpuPinningConfig,
|
||||||
socket_workers: usize,
|
socket_workers: usize,
|
||||||
worker_index: WorkerIndex,
|
worker_index: WorkerIndex,
|
||||||
) {
|
) {
|
||||||
if config.active {
|
if config.active {
|
||||||
let indices = worker_index.get_cpu_indices(config, socket_workers);
|
let mut topology = Topology::new();
|
||||||
|
|
||||||
if let Err(err) = affinity::set_thread_affinity(indices.clone()) {
|
let core_cpu_sets: Vec<CpuSet> = topology
|
||||||
::log::error!(
|
.objects_with_type(&ObjectType::Core)
|
||||||
"Failed setting thread affinities {:?} for {:?}: {:#?}",
|
.expect("hwloc: list cores")
|
||||||
indices,
|
.into_iter()
|
||||||
|
.map(|core| core.allowed_cpuset().expect("hwloc: get core cpu set"))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let core_index = worker_index.get_core_index(config, socket_workers, core_cpu_sets.len());
|
||||||
|
|
||||||
|
let cpu_set = core_cpu_sets
|
||||||
|
.get(core_index)
|
||||||
|
.expect(&format!("get cpu set for core {}", core_index))
|
||||||
|
.to_owned();
|
||||||
|
|
||||||
|
topology
|
||||||
|
.set_cpubind(cpu_set, CPUBIND_THREAD)
|
||||||
|
.expect(&format!("bind thread to core {}", core_index));
|
||||||
|
|
||||||
|
::log::info!(
|
||||||
|
"Pinned worker {:?} to cpu core {}",
|
||||||
worker_index,
|
worker_index,
|
||||||
err
|
core_index
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/// Tell Linux that incoming messages should be handled by the socket worker
|
/// Tell Linux that incoming messages should be handled by the socket worker
|
||||||
/// with the same index as the CPU core receiving the interrupt.
|
/// with the same index as the CPU core receiving the interrupt.
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue