Rewrite CpuPinningConfig implementation to support aquatic_toml_config

This commit is contained in:
Joakim Frostegård 2022-04-04 22:38:47 +02:00
parent e2ee050233
commit 6c149331dc
9 changed files with 127 additions and 61 deletions

41
Cargo.lock generated
View file

@ -75,6 +75,7 @@ dependencies = [
"anyhow", "anyhow",
"aquatic_toml_config", "aquatic_toml_config",
"arc-swap", "arc-swap",
"duplicate",
"glommio", "glommio",
"hashbrown 0.12.0", "hashbrown 0.12.0",
"hex", "hex",
@ -703,6 +704,16 @@ dependencies = [
"crypto-common", "crypto-common",
] ]
[[package]]
name = "duplicate"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a2535fcf1437c9de0d91a3921351c474258abdb6440cd03bb5a7cf0547e7214"
dependencies = [
"heck",
"proc-macro-error",
]
[[package]] [[package]]
name = "either" name = "either"
version = "1.6.1" version = "1.6.1"
@ -1065,6 +1076,12 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "heck"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
[[package]] [[package]]
name = "hermit-abi" name = "hermit-abi"
version = "0.1.19" version = "0.1.19"
@ -1583,6 +1600,30 @@ dependencies = [
"nix", "nix",
] ]
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.36" version = "1.0.36"

View file

@ -21,6 +21,7 @@ aquatic_toml_config = "0.2.0"
ahash = "0.7" ahash = "0.7"
anyhow = "1" anyhow = "1"
arc-swap = "1" arc-swap = "1"
duplicate = "0.4"
hashbrown = "0.12" hashbrown = "0.12"
hex = "0.4" hex = "0.4"
indexmap-amortized = "1" indexmap-amortized = "1"

View file

@ -1,7 +1,9 @@
//! Experimental CPU pinning
use aquatic_toml_config::TomlConfig; use aquatic_toml_config::TomlConfig;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, TomlConfig, Serialize, Deserialize)] #[derive(Clone, Copy, Debug, PartialEq, TomlConfig, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
pub enum CpuPinningMode { pub enum CpuPinningMode {
Ascending, Ascending,
@ -15,7 +17,7 @@ impl Default for CpuPinningMode {
} }
#[cfg(feature = "with-glommio")] #[cfg(feature = "with-glommio")]
#[derive(Clone, Debug, PartialEq, TomlConfig, Serialize, Deserialize)] #[derive(Clone, Copy, Debug, PartialEq, TomlConfig, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")] #[serde(rename_all = "kebab-case")]
pub enum HyperThreadMapping { pub enum HyperThreadMapping {
System, System,
@ -30,33 +32,54 @@ impl Default for HyperThreadMapping {
} }
} }
/// Experimental CPU pinning pub trait CpuPinningConfig {
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] fn active(&self) -> bool;
pub struct CpuPinningConfig { fn mode(&self) -> CpuPinningMode;
pub active: bool, fn hyperthread(&self) -> HyperThreadMapping;
pub mode: CpuPinningMode, fn core_offset(&self) -> usize;
#[cfg(feature = "with-glommio")]
pub hyperthread: HyperThreadMapping,
pub core_offset: usize,
} }
impl Default for CpuPinningConfig { // Do these shenanigans for compatibility with aquatic_toml_config
fn default() -> Self { #[duplicate::duplicate_item(
Self { mod_name struct_name direction;
active: false, [asc] [CpuPinningConfigAsc] [CpuPinningMode::Ascending];
mode: Default::default(), [desc] [CpuPinningConfigDesc] [CpuPinningMode::Descending];
#[cfg(feature = "with-glommio")] )]
hyperthread: Default::default(), pub mod mod_name {
core_offset: 0, use super::*;
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
pub struct struct_name {
pub active: bool,
pub mode: CpuPinningMode,
#[cfg(feature = "with-glommio")]
pub hyperthread: HyperThreadMapping,
pub core_offset: usize,
}
impl Default for struct_name {
fn default() -> Self {
Self {
active: false,
mode: direction,
#[cfg(feature = "with-glommio")]
hyperthread: Default::default(),
core_offset: 0,
}
} }
} }
} impl CpuPinningConfig for struct_name {
fn active(&self) -> bool {
impl CpuPinningConfig { self.active
pub fn default_for_load_test() -> Self { }
Self { fn mode(&self) -> CpuPinningMode {
mode: CpuPinningMode::Descending, self.mode
..Default::default() }
fn hyperthread(&self) -> HyperThreadMapping {
self.hyperthread
}
fn core_offset(&self) -> usize {
self.core_offset
} }
} }
} }
@ -69,24 +92,24 @@ pub enum WorkerIndex {
} }
impl WorkerIndex { impl WorkerIndex {
pub fn get_core_index( pub fn get_core_index<C: CpuPinningConfig>(
&self, &self,
config: &CpuPinningConfig, config: &C,
socket_workers: usize, socket_workers: usize,
request_workers: usize, request_workers: usize,
num_cores: usize, num_cores: usize,
) -> usize { ) -> usize {
let ascending_index = match self { let ascending_index = match self {
Self::SocketWorker(index) => config.core_offset + index, Self::SocketWorker(index) => config.core_offset() + index,
Self::RequestWorker(index) => config.core_offset + socket_workers + index, Self::RequestWorker(index) => config.core_offset() + socket_workers + index,
Self::Util => config.core_offset + socket_workers + request_workers, Self::Util => config.core_offset() + socket_workers + request_workers,
}; };
let max_core_index = num_cores - 1; let max_core_index = num_cores - 1;
let ascending_index = ascending_index.min(max_core_index); let ascending_index = ascending_index.min(max_core_index);
match config.mode { match config.mode() {
CpuPinningMode::Ascending => ascending_index, CpuPinningMode::Ascending => ascending_index,
CpuPinningMode::Descending => max_core_index - ascending_index, CpuPinningMode::Descending => max_core_index - ascending_index,
} }
@ -124,8 +147,8 @@ pub mod glommio {
.join(", ") .join(", ")
} }
fn get_worker_cpu_set( fn get_worker_cpu_set<C: CpuPinningConfig>(
config: &CpuPinningConfig, config: &C,
socket_workers: usize, socket_workers: usize,
request_workers: usize, request_workers: usize,
worker_index: WorkerIndex, worker_index: WorkerIndex,
@ -133,9 +156,9 @@ pub mod glommio {
let num_cpu_cores = get_num_cpu_cores()?; let num_cpu_cores = get_num_cpu_cores()?;
let core_index = let core_index =
worker_index.get_core_index(&config, socket_workers, request_workers, num_cpu_cores); worker_index.get_core_index(config, socket_workers, request_workers, num_cpu_cores);
let too_many_workers = match (&config.hyperthread, &config.mode) { let too_many_workers = match (&config.hyperthread(), &config.mode()) {
( (
HyperThreadMapping::Split | HyperThreadMapping::Subsequent, HyperThreadMapping::Split | HyperThreadMapping::Subsequent,
CpuPinningMode::Ascending, CpuPinningMode::Ascending,
@ -151,16 +174,16 @@ pub mod glommio {
return Err(anyhow::anyhow!("CPU pinning: total number of workers (including the single utility worker) can not exceed number of virtual CPUs / 2 - core_offset in this hyperthread mapping mode")); return Err(anyhow::anyhow!("CPU pinning: total number of workers (including the single utility worker) can not exceed number of virtual CPUs / 2 - core_offset in this hyperthread mapping mode"));
} }
let cpu_set = match config.hyperthread { let cpu_set = match config.hyperthread() {
HyperThreadMapping::System => get_cpu_set()?.filter(|l| l.core == core_index), HyperThreadMapping::System => get_cpu_set()?.filter(|l| l.core == core_index),
HyperThreadMapping::Split => match config.mode { HyperThreadMapping::Split => match config.mode() {
CpuPinningMode::Ascending => get_cpu_set()? CpuPinningMode::Ascending => get_cpu_set()?
.filter(|l| l.cpu == core_index || l.cpu == core_index + num_cpu_cores / 2), .filter(|l| l.cpu == core_index || l.cpu == core_index + num_cpu_cores / 2),
CpuPinningMode::Descending => get_cpu_set()? CpuPinningMode::Descending => get_cpu_set()?
.filter(|l| l.cpu == core_index || l.cpu == core_index - num_cpu_cores / 2), .filter(|l| l.cpu == core_index || l.cpu == core_index - num_cpu_cores / 2),
}, },
HyperThreadMapping::Subsequent => { HyperThreadMapping::Subsequent => {
let cpu_index_offset = match config.mode { let cpu_index_offset = match config.mode() {
// 0 -> 0 and 1 // 0 -> 0 and 1
// 1 -> 2 and 3 // 1 -> 2 and 3
// 2 -> 4 and 5 // 2 -> 4 and 5
@ -192,15 +215,15 @@ pub mod glommio {
} }
} }
pub fn get_worker_placement( pub fn get_worker_placement<C: CpuPinningConfig>(
config: &CpuPinningConfig, config: &C,
socket_workers: usize, socket_workers: usize,
request_workers: usize, request_workers: usize,
worker_index: WorkerIndex, worker_index: WorkerIndex,
) -> anyhow::Result<Placement> { ) -> anyhow::Result<Placement> {
if config.active { if config.active() {
let cpu_set = let cpu_set =
get_worker_cpu_set(&config, socket_workers, request_workers, worker_index)?; get_worker_cpu_set(config, socket_workers, request_workers, worker_index)?;
Ok(Placement::Fenced(cpu_set)) Ok(Placement::Fenced(cpu_set))
} else { } else {
@ -208,13 +231,13 @@ pub mod glommio {
} }
} }
pub fn set_affinity_for_util_worker( pub fn set_affinity_for_util_worker<C: CpuPinningConfig>(
config: &CpuPinningConfig, config: &C,
socket_workers: usize, socket_workers: usize,
request_workers: usize, request_workers: usize,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let worker_cpu_set = let worker_cpu_set =
get_worker_cpu_set(&config, socket_workers, request_workers, WorkerIndex::Util)?; get_worker_cpu_set(config, socket_workers, request_workers, WorkerIndex::Util)?;
unsafe { unsafe {
let mut set: libc::cpu_set_t = ::std::mem::zeroed(); let mut set: libc::cpu_set_t = ::std::mem::zeroed();
@ -244,15 +267,15 @@ pub mod glommio {
/// ///
/// Requires hwloc (`apt-get install libhwloc-dev`) /// Requires hwloc (`apt-get install libhwloc-dev`)
#[cfg(feature = "with-hwloc")] #[cfg(feature = "with-hwloc")]
pub fn pin_current_if_configured_to( pub fn pin_current_if_configured_to<C: CpuPinningConfig>(
config: &CpuPinningConfig, config: &C,
socket_workers: usize, socket_workers: usize,
request_workers: usize, request_workers: usize,
worker_index: WorkerIndex, worker_index: WorkerIndex,
) { ) {
use hwloc::{CpuSet, ObjectType, Topology, CPUBIND_THREAD}; use hwloc::{CpuSet, ObjectType, Topology, CPUBIND_THREAD};
if config.active { if config.active() {
let mut topology = Topology::new(); let mut topology = Topology::new();
let core_cpu_sets: Vec<CpuSet> = topology let core_cpu_sets: Vec<CpuSet> = topology

View file

@ -1,6 +1,6 @@
use std::{net::SocketAddr, path::PathBuf}; use std::{net::SocketAddr, path::PathBuf};
use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig}; use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig, cpu_pinning::asc::CpuPinningConfigAsc};
use aquatic_toml_config::TomlConfig; use aquatic_toml_config::TomlConfig;
use serde::Deserialize; use serde::Deserialize;
@ -23,7 +23,7 @@ pub struct Config {
pub cleaning: CleaningConfig, pub cleaning: CleaningConfig,
pub privileges: PrivilegeConfig, pub privileges: PrivilegeConfig,
pub access_list: AccessListConfig, pub access_list: AccessListConfig,
pub cpu_pinning: aquatic_common::cpu_pinning::CpuPinningConfig, pub cpu_pinning: CpuPinningConfigAsc,
} }
impl Default for Config { impl Default for Config {

View file

@ -1,6 +1,7 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use aquatic_cli_helpers::LogLevel; use aquatic_cli_helpers::LogLevel;
use aquatic_common::cpu_pinning::desc::CpuPinningConfigDesc;
use aquatic_toml_config::TomlConfig; use aquatic_toml_config::TomlConfig;
use serde::Deserialize; use serde::Deserialize;
@ -20,7 +21,7 @@ pub struct Config {
pub connection_creation_interval_ms: u64, pub connection_creation_interval_ms: u64,
pub duration: usize, pub duration: usize,
pub torrents: TorrentConfig, pub torrents: TorrentConfig,
pub cpu_pinning: aquatic_common::cpu_pinning::CpuPinningConfig, pub cpu_pinning: CpuPinningConfigDesc,
} }
impl aquatic_cli_helpers::Config for Config { impl aquatic_cli_helpers::Config for Config {
@ -57,7 +58,7 @@ impl Default for Config {
connection_creation_interval_ms: 10, connection_creation_interval_ms: 10,
duration: 0, duration: 0,
torrents: TorrentConfig::default(), torrents: TorrentConfig::default(),
cpu_pinning: aquatic_common::cpu_pinning::CpuPinningConfig::default_for_load_test(), cpu_pinning: Default::default(),
} }
} }
} }

View file

@ -35,7 +35,7 @@ pub struct Config {
pub privileges: PrivilegeConfig, pub privileges: PrivilegeConfig,
pub access_list: AccessListConfig, pub access_list: AccessListConfig,
#[cfg(feature = "cpu-pinning")] #[cfg(feature = "cpu-pinning")]
pub cpu_pinning: aquatic_common::cpu_pinning::CpuPinningConfig, pub cpu_pinning: aquatic_common::cpu_pinning::asc::CpuPinningConfigAsc,
} }
impl Default for Config { impl Default for Config {

View file

@ -4,7 +4,7 @@ use serde::Deserialize;
use aquatic_cli_helpers::LogLevel; use aquatic_cli_helpers::LogLevel;
#[cfg(feature = "cpu-pinning")] #[cfg(feature = "cpu-pinning")]
use aquatic_common::cpu_pinning::CpuPinningConfig; use aquatic_common::cpu_pinning::desc::CpuPinningConfigDesc;
use aquatic_toml_config::TomlConfig; use aquatic_toml_config::TomlConfig;
/// aquatic_udp_load_test configuration /// aquatic_udp_load_test configuration
@ -24,7 +24,7 @@ pub struct Config {
pub network: NetworkConfig, pub network: NetworkConfig,
pub requests: RequestConfig, pub requests: RequestConfig,
#[cfg(feature = "cpu-pinning")] #[cfg(feature = "cpu-pinning")]
pub cpu_pinning: CpuPinningConfig, pub cpu_pinning: CpuPinningConfigDesc,
} }
impl Default for Config { impl Default for Config {
@ -37,7 +37,7 @@ impl Default for Config {
network: NetworkConfig::default(), network: NetworkConfig::default(),
requests: RequestConfig::default(), requests: RequestConfig::default(),
#[cfg(feature = "cpu-pinning")] #[cfg(feature = "cpu-pinning")]
cpu_pinning: CpuPinningConfig::default_for_load_test(), cpu_pinning: Default::default(),
} }
} }
} }

View file

@ -1,7 +1,7 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use aquatic_common::cpu_pinning::CpuPinningConfig; use aquatic_common::cpu_pinning::asc::CpuPinningConfigAsc;
use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig}; use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig};
use serde::Deserialize; use serde::Deserialize;
@ -25,7 +25,7 @@ pub struct Config {
pub cleaning: CleaningConfig, pub cleaning: CleaningConfig,
pub privileges: PrivilegeConfig, pub privileges: PrivilegeConfig,
pub access_list: AccessListConfig, pub access_list: AccessListConfig,
pub cpu_pinning: CpuPinningConfig, pub cpu_pinning: CpuPinningConfigAsc,
} }
impl Default for Config { impl Default for Config {

View file

@ -1,7 +1,7 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use aquatic_cli_helpers::LogLevel; use aquatic_cli_helpers::LogLevel;
use aquatic_common::cpu_pinning::CpuPinningConfig; use aquatic_common::cpu_pinning::desc::CpuPinningConfigDesc;
use aquatic_toml_config::TomlConfig; use aquatic_toml_config::TomlConfig;
use serde::Deserialize; use serde::Deserialize;
@ -16,7 +16,7 @@ pub struct Config {
pub connection_creation_interval_ms: u64, pub connection_creation_interval_ms: u64,
pub duration: usize, pub duration: usize,
pub torrents: TorrentConfig, pub torrents: TorrentConfig,
pub cpu_pinning: CpuPinningConfig, pub cpu_pinning: CpuPinningConfigDesc,
} }
impl aquatic_cli_helpers::Config for Config { impl aquatic_cli_helpers::Config for Config {
@ -35,7 +35,7 @@ impl Default for Config {
connection_creation_interval_ms: 10, connection_creation_interval_ms: 10,
duration: 0, duration: 0,
torrents: TorrentConfig::default(), torrents: TorrentConfig::default(),
cpu_pinning: CpuPinningConfig::default_for_load_test(), cpu_pinning: Default::default(),
} }
} }
} }