cpu pinning: set affinity to multiple hyperthreads, fix issues

This commit is contained in:
Joakim Frostegård 2021-11-07 13:31:03 +01:00
parent 03192d2afb
commit e86410291a
11 changed files with 181 additions and 165 deletions

47
Cargo.lock generated
View file

@ -17,6 +17,18 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "affinity"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "763e484feceb7dd021b21c5c6f81aee06b1594a743455ec7efbf72e6355e447b"
dependencies = [
"cfg-if",
"errno",
"libc",
"num_cpus",
]
[[package]] [[package]]
name = "ahash" name = "ahash"
version = "0.3.8" version = "0.3.8"
@ -74,10 +86,10 @@ dependencies = [
name = "aquatic_common" name = "aquatic_common"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"affinity",
"ahash 0.7.6", "ahash 0.7.6",
"anyhow", "anyhow",
"arc-swap", "arc-swap",
"core_affinity",
"hashbrown 0.11.2", "hashbrown 0.11.2",
"hex", "hex",
"indexmap-amortized", "indexmap-amortized",
@ -534,18 +546,6 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
[[package]]
name = "core_affinity"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f8a03115cc34fb0d7c321dd154a3914b3ca082ccc5c11d91bf7117dbbe7171f"
dependencies = [
"kernel32-sys",
"libc",
"num_cpus",
"winapi 0.2.8",
]
[[package]] [[package]]
name = "cpufeatures" name = "cpufeatures"
version = "0.2.1" version = "0.2.1"
@ -718,6 +718,27 @@ dependencies = [
"regex", "regex",
] ]
[[package]]
name = "errno"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f639046355ee4f37944e44f60642c6f3a7efa3cf6b78c78a0d989a8ce6c396a1"
dependencies = [
"errno-dragonfly",
"libc",
"winapi 0.3.9",
]
[[package]]
name = "errno-dragonfly"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf"
dependencies = [
"cc",
"libc",
]
[[package]] [[package]]
name = "failure" name = "failure"
version = "0.1.8" version = "0.1.8"

View file

@ -14,7 +14,7 @@ name = "aquatic_common"
ahash = "0.7" ahash = "0.7"
anyhow = "1" anyhow = "1"
arc-swap = "1" arc-swap = "1"
core_affinity = "0.5" affinity = "0.1"
hashbrown = "0.11.2" hashbrown = "0.11.2"
hex = "0.4" hex = "0.4"
indexmap-amortized = "1" indexmap-amortized = "1"

View file

@ -17,8 +17,8 @@ impl Default for CpuPinningMode {
pub struct CpuPinningConfig { pub struct CpuPinningConfig {
pub active: bool, pub active: bool,
pub mode: CpuPinningMode, pub mode: CpuPinningMode,
pub offset: usize, pub virtual_per_physical_cpu: usize,
pub multiple: usize, pub offset_cpus: usize,
} }
impl Default for CpuPinningConfig { impl Default for CpuPinningConfig {
@ -26,8 +26,8 @@ impl Default for CpuPinningConfig {
Self { Self {
active: false, active: false,
mode: Default::default(), mode: Default::default(),
offset: 0, virtual_per_physical_cpu: 2,
multiple: 1, offset_cpus: 0,
} }
} }
} }
@ -49,41 +49,58 @@ pub enum WorkerIndex {
} }
impl WorkerIndex { impl WorkerIndex {
pub fn get_cpu_index(self, config: &CpuPinningConfig, socket_workers: usize) -> usize { fn get_cpu_indices(self, config: &CpuPinningConfig, socket_workers: usize) -> Vec<usize> {
let index = match self { let offset = match self {
Self::Other => config.offset, Self::Other => config.virtual_per_physical_cpu * config.offset_cpus,
Self::SocketWorker(index) => config.multiple * (config.offset + 1 + index), Self::SocketWorker(index) => {
config.virtual_per_physical_cpu * (config.offset_cpus + 1 + index)
}
Self::RequestWorker(index) => { Self::RequestWorker(index) => {
config.multiple * (config.offset + 1 + socket_workers + index) config.virtual_per_physical_cpu * (config.offset_cpus + 1 + socket_workers + index)
} }
}; };
let index = match config.mode { let virtual_cpus = (0..config.virtual_per_physical_cpu).map(|i| offset + i);
CpuPinningMode::Ascending => index,
let virtual_cpus: Vec<usize> = match config.mode {
CpuPinningMode::Ascending => virtual_cpus.collect(),
CpuPinningMode::Descending => { CpuPinningMode::Descending => {
let max = core_affinity::get_core_ids() let max_index = affinity::get_core_num() - 1;
.map(|ids| ids.iter().map(|id| id.id).max())
.flatten()
.unwrap_or(0);
max - index virtual_cpus
.map(|i| max_index.checked_sub(i).unwrap_or(0))
.collect()
} }
}; };
::log::info!("Calculated CPU pin index {} for {:?}", index, self); ::log::info!(
"Calculated virtual CPU pin indices {:?} for {:?}",
virtual_cpus,
self
);
index virtual_cpus
} }
} }
/// Note: don't call this when affinities were already set in the current or in
/// a parent thread. Doing so limits the number of cores that are seen and
/// messes up setting affinities.
pub fn pin_current_if_configured_to( pub fn pin_current_if_configured_to(
config: &CpuPinningConfig, config: &CpuPinningConfig,
socket_workers: usize, socket_workers: usize,
worker_index: WorkerIndex, worker_index: WorkerIndex,
) { ) {
if config.active { if config.active {
core_affinity::set_for_current(core_affinity::CoreId { let indices = worker_index.get_cpu_indices(config, socket_workers);
id: worker_index.get_cpu_index(config, socket_workers),
}); if let Err(err) = affinity::set_thread_affinity(indices.clone()) {
::log::error!(
"Failed setting thread affinities {:?} for {:?}: {:#?}",
indices,
worker_index,
err
);
}
} }
} }

View file

@ -25,12 +25,6 @@ pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker";
const SHARED_CHANNEL_SIZE: usize = 1024; const SHARED_CHANNEL_SIZE: usize = 1024;
pub fn run(config: Config) -> ::anyhow::Result<()> { pub fn run(config: Config) -> ::anyhow::Result<()> {
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
let state = State::default(); let state = State::default();
update_access_list(&config.access_list, &state.access_list)?; update_access_list(&config.access_list, &state.access_list)?;
@ -44,6 +38,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
::std::thread::spawn(move || run_inner(config, state)); ::std::thread::spawn(move || run_inner(config, state));
} }
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
for signal in &mut signals { for signal in &mut signals {
match signal { match signal {
SIGUSR1 => { SIGUSR1 => {
@ -57,12 +57,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
} }
pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
let num_peers = config.socket_workers + config.request_workers; let num_peers = config.socket_workers + config.request_workers;
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
@ -82,16 +76,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
let response_mesh_builder = response_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone();
let num_bound_sockets = num_bound_sockets.clone(); let num_bound_sockets = num_bound_sockets.clone();
let mut builder = LocalExecutorBuilder::default(); let executor = LocalExecutorBuilder::default().spawn(move || async move {
pin_current_if_configured_to(
if config.cpu_pinning.active { &config.cpu_pinning,
builder = builder.pin_to_cpu( config.socket_workers,
WorkerIndex::SocketWorker(i) WorkerIndex::SocketWorker(i),
.get_cpu_index(&config.cpu_pinning, config.socket_workers),
); );
}
let executor = builder.spawn(|| async move {
network::run_socket_worker( network::run_socket_worker(
config, config,
state, state,
@ -112,16 +103,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
let request_mesh_builder = request_mesh_builder.clone(); let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone();
let mut builder = LocalExecutorBuilder::default(); let executor = LocalExecutorBuilder::default().spawn(move || async move {
pin_current_if_configured_to(
if config.cpu_pinning.active { &config.cpu_pinning,
builder = builder.pin_to_cpu( config.socket_workers,
WorkerIndex::RequestWorker(i) WorkerIndex::RequestWorker(i),
.get_cpu_index(&config.cpu_pinning, config.socket_workers),
); );
}
let executor = builder.spawn(|| async move {
handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder)
.await .await
}); });
@ -136,6 +124,12 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
) )
.unwrap(); .unwrap();
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
for executor in executors { for executor in executors {
executor executor
.expect("failed to spawn local executor") .expect("failed to spawn local executor")

View file

@ -37,12 +37,6 @@ fn run(config: Config) -> ::anyhow::Result<()> {
println!("Starting client with config: {:#?}", config); println!("Starting client with config: {:#?}", config);
pin_current_if_configured_to(
&config.cpu_pinning,
config.num_workers as usize,
WorkerIndex::Other,
);
let mut info_hashes = Vec::with_capacity(config.torrents.number_of_torrents); let mut info_hashes = Vec::with_capacity(config.torrents.number_of_torrents);
let mut rng = SmallRng::from_entropy(); let mut rng = SmallRng::from_entropy();
@ -68,21 +62,25 @@ fn run(config: Config) -> ::anyhow::Result<()> {
let tls_config = tls_config.clone(); let tls_config = tls_config.clone();
let state = state.clone(); let state = state.clone();
let mut builder = LocalExecutorBuilder::default(); LocalExecutorBuilder::default()
.spawn(move || async move {
if config.cpu_pinning.active { pin_current_if_configured_to(
builder = builder.pin_to_cpu( &config.cpu_pinning,
WorkerIndex::SocketWorker(i).get_cpu_index(&config.cpu_pinning, config.num_workers), config.num_workers,
WorkerIndex::SocketWorker(i),
); );
}
builder
.spawn(|| async move {
run_socket_thread(config, tls_config, state).await.unwrap(); run_socket_thread(config, tls_config, state).await.unwrap();
}) })
.unwrap(); .unwrap();
} }
pin_current_if_configured_to(
&config.cpu_pinning,
config.num_workers as usize,
WorkerIndex::Other,
);
monitor_statistics(state, &config); monitor_statistics(state, &config);
Ok(()) Ok(())

View file

@ -19,12 +19,6 @@ pub mod network;
pub const SHARED_CHANNEL_SIZE: usize = 4096; pub const SHARED_CHANNEL_SIZE: usize = 4096;
pub fn run(config: Config) -> ::anyhow::Result<()> { pub fn run(config: Config) -> ::anyhow::Result<()> {
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
let state = State::default(); let state = State::default();
update_access_list(&config.access_list, &state.access_list)?; update_access_list(&config.access_list, &state.access_list)?;
@ -38,6 +32,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
::std::thread::spawn(move || run_inner(config, state)); ::std::thread::spawn(move || run_inner(config, state));
} }
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
for signal in &mut signals { for signal in &mut signals {
match signal { match signal {
SIGUSR1 => { SIGUSR1 => {
@ -51,12 +51,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
} }
pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
let num_peers = config.socket_workers + config.request_workers; let num_peers = config.socket_workers + config.request_workers;
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
@ -73,16 +67,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
let response_mesh_builder = response_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone();
let num_bound_sockets = num_bound_sockets.clone(); let num_bound_sockets = num_bound_sockets.clone();
let mut builder = LocalExecutorBuilder::default(); let executor = LocalExecutorBuilder::default().spawn(move || async move {
pin_current_if_configured_to(
if config.cpu_pinning.active { &config.cpu_pinning,
builder = builder.pin_to_cpu( config.socket_workers,
WorkerIndex::SocketWorker(i) WorkerIndex::SocketWorker(i),
.get_cpu_index(&config.cpu_pinning, config.socket_workers),
); );
}
let executor = builder.spawn(|| async move {
network::run_socket_worker( network::run_socket_worker(
config, config,
state, state,
@ -102,16 +93,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
let request_mesh_builder = request_mesh_builder.clone(); let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone();
let mut builder = LocalExecutorBuilder::default(); let executor = LocalExecutorBuilder::default().spawn(move || async move {
pin_current_if_configured_to(
if config.cpu_pinning.active { &config.cpu_pinning,
builder = builder.pin_to_cpu( config.socket_workers,
WorkerIndex::RequestWorker(i) WorkerIndex::RequestWorker(i),
.get_cpu_index(&config.cpu_pinning, config.socket_workers),
); );
}
let executor = builder.spawn(|| async move {
handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder)
.await .await
}); });
@ -126,6 +114,12 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
) )
.unwrap(); .unwrap();
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
for executor in executors { for executor in executors {
executor executor
.expect("failed to spawn local executor") .expect("failed to spawn local executor")

View file

@ -21,12 +21,6 @@ pub mod tasks;
use common::State; use common::State;
pub fn run(config: Config) -> ::anyhow::Result<()> { pub fn run(config: Config) -> ::anyhow::Result<()> {
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
let state = State::default(); let state = State::default();
update_access_list(&config.access_list, &state.access_list)?; update_access_list(&config.access_list, &state.access_list)?;
@ -40,6 +34,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
::std::thread::spawn(move || run_inner(config, state)); ::std::thread::spawn(move || run_inner(config, state));
} }
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
for signal in &mut signals { for signal in &mut signals {
match signal { match signal {
SIGUSR1 => { SIGUSR1 => {
@ -53,12 +53,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
} }
pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> {
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
let num_bound_sockets = Arc::new(AtomicUsize::new(0)); let num_bound_sockets = Arc::new(AtomicUsize::new(0));
let (request_sender, request_receiver) = unbounded(); let (request_sender, request_receiver) = unbounded();
@ -141,6 +135,12 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> {
) )
.unwrap(); .unwrap();
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
loop { loop {
::std::thread::sleep(Duration::from_secs( ::std::thread::sleep(Duration::from_secs(
config.cleaning.torrent_cleaning_interval, config.cleaning.torrent_cleaning_interval,

View file

@ -21,12 +21,6 @@ use glommio::{channels::channel_mesh::MeshBuilder, prelude::*};
const SHARED_CHANNEL_SIZE: usize = 1024; const SHARED_CHANNEL_SIZE: usize = 1024;
pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
let num_peers = config.socket_workers + config.request_workers; let num_peers = config.socket_workers + config.request_workers;
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
@ -46,16 +40,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
let response_mesh_builder = response_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone();
let num_bound_sockets = num_bound_sockets.clone(); let num_bound_sockets = num_bound_sockets.clone();
let mut builder = LocalExecutorBuilder::default(); let executor = LocalExecutorBuilder::default().spawn(move || async move {
pin_current_if_configured_to(
if config.cpu_pinning.active { &config.cpu_pinning,
builder = builder.pin_to_cpu( config.socket_workers,
WorkerIndex::SocketWorker(i) WorkerIndex::SocketWorker(i),
.get_cpu_index(&config.cpu_pinning, config.socket_workers),
); );
}
let executor = builder.spawn(|| async move {
network::run_socket_worker( network::run_socket_worker(
config, config,
state, state,
@ -76,16 +67,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
let request_mesh_builder = request_mesh_builder.clone(); let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone();
let mut builder = LocalExecutorBuilder::default(); let executor = LocalExecutorBuilder::default().spawn(move || async move {
pin_current_if_configured_to(
if config.cpu_pinning.active { &config.cpu_pinning,
builder = builder.pin_to_cpu( config.socket_workers,
WorkerIndex::RequestWorker(i) WorkerIndex::RequestWorker(i),
.get_cpu_index(&config.cpu_pinning, config.socket_workers),
); );
}
let executor = builder.spawn(|| async move {
handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder)
.await .await
}); });
@ -100,6 +88,12 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
) )
.unwrap(); .unwrap();
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
for executor in executors { for executor in executors {
executor executor
.expect("failed to spawn local executor") .expect("failed to spawn local executor")

View file

@ -17,12 +17,6 @@ pub mod mio;
pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker"; pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker";
pub fn run(config: Config) -> ::anyhow::Result<()> { pub fn run(config: Config) -> ::anyhow::Result<()> {
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
cfg_if!( cfg_if!(
if #[cfg(feature = "with-glommio")] { if #[cfg(feature = "with-glommio")] {
let state = glommio::common::State::default(); let state = glommio::common::State::default();
@ -48,6 +42,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
); );
} }
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
for signal in &mut signals { for signal in &mut signals {
match signal { match signal {
SIGUSR1 => { SIGUSR1 => {

View file

@ -22,16 +22,16 @@ use common::*;
pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker"; pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker";
pub fn run(config: Config, state: State) -> anyhow::Result<()> { pub fn run(config: Config, state: State) -> anyhow::Result<()> {
start_workers(config.clone(), state.clone()).expect("couldn't start workers");
// TODO: privdrop here instead
pin_current_if_configured_to( pin_current_if_configured_to(
&config.cpu_pinning, &config.cpu_pinning,
config.socket_workers, config.socket_workers,
WorkerIndex::Other, WorkerIndex::Other,
); );
start_workers(config.clone(), state.clone()).expect("couldn't start workers");
// TODO: privdrop here instead
loop { loop {
::std::thread::sleep(Duration::from_secs( ::std::thread::sleep(Duration::from_secs(
config.cleaning.torrent_cleaning_interval, config.cleaning.torrent_cleaning_interval,

View file

@ -34,12 +34,6 @@ fn run(config: Config) -> ::anyhow::Result<()> {
println!("Starting client with config: {:#?}", config); println!("Starting client with config: {:#?}", config);
pin_current_if_configured_to(
&config.cpu_pinning,
config.num_workers as usize,
WorkerIndex::Other,
);
let mut info_hashes = Vec::with_capacity(config.torrents.number_of_torrents); let mut info_hashes = Vec::with_capacity(config.torrents.number_of_torrents);
let mut rng = SmallRng::from_entropy(); let mut rng = SmallRng::from_entropy();
@ -63,21 +57,25 @@ fn run(config: Config) -> ::anyhow::Result<()> {
let tls_config = tls_config.clone(); let tls_config = tls_config.clone();
let state = state.clone(); let state = state.clone();
let mut builder = LocalExecutorBuilder::default(); LocalExecutorBuilder::default()
.spawn(move || async move {
if config.cpu_pinning.active { pin_current_if_configured_to(
builder = builder.pin_to_cpu( &config.cpu_pinning,
WorkerIndex::SocketWorker(i).get_cpu_index(&config.cpu_pinning, config.num_workers), config.num_workers,
WorkerIndex::SocketWorker(i),
); );
}
builder
.spawn(|| async move {
run_socket_thread(config, tls_config, state).await.unwrap(); run_socket_thread(config, tls_config, state).await.unwrap();
}) })
.unwrap(); .unwrap();
} }
pin_current_if_configured_to(
&config.cpu_pinning,
config.num_workers as usize,
WorkerIndex::Other,
);
monitor_statistics(state, &config); monitor_statistics(state, &config);
Ok(()) Ok(())