diff --git a/CHANGELOG.md b/CHANGELOG.md index 560f800..2ed4377 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,10 +57,12 @@ a lot of memory if many torrents are tracked * Improve announce performance by avoiding having to filter response peers * In announce response statistics, don't include announcing peer +* Remove CPU pinning support #### Fixed * Fix bug where clean up after closing connections wasn't always done +* Quit whole application if any worker thread quits ### aquatic_ws @@ -78,6 +80,7 @@ * Only consider announce and scrape responses as signs of connection still being alive. Previously, all messages sent to peer were considered. * Decrease default max_peer_age and max_connection_idle config values +* Remove CPU pinning support #### Fixed @@ -87,6 +90,7 @@ * Actually close connections that are too slow to send responses to * If peers announce with AnnounceEvent::Stopped, allow them to later announce on same torrent with different peer_id +* Quit whole application if any worker thread quits ## 0.8.0 - 2023-03-17 diff --git a/Cargo.lock b/Cargo.lock index 59407d7..41caaab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -78,9 +78,9 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" +checksum = "2faccea4cc4ab4a667ce676a30e8ec13922a692c99bb8f5b11f1502c72e04220" [[package]] name = "anstyle-parse" @@ -136,16 +136,16 @@ dependencies = [ "aquatic_udp_load_test", "clap 4.4.18", "humanize-bytes", - "indexmap 2.1.0", + "indexmap 2.2.2", "indoc", - "itertools 0.12.0", + "itertools 0.12.1", "nonblock", "num-format", "once_cell", "regex", "serde", "tempfile", - "toml 0.8.8", + "toml 0.8.9", ] [[package]] @@ -158,19 +158,22 @@ dependencies = [ "arc-swap", "duplicate", "git-testament", - "glommio", "hashbrown 0.14.3", "hex", "hwloc", - "indexmap 2.1.0", + "indexmap 2.2.2", "libc", "log", + "metrics", + "metrics-exporter-prometheus", + "metrics-util", "privdrop", "rand", "rustls", "rustls-pemfile", "serde", - "simple_logger", + "simplelog", + "tokio", "toml 0.5.11", ] @@ -196,7 +199,7 @@ dependencies = [ "log", "memchr", "metrics", - "metrics-exporter-prometheus", + "metrics-util", "mimalloc", "once_cell", "privdrop", @@ -309,8 +312,6 @@ dependencies = [ "libc", "log", "metrics", - "metrics-exporter-prometheus", - "metrics-util", "mimalloc", "mio", "num-format", @@ -324,7 +325,6 @@ dependencies = [ "tempfile", "time", "tinytemplate", - "tokio", ] [[package]] @@ -377,10 +377,9 @@ dependencies = [ "glommio", "hashbrown 0.14.3", "httparse", - "indexmap 2.1.0", + "indexmap 2.2.2", "log", "metrics", - "metrics-exporter-prometheus", "metrics-util", "mimalloc", "privdrop", @@ -537,9 +536,9 @@ checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" [[package]] name = "bitmaps" -version = "3.2.0" +version = "3.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "703642b98a00b3b90513279a8ede3fcfa479c126c5fb46e78f3051522f021403" +checksum = "a1d084b0137aaa901caf9f1e8b21daa6aa24d41cd806e111335541eff9683bd6" [[package]] name = "blake3" @@ -717,16 +716,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" -[[package]] -name = "colored" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbf2150cce219b664a8a70df7a1f933836724b503f8a413af9365b4dcc4d90b8" -dependencies = [ - "lazy_static", - "windows-sys 0.48.0", -] - [[package]] name = "compact_str" version = "0.7.1" @@ -1484,9 +1473,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.1.0" +version = "2.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" +checksum = "824b2ae422412366ba479e8111fd301f7b5faece8149317bb81925979a53f520" dependencies = [ "equivalent", "hashbrown 0.14.3", @@ -1518,9 +1507,9 @@ dependencies = [ [[package]] name = "io-uring" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "460648e47a07a43110fbfa2e0b14afb2be920093c31e5dccc50e49568e099762" +checksum = "a9febecd4aebbe9c7c23c8e536e966805fdf09944c8a915e7991ee51acb67087" dependencies = [ "bitflags 1.3.2", "libc", @@ -1554,9 +1543,9 @@ dependencies = [ [[package]] name = "itertools" -version = "0.12.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" dependencies = [ "either", ] @@ -1658,9 +1647,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.152" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "libm" @@ -1805,9 +1794,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] name = "miniz_oxide" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7" dependencies = [ "adler", ] @@ -1898,6 +1887,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-format" version = "0.4.4" @@ -2363,9 +2358,9 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] name = "rustix" -version = "0.38.30" +version = "0.38.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" +checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" dependencies = [ "bitflags 2.4.2", "errno 0.3.8", @@ -2400,15 +2395,15 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e9d979b3ce68192e42760c7810125eb6cf2ea10efae545a156063e61f314e2a" +checksum = "0a716eb65e3158e90e17cd93d855216e27bde02745ab842f2cab4a39dba1bacf" [[package]] name = "rustls-webpki" -version = "0.102.1" +version = "0.102.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef4ca26037c909dedb327b48c3327d0ba91d3dd3c4e05dad328f210ffb68e95b" +checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610" dependencies = [ "ring", "rustls-pki-types", @@ -2489,9 +2484,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.112" +version = "1.0.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d1bd37ce2324cf3bf85e5a25f96eb4baf0d5aa6eba43e7ae8958870c4ec48ed" +checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79" dependencies = [ "itoa", "ryu", @@ -2560,15 +2555,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" [[package]] -name = "simple_logger" -version = "4.3.3" +name = "simplelog" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e7e46c8c90251d47d08b28b8a419ffb4aede0f87c2eea95e17d1d5bacbf3ef1" +checksum = "acee08041c5de3d5048c8b3f6f13fafb3026b24ba43c6a695a0c76179b844369" dependencies = [ - "colored", "log", + "termcolor", "time", - "windows-sys 0.48.0", ] [[package]] @@ -2722,6 +2716,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "termcolor" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" +dependencies = [ + "winapi-util", +] + [[package]] name = "textwrap" version = "0.16.0" @@ -2750,13 +2753,14 @@ dependencies = [ [[package]] name = "time" -version = "0.3.31" +version = "0.3.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f657ba42c3f86e7680e53c8cd3af8abbe56b5491790b46e22e19c0d57463583e" +checksum = "c8248b6521bb14bc45b4067159b9b6ad792e2d6d754d6c41fb50e29fefe38749" dependencies = [ "deranged", "itoa", "libc", + "num-conv", "num_threads", "powerfmt", "serde", @@ -2772,10 +2776,11 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26197e33420244aeb70c3e8c78376ca46571bc4e701e4791c2cd9f57dcb3a43f" +checksum = "7ba3a3ef41e6672a2f0f001392bb5dcd3ff0a9992d618ca761a11c3121547774" dependencies = [ + "num-conv", "time-core", ] @@ -2806,9 +2811,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.35.1" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ "backtrace", "libc", @@ -2829,9 +2834,9 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.8" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1a195ec8c9da26928f773888e0742ca3ca1040c6cd859c919c9f59c1954ab35" +checksum = "c6a4b9e8023eb94392d3dca65d717c53abc5dad49c07cb65bb8fcd87115fa325" dependencies = [ "serde", "serde_spanned", @@ -2850,11 +2855,11 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.21.0" +version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d34d383cd00a163b4a5b85053df514d45bc330f6de7737edfe0a93311d1eaa03" +checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" dependencies = [ - "indexmap 2.1.0", + "indexmap 2.2.2", "serde", "serde_spanned", "toml_datetime", @@ -3275,9 +3280,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "winnow" -version = "0.5.35" +version = "0.5.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1931d78a9c73861da0134f453bb1f790ce49b2e30eba8410b4b79bac72b46a2d" +checksum = "a7cad8365489051ae9f054164e459304af2e7e9bb407c958076c8bf4aef52da5" dependencies = [ "memchr", ] diff --git a/TODO.md b/TODO.md index 829af77..351a248 100644 --- a/TODO.md +++ b/TODO.md @@ -2,20 +2,11 @@ ## High priority -* http and ws - * add task to generate prometheus exports on regular interval to clean up - data. this is important if peer_clients is activated - * aquatic_bench * Opentracker "slow to get up to speed", is it due to getting faster once inserts are rarely needed since most ip-port combinations have been sent? In that case, a shorter duration (e.g., 30 seconds) would be a good idea. -* general - * Replace panic sentinel with checking threads like in udp implementation. - It seems to be broken - - ## Medium priority * stagger cleaning tasks? @@ -52,11 +43,6 @@ * move additional request sending to for each received response, maybe with probability 0.2 -* aquatic_ws - * large amount of temporary allocations in serialize_20_bytes, pretty many in deserialize_20_bytes - * 20 byte parsing: consider using something like ArrayString<80> to avoid - heap allocations - # Not important * aquatic_http: @@ -82,5 +68,3 @@ ## aquatic_udp_protocol * Use `bytes` crate: seems to worsen performance somewhat -* Zerocopy (https://docs.rs/zerocopy/0.3.0/zerocopy/index.html) for requests - and responses. Doesn't improve performance diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 2dafac8..2bd41a0 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -14,6 +14,9 @@ name = "aquatic_common" [features] rustls = ["dep:rustls", "rustls-pemfile"] +prometheus = ["dep:metrics", "dep:metrics-util", "dep:metrics-exporter-prometheus", "dep:tokio"] +# Experimental CPU pinning support. Requires hwloc (apt-get install libhwloc-dev) +cpu-pinning = ["dep:hwloc"] [dependencies] aquatic_toml_config.workspace = true @@ -31,11 +34,18 @@ log = "0.4" privdrop = "0.5" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } -simple_logger = { version = "4", features = ["stderr"] } +simplelog = { version = "0.12" } toml = "0.5" -# Optional -glommio = { version = "0.8", optional = true } -hwloc = { version = "0.5", optional = true } +# rustls feature rustls = { version = "0.22", optional = true } rustls-pemfile = { version = "2", optional = true } + +# prometheus feature +metrics = { version = "0.22", optional = true } +metrics-util = { version = "0.16", optional = true } +metrics-exporter-prometheus = { version = "0.13", optional = true, default-features = false, features = ["http-listener"] } +tokio = { version = "1", optional = true, features = ["rt", "net", "time"] } + +# cpu pinning feature +hwloc = { version = "0.5", optional = true } \ No newline at end of file diff --git a/crates/common/src/cli.rs b/crates/common/src/cli.rs index d935285..16521d3 100644 --- a/crates/common/src/cli.rs +++ b/crates/common/src/cli.rs @@ -6,7 +6,7 @@ use aquatic_toml_config::TomlConfig; use git_testament::{git_testament, CommitKind}; use log::LevelFilter; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use simple_logger::SimpleLogger; +use simplelog::{ColorChoice, TermLogger, TerminalMode, ThreadLogMode}; /// Log level. Available values are off, error, warn, info, debug and trace. #[derive(Debug, Clone, Copy, PartialEq, TomlConfig, Serialize, Deserialize)] @@ -203,6 +203,19 @@ where } fn start_logger(log_level: LogLevel) -> ::anyhow::Result<()> { + let mut builder = simplelog::ConfigBuilder::new(); + + builder + .set_thread_mode(ThreadLogMode::Both) + .set_thread_level(LevelFilter::Error) + .set_target_level(LevelFilter::Error) + .set_location_level(LevelFilter::Off); + + let config = match builder.set_time_offset_to_local() { + Ok(builder) => builder.build(), + Err(builder) => builder.build(), + }; + let level_filter = match log_level { LogLevel::Off => LevelFilter::Off, LogLevel::Error => LevelFilter::Error, @@ -212,11 +225,13 @@ fn start_logger(log_level: LogLevel) -> ::anyhow::Result<()> { LogLevel::Trace => LevelFilter::Trace, }; - SimpleLogger::new() - .with_level(level_filter) - .with_utc_timestamps() - .init() - .context("Couldn't initialize logger")?; + TermLogger::init( + level_filter, + config, + TerminalMode::Stderr, + ColorChoice::Auto, + ) + .context("Couldn't initialize logger")?; Ok(()) } diff --git a/crates/common/src/cpu_pinning.rs b/crates/common/src/cpu_pinning.rs index 529f15c..30c9918 100644 --- a/crates/common/src/cpu_pinning.rs +++ b/crates/common/src/cpu_pinning.rs @@ -16,27 +16,9 @@ impl Default for CpuPinningDirection { } } -#[cfg(feature = "glommio")] -#[derive(Clone, Copy, Debug, PartialEq, TomlConfig, Serialize, Deserialize)] -#[serde(rename_all = "kebab-case")] -pub enum HyperThreadMapping { - System, - Subsequent, - Split, -} - -#[cfg(feature = "glommio")] -impl Default for HyperThreadMapping { - fn default() -> Self { - Self::System - } -} - pub trait CpuPinningConfig { fn active(&self) -> bool; fn direction(&self) -> CpuPinningDirection; - #[cfg(feature = "glommio")] - fn hyperthread(&self) -> HyperThreadMapping; fn core_offset(&self) -> usize; } @@ -54,8 +36,6 @@ pub mod mod_name { pub struct struct_name { pub active: bool, pub direction: CpuPinningDirection, - #[cfg(feature = "glommio")] - pub hyperthread: HyperThreadMapping, pub core_offset: usize, } @@ -64,8 +44,6 @@ pub mod mod_name { Self { active: false, direction: cpu_pinning_direction, - #[cfg(feature = "glommio")] - hyperthread: Default::default(), core_offset: 0, } } @@ -77,10 +55,6 @@ pub mod mod_name { fn direction(&self) -> CpuPinningDirection { self.direction } - #[cfg(feature = "glommio")] - fn hyperthread(&self) -> HyperThreadMapping { - self.hyperthread - } fn core_offset(&self) -> usize { self.core_offset } @@ -119,158 +93,9 @@ impl WorkerIndex { } } -#[cfg(feature = "glommio")] -pub mod glommio { - use ::glommio::{CpuSet, Placement}; - - use super::*; - - fn get_cpu_set() -> anyhow::Result { - CpuSet::online().map_err(|err| anyhow::anyhow!("Couldn't get CPU set: {:#}", err)) - } - - fn get_num_cpu_cores() -> anyhow::Result { - get_cpu_set()? - .iter() - .map(|l| l.core) - .max() - .map(|index| index + 1) - .ok_or(anyhow::anyhow!("CpuSet is empty")) - } - - fn logical_cpus_string(cpu_set: &CpuSet) -> String { - let mut logical_cpus = cpu_set.iter().map(|l| l.cpu).collect::>(); - - logical_cpus.sort_unstable(); - - logical_cpus - .into_iter() - .map(|cpu| cpu.to_string()) - .collect::>() - .join(", ") - } - - fn get_worker_cpu_set( - config: &C, - socket_workers: usize, - swarm_workers: usize, - worker_index: WorkerIndex, - ) -> anyhow::Result { - let num_cpu_cores = get_num_cpu_cores()?; - - let core_index = - worker_index.get_core_index(config, socket_workers, swarm_workers, num_cpu_cores); - - let too_many_workers = match (&config.hyperthread(), &config.direction()) { - ( - HyperThreadMapping::Split | HyperThreadMapping::Subsequent, - CpuPinningDirection::Ascending, - ) => core_index >= num_cpu_cores / 2, - ( - HyperThreadMapping::Split | HyperThreadMapping::Subsequent, - CpuPinningDirection::Descending, - ) => core_index < num_cpu_cores / 2, - (_, _) => false, - }; - - if too_many_workers { - return Err(anyhow::anyhow!("CPU pinning: total number of workers (including the single utility worker) can not exceed number of virtual CPUs / 2 - core_offset in this hyperthread mapping mode")); - } - - let cpu_set = match config.hyperthread() { - HyperThreadMapping::System => get_cpu_set()?.filter(|l| l.core == core_index), - HyperThreadMapping::Split => match config.direction() { - CpuPinningDirection::Ascending => get_cpu_set()? - .filter(|l| l.cpu == core_index || l.cpu == core_index + num_cpu_cores / 2), - CpuPinningDirection::Descending => get_cpu_set()? - .filter(|l| l.cpu == core_index || l.cpu == core_index - num_cpu_cores / 2), - }, - HyperThreadMapping::Subsequent => { - let cpu_index_offset = match config.direction() { - // 0 -> 0 and 1 - // 1 -> 2 and 3 - // 2 -> 4 and 5 - CpuPinningDirection::Ascending => core_index * 2, - // 15 -> 14 and 15 - // 14 -> 12 and 13 - // 13 -> 10 and 11 - CpuPinningDirection::Descending => { - num_cpu_cores - 2 * (num_cpu_cores - core_index) - } - }; - - get_cpu_set()? - .filter(|l| l.cpu == cpu_index_offset || l.cpu == cpu_index_offset + 1) - } - }; - - if cpu_set.is_empty() { - Err(anyhow::anyhow!( - "CPU pinning: produced empty CPU set for {:?}. Try decreasing number of workers", - worker_index - )) - } else { - ::log::info!( - "Logical CPUs for {:?}: {}", - worker_index, - logical_cpus_string(&cpu_set) - ); - - Ok(cpu_set) - } - } - - pub fn get_worker_placement( - config: &C, - socket_workers: usize, - swarm_workers: usize, - worker_index: WorkerIndex, - ) -> anyhow::Result { - if config.active() { - let cpu_set = get_worker_cpu_set(config, socket_workers, swarm_workers, worker_index)?; - - Ok(Placement::Fenced(cpu_set)) - } else { - Ok(Placement::Unbound) - } - } - - pub fn set_affinity_for_util_worker( - config: &C, - socket_workers: usize, - swarm_workers: usize, - ) -> anyhow::Result<()> { - let worker_cpu_set = - get_worker_cpu_set(config, socket_workers, swarm_workers, WorkerIndex::Util)?; - - unsafe { - let mut set: libc::cpu_set_t = ::std::mem::zeroed(); - - for cpu_location in worker_cpu_set { - libc::CPU_SET(cpu_location.cpu, &mut set); - } - - let status = libc::pthread_setaffinity_np( - libc::pthread_self(), - ::std::mem::size_of::(), - &set, - ); - - if status != 0 { - return Err(anyhow::Error::new(::std::io::Error::from_raw_os_error( - status, - ))); - } - } - - Ok(()) - } -} - /// Pin current thread to a suitable core /// /// Requires hwloc (`apt-get install libhwloc-dev`) -#[cfg(feature = "hwloc")] pub fn pin_current_if_configured_to( config: &C, socket_workers: usize, diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 6ef4240..ea5b928 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -1,12 +1,12 @@ +use std::fmt::Display; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; use std::time::Instant; use ahash::RandomState; pub mod access_list; pub mod cli; +#[cfg(feature = "cpu-pinning")] pub mod cpu_pinning; pub mod privileges; #[cfg(feature = "rustls")] @@ -56,42 +56,6 @@ impl ServerStartInstant { #[derive(Debug, Clone, Copy)] pub struct SecondsSinceServerStart(u32); -pub struct PanicSentinelWatcher(Arc); - -impl PanicSentinelWatcher { - pub fn create_with_sentinel() -> (Self, PanicSentinel) { - let triggered = Arc::new(AtomicBool::new(false)); - let sentinel = PanicSentinel(triggered.clone()); - - (Self(triggered), sentinel) - } - - pub fn panic_was_triggered(&self) -> bool { - self.0.load(Ordering::SeqCst) - } -} - -/// Raises SIGTERM when dropped -/// -/// Pass to threads to have panics in them cause whole program to exit. -#[derive(Clone)] -pub struct PanicSentinel(Arc); - -impl Drop for PanicSentinel { - fn drop(&mut self) { - if ::std::thread::panicking() { - let already_triggered = self.0.fetch_or(true, Ordering::SeqCst); - - if !already_triggered && unsafe { libc::raise(15) } == -1 { - panic!( - "Could not raise SIGTERM: {:#}", - ::std::io::Error::last_os_error() - ) - } - } - } -} - /// SocketAddr that is not an IPv6-mapped IPv4 address #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] pub struct CanonicalSocketAddr(SocketAddr); @@ -138,3 +102,80 @@ impl CanonicalSocketAddr { self.0.is_ipv4() } } + +#[cfg(feature = "prometheus")] +pub fn spawn_prometheus_endpoint( + addr: SocketAddr, + timeout: Option<::std::time::Duration>, + timeout_mask: Option, +) -> anyhow::Result<::std::thread::JoinHandle>> { + use std::thread::Builder; + use std::time::Duration; + + use anyhow::Context; + + let handle = Builder::new() + .name("prometheus".into()) + .spawn(move || { + use metrics_exporter_prometheus::PrometheusBuilder; + use metrics_util::MetricKindMask; + + let rt = ::tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .context("build prometheus tokio runtime")?; + + rt.block_on(async { + let mask = timeout_mask.unwrap_or(MetricKindMask::ALL); + + let (recorder, exporter) = PrometheusBuilder::new() + .idle_timeout(mask, timeout) + .with_http_listener(addr) + .build() + .context("build prometheus recorder and exporter")?; + + let recorder_handle = recorder.handle(); + + ::metrics::set_global_recorder(recorder).context("set global metrics recorder")?; + + ::tokio::spawn(async move { + let mut interval = ::tokio::time::interval(Duration::from_secs(5)); + + loop { + interval.tick().await; + + // Periodically render metrics to make sure + // idles are cleaned up + recorder_handle.render(); + } + }); + + exporter.await.context("run prometheus exporter") + }) + }) + .context("spawn prometheus endpoint")?; + + Ok(handle) +} + +pub enum WorkerType { + Swarm(usize), + Socket(usize), + Statistics, + Signals, + #[cfg(feature = "prometheus")] + Prometheus, +} + +impl Display for WorkerType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Swarm(index) => f.write_fmt(format_args!("Swarm worker {}", index + 1)), + Self::Socket(index) => f.write_fmt(format_args!("Socket worker {}", index + 1)), + Self::Statistics => f.write_str("Statistics worker"), + Self::Signals => f.write_str("Signals worker"), + #[cfg(feature = "prometheus")] + Self::Prometheus => f.write_str("Prometheus worker"), + } + } +} diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index 3967ce2..c9439ad 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -19,11 +19,11 @@ name = "aquatic_http" [features] default = ["prometheus"] -prometheus = ["metrics", "metrics-exporter-prometheus"] +prometheus = ["aquatic_common/prometheus", "metrics", "dep:metrics-util"] metrics = ["dep:metrics"] [dependencies] -aquatic_common = { workspace = true, features = ["rustls", "glommio"] } +aquatic_common = { workspace = true, features = ["rustls"] } aquatic_http_protocol.workspace = true aquatic_toml_config.workspace = true @@ -40,8 +40,6 @@ httparse = "1" itoa = "1" libc = "0.2" log = "0.4" -metrics = { version = "0.22", optional = true } -metrics-exporter-prometheus = { version = "0.13", optional = true, default-features = false, features = ["http-listener"] } mimalloc = { version = "0.1", default-features = false } memchr = "2" privdrop = "0.5" @@ -54,6 +52,10 @@ slotmap = "1" socket2 = { version = "0.5", features = ["all"] } thiserror = "1" +# metrics feature +metrics = { version = "0.22", optional = true } +metrics-util = { version = "0.16", optional = true } + [dev-dependencies] quickcheck = "1" quickcheck_macros = "1" diff --git a/crates/http/src/config.rs b/crates/http/src/config.rs index f7a4666..68c0e3d 100644 --- a/crates/http/src/config.rs +++ b/crates/http/src/config.rs @@ -1,9 +1,6 @@ use std::{net::SocketAddr, path::PathBuf}; -use aquatic_common::{ - access_list::AccessListConfig, cpu_pinning::asc::CpuPinningConfigAsc, - privileges::PrivilegeConfig, -}; +use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig}; use aquatic_toml_config::TomlConfig; use serde::{Deserialize, Serialize}; @@ -43,7 +40,6 @@ pub struct Config { /// emitting of an error-level log message, while successful updates of the /// access list result in emitting of an info-level log message. pub access_list: AccessListConfig, - pub cpu_pinning: CpuPinningConfigAsc, #[cfg(feature = "metrics")] pub metrics: MetricsConfig, } @@ -59,7 +55,6 @@ impl Default for Config { cleaning: CleaningConfig::default(), privileges: PrivilegeConfig::default(), access_list: AccessListConfig::default(), - cpu_pinning: Default::default(), #[cfg(feature = "metrics")] metrics: Default::default(), } diff --git a/crates/http/src/lib.rs b/crates/http/src/lib.rs index 3bb0484..a7300d0 100644 --- a/crates/http/src/lib.rs +++ b/crates/http/src/lib.rs @@ -1,22 +1,17 @@ use anyhow::Context; use aquatic_common::{ - access_list::update_access_list, - cpu_pinning::{ - glommio::{get_worker_placement, set_affinity_for_util_worker}, - WorkerIndex, - }, - privileges::PrivilegeDropper, - rustls_config::create_rustls_config, - PanicSentinelWatcher, ServerStartInstant, + access_list::update_access_list, privileges::PrivilegeDropper, + rustls_config::create_rustls_config, ServerStartInstant, WorkerType, }; use arc_swap::ArcSwap; use common::State; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; -use signal_hook::{ - consts::{SIGTERM, SIGUSR1}, - iterator::Signals, +use signal_hook::{consts::SIGUSR1, iterator::Signals}; +use std::{ + sync::Arc, + thread::{sleep, Builder, JoinHandle}, + time::Duration, }; -use std::sync::Arc; use crate::config::Config; @@ -30,32 +25,16 @@ pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); const SHARED_CHANNEL_SIZE: usize = 1024; pub fn run(config: Config) -> ::anyhow::Result<()> { - let mut signals = Signals::new([SIGUSR1, SIGTERM])?; - - #[cfg(feature = "prometheus")] - if config.metrics.run_prometheus_endpoint { - use metrics_exporter_prometheus::PrometheusBuilder; - - PrometheusBuilder::new() - .with_http_listener(config.metrics.prometheus_endpoint_address) - .install() - .with_context(|| { - format!( - "Install prometheus endpoint on {}", - config.metrics.prometheus_endpoint_address - ) - })?; - } + let mut signals = Signals::new([SIGUSR1])?; let state = State::default(); update_access_list(&config.access_list, &state.access_list)?; - let num_peers = config.socket_workers + config.swarm_workers; - - let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); - - let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); + let request_mesh_builder = MeshBuilder::partial( + config.socket_workers + config.swarm_workers, + SHARED_CHANNEL_SIZE, + ); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); let opt_tls_config = if config.network.enable_tls { @@ -69,111 +48,134 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let server_start_instant = ServerStartInstant::new(); - let mut executors = Vec::new(); + let mut join_handles = Vec::new(); for i in 0..(config.socket_workers) { - let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); let opt_tls_config = opt_tls_config.clone(); let request_mesh_builder = request_mesh_builder.clone(); let priv_dropper = priv_dropper.clone(); - let placement = get_worker_placement( - &config.cpu_pinning, - config.socket_workers, - config.swarm_workers, - WorkerIndex::SocketWorker(i), - )?; - let builder = LocalExecutorBuilder::new(placement).name(&format!("socket-{:02}", i + 1)); - - let executor = builder - .spawn(move || async move { - workers::socket::run_socket_worker( - sentinel, - config, - state, - opt_tls_config, - request_mesh_builder, - priv_dropper, - server_start_instant, - i, - ) - .await + let handle = Builder::new() + .name(format!("socket-{:02}", i + 1)) + .spawn(move || { + LocalExecutorBuilder::default() + .make() + .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))? + .run(workers::socket::run_socket_worker( + config, + state, + opt_tls_config, + request_mesh_builder, + priv_dropper, + server_start_instant, + i, + )) }) - .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?; + .context("spawn socket worker")?; - executors.push(executor); + join_handles.push((WorkerType::Socket(i), handle)); } for i in 0..(config.swarm_workers) { - let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); let request_mesh_builder = request_mesh_builder.clone(); - let placement = get_worker_placement( - &config.cpu_pinning, - config.socket_workers, - config.swarm_workers, - WorkerIndex::SwarmWorker(i), - )?; - let builder = LocalExecutorBuilder::new(placement).name(&format!("swarm-{:02}", i + 1)); - - let executor = builder - .spawn(move || async move { - workers::swarm::run_swarm_worker( - sentinel, - config, - state, - request_mesh_builder, - server_start_instant, - i, - ) - .await + let handle = Builder::new() + .name(format!("swarm-{:02}", i + 1)) + .spawn(move || { + LocalExecutorBuilder::default() + .make() + .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))? + .run(workers::swarm::run_swarm_worker( + config, + state, + request_mesh_builder, + server_start_instant, + i, + )) }) - .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?; + .context("spawn swarm worker")?; - executors.push(executor); + join_handles.push((WorkerType::Swarm(i), handle)); } - if config.cpu_pinning.active { - set_affinity_for_util_worker( - &config.cpu_pinning, - config.socket_workers, - config.swarm_workers, + #[cfg(feature = "prometheus")] + if config.metrics.run_prometheus_endpoint { + let idle_timeout = config + .cleaning + .connection_cleaning_interval + .max(config.cleaning.torrent_cleaning_interval) + .max(config.metrics.torrent_count_update_interval) + * 2; + + let handle = aquatic_common::spawn_prometheus_endpoint( + config.metrics.prometheus_endpoint_address, + Some(Duration::from_secs(idle_timeout)), + Some(metrics_util::MetricKindMask::GAUGE), )?; + + join_handles.push((WorkerType::Prometheus, handle)); } - for signal in &mut signals { - match signal { - SIGUSR1 => { - let _ = update_access_list(&config.access_list, &state.access_list); + // Spawn signal handler thread + { + let handle: JoinHandle> = Builder::new() + .name("signals".into()) + .spawn(move || { + for signal in &mut signals { + match signal { + SIGUSR1 => { + let _ = update_access_list(&config.access_list, &state.access_list); - if let Some(tls_config) = opt_tls_config.as_ref() { - match create_rustls_config( - &config.network.tls_certificate_path, - &config.network.tls_private_key_path, - ) { - Ok(config) => { - tls_config.store(Arc::new(config)); + if let Some(tls_config) = opt_tls_config.as_ref() { + match create_rustls_config( + &config.network.tls_certificate_path, + &config.network.tls_private_key_path, + ) { + Ok(config) => { + tls_config.store(Arc::new(config)); - ::log::info!("successfully updated tls config"); + ::log::info!("successfully updated tls config"); + } + Err(err) => { + ::log::error!("could not update tls config: {:#}", err) + } + } + } } - Err(err) => ::log::error!("could not update tls config: {:#}", err), + _ => unreachable!(), + } + } + + Ok(()) + }) + .context("spawn signal worker")?; + + join_handles.push((WorkerType::Signals, handle)); + } + + loop { + for (i, (_, handle)) in join_handles.iter().enumerate() { + if handle.is_finished() { + let (worker_type, handle) = join_handles.remove(i); + + match handle.join() { + Ok(Ok(())) => { + return Err(anyhow::anyhow!("{} stopped", worker_type)); + } + Ok(Err(err)) => { + return Err(err.context(format!("{} stopped", worker_type))); + } + Err(_) => { + return Err(anyhow::anyhow!("{} panicked", worker_type)); } } } - SIGTERM => { - if sentinel_watcher.panic_was_triggered() { - return Err(anyhow::anyhow!("worker thread panicked")); - } else { - return Ok(()); - } - } - _ => unreachable!(), } - } - Ok(()) + sleep(Duration::from_secs(5)); + } } diff --git a/crates/http/src/workers/socket/mod.rs b/crates/http/src/workers/socket/mod.rs index a289607..0f2fd44 100644 --- a/crates/http/src/workers/socket/mod.rs +++ b/crates/http/src/workers/socket/mod.rs @@ -10,7 +10,7 @@ use std::time::Duration; use anyhow::Context; use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; -use aquatic_common::{CanonicalSocketAddr, PanicSentinel, ServerStartInstant}; +use aquatic_common::{CanonicalSocketAddr, ServerStartInstant}; use arc_swap::ArcSwap; use futures_lite::future::race; use futures_lite::StreamExt; @@ -32,7 +32,6 @@ struct ConnectionHandle { #[allow(clippy::too_many_arguments)] pub async fn run_socket_worker( - _sentinel: PanicSentinel, config: Config, state: State, opt_tls_config: Option>>, @@ -40,13 +39,16 @@ pub async fn run_socket_worker( priv_dropper: PrivilegeDropper, server_start_instant: ServerStartInstant, worker_index: usize, -) { +) -> anyhow::Result<()> { let config = Rc::new(config); let access_list = state.access_list; - let listener = create_tcp_listener(&config, priv_dropper).expect("create tcp listener"); + let listener = create_tcp_listener(&config, priv_dropper).context("create tcp listener")?; - let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); + let (request_senders, _) = request_mesh_builder + .join(Role::Producer) + .await + .map_err(|err| anyhow::anyhow!("join request mesh: {:#}", err))?; let request_senders = Rc::new(request_senders); let connection_handles = Rc::new(RefCell::new(HopSlotMap::with_key())); @@ -145,6 +147,8 @@ pub async fn run_socket_worker( } } } + + Ok(()) } async fn clean_connections( diff --git a/crates/http/src/workers/swarm/mod.rs b/crates/http/src/workers/swarm/mod.rs index d50797e..f8d7f34 100644 --- a/crates/http/src/workers/swarm/mod.rs +++ b/crates/http/src/workers/swarm/mod.rs @@ -11,7 +11,7 @@ use glommio::{enclose, prelude::*}; use rand::prelude::SmallRng; use rand::SeedableRng; -use aquatic_common::{PanicSentinel, ServerStartInstant, ValidUntil}; +use aquatic_common::{ServerStartInstant, ValidUntil}; use crate::common::*; use crate::config::Config; @@ -19,14 +19,16 @@ use crate::config::Config; use self::storage::TorrentMaps; pub async fn run_swarm_worker( - _sentinel: PanicSentinel, config: Config, state: State, request_mesh_builder: MeshBuilder, server_start_instant: ServerStartInstant, worker_index: usize, -) { - let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); +) -> anyhow::Result<()> { + let (_, mut request_receivers) = request_mesh_builder + .join(Role::Consumer) + .await + .map_err(|err| anyhow::anyhow!("join request mesh: {:#}", err))?; let torrents = Rc::new(RefCell::new(TorrentMaps::new(worker_index))); let access_list = state.access_list; @@ -82,6 +84,8 @@ pub async fn run_swarm_worker( for handle in handles { handle.await; } + + Ok(()) } async fn handle_request_stream( diff --git a/crates/http_load_test/Cargo.toml b/crates/http_load_test/Cargo.toml index 52c942f..b15c260 100644 --- a/crates/http_load_test/Cargo.toml +++ b/crates/http_load_test/Cargo.toml @@ -14,7 +14,7 @@ rust-version.workspace = true name = "aquatic_http_load_test" [dependencies] -aquatic_common = { workspace = true, features = ["glommio"] } +aquatic_common.workspace = true aquatic_http_protocol.workspace = true aquatic_toml_config.workspace = true diff --git a/crates/http_load_test/src/config.rs b/crates/http_load_test/src/config.rs index b41d58a..a372f68 100644 --- a/crates/http_load_test/src/config.rs +++ b/crates/http_load_test/src/config.rs @@ -1,7 +1,6 @@ use std::net::SocketAddr; use aquatic_common::cli::LogLevel; -use aquatic_common::cpu_pinning::desc::CpuPinningConfigDesc; use aquatic_toml_config::TomlConfig; use serde::Deserialize; @@ -25,7 +24,6 @@ pub struct Config { pub keep_alive: bool, pub enable_tls: bool, pub torrents: TorrentConfig, - pub cpu_pinning: CpuPinningConfigDesc, } impl aquatic_common::cli::Config for Config { @@ -47,7 +45,6 @@ impl Default for Config { keep_alive: true, enable_tls: true, torrents: TorrentConfig::default(), - cpu_pinning: Default::default(), } } } diff --git a/crates/http_load_test/src/main.rs b/crates/http_load_test/src/main.rs index 21b881e..176fe66 100644 --- a/crates/http_load_test/src/main.rs +++ b/crates/http_load_test/src/main.rs @@ -3,8 +3,6 @@ use std::thread; use std::time::{Duration, Instant}; use ::glommio::LocalExecutorBuilder; -use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker}; -use aquatic_common::cpu_pinning::WorkerIndex; use rand::prelude::*; use rand_distr::Gamma; @@ -65,19 +63,12 @@ fn run(config: Config) -> ::anyhow::Result<()> { None }; - for i in 0..config.num_workers { + for _ in 0..config.num_workers { let config = config.clone(); let opt_tls_config = opt_tls_config.clone(); let state = state.clone(); - let placement = get_worker_placement( - &config.cpu_pinning, - config.num_workers, - 0, - WorkerIndex::SocketWorker(i), - )?; - - LocalExecutorBuilder::new(placement) + LocalExecutorBuilder::default() .name("load-test") .spawn(move || async move { run_socket_thread(config, opt_tls_config, state) @@ -87,10 +78,6 @@ fn run(config: Config) -> ::anyhow::Result<()> { .unwrap(); } - if config.cpu_pinning.active { - set_affinity_for_util_worker(&config.cpu_pinning, config.num_workers, 0)?; - } - monitor_statistics(state, &config); Ok(()) diff --git a/crates/udp/Cargo.toml b/crates/udp/Cargo.toml index 3828132..72c1f92 100644 --- a/crates/udp/Cargo.toml +++ b/crates/udp/Cargo.toml @@ -20,11 +20,11 @@ name = "aquatic_udp" [features] default = ["prometheus"] # Export prometheus metrics -prometheus = ["metrics", "metrics-util", "metrics-exporter-prometheus", "tokio"] +prometheus = ["metrics", "aquatic_common/prometheus"] # Experimental io_uring support (Linux 6.0 or later required) io-uring = ["dep:io-uring"] # Experimental CPU pinning support -cpu-pinning = ["aquatic_common/hwloc"] +cpu-pinning = ["aquatic_common/cpu-pinning"] [dependencies] aquatic_common.workspace = true @@ -58,9 +58,6 @@ tinytemplate = "1" # prometheus feature metrics = { version = "0.22", optional = true } -metrics-util = { version = "0.16", optional = true } -metrics-exporter-prometheus = { version = "0.13", optional = true, default-features = false, features = ["http-listener"] } -tokio = { version = "1", optional = true, features = ["rt", "net", "time"] } # io-uring feature io-uring = { version = "0.6", optional = true } diff --git a/crates/udp/src/lib.rs b/crates/udp/src/lib.rs index 639e2a9..cf6a588 100644 --- a/crates/udp/src/lib.rs +++ b/crates/udp/src/lib.rs @@ -3,11 +3,11 @@ pub mod config; pub mod workers; use std::collections::BTreeMap; -use std::fmt::Display; use std::thread::{sleep, Builder, JoinHandle}; use std::time::Duration; use anyhow::Context; +use aquatic_common::WorkerType; use crossbeam_channel::{bounded, unbounded}; use signal_hook::consts::SIGUSR1; use signal_hook::iterator::Signals; @@ -162,58 +162,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { #[cfg(feature = "prometheus")] if config.statistics.active() && config.statistics.run_prometheus_endpoint { - let config = config.clone(); - - let handle = Builder::new() - .name("prometheus".into()) - .spawn(move || { - #[cfg(feature = "cpu-pinning")] - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - config.swarm_workers, - WorkerIndex::Util, - ); - - use metrics_exporter_prometheus::PrometheusBuilder; - use metrics_util::MetricKindMask; - - let rt = ::tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .context("build prometheus tokio runtime")?; - - rt.block_on(async { - let (recorder, exporter) = PrometheusBuilder::new() - .idle_timeout( - MetricKindMask::ALL, - Some(Duration::from_secs(config.statistics.interval * 2)), - ) - .with_http_listener(config.statistics.prometheus_endpoint_address) - .build() - .context("build prometheus recorder and exporter")?; - - let recorder_handle = recorder.handle(); - - ::metrics::set_global_recorder(recorder) - .context("set global metrics recorder")?; - - ::tokio::spawn(async move { - let mut interval = ::tokio::time::interval(Duration::from_secs(5)); - - loop { - interval.tick().await; - - // Periodically render metrics to make sure - // idles are cleaned up - recorder_handle.render(); - } - }); - - exporter.await.context("run prometheus exporter") - }) - }) - .with_context(|| "spawn prometheus exporter worker")?; + let handle = aquatic_common::spawn_prometheus_endpoint( + config.statistics.prometheus_endpoint_address, + Some(Duration::from_secs( + config.cleaning.torrent_cleaning_interval * 2, + )), + None, + )?; join_handles.push((WorkerType::Prometheus, handle)); } @@ -279,25 +234,3 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { sleep(Duration::from_secs(5)); } } - -enum WorkerType { - Swarm(usize), - Socket(usize), - Statistics, - Signals, - #[cfg(feature = "prometheus")] - Prometheus, -} - -impl Display for WorkerType { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Swarm(index) => f.write_fmt(format_args!("Swarm worker {}", index + 1)), - Self::Socket(index) => f.write_fmt(format_args!("Socket worker {}", index + 1)), - Self::Statistics => f.write_str("Statistics worker"), - Self::Signals => f.write_str("Signals worker"), - #[cfg(feature = "prometheus")] - Self::Prometheus => f.write_str("Prometheus worker"), - } - } -} diff --git a/crates/udp_load_test/Cargo.toml b/crates/udp_load_test/Cargo.toml index e42aad3..2eedf92 100644 --- a/crates/udp_load_test/Cargo.toml +++ b/crates/udp_load_test/Cargo.toml @@ -11,7 +11,7 @@ readme.workspace = true rust-version.workspace = true [features] -cpu-pinning = ["aquatic_common/hwloc"] +cpu-pinning = ["aquatic_common/cpu-pinning"] [lib] name = "aquatic_udp_load_test" diff --git a/crates/ws/Cargo.toml b/crates/ws/Cargo.toml index 662bc61..164b631 100644 --- a/crates/ws/Cargo.toml +++ b/crates/ws/Cargo.toml @@ -19,14 +19,14 @@ name = "aquatic_ws" [features] default = ["prometheus", "mimalloc"] -prometheus = ["metrics", "metrics-exporter-prometheus"] -metrics = ["dep:metrics", "metrics-util"] +prometheus = ["metrics", "aquatic_common/prometheus"] +metrics = ["dep:metrics", "dep:metrics-util"] # Use mimalloc allocator for much better performance. Requires cmake and a # C/C++ compiler mimalloc = ["dep:mimalloc"] [dependencies] -aquatic_common = { workspace = true, features = ["rustls", "glommio"] } +aquatic_common = { workspace = true, features = ["rustls"] } aquatic_peer_id.workspace = true aquatic_toml_config.workspace = true aquatic_ws_protocol.workspace = true @@ -43,9 +43,6 @@ hashbrown = { version = "0.14", features = ["serde"] } httparse = "1" indexmap = "2" log = "0.4" -metrics = { version = "0.22", optional = true } -metrics-util = { version = "0.16", optional = true } -metrics-exporter-prometheus = { version = "0.13", optional = true, default-features = false, features = ["http-listener"] } mimalloc = { version = "0.1", default-features = false, optional = true } privdrop = "0.5" rand = { version = "0.8", features = ["small_rng"] } @@ -58,6 +55,10 @@ slotmap = "1" socket2 = { version = "0.5", features = ["all"] } tungstenite = "0.21" +# metrics feature +metrics = { version = "0.22", optional = true } +metrics-util = { version = "0.16", optional = true } + [dev-dependencies] quickcheck = "1" quickcheck_macros = "1" diff --git a/crates/ws/src/config.rs b/crates/ws/src/config.rs index e2a9376..d6f092e 100644 --- a/crates/ws/src/config.rs +++ b/crates/ws/src/config.rs @@ -1,7 +1,6 @@ use std::net::SocketAddr; use std::path::PathBuf; -use aquatic_common::cpu_pinning::asc::CpuPinningConfigAsc; use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig}; use serde::Deserialize; @@ -47,7 +46,6 @@ pub struct Config { pub access_list: AccessListConfig, #[cfg(feature = "metrics")] pub metrics: MetricsConfig, - pub cpu_pinning: CpuPinningConfigAsc, } impl Default for Config { @@ -63,7 +61,6 @@ impl Default for Config { access_list: AccessListConfig::default(), #[cfg(feature = "metrics")] metrics: Default::default(), - cpu_pinning: Default::default(), } } } diff --git a/crates/ws/src/lib.rs b/crates/ws/src/lib.rs index 806267f..5d08534 100644 --- a/crates/ws/src/lib.rs +++ b/crates/ws/src/lib.rs @@ -3,19 +3,15 @@ pub mod config; pub mod workers; use std::sync::Arc; +use std::thread::{sleep, Builder, JoinHandle}; use std::time::Duration; use anyhow::Context; -use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker}; -use aquatic_common::cpu_pinning::WorkerIndex; use aquatic_common::rustls_config::create_rustls_config; -use aquatic_common::{PanicSentinelWatcher, ServerStartInstant}; +use aquatic_common::{ServerStartInstant, WorkerType}; use arc_swap::ArcSwap; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; -use signal_hook::{ - consts::{SIGTERM, SIGUSR1}, - iterator::Signals, -}; +use signal_hook::{consts::SIGUSR1, iterator::Signals}; use aquatic_common::access_list::update_access_list; use aquatic_common::privileges::PrivilegeDropper; @@ -35,45 +31,18 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { )); } - let mut signals = Signals::new([SIGUSR1, SIGTERM])?; - - #[cfg(feature = "prometheus")] - if config.metrics.run_prometheus_endpoint { - use metrics_exporter_prometheus::PrometheusBuilder; - - let idle_timeout = config - .cleaning - .connection_cleaning_interval - .max(config.cleaning.torrent_cleaning_interval) - .max(config.metrics.torrent_count_update_interval) - * 2; - - PrometheusBuilder::new() - .idle_timeout( - metrics_util::MetricKindMask::GAUGE, - Some(Duration::from_secs(idle_timeout)), - ) - .with_http_listener(config.metrics.prometheus_endpoint_address) - .install() - .with_context(|| { - format!( - "Install prometheus endpoint on {}", - config.metrics.prometheus_endpoint_address - ) - })?; - } + let mut signals = Signals::new([SIGUSR1])?; let state = State::default(); update_access_list(&config.access_list, &state.access_list)?; - let num_peers = config.socket_workers + config.swarm_workers; + let num_mesh_peers = config.socket_workers + config.swarm_workers; - let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE); - let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE * 16); - let control_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE * 16); + let request_mesh_builder = MeshBuilder::partial(num_mesh_peers, SHARED_IN_CHANNEL_SIZE); + let response_mesh_builder = MeshBuilder::partial(num_mesh_peers, SHARED_IN_CHANNEL_SIZE * 16); + let control_mesh_builder = MeshBuilder::partial(num_mesh_peers, SHARED_IN_CHANNEL_SIZE * 16); - let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); let opt_tls_config = if config.network.enable_tls { @@ -98,10 +67,9 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let server_start_instant = ServerStartInstant::new(); - let mut executors = Vec::new(); + let mut join_handles = Vec::new(); for i in 0..(config.socket_workers) { - let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); let opt_tls_config = opt_tls_config.clone(); @@ -110,120 +78,138 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let response_mesh_builder = response_mesh_builder.clone(); let priv_dropper = priv_dropper.clone(); - let placement = get_worker_placement( - &config.cpu_pinning, - config.socket_workers, - config.swarm_workers, - WorkerIndex::SocketWorker(i), - )?; - let builder = LocalExecutorBuilder::new(placement).name(&format!("socket-{:02}", i + 1)); - - let executor = builder - .spawn(move || async move { - workers::socket::run_socket_worker( - sentinel, - config, - state, - opt_tls_config, - control_mesh_builder, - request_mesh_builder, - response_mesh_builder, - priv_dropper, - server_start_instant, - i, - ) - .await + let handle = Builder::new() + .name(format!("socket-{:02}", i + 1)) + .spawn(move || { + LocalExecutorBuilder::default() + .make() + .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))? + .run(workers::socket::run_socket_worker( + config, + state, + opt_tls_config, + control_mesh_builder, + request_mesh_builder, + response_mesh_builder, + priv_dropper, + server_start_instant, + i, + )) }) - .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?; + .context("spawn socket worker")?; - executors.push(executor); + join_handles.push((WorkerType::Socket(i), handle)); } - ::log::info!("spawned socket workers"); - for i in 0..(config.swarm_workers) { - let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); let control_mesh_builder = control_mesh_builder.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); - let placement = get_worker_placement( - &config.cpu_pinning, - config.socket_workers, - config.swarm_workers, - WorkerIndex::SwarmWorker(i), - )?; - let builder = LocalExecutorBuilder::new(placement).name(&format!("swarm-{:02}", i + 1)); - - let executor = builder - .spawn(move || async move { - workers::swarm::run_swarm_worker( - sentinel, - config, - state, - control_mesh_builder, - request_mesh_builder, - response_mesh_builder, - server_start_instant, - i, - ) - .await + let handle = Builder::new() + .name(format!("swarm-{:02}", i + 1)) + .spawn(move || { + LocalExecutorBuilder::default() + .make() + .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))? + .run(workers::swarm::run_swarm_worker( + config, + state, + control_mesh_builder, + request_mesh_builder, + response_mesh_builder, + server_start_instant, + i, + )) }) - .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?; + .context("spawn swarm worker")?; - executors.push(executor); + join_handles.push((WorkerType::Socket(i), handle)); } - ::log::info!("spawned swarm workers"); + #[cfg(feature = "prometheus")] + if config.metrics.run_prometheus_endpoint { + let idle_timeout = config + .cleaning + .connection_cleaning_interval + .max(config.cleaning.torrent_cleaning_interval) + .max(config.metrics.torrent_count_update_interval) + * 2; - if config.cpu_pinning.active { - set_affinity_for_util_worker( - &config.cpu_pinning, - config.socket_workers, - config.swarm_workers, + let handle = aquatic_common::spawn_prometheus_endpoint( + config.metrics.prometheus_endpoint_address, + Some(Duration::from_secs(idle_timeout)), + Some(metrics_util::MetricKindMask::GAUGE), )?; + + join_handles.push((WorkerType::Prometheus, handle)); } - for signal in &mut signals { - match signal { - SIGUSR1 => { - let _ = update_access_list(&config.access_list, &state.access_list); + // Spawn signal handler thread + { + let handle: JoinHandle> = Builder::new() + .name("signals".into()) + .spawn(move || { + for signal in &mut signals { + match signal { + SIGUSR1 => { + let _ = update_access_list(&config.access_list, &state.access_list); - if let Some(tls_config) = opt_tls_config.as_ref() { - match ::std::fs::read(&config.network.tls_certificate_path) { - Ok(data) if &data == opt_tls_cert_data.as_ref().unwrap() => { - ::log::info!("skipping tls config update: certificate identical to currently loaded"); - } - Ok(data) => { - match create_rustls_config( - &config.network.tls_certificate_path, - &config.network.tls_private_key_path, - ) { - Ok(config) => { - tls_config.store(Arc::new(config)); - opt_tls_cert_data = Some(data); + if let Some(tls_config) = opt_tls_config.as_ref() { + match ::std::fs::read(&config.network.tls_certificate_path) { + Ok(data) if &data == opt_tls_cert_data.as_ref().unwrap() => { + ::log::info!("skipping tls config update: certificate identical to currently loaded"); + } + Ok(data) => { + match create_rustls_config( + &config.network.tls_certificate_path, + &config.network.tls_private_key_path, + ) { + Ok(config) => { + tls_config.store(Arc::new(config)); + opt_tls_cert_data = Some(data); - ::log::info!("successfully updated tls config"); + ::log::info!("successfully updated tls config"); + } + Err(err) => ::log::error!("could not update tls config: {:#}", err), + } + } + Err(err) => ::log::error!("couldn't read tls certificate file: {:#}", err), } - Err(err) => ::log::error!("could not update tls config: {:#}", err), } } - Err(err) => ::log::error!("couldn't read tls certificate file: {:#}", err), + _ => unreachable!(), + } + } + + Ok(()) + }) + .context("spawn signal worker")?; + + join_handles.push((WorkerType::Signals, handle)); + } + + loop { + for (i, (_, handle)) in join_handles.iter().enumerate() { + if handle.is_finished() { + let (worker_type, handle) = join_handles.remove(i); + + match handle.join() { + Ok(Ok(())) => { + return Err(anyhow::anyhow!("{} stopped", worker_type)); + } + Ok(Err(err)) => { + return Err(err.context(format!("{} stopped", worker_type))); + } + Err(_) => { + return Err(anyhow::anyhow!("{} panicked", worker_type)); } } } - SIGTERM => { - if sentinel_watcher.panic_was_triggered() { - return Err(anyhow::anyhow!("worker thread panicked")); - } else { - return Ok(()); - } - } - _ => unreachable!(), } - } - Ok(()) + sleep(Duration::from_secs(5)); + } } diff --git a/crates/ws/src/workers/socket/mod.rs b/crates/ws/src/workers/socket/mod.rs index 8d79679..c633d63 100644 --- a/crates/ws/src/workers/socket/mod.rs +++ b/crates/ws/src/workers/socket/mod.rs @@ -7,7 +7,7 @@ use std::time::Duration; use anyhow::Context; use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; -use aquatic_common::{PanicSentinel, ServerStartInstant}; +use aquatic_common::ServerStartInstant; use aquatic_ws_protocol::common::InfoHash; use aquatic_ws_protocol::incoming::InMessage; use aquatic_ws_protocol::outgoing::OutMessage; @@ -50,7 +50,6 @@ struct ConnectionHandle { #[allow(clippy::too_many_arguments)] pub async fn run_socket_worker( - _sentinel: PanicSentinel, config: Config, state: State, opt_tls_config: Option>>, @@ -60,26 +59,41 @@ pub async fn run_socket_worker( priv_dropper: PrivilegeDropper, server_start_instant: ServerStartInstant, worker_index: usize, -) { +) -> anyhow::Result<()> { #[cfg(feature = "metrics")] WORKER_INDEX.with(|index| index.set(worker_index)); let config = Rc::new(config); let access_list = state.access_list; - let listener = create_tcp_listener(&config, priv_dropper).expect("create tcp listener"); + let listener = create_tcp_listener(&config, priv_dropper).context("create tcp listener")?; ::log::info!("created tcp listener"); let (control_message_senders, _) = control_message_mesh_builder .join(Role::Producer) .await - .unwrap(); - let control_message_senders = Rc::new(control_message_senders); + .map_err(|err| anyhow::anyhow!("join control message mesh: {:#}", err))?; + let (in_message_senders, _) = in_message_mesh_builder + .join(Role::Producer) + .await + .map_err(|err| anyhow::anyhow!("join in message mesh: {:#}", err))?; + let (_, mut out_message_receivers) = out_message_mesh_builder + .join(Role::Consumer) + .await + .map_err(|err| anyhow::anyhow!("join out message mesh: {:#}", err))?; - let (in_message_senders, _) = in_message_mesh_builder.join(Role::Producer).await.unwrap(); + let control_message_senders = Rc::new(control_message_senders); let in_message_senders = Rc::new(in_message_senders); + let out_message_consumer_id = ConsumerId( + out_message_receivers + .consumer_id() + .unwrap() + .try_into() + .unwrap(), + ); + let tq_prioritized = executor().create_task_queue( Shares::Static(100), Latency::Matters(Duration::from_millis(1)), @@ -88,16 +102,6 @@ pub async fn run_socket_worker( let tq_regular = executor().create_task_queue(Shares::Static(1), Latency::NotImportant, "regular"); - let (_, mut out_message_receivers) = - out_message_mesh_builder.join(Role::Consumer).await.unwrap(); - let out_message_consumer_id = ConsumerId( - out_message_receivers - .consumer_id() - .unwrap() - .try_into() - .unwrap(), - ); - ::log::info!("joined channels"); let connection_handles = Rc::new(RefCell::new(ConnectionHandles::default())); @@ -114,14 +118,14 @@ pub async fn run_socket_worker( }), tq_prioritized, ) - .unwrap(); + .map_err(|err| anyhow::anyhow!("spawn connection cleaning task: {:#}", err))?; for (_, out_message_receiver) in out_message_receivers.streams() { spawn_local_into( receive_out_messages(out_message_receiver, connection_handles.clone()), tq_regular, ) - .unwrap() + .map_err(|err| anyhow::anyhow!("spawn out message receiving task: {:#}", err))? .detach(); } @@ -197,6 +201,8 @@ pub async fn run_socket_worker( } } } + + Ok(()) } async fn clean_connections( diff --git a/crates/ws/src/workers/swarm/mod.rs b/crates/ws/src/workers/swarm/mod.rs index f885b97..419ee56 100644 --- a/crates/ws/src/workers/swarm/mod.rs +++ b/crates/ws/src/workers/swarm/mod.rs @@ -13,7 +13,7 @@ use glommio::prelude::*; use glommio::timer::TimerActionRepeat; use rand::{rngs::SmallRng, SeedableRng}; -use aquatic_common::{PanicSentinel, ServerStartInstant}; +use aquatic_common::ServerStartInstant; use crate::common::*; use crate::config::Config; @@ -21,9 +21,7 @@ use crate::SHARED_IN_CHANNEL_SIZE; use self::storage::TorrentMaps; -#[allow(clippy::too_many_arguments)] pub async fn run_swarm_worker( - _sentinel: PanicSentinel, config: Config, state: State, control_message_mesh_builder: MeshBuilder, @@ -31,14 +29,19 @@ pub async fn run_swarm_worker( out_message_mesh_builder: MeshBuilder<(OutMessageMeta, OutMessage), Partial>, server_start_instant: ServerStartInstant, worker_index: usize, -) { +) -> anyhow::Result<()> { let (_, mut control_message_receivers) = control_message_mesh_builder .join(Role::Consumer) .await - .unwrap(); - - let (_, mut in_message_receivers) = in_message_mesh_builder.join(Role::Consumer).await.unwrap(); - let (out_message_senders, _) = out_message_mesh_builder.join(Role::Producer).await.unwrap(); + .map_err(|err| anyhow::anyhow!("join control message mesh: {:#}", err))?; + let (_, mut in_message_receivers) = in_message_mesh_builder + .join(Role::Consumer) + .await + .map_err(|err| anyhow::anyhow!("join in message mesh: {:#}", err))?; + let (out_message_senders, _) = out_message_mesh_builder + .join(Role::Producer) + .await + .map_err(|err| anyhow::anyhow!("join out message mesh: {:#}", err))?; let out_message_senders = Rc::new(out_message_senders); @@ -89,6 +92,8 @@ pub async fn run_swarm_worker( for handle in handles { handle.await; } + + Ok(()) } async fn handle_control_message_stream(torrents: Rc>, mut stream: S) diff --git a/crates/ws_load_test/Cargo.toml b/crates/ws_load_test/Cargo.toml index 90dbe94..932aebf 100644 --- a/crates/ws_load_test/Cargo.toml +++ b/crates/ws_load_test/Cargo.toml @@ -14,7 +14,7 @@ rust-version.workspace = true name = "aquatic_ws_load_test" [dependencies] -aquatic_common = { workspace = true, features = ["glommio"] } +aquatic_common.workspace = true aquatic_toml_config.workspace = true aquatic_ws_protocol.workspace = true diff --git a/crates/ws_load_test/src/config.rs b/crates/ws_load_test/src/config.rs index 3f3169d..cd759e9 100644 --- a/crates/ws_load_test/src/config.rs +++ b/crates/ws_load_test/src/config.rs @@ -1,7 +1,6 @@ use std::net::SocketAddr; use aquatic_common::cli::LogLevel; -use aquatic_common::cpu_pinning::desc::CpuPinningConfigDesc; use aquatic_toml_config::TomlConfig; use serde::Deserialize; @@ -17,7 +16,6 @@ pub struct Config { pub duration: usize, pub measure_after_max_connections_reached: bool, pub torrents: TorrentConfig, - pub cpu_pinning: CpuPinningConfigDesc, } impl aquatic_common::cli::Config for Config { @@ -37,7 +35,6 @@ impl Default for Config { duration: 0, measure_after_max_connections_reached: true, torrents: TorrentConfig::default(), - cpu_pinning: Default::default(), } } } diff --git a/crates/ws_load_test/src/main.rs b/crates/ws_load_test/src/main.rs index 91cd8ff..94e91e8 100644 --- a/crates/ws_load_test/src/main.rs +++ b/crates/ws_load_test/src/main.rs @@ -2,8 +2,6 @@ use std::sync::{atomic::Ordering, Arc}; use std::thread; use std::time::{Duration, Instant}; -use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker}; -use aquatic_common::cpu_pinning::WorkerIndex; use aquatic_ws_protocol::common::InfoHash; use glommio::LocalExecutorBuilder; use rand::prelude::*; @@ -59,19 +57,12 @@ fn run(config: Config) -> ::anyhow::Result<()> { let tls_config = create_tls_config().unwrap(); - for i in 0..config.num_workers { + for _ in 0..config.num_workers { let config = config.clone(); let tls_config = tls_config.clone(); let state = state.clone(); - let placement = get_worker_placement( - &config.cpu_pinning, - config.num_workers, - 0, - WorkerIndex::SocketWorker(i), - )?; - - LocalExecutorBuilder::new(placement) + LocalExecutorBuilder::default() .name("load-test") .spawn(move || async move { run_socket_thread(config, tls_config, state).await.unwrap(); @@ -79,10 +70,6 @@ fn run(config: Config) -> ::anyhow::Result<()> { .unwrap(); } - if config.cpu_pinning.active { - set_affinity_for_util_worker(&config.cpu_pinning, config.num_workers, 0)?; - } - monitor_statistics(state, &config); Ok(()) diff --git a/crates/ws_protocol/src/common.rs b/crates/ws_protocol/src/common.rs index 5c0a430..7c429cb 100644 --- a/crates/ws_protocol/src/common.rs +++ b/crates/ws_protocol/src/common.rs @@ -84,9 +84,20 @@ fn serialize_20_bytes(data: &[u8; 20], serializer: S) -> Result