diff --git a/CHANGELOG.md b/CHANGELOG.md index 7920167..3bdd65c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,10 @@ ### aquatic_http +#### Added + +* Support exposing a Prometheus endpoint for metrics + #### Changed * Don't return any response peers if announce event is stopped @@ -74,6 +78,7 @@ #### Added * Add HTTP health check route when running without TLS +* Support exposing a Prometheus endpoint for metrics #### Changed diff --git a/Cargo.lock b/Cargo.lock index 3b4b6b1..7e0fc45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -77,7 +77,7 @@ dependencies = [ "duplicate", "git-testament", "glommio", - "hashbrown 0.13.1", + "hashbrown 0.13.2", "hex", "hwloc", "indexmap", @@ -111,6 +111,8 @@ dependencies = [ "libc", "log", "memchr", + "metrics", + "metrics-exporter-prometheus", "mimalloc", "once_cell", "privdrop", @@ -136,7 +138,7 @@ dependencies = [ "futures-lite", "futures-rustls", "glommio", - "hashbrown 0.13.1", + "hashbrown 0.13.2", "log", "mimalloc", "quickcheck", @@ -226,7 +228,7 @@ dependencies = [ "constant_time_eq", "crossbeam-channel", "getrandom", - "hashbrown 0.13.1", + "hashbrown 0.13.2", "hdrhistogram", "hex", "libc", @@ -271,7 +273,7 @@ dependencies = [ "aquatic_common", "aquatic_toml_config", "aquatic_udp_protocol", - "hashbrown 0.13.1", + "hashbrown 0.13.2", "mimalloc", "mio", "quickcheck", @@ -306,7 +308,7 @@ dependencies = [ "futures-lite", "futures-rustls", "glommio", - "hashbrown 0.13.1", + "hashbrown 0.13.2", "httparse", "log", "metrics", @@ -355,7 +357,7 @@ version = "0.2.0" dependencies = [ "anyhow", "criterion", - "hashbrown 0.13.1", + "hashbrown 0.13.2", "quickcheck", "quickcheck_macros", "serde", @@ -590,9 +592,9 @@ checksum = "8ff9f338986406db85e2b5deb40a9255b796ca03a194c7457403d215173f3fd5" [[package]] name = "bumpalo" -version = "3.11.1" +version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba" +checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" [[package]] name = "byteorder" @@ -663,9 +665,9 @@ dependencies = [ [[package]] name = "console" -version = "0.15.4" +version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9b6515d269224923b26b5febea2ed42b2d5f2ce37284a4dd670fedd6cb8347a" +checksum = "c3d79fbe8970a77e3e34151cc13d3b3e248aa0faaecb9f6091fa07ebefe5ad60" dependencies = [ "encode_unicode", "lazy_static", @@ -706,9 +708,9 @@ dependencies = [ [[package]] name = "crc-catalog" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d0165d2900ae6778e36e80bbc4da3b5eefccee9ba939761f9c2882a5d9af3ff" +checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484" [[package]] name = "crc32fast" @@ -1260,7 +1262,7 @@ version = "0.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e2a3c70a9c00cc1ee87b54e89f9505f73bb17d63f1b25c9a462ba8ef885444f" dependencies = [ - "hashbrown 0.13.1", + "hashbrown 0.13.2", "serde", ] @@ -1281,9 +1283,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.13.1" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33ff8ae62cd3a9102e5637afc8452c55acf3844001bd5374e0b0bd7b6616c038" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" dependencies = [ "ahash 0.8.2", "serde", @@ -1481,9 +1483,9 @@ dependencies = [ [[package]] name = "indicatif" -version = "0.17.2" +version = "0.17.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4295cbb7573c16d310e99e713cf9e75101eb190ab31fccd35f2d2691b4352b19" +checksum = "cef509aa9bc73864d6756f0d34d35504af3cf0844373afe9b8669a5b8005a729" dependencies = [ "console", "number_prefix", @@ -1796,9 +1798,9 @@ dependencies = [ [[package]] name = "nix" -version = "0.26.1" +version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46a58d1d356c6597d08cde02c2f09d785b09e28711837b1ed667dc652c08a694" +checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a" dependencies = [ "bitflags 1.3.2", "cfg-if", @@ -1816,9 +1818,9 @@ checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" [[package]] name = "nom" -version = "7.1.2" +version = "7.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5507769c4919c998e69e49c839d9dc6e693ede4cc4290d6ad8b41d4f09c548c" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" dependencies = [ "memchr", "minimal-lexical", @@ -1931,9 +1933,9 @@ checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" [[package]] name = "object" -version = "0.30.1" +version = "0.30.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d864c91689fdc196779b98dba0aceac6118594c2df6ee5d943eb6a8df4d107a" +checksum = "2b8c786513eb403643f2a88c244c2aaa270ef2153f55094587d0c48a3cf22a83" dependencies = [ "memchr", ] @@ -1980,7 +1982,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core 0.9.5", + "parking_lot_core 0.9.6", ] [[package]] @@ -1999,9 +2001,9 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.5" +version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ff9f3fef3968a3ec5945535ed654cb38ff72d7495a25619e2247fb15a2ed9ba" +checksum = "ba1ef8814b5c993410bb3adfad7a5ed269563e4a2f90c41f5d85be7fb47133bf" dependencies = [ "cfg-if", "libc", @@ -2138,7 +2140,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81ed9e5437d82d5f2cde999a21571474c5f09b3d76e33eab94bf0e8e42a4fd96" dependencies = [ "libc", - "nix 0.26.1", + "nix 0.26.2", ] [[package]] @@ -2167,9 +2169,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.49" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57a8eca9f9c4ffde41714334dee777596264c7825420f521abc92b5b5deb63a5" +checksum = "6ef7d57beacfaf2d8aee5937dab7b7f28de3cb8b1828479bb5de2a7106f2bae2" dependencies = [ "unicode-ident", ] @@ -2376,9 +2378,9 @@ checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" [[package]] name = "rustls" -version = "0.20.7" +version = "0.20.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c" +checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f" dependencies = [ "log", "ring", @@ -2679,9 +2681,9 @@ dependencies = [ [[package]] name = "sqlformat" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f87e292b4291f154971a43c3774364e2cbcaec599d3f5bf6fa9d122885dbc38a" +checksum = "0c12bc9199d1db8234678b7051747c07f517cdcf019262d1847b94ec8b1aee3e" dependencies = [ "itertools", "nom", @@ -2901,9 +2903,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.24.1" +version = "1.24.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d9f76183f91ecfb55e1d7d5602bd1d979e38a3a522fe900241cf195624d67ae" +checksum = "597a12a59981d9e3c38d216785b0c37399f6e415e8d0712047620f189371b0bb" dependencies = [ "autocfg", "bytes", @@ -3340,45 +3342,45 @@ dependencies = [ [[package]] name = "windows_aarch64_gnullvm" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e" +checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608" [[package]] name = "windows_aarch64_msvc" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4" +checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7" [[package]] name = "windows_i686_gnu" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7" +checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640" [[package]] name = "windows_i686_msvc" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246" +checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605" [[package]] name = "windows_x86_64_gnu" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed" +checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45" [[package]] name = "windows_x86_64_gnullvm" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028" +checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463" [[package]] name = "windows_x86_64_msvc" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" +checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd" [[package]] name = "zeroize" diff --git a/README.md b/README.md index 25dae04..73fd72e 100644 --- a/README.md +++ b/README.md @@ -124,6 +124,26 @@ parsing fails, the program exits. Later failures result in in emitting of an error-level log message, while successful updates of the access list result in emitting of an info-level log message. +#### Prometheus + +`aquatic_http` and `aquatic_ws` support exporting [Prometheus](https://prometheus.io/) metrics. + +Pass the `prometheus` feature when building: + +```sh +. ./scripts/env-native-cpu-without-avx-512 +cargo build --release -p aquatic_ws --features "prometheus" +cargo build --release -p aquatic_http --features "prometheus" +``` + +Then activate the prometheus endpoint in the configuration file: + +```toml +[metrics] +run_prometheus_endpoint = true +prometheus_endpoint_address = "0.0.0.0:9000" +``` + ### Running If you're running `aquatic_http` or `aquatic_ws`, please make sure locked memory @@ -232,29 +252,6 @@ proxied to IPv4 requests, and IPv6 requests to IPv6 requests. More details are available [here](./documents/aquatic-ws-load-test-2022-03-29.pdf). Please note that request workers have been renamed to swarm workers. -#### Prometheus - -`aquatic_ws` supports exporting [Prometheus](https://prometheus.io/) metrics. - -Pass the `prometheus` feature when building: - -```sh -. ./scripts/env-native-cpu-without-avx-512 -cargo build --release -p aquatic_ws --features "prometheus" -``` - -Then activate the prometheus endpoint in the configuration file: - -```toml -[metrics] -# Run a prometheus endpoint -run_prometheus_endpoint = true -# Address to run prometheus endpoint on -prometheus_endpoint_address = "0.0.0.0:9000" -# Update metrics for torrent count this often (seconds) -torrent_count_update_interval = 10 -``` - ## Load testing There are load test binaries for all protocols. They use a CLI structure diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index ae5a674..d160bea 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -16,6 +16,10 @@ name = "aquatic_http" [[bin]] name = "aquatic_http" +[features] +prometheus = ["metrics", "metrics-exporter-prometheus"] +metrics = ["dep:metrics"] + [dependencies] aquatic_common = { workspace = true, features = ["rustls", "glommio"] } aquatic_http_protocol.workspace = true @@ -31,6 +35,8 @@ glommio = "0.7" itoa = "1" libc = "0.2" log = "0.4" +metrics = { version = "0.20", optional = true } +metrics-exporter-prometheus = { version = "0.11", optional = true, default-features = false, features = ["http-listener"] } mimalloc = { version = "0.1", default-features = false } memchr = "2" privdrop = "0.5" diff --git a/aquatic_http/src/config.rs b/aquatic_http/src/config.rs index 995d51c..7b66866 100644 --- a/aquatic_http/src/config.rs +++ b/aquatic_http/src/config.rs @@ -29,6 +29,8 @@ pub struct Config { pub privileges: PrivilegeConfig, pub access_list: AccessListConfig, pub cpu_pinning: CpuPinningConfigAsc, + #[cfg(feature = "metrics")] + pub metrics: MetricsConfig, } impl Default for Config { @@ -43,6 +45,8 @@ impl Default for Config { privileges: PrivilegeConfig::default(), access_list: AccessListConfig::default(), cpu_pinning: Default::default(), + #[cfg(feature = "metrics")] + metrics: Default::default(), } } } @@ -128,6 +132,29 @@ impl Default for CleaningConfig { } } +#[cfg(feature = "metrics")] +#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] +#[serde(default, deny_unknown_fields)] +pub struct MetricsConfig { + /// Run a prometheus endpoint + pub run_prometheus_endpoint: bool, + /// Address to run prometheus endpoint on + pub prometheus_endpoint_address: SocketAddr, + /// Update metrics for torrent count this often (seconds) + pub torrent_count_update_interval: u64, +} + +#[cfg(feature = "metrics")] +impl Default for MetricsConfig { + fn default() -> Self { + Self { + run_prometheus_endpoint: false, + prometheus_endpoint_address: SocketAddr::from(([0, 0, 0, 0], 9000)), + torrent_count_update_interval: 10, + } + } +} + #[cfg(test)] mod tests { use super::Config; diff --git a/aquatic_http/src/lib.rs b/aquatic_http/src/lib.rs index 72a9656..7f9fc65 100644 --- a/aquatic_http/src/lib.rs +++ b/aquatic_http/src/lib.rs @@ -1,3 +1,4 @@ +use anyhow::Context; use aquatic_common::{ access_list::update_access_list, cpu_pinning::{ @@ -30,6 +31,21 @@ const SHARED_CHANNEL_SIZE: usize = 1024; pub fn run(config: Config) -> ::anyhow::Result<()> { let mut signals = Signals::new([SIGUSR1, SIGTERM])?; + #[cfg(feature = "prometheus")] + if config.metrics.run_prometheus_endpoint { + use metrics_exporter_prometheus::PrometheusBuilder; + + PrometheusBuilder::new() + .with_http_listener(config.metrics.prometheus_endpoint_address) + .install() + .with_context(|| { + format!( + "Install prometheus endpoint on {}", + config.metrics.prometheus_endpoint_address + ) + })?; + } + let state = State::default(); update_access_list(&config.access_list, &state.access_list)?; @@ -76,6 +92,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { request_mesh_builder, priv_dropper, server_start_instant, + i, ) .await }) @@ -106,6 +123,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { state, request_mesh_builder, server_start_instant, + i, ) .await }) diff --git a/aquatic_http/src/workers/socket.rs b/aquatic_http/src/workers/socket.rs index 927c53f..3c1ac49 100644 --- a/aquatic_http/src/workers/socket.rs +++ b/aquatic_http/src/workers/socket.rs @@ -39,6 +39,9 @@ const RESPONSE_HEADER_A: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Length: "; const RESPONSE_HEADER_B: &[u8] = b" "; const RESPONSE_HEADER_C: &[u8] = b"\r\n\r\n"; +#[cfg(feature = "metrics")] +thread_local! { static WORKER_INDEX: ::std::cell::Cell = Default::default() } + static RESPONSE_HEADER: Lazy> = Lazy::new(|| [RESPONSE_HEADER_A, RESPONSE_HEADER_B, RESPONSE_HEADER_C].concat()); @@ -60,7 +63,11 @@ pub async fn run_socket_worker( request_mesh_builder: MeshBuilder, priv_dropper: PrivilegeDropper, server_start_instant: ServerStartInstant, + worker_index: usize, ) { + #[cfg(feature = "metrics")] + WORKER_INDEX.with(|index| index.set(worker_index)); + let config = Rc::new(config); let access_list = state.access_list; @@ -93,16 +100,49 @@ pub async fn run_socket_worker( }); let task_handle = spawn_local(enclose!((config, access_list, request_senders, tls_config, connection_slab) async move { - if let Err(err) = Connection::run( - config, - access_list, - request_senders, - server_start_instant, - ConnectionId(key), - tls_config, - connection_slab.clone(), - stream - ).await { + let result = match stream.peer_addr() { + Ok(peer_addr) => { + let peer_addr = CanonicalSocketAddr::new(peer_addr); + + #[cfg(feature = "metrics")] + let ip_version_str = peer_addr_to_ip_version_str(&peer_addr); + + #[cfg(feature = "metrics")] + ::metrics::increment_gauge!( + "aquatic_active_connections", + 1.0, + "ip_version" => ip_version_str, + "worker_index" => worker_index.to_string(), + ); + + let result = Connection::run( + config, + access_list, + request_senders, + server_start_instant, + ConnectionId(key), + tls_config, + connection_slab.clone(), + stream, + peer_addr + ).await; + + #[cfg(feature = "metrics")] + ::metrics::decrement_gauge!( + "aquatic_active_connections", + 1.0, + "ip_version" => ip_version_str, + "worker_index" => worker_index.to_string(), + ); + + result + } + Err(err) => { + Err(anyhow::anyhow!("Couldn't get peer addr: {:?}", err)) + } + }; + + if let Err(err) = result { ::log::debug!("Connection::run() error: {:?}", err); } @@ -171,12 +211,8 @@ impl Connection { tls_config: Arc, connection_slab: Rc>>, stream: TcpStream, + peer_addr: CanonicalSocketAddr, ) -> anyhow::Result<()> { - let peer_addr = stream - .peer_addr() - .map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))?; - let peer_addr = CanonicalSocketAddr::new(peer_addr); - let tls_acceptor: TlsAcceptor = tls_config.into(); let stream = tls_acceptor.accept(stream).await?; @@ -288,6 +324,14 @@ impl Connection { match request { Request::Announce(request) => { + #[cfg(feature = "metrics")] + ::metrics::increment_counter!( + "aquatic_requests_total", + "type" => "announce", + "ip_version" => peer_addr_to_ip_version_str(&self.peer_addr), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); + let info_hash = request.info_hash; if self @@ -327,6 +371,14 @@ impl Connection { } } Request::Scrape(ScrapeRequest { info_hashes }) => { + #[cfg(feature = "metrics")] + ::metrics::increment_counter!( + "aquatic_requests_total", + "type" => "scrape", + "ip_version" => peer_addr_to_ip_version_str(&self.peer_addr), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); + let mut info_hashes_by_worker: BTreeMap> = BTreeMap::new(); for info_hash in info_hashes.into_iter() { @@ -454,6 +506,22 @@ impl Connection { self.stream.write(&self.response_buffer[..position]).await?; self.stream.flush().await?; + #[cfg(feature = "metrics")] + { + let response_type = match response { + Response::Announce(_) => "announce", + Response::Scrape(_) => "scrape", + Response::Failure(_) => "error", + }; + + ::metrics::increment_counter!( + "aquatic_responses_total", + "type" => response_type, + "ip_version" => peer_addr_to_ip_version_str(&self.peer_addr), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); + } + Ok(()) } } @@ -496,3 +564,12 @@ fn create_tcp_listener( Ok(unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) }) } + +#[cfg(feature = "metrics")] +fn peer_addr_to_ip_version_str(addr: &CanonicalSocketAddr) -> &'static str { + if addr.is_ipv4() { + "4" + } else { + "6" + } +} diff --git a/aquatic_http/src/workers/swarm.rs b/aquatic_http/src/workers/swarm.rs index e1b056d..76893ad 100644 --- a/aquatic_http/src/workers/swarm.rs +++ b/aquatic_http/src/workers/swarm.rs @@ -27,10 +27,26 @@ use aquatic_http_protocol::response::*; use crate::common::*; use crate::config::Config; -pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {} +#[cfg(feature = "metrics")] +thread_local! { static WORKER_INDEX: ::std::cell::Cell = Default::default() } -impl Ip for Ipv4Addr {} -impl Ip for Ipv6Addr {} +pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash { + #[cfg(feature = "metrics")] + fn ip_version_str() -> &'static str; +} + +impl Ip for Ipv4Addr { + #[cfg(feature = "metrics")] + fn ip_version_str() -> &'static str { + "4" + } +} +impl Ip for Ipv6Addr { + #[cfg(feature = "metrics")] + fn ip_version_str() -> &'static str { + "6" + } +} #[derive(PartialEq, Eq, Clone, Copy, Debug)] pub enum PeerStatus { @@ -59,8 +75,8 @@ impl PeerStatus { pub struct Peer { pub ip_address: I, pub port: u16, - pub status: PeerStatus, pub valid_until: ValidUntil, + pub seeder: bool, } impl Peer { @@ -83,7 +99,6 @@ pub type PeerMap = IndexMap, Peer>; pub struct TorrentData { pub peers: PeerMap, pub num_seeders: usize, - pub num_leechers: usize, } impl Default for TorrentData { @@ -92,11 +107,16 @@ impl Default for TorrentData { Self { peers: Default::default(), num_seeders: 0, - num_leechers: 0, } } } +impl TorrentData { + fn num_leechers(&self) -> usize { + self.peers.len() - self.num_seeders + } +} + pub type TorrentMap = AmortizedIndexMap>; #[derive(Default)] @@ -126,6 +146,8 @@ impl TorrentMaps { torrent_map: &mut TorrentMap, now: SecondsSinceServerStart, ) { + let mut total_num_peers = 0; + torrent_map.retain(|info_hash, torrent_data| { if !access_list_cache .load() @@ -135,29 +157,32 @@ impl TorrentMaps { } let num_seeders = &mut torrent_data.num_seeders; - let num_leechers = &mut torrent_data.num_leechers; torrent_data.peers.retain(|_, peer| { let keep = peer.valid_until.valid(now); - if !keep { - match peer.status { - PeerStatus::Seeding => { - *num_seeders -= 1; - } - PeerStatus::Leeching => { - *num_leechers -= 1; - } - _ => (), - }; + if (!keep) & peer.seeder { + *num_seeders -= 1; } keep }); + total_num_peers += torrent_data.peers.len() as u64; + !torrent_data.peers.is_empty() }); + let total_num_peers = total_num_peers as f64; + + #[cfg(feature = "metrics")] + ::metrics::gauge!( + "aquatic_peers", + total_num_peers, + "ip_version" => I::ip_version_str(), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); + torrent_map.shrink_to_fit(); } } @@ -168,7 +193,11 @@ pub async fn run_swarm_worker( state: State, request_mesh_builder: MeshBuilder, server_start_instant: ServerStartInstant, + worker_index: usize, ) { + #[cfg(feature = "metrics")] + WORKER_INDEX.with(|index| index.set(worker_index)); + let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); let torrents = Rc::new(RefCell::new(TorrentMaps::default())); @@ -198,6 +227,29 @@ pub async fn run_swarm_worker( })() })); + // Periodically update torrent count metrics + #[cfg(feature = "metrics")] + TimerActionRepeat::repeat(enclose!((config, torrents) move || { + enclose!((config, torrents, worker_index) move || async move { + let torrents = torrents.borrow_mut(); + + ::metrics::gauge!( + "aquatic_torrents", + torrents.ipv4.len() as f64, + "ip_version" => "4", + "worker_index" => worker_index.to_string(), + ); + ::metrics::gauge!( + "aquatic_torrents", + torrents.ipv6.len() as f64, + "ip_version" => "6", + "worker_index" => worker_index.to_string(), + ); + + Some(Duration::from_secs(config.metrics.torrent_count_update_interval)) + })() + })); + let mut handles = Vec::new(); for (_, receiver) in request_receivers.streams() { @@ -337,13 +389,6 @@ pub fn upsert_peer_and_get_response_peers( let peer_status = PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left)); - let peer = Peer { - ip_address: peer_ip_address, - port: request.port, - status: peer_status, - valid_until, - }; - let ip_or_key = request .key .map(Either::Right) @@ -356,24 +401,51 @@ pub fn upsert_peer_and_get_response_peers( let opt_removed_peer = match peer_status { PeerStatus::Leeching => { - torrent_data.num_leechers += 1; + let peer = Peer { + ip_address: peer_ip_address, + port: request.port, + valid_until, + seeder: false, + }; torrent_data.peers.insert(peer_map_key.clone(), peer) } PeerStatus::Seeding => { torrent_data.num_seeders += 1; + let peer = Peer { + ip_address: peer_ip_address, + port: request.port, + valid_until, + seeder: true, + }; + torrent_data.peers.insert(peer_map_key.clone(), peer) } PeerStatus::Stopped => torrent_data.peers.remove(&peer_map_key), }; - match opt_removed_peer.map(|peer| peer.status) { - Some(PeerStatus::Leeching) => { - torrent_data.num_leechers -= 1; + if let Some(&Peer { seeder: true, .. }) = opt_removed_peer.as_ref() { + torrent_data.num_seeders -= 1; + } + + #[cfg(feature = "metrics")] + match peer_status { + PeerStatus::Stopped if opt_removed_peer.is_some() => { + ::metrics::decrement_gauge!( + "aquatic_peers", + 1.0, + "ip_version" => I::ip_version_str(), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); } - Some(PeerStatus::Seeding) => { - torrent_data.num_seeders -= 1; + PeerStatus::Leeching | PeerStatus::Seeding if opt_removed_peer.is_none() => { + ::metrics::increment_gauge!( + "aquatic_peers", + 1.0, + "ip_version" => I::ip_version_str(), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); } _ => {} } @@ -397,7 +469,7 @@ pub fn upsert_peer_and_get_response_peers( ( torrent_data.num_seeders, - torrent_data.num_leechers, + torrent_data.num_leechers(), response_peers, ) } @@ -427,7 +499,7 @@ pub fn handle_scrape_request( let stats = ScrapeStatistics { complete: torrent_data.num_seeders, downloaded: 0, // No implementation planned - incomplete: torrent_data.num_leechers, + incomplete: torrent_data.num_leechers(), }; response.files.insert(info_hash, stats); @@ -439,7 +511,7 @@ pub fn handle_scrape_request( let stats = ScrapeStatistics { complete: torrent_data.num_seeders, downloaded: 0, // No implementation planned - incomplete: torrent_data.num_leechers, + incomplete: torrent_data.num_leechers(), }; response.files.insert(info_hash, stats); diff --git a/aquatic_http_load_test/src/main.rs b/aquatic_http_load_test/src/main.rs index 5dfc0d0..7e3c017 100644 --- a/aquatic_http_load_test/src/main.rs +++ b/aquatic_http_load_test/src/main.rs @@ -102,22 +102,23 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { let statistics = state.statistics.as_ref(); - let responses_announce = - statistics.responses_announce.fetch_and(0, Ordering::SeqCst) as f64; + let responses_announce = statistics + .responses_announce + .fetch_and(0, Ordering::Relaxed) as f64; // let response_peers = statistics.response_peers // .fetch_and(0, Ordering::SeqCst) as f64; let requests_per_second = - statistics.requests.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + statistics.requests.fetch_and(0, Ordering::Relaxed) as f64 / interval_f64; let responses_scrape_per_second = - statistics.responses_scrape.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + statistics.responses_scrape.fetch_and(0, Ordering::Relaxed) as f64 / interval_f64; let responses_failure_per_second = - statistics.responses_failure.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + statistics.responses_failure.fetch_and(0, Ordering::Relaxed) as f64 / interval_f64; let bytes_sent_per_second = - statistics.bytes_sent.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + statistics.bytes_sent.fetch_and(0, Ordering::Relaxed) as f64 / interval_f64; let bytes_received_per_second = - statistics.bytes_received.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + statistics.bytes_received.fetch_and(0, Ordering::Relaxed) as f64 / interval_f64; let responses_announce_per_second = responses_announce / interval_f64; diff --git a/aquatic_http_load_test/src/network.rs b/aquatic_http_load_test/src/network.rs index 480d422..f4d718d 100644 --- a/aquatic_http_load_test/src/network.rs +++ b/aquatic_http_load_test/src/network.rs @@ -230,6 +230,11 @@ impl Connection { } } + self.load_test_state + .statistics + .bytes_received + .fetch_add(interesting_bytes.len(), Ordering::Relaxed); + break; } Err(err) => { diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index 1c296c8..1263175 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -109,6 +109,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { response_mesh_builder, priv_dropper, server_start_instant, + i, ) .await }) @@ -145,6 +146,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { request_mesh_builder, response_mesh_builder, server_start_instant, + i, ) .await }) diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index a63d7df..f0c76ae 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -34,6 +34,9 @@ use crate::common::*; const LOCAL_CHANNEL_SIZE: usize = 16; +#[cfg(feature = "metrics")] +thread_local! { static WORKER_INDEX: ::std::cell::Cell = Default::default() } + struct PendingScrapeResponse { pending_worker_out_messages: usize, stats: HashMap, @@ -60,7 +63,11 @@ pub async fn run_socket_worker( out_message_mesh_builder: MeshBuilder<(OutMessageMeta, OutMessage), Partial>, priv_dropper: PrivilegeDropper, server_start_instant: ServerStartInstant, + worker_index: usize, ) { + #[cfg(feature = "metrics")] + WORKER_INDEX.with(|index| index.set(worker_index)); + let config = Rc::new(config); let access_list = state.access_list; @@ -156,7 +163,8 @@ pub async fn run_socket_worker( ::metrics::increment_gauge!( "aquatic_active_connections", 1.0, - "ip_version" => ip_version_to_metrics_str(ip_version) + "ip_version" => ip_version_to_metrics_str(ip_version), + "worker_index" => worker_index.to_string(), ); if let Err(err) = run_connection( @@ -184,7 +192,8 @@ pub async fn run_socket_worker( ::metrics::decrement_gauge!( "aquatic_active_connections", 1.0, - "ip_version" => ip_version_to_metrics_str(ip_version) + "ip_version" => ip_version_to_metrics_str(ip_version), + "worker_index" => worker_index.to_string(), ); // Remove reference in separate statement to avoid @@ -520,7 +529,8 @@ impl ConnectionReader { ::metrics::increment_counter!( "aquatic_requests_total", "type" => "announce", - "ip_version" => ip_version_to_metrics_str(self.ip_version) + "ip_version" => ip_version_to_metrics_str(self.ip_version), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), ); let info_hash = announce_request.info_hash; @@ -597,7 +607,8 @@ impl ConnectionReader { ::metrics::increment_counter!( "aquatic_requests_total", "type" => "scrape", - "ip_version" => ip_version_to_metrics_str(self.ip_version) + "ip_version" => ip_version_to_metrics_str(self.ip_version), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), ); let info_hashes = if let Some(info_hashes) = info_hashes { @@ -681,9 +692,10 @@ impl ConnectionReader { #[cfg(feature = "metrics")] ::metrics::increment_counter!( - "aquatic_requests_total", + "aquatic_responses_total", "type" => "error", - "ip_version" => ip_version_to_metrics_str(self.ip_version) + "ip_version" => ip_version_to_metrics_str(self.ip_version), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), ); result @@ -784,6 +796,7 @@ impl ConnectionWriter { "aquatic_responses_total", "type" => out_message_type, "ip_version" => ip_version_to_metrics_str(self.ip_version), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), ); } @@ -805,7 +818,7 @@ impl ConnectionWriter { } Ok(Err(err)) => Err(err.into()), Err(err) => { - ::log::debug!("send_out_message: sending to peer took to long: {}", err); + ::log::debug!("send_out_message: sending to peer took too long: {}", err); Ok(()) } diff --git a/aquatic_ws/src/workers/swarm.rs b/aquatic_ws/src/workers/swarm.rs index 362acc8..21806a3 100644 --- a/aquatic_ws/src/workers/swarm.rs +++ b/aquatic_ws/src/workers/swarm.rs @@ -22,6 +22,9 @@ use crate::common::*; use crate::config::Config; use crate::SHARED_IN_CHANNEL_SIZE; +#[cfg(feature = "metrics")] +thread_local! { static WORKER_INDEX: ::std::cell::Cell = Default::default() } + #[derive(PartialEq, Eq, Clone, Copy, Debug)] enum PeerStatus { Seeding, @@ -145,7 +148,12 @@ impl TorrentMaps { let total_num_peers = total_num_peers as f64; #[cfg(feature = "metrics")] - ::metrics::gauge!("aquatic_peers", total_num_peers, "ip_version" => ip_version); + ::metrics::gauge!( + "aquatic_peers", + total_num_peers, + "ip_version" => ip_version, + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); } } @@ -157,7 +165,11 @@ pub async fn run_swarm_worker( in_message_mesh_builder: MeshBuilder<(InMessageMeta, InMessage), Partial>, out_message_mesh_builder: MeshBuilder<(OutMessageMeta, OutMessage), Partial>, server_start_instant: ServerStartInstant, + worker_index: usize, ) { + #[cfg(feature = "metrics")] + WORKER_INDEX.with(|index| index.set(worker_index)); + let (_, mut control_message_receivers) = control_message_mesh_builder .join(Role::Consumer) .await @@ -189,12 +201,14 @@ pub async fn run_swarm_worker( ::metrics::gauge!( "aquatic_torrents", torrents.ipv4.len() as f64, - "ip_version" => "4" + "ip_version" => "4", + "worker_index" => worker_index.to_string(), ); ::metrics::gauge!( "aquatic_torrents", torrents.ipv6.len() as f64, - "ip_version" => "6" + "ip_version" => "6", + "worker_index" => worker_index.to_string(), ); Some(Duration::from_secs(config.metrics.torrent_count_update_interval)) @@ -385,13 +399,29 @@ fn handle_announce_request( PeerStatus::Stopped => torrent_data.peers.remove(&request.peer_id), }; - if let Some(removed_peer) = opt_removed_peer { - if removed_peer.seeder { - torrent_data.num_seeders -= 1; + if let Some(&Peer { seeder: true, .. }) = opt_removed_peer.as_ref() { + torrent_data.num_seeders -= 1; + } + + #[cfg(feature = "metrics")] + match peer_status { + PeerStatus::Stopped if opt_removed_peer.is_some() => { + ::metrics::decrement_gauge!( + "aquatic_peers", + 1.0, + "ip_version" => ip_version, + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); } - } else { - #[cfg(feature = "metrics")] - ::metrics::increment_gauge!("aquatic_peers", 1.0, "ip_version" => ip_version); + PeerStatus::Leeching | PeerStatus::Seeding if opt_removed_peer.is_none() => { + ::metrics::increment_gauge!( + "aquatic_peers", + 1.0, + "ip_version" => ip_version, + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); + } + _ => {} } } diff --git a/scripts/run-aquatic-http.sh b/scripts/run-aquatic-http.sh index 492f9be..26943da 100755 --- a/scripts/run-aquatic-http.sh +++ b/scripts/run-aquatic-http.sh @@ -2,4 +2,4 @@ . ./scripts/env-native-cpu-without-avx-512 -cargo run --profile "release-debug" -p aquatic_http -- $@ +cargo run --profile "release-debug" -p aquatic_http --features "prometheus" -- $@