diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index 460d7a8..a63d7df 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -153,7 +153,11 @@ pub async fn run_socket_worker( let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, opt_tls_config) async move { #[cfg(feature = "metrics")] - ::metrics::increment_gauge!("aquatic_active_connections", 1.0); + ::metrics::increment_gauge!( + "aquatic_active_connections", + 1.0, + "ip_version" => ip_version_to_metrics_str(ip_version) + ); if let Err(err) = run_connection( config.clone(), @@ -177,7 +181,11 @@ pub async fn run_socket_worker( // Clean up after closed connection #[cfg(feature = "metrics")] - ::metrics::decrement_gauge!("aquatic_active_connections", 1.0); + ::metrics::decrement_gauge!( + "aquatic_active_connections", + 1.0, + "ip_version" => ip_version_to_metrics_str(ip_version) + ); // Remove reference in separate statement to avoid // multiple RefCell borrows @@ -425,6 +433,7 @@ async fn run_stream_agnostic_connection< pending_scrape_slab, connection_id, server_start_instant, + ip_version, }; let result = writer.run_out_message_loop().await; @@ -508,7 +517,11 @@ impl ConnectionReader { match in_message { InMessage::AnnounceRequest(announce_request) => { #[cfg(feature = "metrics")] - ::metrics::increment_counter!("aquatic_requests_total", "type" => "announce"); + ::metrics::increment_counter!( + "aquatic_requests_total", + "type" => "announce", + "ip_version" => ip_version_to_metrics_str(self.ip_version) + ); let info_hash = announce_request.info_hash; @@ -581,7 +594,11 @@ impl ConnectionReader { } InMessage::ScrapeRequest(ScrapeRequest { info_hashes, .. }) => { #[cfg(feature = "metrics")] - ::metrics::increment_counter!("aquatic_requests_total", "type" => "scrape"); + ::metrics::increment_counter!( + "aquatic_requests_total", + "type" => "scrape", + "ip_version" => ip_version_to_metrics_str(self.ip_version) + ); let info_hashes = if let Some(info_hashes) = info_hashes { info_hashes @@ -663,7 +680,11 @@ impl ConnectionReader { }); #[cfg(feature = "metrics")] - ::metrics::increment_counter!("aquatic_responses_total", "type" => "error"); + ::metrics::increment_counter!( + "aquatic_requests_total", + "type" => "error", + "ip_version" => ip_version_to_metrics_str(self.ip_version) + ); result } @@ -686,6 +707,7 @@ struct ConnectionWriter { pending_scrape_slab: Rc>>, server_start_instant: ServerStartInstant, connection_id: ConnectionId, + ip_version: IpVersion, } impl ConnectionWriter { @@ -749,16 +771,21 @@ impl ConnectionWriter { match result { Ok(Ok(())) => { #[cfg(feature = "metrics")] - let out_message_type = match &out_message { - OutMessage::Offer(_) => "offer", - OutMessage::Answer(_) => "offer_answer", - OutMessage::AnnounceResponse(_) => "announce", - OutMessage::ScrapeResponse(_) => "scrape", - OutMessage::ErrorResponse(_) => "error", - }; + { + let out_message_type = match &out_message { + OutMessage::Offer(_) => "offer", + OutMessage::Answer(_) => "offer_answer", + OutMessage::AnnounceResponse(_) => "announce", + OutMessage::ScrapeResponse(_) => "scrape", + OutMessage::ErrorResponse(_) => "error", + }; - #[cfg(feature = "metrics")] - ::metrics::increment_counter!("aquatic_responses_total", "type" => out_message_type); + ::metrics::increment_counter!( + "aquatic_responses_total", + "type" => out_message_type, + "ip_version" => ip_version_to_metrics_str(self.ip_version), + ); + } self.connection_slab .borrow_mut() @@ -839,3 +866,11 @@ fn create_tcp_listener( Ok(unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) }) } + +#[cfg(feature = "metrics")] +fn ip_version_to_metrics_str(ip_version: IpVersion) -> &'static str { + match ip_version { + IpVersion::V4 => "4", + IpVersion::V6 => "6", + } +}