Improve CPU pinning

This commit is contained in:
Joakim Frostegård 2022-03-30 22:33:23 +02:00
parent 5057ba73bd
commit fb607ac0c2
20 changed files with 219 additions and 143 deletions

View file

@ -12,7 +12,8 @@ readme = "../README.md"
name = "aquatic_common"
[features]
cpu-pinning = ["hwloc", "libc"]
with-glommio = ["glommio"]
with-hwloc = ["hwloc"]
[dependencies]
aquatic_toml_config = "0.2.0"
@ -23,11 +24,12 @@ arc-swap = "1"
hashbrown = "0.12"
hex = "0.4"
indexmap-amortized = "1"
libc = "0.2"
log = "0.4"
privdrop = "0.5"
rand = { version = "0.8", features = ["small_rng"] }
serde = { version = "1", features = ["derive"] }
# cpu-pinning
# Optional
glommio = { version = "0.7", optional = true }
hwloc = { version = "0.5", optional = true }
libc = { version = "0.2", optional = true }

View file

@ -1,5 +1,4 @@
use aquatic_toml_config::TomlConfig;
use hwloc::{CpuSet, ObjectType, Topology, CPUBIND_THREAD};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, TomlConfig, Serialize, Deserialize)]
@ -15,6 +14,7 @@ impl Default for CpuPinningMode {
}
}
/// Experimental CPU pinning
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
pub struct CpuPinningConfig {
pub active: bool,
@ -45,37 +45,128 @@ impl CpuPinningConfig {
pub enum WorkerIndex {
SocketWorker(usize),
RequestWorker(usize),
Other,
Util,
}
impl WorkerIndex {
fn get_core_index(
self,
pub fn get_core_index(
&self,
config: &CpuPinningConfig,
socket_workers: usize,
core_count: usize,
request_workers: usize,
num_cores: usize,
) -> usize {
let ascending_index = match self {
Self::Other => config.core_offset,
Self::SocketWorker(index) => config.core_offset + 1 + index,
Self::RequestWorker(index) => config.core_offset + 1 + socket_workers + index,
Self::SocketWorker(index) => config.core_offset + index,
Self::RequestWorker(index) => config.core_offset + socket_workers + index,
Self::Util => config.core_offset + socket_workers + request_workers,
};
let max_core_index = num_cores - 1;
let ascending_index = ascending_index.min(max_core_index);
match config.mode {
CpuPinningMode::Ascending => ascending_index,
CpuPinningMode::Descending => core_count - 1 - ascending_index,
CpuPinningMode::Descending => max_core_index - ascending_index,
}
}
}
#[cfg(feature = "with-glommio")]
pub mod glommio {
use ::glommio::{CpuSet, Placement};
use super::*;
fn get_cpu_set() -> anyhow::Result<CpuSet> {
CpuSet::online().map_err(|err| anyhow::anyhow!("Couldn't get CPU set: {:#}", err))
}
fn get_num_cpu_cores() -> anyhow::Result<usize> {
get_cpu_set()?
.iter()
.map(|l| l.core)
.max()
.ok_or(anyhow::anyhow!("CpuSet is empty"))
}
fn get_worker_cpu_set(
config: &CpuPinningConfig,
socket_workers: usize,
request_workers: usize,
worker_index: WorkerIndex,
) -> anyhow::Result<CpuSet> {
let num_cpu_cores = get_num_cpu_cores()?;
let core_index =
worker_index.get_core_index(&config, socket_workers, request_workers, num_cpu_cores);
Ok(get_cpu_set()?.filter(|l| l.core == core_index))
}
pub fn get_worker_placement(
config: &CpuPinningConfig,
socket_workers: usize,
request_workers: usize,
worker_index: WorkerIndex,
) -> anyhow::Result<Placement> {
if config.active {
let cpu_set =
get_worker_cpu_set(&config, socket_workers, request_workers, worker_index)?;
Ok(Placement::Fenced(cpu_set))
} else {
Ok(Placement::Unbound)
}
}
pub fn set_affinity_for_util_worker(
config: &CpuPinningConfig,
socket_workers: usize,
request_workers: usize,
) -> anyhow::Result<()> {
let worker_cpu_set =
get_worker_cpu_set(&config, socket_workers, request_workers, WorkerIndex::Util)?;
let logical_cpus: Vec<usize> = worker_cpu_set.iter().map(|l| l.cpu).collect();
unsafe {
let mut set: libc::cpu_set_t = ::std::mem::zeroed();
for cpu in logical_cpus {
libc::CPU_SET(cpu, &mut set);
}
let status = libc::pthread_setaffinity_np(
libc::pthread_self(),
::std::mem::size_of::<libc::cpu_set_t>(),
&set,
);
if status != 0 {
return Err(anyhow::Error::new(::std::io::Error::from_raw_os_error(
status,
)));
}
}
Ok(())
}
}
/// Pin current thread to a suitable core
///
/// Requires hwloc (`apt-get install libhwloc-dev`)
#[cfg(feature = "with-hwloc")]
pub fn pin_current_if_configured_to(
config: &CpuPinningConfig,
socket_workers: usize,
request_workers: usize,
worker_index: WorkerIndex,
) {
use hwloc::{CpuSet, ObjectType, Topology, CPUBIND_THREAD};
if config.active {
let mut topology = Topology::new();
@ -86,7 +177,10 @@ pub fn pin_current_if_configured_to(
.map(|core| core.allowed_cpuset().expect("hwloc: get core cpu set"))
.collect();
let core_index = worker_index.get_core_index(config, socket_workers, core_cpu_sets.len());
let num_cores = core_cpu_sets.len();
let core_index =
worker_index.get_core_index(config, socket_workers, request_workers, num_cores);
let cpu_set = core_cpu_sets
.get(core_index)

View file

@ -5,7 +5,6 @@ use ahash::RandomState;
use rand::Rng;
pub mod access_list;
#[cfg(feature = "cpu-pinning")]
pub mod cpu_pinning;
pub mod privileges;