diff --git a/Cargo.lock b/Cargo.lock index b6d2301..5a64467 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -184,6 +184,8 @@ name = "aquatic_peer_id" version = "0.8.0" dependencies = [ "compact_str", + "hex", + "quickcheck", "regex", "serde", ] @@ -218,6 +220,7 @@ dependencies = [ "aquatic_udp_protocol", "blake3", "cfg-if", + "compact_str", "constant_time_eq", "crossbeam-channel", "getrandom", @@ -284,6 +287,7 @@ dependencies = [ name = "aquatic_udp_protocol" version = "0.8.0" dependencies = [ + "aquatic_peer_id", "byteorder", "either", "quickcheck", diff --git a/aquatic_peer_id/Cargo.toml b/aquatic_peer_id/Cargo.toml index 3074e6b..36f5553 100644 --- a/aquatic_peer_id/Cargo.toml +++ b/aquatic_peer_id/Cargo.toml @@ -14,5 +14,7 @@ name = "aquatic_peer_id" [dependencies] compact_str = "0.7" +hex = "0.4" regex = "1" -serde = { version = "1", features = ["derive"] } \ No newline at end of file +serde = { version = "1", features = ["derive"] } +quickcheck = "1" \ No newline at end of file diff --git a/aquatic_peer_id/src/lib.rs b/aquatic_peer_id/src/lib.rs index 8f002c3..b6f5ed4 100644 --- a/aquatic_peer_id/src/lib.rs +++ b/aquatic_peer_id/src/lib.rs @@ -7,6 +7,20 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] pub struct PeerId(pub [u8; 20]); +impl PeerId { + pub fn client(&self) -> PeerClient { + PeerClient::from_peer_id(self) + } + pub fn first_8_bytes_hex(&self) -> CompactString { + let mut buf = [0u8; 16]; + + hex::encode_to_slice(&self.0[..8], &mut buf) + .expect("PeerId.first_8_bytes_hex buffer too small"); + + CompactString::from_utf8_lossy(&buf) + } +} + #[non_exhaustive] #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum PeerClient { @@ -112,7 +126,7 @@ impl PeerClient { } } - pub fn from_peer_id(peer_id: PeerId) -> Self { + pub fn from_peer_id(peer_id: &PeerId) -> Self { static AZ_RE: OnceLock = OnceLock::new(); if let Some(caps) = AZ_RE @@ -178,6 +192,18 @@ impl Display for PeerClient { } } +impl quickcheck::Arbitrary for PeerId { + fn arbitrary(g: &mut quickcheck::Gen) -> Self { + let mut bytes = [0u8; 20]; + + for byte in bytes.iter_mut() { + *byte = u8::arbitrary(g); + } + + Self(bytes) + } +} + #[cfg(test)] mod tests { use super::*; @@ -195,43 +221,43 @@ mod tests { #[test] fn test_client_from_peer_id() { assert_eq!( - PeerClient::from_peer_id(create_peer_id(b"-lt1234-k/asdh3")), + PeerClient::from_peer_id(&create_peer_id(b"-lt1234-k/asdh3")), PeerClient::LibTorrentRakshasa("1.23.4".into()) ); assert_eq!( - PeerClient::from_peer_id(create_peer_id(b"-UT123A-k/asdh3")), + PeerClient::from_peer_id(&create_peer_id(b"-UT123A-k/asdh3")), PeerClient::UTorrent("1.2.3 [Alpha]".into()) ); assert_eq!( - PeerClient::from_peer_id(create_peer_id(b"-TR0012-k/asdh3")), + PeerClient::from_peer_id(&create_peer_id(b"-TR0012-k/asdh3")), PeerClient::Transmission("0.12".into()) ); assert_eq!( - PeerClient::from_peer_id(create_peer_id(b"-TR1212-k/asdh3")), + PeerClient::from_peer_id(&create_peer_id(b"-TR1212-k/asdh3")), PeerClient::Transmission("1.21".into()) ); assert_eq!( - PeerClient::from_peer_id(create_peer_id(b"-WW0102-k/asdh3")), + PeerClient::from_peer_id(&create_peer_id(b"-WW0102-k/asdh3")), PeerClient::WebTorrent("1.2".into()) ); assert_eq!( - PeerClient::from_peer_id(create_peer_id(b"-WW1302-k/asdh3")), + PeerClient::from_peer_id(&create_peer_id(b"-WW1302-k/asdh3")), PeerClient::WebTorrent("13.2".into()) ); assert_eq!( - PeerClient::from_peer_id(create_peer_id(b"-WW1324-k/asdh3")), + PeerClient::from_peer_id(&create_peer_id(b"-WW1324-k/asdh3")), PeerClient::WebTorrent("13.24".into()) ); assert_eq!( - PeerClient::from_peer_id(create_peer_id(b"M1-2-3--k/asdh3")), + PeerClient::from_peer_id(&create_peer_id(b"M1-2-3--k/asdh3")), PeerClient::Mainline("1.2.3".into()) ); assert_eq!( - PeerClient::from_peer_id(create_peer_id(b"M1-23-4-k/asdh3")), + PeerClient::from_peer_id(&create_peer_id(b"M1-23-4-k/asdh3")), PeerClient::Mainline("1.23.4".into()) ); assert_eq!( - PeerClient::from_peer_id(create_peer_id(b"S3-k/asdh3")), + PeerClient::from_peer_id(&create_peer_id(b"S3-k/asdh3")), PeerClient::OtherWithPrefix("S3".into()) ); } diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 88bc11b..4a302c0 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -30,6 +30,7 @@ aquatic_udp_protocol.workspace = true anyhow = "1" blake3 = "1" cfg-if = "1" +compact_str = { version = "0.7", features = ["serde"] } constant_time_eq = "0.2" crossbeam-channel = "0.5" getrandom = "0.2" diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index 49aca5e..5e64376 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -136,6 +136,8 @@ impl PeerStatus { pub enum StatisticsMessage { Ipv4PeerHistogram(Histogram), Ipv6PeerHistogram(Histogram), + PeerAdded(PeerId), + PeerRemoved(PeerId), } pub struct Statistics { diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 6e67938..a61a846 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -161,10 +161,9 @@ impl Default for ProtocolConfig { pub struct StatisticsConfig { /// Collect and print/write statistics this often (seconds) pub interval: u64, - /// Enable extended statistics (on peers per torrent) + /// Enable extended statistics (on peers per torrent and on peer clients) /// - /// Will increase time taken for torrent cleaning, since that's when - /// these statistics are collected. + /// Will increase time taken for request handling and torrent cleaning pub extended: bool, /// Print statistics to standard output pub print_to_stdout: bool, diff --git a/aquatic_udp/src/workers/statistics/collector.rs b/aquatic_udp/src/workers/statistics/collector.rs index 3e3ed97..af25639 100644 --- a/aquatic_udp/src/workers/statistics/collector.rs +++ b/aquatic_udp/src/workers/statistics/collector.rs @@ -2,6 +2,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Instant; +use crossbeam_channel::Receiver; use hdrhistogram::Histogram; use num_format::{Locale, ToFormattedString}; use serde::Serialize; diff --git a/aquatic_udp/src/workers/statistics/mod.rs b/aquatic_udp/src/workers/statistics/mod.rs index f54f08b..8a1d5ad 100644 --- a/aquatic_udp/src/workers/statistics/mod.rs +++ b/aquatic_udp/src/workers/statistics/mod.rs @@ -2,10 +2,12 @@ mod collector; use std::fs::File; use std::io::Write; -use std::time::Duration; +use std::time::{Duration, Instant}; use anyhow::Context; -use aquatic_common::PanicSentinel; +use aquatic_common::{IndexMap, PanicSentinel}; +use aquatic_udp_protocol::PeerClient; +use compact_str::{CompactString, ToCompactString}; use crossbeam_channel::Receiver; use serde::Serialize; use time::format_description::well_known::Rfc2822; @@ -35,6 +37,7 @@ struct TemplateData { ipv6: CollectedStatistics, last_updated: String, peer_update_interval: String, + peer_clients: Vec<(CompactString, CompactString, usize)>, } pub fn run_statistics_worker( @@ -68,13 +71,46 @@ pub fn run_statistics_worker( "6".into(), ); + let mut peer_clients: IndexMap = IndexMap::default(); + loop { - ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); + let start_time = Instant::now(); for message in statistics_receiver.try_iter() { match message { StatisticsMessage::Ipv4PeerHistogram(h) => ipv4_collector.add_histogram(&config, h), StatisticsMessage::Ipv6PeerHistogram(h) => ipv6_collector.add_histogram(&config, h), + StatisticsMessage::PeerAdded(peer_id) => { + peer_clients + .entry(peer_id.client()) + .or_insert((0, peer_id.first_8_bytes_hex())) + .0 += 1; + } + StatisticsMessage::PeerRemoved(peer_id) => { + let client = peer_id.client(); + + if let Some((count, _)) = peer_clients.get_mut(&client) { + if *count == 1 { + drop(count); + + peer_clients.remove(&client); + } else { + *count -= 1; + } + } + } + } + } + + #[cfg(feature = "prometheus")] + if config.statistics.run_prometheus_endpoint && config.statistics.extended { + for (peer_client, (count, first_8_bytes)) in peer_clients.iter() { + ::metrics::gauge!( + "aquatic_peer_clients", + *count as f64, + "client" => peer_client.to_string(), + "peer_id_prefix_hex" => first_8_bytes.to_string(), + ); } } @@ -107,6 +143,23 @@ pub fn run_statistics_worker( } if let Some(tt) = opt_tt.as_ref() { + let mut peer_clients = if config.statistics.extended { + peer_clients + .iter() + .map(|(peer_client, (count, first_8_bytes))| { + ( + peer_client.to_compact_string(), + first_8_bytes.to_owned(), + *count, + ) + }) + .collect() + } else { + Vec::new() + }; + + peer_clients.sort_unstable_by(|a, b| b.2.cmp(&a.2)); + let template_data = TemplateData { stylesheet: STYLESHEET_CONTENTS.to_string(), ipv4_active: config.network.ipv4_active(), @@ -118,12 +171,19 @@ pub fn run_statistics_worker( .format(&Rfc2822) .unwrap_or("(formatting error)".into()), peer_update_interval: format!("{}", config.cleaning.torrent_cleaning_interval), + peer_clients, }; if let Err(err) = save_html_to_file(&config, tt, &template_data) { ::log::error!("Couldn't save statistics to file: {:#}", err) } } + + if let Some(time_remaining) = + Duration::from_secs(config.statistics.interval).checked_sub(start_time.elapsed()) + { + ::std::thread::sleep(time_remaining); + } } } diff --git a/aquatic_udp/src/workers/swarm/mod.rs b/aquatic_udp/src/workers/swarm/mod.rs index bab716e..d4bb3c5 100644 --- a/aquatic_udp/src/workers/swarm/mod.rs +++ b/aquatic_udp/src/workers/swarm/mod.rs @@ -50,6 +50,7 @@ pub fn run_swarm_worker( let response = handle_announce_request( &config, &mut rng, + &statistics_sender, &mut torrents.ipv4, request, ip, @@ -62,6 +63,7 @@ pub fn run_swarm_worker( let response = handle_announce_request( &config, &mut rng, + &statistics_sender, &mut torrents.ipv6, request, ip, @@ -88,28 +90,15 @@ pub fn run_swarm_worker( peer_valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); if now > last_cleaning + cleaning_interval { - let (ipv4, ipv6) = torrents.clean_and_get_statistics( + torrents.clean_and_update_statistics( &config, + &state, + &statistics_sender, &state.access_list, server_start_instant, + worker_index, ); - if config.statistics.active() { - state.statistics_ipv4.peers[worker_index.0].store(ipv4.0, Ordering::Release); - state.statistics_ipv6.peers[worker_index.0].store(ipv6.0, Ordering::Release); - - if let Some(message) = ipv4.1.map(StatisticsMessage::Ipv4PeerHistogram) { - if let Err(err) = statistics_sender.try_send(message) { - ::log::error!("couldn't send statistics message: {:#}", err) - } - } - if let Some(message) = ipv6.1.map(StatisticsMessage::Ipv6PeerHistogram) { - if let Err(err) = statistics_sender.try_send(message) { - ::log::error!("couldn't send statistics message: {:#}", err) - } - } - } - last_cleaning = now; } if config.statistics.active() @@ -131,6 +120,7 @@ pub fn run_swarm_worker( fn handle_announce_request( config: &Config, rng: &mut SmallRng, + statistics_sender: &Sender, torrents: &mut TorrentMap, request: AnnounceRequest, peer_ip: I, @@ -150,6 +140,8 @@ fn handle_announce_request( let peer_status = PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left); torrent_data.update_peer( + config, + statistics_sender, request.peer_id, peer_ip, request.port, diff --git a/aquatic_udp/src/workers/swarm/storage.rs b/aquatic_udp/src/workers/swarm/storage.rs index e96416c..9c78021 100644 --- a/aquatic_udp/src/workers/swarm/storage.rs +++ b/aquatic_udp/src/workers/swarm/storage.rs @@ -1,5 +1,6 @@ use std::net::Ipv4Addr; use std::net::Ipv6Addr; +use std::sync::atomic::Ordering; use std::sync::Arc; use aquatic_common::IndexMap; @@ -11,6 +12,7 @@ use aquatic_common::{ }; use aquatic_udp_protocol::*; +use crossbeam_channel::Sender; use hdrhistogram::Histogram; use rand::prelude::SmallRng; @@ -46,6 +48,8 @@ pub struct TorrentData { impl TorrentData { pub fn update_peer( &mut self, + config: &Config, + statistics_sender: &Sender, peer_id: PeerId, ip_address: I, port: Port, @@ -78,6 +82,20 @@ impl TorrentData { PeerStatus::Stopped => self.peers.remove(&peer_id), }; + if config.statistics.extended { + match (status, opt_removed_peer.is_some()) { + // We added a new peer + (PeerStatus::Leeching | PeerStatus::Seeding, false) => { + statistics_sender.try_send(StatisticsMessage::PeerAdded(peer_id)); + } + // We removed an existing peer + (PeerStatus::Stopped, true) => { + statistics_sender.try_send(StatisticsMessage::PeerRemoved(peer_id)); + } + _ => (), + } + } + if let Some(Peer { is_seeder: true, .. }) = opt_removed_peer @@ -117,12 +135,22 @@ impl TorrentData { } /// Remove inactive peers and reclaim space - fn clean(&mut self, now: SecondsSinceServerStart) { - self.peers.retain(|_, peer| { + fn clean( + &mut self, + config: &Config, + statistics_sender: &Sender, + now: SecondsSinceServerStart, + ) { + self.peers.retain(|peer_id, peer| { let keep = peer.valid_until.valid(now); - if (!keep) & peer.is_seeder { - self.num_seeders -= 1; + if !keep { + if peer.is_seeder { + self.num_seeders -= 1; + } + if config.statistics.extended { + statistics_sender.try_send(StatisticsMessage::PeerRemoved(*peer_id)); + } } keep @@ -151,6 +179,7 @@ impl TorrentMap { fn clean_and_get_statistics( &mut self, config: &Config, + statistics_sender: &Sender, access_list_cache: &mut AccessListCache, access_list_mode: AccessListMode, now: SecondsSinceServerStart, @@ -178,7 +207,7 @@ impl TorrentMap { return false; } - torrent.clean(now); + torrent.clean(config, statistics_sender, now); num_peers += torrent.peers.len(); @@ -225,28 +254,42 @@ impl Default for TorrentMaps { } impl TorrentMaps { - /// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers - pub fn clean_and_get_statistics( + /// Remove forbidden or inactive torrents, reclaim space and update statistics + pub fn clean_and_update_statistics( &mut self, config: &Config, + state: &State, + statistics_sender: &Sender, access_list: &Arc, server_start_instant: ServerStartInstant, - ) -> ( - (usize, Option>), - (usize, Option>), + worker_index: SwarmWorkerIndex, ) { let mut cache = create_access_list_cache(access_list); let mode = config.access_list.mode; let now = server_start_instant.seconds_elapsed(); - let ipv4 = self - .ipv4 - .clean_and_get_statistics(config, &mut cache, mode, now); - let ipv6 = self - .ipv6 - .clean_and_get_statistics(config, &mut cache, mode, now); + let ipv4 = + self.ipv4 + .clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now); + let ipv6 = + self.ipv6 + .clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now); - (ipv4, ipv6) + if config.statistics.active() { + state.statistics_ipv4.peers[worker_index.0].store(ipv4.0, Ordering::Release); + state.statistics_ipv6.peers[worker_index.0].store(ipv6.0, Ordering::Release); + + if let Some(message) = ipv4.1.map(StatisticsMessage::Ipv4PeerHistogram) { + if let Err(err) = statistics_sender.try_send(message) { + ::log::error!("couldn't send statistics message: {:#}", err); + } + } + if let Some(message) = ipv6.1.map(StatisticsMessage::Ipv6PeerHistogram) { + if let Err(err) = statistics_sender.try_send(message) { + ::log::error!("couldn't send statistics message: {:#}", err); + } + } + } } } diff --git a/aquatic_udp/templates/statistics.html b/aquatic_udp/templates/statistics.html index 2007d01..a27ba81 100644 --- a/aquatic_udp/templates/statistics.html +++ b/aquatic_udp/templates/statistics.html @@ -251,5 +251,30 @@ {{ endif }} {{ endif }} + + {{ if extended_active }} + +

Peer clients

+ + + + + + + + + + + {{ for value in peer_clients }} + + + + + + {{ endfor }} + +
ClientPeer ID prefix (hex)Count
{ value.0 }{ value.1 }{ value.2 }
+ + {{ endif }} diff --git a/aquatic_udp_protocol/Cargo.toml b/aquatic_udp_protocol/Cargo.toml index eb78d2b..033b65f 100644 --- a/aquatic_udp_protocol/Cargo.toml +++ b/aquatic_udp_protocol/Cargo.toml @@ -11,6 +11,8 @@ readme.workspace = true rust-version.workspace = true [dependencies] +aquatic_peer_id.workspace = true + byteorder = "1" either = "1" diff --git a/aquatic_udp_protocol/src/common.rs b/aquatic_udp_protocol/src/common.rs index 8aab39f..c99a0cb 100644 --- a/aquatic_udp_protocol/src/common.rs +++ b/aquatic_udp_protocol/src/common.rs @@ -1,6 +1,8 @@ use std::fmt::Debug; use std::net::{Ipv4Addr, Ipv6Addr}; +pub use aquatic_peer_id::{PeerClient, PeerId}; + pub trait Ip: Clone + Copy + Debug + PartialEq + Eq {} impl Ip for Ipv4Addr {} @@ -30,9 +32,6 @@ pub struct NumberOfDownloads(pub i32); #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] pub struct Port(pub u16); -#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, PartialOrd, Ord)] -pub struct PeerId(pub [u8; 20]); - #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] pub struct PeerKey(pub u32); @@ -55,19 +54,6 @@ impl quickcheck::Arbitrary for InfoHash { } } -#[cfg(test)] -impl quickcheck::Arbitrary for PeerId { - fn arbitrary(g: &mut quickcheck::Gen) -> Self { - let mut bytes = [0u8; 20]; - - for byte in bytes.iter_mut() { - *byte = u8::arbitrary(g); - } - - Self(bytes) - } -} - #[cfg(test)] impl quickcheck::Arbitrary for ResponsePeer { fn arbitrary(g: &mut quickcheck::Gen) -> Self { diff --git a/aquatic_udp_protocol/src/request.rs b/aquatic_udp_protocol/src/request.rs index 16be9c8..0ed591c 100644 --- a/aquatic_udp_protocol/src/request.rs +++ b/aquatic_udp_protocol/src/request.rs @@ -5,6 +5,8 @@ use std::net::Ipv4Addr; use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt}; use either::Either; +use aquatic_peer_id::PeerId; + use super::common::*; const PROTOCOL_IDENTIFIER: i64 = 4_497_486_125_440;