diff --git a/aquatic_http/src/lib.rs b/aquatic_http/src/lib.rs index 9a58b18..7f9fc65 100644 --- a/aquatic_http/src/lib.rs +++ b/aquatic_http/src/lib.rs @@ -92,6 +92,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { request_mesh_builder, priv_dropper, server_start_instant, + i, ) .await }) @@ -122,6 +123,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { state, request_mesh_builder, server_start_instant, + i, ) .await }) diff --git a/aquatic_http/src/workers/socket.rs b/aquatic_http/src/workers/socket.rs index af55e3c..3c1ac49 100644 --- a/aquatic_http/src/workers/socket.rs +++ b/aquatic_http/src/workers/socket.rs @@ -39,6 +39,9 @@ const RESPONSE_HEADER_A: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Length: "; const RESPONSE_HEADER_B: &[u8] = b" "; const RESPONSE_HEADER_C: &[u8] = b"\r\n\r\n"; +#[cfg(feature = "metrics")] +thread_local! { static WORKER_INDEX: ::std::cell::Cell = Default::default() } + static RESPONSE_HEADER: Lazy> = Lazy::new(|| [RESPONSE_HEADER_A, RESPONSE_HEADER_B, RESPONSE_HEADER_C].concat()); @@ -60,7 +63,11 @@ pub async fn run_socket_worker( request_mesh_builder: MeshBuilder, priv_dropper: PrivilegeDropper, server_start_instant: ServerStartInstant, + worker_index: usize, ) { + #[cfg(feature = "metrics")] + WORKER_INDEX.with(|index| index.set(worker_index)); + let config = Rc::new(config); let access_list = state.access_list; @@ -105,6 +112,7 @@ pub async fn run_socket_worker( "aquatic_active_connections", 1.0, "ip_version" => ip_version_str, + "worker_index" => worker_index.to_string(), ); let result = Connection::run( @@ -124,6 +132,7 @@ pub async fn run_socket_worker( "aquatic_active_connections", 1.0, "ip_version" => ip_version_str, + "worker_index" => worker_index.to_string(), ); result @@ -319,7 +328,8 @@ impl Connection { ::metrics::increment_counter!( "aquatic_requests_total", "type" => "announce", - "ip_version" => peer_addr_to_ip_version_str(&self.peer_addr) + "ip_version" => peer_addr_to_ip_version_str(&self.peer_addr), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), ); let info_hash = request.info_hash; @@ -365,7 +375,8 @@ impl Connection { ::metrics::increment_counter!( "aquatic_requests_total", "type" => "scrape", - "ip_version" => peer_addr_to_ip_version_str(&self.peer_addr) + "ip_version" => peer_addr_to_ip_version_str(&self.peer_addr), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), ); let mut info_hashes_by_worker: BTreeMap> = BTreeMap::new(); @@ -507,6 +518,7 @@ impl Connection { "aquatic_responses_total", "type" => response_type, "ip_version" => peer_addr_to_ip_version_str(&self.peer_addr), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), ); } diff --git a/aquatic_http/src/workers/swarm.rs b/aquatic_http/src/workers/swarm.rs index c2a2c6a..76893ad 100644 --- a/aquatic_http/src/workers/swarm.rs +++ b/aquatic_http/src/workers/swarm.rs @@ -27,6 +27,9 @@ use aquatic_http_protocol::response::*; use crate::common::*; use crate::config::Config; +#[cfg(feature = "metrics")] +thread_local! { static WORKER_INDEX: ::std::cell::Cell = Default::default() } + pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash { #[cfg(feature = "metrics")] fn ip_version_str() -> &'static str; @@ -177,6 +180,7 @@ impl TorrentMaps { "aquatic_peers", total_num_peers, "ip_version" => I::ip_version_str(), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), ); torrent_map.shrink_to_fit(); @@ -189,7 +193,11 @@ pub async fn run_swarm_worker( state: State, request_mesh_builder: MeshBuilder, server_start_instant: ServerStartInstant, + worker_index: usize, ) { + #[cfg(feature = "metrics")] + WORKER_INDEX.with(|index| index.set(worker_index)); + let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); let torrents = Rc::new(RefCell::new(TorrentMaps::default())); @@ -222,18 +230,20 @@ pub async fn run_swarm_worker( // Periodically update torrent count metrics #[cfg(feature = "metrics")] TimerActionRepeat::repeat(enclose!((config, torrents) move || { - enclose!((config, torrents) move || async move { + enclose!((config, torrents, worker_index) move || async move { let torrents = torrents.borrow_mut(); ::metrics::gauge!( "aquatic_torrents", torrents.ipv4.len() as f64, - "ip_version" => "4" + "ip_version" => "4", + "worker_index" => worker_index.to_string(), ); ::metrics::gauge!( "aquatic_torrents", torrents.ipv6.len() as f64, - "ip_version" => "6" + "ip_version" => "6", + "worker_index" => worker_index.to_string(), ); Some(Duration::from_secs(config.metrics.torrent_count_update_interval)) @@ -425,14 +435,16 @@ pub fn upsert_peer_and_get_response_peers( ::metrics::decrement_gauge!( "aquatic_peers", 1.0, - "ip_version" => I::ip_version_str() + "ip_version" => I::ip_version_str(), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), ); } PeerStatus::Leeching | PeerStatus::Seeding if opt_removed_peer.is_none() => { ::metrics::increment_gauge!( "aquatic_peers", 1.0, - "ip_version" => I::ip_version_str() + "ip_version" => I::ip_version_str(), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), ); } _ => {}