diff --git a/Cargo.lock b/Cargo.lock index 1c49d63..c83d439 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -160,6 +160,7 @@ dependencies = [ "aquatic_common", "aquatic_udp_protocol", "cfg-if", + "core_affinity", "crossbeam-channel", "futures-lite", "glommio", @@ -202,6 +203,7 @@ dependencies = [ "anyhow", "aquatic_cli_helpers", "aquatic_udp_protocol", + "core_affinity", "crossbeam-channel", "hashbrown 0.11.2", "mimalloc", @@ -497,6 +499,18 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" +[[package]] +name = "core_affinity" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f8a03115cc34fb0d7c321dd154a3914b3ca082ccc5c11d91bf7117dbbe7171f" +dependencies = [ + "kernel32-sys", + "libc", + "num_cpus", + "winapi 0.2.8", +] + [[package]] name = "cpufeatures" version = "0.2.1" diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index a79294c..43ebe06 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -23,6 +23,7 @@ aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" aquatic_udp_protocol = "0.1.0" cfg-if = "1" +core_affinity = "0.5" crossbeam-channel = "0.5" hashbrown = "0.11.2" hex = "0.4" diff --git a/aquatic_udp/src/lib/config.rs b/aquatic_udp/src/lib/config.rs index 981095e..5e7b6f6 100644 --- a/aquatic_udp/src/lib/config.rs +++ b/aquatic_udp/src/lib/config.rs @@ -23,6 +23,7 @@ pub struct Config { pub cleaning: CleaningConfig, pub privileges: PrivilegeConfig, pub access_list: AccessListConfig, + pub core_affinity: CoreAffinityConfig, } impl aquatic_cli_helpers::Config for Config { @@ -103,6 +104,13 @@ pub struct PrivilegeConfig { pub user: String, } +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct CoreAffinityConfig { + pub set_affinities: bool, + pub offset: usize, +} + impl Default for Config { fn default() -> Self { Self { @@ -116,6 +124,7 @@ impl Default for Config { cleaning: CleaningConfig::default(), privileges: PrivilegeConfig::default(), access_list: AccessListConfig::default(), + core_affinity: CoreAffinityConfig::default(), } } } @@ -174,3 +183,12 @@ impl Default for PrivilegeConfig { } } } + +impl Default for CoreAffinityConfig { + fn default() -> Self { + Self { + set_affinities: false, + offset: 0, + } + } +} diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 0da8ab9..3d60544 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -17,6 +17,12 @@ pub mod network; pub const SHARED_CHANNEL_SIZE: usize = 4096; pub fn run(config: Config) -> anyhow::Result<()> { + if config.core_affinity.set_affinities { + core_affinity::set_for_current( + core_affinity::CoreId { id: config.core_affinity.offset } + ); + } + let num_peers = config.socket_workers + config.request_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); @@ -26,13 +32,19 @@ pub fn run(config: Config) -> anyhow::Result<()> { let mut executors = Vec::new(); - for _ in 0..(config.socket_workers) { + for i in 0..(config.socket_workers) { let config = config.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); let num_bound_sockets = num_bound_sockets.clone(); - let executor = LocalExecutorBuilder::default().spawn(|| async move { + let mut builder = LocalExecutorBuilder::default(); + + if config.core_affinity.set_affinities { + builder = builder.pin_to_cpu(config.core_affinity.offset + 1 + i); + } + + let executor = builder.spawn(|| async move { network::run_socket_worker( config, request_mesh_builder, @@ -45,12 +57,18 @@ pub fn run(config: Config) -> anyhow::Result<()> { executors.push(executor); } - for _ in 0..(config.request_workers) { + for i in 0..(config.request_workers) { let config = config.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); - let executor = LocalExecutorBuilder::default().spawn(|| async move { + let mut builder = LocalExecutorBuilder::default(); + + if config.core_affinity.set_affinities { + builder = builder.pin_to_cpu(config.core_affinity.offset + 1 + config.socket_workers + i); + } + + let executor = builder.spawn(|| async move { handlers::run_request_worker(config, request_mesh_builder, response_mesh_builder).await }); diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index bf863ee..ad59297 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -24,6 +24,12 @@ use crate::config::Config; use common::State; pub fn run(config: Config) -> ::anyhow::Result<()> { + if config.core_affinity.set_affinities { + core_affinity::set_for_current( + core_affinity::CoreId { id: config.core_affinity.offset } + ); + } + let state = State::default(); update_access_list(&config, &state.access_list); @@ -86,6 +92,12 @@ pub fn start_workers( Builder::new() .name(format!("request-{:02}", i + 1)) .spawn(move || { + if config.core_affinity.set_affinities { + core_affinity::set_for_current( + core_affinity::CoreId { id: config.core_affinity.offset + 1 + i } + ); + } + handlers::run_request_worker(state, config, request_receiver, response_sender) }) .with_context(|| "spawn request worker")?; @@ -101,6 +113,12 @@ pub fn start_workers( Builder::new() .name(format!("socket-{:02}", i + 1)) .spawn(move || { + if config.core_affinity.set_affinities { + core_affinity::set_for_current( + core_affinity::CoreId { id: config.core_affinity.offset + 1 + config.request_workers + i } + ); + } + network::run_socket_worker( state, config, @@ -119,10 +137,18 @@ pub fn start_workers( Builder::new() .name("statistics-collector".to_string()) - .spawn(move || loop { - ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); + .spawn(move || { + if config.core_affinity.set_affinities { + core_affinity::set_for_current( + core_affinity::CoreId { id: config.core_affinity.offset } + ); + } - tasks::gather_and_print_statistics(&state, &config); + loop { + ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); + + tasks::gather_and_print_statistics(&state, &config); + } }) .with_context(|| "spawn statistics worker")?; } diff --git a/aquatic_udp_load_test/Cargo.toml b/aquatic_udp_load_test/Cargo.toml index e6bc0ea..f437d60 100644 --- a/aquatic_udp_load_test/Cargo.toml +++ b/aquatic_udp_load_test/Cargo.toml @@ -13,6 +13,7 @@ name = "aquatic_udp_load_test" anyhow = "1" aquatic_cli_helpers = "0.1.0" aquatic_udp_protocol = "0.1.0" +core_affinity = "0.5" crossbeam-channel = "0.5" hashbrown = "0.11.2" mimalloc = { version = "0.1", default-features = false } diff --git a/aquatic_udp_load_test/src/common.rs b/aquatic_udp_load_test/src/common.rs index 1b69015..d3ec752 100644 --- a/aquatic_udp_load_test/src/common.rs +++ b/aquatic_udp_load_test/src/common.rs @@ -27,6 +27,7 @@ pub struct Config { pub duration: usize, pub network: NetworkConfig, pub handler: HandlerConfig, + pub core_affinity: CoreAffinityConfig, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -96,6 +97,13 @@ pub struct HandlerConfig { pub additional_request_factor: f64, } +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct CoreAffinityConfig { + /// Set core affinities, descending from last core + pub set_affinities: bool, +} + impl Default for Config { fn default() -> Self { Self { @@ -105,6 +113,7 @@ impl Default for Config { duration: 0, network: NetworkConfig::default(), handler: HandlerConfig::default(), + core_affinity: CoreAffinityConfig::default(), } } } @@ -183,3 +192,11 @@ pub struct SocketWorkerLocalStatistics { pub responses_scrape: usize, pub responses_error: usize, } + +impl Default for CoreAffinityConfig { + fn default() -> Self { + Self { + set_affinities: false, + } + } +} diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index 1f8d9a4..e1fbe0c 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -33,6 +33,16 @@ pub fn main() { impl aquatic_cli_helpers::Config for Config {} fn run(config: Config) -> ::anyhow::Result<()> { + let affinity_max = core_affinity::get_core_ids() + .map(|ids| ids.iter().map(|id| id.id ).max()) + .flatten().unwrap_or(0); + + if config.core_affinity.set_affinities { + core_affinity::set_for_current( + core_affinity::CoreId { id: affinity_max } + ); + } + if config.handler.weight_announce + config.handler.weight_connect + config.handler.weight_scrape == 0 { @@ -92,17 +102,28 @@ fn run(config: Config) -> ::anyhow::Result<()> { let state = state.clone(); thread::spawn(move || { + if config.core_affinity.set_affinities { + core_affinity::set_for_current( + core_affinity::CoreId { id: affinity_max - 1 - i as usize } + ); + } + run_socket_thread(state, response_sender, receiver, &config, addr, thread_id) }); } - for _ in 0..config.num_request_workers { + for i in 0..config.num_request_workers { let config = config.clone(); let state = state.clone(); let request_senders = request_senders.clone(); let response_receiver = response_receiver.clone(); thread::spawn(move || { + if config.core_affinity.set_affinities { + core_affinity::set_for_current( + core_affinity::CoreId { id: affinity_max - config.num_socket_workers as usize - 1 - i as usize } + ); + } run_handler_thread(&config, state, pareto, request_senders, response_receiver) }); }