From 5c04245cbef2d94ebdd73ea2ff239205af56e047 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 18 Jan 2023 20:48:59 +0100 Subject: [PATCH] http: add metrics for connection count, requests and responses --- Cargo.lock | 2 + aquatic_http/Cargo.toml | 6 ++ aquatic_http/src/workers/socket.rs | 95 +++++++++++++++++++++++++----- 3 files changed, 88 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3a6a897..7e0fc45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -111,6 +111,8 @@ dependencies = [ "libc", "log", "memchr", + "metrics", + "metrics-exporter-prometheus", "mimalloc", "once_cell", "privdrop", diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index ae5a674..d160bea 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -16,6 +16,10 @@ name = "aquatic_http" [[bin]] name = "aquatic_http" +[features] +prometheus = ["metrics", "metrics-exporter-prometheus"] +metrics = ["dep:metrics"] + [dependencies] aquatic_common = { workspace = true, features = ["rustls", "glommio"] } aquatic_http_protocol.workspace = true @@ -31,6 +35,8 @@ glommio = "0.7" itoa = "1" libc = "0.2" log = "0.4" +metrics = { version = "0.20", optional = true } +metrics-exporter-prometheus = { version = "0.11", optional = true, default-features = false, features = ["http-listener"] } mimalloc = { version = "0.1", default-features = false } memchr = "2" privdrop = "0.5" diff --git a/aquatic_http/src/workers/socket.rs b/aquatic_http/src/workers/socket.rs index 927c53f..af55e3c 100644 --- a/aquatic_http/src/workers/socket.rs +++ b/aquatic_http/src/workers/socket.rs @@ -93,16 +93,47 @@ pub async fn run_socket_worker( }); let task_handle = spawn_local(enclose!((config, access_list, request_senders, tls_config, connection_slab) async move { - if let Err(err) = Connection::run( - config, - access_list, - request_senders, - server_start_instant, - ConnectionId(key), - tls_config, - connection_slab.clone(), - stream - ).await { + let result = match stream.peer_addr() { + Ok(peer_addr) => { + let peer_addr = CanonicalSocketAddr::new(peer_addr); + + #[cfg(feature = "metrics")] + let ip_version_str = peer_addr_to_ip_version_str(&peer_addr); + + #[cfg(feature = "metrics")] + ::metrics::increment_gauge!( + "aquatic_active_connections", + 1.0, + "ip_version" => ip_version_str, + ); + + let result = Connection::run( + config, + access_list, + request_senders, + server_start_instant, + ConnectionId(key), + tls_config, + connection_slab.clone(), + stream, + peer_addr + ).await; + + #[cfg(feature = "metrics")] + ::metrics::decrement_gauge!( + "aquatic_active_connections", + 1.0, + "ip_version" => ip_version_str, + ); + + result + } + Err(err) => { + Err(anyhow::anyhow!("Couldn't get peer addr: {:?}", err)) + } + }; + + if let Err(err) = result { ::log::debug!("Connection::run() error: {:?}", err); } @@ -171,12 +202,8 @@ impl Connection { tls_config: Arc, connection_slab: Rc>>, stream: TcpStream, + peer_addr: CanonicalSocketAddr, ) -> anyhow::Result<()> { - let peer_addr = stream - .peer_addr() - .map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))?; - let peer_addr = CanonicalSocketAddr::new(peer_addr); - let tls_acceptor: TlsAcceptor = tls_config.into(); let stream = tls_acceptor.accept(stream).await?; @@ -288,6 +315,13 @@ impl Connection { match request { Request::Announce(request) => { + #[cfg(feature = "metrics")] + ::metrics::increment_counter!( + "aquatic_requests_total", + "type" => "announce", + "ip_version" => peer_addr_to_ip_version_str(&self.peer_addr) + ); + let info_hash = request.info_hash; if self @@ -327,6 +361,13 @@ impl Connection { } } Request::Scrape(ScrapeRequest { info_hashes }) => { + #[cfg(feature = "metrics")] + ::metrics::increment_counter!( + "aquatic_requests_total", + "type" => "scrape", + "ip_version" => peer_addr_to_ip_version_str(&self.peer_addr) + ); + let mut info_hashes_by_worker: BTreeMap> = BTreeMap::new(); for info_hash in info_hashes.into_iter() { @@ -454,6 +495,21 @@ impl Connection { self.stream.write(&self.response_buffer[..position]).await?; self.stream.flush().await?; + #[cfg(feature = "metrics")] + { + let response_type = match response { + Response::Announce(_) => "announce", + Response::Scrape(_) => "scrape", + Response::Failure(_) => "error", + }; + + ::metrics::increment_counter!( + "aquatic_responses_total", + "type" => response_type, + "ip_version" => peer_addr_to_ip_version_str(&self.peer_addr), + ); + } + Ok(()) } } @@ -496,3 +552,12 @@ fn create_tcp_listener( Ok(unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) }) } + +#[cfg(feature = "metrics")] +fn peer_addr_to_ip_version_str(addr: &CanonicalSocketAddr) -> &'static str { + if addr.is_ipv4() { + "4" + } else { + "6" + } +}