diff --git a/Cargo.lock b/Cargo.lock index f968137..c1aa85a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,18 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "affinity" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "763e484feceb7dd021b21c5c6f81aee06b1594a743455ec7efbf72e6355e447b" +dependencies = [ + "cfg-if", + "errno", + "libc", + "num_cpus", +] + [[package]] name = "ahash" version = "0.3.8" @@ -74,10 +86,10 @@ dependencies = [ name = "aquatic_common" version = "0.1.0" dependencies = [ + "affinity", "ahash 0.7.6", "anyhow", "arc-swap", - "core_affinity", "hashbrown 0.11.2", "hex", "indexmap-amortized", @@ -534,18 +546,6 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" -[[package]] -name = "core_affinity" -version = "0.5.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f8a03115cc34fb0d7c321dd154a3914b3ca082ccc5c11d91bf7117dbbe7171f" -dependencies = [ - "kernel32-sys", - "libc", - "num_cpus", - "winapi 0.2.8", -] - [[package]] name = "cpufeatures" version = "0.2.1" @@ -718,6 +718,27 @@ dependencies = [ "regex", ] +[[package]] +name = "errno" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f639046355ee4f37944e44f60642c6f3a7efa3cf6b78c78a0d989a8ce6c396a1" +dependencies = [ + "errno-dragonfly", + "libc", + "winapi 0.3.9", +] + +[[package]] +name = "errno-dragonfly" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "failure" version = "0.1.8" diff --git a/aquatic_common/Cargo.toml b/aquatic_common/Cargo.toml index 083047d..b10a3f2 100644 --- a/aquatic_common/Cargo.toml +++ b/aquatic_common/Cargo.toml @@ -14,7 +14,7 @@ name = "aquatic_common" ahash = "0.7" anyhow = "1" arc-swap = "1" -core_affinity = "0.5" +affinity = "0.1" hashbrown = "0.11.2" hex = "0.4" indexmap-amortized = "1" diff --git a/aquatic_common/src/cpu_pinning.rs b/aquatic_common/src/cpu_pinning.rs index a7d508f..ddfa833 100644 --- a/aquatic_common/src/cpu_pinning.rs +++ b/aquatic_common/src/cpu_pinning.rs @@ -17,8 +17,8 @@ impl Default for CpuPinningMode { pub struct CpuPinningConfig { pub active: bool, pub mode: CpuPinningMode, - pub offset: usize, - pub multiple: usize, + pub virtual_per_physical_cpu: usize, + pub offset_cpus: usize, } impl Default for CpuPinningConfig { @@ -26,8 +26,8 @@ impl Default for CpuPinningConfig { Self { active: false, mode: Default::default(), - offset: 0, - multiple: 1, + virtual_per_physical_cpu: 2, + offset_cpus: 0, } } } @@ -49,41 +49,58 @@ pub enum WorkerIndex { } impl WorkerIndex { - pub fn get_cpu_index(self, config: &CpuPinningConfig, socket_workers: usize) -> usize { - let index = match self { - Self::Other => config.offset, - Self::SocketWorker(index) => config.multiple * (config.offset + 1 + index), + fn get_cpu_indices(self, config: &CpuPinningConfig, socket_workers: usize) -> Vec { + let offset = match self { + Self::Other => config.virtual_per_physical_cpu * config.offset_cpus, + Self::SocketWorker(index) => { + config.virtual_per_physical_cpu * (config.offset_cpus + 1 + index) + } Self::RequestWorker(index) => { - config.multiple * (config.offset + 1 + socket_workers + index) + config.virtual_per_physical_cpu * (config.offset_cpus + 1 + socket_workers + index) } }; - let index = match config.mode { - CpuPinningMode::Ascending => index, + let virtual_cpus = (0..config.virtual_per_physical_cpu).map(|i| offset + i); + + let virtual_cpus: Vec = match config.mode { + CpuPinningMode::Ascending => virtual_cpus.collect(), CpuPinningMode::Descending => { - let max = core_affinity::get_core_ids() - .map(|ids| ids.iter().map(|id| id.id).max()) - .flatten() - .unwrap_or(0); + let max_index = affinity::get_core_num() - 1; - max - index + virtual_cpus + .map(|i| max_index.checked_sub(i).unwrap_or(0)) + .collect() } }; - ::log::info!("Calculated CPU pin index {} for {:?}", index, self); + ::log::info!( + "Calculated virtual CPU pin indices {:?} for {:?}", + virtual_cpus, + self + ); - index + virtual_cpus } } +/// Note: don't call this when affinities were already set in the current or in +/// a parent thread. Doing so limits the number of cores that are seen and +/// messes up setting affinities. pub fn pin_current_if_configured_to( config: &CpuPinningConfig, socket_workers: usize, worker_index: WorkerIndex, ) { if config.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: worker_index.get_cpu_index(config, socket_workers), - }); + let indices = worker_index.get_cpu_indices(config, socket_workers); + + if let Err(err) = affinity::set_thread_affinity(indices.clone()) { + ::log::error!( + "Failed setting thread affinities {:?} for {:?}: {:#?}", + indices, + worker_index, + err + ); + } } } diff --git a/aquatic_http/src/lib/lib.rs b/aquatic_http/src/lib/lib.rs index 9172caa..873ed83 100644 --- a/aquatic_http/src/lib/lib.rs +++ b/aquatic_http/src/lib/lib.rs @@ -25,12 +25,6 @@ pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker"; const SHARED_CHANNEL_SIZE: usize = 1024; pub fn run(config: Config) -> ::anyhow::Result<()> { - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - let state = State::default(); update_access_list(&config.access_list, &state.access_list)?; @@ -44,6 +38,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ::std::thread::spawn(move || run_inner(config, state)); } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + for signal in &mut signals { match signal { SIGUSR1 => { @@ -57,12 +57,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - let num_peers = config.socket_workers + config.request_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); @@ -82,16 +76,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let response_mesh_builder = response_mesh_builder.clone(); let num_bound_sockets = num_bound_sockets.clone(); - let mut builder = LocalExecutorBuilder::default(); - - if config.cpu_pinning.active { - builder = builder.pin_to_cpu( - WorkerIndex::SocketWorker(i) - .get_cpu_index(&config.cpu_pinning, config.socket_workers), + let executor = LocalExecutorBuilder::default().spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::SocketWorker(i), ); - } - let executor = builder.spawn(|| async move { network::run_socket_worker( config, state, @@ -112,16 +103,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); - let mut builder = LocalExecutorBuilder::default(); - - if config.cpu_pinning.active { - builder = builder.pin_to_cpu( - WorkerIndex::RequestWorker(i) - .get_cpu_index(&config.cpu_pinning, config.socket_workers), + let executor = LocalExecutorBuilder::default().spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::RequestWorker(i), ); - } - let executor = builder.spawn(|| async move { handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) .await }); @@ -136,6 +124,12 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { ) .unwrap(); + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + for executor in executors { executor .expect("failed to spawn local executor") diff --git a/aquatic_http_load_test/src/main.rs b/aquatic_http_load_test/src/main.rs index 8792603..0dd5b25 100644 --- a/aquatic_http_load_test/src/main.rs +++ b/aquatic_http_load_test/src/main.rs @@ -37,12 +37,6 @@ fn run(config: Config) -> ::anyhow::Result<()> { println!("Starting client with config: {:#?}", config); - pin_current_if_configured_to( - &config.cpu_pinning, - config.num_workers as usize, - WorkerIndex::Other, - ); - let mut info_hashes = Vec::with_capacity(config.torrents.number_of_torrents); let mut rng = SmallRng::from_entropy(); @@ -68,21 +62,25 @@ fn run(config: Config) -> ::anyhow::Result<()> { let tls_config = tls_config.clone(); let state = state.clone(); - let mut builder = LocalExecutorBuilder::default(); + LocalExecutorBuilder::default() + .spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_workers, + WorkerIndex::SocketWorker(i), + ); - if config.cpu_pinning.active { - builder = builder.pin_to_cpu( - WorkerIndex::SocketWorker(i).get_cpu_index(&config.cpu_pinning, config.num_workers), - ); - } - - builder - .spawn(|| async move { run_socket_thread(config, tls_config, state).await.unwrap(); }) .unwrap(); } + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_workers as usize, + WorkerIndex::Other, + ); + monitor_statistics(state, &config); Ok(()) diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 28e7842..058f5e9 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -19,12 +19,6 @@ pub mod network; pub const SHARED_CHANNEL_SIZE: usize = 4096; pub fn run(config: Config) -> ::anyhow::Result<()> { - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - let state = State::default(); update_access_list(&config.access_list, &state.access_list)?; @@ -38,6 +32,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ::std::thread::spawn(move || run_inner(config, state)); } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + for signal in &mut signals { match signal { SIGUSR1 => { @@ -51,12 +51,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - let num_peers = config.socket_workers + config.request_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); @@ -73,16 +67,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let response_mesh_builder = response_mesh_builder.clone(); let num_bound_sockets = num_bound_sockets.clone(); - let mut builder = LocalExecutorBuilder::default(); - - if config.cpu_pinning.active { - builder = builder.pin_to_cpu( - WorkerIndex::SocketWorker(i) - .get_cpu_index(&config.cpu_pinning, config.socket_workers), + let executor = LocalExecutorBuilder::default().spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::SocketWorker(i), ); - } - let executor = builder.spawn(|| async move { network::run_socket_worker( config, state, @@ -102,16 +93,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); - let mut builder = LocalExecutorBuilder::default(); - - if config.cpu_pinning.active { - builder = builder.pin_to_cpu( - WorkerIndex::RequestWorker(i) - .get_cpu_index(&config.cpu_pinning, config.socket_workers), + let executor = LocalExecutorBuilder::default().spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::RequestWorker(i), ); - } - let executor = builder.spawn(|| async move { handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) .await }); @@ -126,6 +114,12 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { ) .unwrap(); + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + for executor in executors { executor .expect("failed to spawn local executor") diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index 1077878..16ac519 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -21,12 +21,6 @@ pub mod tasks; use common::State; pub fn run(config: Config) -> ::anyhow::Result<()> { - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - let state = State::default(); update_access_list(&config.access_list, &state.access_list)?; @@ -40,6 +34,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ::std::thread::spawn(move || run_inner(config, state)); } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + for signal in &mut signals { match signal { SIGUSR1 => { @@ -53,12 +53,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - let num_bound_sockets = Arc::new(AtomicUsize::new(0)); let (request_sender, request_receiver) = unbounded(); @@ -141,6 +135,12 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { ) .unwrap(); + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + loop { ::std::thread::sleep(Duration::from_secs( config.cleaning.torrent_cleaning_interval, diff --git a/aquatic_ws/src/lib/glommio/mod.rs b/aquatic_ws/src/lib/glommio/mod.rs index dfa59a9..17c4e2b 100644 --- a/aquatic_ws/src/lib/glommio/mod.rs +++ b/aquatic_ws/src/lib/glommio/mod.rs @@ -21,12 +21,6 @@ use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; const SHARED_CHANNEL_SIZE: usize = 1024; pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - let num_peers = config.socket_workers + config.request_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); @@ -46,16 +40,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let response_mesh_builder = response_mesh_builder.clone(); let num_bound_sockets = num_bound_sockets.clone(); - let mut builder = LocalExecutorBuilder::default(); - - if config.cpu_pinning.active { - builder = builder.pin_to_cpu( - WorkerIndex::SocketWorker(i) - .get_cpu_index(&config.cpu_pinning, config.socket_workers), + let executor = LocalExecutorBuilder::default().spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::SocketWorker(i), ); - } - let executor = builder.spawn(|| async move { network::run_socket_worker( config, state, @@ -76,16 +67,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); - let mut builder = LocalExecutorBuilder::default(); - - if config.cpu_pinning.active { - builder = builder.pin_to_cpu( - WorkerIndex::RequestWorker(i) - .get_cpu_index(&config.cpu_pinning, config.socket_workers), + let executor = LocalExecutorBuilder::default().spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::RequestWorker(i), ); - } - let executor = builder.spawn(|| async move { handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) .await }); @@ -100,6 +88,12 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { ) .unwrap(); + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + for executor in executors { executor .expect("failed to spawn local executor") diff --git a/aquatic_ws/src/lib/lib.rs b/aquatic_ws/src/lib/lib.rs index a01f3de..b9542b4 100644 --- a/aquatic_ws/src/lib/lib.rs +++ b/aquatic_ws/src/lib/lib.rs @@ -17,12 +17,6 @@ pub mod mio; pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker"; pub fn run(config: Config) -> ::anyhow::Result<()> { - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - cfg_if!( if #[cfg(feature = "with-glommio")] { let state = glommio::common::State::default(); @@ -48,6 +42,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ); } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + for signal in &mut signals { match signal { SIGUSR1 => { diff --git a/aquatic_ws/src/lib/mio/mod.rs b/aquatic_ws/src/lib/mio/mod.rs index df791cb..0becdd8 100644 --- a/aquatic_ws/src/lib/mio/mod.rs +++ b/aquatic_ws/src/lib/mio/mod.rs @@ -22,16 +22,16 @@ use common::*; pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker"; pub fn run(config: Config, state: State) -> anyhow::Result<()> { + start_workers(config.clone(), state.clone()).expect("couldn't start workers"); + + // TODO: privdrop here instead + pin_current_if_configured_to( &config.cpu_pinning, config.socket_workers, WorkerIndex::Other, ); - start_workers(config.clone(), state.clone()).expect("couldn't start workers"); - - // TODO: privdrop here instead - loop { ::std::thread::sleep(Duration::from_secs( config.cleaning.torrent_cleaning_interval, diff --git a/aquatic_ws_load_test/src/main.rs b/aquatic_ws_load_test/src/main.rs index 1a8c342..05ed1a0 100644 --- a/aquatic_ws_load_test/src/main.rs +++ b/aquatic_ws_load_test/src/main.rs @@ -34,12 +34,6 @@ fn run(config: Config) -> ::anyhow::Result<()> { println!("Starting client with config: {:#?}", config); - pin_current_if_configured_to( - &config.cpu_pinning, - config.num_workers as usize, - WorkerIndex::Other, - ); - let mut info_hashes = Vec::with_capacity(config.torrents.number_of_torrents); let mut rng = SmallRng::from_entropy(); @@ -63,21 +57,25 @@ fn run(config: Config) -> ::anyhow::Result<()> { let tls_config = tls_config.clone(); let state = state.clone(); - let mut builder = LocalExecutorBuilder::default(); + LocalExecutorBuilder::default() + .spawn(move || async move { + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_workers, + WorkerIndex::SocketWorker(i), + ); - if config.cpu_pinning.active { - builder = builder.pin_to_cpu( - WorkerIndex::SocketWorker(i).get_cpu_index(&config.cpu_pinning, config.num_workers), - ); - } - - builder - .spawn(|| async move { run_socket_thread(config, tls_config, state).await.unwrap(); }) .unwrap(); } + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_workers as usize, + WorkerIndex::Other, + ); + monitor_statistics(state, &config); Ok(())