http: add worker index data

This commit is contained in:
Joakim Frostegård 2023-01-18 21:24:46 +01:00
parent b41c565e38
commit 766c6f87ca
3 changed files with 33 additions and 7 deletions

View file

@ -92,6 +92,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
request_mesh_builder, request_mesh_builder,
priv_dropper, priv_dropper,
server_start_instant, server_start_instant,
i,
) )
.await .await
}) })
@ -122,6 +123,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
state, state,
request_mesh_builder, request_mesh_builder,
server_start_instant, server_start_instant,
i,
) )
.await .await
}) })

View file

@ -39,6 +39,9 @@ const RESPONSE_HEADER_A: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Length: ";
const RESPONSE_HEADER_B: &[u8] = b" "; const RESPONSE_HEADER_B: &[u8] = b" ";
const RESPONSE_HEADER_C: &[u8] = b"\r\n\r\n"; const RESPONSE_HEADER_C: &[u8] = b"\r\n\r\n";
#[cfg(feature = "metrics")]
thread_local! { static WORKER_INDEX: ::std::cell::Cell<usize> = Default::default() }
static RESPONSE_HEADER: Lazy<Vec<u8>> = static RESPONSE_HEADER: Lazy<Vec<u8>> =
Lazy::new(|| [RESPONSE_HEADER_A, RESPONSE_HEADER_B, RESPONSE_HEADER_C].concat()); Lazy::new(|| [RESPONSE_HEADER_A, RESPONSE_HEADER_B, RESPONSE_HEADER_C].concat());
@ -60,7 +63,11 @@ pub async fn run_socket_worker(
request_mesh_builder: MeshBuilder<ChannelRequest, Partial>, request_mesh_builder: MeshBuilder<ChannelRequest, Partial>,
priv_dropper: PrivilegeDropper, priv_dropper: PrivilegeDropper,
server_start_instant: ServerStartInstant, server_start_instant: ServerStartInstant,
worker_index: usize,
) { ) {
#[cfg(feature = "metrics")]
WORKER_INDEX.with(|index| index.set(worker_index));
let config = Rc::new(config); let config = Rc::new(config);
let access_list = state.access_list; let access_list = state.access_list;
@ -105,6 +112,7 @@ pub async fn run_socket_worker(
"aquatic_active_connections", "aquatic_active_connections",
1.0, 1.0,
"ip_version" => ip_version_str, "ip_version" => ip_version_str,
"worker_index" => worker_index.to_string(),
); );
let result = Connection::run( let result = Connection::run(
@ -124,6 +132,7 @@ pub async fn run_socket_worker(
"aquatic_active_connections", "aquatic_active_connections",
1.0, 1.0,
"ip_version" => ip_version_str, "ip_version" => ip_version_str,
"worker_index" => worker_index.to_string(),
); );
result result
@ -319,7 +328,8 @@ impl Connection {
::metrics::increment_counter!( ::metrics::increment_counter!(
"aquatic_requests_total", "aquatic_requests_total",
"type" => "announce", "type" => "announce",
"ip_version" => peer_addr_to_ip_version_str(&self.peer_addr) "ip_version" => peer_addr_to_ip_version_str(&self.peer_addr),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
); );
let info_hash = request.info_hash; let info_hash = request.info_hash;
@ -365,7 +375,8 @@ impl Connection {
::metrics::increment_counter!( ::metrics::increment_counter!(
"aquatic_requests_total", "aquatic_requests_total",
"type" => "scrape", "type" => "scrape",
"ip_version" => peer_addr_to_ip_version_str(&self.peer_addr) "ip_version" => peer_addr_to_ip_version_str(&self.peer_addr),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
); );
let mut info_hashes_by_worker: BTreeMap<usize, Vec<InfoHash>> = BTreeMap::new(); let mut info_hashes_by_worker: BTreeMap<usize, Vec<InfoHash>> = BTreeMap::new();
@ -507,6 +518,7 @@ impl Connection {
"aquatic_responses_total", "aquatic_responses_total",
"type" => response_type, "type" => response_type,
"ip_version" => peer_addr_to_ip_version_str(&self.peer_addr), "ip_version" => peer_addr_to_ip_version_str(&self.peer_addr),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
); );
} }

View file

@ -27,6 +27,9 @@ use aquatic_http_protocol::response::*;
use crate::common::*; use crate::common::*;
use crate::config::Config; use crate::config::Config;
#[cfg(feature = "metrics")]
thread_local! { static WORKER_INDEX: ::std::cell::Cell<usize> = Default::default() }
pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash { pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
fn ip_version_str() -> &'static str; fn ip_version_str() -> &'static str;
@ -177,6 +180,7 @@ impl TorrentMaps {
"aquatic_peers", "aquatic_peers",
total_num_peers, total_num_peers,
"ip_version" => I::ip_version_str(), "ip_version" => I::ip_version_str(),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
); );
torrent_map.shrink_to_fit(); torrent_map.shrink_to_fit();
@ -189,7 +193,11 @@ pub async fn run_swarm_worker(
state: State, state: State,
request_mesh_builder: MeshBuilder<ChannelRequest, Partial>, request_mesh_builder: MeshBuilder<ChannelRequest, Partial>,
server_start_instant: ServerStartInstant, server_start_instant: ServerStartInstant,
worker_index: usize,
) { ) {
#[cfg(feature = "metrics")]
WORKER_INDEX.with(|index| index.set(worker_index));
let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap();
let torrents = Rc::new(RefCell::new(TorrentMaps::default())); let torrents = Rc::new(RefCell::new(TorrentMaps::default()));
@ -222,18 +230,20 @@ pub async fn run_swarm_worker(
// Periodically update torrent count metrics // Periodically update torrent count metrics
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
TimerActionRepeat::repeat(enclose!((config, torrents) move || { TimerActionRepeat::repeat(enclose!((config, torrents) move || {
enclose!((config, torrents) move || async move { enclose!((config, torrents, worker_index) move || async move {
let torrents = torrents.borrow_mut(); let torrents = torrents.borrow_mut();
::metrics::gauge!( ::metrics::gauge!(
"aquatic_torrents", "aquatic_torrents",
torrents.ipv4.len() as f64, torrents.ipv4.len() as f64,
"ip_version" => "4" "ip_version" => "4",
"worker_index" => worker_index.to_string(),
); );
::metrics::gauge!( ::metrics::gauge!(
"aquatic_torrents", "aquatic_torrents",
torrents.ipv6.len() as f64, torrents.ipv6.len() as f64,
"ip_version" => "6" "ip_version" => "6",
"worker_index" => worker_index.to_string(),
); );
Some(Duration::from_secs(config.metrics.torrent_count_update_interval)) Some(Duration::from_secs(config.metrics.torrent_count_update_interval))
@ -425,14 +435,16 @@ pub fn upsert_peer_and_get_response_peers<I: Ip>(
::metrics::decrement_gauge!( ::metrics::decrement_gauge!(
"aquatic_peers", "aquatic_peers",
1.0, 1.0,
"ip_version" => I::ip_version_str() "ip_version" => I::ip_version_str(),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
); );
} }
PeerStatus::Leeching | PeerStatus::Seeding if opt_removed_peer.is_none() => { PeerStatus::Leeching | PeerStatus::Seeding if opt_removed_peer.is_none() => {
::metrics::increment_gauge!( ::metrics::increment_gauge!(
"aquatic_peers", "aquatic_peers",
1.0, 1.0,
"ip_version" => I::ip_version_str() "ip_version" => I::ip_version_str(),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
); );
} }
_ => {} _ => {}