diff --git a/Cargo.lock b/Cargo.lock index 1c49d63..33ba34b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -160,6 +160,7 @@ dependencies = [ "aquatic_common", "aquatic_udp_protocol", "cfg-if", + "core_affinity", "crossbeam-channel", "futures-lite", "glommio", @@ -497,6 +498,18 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" +[[package]] +name = "core_affinity" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f8a03115cc34fb0d7c321dd154a3914b3ca082ccc5c11d91bf7117dbbe7171f" +dependencies = [ + "kernel32-sys", + "libc", + "num_cpus", + "winapi 0.2.8", +] + [[package]] name = "cpufeatures" version = "0.2.1" diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index a79294c..43ebe06 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -23,6 +23,7 @@ aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" aquatic_udp_protocol = "0.1.0" cfg-if = "1" +core_affinity = "0.5" crossbeam-channel = "0.5" hashbrown = "0.11.2" hex = "0.4" diff --git a/aquatic_udp/src/lib/config.rs b/aquatic_udp/src/lib/config.rs index 981095e..5e7b6f6 100644 --- a/aquatic_udp/src/lib/config.rs +++ b/aquatic_udp/src/lib/config.rs @@ -23,6 +23,7 @@ pub struct Config { pub cleaning: CleaningConfig, pub privileges: PrivilegeConfig, pub access_list: AccessListConfig, + pub core_affinity: CoreAffinityConfig, } impl aquatic_cli_helpers::Config for Config { @@ -103,6 +104,13 @@ pub struct PrivilegeConfig { pub user: String, } +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct CoreAffinityConfig { + pub set_affinities: bool, + pub offset: usize, +} + impl Default for Config { fn default() -> Self { Self { @@ -116,6 +124,7 @@ impl Default for Config { cleaning: CleaningConfig::default(), privileges: PrivilegeConfig::default(), access_list: AccessListConfig::default(), + core_affinity: CoreAffinityConfig::default(), } } } @@ -174,3 +183,12 @@ impl Default for PrivilegeConfig { } } } + +impl Default for CoreAffinityConfig { + fn default() -> Self { + Self { + set_affinities: false, + offset: 0, + } + } +} diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 0da8ab9..a011d8e 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -17,6 +17,12 @@ pub mod network; pub const SHARED_CHANNEL_SIZE: usize = 4096; pub fn run(config: Config) -> anyhow::Result<()> { + if config.core_affinity.set_affinities { + core_affinity::set_for_current( + core_affinity::CoreId { id: config.core_affinity.offset } + ); + } + let num_peers = config.socket_workers + config.request_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); @@ -26,13 +32,19 @@ pub fn run(config: Config) -> anyhow::Result<()> { let mut executors = Vec::new(); - for _ in 0..(config.socket_workers) { + for i in 0..(config.socket_workers) { let config = config.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); let num_bound_sockets = num_bound_sockets.clone(); - let executor = LocalExecutorBuilder::default().spawn(|| async move { + let mut builder = LocalExecutorBuilder::default(); + + if config.core_affinity.set_affinities { + builder = builder.pin_to_cpu(config.core_affinity.offset + 1 + i); + } + + let executor = builder.spawn(|| async move { network::run_socket_worker( config, request_mesh_builder, @@ -50,7 +62,13 @@ pub fn run(config: Config) -> anyhow::Result<()> { let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); - let executor = LocalExecutorBuilder::default().spawn(|| async move { + let mut builder = LocalExecutorBuilder::default(); + + if config.core_affinity.set_affinities { + builder = builder.pin_to_cpu(config.core_affinity.offset + 1 + config.socket_workers + i); + } + + let executor = builder.spawn(|| async move { handlers::run_request_worker(config, request_mesh_builder, response_mesh_builder).await }); diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index bf863ee..ad59297 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -24,6 +24,12 @@ use crate::config::Config; use common::State; pub fn run(config: Config) -> ::anyhow::Result<()> { + if config.core_affinity.set_affinities { + core_affinity::set_for_current( + core_affinity::CoreId { id: config.core_affinity.offset } + ); + } + let state = State::default(); update_access_list(&config, &state.access_list); @@ -86,6 +92,12 @@ pub fn start_workers( Builder::new() .name(format!("request-{:02}", i + 1)) .spawn(move || { + if config.core_affinity.set_affinities { + core_affinity::set_for_current( + core_affinity::CoreId { id: config.core_affinity.offset + 1 + i } + ); + } + handlers::run_request_worker(state, config, request_receiver, response_sender) }) .with_context(|| "spawn request worker")?; @@ -101,6 +113,12 @@ pub fn start_workers( Builder::new() .name(format!("socket-{:02}", i + 1)) .spawn(move || { + if config.core_affinity.set_affinities { + core_affinity::set_for_current( + core_affinity::CoreId { id: config.core_affinity.offset + 1 + config.request_workers + i } + ); + } + network::run_socket_worker( state, config, @@ -119,10 +137,18 @@ pub fn start_workers( Builder::new() .name("statistics-collector".to_string()) - .spawn(move || loop { - ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); + .spawn(move || { + if config.core_affinity.set_affinities { + core_affinity::set_for_current( + core_affinity::CoreId { id: config.core_affinity.offset } + ); + } - tasks::gather_and_print_statistics(&state, &config); + loop { + ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); + + tasks::gather_and_print_statistics(&state, &config); + } }) .with_context(|| "spawn statistics worker")?; }