diff --git a/Cargo.lock b/Cargo.lock index 33ba34b..c83d439 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -203,6 +203,7 @@ dependencies = [ "anyhow", "aquatic_cli_helpers", "aquatic_udp_protocol", + "core_affinity", "crossbeam-channel", "hashbrown 0.11.2", "mimalloc", diff --git a/aquatic_udp_load_test/Cargo.toml b/aquatic_udp_load_test/Cargo.toml index e6bc0ea..f437d60 100644 --- a/aquatic_udp_load_test/Cargo.toml +++ b/aquatic_udp_load_test/Cargo.toml @@ -13,6 +13,7 @@ name = "aquatic_udp_load_test" anyhow = "1" aquatic_cli_helpers = "0.1.0" aquatic_udp_protocol = "0.1.0" +core_affinity = "0.5" crossbeam-channel = "0.5" hashbrown = "0.11.2" mimalloc = { version = "0.1", default-features = false } diff --git a/aquatic_udp_load_test/src/common.rs b/aquatic_udp_load_test/src/common.rs index 1b69015..d3ec752 100644 --- a/aquatic_udp_load_test/src/common.rs +++ b/aquatic_udp_load_test/src/common.rs @@ -27,6 +27,7 @@ pub struct Config { pub duration: usize, pub network: NetworkConfig, pub handler: HandlerConfig, + pub core_affinity: CoreAffinityConfig, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -96,6 +97,13 @@ pub struct HandlerConfig { pub additional_request_factor: f64, } +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct CoreAffinityConfig { + /// Set core affinities, descending from last core + pub set_affinities: bool, +} + impl Default for Config { fn default() -> Self { Self { @@ -105,6 +113,7 @@ impl Default for Config { duration: 0, network: NetworkConfig::default(), handler: HandlerConfig::default(), + core_affinity: CoreAffinityConfig::default(), } } } @@ -183,3 +192,11 @@ pub struct SocketWorkerLocalStatistics { pub responses_scrape: usize, pub responses_error: usize, } + +impl Default for CoreAffinityConfig { + fn default() -> Self { + Self { + set_affinities: false, + } + } +} diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index 1f8d9a4..e1fbe0c 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -33,6 +33,16 @@ pub fn main() { impl aquatic_cli_helpers::Config for Config {} fn run(config: Config) -> ::anyhow::Result<()> { + let affinity_max = core_affinity::get_core_ids() + .map(|ids| ids.iter().map(|id| id.id ).max()) + .flatten().unwrap_or(0); + + if config.core_affinity.set_affinities { + core_affinity::set_for_current( + core_affinity::CoreId { id: affinity_max } + ); + } + if config.handler.weight_announce + config.handler.weight_connect + config.handler.weight_scrape == 0 { @@ -92,17 +102,28 @@ fn run(config: Config) -> ::anyhow::Result<()> { let state = state.clone(); thread::spawn(move || { + if config.core_affinity.set_affinities { + core_affinity::set_for_current( + core_affinity::CoreId { id: affinity_max - 1 - i as usize } + ); + } + run_socket_thread(state, response_sender, receiver, &config, addr, thread_id) }); } - for _ in 0..config.num_request_workers { + for i in 0..config.num_request_workers { let config = config.clone(); let state = state.clone(); let request_senders = request_senders.clone(); let response_receiver = response_receiver.clone(); thread::spawn(move || { + if config.core_affinity.set_affinities { + core_affinity::set_for_current( + core_affinity::CoreId { id: affinity_max - config.num_socket_workers as usize - 1 - i as usize } + ); + } run_handler_thread(&config, state, pareto, request_senders, response_receiver) }); }