cpu pinning: add hyperthread mapping modes (glommio only)

This commit is contained in:
Joakim Frostegård 2022-03-31 15:06:44 +02:00
parent fb607ac0c2
commit 3dc9068dd2

View file

@ -14,11 +14,29 @@ impl Default for CpuPinningMode {
} }
} }
#[cfg(feature = "with-glommio")]
#[derive(Clone, Debug, PartialEq, TomlConfig, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum HyperThreadMapping {
System,
Subsequent,
Split,
}
#[cfg(feature = "with-glommio")]
impl Default for HyperThreadMapping {
fn default() -> Self {
Self::System
}
}
/// Experimental CPU pinning /// Experimental CPU pinning
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
pub struct CpuPinningConfig { pub struct CpuPinningConfig {
pub active: bool, pub active: bool,
pub mode: CpuPinningMode, pub mode: CpuPinningMode,
#[cfg(feature = "with-glommio")]
pub hyperthread: HyperThreadMapping,
pub core_offset: usize, pub core_offset: usize,
} }
@ -27,6 +45,8 @@ impl Default for CpuPinningConfig {
Self { Self {
active: false, active: false,
mode: Default::default(), mode: Default::default(),
#[cfg(feature = "with-glommio")]
hyperthread: Default::default(),
core_offset: 0, core_offset: 0,
} }
} }
@ -88,9 +108,22 @@ pub mod glommio {
.iter() .iter()
.map(|l| l.core) .map(|l| l.core)
.max() .max()
.map(|index| index + 1)
.ok_or(anyhow::anyhow!("CpuSet is empty")) .ok_or(anyhow::anyhow!("CpuSet is empty"))
} }
fn logical_cpus_string(cpu_set: &CpuSet) -> String {
let mut logical_cpus = cpu_set.iter().map(|l| l.cpu).collect::<Vec<usize>>();
logical_cpus.sort_unstable();
logical_cpus
.into_iter()
.map(|cpu| cpu.to_string())
.collect::<Vec<String>>()
.join(", ")
}
fn get_worker_cpu_set( fn get_worker_cpu_set(
config: &CpuPinningConfig, config: &CpuPinningConfig,
socket_workers: usize, socket_workers: usize,
@ -102,7 +135,61 @@ pub mod glommio {
let core_index = let core_index =
worker_index.get_core_index(&config, socket_workers, request_workers, num_cpu_cores); worker_index.get_core_index(&config, socket_workers, request_workers, num_cpu_cores);
Ok(get_cpu_set()?.filter(|l| l.core == core_index)) let too_many_workers = match (&config.hyperthread, &config.mode) {
(
HyperThreadMapping::Split | HyperThreadMapping::Subsequent,
CpuPinningMode::Ascending,
) => core_index >= num_cpu_cores / 2,
(
HyperThreadMapping::Split | HyperThreadMapping::Subsequent,
CpuPinningMode::Descending,
) => core_index < num_cpu_cores / 2,
(_, _) => false,
};
if too_many_workers {
return Err(anyhow::anyhow!("CPU pinning: total number of workers (including the single utility worker) can not exceed number of virtual CPUs / 2 - core_offset in this hyperthread mapping mode"));
}
let cpu_set = match config.hyperthread {
HyperThreadMapping::System => get_cpu_set()?.filter(|l| l.core == core_index),
HyperThreadMapping::Split => match config.mode {
CpuPinningMode::Ascending => get_cpu_set()?
.filter(|l| l.cpu == core_index || l.cpu == core_index + num_cpu_cores / 2),
CpuPinningMode::Descending => get_cpu_set()?
.filter(|l| l.cpu == core_index || l.cpu == core_index - num_cpu_cores / 2),
},
HyperThreadMapping::Subsequent => {
let cpu_index_offset = match config.mode {
// 0 -> 0 and 1
// 1 -> 2 and 3
// 2 -> 4 and 5
CpuPinningMode::Ascending => core_index * 2,
// 15 -> 14 and 15
// 14 -> 12 and 13
// 13 -> 10 and 11
CpuPinningMode::Descending => num_cpu_cores - 2 * (num_cpu_cores - core_index),
};
get_cpu_set()?
.filter(|l| l.cpu == cpu_index_offset || l.cpu == cpu_index_offset + 1)
}
};
if cpu_set.is_empty() {
Err(anyhow::anyhow!(
"CPU pinning: produced empty CPU set for {:?}. Try decreasing number of workers",
worker_index
))
} else {
::log::info!(
"Logical CPUs for {:?}: {}",
worker_index,
logical_cpus_string(&cpu_set)
);
Ok(cpu_set)
}
} }
pub fn get_worker_placement( pub fn get_worker_placement(
@ -129,13 +216,11 @@ pub mod glommio {
let worker_cpu_set = let worker_cpu_set =
get_worker_cpu_set(&config, socket_workers, request_workers, WorkerIndex::Util)?; get_worker_cpu_set(&config, socket_workers, request_workers, WorkerIndex::Util)?;
let logical_cpus: Vec<usize> = worker_cpu_set.iter().map(|l| l.cpu).collect();
unsafe { unsafe {
let mut set: libc::cpu_set_t = ::std::mem::zeroed(); let mut set: libc::cpu_set_t = ::std::mem::zeroed();
for cpu in logical_cpus { for cpu_location in worker_cpu_set {
libc::CPU_SET(cpu, &mut set); libc::CPU_SET(cpu_location.cpu, &mut set);
} }
let status = libc::pthread_setaffinity_np( let status = libc::pthread_setaffinity_np(