aquatic_common: feature-gate cpu pinning, remove hwloc feature

This commit is contained in:
Joakim Frostegård 2024-02-03 23:16:45 +01:00
parent 1acf44c001
commit 437f2def7d
11 changed files with 13 additions and 215 deletions

1
Cargo.lock generated
View file

@ -158,7 +158,6 @@ dependencies = [
"arc-swap", "arc-swap",
"duplicate", "duplicate",
"git-testament", "git-testament",
"glommio",
"hashbrown 0.14.3", "hashbrown 0.14.3",
"hex", "hex",
"hwloc", "hwloc",

View file

@ -15,6 +15,8 @@ name = "aquatic_common"
[features] [features]
rustls = ["dep:rustls", "rustls-pemfile"] rustls = ["dep:rustls", "rustls-pemfile"]
prometheus = ["dep:metrics", "dep:metrics-util", "dep:metrics-exporter-prometheus", "dep:tokio"] prometheus = ["dep:metrics", "dep:metrics-util", "dep:metrics-exporter-prometheus", "dep:tokio"]
# Experimental CPU pinning support. Requires hwloc (apt-get install libhwloc-dev)
cpu-pinning = ["dep:hwloc"]
[dependencies] [dependencies]
aquatic_toml_config.workspace = true aquatic_toml_config.workspace = true
@ -45,6 +47,5 @@ metrics-util = { version = "0.16", optional = true }
metrics-exporter-prometheus = { version = "0.13", optional = true, default-features = false, features = ["http-listener"] } metrics-exporter-prometheus = { version = "0.13", optional = true, default-features = false, features = ["http-listener"] }
tokio = { version = "1", optional = true, features = ["rt", "net", "time"] } tokio = { version = "1", optional = true, features = ["rt", "net", "time"] }
# other optional # cpu pinning feature
glommio = { version = "0.8", optional = true }
hwloc = { version = "0.5", optional = true } hwloc = { version = "0.5", optional = true }

View file

@ -16,27 +16,9 @@ impl Default for CpuPinningDirection {
} }
} }
#[cfg(feature = "glommio")]
#[derive(Clone, Copy, Debug, PartialEq, TomlConfig, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum HyperThreadMapping {
System,
Subsequent,
Split,
}
#[cfg(feature = "glommio")]
impl Default for HyperThreadMapping {
fn default() -> Self {
Self::System
}
}
pub trait CpuPinningConfig { pub trait CpuPinningConfig {
fn active(&self) -> bool; fn active(&self) -> bool;
fn direction(&self) -> CpuPinningDirection; fn direction(&self) -> CpuPinningDirection;
#[cfg(feature = "glommio")]
fn hyperthread(&self) -> HyperThreadMapping;
fn core_offset(&self) -> usize; fn core_offset(&self) -> usize;
} }
@ -54,8 +36,6 @@ pub mod mod_name {
pub struct struct_name { pub struct struct_name {
pub active: bool, pub active: bool,
pub direction: CpuPinningDirection, pub direction: CpuPinningDirection,
#[cfg(feature = "glommio")]
pub hyperthread: HyperThreadMapping,
pub core_offset: usize, pub core_offset: usize,
} }
@ -64,8 +44,6 @@ pub mod mod_name {
Self { Self {
active: false, active: false,
direction: cpu_pinning_direction, direction: cpu_pinning_direction,
#[cfg(feature = "glommio")]
hyperthread: Default::default(),
core_offset: 0, core_offset: 0,
} }
} }
@ -77,10 +55,6 @@ pub mod mod_name {
fn direction(&self) -> CpuPinningDirection { fn direction(&self) -> CpuPinningDirection {
self.direction self.direction
} }
#[cfg(feature = "glommio")]
fn hyperthread(&self) -> HyperThreadMapping {
self.hyperthread
}
fn core_offset(&self) -> usize { fn core_offset(&self) -> usize {
self.core_offset self.core_offset
} }
@ -119,158 +93,9 @@ impl WorkerIndex {
} }
} }
#[cfg(feature = "glommio")]
pub mod glommio {
use ::glommio::{CpuSet, Placement};
use super::*;
fn get_cpu_set() -> anyhow::Result<CpuSet> {
CpuSet::online().map_err(|err| anyhow::anyhow!("Couldn't get CPU set: {:#}", err))
}
fn get_num_cpu_cores() -> anyhow::Result<usize> {
get_cpu_set()?
.iter()
.map(|l| l.core)
.max()
.map(|index| index + 1)
.ok_or(anyhow::anyhow!("CpuSet is empty"))
}
fn logical_cpus_string(cpu_set: &CpuSet) -> String {
let mut logical_cpus = cpu_set.iter().map(|l| l.cpu).collect::<Vec<usize>>();
logical_cpus.sort_unstable();
logical_cpus
.into_iter()
.map(|cpu| cpu.to_string())
.collect::<Vec<String>>()
.join(", ")
}
fn get_worker_cpu_set<C: CpuPinningConfig>(
config: &C,
socket_workers: usize,
swarm_workers: usize,
worker_index: WorkerIndex,
) -> anyhow::Result<CpuSet> {
let num_cpu_cores = get_num_cpu_cores()?;
let core_index =
worker_index.get_core_index(config, socket_workers, swarm_workers, num_cpu_cores);
let too_many_workers = match (&config.hyperthread(), &config.direction()) {
(
HyperThreadMapping::Split | HyperThreadMapping::Subsequent,
CpuPinningDirection::Ascending,
) => core_index >= num_cpu_cores / 2,
(
HyperThreadMapping::Split | HyperThreadMapping::Subsequent,
CpuPinningDirection::Descending,
) => core_index < num_cpu_cores / 2,
(_, _) => false,
};
if too_many_workers {
return Err(anyhow::anyhow!("CPU pinning: total number of workers (including the single utility worker) can not exceed number of virtual CPUs / 2 - core_offset in this hyperthread mapping mode"));
}
let cpu_set = match config.hyperthread() {
HyperThreadMapping::System => get_cpu_set()?.filter(|l| l.core == core_index),
HyperThreadMapping::Split => match config.direction() {
CpuPinningDirection::Ascending => get_cpu_set()?
.filter(|l| l.cpu == core_index || l.cpu == core_index + num_cpu_cores / 2),
CpuPinningDirection::Descending => get_cpu_set()?
.filter(|l| l.cpu == core_index || l.cpu == core_index - num_cpu_cores / 2),
},
HyperThreadMapping::Subsequent => {
let cpu_index_offset = match config.direction() {
// 0 -> 0 and 1
// 1 -> 2 and 3
// 2 -> 4 and 5
CpuPinningDirection::Ascending => core_index * 2,
// 15 -> 14 and 15
// 14 -> 12 and 13
// 13 -> 10 and 11
CpuPinningDirection::Descending => {
num_cpu_cores - 2 * (num_cpu_cores - core_index)
}
};
get_cpu_set()?
.filter(|l| l.cpu == cpu_index_offset || l.cpu == cpu_index_offset + 1)
}
};
if cpu_set.is_empty() {
Err(anyhow::anyhow!(
"CPU pinning: produced empty CPU set for {:?}. Try decreasing number of workers",
worker_index
))
} else {
::log::info!(
"Logical CPUs for {:?}: {}",
worker_index,
logical_cpus_string(&cpu_set)
);
Ok(cpu_set)
}
}
pub fn get_worker_placement<C: CpuPinningConfig>(
config: &C,
socket_workers: usize,
swarm_workers: usize,
worker_index: WorkerIndex,
) -> anyhow::Result<Placement> {
if config.active() {
let cpu_set = get_worker_cpu_set(config, socket_workers, swarm_workers, worker_index)?;
Ok(Placement::Fenced(cpu_set))
} else {
Ok(Placement::Unbound)
}
}
pub fn set_affinity_for_util_worker<C: CpuPinningConfig>(
config: &C,
socket_workers: usize,
swarm_workers: usize,
) -> anyhow::Result<()> {
let worker_cpu_set =
get_worker_cpu_set(config, socket_workers, swarm_workers, WorkerIndex::Util)?;
unsafe {
let mut set: libc::cpu_set_t = ::std::mem::zeroed();
for cpu_location in worker_cpu_set {
libc::CPU_SET(cpu_location.cpu, &mut set);
}
let status = libc::pthread_setaffinity_np(
libc::pthread_self(),
::std::mem::size_of::<libc::cpu_set_t>(),
&set,
);
if status != 0 {
return Err(anyhow::Error::new(::std::io::Error::from_raw_os_error(
status,
)));
}
}
Ok(())
}
}
/// Pin current thread to a suitable core /// Pin current thread to a suitable core
/// ///
/// Requires hwloc (`apt-get install libhwloc-dev`) /// Requires hwloc (`apt-get install libhwloc-dev`)
#[cfg(feature = "hwloc")]
pub fn pin_current_if_configured_to<C: CpuPinningConfig>( pub fn pin_current_if_configured_to<C: CpuPinningConfig>(
config: &C, config: &C,
socket_workers: usize, socket_workers: usize,

View file

@ -6,6 +6,7 @@ use ahash::RandomState;
pub mod access_list; pub mod access_list;
pub mod cli; pub mod cli;
#[cfg(feature = "cpu-pinning")]
pub mod cpu_pinning; pub mod cpu_pinning;
pub mod privileges; pub mod privileges;
#[cfg(feature = "rustls")] #[cfg(feature = "rustls")]

View file

@ -1,8 +1,6 @@
use std::{net::SocketAddr, path::PathBuf}; use std::{net::SocketAddr, path::PathBuf};
use aquatic_common::{ use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig};
access_list::AccessListConfig, privileges::PrivilegeConfig,
};
use aquatic_toml_config::TomlConfig; use aquatic_toml_config::TomlConfig;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};

View file

@ -14,7 +14,7 @@ rust-version.workspace = true
name = "aquatic_http_load_test" name = "aquatic_http_load_test"
[dependencies] [dependencies]
aquatic_common = { workspace = true, features = ["glommio"] } aquatic_common.workspace = true
aquatic_http_protocol.workspace = true aquatic_http_protocol.workspace = true
aquatic_toml_config.workspace = true aquatic_toml_config.workspace = true

View file

@ -3,8 +3,6 @@ use std::thread;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use ::glommio::LocalExecutorBuilder; use ::glommio::LocalExecutorBuilder;
use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker};
use aquatic_common::cpu_pinning::WorkerIndex;
use rand::prelude::*; use rand::prelude::*;
use rand_distr::Gamma; use rand_distr::Gamma;
@ -65,19 +63,12 @@ fn run(config: Config) -> ::anyhow::Result<()> {
None None
}; };
for i in 0..config.num_workers { for _ in 0..config.num_workers {
let config = config.clone(); let config = config.clone();
let opt_tls_config = opt_tls_config.clone(); let opt_tls_config = opt_tls_config.clone();
let state = state.clone(); let state = state.clone();
let placement = get_worker_placement( LocalExecutorBuilder::default()
&config.cpu_pinning,
config.num_workers,
0,
WorkerIndex::SocketWorker(i),
)?;
LocalExecutorBuilder::new(placement)
.name("load-test") .name("load-test")
.spawn(move || async move { .spawn(move || async move {
run_socket_thread(config, opt_tls_config, state) run_socket_thread(config, opt_tls_config, state)
@ -87,10 +78,6 @@ fn run(config: Config) -> ::anyhow::Result<()> {
.unwrap(); .unwrap();
} }
if config.cpu_pinning.active {
set_affinity_for_util_worker(&config.cpu_pinning, config.num_workers, 0)?;
}
monitor_statistics(state, &config); monitor_statistics(state, &config);
Ok(()) Ok(())

View file

@ -24,7 +24,7 @@ prometheus = ["metrics", "aquatic_common/prometheus"]
# Experimental io_uring support (Linux 6.0 or later required) # Experimental io_uring support (Linux 6.0 or later required)
io-uring = ["dep:io-uring"] io-uring = ["dep:io-uring"]
# Experimental CPU pinning support # Experimental CPU pinning support
cpu-pinning = ["aquatic_common/hwloc"] cpu-pinning = ["aquatic_common/cpu-pinning"]
[dependencies] [dependencies]
aquatic_common.workspace = true aquatic_common.workspace = true

View file

@ -11,7 +11,7 @@ readme.workspace = true
rust-version.workspace = true rust-version.workspace = true
[features] [features]
cpu-pinning = ["aquatic_common/hwloc"] cpu-pinning = ["aquatic_common/cpu-pinning"]
[lib] [lib]
name = "aquatic_udp_load_test" name = "aquatic_udp_load_test"

View file

@ -14,7 +14,7 @@ rust-version.workspace = true
name = "aquatic_ws_load_test" name = "aquatic_ws_load_test"
[dependencies] [dependencies]
aquatic_common = { workspace = true, features = ["glommio"] } aquatic_common.workspace = true
aquatic_toml_config.workspace = true aquatic_toml_config.workspace = true
aquatic_ws_protocol.workspace = true aquatic_ws_protocol.workspace = true

View file

@ -2,8 +2,6 @@ use std::sync::{atomic::Ordering, Arc};
use std::thread; use std::thread;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker};
use aquatic_common::cpu_pinning::WorkerIndex;
use aquatic_ws_protocol::common::InfoHash; use aquatic_ws_protocol::common::InfoHash;
use glommio::LocalExecutorBuilder; use glommio::LocalExecutorBuilder;
use rand::prelude::*; use rand::prelude::*;
@ -59,19 +57,12 @@ fn run(config: Config) -> ::anyhow::Result<()> {
let tls_config = create_tls_config().unwrap(); let tls_config = create_tls_config().unwrap();
for i in 0..config.num_workers { for _ in 0..config.num_workers {
let config = config.clone(); let config = config.clone();
let tls_config = tls_config.clone(); let tls_config = tls_config.clone();
let state = state.clone(); let state = state.clone();
let placement = get_worker_placement( LocalExecutorBuilder::default()
&config.cpu_pinning,
config.num_workers,
0,
WorkerIndex::SocketWorker(i),
)?;
LocalExecutorBuilder::new(placement)
.name("load-test") .name("load-test")
.spawn(move || async move { .spawn(move || async move {
run_socket_thread(config, tls_config, state).await.unwrap(); run_socket_thread(config, tls_config, state).await.unwrap();
@ -79,10 +70,6 @@ fn run(config: Config) -> ::anyhow::Result<()> {
.unwrap(); .unwrap();
} }
if config.cpu_pinning.active {
set_affinity_for_util_worker(&config.cpu_pinning, config.num_workers, 0)?;
}
monitor_statistics(state, &config); monitor_statistics(state, &config);
Ok(()) Ok(())