diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index 1c296c8..1263175 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -109,6 +109,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { response_mesh_builder, priv_dropper, server_start_instant, + i, ) .await }) @@ -145,6 +146,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { request_mesh_builder, response_mesh_builder, server_start_instant, + i, ) .await }) diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index a63d7df..82b52f9 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -34,6 +34,9 @@ use crate::common::*; const LOCAL_CHANNEL_SIZE: usize = 16; +#[cfg(feature = "metrics")] +thread_local! { static WORKER_INDEX: ::std::cell::Cell = Default::default() } + struct PendingScrapeResponse { pending_worker_out_messages: usize, stats: HashMap, @@ -60,7 +63,11 @@ pub async fn run_socket_worker( out_message_mesh_builder: MeshBuilder<(OutMessageMeta, OutMessage), Partial>, priv_dropper: PrivilegeDropper, server_start_instant: ServerStartInstant, + worker_index: usize, ) { + #[cfg(feature = "metrics")] + WORKER_INDEX.with(|index| index.set(worker_index)); + let config = Rc::new(config); let access_list = state.access_list; @@ -156,7 +163,8 @@ pub async fn run_socket_worker( ::metrics::increment_gauge!( "aquatic_active_connections", 1.0, - "ip_version" => ip_version_to_metrics_str(ip_version) + "ip_version" => ip_version_to_metrics_str(ip_version), + "worker_index" => worker_index.to_string(), ); if let Err(err) = run_connection( @@ -184,7 +192,8 @@ pub async fn run_socket_worker( ::metrics::decrement_gauge!( "aquatic_active_connections", 1.0, - "ip_version" => ip_version_to_metrics_str(ip_version) + "ip_version" => ip_version_to_metrics_str(ip_version), + "worker_index" => worker_index.to_string(), ); // Remove reference in separate statement to avoid @@ -520,7 +529,8 @@ impl ConnectionReader { ::metrics::increment_counter!( "aquatic_requests_total", "type" => "announce", - "ip_version" => ip_version_to_metrics_str(self.ip_version) + "ip_version" => ip_version_to_metrics_str(self.ip_version), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), ); let info_hash = announce_request.info_hash; @@ -597,7 +607,8 @@ impl ConnectionReader { ::metrics::increment_counter!( "aquatic_requests_total", "type" => "scrape", - "ip_version" => ip_version_to_metrics_str(self.ip_version) + "ip_version" => ip_version_to_metrics_str(self.ip_version), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), ); let info_hashes = if let Some(info_hashes) = info_hashes { @@ -681,9 +692,10 @@ impl ConnectionReader { #[cfg(feature = "metrics")] ::metrics::increment_counter!( - "aquatic_requests_total", + "aquatic_responses_total", "type" => "error", - "ip_version" => ip_version_to_metrics_str(self.ip_version) + "ip_version" => ip_version_to_metrics_str(self.ip_version), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), ); result @@ -784,6 +796,7 @@ impl ConnectionWriter { "aquatic_responses_total", "type" => out_message_type, "ip_version" => ip_version_to_metrics_str(self.ip_version), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), ); } diff --git a/aquatic_ws/src/workers/swarm.rs b/aquatic_ws/src/workers/swarm.rs index dfef5a4..21806a3 100644 --- a/aquatic_ws/src/workers/swarm.rs +++ b/aquatic_ws/src/workers/swarm.rs @@ -22,6 +22,9 @@ use crate::common::*; use crate::config::Config; use crate::SHARED_IN_CHANNEL_SIZE; +#[cfg(feature = "metrics")] +thread_local! { static WORKER_INDEX: ::std::cell::Cell = Default::default() } + #[derive(PartialEq, Eq, Clone, Copy, Debug)] enum PeerStatus { Seeding, @@ -145,7 +148,12 @@ impl TorrentMaps { let total_num_peers = total_num_peers as f64; #[cfg(feature = "metrics")] - ::metrics::gauge!("aquatic_peers", total_num_peers, "ip_version" => ip_version); + ::metrics::gauge!( + "aquatic_peers", + total_num_peers, + "ip_version" => ip_version, + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); } } @@ -157,7 +165,11 @@ pub async fn run_swarm_worker( in_message_mesh_builder: MeshBuilder<(InMessageMeta, InMessage), Partial>, out_message_mesh_builder: MeshBuilder<(OutMessageMeta, OutMessage), Partial>, server_start_instant: ServerStartInstant, + worker_index: usize, ) { + #[cfg(feature = "metrics")] + WORKER_INDEX.with(|index| index.set(worker_index)); + let (_, mut control_message_receivers) = control_message_mesh_builder .join(Role::Consumer) .await @@ -189,12 +201,14 @@ pub async fn run_swarm_worker( ::metrics::gauge!( "aquatic_torrents", torrents.ipv4.len() as f64, - "ip_version" => "4" + "ip_version" => "4", + "worker_index" => worker_index.to_string(), ); ::metrics::gauge!( "aquatic_torrents", torrents.ipv6.len() as f64, - "ip_version" => "6" + "ip_version" => "6", + "worker_index" => worker_index.to_string(), ); Some(Duration::from_secs(config.metrics.torrent_count_update_interval)) @@ -392,10 +406,20 @@ fn handle_announce_request( #[cfg(feature = "metrics")] match peer_status { PeerStatus::Stopped if opt_removed_peer.is_some() => { - ::metrics::decrement_gauge!("aquatic_peers", 1.0, "ip_version" => ip_version); + ::metrics::decrement_gauge!( + "aquatic_peers", + 1.0, + "ip_version" => ip_version, + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); } PeerStatus::Leeching | PeerStatus::Seeding if opt_removed_peer.is_none() => { - ::metrics::increment_gauge!("aquatic_peers", 1.0, "ip_version" => ip_version); + ::metrics::increment_gauge!( + "aquatic_peers", + 1.0, + "ip_version" => ip_version, + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); } _ => {} }