udp: initial support for listing peer clients

This commit is contained in:
Joakim Frostegård 2023-06-06 01:04:37 +02:00
parent 977349ec03
commit a74d6aa458
14 changed files with 213 additions and 68 deletions

View file

@ -136,6 +136,8 @@ impl PeerStatus {
pub enum StatisticsMessage {
Ipv4PeerHistogram(Histogram<u64>),
Ipv6PeerHistogram(Histogram<u64>),
PeerAdded(PeerId),
PeerRemoved(PeerId),
}
pub struct Statistics {

View file

@ -161,10 +161,9 @@ impl Default for ProtocolConfig {
pub struct StatisticsConfig {
/// Collect and print/write statistics this often (seconds)
pub interval: u64,
/// Enable extended statistics (on peers per torrent)
/// Enable extended statistics (on peers per torrent and on peer clients)
///
/// Will increase time taken for torrent cleaning, since that's when
/// these statistics are collected.
/// Will increase time taken for request handling and torrent cleaning
pub extended: bool,
/// Print statistics to standard output
pub print_to_stdout: bool,

View file

@ -2,6 +2,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;
use crossbeam_channel::Receiver;
use hdrhistogram::Histogram;
use num_format::{Locale, ToFormattedString};
use serde::Serialize;

View file

@ -2,10 +2,12 @@ mod collector;
use std::fs::File;
use std::io::Write;
use std::time::Duration;
use std::time::{Duration, Instant};
use anyhow::Context;
use aquatic_common::PanicSentinel;
use aquatic_common::{IndexMap, PanicSentinel};
use aquatic_udp_protocol::PeerClient;
use compact_str::{CompactString, ToCompactString};
use crossbeam_channel::Receiver;
use serde::Serialize;
use time::format_description::well_known::Rfc2822;
@ -35,6 +37,7 @@ struct TemplateData {
ipv6: CollectedStatistics,
last_updated: String,
peer_update_interval: String,
peer_clients: Vec<(CompactString, CompactString, usize)>,
}
pub fn run_statistics_worker(
@ -68,13 +71,46 @@ pub fn run_statistics_worker(
"6".into(),
);
let mut peer_clients: IndexMap<PeerClient, (usize, CompactString)> = IndexMap::default();
loop {
::std::thread::sleep(Duration::from_secs(config.statistics.interval));
let start_time = Instant::now();
for message in statistics_receiver.try_iter() {
match message {
StatisticsMessage::Ipv4PeerHistogram(h) => ipv4_collector.add_histogram(&config, h),
StatisticsMessage::Ipv6PeerHistogram(h) => ipv6_collector.add_histogram(&config, h),
StatisticsMessage::PeerAdded(peer_id) => {
peer_clients
.entry(peer_id.client())
.or_insert((0, peer_id.first_8_bytes_hex()))
.0 += 1;
}
StatisticsMessage::PeerRemoved(peer_id) => {
let client = peer_id.client();
if let Some((count, _)) = peer_clients.get_mut(&client) {
if *count == 1 {
drop(count);
peer_clients.remove(&client);
} else {
*count -= 1;
}
}
}
}
}
#[cfg(feature = "prometheus")]
if config.statistics.run_prometheus_endpoint && config.statistics.extended {
for (peer_client, (count, first_8_bytes)) in peer_clients.iter() {
::metrics::gauge!(
"aquatic_peer_clients",
*count as f64,
"client" => peer_client.to_string(),
"peer_id_prefix_hex" => first_8_bytes.to_string(),
);
}
}
@ -107,6 +143,23 @@ pub fn run_statistics_worker(
}
if let Some(tt) = opt_tt.as_ref() {
let mut peer_clients = if config.statistics.extended {
peer_clients
.iter()
.map(|(peer_client, (count, first_8_bytes))| {
(
peer_client.to_compact_string(),
first_8_bytes.to_owned(),
*count,
)
})
.collect()
} else {
Vec::new()
};
peer_clients.sort_unstable_by(|a, b| b.2.cmp(&a.2));
let template_data = TemplateData {
stylesheet: STYLESHEET_CONTENTS.to_string(),
ipv4_active: config.network.ipv4_active(),
@ -118,12 +171,19 @@ pub fn run_statistics_worker(
.format(&Rfc2822)
.unwrap_or("(formatting error)".into()),
peer_update_interval: format!("{}", config.cleaning.torrent_cleaning_interval),
peer_clients,
};
if let Err(err) = save_html_to_file(&config, tt, &template_data) {
::log::error!("Couldn't save statistics to file: {:#}", err)
}
}
if let Some(time_remaining) =
Duration::from_secs(config.statistics.interval).checked_sub(start_time.elapsed())
{
::std::thread::sleep(time_remaining);
}
}
}

View file

@ -50,6 +50,7 @@ pub fn run_swarm_worker(
let response = handle_announce_request(
&config,
&mut rng,
&statistics_sender,
&mut torrents.ipv4,
request,
ip,
@ -62,6 +63,7 @@ pub fn run_swarm_worker(
let response = handle_announce_request(
&config,
&mut rng,
&statistics_sender,
&mut torrents.ipv6,
request,
ip,
@ -88,28 +90,15 @@ pub fn run_swarm_worker(
peer_valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age);
if now > last_cleaning + cleaning_interval {
let (ipv4, ipv6) = torrents.clean_and_get_statistics(
torrents.clean_and_update_statistics(
&config,
&state,
&statistics_sender,
&state.access_list,
server_start_instant,
worker_index,
);
if config.statistics.active() {
state.statistics_ipv4.peers[worker_index.0].store(ipv4.0, Ordering::Release);
state.statistics_ipv6.peers[worker_index.0].store(ipv6.0, Ordering::Release);
if let Some(message) = ipv4.1.map(StatisticsMessage::Ipv4PeerHistogram) {
if let Err(err) = statistics_sender.try_send(message) {
::log::error!("couldn't send statistics message: {:#}", err)
}
}
if let Some(message) = ipv6.1.map(StatisticsMessage::Ipv6PeerHistogram) {
if let Err(err) = statistics_sender.try_send(message) {
::log::error!("couldn't send statistics message: {:#}", err)
}
}
}
last_cleaning = now;
}
if config.statistics.active()
@ -131,6 +120,7 @@ pub fn run_swarm_worker(
fn handle_announce_request<I: Ip>(
config: &Config,
rng: &mut SmallRng,
statistics_sender: &Sender<StatisticsMessage>,
torrents: &mut TorrentMap<I>,
request: AnnounceRequest,
peer_ip: I,
@ -150,6 +140,8 @@ fn handle_announce_request<I: Ip>(
let peer_status = PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left);
torrent_data.update_peer(
config,
statistics_sender,
request.peer_id,
peer_ip,
request.port,

View file

@ -1,5 +1,6 @@
use std::net::Ipv4Addr;
use std::net::Ipv6Addr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use aquatic_common::IndexMap;
@ -11,6 +12,7 @@ use aquatic_common::{
};
use aquatic_udp_protocol::*;
use crossbeam_channel::Sender;
use hdrhistogram::Histogram;
use rand::prelude::SmallRng;
@ -46,6 +48,8 @@ pub struct TorrentData<I: Ip> {
impl<I: Ip> TorrentData<I> {
pub fn update_peer(
&mut self,
config: &Config,
statistics_sender: &Sender<StatisticsMessage>,
peer_id: PeerId,
ip_address: I,
port: Port,
@ -78,6 +82,20 @@ impl<I: Ip> TorrentData<I> {
PeerStatus::Stopped => self.peers.remove(&peer_id),
};
if config.statistics.extended {
match (status, opt_removed_peer.is_some()) {
// We added a new peer
(PeerStatus::Leeching | PeerStatus::Seeding, false) => {
statistics_sender.try_send(StatisticsMessage::PeerAdded(peer_id));
}
// We removed an existing peer
(PeerStatus::Stopped, true) => {
statistics_sender.try_send(StatisticsMessage::PeerRemoved(peer_id));
}
_ => (),
}
}
if let Some(Peer {
is_seeder: true, ..
}) = opt_removed_peer
@ -117,12 +135,22 @@ impl<I: Ip> TorrentData<I> {
}
/// Remove inactive peers and reclaim space
fn clean(&mut self, now: SecondsSinceServerStart) {
self.peers.retain(|_, peer| {
fn clean(
&mut self,
config: &Config,
statistics_sender: &Sender<StatisticsMessage>,
now: SecondsSinceServerStart,
) {
self.peers.retain(|peer_id, peer| {
let keep = peer.valid_until.valid(now);
if (!keep) & peer.is_seeder {
self.num_seeders -= 1;
if !keep {
if peer.is_seeder {
self.num_seeders -= 1;
}
if config.statistics.extended {
statistics_sender.try_send(StatisticsMessage::PeerRemoved(*peer_id));
}
}
keep
@ -151,6 +179,7 @@ impl<I: Ip> TorrentMap<I> {
fn clean_and_get_statistics(
&mut self,
config: &Config,
statistics_sender: &Sender<StatisticsMessage>,
access_list_cache: &mut AccessListCache,
access_list_mode: AccessListMode,
now: SecondsSinceServerStart,
@ -178,7 +207,7 @@ impl<I: Ip> TorrentMap<I> {
return false;
}
torrent.clean(now);
torrent.clean(config, statistics_sender, now);
num_peers += torrent.peers.len();
@ -225,28 +254,42 @@ impl Default for TorrentMaps {
}
impl TorrentMaps {
/// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers
pub fn clean_and_get_statistics(
/// Remove forbidden or inactive torrents, reclaim space and update statistics
pub fn clean_and_update_statistics(
&mut self,
config: &Config,
state: &State,
statistics_sender: &Sender<StatisticsMessage>,
access_list: &Arc<AccessListArcSwap>,
server_start_instant: ServerStartInstant,
) -> (
(usize, Option<Histogram<u64>>),
(usize, Option<Histogram<u64>>),
worker_index: SwarmWorkerIndex,
) {
let mut cache = create_access_list_cache(access_list);
let mode = config.access_list.mode;
let now = server_start_instant.seconds_elapsed();
let ipv4 = self
.ipv4
.clean_and_get_statistics(config, &mut cache, mode, now);
let ipv6 = self
.ipv6
.clean_and_get_statistics(config, &mut cache, mode, now);
let ipv4 =
self.ipv4
.clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now);
let ipv6 =
self.ipv6
.clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now);
(ipv4, ipv6)
if config.statistics.active() {
state.statistics_ipv4.peers[worker_index.0].store(ipv4.0, Ordering::Release);
state.statistics_ipv6.peers[worker_index.0].store(ipv6.0, Ordering::Release);
if let Some(message) = ipv4.1.map(StatisticsMessage::Ipv4PeerHistogram) {
if let Err(err) = statistics_sender.try_send(message) {
::log::error!("couldn't send statistics message: {:#}", err);
}
}
if let Some(message) = ipv6.1.map(StatisticsMessage::Ipv6PeerHistogram) {
if let Err(err) = statistics_sender.try_send(message) {
::log::error!("couldn't send statistics message: {:#}", err);
}
}
}
}
}