From b54694bbc04d4a2a107a1715775712085bb97149 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 7 Nov 2021 11:46:48 +0100 Subject: [PATCH] aquatic_ws: improve cpu pinning --- Cargo.lock | 1 - aquatic_ws/Cargo.toml | 1 - aquatic_ws/src/lib/glommio/mod.rs | 25 +++++++++++++++-------- aquatic_ws/src/lib/lib.rs | 15 ++++++++------ aquatic_ws/src/lib/mio/mod.rs | 33 ++++++++++++++++++++++++++++--- 5 files changed, 56 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b66fecb..48a85da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -241,7 +241,6 @@ dependencies = [ "aquatic_ws_protocol", "async-tungstenite", "cfg-if", - "core_affinity", "crossbeam-channel", "either", "futures", diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index 98844d5..b76299c 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -26,7 +26,6 @@ aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" aquatic_ws_protocol = "0.1.0" cfg-if = "1" -core_affinity = "0.5" either = "1" hashbrown = { version = "0.11.2", features = ["serde"] } log = "0.4" diff --git a/aquatic_ws/src/lib/glommio/mod.rs b/aquatic_ws/src/lib/glommio/mod.rs index 0c84499..dfa59a9 100644 --- a/aquatic_ws/src/lib/glommio/mod.rs +++ b/aquatic_ws/src/lib/glommio/mod.rs @@ -9,7 +9,10 @@ use std::{ }; use crate::config::Config; -use aquatic_common::privileges::drop_privileges_after_socket_binding; +use aquatic_common::{ + cpu_pinning::{pin_current_if_configured_to, WorkerIndex}, + privileges::drop_privileges_after_socket_binding, +}; use self::common::*; @@ -18,11 +21,11 @@ use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; const SHARED_CHANNEL_SIZE: usize = 1024; pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); let num_peers = config.socket_workers + config.request_workers; @@ -46,7 +49,10 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let mut builder = LocalExecutorBuilder::default(); if config.cpu_pinning.active { - builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + i); + builder = builder.pin_to_cpu( + WorkerIndex::SocketWorker(i) + .get_cpu_index(&config.cpu_pinning, config.socket_workers), + ); } let executor = builder.spawn(|| async move { @@ -73,7 +79,10 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let mut builder = LocalExecutorBuilder::default(); if config.cpu_pinning.active { - builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i); + builder = builder.pin_to_cpu( + WorkerIndex::RequestWorker(i) + .get_cpu_index(&config.cpu_pinning, config.socket_workers), + ); } let executor = builder.spawn(|| async move { diff --git a/aquatic_ws/src/lib/lib.rs b/aquatic_ws/src/lib/lib.rs index 57e10a7..a01f3de 100644 --- a/aquatic_ws/src/lib/lib.rs +++ b/aquatic_ws/src/lib/lib.rs @@ -1,4 +1,7 @@ -use aquatic_common::access_list::update_access_list; +use aquatic_common::{ + access_list::update_access_list, + cpu_pinning::{pin_current_if_configured_to, WorkerIndex}, +}; use cfg_if::cfg_if; use signal_hook::{consts::SIGUSR1, iterator::Signals}; @@ -14,11 +17,11 @@ pub mod mio; pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker"; pub fn run(config: Config) -> ::anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); cfg_if!( if #[cfg(feature = "with-glommio")] { diff --git a/aquatic_ws/src/lib/mio/mod.rs b/aquatic_ws/src/lib/mio/mod.rs index f793823..df791cb 100644 --- a/aquatic_ws/src/lib/mio/mod.rs +++ b/aquatic_ws/src/lib/mio/mod.rs @@ -5,6 +5,7 @@ use std::thread::Builder; use std::time::Duration; use anyhow::Context; +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use histogram::Histogram; use mio::{Poll, Waker}; use native_tls::{Identity, TlsAcceptor}; @@ -21,6 +22,12 @@ use common::*; pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker"; pub fn run(config: Config, state: State) -> anyhow::Result<()> { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + start_workers(config.clone(), state.clone()).expect("couldn't start workers"); // TODO: privdrop here instead @@ -69,6 +76,12 @@ pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> { Builder::new() .name(format!("socket-{:02}", i + 1)) .spawn(move || { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::SocketWorker(i), + ); + network::run_socket_worker( config, state, @@ -121,6 +134,12 @@ pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> { Builder::new() .name(format!("request-{:02}", i + 1)) .spawn(move || { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::RequestWorker(i), + ); + handlers::run_request_worker( config, state, @@ -137,10 +156,18 @@ pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> { Builder::new() .name("statistics".to_string()) - .spawn(move || loop { - ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); + .spawn(move || { + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); - print_statistics(&state); + loop { + ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); + + print_statistics(&state); + } }) .expect("spawn statistics thread"); }