diff --git a/Cargo.lock b/Cargo.lock index 73fc3bd..b66fecb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -96,7 +96,6 @@ dependencies = [ "aquatic_common", "aquatic_http_protocol", "cfg-if", - "core_affinity", "either", "futures-lite", "futures-rustls", @@ -123,6 +122,7 @@ version = "0.1.0" dependencies = [ "anyhow", "aquatic_cli_helpers", + "aquatic_common", "aquatic_http_protocol", "futures-lite", "glommio", diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index f6ea2ee..fffdb5c 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -21,7 +21,6 @@ aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" aquatic_http_protocol = "0.1.0" cfg-if = "1" -core_affinity = "0.5" either = "1" futures-lite = "1" futures-rustls = "0.22" diff --git a/aquatic_http/src/lib/lib.rs b/aquatic_http/src/lib/lib.rs index ef584e5..9172caa 100644 --- a/aquatic_http/src/lib/lib.rs +++ b/aquatic_http/src/lib/lib.rs @@ -5,7 +5,9 @@ use std::{ }; use aquatic_common::{ - access_list::update_access_list, privileges::drop_privileges_after_socket_binding, + access_list::update_access_list, + cpu_pinning::{pin_current_if_configured_to, WorkerIndex}, + privileges::drop_privileges_after_socket_binding, }; use common::{State, TlsConfig}; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; @@ -23,11 +25,11 @@ pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker"; const SHARED_CHANNEL_SIZE: usize = 1024; pub fn run(config: Config) -> ::anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); let state = State::default(); @@ -55,11 +57,11 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { - if config.cpu_pinning.active { - core_affinity::set_for_current(core_affinity::CoreId { - id: config.cpu_pinning.offset, - }); - } + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); let num_peers = config.socket_workers + config.request_workers; @@ -83,7 +85,10 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let mut builder = LocalExecutorBuilder::default(); if config.cpu_pinning.active { - builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + i); + builder = builder.pin_to_cpu( + WorkerIndex::SocketWorker(i) + .get_cpu_index(&config.cpu_pinning, config.socket_workers), + ); } let executor = builder.spawn(|| async move { @@ -110,7 +115,10 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let mut builder = LocalExecutorBuilder::default(); if config.cpu_pinning.active { - builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i); + builder = builder.pin_to_cpu( + WorkerIndex::RequestWorker(i) + .get_cpu_index(&config.cpu_pinning, config.socket_workers), + ); } let executor = builder.spawn(|| async move { diff --git a/aquatic_http_load_test/Cargo.toml b/aquatic_http_load_test/Cargo.toml index 25baab6..83161b1 100644 --- a/aquatic_http_load_test/Cargo.toml +++ b/aquatic_http_load_test/Cargo.toml @@ -12,6 +12,7 @@ name = "aquatic_http_load_test" [dependencies] anyhow = "1" aquatic_cli_helpers = "0.1.0" +aquatic_common = "0.1.0" aquatic_http_protocol = "0.1.0" futures-lite = "1" hashbrown = "0.11.2" diff --git a/aquatic_http_load_test/src/config.rs b/aquatic_http_load_test/src/config.rs index 1c8456a..a3db7d4 100644 --- a/aquatic_http_load_test/src/config.rs +++ b/aquatic_http_load_test/src/config.rs @@ -1,18 +1,26 @@ use std::net::SocketAddr; +use aquatic_cli_helpers::LogLevel; +use aquatic_common::cpu_pinning::CpuPinningConfig; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct Config { pub server_address: SocketAddr, - pub num_workers: u8, + pub log_level: LogLevel, + pub num_workers: usize, pub num_connections: usize, pub duration: usize, pub torrents: TorrentConfig, + pub cpu_pinning: CpuPinningConfig, } -impl aquatic_cli_helpers::Config for Config {} +impl aquatic_cli_helpers::Config for Config { + fn get_log_level(&self) -> Option { + Some(self.log_level) + } +} #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] @@ -36,10 +44,12 @@ impl Default for Config { fn default() -> Self { Self { server_address: "127.0.0.1:3000".parse().unwrap(), + log_level: LogLevel::Error, num_workers: 1, num_connections: 8, duration: 0, torrents: TorrentConfig::default(), + cpu_pinning: CpuPinningConfig::default_for_load_test(), } } } diff --git a/aquatic_http_load_test/src/main.rs b/aquatic_http_load_test/src/main.rs index e719f77..2a1a4c5 100644 --- a/aquatic_http_load_test/src/main.rs +++ b/aquatic_http_load_test/src/main.rs @@ -3,6 +3,7 @@ use std::thread; use std::time::{Duration, Instant}; use ::glommio::LocalExecutorBuilder; +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use rand::prelude::*; use rand_distr::Pareto; @@ -36,6 +37,12 @@ fn run(config: Config) -> ::anyhow::Result<()> { println!("Starting client with config: {:#?}", config); + pin_current_if_configured_to( + &config.cpu_pinning, + config.num_workers as usize, + WorkerIndex::Other, + ); + let mut info_hashes = Vec::with_capacity(config.torrents.number_of_torrents); let mut rng = SmallRng::from_entropy(); @@ -56,12 +63,21 @@ fn run(config: Config) -> ::anyhow::Result<()> { let tls_config = create_tls_config().unwrap(); - for _ in 0..config.num_workers { + for i in 0..config.num_workers { let config = config.clone(); let tls_config = tls_config.clone(); let state = state.clone(); - LocalExecutorBuilder::default() + let mut builder = LocalExecutorBuilder::default(); + + if config.cpu_pinning.active { + builder = builder.pin_to_cpu( + WorkerIndex::SocketWorker(i) + .get_cpu_index(&config.cpu_pinning, config.num_workers), + ); + } + + builder .spawn(|| async move { run_socket_thread(config, tls_config, state).await.unwrap(); })