ws: prometheus: store ip version (v4/v6) for connections and messages

This commit is contained in:
Joakim Frostegård 2023-01-17 21:06:34 +01:00
parent 8b7c3c481c
commit 32253a37df

View file

@ -153,7 +153,11 @@ pub async fn run_socket_worker(
let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, opt_tls_config) async move { let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, opt_tls_config) async move {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
::metrics::increment_gauge!("aquatic_active_connections", 1.0); ::metrics::increment_gauge!(
"aquatic_active_connections",
1.0,
"ip_version" => ip_version_to_metrics_str(ip_version)
);
if let Err(err) = run_connection( if let Err(err) = run_connection(
config.clone(), config.clone(),
@ -177,7 +181,11 @@ pub async fn run_socket_worker(
// Clean up after closed connection // Clean up after closed connection
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
::metrics::decrement_gauge!("aquatic_active_connections", 1.0); ::metrics::decrement_gauge!(
"aquatic_active_connections",
1.0,
"ip_version" => ip_version_to_metrics_str(ip_version)
);
// Remove reference in separate statement to avoid // Remove reference in separate statement to avoid
// multiple RefCell borrows // multiple RefCell borrows
@ -425,6 +433,7 @@ async fn run_stream_agnostic_connection<
pending_scrape_slab, pending_scrape_slab,
connection_id, connection_id,
server_start_instant, server_start_instant,
ip_version,
}; };
let result = writer.run_out_message_loop().await; let result = writer.run_out_message_loop().await;
@ -508,7 +517,11 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
match in_message { match in_message {
InMessage::AnnounceRequest(announce_request) => { InMessage::AnnounceRequest(announce_request) => {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
::metrics::increment_counter!("aquatic_requests_total", "type" => "announce"); ::metrics::increment_counter!(
"aquatic_requests_total",
"type" => "announce",
"ip_version" => ip_version_to_metrics_str(self.ip_version)
);
let info_hash = announce_request.info_hash; let info_hash = announce_request.info_hash;
@ -581,7 +594,11 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
} }
InMessage::ScrapeRequest(ScrapeRequest { info_hashes, .. }) => { InMessage::ScrapeRequest(ScrapeRequest { info_hashes, .. }) => {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
::metrics::increment_counter!("aquatic_requests_total", "type" => "scrape"); ::metrics::increment_counter!(
"aquatic_requests_total",
"type" => "scrape",
"ip_version" => ip_version_to_metrics_str(self.ip_version)
);
let info_hashes = if let Some(info_hashes) = info_hashes { let info_hashes = if let Some(info_hashes) = info_hashes {
info_hashes info_hashes
@ -663,7 +680,11 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
}); });
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
::metrics::increment_counter!("aquatic_responses_total", "type" => "error"); ::metrics::increment_counter!(
"aquatic_requests_total",
"type" => "error",
"ip_version" => ip_version_to_metrics_str(self.ip_version)
);
result result
} }
@ -686,6 +707,7 @@ struct ConnectionWriter<S> {
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>, pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
server_start_instant: ServerStartInstant, server_start_instant: ServerStartInstant,
connection_id: ConnectionId, connection_id: ConnectionId,
ip_version: IpVersion,
} }
impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionWriter<S> { impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionWriter<S> {
@ -749,6 +771,7 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionWriter<S> {
match result { match result {
Ok(Ok(())) => { Ok(Ok(())) => {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
{
let out_message_type = match &out_message { let out_message_type = match &out_message {
OutMessage::Offer(_) => "offer", OutMessage::Offer(_) => "offer",
OutMessage::Answer(_) => "offer_answer", OutMessage::Answer(_) => "offer_answer",
@ -757,8 +780,12 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionWriter<S> {
OutMessage::ErrorResponse(_) => "error", OutMessage::ErrorResponse(_) => "error",
}; };
#[cfg(feature = "metrics")] ::metrics::increment_counter!(
::metrics::increment_counter!("aquatic_responses_total", "type" => out_message_type); "aquatic_responses_total",
"type" => out_message_type,
"ip_version" => ip_version_to_metrics_str(self.ip_version),
);
}
self.connection_slab self.connection_slab
.borrow_mut() .borrow_mut()
@ -839,3 +866,11 @@ fn create_tcp_listener(
Ok(unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) }) Ok(unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) })
} }
#[cfg(feature = "metrics")]
fn ip_version_to_metrics_str(ip_version: IpVersion) -> &'static str {
match ip_version {
IpVersion::V4 => "4",
IpVersion::V6 => "6",
}
}