udp: improve peer client statistics

This commit is contained in:
Joakim Frostegård 2023-06-08 01:06:19 +02:00
parent 1ddac59fee
commit da25d60a5d
7 changed files with 159 additions and 75 deletions

View file

@ -161,11 +161,16 @@ impl Default for ProtocolConfig {
pub struct StatisticsConfig {
/// Collect and print/write statistics this often (seconds)
pub interval: u64,
/// Enable extended statistics (on peers per torrent and on peer clients).
/// Also, see `prometheus_peer_clients`.
/// Collect statistics on number of peers per torrent
///
/// Will increase time taken for request handling and torrent cleaning.
pub extended: bool,
/// Will increase time taken for torrent cleaning.
pub torrent_peer_histograms: bool,
/// Collect statistics on peer clients.
///
/// Also, see `prometheus_peer_id_prefixes`.
///
/// Quite costly when it comes to CPU and RAM.
pub peer_clients: bool,
/// Print statistics to standard output
pub print_to_stdout: bool,
/// Save statistics as HTML to a file
@ -178,14 +183,13 @@ pub struct StatisticsConfig {
/// Address to run prometheus endpoint on
#[cfg(feature = "prometheus")]
pub prometheus_endpoint_address: SocketAddr,
/// Serve information on all peer clients on the prometheus endpoint.
/// Requires extended statistics to be activated.
/// Serve information on all peer id prefixes on the prometheus endpoint.
/// Requires `peer_clients` to be activated.
///
/// NOT RECOMMENDED. May consume lots of CPU and RAM since data on every
/// single peer client will be kept around by the endpoint, even those
/// which are no longer in the swarm.
/// May consume quite a bit of CPU and RAM, since data on every single peer
/// client will be reported continuously on the endpoint
#[cfg(feature = "prometheus")]
pub prometheus_peer_clients: bool,
pub prometheus_peer_id_prefixes: bool,
}
impl StatisticsConfig {
@ -207,7 +211,8 @@ impl Default for StatisticsConfig {
fn default() -> Self {
Self {
interval: 5,
extended: false,
torrent_peer_histograms: false,
peer_clients: false,
print_to_stdout: false,
write_html_to_file: false,
html_file_path: "tmp/statistics.html".into(),
@ -216,7 +221,7 @@ impl Default for StatisticsConfig {
#[cfg(feature = "prometheus")]
prometheus_endpoint_address: SocketAddr::from(([0, 0, 0, 0], 9000)),
#[cfg(feature = "prometheus")]
prometheus_peer_clients: false,
prometheus_peer_id_prefixes: false,
}
}
}

View file

@ -4,6 +4,7 @@ pub mod workers;
use std::collections::BTreeMap;
use std::thread::Builder;
use std::time::Duration;
use anyhow::Context;
use crossbeam_channel::{bounded, unbounded};
@ -143,8 +144,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
#[cfg(feature = "prometheus")]
if config.statistics.run_prometheus_endpoint {
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_util::MetricKindMask;
PrometheusBuilder::new()
.idle_timeout(
MetricKindMask::ALL,
Some(Duration::from_secs(config.statistics.interval * 2)),
)
.with_http_listener(config.statistics.prometheus_endpoint_address)
.install()
.with_context(|| {

View file

@ -134,7 +134,7 @@ impl StatisticsCollector {
);
}
if config.statistics.extended {
if config.statistics.torrent_peer_histograms {
self.last_complete_histogram
.update_metrics(self.ip_version.clone());
}

View file

@ -6,7 +6,8 @@ use std::time::{Duration, Instant};
use anyhow::Context;
use aquatic_common::{IndexMap, PanicSentinel};
use aquatic_udp_protocol::PeerClient;
use aquatic_udp_protocol::{PeerClient, PeerId};
use compact_str::CompactString;
use crossbeam_channel::Receiver;
use num_format::{Locale, ToFormattedString};
use serde::Serialize;
@ -46,6 +47,17 @@ pub fn run_statistics_worker(
shared_state: State,
statistics_receiver: Receiver<StatisticsMessage>,
) {
let process_peer_client_data = {
let mut collect = config.statistics.write_html_to_file;
#[cfg(feature = "prometheus")]
{
collect |= config.statistics.run_prometheus_endpoint;
}
collect & config.statistics.peer_clients
};
let opt_tt = if config.statistics.write_html_to_file {
let mut tt = TinyTemplate::new();
@ -71,7 +83,7 @@ pub fn run_statistics_worker(
"6".into(),
);
let mut peer_clients: IndexMap<PeerClient, usize> = IndexMap::default();
let mut peers: IndexMap<PeerId, (PeerClient, CompactString)> = IndexMap::default();
loop {
let start_time = Instant::now();
@ -81,45 +93,16 @@ pub fn run_statistics_worker(
StatisticsMessage::Ipv4PeerHistogram(h) => ipv4_collector.add_histogram(&config, h),
StatisticsMessage::Ipv6PeerHistogram(h) => ipv6_collector.add_histogram(&config, h),
StatisticsMessage::PeerAdded(peer_id) => {
let peer_client = peer_id.client();
if process_peer_client_data {
let peer_client = peer_id.client();
let prefix = peer_id.first_8_bytes_hex();
#[cfg(feature = "prometheus")]
if config.statistics.run_prometheus_endpoint
&& config.statistics.prometheus_peer_clients
{
::metrics::increment_gauge!(
"aquatic_peer_clients",
1.0,
"client" => peer_client.to_string(),
"peer_id_prefix_hex" => peer_id.first_8_bytes_hex().to_string(),
);
peers.insert(peer_id, (peer_client, prefix));
}
*peer_clients.entry(peer_client).or_insert(0) += 1;
}
StatisticsMessage::PeerRemoved(peer_id) => {
let peer_client = peer_id.client();
#[cfg(feature = "prometheus")]
if config.statistics.run_prometheus_endpoint
&& config.statistics.prometheus_peer_clients
{
::metrics::decrement_gauge!(
"aquatic_peer_clients",
1.0,
"client" => peer_client.to_string(),
"peer_id_prefix_hex" => peer_id.first_8_bytes_hex().to_string(),
);
}
if let Some(count) = peer_clients.get_mut(&peer_client) {
if *count == 1 {
drop(count);
peer_clients.remove(&peer_client);
} else {
*count -= 1;
}
if process_peer_client_data {
peers.remove(&peer_id);
}
}
}
@ -134,6 +117,60 @@ pub fn run_statistics_worker(
&config,
);
let peer_clients = if process_peer_client_data {
let mut clients: IndexMap<PeerClient, usize> = IndexMap::default();
#[cfg(feature = "prometheus")]
let mut prefixes: IndexMap<CompactString, usize> = IndexMap::default();
for (peer_client, prefix) in peers.values() {
*clients.entry(peer_client.to_owned()).or_insert(0) += 1;
#[cfg(feature = "prometheus")]
if config.statistics.run_prometheus_endpoint
&& config.statistics.prometheus_peer_id_prefixes
{
*prefixes.entry(prefix.to_owned()).or_insert(0) += 1;
}
}
clients.sort_unstable_by(|_, a, _, b| b.cmp(a));
#[cfg(feature = "prometheus")]
if config.statistics.run_prometheus_endpoint
&& config.statistics.prometheus_peer_id_prefixes
{
for (prefix, count) in prefixes {
::metrics::gauge!(
"aquatic_peer_id_prefixes",
count as f64,
"prefix_hex" => prefix.to_string(),
);
}
}
let mut client_vec = Vec::with_capacity(clients.len());
for (client, count) in clients {
if config.statistics.write_html_to_file {
client_vec.push((client.to_string(), count.to_formatted_string(&Locale::en)));
}
#[cfg(feature = "prometheus")]
if config.statistics.run_prometheus_endpoint {
::metrics::gauge!(
"aquatic_peer_clients",
count as f64,
"client" => client.to_string(),
);
}
}
client_vec
} else {
Vec::new()
};
if config.statistics.print_to_stdout {
println!("General:");
println!(
@ -154,29 +191,11 @@ pub fn run_statistics_worker(
}
if let Some(tt) = opt_tt.as_ref() {
let mut peer_clients = if config.statistics.extended {
peer_clients.iter().collect()
} else {
Vec::new()
};
peer_clients.sort_unstable_by(|a, b| b.1.cmp(a.1));
let peer_clients = peer_clients
.into_iter()
.map(|(peer_client, count)| {
(
peer_client.to_string(),
count.to_formatted_string(&Locale::en),
)
})
.collect();
let template_data = TemplateData {
stylesheet: STYLESHEET_CONTENTS.to_string(),
ipv4_active: config.network.ipv4_active(),
ipv6_active: config.network.ipv6_active(),
extended_active: config.statistics.extended,
extended_active: config.statistics.torrent_peer_histograms,
ipv4: statistics_ipv4,
ipv6: statistics_ipv6,
last_updated: OffsetDateTime::now_utc()
@ -191,6 +210,8 @@ pub fn run_statistics_worker(
}
}
peers.shrink_to_fit();
if let Some(time_remaining) =
Duration::from_secs(config.statistics.interval).checked_sub(start_time.elapsed())
{
@ -236,7 +257,7 @@ fn print_to_stdout(config: &Config, statistics: &CollectedStatistics) {
statistics.num_peers, config.cleaning.torrent_cleaning_interval
);
if config.statistics.extended {
if config.statistics.torrent_peer_histograms {
println!(
" peers per torrent (updated every {}s)",
config.cleaning.torrent_cleaning_interval

View file

@ -82,7 +82,7 @@ impl<I: Ip> TorrentData<I> {
PeerStatus::Stopped => self.peers.remove(&peer_id),
};
if config.statistics.extended {
if config.statistics.peer_clients {
match (status, opt_removed_peer.is_some()) {
// We added a new peer
(PeerStatus::Leeching | PeerStatus::Seeding, false) => {
@ -158,7 +158,7 @@ impl<I: Ip> TorrentData<I> {
if peer.is_seeder {
self.num_seeders -= 1;
}
if config.statistics.extended {
if config.statistics.peer_clients {
if let Err(_) =
statistics_sender.try_send(StatisticsMessage::PeerRemoved(*peer_id))
{
@ -201,7 +201,8 @@ impl<I: Ip> TorrentMap<I> {
) -> (usize, Option<Histogram<u64>>) {
let mut num_peers = 0;
let mut opt_histogram: Option<Histogram<u64>> = if config.statistics.extended {
let mut opt_histogram: Option<Histogram<u64>> = if config.statistics.torrent_peer_histograms
{
match Histogram::new(3) {
Ok(histogram) => Some(histogram),
Err(err) => {