From 3ac12b947f389f66bbea29a42d9e7eaf191560bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 17 Jan 2023 20:27:43 +0100 Subject: [PATCH] ws: add prometheus support (active connections, requests, responses) --- Cargo.lock | 117 ++++++++++++++++++++++++++++++- aquatic_ws/Cargo.toml | 6 ++ aquatic_ws/src/config.rs | 31 ++++++++ aquatic_ws/src/lib.rs | 15 ++++ aquatic_ws/src/workers/socket.rs | 36 +++++++++- scripts/run-aquatic-ws.sh | 2 +- 6 files changed, 201 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e4ca6a8..3b4b6b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -309,6 +309,8 @@ dependencies = [ "hashbrown 0.13.1", "httparse", "log", + "metrics", + "metrics-exporter-prometheus", "mimalloc", "privdrop", "quickcheck", @@ -1168,7 +1170,7 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "wasm-bindgen", ] @@ -1230,7 +1232,7 @@ dependencies = [ "scoped-tls", "scopeguard", "signal-hook", - "sketches-ddsketch", + "sketches-ddsketch 0.1.3", "smallvec", "socket2 0.3.19", "tracing", @@ -1507,6 +1509,12 @@ dependencies = [ "memoffset 0.5.6", ] +[[package]] +name = "ipnet" +version = "2.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30e22bd8629359895450b59ea7a776c850561b96a3b1d31321c1949d9e6c9146" + [[package]] name = "itertools" version = "0.10.5" @@ -1606,6 +1614,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "mach" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa" +dependencies = [ + "libc", +] + [[package]] name = "matchit" version = "0.5.0" @@ -1656,6 +1673,63 @@ dependencies = [ "autocfg", ] +[[package]] +name = "metrics" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b9b8653cec6897f73b519a43fba5ee3d50f62fe9af80b428accdcc093b4a849" +dependencies = [ + "ahash 0.7.6", + "metrics-macros", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-prometheus" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8603921e1f54ef386189335f288441af761e0fc61bcb552168d9cedfe63ebc70" +dependencies = [ + "hyper", + "indexmap", + "ipnet", + "metrics", + "metrics-util", + "parking_lot 0.12.1", + "portable-atomic", + "quanta", + "thiserror", + "tokio", +] + +[[package]] +name = "metrics-macros" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "731f8ecebd9f3a4aa847dfe75455e4757a45da40a7793d2f0b1f9b6ed18b23f3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "metrics-util" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d24dc2dbae22bff6f1f9326ffce828c9f07ef9cc1e8002e5279f845432a30a" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.12.3", + "metrics", + "num_cpus", + "parking_lot 0.12.1", + "portable-atomic", + "quanta", + "sketches-ddsketch 0.2.0", +] + [[package]] name = "mimalloc" version = "0.1.34" @@ -1694,7 +1768,7 @@ checksum = "e5d732bc30207a6423068df043e3d02e0735b155ad7ce1a6f76fe2baa5b158de" dependencies = [ "libc", "log", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys", ] @@ -2100,6 +2174,22 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quanta" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7e31331286705f455e56cca62e0e717158474ff02b7936c1fa596d983f4ae27" +dependencies = [ + "crossbeam-utils", + "libc", + "mach", + "once_cell", + "raw-cpuid", + "wasi 0.10.2+wasi-snapshot-preview1", + "web-sys", + "winapi 0.3.9", +] + [[package]] name = "quickcheck" version = "1.0.3" @@ -2171,6 +2261,15 @@ dependencies = [ "rand", ] +[[package]] +name = "raw-cpuid" +version = "10.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6823ea29436221176fe662da99998ad3b4db2c7f31e7b6f5fe43adccd6320bb" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "rayon" version = "1.6.1" @@ -2478,6 +2577,12 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04d2ecae5fcf33b122e2e6bd520a57ccf152d2dde3b38c71039df1a6867264ee" +[[package]] +name = "sketches-ddsketch" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ceb945e54128e09c43d8e4f1277851bd5044c6fc540bbaa2ad888f60b3da9ae7" + [[package]] name = "slab" version = "0.4.7" @@ -3080,6 +3185,12 @@ dependencies = [ "try-lock", ] +[[package]] +name = "wasi" +version = "0.10.2+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index de37067..a3da870 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -16,6 +16,10 @@ name = "aquatic_ws" [[bin]] name = "aquatic_ws" +[features] +prometheus = ["metrics", "metrics-exporter-prometheus"] +metrics = ["dep:metrics"] + [dependencies] aquatic_common = { workspace = true, features = ["rustls", "glommio"] } aquatic_toml_config.workspace = true @@ -31,6 +35,8 @@ glommio = "0.7" hashbrown = { version = "0.13", features = ["serde"] } httparse = "1" log = "0.4" +metrics = { version = "0.20", optional = true } +metrics-exporter-prometheus = { version = "0.11", optional = true, default-features = false, features = ["http-listener"] } mimalloc = { version = "0.1", default-features = false } privdrop = "0.5" rand = { version = "0.8", features = ["small_rng"] } diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index 2964459..e222d89 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -28,6 +28,8 @@ pub struct Config { pub cleaning: CleaningConfig, pub privileges: PrivilegeConfig, pub access_list: AccessListConfig, + #[cfg(feature = "metrics")] + pub metrics: MetricsConfig, pub cpu_pinning: CpuPinningConfigAsc, } @@ -42,6 +44,8 @@ impl Default for Config { cleaning: CleaningConfig::default(), privileges: PrivilegeConfig::default(), access_list: AccessListConfig::default(), + #[cfg(feature = "metrics")] + metrics: Default::default(), cpu_pinning: Default::default(), } } @@ -142,6 +146,33 @@ impl Default for CleaningConfig { } } +#[cfg(feature = "metrics")] +#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] +#[serde(default, deny_unknown_fields)] +pub struct MetricsConfig { + /// Run a prometheus endpoint + pub run_prometheus_endpoint: bool, + /// Address to run prometheus endpoint on + pub prometheus_endpoint_address: SocketAddr, +} + +#[cfg(feature = "metrics")] +impl Default for MetricsConfig { + fn default() -> Self { + Self { + run_prometheus_endpoint: false, + prometheus_endpoint_address: SocketAddr::from(([0, 0, 0, 0], 9000)), + } + } +} + +#[cfg(feature = "metrics")] +impl MetricsConfig { + pub fn active(&self) -> bool { + self.run_prometheus_endpoint + } +} + #[cfg(test)] mod tests { use super::Config; diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index a7c2e83..1c296c8 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -35,6 +35,21 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let mut signals = Signals::new([SIGUSR1, SIGTERM])?; + #[cfg(feature = "prometheus")] + if config.metrics.run_prometheus_endpoint { + use metrics_exporter_prometheus::PrometheusBuilder; + + PrometheusBuilder::new() + .with_http_listener(config.metrics.prometheus_endpoint_address) + .install() + .with_context(|| { + format!( + "Install prometheus endpoint on {}", + config.metrics.prometheus_endpoint_address + ) + })?; + } + let state = State::default(); update_access_list(&config.access_list, &state.access_list)?; diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index fc47cac..460d7a8 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -152,6 +152,9 @@ pub async fn run_socket_worker( ::log::trace!("accepting stream, assigning id {}", key); let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, opt_tls_config) async move { + #[cfg(feature = "metrics")] + ::metrics::increment_gauge!("aquatic_active_connections", 1.0); + if let Err(err) = run_connection( config.clone(), access_list, @@ -173,6 +176,9 @@ pub async fn run_socket_worker( // Clean up after closed connection + #[cfg(feature = "metrics")] + ::metrics::decrement_gauge!("aquatic_active_connections", 1.0); + // Remove reference in separate statement to avoid // multiple RefCell borrows let opt_reference = connection_slab.borrow_mut().try_remove(key); @@ -501,6 +507,9 @@ impl ConnectionReader { async fn handle_in_message(&mut self, in_message: InMessage) -> anyhow::Result<()> { match in_message { InMessage::AnnounceRequest(announce_request) => { + #[cfg(feature = "metrics")] + ::metrics::increment_counter!("aquatic_requests_total", "type" => "announce"); + let info_hash = announce_request.info_hash; if self @@ -571,6 +580,9 @@ impl ConnectionReader { } } InMessage::ScrapeRequest(ScrapeRequest { info_hashes, .. }) => { + #[cfg(feature = "metrics")] + ::metrics::increment_counter!("aquatic_requests_total", "type" => "scrape"); + let info_hashes = if let Some(info_hashes) = info_hashes { info_hashes } else { @@ -642,10 +654,18 @@ impl ConnectionReader { info_hash, }); - self.out_message_sender + let result = self + .out_message_sender .send((self.make_connection_meta(None).into(), out_message)) .await - .map_err(|err| anyhow::anyhow!("ConnectionReader::send_error_response failed: {}", err)) + .map_err(|err| { + anyhow::anyhow!("ConnectionReader::send_error_response failed: {}", err) + }); + + #[cfg(feature = "metrics")] + ::metrics::increment_counter!("aquatic_responses_total", "type" => "error"); + + result } fn make_connection_meta(&self, pending_scrape_id: Option) -> InMessageMeta { @@ -728,6 +748,18 @@ impl ConnectionWriter { match result { Ok(Ok(())) => { + #[cfg(feature = "metrics")] + let out_message_type = match &out_message { + OutMessage::Offer(_) => "offer", + OutMessage::Answer(_) => "offer_answer", + OutMessage::AnnounceResponse(_) => "announce", + OutMessage::ScrapeResponse(_) => "scrape", + OutMessage::ErrorResponse(_) => "error", + }; + + #[cfg(feature = "metrics")] + ::metrics::increment_counter!("aquatic_responses_total", "type" => out_message_type); + self.connection_slab .borrow_mut() .get_mut(self.connection_id.0) diff --git a/scripts/run-aquatic-ws.sh b/scripts/run-aquatic-ws.sh index 298a3d3..d6681ce 100755 --- a/scripts/run-aquatic-ws.sh +++ b/scripts/run-aquatic-ws.sh @@ -2,4 +2,4 @@ . ./scripts/env-native-cpu-without-avx-512 -cargo run --profile "release-debug" -p aquatic_ws -- $@ +cargo run --profile "release-debug" -p aquatic_ws --features "prometheus" -- $@