ws: add worker index data to metrics, fix incorrect key

This commit is contained in:
Joakim Frostegård 2023-01-18 21:35:57 +01:00
parent 766c6f87ca
commit da1ec6b5e1
3 changed files with 50 additions and 11 deletions

View file

@ -109,6 +109,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
response_mesh_builder, response_mesh_builder,
priv_dropper, priv_dropper,
server_start_instant, server_start_instant,
i,
) )
.await .await
}) })
@ -145,6 +146,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
request_mesh_builder, request_mesh_builder,
response_mesh_builder, response_mesh_builder,
server_start_instant, server_start_instant,
i,
) )
.await .await
}) })

View file

@ -34,6 +34,9 @@ use crate::common::*;
const LOCAL_CHANNEL_SIZE: usize = 16; const LOCAL_CHANNEL_SIZE: usize = 16;
#[cfg(feature = "metrics")]
thread_local! { static WORKER_INDEX: ::std::cell::Cell<usize> = Default::default() }
struct PendingScrapeResponse { struct PendingScrapeResponse {
pending_worker_out_messages: usize, pending_worker_out_messages: usize,
stats: HashMap<InfoHash, ScrapeStatistics>, stats: HashMap<InfoHash, ScrapeStatistics>,
@ -60,7 +63,11 @@ pub async fn run_socket_worker(
out_message_mesh_builder: MeshBuilder<(OutMessageMeta, OutMessage), Partial>, out_message_mesh_builder: MeshBuilder<(OutMessageMeta, OutMessage), Partial>,
priv_dropper: PrivilegeDropper, priv_dropper: PrivilegeDropper,
server_start_instant: ServerStartInstant, server_start_instant: ServerStartInstant,
worker_index: usize,
) { ) {
#[cfg(feature = "metrics")]
WORKER_INDEX.with(|index| index.set(worker_index));
let config = Rc::new(config); let config = Rc::new(config);
let access_list = state.access_list; let access_list = state.access_list;
@ -156,7 +163,8 @@ pub async fn run_socket_worker(
::metrics::increment_gauge!( ::metrics::increment_gauge!(
"aquatic_active_connections", "aquatic_active_connections",
1.0, 1.0,
"ip_version" => ip_version_to_metrics_str(ip_version) "ip_version" => ip_version_to_metrics_str(ip_version),
"worker_index" => worker_index.to_string(),
); );
if let Err(err) = run_connection( if let Err(err) = run_connection(
@ -184,7 +192,8 @@ pub async fn run_socket_worker(
::metrics::decrement_gauge!( ::metrics::decrement_gauge!(
"aquatic_active_connections", "aquatic_active_connections",
1.0, 1.0,
"ip_version" => ip_version_to_metrics_str(ip_version) "ip_version" => ip_version_to_metrics_str(ip_version),
"worker_index" => worker_index.to_string(),
); );
// Remove reference in separate statement to avoid // Remove reference in separate statement to avoid
@ -520,7 +529,8 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
::metrics::increment_counter!( ::metrics::increment_counter!(
"aquatic_requests_total", "aquatic_requests_total",
"type" => "announce", "type" => "announce",
"ip_version" => ip_version_to_metrics_str(self.ip_version) "ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
); );
let info_hash = announce_request.info_hash; let info_hash = announce_request.info_hash;
@ -597,7 +607,8 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
::metrics::increment_counter!( ::metrics::increment_counter!(
"aquatic_requests_total", "aquatic_requests_total",
"type" => "scrape", "type" => "scrape",
"ip_version" => ip_version_to_metrics_str(self.ip_version) "ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
); );
let info_hashes = if let Some(info_hashes) = info_hashes { let info_hashes = if let Some(info_hashes) = info_hashes {
@ -681,9 +692,10 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
::metrics::increment_counter!( ::metrics::increment_counter!(
"aquatic_requests_total", "aquatic_responses_total",
"type" => "error", "type" => "error",
"ip_version" => ip_version_to_metrics_str(self.ip_version) "ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
); );
result result
@ -784,6 +796,7 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionWriter<S> {
"aquatic_responses_total", "aquatic_responses_total",
"type" => out_message_type, "type" => out_message_type,
"ip_version" => ip_version_to_metrics_str(self.ip_version), "ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
); );
} }

View file

@ -22,6 +22,9 @@ use crate::common::*;
use crate::config::Config; use crate::config::Config;
use crate::SHARED_IN_CHANNEL_SIZE; use crate::SHARED_IN_CHANNEL_SIZE;
#[cfg(feature = "metrics")]
thread_local! { static WORKER_INDEX: ::std::cell::Cell<usize> = Default::default() }
#[derive(PartialEq, Eq, Clone, Copy, Debug)] #[derive(PartialEq, Eq, Clone, Copy, Debug)]
enum PeerStatus { enum PeerStatus {
Seeding, Seeding,
@ -145,7 +148,12 @@ impl TorrentMaps {
let total_num_peers = total_num_peers as f64; let total_num_peers = total_num_peers as f64;
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
::metrics::gauge!("aquatic_peers", total_num_peers, "ip_version" => ip_version); ::metrics::gauge!(
"aquatic_peers",
total_num_peers,
"ip_version" => ip_version,
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
} }
} }
@ -157,7 +165,11 @@ pub async fn run_swarm_worker(
in_message_mesh_builder: MeshBuilder<(InMessageMeta, InMessage), Partial>, in_message_mesh_builder: MeshBuilder<(InMessageMeta, InMessage), Partial>,
out_message_mesh_builder: MeshBuilder<(OutMessageMeta, OutMessage), Partial>, out_message_mesh_builder: MeshBuilder<(OutMessageMeta, OutMessage), Partial>,
server_start_instant: ServerStartInstant, server_start_instant: ServerStartInstant,
worker_index: usize,
) { ) {
#[cfg(feature = "metrics")]
WORKER_INDEX.with(|index| index.set(worker_index));
let (_, mut control_message_receivers) = control_message_mesh_builder let (_, mut control_message_receivers) = control_message_mesh_builder
.join(Role::Consumer) .join(Role::Consumer)
.await .await
@ -189,12 +201,14 @@ pub async fn run_swarm_worker(
::metrics::gauge!( ::metrics::gauge!(
"aquatic_torrents", "aquatic_torrents",
torrents.ipv4.len() as f64, torrents.ipv4.len() as f64,
"ip_version" => "4" "ip_version" => "4",
"worker_index" => worker_index.to_string(),
); );
::metrics::gauge!( ::metrics::gauge!(
"aquatic_torrents", "aquatic_torrents",
torrents.ipv6.len() as f64, torrents.ipv6.len() as f64,
"ip_version" => "6" "ip_version" => "6",
"worker_index" => worker_index.to_string(),
); );
Some(Duration::from_secs(config.metrics.torrent_count_update_interval)) Some(Duration::from_secs(config.metrics.torrent_count_update_interval))
@ -392,10 +406,20 @@ fn handle_announce_request(
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
match peer_status { match peer_status {
PeerStatus::Stopped if opt_removed_peer.is_some() => { PeerStatus::Stopped if opt_removed_peer.is_some() => {
::metrics::decrement_gauge!("aquatic_peers", 1.0, "ip_version" => ip_version); ::metrics::decrement_gauge!(
"aquatic_peers",
1.0,
"ip_version" => ip_version,
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
} }
PeerStatus::Leeching | PeerStatus::Seeding if opt_removed_peer.is_none() => { PeerStatus::Leeching | PeerStatus::Seeding if opt_removed_peer.is_none() => {
::metrics::increment_gauge!("aquatic_peers", 1.0, "ip_version" => ip_version); ::metrics::increment_gauge!(
"aquatic_peers",
1.0,
"ip_version" => ip_version,
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
} }
_ => {} _ => {}
} }