diff --git a/Cargo.lock b/Cargo.lock index 8b801f9..138db60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -75,6 +75,7 @@ dependencies = [ "anyhow", "aquatic_toml_config", "arc-swap", + "duplicate", "glommio", "hashbrown 0.12.0", "hex", @@ -703,6 +704,16 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "duplicate" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a2535fcf1437c9de0d91a3921351c474258abdb6440cd03bb5a7cf0547e7214" +dependencies = [ + "heck", + "proc-macro-error", +] + [[package]] name = "either" version = "1.6.1" @@ -1065,6 +1076,12 @@ dependencies = [ "serde", ] +[[package]] +name = "heck" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1583,6 +1600,30 @@ dependencies = [ "nix", ] +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro2" version = "1.0.36" diff --git a/aquatic_common/Cargo.toml b/aquatic_common/Cargo.toml index 4c12b3a..40b9427 100644 --- a/aquatic_common/Cargo.toml +++ b/aquatic_common/Cargo.toml @@ -21,6 +21,7 @@ aquatic_toml_config = "0.2.0" ahash = "0.7" anyhow = "1" arc-swap = "1" +duplicate = "0.4" hashbrown = "0.12" hex = "0.4" indexmap-amortized = "1" diff --git a/aquatic_common/src/cpu_pinning.rs b/aquatic_common/src/cpu_pinning.rs index baae0cb..1e7bccc 100644 --- a/aquatic_common/src/cpu_pinning.rs +++ b/aquatic_common/src/cpu_pinning.rs @@ -1,7 +1,9 @@ +//! Experimental CPU pinning + use aquatic_toml_config::TomlConfig; use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug, PartialEq, TomlConfig, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, PartialEq, TomlConfig, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum CpuPinningMode { Ascending, @@ -15,7 +17,7 @@ impl Default for CpuPinningMode { } #[cfg(feature = "with-glommio")] -#[derive(Clone, Debug, PartialEq, TomlConfig, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, PartialEq, TomlConfig, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] pub enum HyperThreadMapping { System, @@ -30,33 +32,54 @@ impl Default for HyperThreadMapping { } } -/// Experimental CPU pinning -#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -pub struct CpuPinningConfig { - pub active: bool, - pub mode: CpuPinningMode, - #[cfg(feature = "with-glommio")] - pub hyperthread: HyperThreadMapping, - pub core_offset: usize, +pub trait CpuPinningConfig { + fn active(&self) -> bool; + fn mode(&self) -> CpuPinningMode; + fn hyperthread(&self) -> HyperThreadMapping; + fn core_offset(&self) -> usize; } -impl Default for CpuPinningConfig { - fn default() -> Self { - Self { - active: false, - mode: Default::default(), - #[cfg(feature = "with-glommio")] - hyperthread: Default::default(), - core_offset: 0, +// Do these shenanigans for compatibility with aquatic_toml_config +#[duplicate::duplicate_item( + mod_name struct_name direction; + [asc] [CpuPinningConfigAsc] [CpuPinningMode::Ascending]; + [desc] [CpuPinningConfigDesc] [CpuPinningMode::Descending]; +)] +pub mod mod_name { + use super::*; + + #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] + pub struct struct_name { + pub active: bool, + pub mode: CpuPinningMode, + #[cfg(feature = "with-glommio")] + pub hyperthread: HyperThreadMapping, + pub core_offset: usize, + } + + impl Default for struct_name { + fn default() -> Self { + Self { + active: false, + mode: direction, + #[cfg(feature = "with-glommio")] + hyperthread: Default::default(), + core_offset: 0, + } } } -} - -impl CpuPinningConfig { - pub fn default_for_load_test() -> Self { - Self { - mode: CpuPinningMode::Descending, - ..Default::default() + impl CpuPinningConfig for struct_name { + fn active(&self) -> bool { + self.active + } + fn mode(&self) -> CpuPinningMode { + self.mode + } + fn hyperthread(&self) -> HyperThreadMapping { + self.hyperthread + } + fn core_offset(&self) -> usize { + self.core_offset } } } @@ -69,24 +92,24 @@ pub enum WorkerIndex { } impl WorkerIndex { - pub fn get_core_index( + pub fn get_core_index( &self, - config: &CpuPinningConfig, + config: &C, socket_workers: usize, request_workers: usize, num_cores: usize, ) -> usize { let ascending_index = match self { - Self::SocketWorker(index) => config.core_offset + index, - Self::RequestWorker(index) => config.core_offset + socket_workers + index, - Self::Util => config.core_offset + socket_workers + request_workers, + Self::SocketWorker(index) => config.core_offset() + index, + Self::RequestWorker(index) => config.core_offset() + socket_workers + index, + Self::Util => config.core_offset() + socket_workers + request_workers, }; let max_core_index = num_cores - 1; let ascending_index = ascending_index.min(max_core_index); - match config.mode { + match config.mode() { CpuPinningMode::Ascending => ascending_index, CpuPinningMode::Descending => max_core_index - ascending_index, } @@ -124,8 +147,8 @@ pub mod glommio { .join(", ") } - fn get_worker_cpu_set( - config: &CpuPinningConfig, + fn get_worker_cpu_set( + config: &C, socket_workers: usize, request_workers: usize, worker_index: WorkerIndex, @@ -133,9 +156,9 @@ pub mod glommio { let num_cpu_cores = get_num_cpu_cores()?; let core_index = - worker_index.get_core_index(&config, socket_workers, request_workers, num_cpu_cores); + worker_index.get_core_index(config, socket_workers, request_workers, num_cpu_cores); - let too_many_workers = match (&config.hyperthread, &config.mode) { + let too_many_workers = match (&config.hyperthread(), &config.mode()) { ( HyperThreadMapping::Split | HyperThreadMapping::Subsequent, CpuPinningMode::Ascending, @@ -151,16 +174,16 @@ pub mod glommio { return Err(anyhow::anyhow!("CPU pinning: total number of workers (including the single utility worker) can not exceed number of virtual CPUs / 2 - core_offset in this hyperthread mapping mode")); } - let cpu_set = match config.hyperthread { + let cpu_set = match config.hyperthread() { HyperThreadMapping::System => get_cpu_set()?.filter(|l| l.core == core_index), - HyperThreadMapping::Split => match config.mode { + HyperThreadMapping::Split => match config.mode() { CpuPinningMode::Ascending => get_cpu_set()? .filter(|l| l.cpu == core_index || l.cpu == core_index + num_cpu_cores / 2), CpuPinningMode::Descending => get_cpu_set()? .filter(|l| l.cpu == core_index || l.cpu == core_index - num_cpu_cores / 2), }, HyperThreadMapping::Subsequent => { - let cpu_index_offset = match config.mode { + let cpu_index_offset = match config.mode() { // 0 -> 0 and 1 // 1 -> 2 and 3 // 2 -> 4 and 5 @@ -192,15 +215,15 @@ pub mod glommio { } } - pub fn get_worker_placement( - config: &CpuPinningConfig, + pub fn get_worker_placement( + config: &C, socket_workers: usize, request_workers: usize, worker_index: WorkerIndex, ) -> anyhow::Result { - if config.active { + if config.active() { let cpu_set = - get_worker_cpu_set(&config, socket_workers, request_workers, worker_index)?; + get_worker_cpu_set(config, socket_workers, request_workers, worker_index)?; Ok(Placement::Fenced(cpu_set)) } else { @@ -208,13 +231,13 @@ pub mod glommio { } } - pub fn set_affinity_for_util_worker( - config: &CpuPinningConfig, + pub fn set_affinity_for_util_worker( + config: &C, socket_workers: usize, request_workers: usize, ) -> anyhow::Result<()> { let worker_cpu_set = - get_worker_cpu_set(&config, socket_workers, request_workers, WorkerIndex::Util)?; + get_worker_cpu_set(config, socket_workers, request_workers, WorkerIndex::Util)?; unsafe { let mut set: libc::cpu_set_t = ::std::mem::zeroed(); @@ -244,15 +267,15 @@ pub mod glommio { /// /// Requires hwloc (`apt-get install libhwloc-dev`) #[cfg(feature = "with-hwloc")] -pub fn pin_current_if_configured_to( - config: &CpuPinningConfig, +pub fn pin_current_if_configured_to( + config: &C, socket_workers: usize, request_workers: usize, worker_index: WorkerIndex, ) { use hwloc::{CpuSet, ObjectType, Topology, CPUBIND_THREAD}; - if config.active { + if config.active() { let mut topology = Topology::new(); let core_cpu_sets: Vec = topology diff --git a/aquatic_http/src/config.rs b/aquatic_http/src/config.rs index f122c12..c94cf1e 100644 --- a/aquatic_http/src/config.rs +++ b/aquatic_http/src/config.rs @@ -1,6 +1,6 @@ use std::{net::SocketAddr, path::PathBuf}; -use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig}; +use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig, cpu_pinning::asc::CpuPinningConfigAsc}; use aquatic_toml_config::TomlConfig; use serde::Deserialize; @@ -23,7 +23,7 @@ pub struct Config { pub cleaning: CleaningConfig, pub privileges: PrivilegeConfig, pub access_list: AccessListConfig, - pub cpu_pinning: aquatic_common::cpu_pinning::CpuPinningConfig, + pub cpu_pinning: CpuPinningConfigAsc, } impl Default for Config { diff --git a/aquatic_http_load_test/src/config.rs b/aquatic_http_load_test/src/config.rs index be0e24d..994913d 100644 --- a/aquatic_http_load_test/src/config.rs +++ b/aquatic_http_load_test/src/config.rs @@ -1,6 +1,7 @@ use std::net::SocketAddr; use aquatic_cli_helpers::LogLevel; +use aquatic_common::cpu_pinning::desc::CpuPinningConfigDesc; use aquatic_toml_config::TomlConfig; use serde::Deserialize; @@ -20,7 +21,7 @@ pub struct Config { pub connection_creation_interval_ms: u64, pub duration: usize, pub torrents: TorrentConfig, - pub cpu_pinning: aquatic_common::cpu_pinning::CpuPinningConfig, + pub cpu_pinning: CpuPinningConfigDesc, } impl aquatic_cli_helpers::Config for Config { @@ -57,7 +58,7 @@ impl Default for Config { connection_creation_interval_ms: 10, duration: 0, torrents: TorrentConfig::default(), - cpu_pinning: aquatic_common::cpu_pinning::CpuPinningConfig::default_for_load_test(), + cpu_pinning: Default::default(), } } } diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 6e23599..01a0917 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -35,7 +35,7 @@ pub struct Config { pub privileges: PrivilegeConfig, pub access_list: AccessListConfig, #[cfg(feature = "cpu-pinning")] - pub cpu_pinning: aquatic_common::cpu_pinning::CpuPinningConfig, + pub cpu_pinning: aquatic_common::cpu_pinning::asc::CpuPinningConfigAsc, } impl Default for Config { diff --git a/aquatic_udp_load_test/src/config.rs b/aquatic_udp_load_test/src/config.rs index bb99dbe..e803eb8 100644 --- a/aquatic_udp_load_test/src/config.rs +++ b/aquatic_udp_load_test/src/config.rs @@ -4,7 +4,7 @@ use serde::Deserialize; use aquatic_cli_helpers::LogLevel; #[cfg(feature = "cpu-pinning")] -use aquatic_common::cpu_pinning::CpuPinningConfig; +use aquatic_common::cpu_pinning::desc::CpuPinningConfigDesc; use aquatic_toml_config::TomlConfig; /// aquatic_udp_load_test configuration @@ -24,7 +24,7 @@ pub struct Config { pub network: NetworkConfig, pub requests: RequestConfig, #[cfg(feature = "cpu-pinning")] - pub cpu_pinning: CpuPinningConfig, + pub cpu_pinning: CpuPinningConfigDesc, } impl Default for Config { @@ -37,7 +37,7 @@ impl Default for Config { network: NetworkConfig::default(), requests: RequestConfig::default(), #[cfg(feature = "cpu-pinning")] - cpu_pinning: CpuPinningConfig::default_for_load_test(), + cpu_pinning: Default::default(), } } } diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index fdff2b9..b1ea961 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -1,7 +1,7 @@ use std::net::SocketAddr; use std::path::PathBuf; -use aquatic_common::cpu_pinning::CpuPinningConfig; +use aquatic_common::cpu_pinning::asc::CpuPinningConfigAsc; use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig}; use serde::Deserialize; @@ -25,7 +25,7 @@ pub struct Config { pub cleaning: CleaningConfig, pub privileges: PrivilegeConfig, pub access_list: AccessListConfig, - pub cpu_pinning: CpuPinningConfig, + pub cpu_pinning: CpuPinningConfigAsc, } impl Default for Config { diff --git a/aquatic_ws_load_test/src/config.rs b/aquatic_ws_load_test/src/config.rs index 0fa311b..9949c65 100644 --- a/aquatic_ws_load_test/src/config.rs +++ b/aquatic_ws_load_test/src/config.rs @@ -1,7 +1,7 @@ use std::net::SocketAddr; use aquatic_cli_helpers::LogLevel; -use aquatic_common::cpu_pinning::CpuPinningConfig; +use aquatic_common::cpu_pinning::desc::CpuPinningConfigDesc; use aquatic_toml_config::TomlConfig; use serde::Deserialize; @@ -16,7 +16,7 @@ pub struct Config { pub connection_creation_interval_ms: u64, pub duration: usize, pub torrents: TorrentConfig, - pub cpu_pinning: CpuPinningConfig, + pub cpu_pinning: CpuPinningConfigDesc, } impl aquatic_cli_helpers::Config for Config { @@ -35,7 +35,7 @@ impl Default for Config { connection_creation_interval_ms: 10, duration: 0, torrents: TorrentConfig::default(), - cpu_pinning: CpuPinningConfig::default_for_load_test(), + cpu_pinning: Default::default(), } } }