diff --git a/Cargo.lock b/Cargo.lock index 53a4b60..adfcd6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -194,8 +194,8 @@ dependencies = [ "libc", "log", "memchr", - "metrics 0.21.1", - "metrics-exporter-prometheus 0.12.2", + "metrics", + "metrics-exporter-prometheus", "mimalloc", "once_cell", "privdrop", @@ -306,9 +306,9 @@ dependencies = [ "io-uring", "libc", "log", - "metrics 0.22.0", - "metrics-exporter-prometheus 0.13.0", - "metrics-util 0.16.0", + "metrics", + "metrics-exporter-prometheus", + "metrics-util", "mimalloc", "mio", "num-format", @@ -376,9 +376,9 @@ dependencies = [ "httparse", "indexmap 2.1.0", "log", - "metrics 0.22.0", - "metrics-exporter-prometheus 0.13.0", - "metrics-util 0.16.0", + "metrics", + "metrics-exporter-prometheus", + "metrics-util", "mimalloc", "privdrop", "quickcheck", @@ -1696,15 +1696,6 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" -[[package]] -name = "mach2" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19b955cdeb2a02b9117f121ce63aa52d08ade45de53e48fe6a38b39c10f6f709" -dependencies = [ - "libc", -] - [[package]] name = "memchr" version = "2.7.1" @@ -1738,17 +1729,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "metrics" -version = "0.21.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fde3af1a009ed76a778cb84fdef9e7dbbdf5775ae3e4cc1f434a6a307f6f76c5" -dependencies = [ - "ahash 0.8.7", - "metrics-macros", - "portable-atomic", -] - [[package]] name = "metrics" version = "0.22.0" @@ -1759,23 +1739,6 @@ dependencies = [ "portable-atomic", ] -[[package]] -name = "metrics-exporter-prometheus" -version = "0.12.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d4fa7ce7c4862db464a37b0b31d89bca874562f034bd7993895572783d02950" -dependencies = [ - "base64", - "hyper", - "indexmap 1.9.3", - "ipnet", - "metrics 0.21.1", - "metrics-util 0.15.1", - "quanta 0.11.1", - "thiserror", - "tokio", -] - [[package]] name = "metrics-exporter-prometheus" version = "0.13.0" @@ -1786,39 +1749,13 @@ dependencies = [ "hyper", "indexmap 1.9.3", "ipnet", - "metrics 0.22.0", - "metrics-util 0.16.0", - "quanta 0.12.2", + "metrics", + "metrics-util", + "quanta", "thiserror", "tokio", ] -[[package]] -name = "metrics-macros" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38b4faf00617defe497754acde3024865bc143d44a86799b24e191ecff91354f" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.48", -] - -[[package]] -name = "metrics-util" -version = "0.15.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4de2ed6e491ed114b40b732e4d1659a9d53992ebd87490c44a6ffe23739d973e" -dependencies = [ - "crossbeam-epoch", - "crossbeam-utils", - "hashbrown 0.13.1", - "metrics 0.21.1", - "num_cpus", - "quanta 0.11.1", - "sketches-ddsketch 0.2.1", -] - [[package]] name = "metrics-util" version = "0.16.0" @@ -1830,10 +1767,10 @@ dependencies = [ "crossbeam-utils", "hashbrown 0.13.1", "indexmap 1.9.3", - "metrics 0.22.0", + "metrics", "num_cpus", "ordered-float", - "quanta 0.12.2", + "quanta", "radix_trie", "sketches-ddsketch 0.2.1", ] @@ -2199,22 +2136,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "quanta" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" -dependencies = [ - "crossbeam-utils", - "libc", - "mach2", - "once_cell", - "raw-cpuid 10.7.0", - "wasi", - "web-sys", - "winapi 0.3.9", -] - [[package]] name = "quanta" version = "0.12.2" @@ -2224,7 +2145,7 @@ dependencies = [ "crossbeam-utils", "libc", "once_cell", - "raw-cpuid 11.0.1", + "raw-cpuid", "wasi", "web-sys", "winapi 0.3.9", @@ -2311,15 +2232,6 @@ dependencies = [ "rand", ] -[[package]] -name = "raw-cpuid" -version = "10.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" -dependencies = [ - "bitflags 1.3.2", -] - [[package]] name = "raw-cpuid" version = "11.0.1" diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index 60930a8..8c0a33b 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -39,8 +39,8 @@ httparse = "1" itoa = "1" libc = "0.2" log = "0.4" -metrics = { version = "0.21", optional = true } -metrics-exporter-prometheus = { version = "0.12", optional = true, default-features = false, features = ["http-listener"] } +metrics = { version = "0.22", optional = true } +metrics-exporter-prometheus = { version = "0.13", optional = true, default-features = false, features = ["http-listener"] } mimalloc = { version = "0.1", default-features = false } memchr = "2" privdrop = "0.5" diff --git a/crates/http/src/workers/socket/connection.rs b/crates/http/src/workers/socket/connection.rs index d44508a..4ab82ab 100644 --- a/crates/http/src/workers/socket/connection.rs +++ b/crates/http/src/workers/socket/connection.rs @@ -26,9 +26,9 @@ use once_cell::sync::Lazy; use crate::common::*; use crate::config::Config; -use super::request::{parse_request, RequestParseError}; #[cfg(feature = "metrics")] -use super::{peer_addr_to_ip_version_str, WORKER_INDEX}; +use super::peer_addr_to_ip_version_str; +use super::request::{parse_request, RequestParseError}; const REQUEST_BUFFER_SIZE: usize = 2048; const RESPONSE_BUFFER_SIZE: usize = 4096; @@ -67,6 +67,7 @@ pub enum ConnectionError { Other(#[from] anyhow::Error), } +#[allow(clippy::too_many_arguments)] pub(super) async fn run_connection( config: Rc, access_list: Arc, @@ -75,6 +76,7 @@ pub(super) async fn run_connection( opt_tls_config: Option>>, valid_until: Rc>, stream: TcpStream, + worker_index: usize, ) -> Result<(), ConnectionError> { let access_list_cache = create_access_list_cache(&access_list); let request_buffer = Box::new([0u8; REQUEST_BUFFER_SIZE]); @@ -114,6 +116,7 @@ pub(super) async fn run_connection( request_buffer_position: 0, response_buffer, stream, + worker_index_string: worker_index.to_string(), }; conn.run().await @@ -130,6 +133,7 @@ pub(super) async fn run_connection( request_buffer_position: 0, response_buffer, stream, + worker_index_string: worker_index.to_string(), }; conn.run().await @@ -148,6 +152,7 @@ struct Connection { request_buffer_position: usize, response_buffer: Box<[u8; RESPONSE_BUFFER_SIZE]>, stream: S, + worker_index_string: String, } impl Connection @@ -244,12 +249,13 @@ where match request { Request::Announce(request) => { #[cfg(feature = "metrics")] - ::metrics::increment_counter!( + ::metrics::counter!( "aquatic_requests_total", "type" => "announce", "ip_version" => peer_addr_to_ip_version_str(&peer_addr), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + "worker_index" => self.worker_index_string.clone(), + ) + .increment(1); let info_hash = request.info_hash; @@ -291,12 +297,13 @@ where } Request::Scrape(ScrapeRequest { info_hashes }) => { #[cfg(feature = "metrics")] - ::metrics::increment_counter!( + ::metrics::counter!( "aquatic_requests_total", "type" => "scrape", "ip_version" => peer_addr_to_ip_version_str(&peer_addr), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + "worker_index" => self.worker_index_string.clone(), + ) + .increment(1); let mut info_hashes_by_worker: BTreeMap> = BTreeMap::new(); @@ -438,12 +445,13 @@ where .opt_peer_addr .expect("peer addr should already have been extracted by now"); - ::metrics::increment_counter!( + ::metrics::counter!( "aquatic_responses_total", "type" => response_type, "ip_version" => peer_addr_to_ip_version_str(&peer_addr), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + "worker_index" => self.worker_index_string.clone(), + ) + .increment(1); } Ok(()) diff --git a/crates/http/src/workers/socket/mod.rs b/crates/http/src/workers/socket/mod.rs index 025b4a4..a289607 100644 --- a/crates/http/src/workers/socket/mod.rs +++ b/crates/http/src/workers/socket/mod.rs @@ -25,9 +25,6 @@ use crate::common::*; use crate::config::Config; use crate::workers::socket::connection::{run_connection, ConnectionError}; -#[cfg(feature = "metrics")] -thread_local! { static WORKER_INDEX: ::std::cell::Cell = Default::default() } - struct ConnectionHandle { close_conn_sender: LocalSender<()>, valid_until: Rc>, @@ -44,9 +41,6 @@ pub async fn run_socket_worker( server_start_instant: ServerStartInstant, worker_index: usize, ) { - #[cfg(feature = "metrics")] - WORKER_INDEX.with(|index| index.set(worker_index)); - let config = Rc::new(config); let access_list = state.access_list; @@ -93,12 +87,14 @@ pub async fn run_socket_worker( ) async move { #[cfg(feature = "metrics")] - ::metrics::increment_gauge!( + let active_connections_gauge = ::metrics::gauge!( "aquatic_active_connections", - 1.0, "worker_index" => worker_index.to_string(), ); + #[cfg(feature = "metrics")] + active_connections_gauge.increment(1.0); + let f1 = async { run_connection( config, access_list, @@ -107,6 +103,7 @@ pub async fn run_socket_worker( opt_tls_config, valid_until.clone(), stream, + worker_index, ).await }; let f2 = async { @@ -118,11 +115,7 @@ pub async fn run_socket_worker( let result = race(f1, f2).await; #[cfg(feature = "metrics")] - ::metrics::decrement_gauge!( - "aquatic_active_connections", - 1.0, - "worker_index" => worker_index.to_string(), - ); + active_connections_gauge.decrement(1.0); match result { Ok(()) => (), diff --git a/crates/http/src/workers/swarm/mod.rs b/crates/http/src/workers/swarm/mod.rs index c93c028..5df1c46 100644 --- a/crates/http/src/workers/swarm/mod.rs +++ b/crates/http/src/workers/swarm/mod.rs @@ -18,9 +18,6 @@ use crate::config::Config; use self::storage::TorrentMaps; -#[cfg(feature = "metrics")] -thread_local! { static WORKER_INDEX: ::std::cell::Cell = Default::default() } - pub async fn run_swarm_worker( _sentinel: PanicSentinel, config: Config, @@ -29,12 +26,9 @@ pub async fn run_swarm_worker( server_start_instant: ServerStartInstant, worker_index: usize, ) { - #[cfg(feature = "metrics")] - WORKER_INDEX.with(|index| index.set(worker_index)); - let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); - let torrents = Rc::new(RefCell::new(TorrentMaps::default())); + let torrents = Rc::new(RefCell::new(TorrentMaps::new(worker_index))); let access_list = state.access_list; // Periodically clean torrents @@ -69,16 +63,14 @@ pub async fn run_swarm_worker( ::metrics::gauge!( "aquatic_torrents", - torrents.ipv4.len() as f64, "ip_version" => "4", "worker_index" => worker_index.to_string(), - ); + ).set(torrents.ipv4.len() as f64); ::metrics::gauge!( "aquatic_torrents", - torrents.ipv6.len() as f64, "ip_version" => "6", "worker_index" => worker_index.to_string(), - ); + ).set(torrents.ipv6.len() as f64); Some(Duration::from_secs(config.metrics.torrent_count_update_interval)) })() diff --git a/crates/http/src/workers/swarm/storage.rs b/crates/http/src/workers/swarm/storage.rs index 3bf17de..b49b1ea 100644 --- a/crates/http/src/workers/swarm/storage.rs +++ b/crates/http/src/workers/swarm/storage.rs @@ -16,7 +16,6 @@ use aquatic_http_protocol::response::*; use crate::config::Config; #[cfg(feature = "metrics")] -use crate::workers::swarm::WORKER_INDEX; pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash { #[cfg(feature = "metrics")] @@ -36,13 +35,35 @@ impl Ip for Ipv6Addr { } } -#[derive(Default)] pub struct TorrentMaps { pub ipv4: TorrentMap, pub ipv6: TorrentMap, + #[cfg(feature = "metrics")] + pub ipv4_peer_gauge: metrics::Gauge, + #[cfg(feature = "metrics")] + pub ipv6_peer_gauge: metrics::Gauge, } impl TorrentMaps { + pub fn new(worker_index: usize) -> Self { + Self { + ipv4: Default::default(), + ipv6: Default::default(), + #[cfg(feature = "metrics")] + ipv4_peer_gauge: ::metrics::gauge!( + "aquatic_peers", + "ip_version" => "4", + "worker_index" => worker_index.to_string(), + ), + #[cfg(feature = "metrics")] + ipv6_peer_gauge: ::metrics::gauge!( + "aquatic_peers", + "ip_version" => "6", + "worker_index" => worker_index.to_string(), + ), + } + } + pub fn handle_announce_request( &mut self, config: &Config, @@ -63,6 +84,8 @@ impl TorrentMaps { peer_ip_address, request, valid_until, + #[cfg(feature = "metrics")] + &self.ipv4_peer_gauge, ); AnnounceResponse { @@ -85,6 +108,8 @@ impl TorrentMaps { peer_ip_address, request, valid_until, + #[cfg(feature = "metrics")] + &self.ipv6_peer_gauge, ); AnnounceResponse { @@ -157,8 +182,20 @@ impl TorrentMaps { let now = server_start_instant.seconds_elapsed(); - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4, now); - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6, now); + Self::clean_torrent_map( + config, + &mut access_list_cache, + &mut self.ipv4, + now, + &self.ipv4_peer_gauge, + ); + Self::clean_torrent_map( + config, + &mut access_list_cache, + &mut self.ipv6, + now, + &self.ipv6_peer_gauge, + ); } fn clean_torrent_map( @@ -166,6 +203,7 @@ impl TorrentMaps { access_list_cache: &mut AccessListCache, torrent_map: &mut TorrentMap, now: SecondsSinceServerStart, + #[cfg(feature = "metrics")] peer_gauge: &::metrics::Gauge, ) { let mut total_num_peers = 0; @@ -197,12 +235,7 @@ impl TorrentMaps { let total_num_peers = total_num_peers as f64; #[cfg(feature = "metrics")] - ::metrics::gauge!( - "aquatic_peers", - total_num_peers, - "ip_version" => I::ip_version_str(), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + peer_gauge.set(total_num_peers); torrent_map.shrink_to_fit(); } @@ -238,6 +271,7 @@ impl TorrentData { peer_ip_address: I, request: AnnounceRequest, valid_until: ValidUntil, + #[cfg(feature = "metrics")] peer_gauge: &::metrics::Gauge, ) -> (usize, usize, Vec>) { let peer_status = PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left)); @@ -257,12 +291,7 @@ impl TorrentData { PeerStatus::Seeding | PeerStatus::Leeching => { #[cfg(feature = "metrics")] if opt_removed_peer.is_none() { - ::metrics::increment_gauge!( - "aquatic_peers", - 1.0, - "ip_version" => I::ip_version_str(), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + peer_gauge.increment(1.0); } let max_num_peers_to_take = match request.numwant { @@ -288,12 +317,7 @@ impl TorrentData { PeerStatus::Stopped => { #[cfg(feature = "metrics")] if opt_removed_peer.is_some() { - ::metrics::decrement_gauge!( - "aquatic_peers", - 1.0, - "ip_version" => I::ip_version_str(), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + peer_gauge.decrement(1.0); } Vec::new()