diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c2a2f0..3d6e8f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,25 @@ # Changelog +## Unreleased + +### General + +#### Added + +* Add `aquatic_peer_id` crate with peer client information logic + +### aquatic_udp + +#### Added + +* Add support for reporting peer client information + +### aquatic_ws + +#### Added + +* Add support for reporting peer client information + ## 0.8.0 - 2023-03-17 ### General diff --git a/Cargo.lock b/Cargo.lock index 20f3262..5136f89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -40,6 +40,15 @@ dependencies = [ "version_check", ] +[[package]] +name = "aho-corasick" +version = "0.7.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac" +dependencies = [ + "memchr", +] + [[package]] name = "aho-corasick" version = "1.0.1" @@ -179,6 +188,17 @@ dependencies = [ "urlencoding", ] +[[package]] +name = "aquatic_peer_id" +version = "0.8.0" +dependencies = [ + "compact_str", + "hex", + "quickcheck", + "regex", + "serde", +] + [[package]] name = "aquatic_toml_config" version = "0.8.0" @@ -209,6 +229,7 @@ dependencies = [ "aquatic_udp_protocol", "blake3", "cfg-if", + "compact_str", "constant_time_eq", "crossbeam-channel", "getrandom", @@ -220,6 +241,7 @@ dependencies = [ "log", "metrics", "metrics-exporter-prometheus", + "metrics-util", "mimalloc", "mio", "num-format", @@ -275,6 +297,7 @@ dependencies = [ name = "aquatic_udp_protocol" version = "0.8.0" dependencies = [ + "aquatic_peer_id", "byteorder", "either", "quickcheck", @@ -287,6 +310,7 @@ version = "0.8.0" dependencies = [ "anyhow", "aquatic_common", + "aquatic_peer_id", "aquatic_toml_config", "aquatic_ws_protocol", "async-tungstenite", @@ -300,6 +324,7 @@ dependencies = [ "log", "metrics", "metrics-exporter-prometheus", + "metrics-util", "mimalloc", "privdrop", "quickcheck", @@ -873,6 +898,12 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + [[package]] name = "env_logger" version = "0.8.4" @@ -1660,12 +1691,16 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "111cb375987443c3de8d503580b536f77dc8416d32db62d9456db5d93bd7ac47" dependencies = [ + "aho-corasick 0.7.20", "crossbeam-epoch", "crossbeam-utils", "hashbrown 0.13.2", + "indexmap", "metrics", "num_cpus", + "ordered-float", "quanta", + "radix_trie", "sketches-ddsketch 0.2.1", ] @@ -1723,6 +1758,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "nix" version = "0.23.2" @@ -1864,6 +1908,15 @@ version = "11.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" +[[package]] +name = "ordered-float" +version = "3.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fc2dbde8f8a79f2102cc474ceb0ad68e3b80b85289ea62389b60e66777e4213" +dependencies = [ + "num-traits", +] + [[package]] name = "os_str_bytes" version = "6.5.0" @@ -2056,6 +2109,16 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.8.5" @@ -2133,7 +2196,7 @@ version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81ca098a9821bd52d6b24fd8b10bd081f47d39c22778cafaa75a2857a62c6390" dependencies = [ - "aho-corasick", + "aho-corasick 1.0.1", "memchr", "regex-syntax", ] diff --git a/Cargo.toml b/Cargo.toml index 1bd11d7..1ba315c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "aquatic_http", "aquatic_http_load_test", "aquatic_http_protocol", + "aquatic_peer_id", "aquatic_toml_config", "aquatic_toml_config_derive", "aquatic_udp", @@ -29,6 +30,7 @@ rust-version = "1.64" aquatic_common = { version = "0.8.0", path = "./aquatic_common" } aquatic_http_protocol = { version = "0.8.0", path = "./aquatic_http_protocol" } aquatic_http = { version = "0.8.0", path = "./aquatic_http" } +aquatic_peer_id = { version = "0.8.0", path = "./aquatic_peer_id" } aquatic_toml_config = { version = "0.8.0", path = "./aquatic_toml_config" } aquatic_toml_config_derive = { version = "0.8.0", path = "./aquatic_toml_config_derive" } aquatic_udp_protocol = { version = "0.8.0", path = "./aquatic_udp_protocol" } diff --git a/aquatic_peer_id/Cargo.toml b/aquatic_peer_id/Cargo.toml new file mode 100644 index 0000000..5563c1d --- /dev/null +++ b/aquatic_peer_id/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "aquatic_peer_id" +description = "BitTorrent peer ID handling" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +readme.workspace = true +rust-version.workspace = true + +[lib] +name = "aquatic_peer_id" + +[features] +default = ["quickcheck"] + +[dependencies] +compact_str = "0.7" +hex = "0.4" +regex = "1" +serde = { version = "1", features = ["derive"] } +quickcheck = { version = "1", optional = true } \ No newline at end of file diff --git a/aquatic_peer_id/src/lib.rs b/aquatic_peer_id/src/lib.rs new file mode 100644 index 0000000..ce1a463 --- /dev/null +++ b/aquatic_peer_id/src/lib.rs @@ -0,0 +1,277 @@ +use std::{borrow::Cow, fmt::Display, sync::OnceLock}; + +use compact_str::{format_compact, CompactString}; +use regex::bytes::Regex; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +pub struct PeerId(pub [u8; 20]); + +impl PeerId { + pub fn client(&self) -> PeerClient { + PeerClient::from_peer_id(self) + } + pub fn first_8_bytes_hex(&self) -> CompactString { + let mut buf = [0u8; 16]; + + hex::encode_to_slice(&self.0[..8], &mut buf) + .expect("PeerId.first_8_bytes_hex buffer too small"); + + CompactString::from_utf8_lossy(&buf) + } +} + +#[non_exhaustive] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum PeerClient { + BitTorrent(CompactString), + Deluge(CompactString), + LibTorrentRakshasa(CompactString), + LibTorrentRasterbar(CompactString), + QBitTorrent(CompactString), + Transmission(CompactString), + UTorrent(CompactString), + UTorrentEmbedded(CompactString), + UTorrentMac(CompactString), + UTorrentWeb(CompactString), + Vuze(CompactString), + WebTorrent(CompactString), + WebTorrentDesktop(CompactString), + Mainline(CompactString), + OtherWithPrefixAndVersion { + prefix: CompactString, + version: CompactString, + }, + OtherWithPrefix(CompactString), + Other, +} + +impl PeerClient { + pub fn from_prefix_and_version(prefix: &[u8], version: &[u8]) -> Self { + fn three_digits_plus_prerelease(v1: char, v2: char, v3: char, v4: char) -> CompactString { + let prerelease: Cow = match v4 { + 'd' | 'D' => " dev".into(), + 'a' | 'A' => " alpha".into(), + 'b' | 'B' => " beta".into(), + 'r' | 'R' => " rc".into(), + 's' | 'S' => " stable".into(), + other => format_compact!("{}", other).into(), + }; + + format_compact!("{}.{}.{}{}", v1, v2, v3, prerelease) + } + + fn webtorrent(v1: char, v2: char, v3: char, v4: char) -> CompactString { + let major = if v1 == '0' { + format_compact!("{}", v2) + } else { + format_compact!("{}{}", v1, v2) + }; + + let minor = if v3 == '0' { + format_compact!("{}", v4) + } else { + format_compact!("{}{}", v3, v4) + }; + + format_compact!("{}.{}", major, minor) + } + + if let [v1, v2, v3, v4] = version { + let (v1, v2, v3, v4) = (*v1 as char, *v2 as char, *v3 as char, *v4 as char); + + match prefix { + b"AZ" => Self::Vuze(format_compact!("{}.{}.{}.{}", v1, v2, v3, v4)), + b"BT" => Self::BitTorrent(three_digits_plus_prerelease(v1, v2, v3, v4)), + b"DE" => Self::Deluge(three_digits_plus_prerelease(v1, v2, v3, v4)), + b"lt" => Self::LibTorrentRakshasa(format_compact!("{}.{}{}.{}", v1, v2, v3, v4)), + b"LT" => Self::LibTorrentRasterbar(format_compact!("{}.{}{}.{}", v1, v2, v3, v4)), + b"qB" => Self::QBitTorrent(format_compact!("{}.{}.{}", v1, v2, v3)), + b"TR" => { + let v = match (v1, v2, v3, v4) { + ('0', '0', '0', v4) => format_compact!("0.{}", v4), + ('0', '0', v3, v4) => format_compact!("0.{}{}", v3, v4), + _ => format_compact!("{}.{}{}", v1, v2, v3), + }; + + Self::Transmission(v) + } + b"UE" => Self::UTorrentEmbedded(three_digits_plus_prerelease(v1, v2, v3, v4)), + b"UM" => Self::UTorrentMac(three_digits_plus_prerelease(v1, v2, v3, v4)), + b"UT" => Self::UTorrent(three_digits_plus_prerelease(v1, v2, v3, v4)), + b"UW" => Self::UTorrentWeb(three_digits_plus_prerelease(v1, v2, v3, v4)), + b"WD" => Self::WebTorrentDesktop(webtorrent(v1, v2, v3, v4)), + b"WW" => Self::WebTorrent(webtorrent(v1, v2, v3, v4)), + _ => Self::OtherWithPrefixAndVersion { + prefix: CompactString::from_utf8_lossy(prefix), + version: CompactString::from_utf8_lossy(version), + }, + } + } else { + match (prefix, version) { + (b"M", &[major, b'-', minor, b'-', patch, b'-']) => Self::Mainline( + format_compact!("{}.{}.{}", major as char, minor as char, patch as char), + ), + (b"M", &[major, b'-', minor1, minor2, b'-', patch]) => { + Self::Mainline(format_compact!( + "{}.{}{}.{}", + major as char, + minor1 as char, + minor2 as char, + patch as char + )) + } + _ => Self::OtherWithPrefixAndVersion { + prefix: CompactString::from_utf8_lossy(prefix), + version: CompactString::from_utf8_lossy(version), + }, + } + } + } + + pub fn from_peer_id(peer_id: &PeerId) -> Self { + static AZ_RE: OnceLock = OnceLock::new(); + + if let Some(caps) = AZ_RE + .get_or_init(|| { + Regex::new(r"^\-(?P[a-zA-Z]{2})(?P[0-9]{3}[0-9a-zA-Z])") + .expect("compile AZ_RE regex") + }) + .captures(&peer_id.0) + { + return Self::from_prefix_and_version(&caps["name"], &caps["version"]); + } + + static MAINLINE_RE: OnceLock = OnceLock::new(); + + if let Some(caps) = MAINLINE_RE + .get_or_init(|| { + Regex::new(r"^(?P[a-zA-Z])(?P[0-9\-]{6})\-") + .expect("compile MAINLINE_RE regex") + }) + .captures(&peer_id.0) + { + return Self::from_prefix_and_version(&caps["name"], &caps["version"]); + } + + static PREFIX_RE: OnceLock = OnceLock::new(); + + if let Some(caps) = PREFIX_RE + .get_or_init(|| { + Regex::new(r"^(?P[a-zA-Z0-9\-]+)\-").expect("compile PREFIX_RE regex") + }) + .captures(&peer_id.0) + { + return Self::OtherWithPrefix(CompactString::from_utf8_lossy(&caps["prefix"])); + } + + Self::Other + } +} + +impl Display for PeerClient { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::BitTorrent(v) => write!(f, "BitTorrent {}", v.as_str()), + Self::Deluge(v) => write!(f, "Deluge {}", v.as_str()), + Self::LibTorrentRakshasa(v) => write!(f, "lt (rakshasa) {}", v.as_str()), + Self::LibTorrentRasterbar(v) => write!(f, "lt (rasterbar) {}", v.as_str()), + Self::QBitTorrent(v) => write!(f, "QBitTorrent {}", v.as_str()), + Self::Transmission(v) => write!(f, "Transmission {}", v.as_str()), + Self::UTorrent(v) => write!(f, "µTorrent {}", v.as_str()), + Self::UTorrentEmbedded(v) => write!(f, "µTorrent Emb. {}", v.as_str()), + Self::UTorrentMac(v) => write!(f, "µTorrent Mac {}", v.as_str()), + Self::UTorrentWeb(v) => write!(f, "µTorrent Web {}", v.as_str()), + Self::Vuze(v) => write!(f, "Vuze {}", v.as_str()), + Self::WebTorrent(v) => write!(f, "WebTorrent {}", v.as_str()), + Self::WebTorrentDesktop(v) => write!(f, "WebTorrent Desktop {}", v.as_str()), + Self::Mainline(v) => write!(f, "Mainline {}", v.as_str()), + Self::OtherWithPrefixAndVersion { prefix, version } => { + write!(f, "Other ({}) ({})", prefix.as_str(), version.as_str()) + } + Self::OtherWithPrefix(prefix) => write!(f, "Other ({})", prefix.as_str()), + Self::Other => f.write_str("Other"), + } + } +} + +#[cfg(feature = "quickcheck")] +impl quickcheck::Arbitrary for PeerId { + fn arbitrary(g: &mut quickcheck::Gen) -> Self { + let mut bytes = [0u8; 20]; + + for byte in bytes.iter_mut() { + *byte = u8::arbitrary(g); + } + + Self(bytes) + } +} + +#[cfg(feature = "quickcheck")] +#[cfg(test)] +mod tests { + use super::*; + + fn create_peer_id(bytes: &[u8]) -> PeerId { + let mut peer_id = PeerId([0; 20]); + + let len = bytes.len(); + + (&mut peer_id.0[..len]).copy_from_slice(bytes); + + peer_id + } + + #[test] + fn test_client_from_peer_id() { + assert_eq!( + PeerClient::from_peer_id(&create_peer_id(b"-lt1234-k/asdh3")), + PeerClient::LibTorrentRakshasa("1.23.4".into()) + ); + assert_eq!( + PeerClient::from_peer_id(&create_peer_id(b"-DE123s-k/asdh3")), + PeerClient::Deluge("1.2.3 stable".into()) + ); + assert_eq!( + PeerClient::from_peer_id(&create_peer_id(b"-DE123r-k/asdh3")), + PeerClient::Deluge("1.2.3 rc".into()) + ); + assert_eq!( + PeerClient::from_peer_id(&create_peer_id(b"-UT123A-k/asdh3")), + PeerClient::UTorrent("1.2.3 alpha".into()) + ); + assert_eq!( + PeerClient::from_peer_id(&create_peer_id(b"-TR0012-k/asdh3")), + PeerClient::Transmission("0.12".into()) + ); + assert_eq!( + PeerClient::from_peer_id(&create_peer_id(b"-TR1212-k/asdh3")), + PeerClient::Transmission("1.21".into()) + ); + assert_eq!( + PeerClient::from_peer_id(&create_peer_id(b"-WW0102-k/asdh3")), + PeerClient::WebTorrent("1.2".into()) + ); + assert_eq!( + PeerClient::from_peer_id(&create_peer_id(b"-WW1302-k/asdh3")), + PeerClient::WebTorrent("13.2".into()) + ); + assert_eq!( + PeerClient::from_peer_id(&create_peer_id(b"-WW1324-k/asdh3")), + PeerClient::WebTorrent("13.24".into()) + ); + assert_eq!( + PeerClient::from_peer_id(&create_peer_id(b"M1-2-3--k/asdh3")), + PeerClient::Mainline("1.2.3".into()) + ); + assert_eq!( + PeerClient::from_peer_id(&create_peer_id(b"M1-23-4-k/asdh3")), + PeerClient::Mainline("1.23.4".into()) + ); + assert_eq!( + PeerClient::from_peer_id(&create_peer_id(b"S3-k/asdh3")), + PeerClient::OtherWithPrefix("S3".into()) + ); + } +} diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 88bc11b..7f779bc 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -19,7 +19,7 @@ name = "aquatic_udp" [features] default = ["prometheus"] cpu-pinning = ["aquatic_common/hwloc"] -prometheus = ["metrics", "metrics-exporter-prometheus"] +prometheus = ["metrics", "metrics-util", "metrics-exporter-prometheus"] io-uring = ["dep:io-uring"] [dependencies] @@ -30,6 +30,7 @@ aquatic_udp_protocol.workspace = true anyhow = "1" blake3 = "1" cfg-if = "1" +compact_str = "0.7" constant_time_eq = "0.2" crossbeam-channel = "0.5" getrandom = "0.2" @@ -40,6 +41,7 @@ io-uring = { version = "0.6", optional = true } libc = "0.2" log = "0.4" metrics = { version = "0.21", optional = true } +metrics-util = { version = "0.15", optional = true } metrics-exporter-prometheus = { version = "0.12", optional = true, default-features = false, features = ["http-listener"] } mimalloc = { version = "0.1", default-features = false } mio = { version = "0.8", features = ["net", "os-poll"] } diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index 49aca5e..5e64376 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -136,6 +136,8 @@ impl PeerStatus { pub enum StatisticsMessage { Ipv4PeerHistogram(Histogram), Ipv6PeerHistogram(Histogram), + PeerAdded(PeerId), + PeerRemoved(PeerId), } pub struct Statistics { diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 6e67938..4aac6cb 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -161,11 +161,17 @@ impl Default for ProtocolConfig { pub struct StatisticsConfig { /// Collect and print/write statistics this often (seconds) pub interval: u64, - /// Enable extended statistics (on peers per torrent) + /// Collect statistics on number of peers per torrent /// - /// Will increase time taken for torrent cleaning, since that's when - /// these statistics are collected. - pub extended: bool, + /// Will increase time taken for torrent cleaning. + pub torrent_peer_histograms: bool, + /// Collect statistics on peer clients. + /// + /// Also, see `prometheus_peer_id_prefixes`. + /// + /// Expect a certain CPU hit (maybe 5% higher consumption) and a bit higher + /// memory use + pub peer_clients: bool, /// Print statistics to standard output pub print_to_stdout: bool, /// Save statistics as HTML to a file @@ -178,6 +184,14 @@ pub struct StatisticsConfig { /// Address to run prometheus endpoint on #[cfg(feature = "prometheus")] pub prometheus_endpoint_address: SocketAddr, + /// Serve information on all peer id prefixes on the prometheus endpoint. + /// + /// Requires `peer_clients` to be activated. + /// + /// May consume quite a bit of CPU and RAM, since data on every single peer + /// client will be reported continuously on the endpoint + #[cfg(feature = "prometheus")] + pub prometheus_peer_id_prefixes: bool, } impl StatisticsConfig { @@ -199,7 +213,8 @@ impl Default for StatisticsConfig { fn default() -> Self { Self { interval: 5, - extended: false, + torrent_peer_histograms: false, + peer_clients: false, print_to_stdout: false, write_html_to_file: false, html_file_path: "tmp/statistics.html".into(), @@ -207,6 +222,8 @@ impl Default for StatisticsConfig { run_prometheus_endpoint: false, #[cfg(feature = "prometheus")] prometheus_endpoint_address: SocketAddr::from(([0, 0, 0, 0], 9000)), + #[cfg(feature = "prometheus")] + prometheus_peer_id_prefixes: false, } } } diff --git a/aquatic_udp/src/lib.rs b/aquatic_udp/src/lib.rs index a869c4f..6f8ffe6 100644 --- a/aquatic_udp/src/lib.rs +++ b/aquatic_udp/src/lib.rs @@ -4,6 +4,7 @@ pub mod workers; use std::collections::BTreeMap; use std::thread::Builder; +use std::time::Duration; use anyhow::Context; use crossbeam_channel::{bounded, unbounded}; @@ -143,8 +144,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { #[cfg(feature = "prometheus")] if config.statistics.run_prometheus_endpoint { use metrics_exporter_prometheus::PrometheusBuilder; + use metrics_util::MetricKindMask; PrometheusBuilder::new() + .idle_timeout( + MetricKindMask::ALL, + Some(Duration::from_secs(config.statistics.interval * 2)), + ) .with_http_listener(config.statistics.prometheus_endpoint_address) .install() .with_context(|| { diff --git a/aquatic_udp/src/workers/statistics/collector.rs b/aquatic_udp/src/workers/statistics/collector.rs index 3e3ed97..33c0a4e 100644 --- a/aquatic_udp/src/workers/statistics/collector.rs +++ b/aquatic_udp/src/workers/statistics/collector.rs @@ -134,7 +134,7 @@ impl StatisticsCollector { ); } - if config.statistics.extended { + if config.statistics.torrent_peer_histograms { self.last_complete_histogram .update_metrics(self.ip_version.clone()); } diff --git a/aquatic_udp/src/workers/statistics/mod.rs b/aquatic_udp/src/workers/statistics/mod.rs index f54f08b..c950850 100644 --- a/aquatic_udp/src/workers/statistics/mod.rs +++ b/aquatic_udp/src/workers/statistics/mod.rs @@ -2,11 +2,14 @@ mod collector; use std::fs::File; use std::io::Write; -use std::time::Duration; +use std::time::{Duration, Instant}; use anyhow::Context; -use aquatic_common::PanicSentinel; +use aquatic_common::{IndexMap, PanicSentinel}; +use aquatic_udp_protocol::{PeerClient, PeerId}; +use compact_str::CompactString; use crossbeam_channel::Receiver; +use num_format::{Locale, ToFormattedString}; use serde::Serialize; use time::format_description::well_known::Rfc2822; use time::OffsetDateTime; @@ -35,6 +38,7 @@ struct TemplateData { ipv6: CollectedStatistics, last_updated: String, peer_update_interval: String, + peer_clients: Vec<(String, String)>, } pub fn run_statistics_worker( @@ -43,6 +47,17 @@ pub fn run_statistics_worker( shared_state: State, statistics_receiver: Receiver, ) { + let process_peer_client_data = { + let mut collect = config.statistics.write_html_to_file; + + #[cfg(feature = "prometheus")] + { + collect |= config.statistics.run_prometheus_endpoint; + } + + collect & config.statistics.peer_clients + }; + let opt_tt = if config.statistics.write_html_to_file { let mut tt = TinyTemplate::new(); @@ -68,13 +83,36 @@ pub fn run_statistics_worker( "6".into(), ); + // Store a count to enable not removing peers from the count completely + // just because they were removed from one torrent + let mut peers: IndexMap = IndexMap::default(); + loop { - ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); + let start_time = Instant::now(); for message in statistics_receiver.try_iter() { match message { StatisticsMessage::Ipv4PeerHistogram(h) => ipv4_collector.add_histogram(&config, h), StatisticsMessage::Ipv6PeerHistogram(h) => ipv6_collector.add_histogram(&config, h), + StatisticsMessage::PeerAdded(peer_id) => { + if process_peer_client_data { + peers + .entry(peer_id) + .or_insert_with(|| (0, peer_id.client(), peer_id.first_8_bytes_hex())) + .0 += 1; + } + } + StatisticsMessage::PeerRemoved(peer_id) => { + if process_peer_client_data { + if let Some((count, _, _)) = peers.get_mut(&peer_id) { + *count -= 1; + + if *count == 0 { + peers.remove(&peer_id); + } + } + } + } } } @@ -87,6 +125,61 @@ pub fn run_statistics_worker( &config, ); + let peer_clients = if process_peer_client_data { + let mut clients: IndexMap = IndexMap::default(); + + #[cfg(feature = "prometheus")] + let mut prefixes: IndexMap = IndexMap::default(); + + // Only count peer_ids once, even if they are in multiple torrents + for (_, peer_client, prefix) in peers.values() { + *clients.entry(peer_client.to_owned()).or_insert(0) += 1; + + #[cfg(feature = "prometheus")] + if config.statistics.run_prometheus_endpoint + && config.statistics.prometheus_peer_id_prefixes + { + *prefixes.entry(prefix.to_owned()).or_insert(0) += 1; + } + } + + clients.sort_unstable_by(|_, a, _, b| b.cmp(a)); + + #[cfg(feature = "prometheus")] + if config.statistics.run_prometheus_endpoint + && config.statistics.prometheus_peer_id_prefixes + { + for (prefix, count) in prefixes { + ::metrics::gauge!( + "aquatic_peer_id_prefixes", + count as f64, + "prefix_hex" => prefix.to_string(), + ); + } + } + + let mut client_vec = Vec::with_capacity(clients.len()); + + for (client, count) in clients { + if config.statistics.write_html_to_file { + client_vec.push((client.to_string(), count.to_formatted_string(&Locale::en))); + } + + #[cfg(feature = "prometheus")] + if config.statistics.run_prometheus_endpoint { + ::metrics::gauge!( + "aquatic_peer_clients", + count as f64, + "client" => client.to_string(), + ); + } + } + + client_vec + } else { + Vec::new() + }; + if config.statistics.print_to_stdout { println!("General:"); println!( @@ -111,19 +204,32 @@ pub fn run_statistics_worker( stylesheet: STYLESHEET_CONTENTS.to_string(), ipv4_active: config.network.ipv4_active(), ipv6_active: config.network.ipv6_active(), - extended_active: config.statistics.extended, + extended_active: config.statistics.torrent_peer_histograms, ipv4: statistics_ipv4, ipv6: statistics_ipv6, last_updated: OffsetDateTime::now_utc() .format(&Rfc2822) .unwrap_or("(formatting error)".into()), peer_update_interval: format!("{}", config.cleaning.torrent_cleaning_interval), + peer_clients, }; if let Err(err) = save_html_to_file(&config, tt, &template_data) { ::log::error!("Couldn't save statistics to file: {:#}", err) } } + + peers.shrink_to_fit(); + + if let Some(time_remaining) = + Duration::from_secs(config.statistics.interval).checked_sub(start_time.elapsed()) + { + ::std::thread::sleep(time_remaining); + } else { + ::log::warn!( + "statistics interval not long enough to process all data, output may be misleading" + ); + } } } @@ -160,7 +266,7 @@ fn print_to_stdout(config: &Config, statistics: &CollectedStatistics) { statistics.num_peers, config.cleaning.torrent_cleaning_interval ); - if config.statistics.extended { + if config.statistics.torrent_peer_histograms { println!( " peers per torrent (updated every {}s)", config.cleaning.torrent_cleaning_interval diff --git a/aquatic_udp/src/workers/swarm/mod.rs b/aquatic_udp/src/workers/swarm/mod.rs index bab716e..d4bb3c5 100644 --- a/aquatic_udp/src/workers/swarm/mod.rs +++ b/aquatic_udp/src/workers/swarm/mod.rs @@ -50,6 +50,7 @@ pub fn run_swarm_worker( let response = handle_announce_request( &config, &mut rng, + &statistics_sender, &mut torrents.ipv4, request, ip, @@ -62,6 +63,7 @@ pub fn run_swarm_worker( let response = handle_announce_request( &config, &mut rng, + &statistics_sender, &mut torrents.ipv6, request, ip, @@ -88,28 +90,15 @@ pub fn run_swarm_worker( peer_valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); if now > last_cleaning + cleaning_interval { - let (ipv4, ipv6) = torrents.clean_and_get_statistics( + torrents.clean_and_update_statistics( &config, + &state, + &statistics_sender, &state.access_list, server_start_instant, + worker_index, ); - if config.statistics.active() { - state.statistics_ipv4.peers[worker_index.0].store(ipv4.0, Ordering::Release); - state.statistics_ipv6.peers[worker_index.0].store(ipv6.0, Ordering::Release); - - if let Some(message) = ipv4.1.map(StatisticsMessage::Ipv4PeerHistogram) { - if let Err(err) = statistics_sender.try_send(message) { - ::log::error!("couldn't send statistics message: {:#}", err) - } - } - if let Some(message) = ipv6.1.map(StatisticsMessage::Ipv6PeerHistogram) { - if let Err(err) = statistics_sender.try_send(message) { - ::log::error!("couldn't send statistics message: {:#}", err) - } - } - } - last_cleaning = now; } if config.statistics.active() @@ -131,6 +120,7 @@ pub fn run_swarm_worker( fn handle_announce_request( config: &Config, rng: &mut SmallRng, + statistics_sender: &Sender, torrents: &mut TorrentMap, request: AnnounceRequest, peer_ip: I, @@ -150,6 +140,8 @@ fn handle_announce_request( let peer_status = PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left); torrent_data.update_peer( + config, + statistics_sender, request.peer_id, peer_ip, request.port, diff --git a/aquatic_udp/src/workers/swarm/storage.rs b/aquatic_udp/src/workers/swarm/storage.rs index e96416c..777dba4 100644 --- a/aquatic_udp/src/workers/swarm/storage.rs +++ b/aquatic_udp/src/workers/swarm/storage.rs @@ -1,5 +1,6 @@ use std::net::Ipv4Addr; use std::net::Ipv6Addr; +use std::sync::atomic::Ordering; use std::sync::Arc; use aquatic_common::IndexMap; @@ -11,6 +12,7 @@ use aquatic_common::{ }; use aquatic_udp_protocol::*; +use crossbeam_channel::Sender; use hdrhistogram::Histogram; use rand::prelude::SmallRng; @@ -46,6 +48,8 @@ pub struct TorrentData { impl TorrentData { pub fn update_peer( &mut self, + config: &Config, + statistics_sender: &Sender, peer_id: PeerId, ip_address: I, port: Port, @@ -78,6 +82,30 @@ impl TorrentData { PeerStatus::Stopped => self.peers.remove(&peer_id), }; + if config.statistics.peer_clients { + match (status, opt_removed_peer.is_some()) { + // We added a new peer + (PeerStatus::Leeching | PeerStatus::Seeding, false) => { + if let Err(_) = + statistics_sender.try_send(StatisticsMessage::PeerAdded(peer_id)) + { + // Should never happen in practice + ::log::error!("Couldn't send StatisticsMessage::PeerAdded"); + } + } + // We removed an existing peer + (PeerStatus::Stopped, true) => { + if let Err(_) = + statistics_sender.try_send(StatisticsMessage::PeerRemoved(peer_id)) + { + // Should never happen in practice + ::log::error!("Couldn't send StatisticsMessage::PeerRemoved"); + } + } + _ => (), + } + } + if let Some(Peer { is_seeder: true, .. }) = opt_removed_peer @@ -117,12 +145,27 @@ impl TorrentData { } /// Remove inactive peers and reclaim space - fn clean(&mut self, now: SecondsSinceServerStart) { - self.peers.retain(|_, peer| { + fn clean( + &mut self, + config: &Config, + statistics_sender: &Sender, + now: SecondsSinceServerStart, + ) { + self.peers.retain(|peer_id, peer| { let keep = peer.valid_until.valid(now); - if (!keep) & peer.is_seeder { - self.num_seeders -= 1; + if !keep { + if peer.is_seeder { + self.num_seeders -= 1; + } + if config.statistics.peer_clients { + if let Err(_) = + statistics_sender.try_send(StatisticsMessage::PeerRemoved(*peer_id)) + { + // Should never happen in practice + ::log::error!("Couldn't send StatisticsMessage::PeerRemoved"); + } + } } keep @@ -151,13 +194,15 @@ impl TorrentMap { fn clean_and_get_statistics( &mut self, config: &Config, + statistics_sender: &Sender, access_list_cache: &mut AccessListCache, access_list_mode: AccessListMode, now: SecondsSinceServerStart, ) -> (usize, Option>) { let mut num_peers = 0; - let mut opt_histogram: Option> = if config.statistics.extended { + let mut opt_histogram: Option> = if config.statistics.torrent_peer_histograms + { match Histogram::new(3) { Ok(histogram) => Some(histogram), Err(err) => { @@ -178,7 +223,7 @@ impl TorrentMap { return false; } - torrent.clean(now); + torrent.clean(config, statistics_sender, now); num_peers += torrent.peers.len(); @@ -225,28 +270,42 @@ impl Default for TorrentMaps { } impl TorrentMaps { - /// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers - pub fn clean_and_get_statistics( + /// Remove forbidden or inactive torrents, reclaim space and update statistics + pub fn clean_and_update_statistics( &mut self, config: &Config, + state: &State, + statistics_sender: &Sender, access_list: &Arc, server_start_instant: ServerStartInstant, - ) -> ( - (usize, Option>), - (usize, Option>), + worker_index: SwarmWorkerIndex, ) { let mut cache = create_access_list_cache(access_list); let mode = config.access_list.mode; let now = server_start_instant.seconds_elapsed(); - let ipv4 = self - .ipv4 - .clean_and_get_statistics(config, &mut cache, mode, now); - let ipv6 = self - .ipv6 - .clean_and_get_statistics(config, &mut cache, mode, now); + let ipv4 = + self.ipv4 + .clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now); + let ipv6 = + self.ipv6 + .clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now); - (ipv4, ipv6) + if config.statistics.active() { + state.statistics_ipv4.peers[worker_index.0].store(ipv4.0, Ordering::Release); + state.statistics_ipv6.peers[worker_index.0].store(ipv6.0, Ordering::Release); + + if let Some(message) = ipv4.1.map(StatisticsMessage::Ipv4PeerHistogram) { + if let Err(err) = statistics_sender.try_send(message) { + ::log::error!("couldn't send statistics message: {:#}", err); + } + } + if let Some(message) = ipv6.1.map(StatisticsMessage::Ipv6PeerHistogram) { + if let Err(err) = statistics_sender.try_send(message) { + ::log::error!("couldn't send statistics message: {:#}", err); + } + } + } } } diff --git a/aquatic_udp/templates/statistics.html b/aquatic_udp/templates/statistics.html index 2007d01..0fe8930 100644 --- a/aquatic_udp/templates/statistics.html +++ b/aquatic_udp/templates/statistics.html @@ -251,5 +251,28 @@ {{ endif }} {{ endif }} + + {{ if extended_active }} + +

Peer clients

+ + + + + + + + + + {{ for value in peer_clients }} + + + + + {{ endfor }} + +
ClientCount
{ value.0 }{ value.1 }
+ + {{ endif }} diff --git a/aquatic_udp_protocol/Cargo.toml b/aquatic_udp_protocol/Cargo.toml index eb78d2b..033b65f 100644 --- a/aquatic_udp_protocol/Cargo.toml +++ b/aquatic_udp_protocol/Cargo.toml @@ -11,6 +11,8 @@ readme.workspace = true rust-version.workspace = true [dependencies] +aquatic_peer_id.workspace = true + byteorder = "1" either = "1" diff --git a/aquatic_udp_protocol/src/common.rs b/aquatic_udp_protocol/src/common.rs index 8aab39f..c99a0cb 100644 --- a/aquatic_udp_protocol/src/common.rs +++ b/aquatic_udp_protocol/src/common.rs @@ -1,6 +1,8 @@ use std::fmt::Debug; use std::net::{Ipv4Addr, Ipv6Addr}; +pub use aquatic_peer_id::{PeerClient, PeerId}; + pub trait Ip: Clone + Copy + Debug + PartialEq + Eq {} impl Ip for Ipv4Addr {} @@ -30,9 +32,6 @@ pub struct NumberOfDownloads(pub i32); #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] pub struct Port(pub u16); -#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, PartialOrd, Ord)] -pub struct PeerId(pub [u8; 20]); - #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] pub struct PeerKey(pub u32); @@ -55,19 +54,6 @@ impl quickcheck::Arbitrary for InfoHash { } } -#[cfg(test)] -impl quickcheck::Arbitrary for PeerId { - fn arbitrary(g: &mut quickcheck::Gen) -> Self { - let mut bytes = [0u8; 20]; - - for byte in bytes.iter_mut() { - *byte = u8::arbitrary(g); - } - - Self(bytes) - } -} - #[cfg(test)] impl quickcheck::Arbitrary for ResponsePeer { fn arbitrary(g: &mut quickcheck::Gen) -> Self { diff --git a/aquatic_udp_protocol/src/request.rs b/aquatic_udp_protocol/src/request.rs index 16be9c8..0ed591c 100644 --- a/aquatic_udp_protocol/src/request.rs +++ b/aquatic_udp_protocol/src/request.rs @@ -5,6 +5,8 @@ use std::net::Ipv4Addr; use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt}; use either::Either; +use aquatic_peer_id::PeerId; + use super::common::*; const PROTOCOL_IDENTIFIER: i64 = 4_497_486_125_440; diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index 7a65872..90b656c 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -19,10 +19,11 @@ name = "aquatic_ws" [features] default = ["prometheus"] prometheus = ["metrics", "metrics-exporter-prometheus"] -metrics = ["dep:metrics"] +metrics = ["dep:metrics", "metrics-util"] [dependencies] aquatic_common = { workspace = true, features = ["rustls", "glommio"] } +aquatic_peer_id.workspace = true aquatic_toml_config.workspace = true aquatic_ws_protocol.workspace = true @@ -37,6 +38,7 @@ hashbrown = { version = "0.13", features = ["serde"] } httparse = "1" log = "0.4" metrics = { version = "0.21", optional = true } +metrics-util = { version = "0.15", optional = true } metrics-exporter-prometheus = { version = "0.12", optional = true, default-features = false, features = ["http-listener"] } mimalloc = { version = "0.1", default-features = false } privdrop = "0.5" diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index 42b34f7..25651e1 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -156,6 +156,16 @@ pub struct MetricsConfig { pub prometheus_endpoint_address: SocketAddr, /// Update metrics for torrent count this often (seconds) pub torrent_count_update_interval: u64, + /// Serve information on peer clients + /// + /// Expect a certain CPU hit + pub peer_clients: bool, + /// Serve information on all peer id prefixes + /// + /// Requires `peer_clients` to be activated. + /// + /// Expect a certain CPU hit + pub peer_id_prefixes: bool, } #[cfg(feature = "metrics")] @@ -165,6 +175,8 @@ impl Default for MetricsConfig { run_prometheus_endpoint: false, prometheus_endpoint_address: SocketAddr::from(([0, 0, 0, 0], 9000)), torrent_count_update_interval: 10, + peer_clients: false, + peer_id_prefixes: false, } } } diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index 1263175..423b006 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -3,6 +3,7 @@ pub mod config; pub mod workers; use std::sync::Arc; +use std::time::Duration; use anyhow::Context; use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker}; @@ -39,7 +40,18 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { if config.metrics.run_prometheus_endpoint { use metrics_exporter_prometheus::PrometheusBuilder; + let idle_timeout = config + .cleaning + .connection_cleaning_interval + .max(config.cleaning.torrent_cleaning_interval) + .max(config.metrics.torrent_count_update_interval) + * 2; + PrometheusBuilder::new() + .idle_timeout( + metrics_util::MetricKindMask::GAUGE, + Some(Duration::from_secs(idle_timeout)), + ) .with_http_listener(config.metrics.prometheus_endpoint_address) .install() .with_context(|| { diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index f0c76ae..09c1884 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -11,6 +11,7 @@ use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, A use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; use aquatic_common::{PanicSentinel, ServerStartInstant}; +use aquatic_peer_id::PeerClient; use aquatic_ws_protocol::*; use async_tungstenite::WebSocketStream; use futures::stream::{SplitSink, SplitStream}; @@ -51,6 +52,7 @@ struct ConnectionReference { valid_until: ValidUntil, announced_info_hashes: HashMap, ip_version: IpVersion, + opt_peer_client: Option<(PeerClient, String)>, } pub async fn run_socket_worker( @@ -154,6 +156,7 @@ pub async fn run_socket_worker( ), announced_info_hashes: Default::default(), ip_version, + opt_peer_client: None, }); ::log::trace!("accepting stream, assigning id {}", key); @@ -221,6 +224,23 @@ pub async fn run_socket_worker( .await .unwrap(); } + + #[cfg(feature = "prometheus")] + if let Some((peer_client, prefix)) = reference.opt_peer_client { + ::metrics::decrement_gauge!( + "aquatic_peer_clients", + 1.0, + "client" => peer_client.to_string(), + ); + + if config.metrics.peer_id_prefixes { + ::metrics::decrement_gauge!( + "aquatic_peer_id_prefixes", + 1.0, + "prefix_hex" => prefix.to_string(), + ); + } + } } }), tq_regular) .unwrap() @@ -246,6 +266,27 @@ async fn clean_connections( connection_slab.borrow_mut().retain(|_, reference| { if reference.valid_until.valid(now) { + #[cfg(feature = "prometheus")] + if let Some((peer_client, prefix)) = &reference.opt_peer_client { + // As long as connection is still alive, increment peer client + // gauges by zero to prevent them from being removed due to + // idleness + + ::metrics::increment_gauge!( + "aquatic_peer_clients", + 0.0, + "client" => peer_client.to_string(), + ); + + if config.metrics.peer_id_prefixes { + ::metrics::increment_gauge!( + "aquatic_peer_id_prefixes", + 0.0, + "prefix_hex" => prefix.to_string(), + ); + } + } + true } else { if let Some(ref handle) = reference.task_handle { @@ -258,6 +299,31 @@ async fn clean_connections( connection_slab.borrow_mut().shrink_to_fit(); + #[cfg(feature = "metrics")] + { + // Increment gauges by zero to prevent them from being removed due to + // idleness + + let worker_index = WORKER_INDEX.with(|index| index.get()).to_string(); + + if config.network.address.is_ipv4() || !config.network.only_ipv6 { + ::metrics::increment_gauge!( + "aquatic_active_connections", + 0.0, + "ip_version" => "4", + "worker_index" => worker_index.clone(), + ); + } + if config.network.address.is_ipv6() { + ::metrics::increment_gauge!( + "aquatic_active_connections", + 0.0, + "ip_version" => "6", + "worker_index" => worker_index, + ); + } + } + Some(Duration::from_secs( config.cleaning.connection_cleaning_interval, )) @@ -575,6 +641,33 @@ impl ConnectionReader { } } Entry::Vacant(entry) => { + #[cfg(feature = "prometheus")] + if self.config.metrics.run_prometheus_endpoint + && self.config.metrics.peer_clients + && connection_reference.opt_peer_client.is_none() + { + let peer_id = + aquatic_peer_id::PeerId(announce_request.peer_id.0); + let client = peer_id.client(); + let prefix = peer_id.first_8_bytes_hex().to_string(); + + ::metrics::increment_gauge!( + "aquatic_peer_clients", + 1.0, + "client" => client.to_string(), + ); + + if self.config.metrics.peer_id_prefixes { + ::metrics::increment_gauge!( + "aquatic_peer_id_prefixes", + 1.0, + "prefix_hex" => prefix.to_string(), + ); + } + + connection_reference.opt_peer_client = Some((client, prefix)); + }; + entry.insert(announce_request.peer_id); } }