udp: add prometheus support

This commit is contained in:
Joakim Frostegård 2023-02-26 11:57:00 +01:00
parent e4b7c8451d
commit 5276a919da
6 changed files with 122 additions and 22 deletions

View file

@ -14,15 +14,17 @@ pub struct StatisticsCollector {
last_update: Instant,
pending_histograms: Vec<Histogram<u64>>,
last_complete_histogram: PeerHistogramStatistics,
ip_version: String,
}
impl StatisticsCollector {
pub fn new(shared: Arc<Statistics>) -> Self {
pub fn new(shared: Arc<Statistics>, ip_version: String) -> Self {
Self {
shared,
last_update: Instant::now(),
pending_histograms: Vec::new(),
last_complete_histogram: Default::default(),
ip_version,
}
}
@ -35,16 +37,28 @@ impl StatisticsCollector {
}
}
pub fn collect_from_shared(&mut self) -> CollectedStatistics {
pub fn collect_from_shared(&mut self, config: &Config) -> CollectedStatistics {
let requests_received = Self::fetch_and_reset(&self.shared.requests_received);
let responses_sent_connect = Self::fetch_and_reset(&self.shared.responses_sent_connect);
let responses_sent_announce = Self::fetch_and_reset(&self.shared.responses_sent_announce);
let responses_sent_scrape = Self::fetch_and_reset(&self.shared.responses_sent_scrape);
let responses_sent_error = Self::fetch_and_reset(&self.shared.responses_sent_error);
let bytes_received = Self::fetch_and_reset(&self.shared.bytes_received);
let bytes_sent = Self::fetch_and_reset(&self.shared.bytes_sent);
let num_torrents = Self::sum_atomic_usizes(&self.shared.torrents);
let num_peers = Self::sum_atomic_usizes(&self.shared.peers);
let num_torrents_by_worker: Vec<usize> = self
.shared
.torrents
.iter()
.map(|n| n.load(Ordering::Relaxed))
.collect();
let num_peers_by_worker: Vec<usize> = self
.shared
.peers
.iter()
.map(|n| n.load(Ordering::Relaxed))
.collect();
let elapsed = {
let now = Instant::now();
@ -56,13 +70,76 @@ impl StatisticsCollector {
elapsed
};
let requests_per_second = requests_received / elapsed;
let responses_per_second_connect = responses_sent_connect / elapsed;
let responses_per_second_announce = responses_sent_announce / elapsed;
let responses_per_second_scrape = responses_sent_scrape / elapsed;
let responses_per_second_error = responses_sent_error / elapsed;
let bytes_received_per_second = bytes_received / elapsed;
let bytes_sent_per_second = bytes_sent / elapsed;
#[cfg(feature = "prometheus")]
if config.statistics.run_prometheus_endpoint {
::metrics::counter!(
"aquatic_requests_total",
requests_received.try_into().unwrap(),
"ip_version" => self.ip_version.clone(),
);
::metrics::counter!(
"aquatic_responses_total",
responses_sent_connect.try_into().unwrap(),
"type" => "connect",
"ip_version" => self.ip_version.clone(),
);
::metrics::counter!(
"aquatic_responses_total",
responses_sent_announce.try_into().unwrap(),
"type" => "announce",
"ip_version" => self.ip_version.clone(),
);
::metrics::counter!(
"aquatic_responses_total",
responses_sent_scrape.try_into().unwrap(),
"type" => "scrape",
"ip_version" => self.ip_version.clone(),
);
::metrics::counter!(
"aquatic_responses_total",
responses_sent_error.try_into().unwrap(),
"type" => "error",
"ip_version" => self.ip_version.clone(),
);
::metrics::counter!(
"aquatic_rx_bytes",
bytes_received.try_into().unwrap(),
"ip_version" => self.ip_version.clone(),
);
::metrics::counter!(
"aquatic_tx_bytes",
bytes_sent.try_into().unwrap(),
"ip_version" => self.ip_version.clone(),
);
for (worker_index, n) in num_torrents_by_worker.iter().copied().enumerate() {
::metrics::gauge!(
"aquatic_torrents",
n as f64,
"ip_version" => self.ip_version.clone(),
"worker_index" => worker_index.to_string(),
);
}
for (worker_index, n) in num_peers_by_worker.iter().copied().enumerate() {
::metrics::gauge!(
"aquatic_peers",
n as f64,
"ip_version" => self.ip_version.clone(),
"worker_index" => worker_index.to_string(),
);
}
}
let num_peers: usize = num_peers_by_worker.into_iter().sum();
let num_torrents: usize = num_torrents_by_worker.into_iter().sum();
let requests_per_second = requests_received as f64 / elapsed;
let responses_per_second_connect = responses_sent_connect as f64 / elapsed;
let responses_per_second_announce = responses_sent_announce as f64 / elapsed;
let responses_per_second_scrape = responses_sent_scrape as f64 / elapsed;
let responses_per_second_error = responses_sent_error as f64 / elapsed;
let bytes_received_per_second = bytes_received as f64 / elapsed;
let bytes_sent_per_second = bytes_sent as f64 / elapsed;
let responses_per_second_total = responses_per_second_connect
+ responses_per_second_announce
@ -89,12 +166,8 @@ impl StatisticsCollector {
}
}
fn sum_atomic_usizes(values: &[AtomicUsize]) -> usize {
values.iter().map(|n| n.load(Ordering::Relaxed)).sum()
}
fn fetch_and_reset(atomic: &AtomicUsize) -> f64 {
atomic.fetch_and(0, Ordering::Relaxed) as f64
fn fetch_and_reset(atomic: &AtomicUsize) -> usize {
atomic.fetch_and(0, Ordering::Relaxed)
}
}

View file

@ -57,8 +57,8 @@ pub fn run_statistics_worker(
None
};
let mut ipv4_collector = StatisticsCollector::new(shared_state.statistics_ipv4);
let mut ipv6_collector = StatisticsCollector::new(shared_state.statistics_ipv6);
let mut ipv4_collector = StatisticsCollector::new(shared_state.statistics_ipv4, "4".into());
let mut ipv6_collector = StatisticsCollector::new(shared_state.statistics_ipv6, "6".into());
loop {
::std::thread::sleep(Duration::from_secs(config.statistics.interval));
@ -70,8 +70,8 @@ pub fn run_statistics_worker(
}
}
let statistics_ipv4 = ipv4_collector.collect_from_shared();
let statistics_ipv6 = ipv6_collector.collect_from_shared();
let statistics_ipv4 = ipv4_collector.collect_from_shared(&config);
let statistics_ipv6 = ipv6_collector.collect_from_shared(&config);
if config.statistics.print_to_stdout {
println!("General:");