mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
Merge pull request #24 from greatest-ape/cpu-pinning-fixes
Cpu pinning fixes
This commit is contained in:
commit
48bfaaac03
23 changed files with 378 additions and 177 deletions
53
Cargo.lock
generated
53
Cargo.lock
generated
|
|
@ -17,6 +17,18 @@ version = "1.0.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
|
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "affinity"
|
||||||
|
version = "0.1.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "763e484feceb7dd021b21c5c6f81aee06b1594a743455ec7efbf72e6355e447b"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"errno",
|
||||||
|
"libc",
|
||||||
|
"num_cpus",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ahash"
|
name = "ahash"
|
||||||
version = "0.3.8"
|
version = "0.3.8"
|
||||||
|
|
@ -74,6 +86,7 @@ dependencies = [
|
||||||
name = "aquatic_common"
|
name = "aquatic_common"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"affinity",
|
||||||
"ahash 0.7.6",
|
"ahash 0.7.6",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"arc-swap",
|
"arc-swap",
|
||||||
|
|
@ -95,7 +108,6 @@ dependencies = [
|
||||||
"aquatic_common",
|
"aquatic_common",
|
||||||
"aquatic_http_protocol",
|
"aquatic_http_protocol",
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
"core_affinity",
|
|
||||||
"either",
|
"either",
|
||||||
"futures-lite",
|
"futures-lite",
|
||||||
"futures-rustls",
|
"futures-rustls",
|
||||||
|
|
@ -122,6 +134,7 @@ version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"aquatic_cli_helpers",
|
"aquatic_cli_helpers",
|
||||||
|
"aquatic_common",
|
||||||
"aquatic_http_protocol",
|
"aquatic_http_protocol",
|
||||||
"futures-lite",
|
"futures-lite",
|
||||||
"glommio",
|
"glommio",
|
||||||
|
|
@ -166,7 +179,6 @@ dependencies = [
|
||||||
"aquatic_common",
|
"aquatic_common",
|
||||||
"aquatic_udp_protocol",
|
"aquatic_udp_protocol",
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
"core_affinity",
|
|
||||||
"crossbeam-channel",
|
"crossbeam-channel",
|
||||||
"futures-lite",
|
"futures-lite",
|
||||||
"glommio",
|
"glommio",
|
||||||
|
|
@ -206,8 +218,8 @@ version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"aquatic_cli_helpers",
|
"aquatic_cli_helpers",
|
||||||
|
"aquatic_common",
|
||||||
"aquatic_udp_protocol",
|
"aquatic_udp_protocol",
|
||||||
"core_affinity",
|
|
||||||
"crossbeam-channel",
|
"crossbeam-channel",
|
||||||
"hashbrown 0.11.2",
|
"hashbrown 0.11.2",
|
||||||
"mimalloc",
|
"mimalloc",
|
||||||
|
|
@ -241,7 +253,6 @@ dependencies = [
|
||||||
"aquatic_ws_protocol",
|
"aquatic_ws_protocol",
|
||||||
"async-tungstenite",
|
"async-tungstenite",
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
"core_affinity",
|
|
||||||
"crossbeam-channel",
|
"crossbeam-channel",
|
||||||
"either",
|
"either",
|
||||||
"futures",
|
"futures",
|
||||||
|
|
@ -273,6 +284,7 @@ version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"aquatic_cli_helpers",
|
"aquatic_cli_helpers",
|
||||||
|
"aquatic_common",
|
||||||
"aquatic_ws_protocol",
|
"aquatic_ws_protocol",
|
||||||
"async-tungstenite",
|
"async-tungstenite",
|
||||||
"futures",
|
"futures",
|
||||||
|
|
@ -534,18 +546,6 @@ version = "0.8.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
|
checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "core_affinity"
|
|
||||||
version = "0.5.10"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "7f8a03115cc34fb0d7c321dd154a3914b3ca082ccc5c11d91bf7117dbbe7171f"
|
|
||||||
dependencies = [
|
|
||||||
"kernel32-sys",
|
|
||||||
"libc",
|
|
||||||
"num_cpus",
|
|
||||||
"winapi 0.2.8",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cpufeatures"
|
name = "cpufeatures"
|
||||||
version = "0.2.1"
|
version = "0.2.1"
|
||||||
|
|
@ -718,6 +718,27 @@ dependencies = [
|
||||||
"regex",
|
"regex",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "errno"
|
||||||
|
version = "0.2.8"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f639046355ee4f37944e44f60642c6f3a7efa3cf6b78c78a0d989a8ce6c396a1"
|
||||||
|
dependencies = [
|
||||||
|
"errno-dragonfly",
|
||||||
|
"libc",
|
||||||
|
"winapi 0.3.9",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "errno-dragonfly"
|
||||||
|
version = "0.1.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf"
|
||||||
|
dependencies = [
|
||||||
|
"cc",
|
||||||
|
"libc",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "failure"
|
name = "failure"
|
||||||
version = "0.1.8"
|
version = "0.1.8"
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,7 @@ name = "aquatic_common"
|
||||||
ahash = "0.7"
|
ahash = "0.7"
|
||||||
anyhow = "1"
|
anyhow = "1"
|
||||||
arc-swap = "1"
|
arc-swap = "1"
|
||||||
|
affinity = "0.1"
|
||||||
hashbrown = "0.11.2"
|
hashbrown = "0.11.2"
|
||||||
hex = "0.4"
|
hex = "0.4"
|
||||||
indexmap-amortized = "1"
|
indexmap-amortized = "1"
|
||||||
|
|
|
||||||
|
|
@ -1,17 +1,106 @@
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
#[serde(default)]
|
#[serde(rename_all = "lowercase")]
|
||||||
|
pub enum CpuPinningMode {
|
||||||
|
Ascending,
|
||||||
|
Descending,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for CpuPinningMode {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::Ascending
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct CpuPinningConfig {
|
pub struct CpuPinningConfig {
|
||||||
pub active: bool,
|
pub active: bool,
|
||||||
pub offset: usize,
|
pub mode: CpuPinningMode,
|
||||||
|
pub virtual_per_physical_cpu: usize,
|
||||||
|
pub offset_cpus: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for CpuPinningConfig {
|
impl Default for CpuPinningConfig {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
active: false,
|
active: false,
|
||||||
offset: 0,
|
mode: Default::default(),
|
||||||
|
virtual_per_physical_cpu: 2,
|
||||||
|
offset_cpus: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CpuPinningConfig {
|
||||||
|
pub fn default_for_load_test() -> Self {
|
||||||
|
Self {
|
||||||
|
mode: CpuPinningMode::Descending,
|
||||||
|
..Default::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, Debug)]
|
||||||
|
pub enum WorkerIndex {
|
||||||
|
SocketWorker(usize),
|
||||||
|
RequestWorker(usize),
|
||||||
|
Other,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WorkerIndex {
|
||||||
|
fn get_cpu_indices(self, config: &CpuPinningConfig, socket_workers: usize) -> Vec<usize> {
|
||||||
|
let offset = match self {
|
||||||
|
Self::Other => config.virtual_per_physical_cpu * config.offset_cpus,
|
||||||
|
Self::SocketWorker(index) => {
|
||||||
|
config.virtual_per_physical_cpu * (config.offset_cpus + 1 + index)
|
||||||
|
}
|
||||||
|
Self::RequestWorker(index) => {
|
||||||
|
config.virtual_per_physical_cpu * (config.offset_cpus + 1 + socket_workers + index)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let virtual_cpus = (0..config.virtual_per_physical_cpu).map(|i| offset + i);
|
||||||
|
|
||||||
|
let virtual_cpus: Vec<usize> = match config.mode {
|
||||||
|
CpuPinningMode::Ascending => virtual_cpus.collect(),
|
||||||
|
CpuPinningMode::Descending => {
|
||||||
|
let max_index = affinity::get_core_num() - 1;
|
||||||
|
|
||||||
|
virtual_cpus
|
||||||
|
.map(|i| max_index.checked_sub(i).unwrap_or(0))
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
::log::info!(
|
||||||
|
"Calculated virtual CPU pin indices {:?} for {:?}",
|
||||||
|
virtual_cpus,
|
||||||
|
self
|
||||||
|
);
|
||||||
|
|
||||||
|
virtual_cpus
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Note: don't call this when affinities were already set in the current or in
|
||||||
|
/// a parent thread. Doing so limits the number of cores that are seen and
|
||||||
|
/// messes up setting affinities.
|
||||||
|
pub fn pin_current_if_configured_to(
|
||||||
|
config: &CpuPinningConfig,
|
||||||
|
socket_workers: usize,
|
||||||
|
worker_index: WorkerIndex,
|
||||||
|
) {
|
||||||
|
if config.active {
|
||||||
|
let indices = worker_index.get_cpu_indices(config, socket_workers);
|
||||||
|
|
||||||
|
if let Err(err) = affinity::set_thread_affinity(indices.clone()) {
|
||||||
|
::log::error!(
|
||||||
|
"Failed setting thread affinities {:?} for {:?}: {:#?}",
|
||||||
|
indices,
|
||||||
|
worker_index,
|
||||||
|
err
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,6 @@ aquatic_cli_helpers = "0.1.0"
|
||||||
aquatic_common = "0.1.0"
|
aquatic_common = "0.1.0"
|
||||||
aquatic_http_protocol = "0.1.0"
|
aquatic_http_protocol = "0.1.0"
|
||||||
cfg-if = "1"
|
cfg-if = "1"
|
||||||
core_affinity = "0.5"
|
|
||||||
either = "1"
|
either = "1"
|
||||||
futures-lite = "1"
|
futures-lite = "1"
|
||||||
futures-rustls = "0.22"
|
futures-rustls = "0.22"
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,9 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use aquatic_common::{
|
use aquatic_common::{
|
||||||
access_list::update_access_list, privileges::drop_privileges_after_socket_binding,
|
access_list::update_access_list,
|
||||||
|
cpu_pinning::{pin_current_if_configured_to, WorkerIndex},
|
||||||
|
privileges::drop_privileges_after_socket_binding,
|
||||||
};
|
};
|
||||||
use common::{State, TlsConfig};
|
use common::{State, TlsConfig};
|
||||||
use glommio::{channels::channel_mesh::MeshBuilder, prelude::*};
|
use glommio::{channels::channel_mesh::MeshBuilder, prelude::*};
|
||||||
|
|
@ -23,12 +25,6 @@ pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker";
|
||||||
const SHARED_CHANNEL_SIZE: usize = 1024;
|
const SHARED_CHANNEL_SIZE: usize = 1024;
|
||||||
|
|
||||||
pub fn run(config: Config) -> ::anyhow::Result<()> {
|
pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
if config.cpu_pinning.active {
|
|
||||||
core_affinity::set_for_current(core_affinity::CoreId {
|
|
||||||
id: config.cpu_pinning.offset,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
let state = State::default();
|
let state = State::default();
|
||||||
|
|
||||||
update_access_list(&config.access_list, &state.access_list)?;
|
update_access_list(&config.access_list, &state.access_list)?;
|
||||||
|
|
@ -42,6 +38,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
::std::thread::spawn(move || run_inner(config, state));
|
::std::thread::spawn(move || run_inner(config, state));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.socket_workers,
|
||||||
|
WorkerIndex::Other,
|
||||||
|
);
|
||||||
|
|
||||||
for signal in &mut signals {
|
for signal in &mut signals {
|
||||||
match signal {
|
match signal {
|
||||||
SIGUSR1 => {
|
SIGUSR1 => {
|
||||||
|
|
@ -55,12 +57,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
|
pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
|
||||||
if config.cpu_pinning.active {
|
|
||||||
core_affinity::set_for_current(core_affinity::CoreId {
|
|
||||||
id: config.cpu_pinning.offset,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
let num_peers = config.socket_workers + config.request_workers;
|
let num_peers = config.socket_workers + config.request_workers;
|
||||||
|
|
||||||
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
|
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
|
||||||
|
|
@ -80,13 +76,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
|
||||||
let response_mesh_builder = response_mesh_builder.clone();
|
let response_mesh_builder = response_mesh_builder.clone();
|
||||||
let num_bound_sockets = num_bound_sockets.clone();
|
let num_bound_sockets = num_bound_sockets.clone();
|
||||||
|
|
||||||
let mut builder = LocalExecutorBuilder::default();
|
let executor = LocalExecutorBuilder::default().spawn(move || async move {
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.socket_workers,
|
||||||
|
WorkerIndex::SocketWorker(i),
|
||||||
|
);
|
||||||
|
|
||||||
if config.cpu_pinning.active {
|
|
||||||
builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + i);
|
|
||||||
}
|
|
||||||
|
|
||||||
let executor = builder.spawn(|| async move {
|
|
||||||
network::run_socket_worker(
|
network::run_socket_worker(
|
||||||
config,
|
config,
|
||||||
state,
|
state,
|
||||||
|
|
@ -107,13 +103,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
|
||||||
let request_mesh_builder = request_mesh_builder.clone();
|
let request_mesh_builder = request_mesh_builder.clone();
|
||||||
let response_mesh_builder = response_mesh_builder.clone();
|
let response_mesh_builder = response_mesh_builder.clone();
|
||||||
|
|
||||||
let mut builder = LocalExecutorBuilder::default();
|
let executor = LocalExecutorBuilder::default().spawn(move || async move {
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.socket_workers,
|
||||||
|
WorkerIndex::RequestWorker(i),
|
||||||
|
);
|
||||||
|
|
||||||
if config.cpu_pinning.active {
|
|
||||||
builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i);
|
|
||||||
}
|
|
||||||
|
|
||||||
let executor = builder.spawn(|| async move {
|
|
||||||
handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder)
|
handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder)
|
||||||
.await
|
.await
|
||||||
});
|
});
|
||||||
|
|
@ -128,6 +124,12 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.socket_workers,
|
||||||
|
WorkerIndex::Other,
|
||||||
|
);
|
||||||
|
|
||||||
for executor in executors {
|
for executor in executors {
|
||||||
executor
|
executor
|
||||||
.expect("failed to spawn local executor")
|
.expect("failed to spawn local executor")
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,7 @@ name = "aquatic_http_load_test"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow = "1"
|
anyhow = "1"
|
||||||
aquatic_cli_helpers = "0.1.0"
|
aquatic_cli_helpers = "0.1.0"
|
||||||
|
aquatic_common = "0.1.0"
|
||||||
aquatic_http_protocol = "0.1.0"
|
aquatic_http_protocol = "0.1.0"
|
||||||
futures-lite = "1"
|
futures-lite = "1"
|
||||||
hashbrown = "0.11.2"
|
hashbrown = "0.11.2"
|
||||||
|
|
|
||||||
|
|
@ -1,18 +1,26 @@
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
|
use aquatic_cli_helpers::LogLevel;
|
||||||
|
use aquatic_common::cpu_pinning::CpuPinningConfig;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
pub server_address: SocketAddr,
|
pub server_address: SocketAddr,
|
||||||
pub num_workers: u8,
|
pub log_level: LogLevel,
|
||||||
|
pub num_workers: usize,
|
||||||
pub num_connections: usize,
|
pub num_connections: usize,
|
||||||
pub duration: usize,
|
pub duration: usize,
|
||||||
pub torrents: TorrentConfig,
|
pub torrents: TorrentConfig,
|
||||||
|
pub cpu_pinning: CpuPinningConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl aquatic_cli_helpers::Config for Config {}
|
impl aquatic_cli_helpers::Config for Config {
|
||||||
|
fn get_log_level(&self) -> Option<LogLevel> {
|
||||||
|
Some(self.log_level)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
|
|
@ -36,10 +44,12 @@ impl Default for Config {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
server_address: "127.0.0.1:3000".parse().unwrap(),
|
server_address: "127.0.0.1:3000".parse().unwrap(),
|
||||||
|
log_level: LogLevel::Error,
|
||||||
num_workers: 1,
|
num_workers: 1,
|
||||||
num_connections: 8,
|
num_connections: 8,
|
||||||
duration: 0,
|
duration: 0,
|
||||||
torrents: TorrentConfig::default(),
|
torrents: TorrentConfig::default(),
|
||||||
|
cpu_pinning: CpuPinningConfig::default_for_load_test(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ use std::thread;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use ::glommio::LocalExecutorBuilder;
|
use ::glommio::LocalExecutorBuilder;
|
||||||
|
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
|
||||||
use rand::prelude::*;
|
use rand::prelude::*;
|
||||||
use rand_distr::Pareto;
|
use rand_distr::Pareto;
|
||||||
|
|
||||||
|
|
@ -56,18 +57,30 @@ fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
|
|
||||||
let tls_config = create_tls_config().unwrap();
|
let tls_config = create_tls_config().unwrap();
|
||||||
|
|
||||||
for _ in 0..config.num_workers {
|
for i in 0..config.num_workers {
|
||||||
let config = config.clone();
|
let config = config.clone();
|
||||||
let tls_config = tls_config.clone();
|
let tls_config = tls_config.clone();
|
||||||
let state = state.clone();
|
let state = state.clone();
|
||||||
|
|
||||||
LocalExecutorBuilder::default()
|
LocalExecutorBuilder::default()
|
||||||
.spawn(|| async move {
|
.spawn(move || async move {
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.num_workers,
|
||||||
|
WorkerIndex::SocketWorker(i),
|
||||||
|
);
|
||||||
|
|
||||||
run_socket_thread(config, tls_config, state).await.unwrap();
|
run_socket_thread(config, tls_config, state).await.unwrap();
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.num_workers as usize,
|
||||||
|
WorkerIndex::Other,
|
||||||
|
);
|
||||||
|
|
||||||
monitor_statistics(state, &config);
|
monitor_statistics(state, &config);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,6 @@ aquatic_cli_helpers = "0.1.0"
|
||||||
aquatic_common = "0.1.0"
|
aquatic_common = "0.1.0"
|
||||||
aquatic_udp_protocol = "0.1.0"
|
aquatic_udp_protocol = "0.1.0"
|
||||||
cfg-if = "1"
|
cfg-if = "1"
|
||||||
core_affinity = "0.5"
|
|
||||||
hex = "0.4"
|
hex = "0.4"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
mimalloc = { version = "0.1", default-features = false }
|
mimalloc = { version = "0.1", default-features = false }
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
use std::sync::{atomic::AtomicUsize, Arc};
|
use std::sync::{atomic::AtomicUsize, Arc};
|
||||||
|
|
||||||
use aquatic_common::access_list::update_access_list;
|
use aquatic_common::access_list::update_access_list;
|
||||||
|
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
|
||||||
use aquatic_common::privileges::drop_privileges_after_socket_binding;
|
use aquatic_common::privileges::drop_privileges_after_socket_binding;
|
||||||
use glommio::channels::channel_mesh::MeshBuilder;
|
use glommio::channels::channel_mesh::MeshBuilder;
|
||||||
use glommio::prelude::*;
|
use glommio::prelude::*;
|
||||||
|
|
@ -18,12 +19,6 @@ pub mod network;
|
||||||
pub const SHARED_CHANNEL_SIZE: usize = 4096;
|
pub const SHARED_CHANNEL_SIZE: usize = 4096;
|
||||||
|
|
||||||
pub fn run(config: Config) -> ::anyhow::Result<()> {
|
pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
if config.cpu_pinning.active {
|
|
||||||
core_affinity::set_for_current(core_affinity::CoreId {
|
|
||||||
id: config.cpu_pinning.offset,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
let state = State::default();
|
let state = State::default();
|
||||||
|
|
||||||
update_access_list(&config.access_list, &state.access_list)?;
|
update_access_list(&config.access_list, &state.access_list)?;
|
||||||
|
|
@ -37,6 +32,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
::std::thread::spawn(move || run_inner(config, state));
|
::std::thread::spawn(move || run_inner(config, state));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.socket_workers,
|
||||||
|
WorkerIndex::Other,
|
||||||
|
);
|
||||||
|
|
||||||
for signal in &mut signals {
|
for signal in &mut signals {
|
||||||
match signal {
|
match signal {
|
||||||
SIGUSR1 => {
|
SIGUSR1 => {
|
||||||
|
|
@ -50,12 +51,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
|
pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
|
||||||
if config.cpu_pinning.active {
|
|
||||||
core_affinity::set_for_current(core_affinity::CoreId {
|
|
||||||
id: config.cpu_pinning.offset,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
let num_peers = config.socket_workers + config.request_workers;
|
let num_peers = config.socket_workers + config.request_workers;
|
||||||
|
|
||||||
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
|
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
|
||||||
|
|
@ -72,13 +67,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
|
||||||
let response_mesh_builder = response_mesh_builder.clone();
|
let response_mesh_builder = response_mesh_builder.clone();
|
||||||
let num_bound_sockets = num_bound_sockets.clone();
|
let num_bound_sockets = num_bound_sockets.clone();
|
||||||
|
|
||||||
let mut builder = LocalExecutorBuilder::default();
|
let executor = LocalExecutorBuilder::default().spawn(move || async move {
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.socket_workers,
|
||||||
|
WorkerIndex::SocketWorker(i),
|
||||||
|
);
|
||||||
|
|
||||||
if config.cpu_pinning.active {
|
|
||||||
builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + i);
|
|
||||||
}
|
|
||||||
|
|
||||||
let executor = builder.spawn(|| async move {
|
|
||||||
network::run_socket_worker(
|
network::run_socket_worker(
|
||||||
config,
|
config,
|
||||||
state,
|
state,
|
||||||
|
|
@ -98,13 +93,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
|
||||||
let request_mesh_builder = request_mesh_builder.clone();
|
let request_mesh_builder = request_mesh_builder.clone();
|
||||||
let response_mesh_builder = response_mesh_builder.clone();
|
let response_mesh_builder = response_mesh_builder.clone();
|
||||||
|
|
||||||
let mut builder = LocalExecutorBuilder::default();
|
let executor = LocalExecutorBuilder::default().spawn(move || async move {
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.socket_workers,
|
||||||
|
WorkerIndex::RequestWorker(i),
|
||||||
|
);
|
||||||
|
|
||||||
if config.cpu_pinning.active {
|
|
||||||
builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i);
|
|
||||||
}
|
|
||||||
|
|
||||||
let executor = builder.spawn(|| async move {
|
|
||||||
handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder)
|
handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder)
|
||||||
.await
|
.await
|
||||||
});
|
});
|
||||||
|
|
@ -119,6 +114,12 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.socket_workers,
|
||||||
|
WorkerIndex::Other,
|
||||||
|
);
|
||||||
|
|
||||||
for executor in executors {
|
for executor in executors {
|
||||||
executor
|
executor
|
||||||
.expect("failed to spawn local executor")
|
.expect("failed to spawn local executor")
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ use std::thread::Builder;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
|
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
|
||||||
use aquatic_common::privileges::drop_privileges_after_socket_binding;
|
use aquatic_common::privileges::drop_privileges_after_socket_binding;
|
||||||
use crossbeam_channel::unbounded;
|
use crossbeam_channel::unbounded;
|
||||||
|
|
||||||
|
|
@ -20,12 +21,6 @@ pub mod tasks;
|
||||||
use common::State;
|
use common::State;
|
||||||
|
|
||||||
pub fn run(config: Config) -> ::anyhow::Result<()> {
|
pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
if config.cpu_pinning.active {
|
|
||||||
core_affinity::set_for_current(core_affinity::CoreId {
|
|
||||||
id: config.cpu_pinning.offset,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
let state = State::default();
|
let state = State::default();
|
||||||
|
|
||||||
update_access_list(&config.access_list, &state.access_list)?;
|
update_access_list(&config.access_list, &state.access_list)?;
|
||||||
|
|
@ -39,6 +34,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
::std::thread::spawn(move || run_inner(config, state));
|
::std::thread::spawn(move || run_inner(config, state));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.socket_workers,
|
||||||
|
WorkerIndex::Other,
|
||||||
|
);
|
||||||
|
|
||||||
for signal in &mut signals {
|
for signal in &mut signals {
|
||||||
match signal {
|
match signal {
|
||||||
SIGUSR1 => {
|
SIGUSR1 => {
|
||||||
|
|
@ -52,12 +53,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> {
|
pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> {
|
||||||
if config.cpu_pinning.active {
|
|
||||||
core_affinity::set_for_current(core_affinity::CoreId {
|
|
||||||
id: config.cpu_pinning.offset,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
let num_bound_sockets = Arc::new(AtomicUsize::new(0));
|
let num_bound_sockets = Arc::new(AtomicUsize::new(0));
|
||||||
|
|
||||||
let (request_sender, request_receiver) = unbounded();
|
let (request_sender, request_receiver) = unbounded();
|
||||||
|
|
@ -72,11 +67,11 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> {
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.name(format!("request-{:02}", i + 1))
|
.name(format!("request-{:02}", i + 1))
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
if config.cpu_pinning.active {
|
pin_current_if_configured_to(
|
||||||
core_affinity::set_for_current(core_affinity::CoreId {
|
&config.cpu_pinning,
|
||||||
id: config.cpu_pinning.offset + 1 + i,
|
config.socket_workers,
|
||||||
});
|
WorkerIndex::RequestWorker(i),
|
||||||
}
|
);
|
||||||
|
|
||||||
handlers::run_request_worker(state, config, request_receiver, response_sender)
|
handlers::run_request_worker(state, config, request_receiver, response_sender)
|
||||||
})
|
})
|
||||||
|
|
@ -93,11 +88,11 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> {
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.name(format!("socket-{:02}", i + 1))
|
.name(format!("socket-{:02}", i + 1))
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
if config.cpu_pinning.active {
|
pin_current_if_configured_to(
|
||||||
core_affinity::set_for_current(core_affinity::CoreId {
|
&config.cpu_pinning,
|
||||||
id: config.cpu_pinning.offset + 1 + config.request_workers + i,
|
config.socket_workers,
|
||||||
});
|
WorkerIndex::SocketWorker(i),
|
||||||
}
|
);
|
||||||
|
|
||||||
network::run_socket_worker(
|
network::run_socket_worker(
|
||||||
state,
|
state,
|
||||||
|
|
@ -118,11 +113,11 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> {
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.name("statistics-collector".to_string())
|
.name("statistics-collector".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
if config.cpu_pinning.active {
|
pin_current_if_configured_to(
|
||||||
core_affinity::set_for_current(core_affinity::CoreId {
|
&config.cpu_pinning,
|
||||||
id: config.cpu_pinning.offset,
|
config.socket_workers,
|
||||||
});
|
WorkerIndex::Other,
|
||||||
}
|
);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
::std::thread::sleep(Duration::from_secs(config.statistics.interval));
|
::std::thread::sleep(Duration::from_secs(config.statistics.interval));
|
||||||
|
|
@ -140,6 +135,12 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> {
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.socket_workers,
|
||||||
|
WorkerIndex::Other,
|
||||||
|
);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
::std::thread::sleep(Duration::from_secs(
|
::std::thread::sleep(Duration::from_secs(
|
||||||
config.cleaning.torrent_cleaning_interval,
|
config.cleaning.torrent_cleaning_interval,
|
||||||
|
|
|
||||||
|
|
@ -12,8 +12,8 @@ name = "aquatic_udp_load_test"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow = "1"
|
anyhow = "1"
|
||||||
aquatic_cli_helpers = "0.1.0"
|
aquatic_cli_helpers = "0.1.0"
|
||||||
|
aquatic_common = "0.1.0"
|
||||||
aquatic_udp_protocol = "0.1.0"
|
aquatic_udp_protocol = "0.1.0"
|
||||||
core_affinity = "0.5"
|
|
||||||
crossbeam-channel = "0.5"
|
crossbeam-channel = "0.5"
|
||||||
hashbrown = "0.11.2"
|
hashbrown = "0.11.2"
|
||||||
mimalloc = { version = "0.1", default-features = false }
|
mimalloc = { version = "0.1", default-features = false }
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,8 @@
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::{atomic::AtomicUsize, Arc};
|
use std::sync::{atomic::AtomicUsize, Arc};
|
||||||
|
|
||||||
|
use aquatic_cli_helpers::LogLevel;
|
||||||
|
use aquatic_common::cpu_pinning::CpuPinningConfig;
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
@ -15,6 +17,7 @@ pub struct ThreadId(pub u8);
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
/// Server address
|
/// Server address
|
||||||
pub server_address: SocketAddr,
|
pub server_address: SocketAddr,
|
||||||
|
pub log_level: LogLevel,
|
||||||
/// Number of sockets and socket worker threads
|
/// Number of sockets and socket worker threads
|
||||||
///
|
///
|
||||||
/// Sockets will bind to one port each, and with
|
/// Sockets will bind to one port each, and with
|
||||||
|
|
@ -27,7 +30,7 @@ pub struct Config {
|
||||||
pub duration: usize,
|
pub duration: usize,
|
||||||
pub network: NetworkConfig,
|
pub network: NetworkConfig,
|
||||||
pub handler: HandlerConfig,
|
pub handler: HandlerConfig,
|
||||||
pub core_affinity: CoreAffinityConfig,
|
pub cpu_pinning: CpuPinningConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
|
@ -97,23 +100,17 @@ pub struct HandlerConfig {
|
||||||
pub additional_request_factor: f64,
|
pub additional_request_factor: f64,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
|
||||||
#[serde(default)]
|
|
||||||
pub struct CoreAffinityConfig {
|
|
||||||
/// Set core affinities, descending from last core
|
|
||||||
pub set_affinities: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for Config {
|
impl Default for Config {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
server_address: "127.0.0.1:3000".parse().unwrap(),
|
server_address: "127.0.0.1:3000".parse().unwrap(),
|
||||||
|
log_level: LogLevel::Error,
|
||||||
num_socket_workers: 1,
|
num_socket_workers: 1,
|
||||||
num_request_workers: 1,
|
num_request_workers: 1,
|
||||||
duration: 0,
|
duration: 0,
|
||||||
network: NetworkConfig::default(),
|
network: NetworkConfig::default(),
|
||||||
handler: HandlerConfig::default(),
|
handler: HandlerConfig::default(),
|
||||||
core_affinity: CoreAffinityConfig::default(),
|
cpu_pinning: CpuPinningConfig::default_for_load_test(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -192,11 +189,3 @@ pub struct SocketWorkerLocalStatistics {
|
||||||
pub responses_scrape: usize,
|
pub responses_scrape: usize,
|
||||||
pub responses_error: usize,
|
pub responses_error: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for CoreAffinityConfig {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
set_affinities: false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ use std::sync::{atomic::Ordering, Arc};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
|
||||||
use crossbeam_channel::unbounded;
|
use crossbeam_channel::unbounded;
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
|
|
@ -30,18 +31,13 @@ pub fn main() {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
impl aquatic_cli_helpers::Config for Config {}
|
impl aquatic_cli_helpers::Config for Config {
|
||||||
|
fn get_log_level(&self) -> Option<aquatic_cli_helpers::LogLevel> {
|
||||||
|
Some(self.log_level)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn run(config: Config) -> ::anyhow::Result<()> {
|
fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
let affinity_max = core_affinity::get_core_ids()
|
|
||||||
.map(|ids| ids.iter().map(|id| id.id).max())
|
|
||||||
.flatten()
|
|
||||||
.unwrap_or(0);
|
|
||||||
|
|
||||||
if config.core_affinity.set_affinities {
|
|
||||||
core_affinity::set_for_current(core_affinity::CoreId { id: affinity_max });
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.handler.weight_announce + config.handler.weight_connect + config.handler.weight_scrape
|
if config.handler.weight_announce + config.handler.weight_connect + config.handler.weight_scrape
|
||||||
== 0
|
== 0
|
||||||
{
|
{
|
||||||
|
|
@ -50,6 +46,12 @@ fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
|
|
||||||
println!("Starting client with config: {:#?}", config);
|
println!("Starting client with config: {:#?}", config);
|
||||||
|
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.num_socket_workers as usize,
|
||||||
|
WorkerIndex::Other,
|
||||||
|
);
|
||||||
|
|
||||||
let mut info_hashes = Vec::with_capacity(config.handler.number_of_torrents);
|
let mut info_hashes = Vec::with_capacity(config.handler.number_of_torrents);
|
||||||
|
|
||||||
for _ in 0..config.handler.number_of_torrents {
|
for _ in 0..config.handler.number_of_torrents {
|
||||||
|
|
@ -101,11 +103,11 @@ fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
let state = state.clone();
|
let state = state.clone();
|
||||||
|
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
if config.core_affinity.set_affinities {
|
pin_current_if_configured_to(
|
||||||
core_affinity::set_for_current(core_affinity::CoreId {
|
&config.cpu_pinning,
|
||||||
id: affinity_max - 1 - i as usize,
|
config.num_socket_workers as usize,
|
||||||
});
|
WorkerIndex::SocketWorker(i as usize),
|
||||||
}
|
);
|
||||||
|
|
||||||
run_socket_thread(state, response_sender, receiver, &config, addr, thread_id)
|
run_socket_thread(state, response_sender, receiver, &config, addr, thread_id)
|
||||||
});
|
});
|
||||||
|
|
@ -118,11 +120,11 @@ fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
let response_receiver = response_receiver.clone();
|
let response_receiver = response_receiver.clone();
|
||||||
|
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
if config.core_affinity.set_affinities {
|
pin_current_if_configured_to(
|
||||||
core_affinity::set_for_current(core_affinity::CoreId {
|
&config.cpu_pinning,
|
||||||
id: affinity_max - config.num_socket_workers as usize - 1 - i as usize,
|
config.num_socket_workers as usize,
|
||||||
});
|
WorkerIndex::RequestWorker(i as usize),
|
||||||
}
|
);
|
||||||
run_handler_thread(&config, state, pareto, request_senders, response_receiver)
|
run_handler_thread(&config, state, pareto, request_senders, response_receiver)
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,6 @@ aquatic_cli_helpers = "0.1.0"
|
||||||
aquatic_common = "0.1.0"
|
aquatic_common = "0.1.0"
|
||||||
aquatic_ws_protocol = "0.1.0"
|
aquatic_ws_protocol = "0.1.0"
|
||||||
cfg-if = "1"
|
cfg-if = "1"
|
||||||
core_affinity = "0.5"
|
|
||||||
either = "1"
|
either = "1"
|
||||||
hashbrown = { version = "0.11.2", features = ["serde"] }
|
hashbrown = { version = "0.11.2", features = ["serde"] }
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,10 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
use aquatic_common::privileges::drop_privileges_after_socket_binding;
|
use aquatic_common::{
|
||||||
|
cpu_pinning::{pin_current_if_configured_to, WorkerIndex},
|
||||||
|
privileges::drop_privileges_after_socket_binding,
|
||||||
|
};
|
||||||
|
|
||||||
use self::common::*;
|
use self::common::*;
|
||||||
|
|
||||||
|
|
@ -18,12 +21,6 @@ use glommio::{channels::channel_mesh::MeshBuilder, prelude::*};
|
||||||
const SHARED_CHANNEL_SIZE: usize = 1024;
|
const SHARED_CHANNEL_SIZE: usize = 1024;
|
||||||
|
|
||||||
pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
|
pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
|
||||||
if config.cpu_pinning.active {
|
|
||||||
core_affinity::set_for_current(core_affinity::CoreId {
|
|
||||||
id: config.cpu_pinning.offset,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
let num_peers = config.socket_workers + config.request_workers;
|
let num_peers = config.socket_workers + config.request_workers;
|
||||||
|
|
||||||
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
|
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
|
||||||
|
|
@ -43,13 +40,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
|
||||||
let response_mesh_builder = response_mesh_builder.clone();
|
let response_mesh_builder = response_mesh_builder.clone();
|
||||||
let num_bound_sockets = num_bound_sockets.clone();
|
let num_bound_sockets = num_bound_sockets.clone();
|
||||||
|
|
||||||
let mut builder = LocalExecutorBuilder::default();
|
let executor = LocalExecutorBuilder::default().spawn(move || async move {
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.socket_workers,
|
||||||
|
WorkerIndex::SocketWorker(i),
|
||||||
|
);
|
||||||
|
|
||||||
if config.cpu_pinning.active {
|
|
||||||
builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + i);
|
|
||||||
}
|
|
||||||
|
|
||||||
let executor = builder.spawn(|| async move {
|
|
||||||
network::run_socket_worker(
|
network::run_socket_worker(
|
||||||
config,
|
config,
|
||||||
state,
|
state,
|
||||||
|
|
@ -70,13 +67,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
|
||||||
let request_mesh_builder = request_mesh_builder.clone();
|
let request_mesh_builder = request_mesh_builder.clone();
|
||||||
let response_mesh_builder = response_mesh_builder.clone();
|
let response_mesh_builder = response_mesh_builder.clone();
|
||||||
|
|
||||||
let mut builder = LocalExecutorBuilder::default();
|
let executor = LocalExecutorBuilder::default().spawn(move || async move {
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.socket_workers,
|
||||||
|
WorkerIndex::RequestWorker(i),
|
||||||
|
);
|
||||||
|
|
||||||
if config.cpu_pinning.active {
|
|
||||||
builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i);
|
|
||||||
}
|
|
||||||
|
|
||||||
let executor = builder.spawn(|| async move {
|
|
||||||
handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder)
|
handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder)
|
||||||
.await
|
.await
|
||||||
});
|
});
|
||||||
|
|
@ -91,6 +88,12 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.socket_workers,
|
||||||
|
WorkerIndex::Other,
|
||||||
|
);
|
||||||
|
|
||||||
for executor in executors {
|
for executor in executors {
|
||||||
executor
|
executor
|
||||||
.expect("failed to spawn local executor")
|
.expect("failed to spawn local executor")
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,7 @@
|
||||||
use aquatic_common::access_list::update_access_list;
|
use aquatic_common::{
|
||||||
|
access_list::update_access_list,
|
||||||
|
cpu_pinning::{pin_current_if_configured_to, WorkerIndex},
|
||||||
|
};
|
||||||
use cfg_if::cfg_if;
|
use cfg_if::cfg_if;
|
||||||
use signal_hook::{consts::SIGUSR1, iterator::Signals};
|
use signal_hook::{consts::SIGUSR1, iterator::Signals};
|
||||||
|
|
||||||
|
|
@ -14,12 +17,6 @@ pub mod mio;
|
||||||
pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker";
|
pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker";
|
||||||
|
|
||||||
pub fn run(config: Config) -> ::anyhow::Result<()> {
|
pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
if config.cpu_pinning.active {
|
|
||||||
core_affinity::set_for_current(core_affinity::CoreId {
|
|
||||||
id: config.cpu_pinning.offset,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg_if!(
|
cfg_if!(
|
||||||
if #[cfg(feature = "with-glommio")] {
|
if #[cfg(feature = "with-glommio")] {
|
||||||
let state = glommio::common::State::default();
|
let state = glommio::common::State::default();
|
||||||
|
|
@ -45,6 +42,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.socket_workers,
|
||||||
|
WorkerIndex::Other,
|
||||||
|
);
|
||||||
|
|
||||||
for signal in &mut signals {
|
for signal in &mut signals {
|
||||||
match signal {
|
match signal {
|
||||||
SIGUSR1 => {
|
SIGUSR1 => {
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ use std::thread::Builder;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
|
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
|
||||||
use histogram::Histogram;
|
use histogram::Histogram;
|
||||||
use mio::{Poll, Waker};
|
use mio::{Poll, Waker};
|
||||||
use native_tls::{Identity, TlsAcceptor};
|
use native_tls::{Identity, TlsAcceptor};
|
||||||
|
|
@ -25,6 +26,12 @@ pub fn run(config: Config, state: State) -> anyhow::Result<()> {
|
||||||
|
|
||||||
// TODO: privdrop here instead
|
// TODO: privdrop here instead
|
||||||
|
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.socket_workers,
|
||||||
|
WorkerIndex::Other,
|
||||||
|
);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
::std::thread::sleep(Duration::from_secs(
|
::std::thread::sleep(Duration::from_secs(
|
||||||
config.cleaning.torrent_cleaning_interval,
|
config.cleaning.torrent_cleaning_interval,
|
||||||
|
|
@ -69,6 +76,12 @@ pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> {
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.name(format!("socket-{:02}", i + 1))
|
.name(format!("socket-{:02}", i + 1))
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.socket_workers,
|
||||||
|
WorkerIndex::SocketWorker(i),
|
||||||
|
);
|
||||||
|
|
||||||
network::run_socket_worker(
|
network::run_socket_worker(
|
||||||
config,
|
config,
|
||||||
state,
|
state,
|
||||||
|
|
@ -121,6 +134,12 @@ pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> {
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.name(format!("request-{:02}", i + 1))
|
.name(format!("request-{:02}", i + 1))
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.socket_workers,
|
||||||
|
WorkerIndex::RequestWorker(i),
|
||||||
|
);
|
||||||
|
|
||||||
handlers::run_request_worker(
|
handlers::run_request_worker(
|
||||||
config,
|
config,
|
||||||
state,
|
state,
|
||||||
|
|
@ -137,10 +156,18 @@ pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> {
|
||||||
|
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.name("statistics".to_string())
|
.name("statistics".to_string())
|
||||||
.spawn(move || loop {
|
.spawn(move || {
|
||||||
::std::thread::sleep(Duration::from_secs(config.statistics.interval));
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.socket_workers,
|
||||||
|
WorkerIndex::Other,
|
||||||
|
);
|
||||||
|
|
||||||
print_statistics(&state);
|
loop {
|
||||||
|
::std::thread::sleep(Duration::from_secs(config.statistics.interval));
|
||||||
|
|
||||||
|
print_statistics(&state);
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.expect("spawn statistics thread");
|
.expect("spawn statistics thread");
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@ name = "aquatic_ws_load_test"
|
||||||
anyhow = "1"
|
anyhow = "1"
|
||||||
async-tungstenite = "0.15"
|
async-tungstenite = "0.15"
|
||||||
aquatic_cli_helpers = "0.1.0"
|
aquatic_cli_helpers = "0.1.0"
|
||||||
|
aquatic_common = "0.1.0"
|
||||||
aquatic_ws_protocol = "0.1.0"
|
aquatic_ws_protocol = "0.1.0"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
futures-rustls = "0.22"
|
futures-rustls = "0.22"
|
||||||
|
|
|
||||||
|
|
@ -1,18 +1,26 @@
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
|
use aquatic_cli_helpers::LogLevel;
|
||||||
|
use aquatic_common::cpu_pinning::CpuPinningConfig;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
pub server_address: SocketAddr,
|
pub server_address: SocketAddr,
|
||||||
pub num_workers: u8,
|
pub log_level: LogLevel,
|
||||||
|
pub num_workers: usize,
|
||||||
pub num_connections: usize,
|
pub num_connections: usize,
|
||||||
pub duration: usize,
|
pub duration: usize,
|
||||||
pub torrents: TorrentConfig,
|
pub torrents: TorrentConfig,
|
||||||
|
pub cpu_pinning: CpuPinningConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl aquatic_cli_helpers::Config for Config {}
|
impl aquatic_cli_helpers::Config for Config {
|
||||||
|
fn get_log_level(&self) -> Option<LogLevel> {
|
||||||
|
Some(self.log_level)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
|
|
@ -37,10 +45,12 @@ impl Default for Config {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
server_address: "127.0.0.1:3000".parse().unwrap(),
|
server_address: "127.0.0.1:3000".parse().unwrap(),
|
||||||
|
log_level: LogLevel::Error,
|
||||||
num_workers: 1,
|
num_workers: 1,
|
||||||
num_connections: 16,
|
num_connections: 16,
|
||||||
duration: 0,
|
duration: 0,
|
||||||
torrents: TorrentConfig::default(),
|
torrents: TorrentConfig::default(),
|
||||||
|
cpu_pinning: CpuPinningConfig::default_for_load_test(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ use std::sync::{atomic::Ordering, Arc};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
|
||||||
use glommio::LocalExecutorBuilder;
|
use glommio::LocalExecutorBuilder;
|
||||||
use rand::prelude::*;
|
use rand::prelude::*;
|
||||||
use rand_distr::Pareto;
|
use rand_distr::Pareto;
|
||||||
|
|
@ -51,18 +52,30 @@ fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
|
|
||||||
let tls_config = create_tls_config().unwrap();
|
let tls_config = create_tls_config().unwrap();
|
||||||
|
|
||||||
for _ in 0..config.num_workers {
|
for i in 0..config.num_workers {
|
||||||
let config = config.clone();
|
let config = config.clone();
|
||||||
let tls_config = tls_config.clone();
|
let tls_config = tls_config.clone();
|
||||||
let state = state.clone();
|
let state = state.clone();
|
||||||
|
|
||||||
LocalExecutorBuilder::default()
|
LocalExecutorBuilder::default()
|
||||||
.spawn(|| async move {
|
.spawn(move || async move {
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.num_workers,
|
||||||
|
WorkerIndex::SocketWorker(i),
|
||||||
|
);
|
||||||
|
|
||||||
run_socket_thread(config, tls_config, state).await.unwrap();
|
run_socket_thread(config, tls_config, state).await.unwrap();
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pin_current_if_configured_to(
|
||||||
|
&config.cpu_pinning,
|
||||||
|
config.num_workers as usize,
|
||||||
|
WorkerIndex::Other,
|
||||||
|
);
|
||||||
|
|
||||||
monitor_statistics(state, &config);
|
monitor_statistics(state, &config);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,13 @@
|
||||||
#!/bin/sh
|
#!/bin/bash
|
||||||
|
|
||||||
. ./scripts/env-native-cpu-without-avx-512
|
. ./scripts/env-native-cpu-without-avx-512
|
||||||
|
|
||||||
cargo run --release --bin aquatic_udp -- $@
|
if [ "$1" != "mio" ] && [ "$1" != "glommio" ]; then
|
||||||
|
echo "Usage: $0 [mio|glommio] [ARGS]"
|
||||||
|
else
|
||||||
|
if [ "$1" = "mio" ]; then
|
||||||
|
cargo run --release --bin aquatic_udp -- "${@:2}"
|
||||||
|
else
|
||||||
|
cargo run --release --features "with-glommio" --no-default-features --bin aquatic_udp -- "${@:2}"
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,14 @@
|
||||||
#!/bin/sh
|
#!/bin/bash
|
||||||
|
|
||||||
. ./scripts/env-native-cpu-without-avx-512
|
. ./scripts/env-native-cpu-without-avx-512
|
||||||
|
|
||||||
cargo run --release --bin aquatic_ws -- $@
|
if [ "$1" != "mio" ] && [ "$1" != "glommio" ]; then
|
||||||
|
echo "Usage: $0 [mio|glommio] [ARGS]"
|
||||||
|
else
|
||||||
|
if [ "$1" = "mio" ]; then
|
||||||
|
cargo run --release --bin aquatic_ws -- "${@:2}"
|
||||||
|
else
|
||||||
|
cargo run --release --features "with-glommio" --no-default-features --bin aquatic_ws -- "${@:2}"
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue