http: add peer and torrent count metrics

This commit is contained in:
Joakim Frostegård 2023-01-18 21:04:25 +01:00
parent 5c04245cbe
commit 32884aae36
2 changed files with 120 additions and 33 deletions

View file

@ -29,6 +29,8 @@ pub struct Config {
pub privileges: PrivilegeConfig, pub privileges: PrivilegeConfig,
pub access_list: AccessListConfig, pub access_list: AccessListConfig,
pub cpu_pinning: CpuPinningConfigAsc, pub cpu_pinning: CpuPinningConfigAsc,
#[cfg(feature = "metrics")]
pub metrics: MetricsConfig,
} }
impl Default for Config { impl Default for Config {
@ -43,6 +45,8 @@ impl Default for Config {
privileges: PrivilegeConfig::default(), privileges: PrivilegeConfig::default(),
access_list: AccessListConfig::default(), access_list: AccessListConfig::default(),
cpu_pinning: Default::default(), cpu_pinning: Default::default(),
#[cfg(feature = "metrics")]
metrics: Default::default(),
} }
} }
} }
@ -128,6 +132,29 @@ impl Default for CleaningConfig {
} }
} }
#[cfg(feature = "metrics")]
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct MetricsConfig {
/// Run a prometheus endpoint
pub run_prometheus_endpoint: bool,
/// Address to run prometheus endpoint on
pub prometheus_endpoint_address: SocketAddr,
/// Update metrics for torrent count this often (seconds)
pub torrent_count_update_interval: u64,
}
#[cfg(feature = "metrics")]
impl Default for MetricsConfig {
fn default() -> Self {
Self {
run_prometheus_endpoint: false,
prometheus_endpoint_address: SocketAddr::from(([0, 0, 0, 0], 9000)),
torrent_count_update_interval: 10,
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::Config; use super::Config;

View file

@ -27,10 +27,23 @@ use aquatic_http_protocol::response::*;
use crate::common::*; use crate::common::*;
use crate::config::Config; use crate::config::Config;
pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {} pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {
#[cfg(feature = "metrics")]
fn ip_version_str() -> &'static str;
}
impl Ip for Ipv4Addr {} impl Ip for Ipv4Addr {
impl Ip for Ipv6Addr {} #[cfg(feature = "metrics")]
fn ip_version_str() -> &'static str {
"4"
}
}
impl Ip for Ipv6Addr {
#[cfg(feature = "metrics")]
fn ip_version_str() -> &'static str {
"6"
}
}
#[derive(PartialEq, Eq, Clone, Copy, Debug)] #[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum PeerStatus { pub enum PeerStatus {
@ -59,8 +72,8 @@ impl PeerStatus {
pub struct Peer<I: Ip> { pub struct Peer<I: Ip> {
pub ip_address: I, pub ip_address: I,
pub port: u16, pub port: u16,
pub status: PeerStatus,
pub valid_until: ValidUntil, pub valid_until: ValidUntil,
pub seeder: bool,
} }
impl<I: Ip> Peer<I> { impl<I: Ip> Peer<I> {
@ -83,7 +96,6 @@ pub type PeerMap<I> = IndexMap<PeerMapKey<I>, Peer<I>>;
pub struct TorrentData<I: Ip> { pub struct TorrentData<I: Ip> {
pub peers: PeerMap<I>, pub peers: PeerMap<I>,
pub num_seeders: usize, pub num_seeders: usize,
pub num_leechers: usize,
} }
impl<I: Ip> Default for TorrentData<I> { impl<I: Ip> Default for TorrentData<I> {
@ -92,11 +104,16 @@ impl<I: Ip> Default for TorrentData<I> {
Self { Self {
peers: Default::default(), peers: Default::default(),
num_seeders: 0, num_seeders: 0,
num_leechers: 0,
} }
} }
} }
impl<I: Ip> TorrentData<I> {
fn num_leechers(&self) -> usize {
self.peers.len() - self.num_seeders
}
}
pub type TorrentMap<I> = AmortizedIndexMap<InfoHash, TorrentData<I>>; pub type TorrentMap<I> = AmortizedIndexMap<InfoHash, TorrentData<I>>;
#[derive(Default)] #[derive(Default)]
@ -126,6 +143,8 @@ impl TorrentMaps {
torrent_map: &mut TorrentMap<I>, torrent_map: &mut TorrentMap<I>,
now: SecondsSinceServerStart, now: SecondsSinceServerStart,
) { ) {
let mut total_num_peers = 0;
torrent_map.retain(|info_hash, torrent_data| { torrent_map.retain(|info_hash, torrent_data| {
if !access_list_cache if !access_list_cache
.load() .load()
@ -135,29 +154,31 @@ impl TorrentMaps {
} }
let num_seeders = &mut torrent_data.num_seeders; let num_seeders = &mut torrent_data.num_seeders;
let num_leechers = &mut torrent_data.num_leechers;
torrent_data.peers.retain(|_, peer| { torrent_data.peers.retain(|_, peer| {
let keep = peer.valid_until.valid(now); let keep = peer.valid_until.valid(now);
if !keep { if (!keep) & peer.seeder {
match peer.status { *num_seeders -= 1;
PeerStatus::Seeding => {
*num_seeders -= 1;
}
PeerStatus::Leeching => {
*num_leechers -= 1;
}
_ => (),
};
} }
keep keep
}); });
total_num_peers += torrent_data.peers.len() as u64;
!torrent_data.peers.is_empty() !torrent_data.peers.is_empty()
}); });
let total_num_peers = total_num_peers as f64;
#[cfg(feature = "metrics")]
::metrics::gauge!(
"aquatic_peers",
total_num_peers,
"ip_version" => I::ip_version_str(),
);
torrent_map.shrink_to_fit(); torrent_map.shrink_to_fit();
} }
} }
@ -198,6 +219,27 @@ pub async fn run_swarm_worker(
})() })()
})); }));
// Periodically update torrent count metrics
#[cfg(feature = "metrics")]
TimerActionRepeat::repeat(enclose!((config, torrents) move || {
enclose!((config, torrents) move || async move {
let torrents = torrents.borrow_mut();
::metrics::gauge!(
"aquatic_torrents",
torrents.ipv4.len() as f64,
"ip_version" => "4"
);
::metrics::gauge!(
"aquatic_torrents",
torrents.ipv6.len() as f64,
"ip_version" => "6"
);
Some(Duration::from_secs(config.metrics.torrent_count_update_interval))
})()
}));
let mut handles = Vec::new(); let mut handles = Vec::new();
for (_, receiver) in request_receivers.streams() { for (_, receiver) in request_receivers.streams() {
@ -337,13 +379,6 @@ pub fn upsert_peer_and_get_response_peers<I: Ip>(
let peer_status = let peer_status =
PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left)); PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left));
let peer = Peer {
ip_address: peer_ip_address,
port: request.port,
status: peer_status,
valid_until,
};
let ip_or_key = request let ip_or_key = request
.key .key
.map(Either::Right) .map(Either::Right)
@ -356,24 +391,49 @@ pub fn upsert_peer_and_get_response_peers<I: Ip>(
let opt_removed_peer = match peer_status { let opt_removed_peer = match peer_status {
PeerStatus::Leeching => { PeerStatus::Leeching => {
torrent_data.num_leechers += 1; let peer = Peer {
ip_address: peer_ip_address,
port: request.port,
valid_until,
seeder: false,
};
torrent_data.peers.insert(peer_map_key.clone(), peer) torrent_data.peers.insert(peer_map_key.clone(), peer)
} }
PeerStatus::Seeding => { PeerStatus::Seeding => {
torrent_data.num_seeders += 1; torrent_data.num_seeders += 1;
let peer = Peer {
ip_address: peer_ip_address,
port: request.port,
valid_until,
seeder: true,
};
torrent_data.peers.insert(peer_map_key.clone(), peer) torrent_data.peers.insert(peer_map_key.clone(), peer)
} }
PeerStatus::Stopped => torrent_data.peers.remove(&peer_map_key), PeerStatus::Stopped => torrent_data.peers.remove(&peer_map_key),
}; };
match opt_removed_peer.map(|peer| peer.status) { if let Some(&Peer { seeder: true, .. }) = opt_removed_peer.as_ref() {
Some(PeerStatus::Leeching) => { torrent_data.num_seeders -= 1;
torrent_data.num_leechers -= 1; }
#[cfg(feature = "metrics")]
match peer_status {
PeerStatus::Stopped if opt_removed_peer.is_some() => {
::metrics::decrement_gauge!(
"aquatic_peers",
1.0,
"ip_version" => I::ip_version_str()
);
} }
Some(PeerStatus::Seeding) => { PeerStatus::Leeching | PeerStatus::Seeding if opt_removed_peer.is_none() => {
torrent_data.num_seeders -= 1; ::metrics::increment_gauge!(
"aquatic_peers",
1.0,
"ip_version" => I::ip_version_str()
);
} }
_ => {} _ => {}
} }
@ -397,7 +457,7 @@ pub fn upsert_peer_and_get_response_peers<I: Ip>(
( (
torrent_data.num_seeders, torrent_data.num_seeders,
torrent_data.num_leechers, torrent_data.num_leechers(),
response_peers, response_peers,
) )
} }
@ -427,7 +487,7 @@ pub fn handle_scrape_request(
let stats = ScrapeStatistics { let stats = ScrapeStatistics {
complete: torrent_data.num_seeders, complete: torrent_data.num_seeders,
downloaded: 0, // No implementation planned downloaded: 0, // No implementation planned
incomplete: torrent_data.num_leechers, incomplete: torrent_data.num_leechers(),
}; };
response.files.insert(info_hash, stats); response.files.insert(info_hash, stats);
@ -439,7 +499,7 @@ pub fn handle_scrape_request(
let stats = ScrapeStatistics { let stats = ScrapeStatistics {
complete: torrent_data.num_seeders, complete: torrent_data.num_seeders,
downloaded: 0, // No implementation planned downloaded: 0, // No implementation planned
incomplete: torrent_data.num_leechers, incomplete: torrent_data.num_leechers(),
}; };
response.files.insert(info_hash, stats); response.files.insert(info_hash, stats);