From 3ca21390dff1afd555cbc437e52832baac19171f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 4 Jun 2023 16:10:13 +0200 Subject: [PATCH 01/20] Add aquatic_peer_id crate for peer client parsing --- Cargo.lock | 9 ++ Cargo.toml | 2 + aquatic_peer_id/Cargo.toml | 18 ++++ aquatic_peer_id/src/lib.rs | 166 +++++++++++++++++++++++++++++++++++++ 4 files changed, 195 insertions(+) create mode 100644 aquatic_peer_id/Cargo.toml create mode 100644 aquatic_peer_id/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 20f3262..b6d2301 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -179,6 +179,15 @@ dependencies = [ "urlencoding", ] +[[package]] +name = "aquatic_peer_id" +version = "0.8.0" +dependencies = [ + "compact_str", + "regex", + "serde", +] + [[package]] name = "aquatic_toml_config" version = "0.8.0" diff --git a/Cargo.toml b/Cargo.toml index 1bd11d7..1ba315c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "aquatic_http", "aquatic_http_load_test", "aquatic_http_protocol", + "aquatic_peer_id", "aquatic_toml_config", "aquatic_toml_config_derive", "aquatic_udp", @@ -29,6 +30,7 @@ rust-version = "1.64" aquatic_common = { version = "0.8.0", path = "./aquatic_common" } aquatic_http_protocol = { version = "0.8.0", path = "./aquatic_http_protocol" } aquatic_http = { version = "0.8.0", path = "./aquatic_http" } +aquatic_peer_id = { version = "0.8.0", path = "./aquatic_peer_id" } aquatic_toml_config = { version = "0.8.0", path = "./aquatic_toml_config" } aquatic_toml_config_derive = { version = "0.8.0", path = "./aquatic_toml_config_derive" } aquatic_udp_protocol = { version = "0.8.0", path = "./aquatic_udp_protocol" } diff --git a/aquatic_peer_id/Cargo.toml b/aquatic_peer_id/Cargo.toml new file mode 100644 index 0000000..3074e6b --- /dev/null +++ b/aquatic_peer_id/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "aquatic_peer_id" +description = "BitTorrent peer ID handling" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +readme.workspace = true +rust-version.workspace = true + +[lib] +name = "aquatic_peer_id" + +[dependencies] +compact_str = "0.7" +regex = "1" +serde = { version = "1", features = ["derive"] } \ No newline at end of file diff --git a/aquatic_peer_id/src/lib.rs b/aquatic_peer_id/src/lib.rs new file mode 100644 index 0000000..0371e22 --- /dev/null +++ b/aquatic_peer_id/src/lib.rs @@ -0,0 +1,166 @@ +use std::{fmt::Display, sync::OnceLock}; + +use compact_str::CompactString; +use regex::bytes::Regex; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +pub struct PeerId(pub [u8; 20]); + +#[non_exhaustive] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum PeerClient { + BitTorrent(CompactString), + Deluge(CompactString), + LibTorrentRakshasa(CompactString), + LibTorrentRasterbar(CompactString), + QBitTorrent(CompactString), + Transmission(CompactString), + UTorrent(CompactString), + UTorrentEmbedded(CompactString), + UTorrentMac(CompactString), + UTorrentWeb(CompactString), + Vuze(CompactString), + WebTorrent(CompactString), + WebTorrentDesktop(CompactString), + Mainline(CompactString), + OtherWithPrefixAndVersion { + prefix: CompactString, + version: CompactString, + }, + OtherWithPrefix(CompactString), + Other, +} + +impl PeerClient { + pub fn from_prefix_and_version(prefix: &[u8], version: &[u8]) -> Option { + let version = CompactString::from_utf8(version).ok()?; + + match prefix { + b"AZ" => Some(Self::Vuze(version)), + b"BT" => Some(Self::BitTorrent(version)), + b"DE" => Some(Self::Deluge(version)), + b"lt" => Some(Self::LibTorrentRakshasa(version)), + b"LT" => Some(Self::LibTorrentRasterbar(version)), + b"qB" => Some(Self::QBitTorrent(version)), + b"TR" => Some(Self::Transmission(version)), + b"UE" => Some(Self::UTorrentEmbedded(version)), + b"UM" => Some(Self::UTorrentMac(version)), + b"UT" => Some(Self::UTorrent(version)), + b"UW" => Some(Self::UTorrentWeb(version)), + b"WD" => Some(Self::WebTorrentDesktop(version)), + b"WW" => Some(Self::WebTorrent(version)), + b"M" => Some(Self::Mainline(version)), + name => Some(Self::OtherWithPrefixAndVersion { + prefix: CompactString::from_utf8(name).ok()?, + version, + }), + } + } + + pub fn from_peer_id(peer_id: PeerId) -> Self { + static AZ_RE: OnceLock = OnceLock::new(); + + if let Some(caps) = AZ_RE + .get_or_init(|| { + Regex::new(r"^\-(?P[a-zA-Z]{2})(?P[0-9A-Z]{4})") + .expect("compile AZ_RE regex") + }) + .captures(&peer_id.0) + { + if let Some(client) = Self::from_prefix_and_version(&caps["name"], &caps["version"]) { + return client; + } + } + + static MAINLINE_RE: OnceLock = OnceLock::new(); + + if let Some(caps) = MAINLINE_RE + .get_or_init(|| { + Regex::new(r"^(?P[a-zA-Z])(?P[0-9\-]{6})\-") + .expect("compile MAINLINE_RE regex") + }) + .captures(&peer_id.0) + { + if let Some(client) = Self::from_prefix_and_version(&caps["name"], &caps["version"]) { + return client; + } + } + + static PREFIX_RE: OnceLock = OnceLock::new(); + + if let Some(caps) = PREFIX_RE + .get_or_init(|| { + Regex::new(r"^(?P[a-zA-Z0-9\-]*)\-").expect("compile PREFIX_RE regex") + }) + .captures(&peer_id.0) + { + if let Ok(prefix) = CompactString::from_utf8(&caps["prefix"]) { + return Self::OtherWithPrefix(prefix); + } + } + + Self::Other + } +} + +impl Display for PeerClient { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::BitTorrent(v) => write!(f, "BitTorrent ({})", v.as_str()), + Self::Deluge(v) => write!(f, "Deluge ({})", v.as_str()), + Self::LibTorrentRakshasa(v) => write!(f, "libTorrent (rakshasa) ({})", v.as_str()), + Self::LibTorrentRasterbar(v) => write!(f, "libtorrent (rasterbar) ({})", v.as_str()), + Self::QBitTorrent(v) => write!(f, "QBitTorrent ({})", v.as_str()), + Self::Transmission(v) => write!(f, "Transmission ({})", v.as_str()), + Self::UTorrent(v) => write!(f, "uTorrent ({})", v.as_str()), + Self::UTorrentEmbedded(v) => write!(f, "uTorrent Embedded ({})", v.as_str()), + Self::UTorrentMac(v) => write!(f, "uTorrent Mac ({})", v.as_str()), + Self::UTorrentWeb(v) => write!(f, "uTorrent Web ({})", v.as_str()), + Self::Vuze(v) => write!(f, "Vuze ({})", v.as_str()), + Self::WebTorrent(v) => write!(f, "WebTorrent ({})", v.as_str()), + Self::WebTorrentDesktop(v) => write!(f, "WebTorrent Desktop ({})", v.as_str()), + Self::Mainline(v) => write!(f, "Mainline ({})", v.as_str()), + Self::OtherWithPrefixAndVersion { prefix, version } => { + write!(f, "Other ({}) ({})", prefix.as_str(), version.as_str()) + } + Self::OtherWithPrefix(prefix) => write!(f, "Other ({})", prefix.as_str()), + Self::Other => f.write_str("Other"), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn create_peer_id(bytes: &[u8]) -> PeerId { + let mut peer_id = PeerId([0; 20]); + + let len = bytes.len(); + + (&mut peer_id.0[..len]).copy_from_slice(bytes); + + peer_id + } + + #[test] + fn test_client_from_peer_id() { + assert_eq!( + PeerClient::from_peer_id(create_peer_id(b"-lt1234-k/asdh3")), + PeerClient::LibTorrentRakshasa("1234".into()) + ); + assert_eq!( + PeerClient::from_peer_id(create_peer_id(b"M1-2-3--k/asdh3")), + PeerClient::Mainline("1-2-3-".into()) + ); + assert_eq!( + PeerClient::from_peer_id(create_peer_id(b"M1-23-4-k/asdh3")), + PeerClient::Mainline("1-23-4".into()) + ); + assert_eq!( + PeerClient::from_peer_id(create_peer_id(b"S3-k/asdh3")), + PeerClient::OtherWithPrefix("S3".into()) + ); + } +} From fa2f4a29b9b343613336dcd37674974bfc90e043 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 4 Jun 2023 17:12:06 +0200 Subject: [PATCH 02/20] aquatic_peer_id: improve version parsing/formatting --- aquatic_peer_id/src/lib.rs | 116 ++++++++++++++++++++++++++----------- 1 file changed, 82 insertions(+), 34 deletions(-) diff --git a/aquatic_peer_id/src/lib.rs b/aquatic_peer_id/src/lib.rs index 0371e22..1087621 100644 --- a/aquatic_peer_id/src/lib.rs +++ b/aquatic_peer_id/src/lib.rs @@ -1,6 +1,6 @@ use std::{fmt::Display, sync::OnceLock}; -use compact_str::CompactString; +use compact_str::{format_compact, CompactString}; use regex::bytes::Regex; use serde::{Deserialize, Serialize}; @@ -33,28 +33,82 @@ pub enum PeerClient { } impl PeerClient { - pub fn from_prefix_and_version(prefix: &[u8], version: &[u8]) -> Option { - let version = CompactString::from_utf8(version).ok()?; + pub fn from_prefix_and_version(prefix: &[u8], version: &[u8]) -> Self { + fn three_digits_plus_prerelease(v1: char, v2: char, v3: char, v4: char) -> CompactString { + let prerelease = match v4 { + 'A' => " [Alpha]", + 'B' => " [Beta]", + _ => "", + }; - match prefix { - b"AZ" => Some(Self::Vuze(version)), - b"BT" => Some(Self::BitTorrent(version)), - b"DE" => Some(Self::Deluge(version)), - b"lt" => Some(Self::LibTorrentRakshasa(version)), - b"LT" => Some(Self::LibTorrentRasterbar(version)), - b"qB" => Some(Self::QBitTorrent(version)), - b"TR" => Some(Self::Transmission(version)), - b"UE" => Some(Self::UTorrentEmbedded(version)), - b"UM" => Some(Self::UTorrentMac(version)), - b"UT" => Some(Self::UTorrent(version)), - b"UW" => Some(Self::UTorrentWeb(version)), - b"WD" => Some(Self::WebTorrentDesktop(version)), - b"WW" => Some(Self::WebTorrent(version)), - b"M" => Some(Self::Mainline(version)), - name => Some(Self::OtherWithPrefixAndVersion { - prefix: CompactString::from_utf8(name).ok()?, - version, - }), + format_compact!("{}.{}.{}{}", v1, v2, v3, prerelease) + } + + fn webtorrent(v1: char, v2: char, v3: char, v4: char) -> CompactString { + let major = if v1 == '0' { + format_compact!("{}", v2) + } else { + format_compact!("{}{}", v1, v2) + }; + + let minor = if v3 == '0' { + format_compact!("{}", v4) + } else { + format_compact!("{}{}", v3, v4) + }; + + format_compact!("{}.{}", major, minor) + } + + if let [v1, v2, v3, v4] = version { + let (v1, v2, v3, v4) = (*v1 as char, *v2 as char, *v3 as char, *v4 as char); + + match prefix { + b"AZ" => Self::Vuze(format_compact!("{}.{}.{}.{}", v1, v2, v3, v4)), + b"BT" => Self::BitTorrent(three_digits_plus_prerelease(v1, v2, v3, v4)), + b"DE" => Self::Deluge(format_compact!("{}.{}.{}", v1, v2, v3)), + b"lt" => Self::LibTorrentRakshasa(format_compact!("{}.{}{}.{}", v1, v2, v3, v4)), + b"LT" => Self::LibTorrentRasterbar(format_compact!("{}.{}{}.{}", v1, v2, v3, v4)), + b"qB" => Self::QBitTorrent(format_compact!("{}.{}.{}", v1, v2, v3)), + b"TR" => { + let v = match (v1, v2, v3, v4) { + ('0', '0', '0', v4) => format_compact!("0.{}", v4), + ('0', '0', v3, v4) => format_compact!("0.{}{}", v3, v4), + _ => format_compact!("{}.{}{}", v1, v2, v3), + }; + + Self::Transmission(v) + } + b"UE" => Self::UTorrentEmbedded(three_digits_plus_prerelease(v1, v2, v3, v4)), + b"UM" => Self::UTorrentMac(three_digits_plus_prerelease(v1, v2, v3, v4)), + b"UT" => Self::UTorrent(three_digits_plus_prerelease(v1, v2, v3, v4)), + b"UW" => Self::UTorrentWeb(three_digits_plus_prerelease(v1, v2, v3, v4)), + b"WD" => Self::WebTorrentDesktop(webtorrent(v1, v2, v3, v4)), + b"WW" => Self::WebTorrent(webtorrent(v1, v2, v3, v4)), + _ => Self::OtherWithPrefixAndVersion { + prefix: CompactString::from_utf8_lossy(prefix), + version: CompactString::from_utf8_lossy(version), + }, + } + } else { + match (prefix, version) { + (b"M", &[major, b'-', minor, b'-', patch, b'-']) => Self::Mainline( + format_compact!("{}.{}.{}", major as char, minor as char, patch as char), + ), + (b"M", &[major, b'-', minor1, minor2, b'-', patch]) => { + Self::Mainline(format_compact!( + "{}.{}{}.{}", + major as char, + minor1 as char, + minor2 as char, + patch as char + )) + } + _ => Self::OtherWithPrefixAndVersion { + prefix: CompactString::from_utf8_lossy(prefix), + version: CompactString::from_utf8_lossy(version), + }, + } } } @@ -68,9 +122,7 @@ impl PeerClient { }) .captures(&peer_id.0) { - if let Some(client) = Self::from_prefix_and_version(&caps["name"], &caps["version"]) { - return client; - } + return Self::from_prefix_and_version(&caps["name"], &caps["version"]); } static MAINLINE_RE: OnceLock = OnceLock::new(); @@ -82,9 +134,7 @@ impl PeerClient { }) .captures(&peer_id.0) { - if let Some(client) = Self::from_prefix_and_version(&caps["name"], &caps["version"]) { - return client; - } + return Self::from_prefix_and_version(&caps["name"], &caps["version"]); } static PREFIX_RE: OnceLock = OnceLock::new(); @@ -95,9 +145,7 @@ impl PeerClient { }) .captures(&peer_id.0) { - if let Ok(prefix) = CompactString::from_utf8(&caps["prefix"]) { - return Self::OtherWithPrefix(prefix); - } + return Self::OtherWithPrefix(CompactString::from_utf8_lossy(&caps["prefix"])); } Self::Other @@ -148,15 +196,15 @@ mod tests { fn test_client_from_peer_id() { assert_eq!( PeerClient::from_peer_id(create_peer_id(b"-lt1234-k/asdh3")), - PeerClient::LibTorrentRakshasa("1234".into()) + PeerClient::LibTorrentRakshasa("1.23.4".into()) ); assert_eq!( PeerClient::from_peer_id(create_peer_id(b"M1-2-3--k/asdh3")), - PeerClient::Mainline("1-2-3-".into()) + PeerClient::Mainline("1.2.3".into()) ); assert_eq!( PeerClient::from_peer_id(create_peer_id(b"M1-23-4-k/asdh3")), - PeerClient::Mainline("1-23-4".into()) + PeerClient::Mainline("1.23.4".into()) ); assert_eq!( PeerClient::from_peer_id(create_peer_id(b"S3-k/asdh3")), From 977349ec0366b28e0195f68e1fcb544b9d569e43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 4 Jun 2023 17:18:54 +0200 Subject: [PATCH 03/20] aquatic_peer_id: add more tests --- aquatic_peer_id/src/lib.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/aquatic_peer_id/src/lib.rs b/aquatic_peer_id/src/lib.rs index 1087621..8f002c3 100644 --- a/aquatic_peer_id/src/lib.rs +++ b/aquatic_peer_id/src/lib.rs @@ -198,6 +198,30 @@ mod tests { PeerClient::from_peer_id(create_peer_id(b"-lt1234-k/asdh3")), PeerClient::LibTorrentRakshasa("1.23.4".into()) ); + assert_eq!( + PeerClient::from_peer_id(create_peer_id(b"-UT123A-k/asdh3")), + PeerClient::UTorrent("1.2.3 [Alpha]".into()) + ); + assert_eq!( + PeerClient::from_peer_id(create_peer_id(b"-TR0012-k/asdh3")), + PeerClient::Transmission("0.12".into()) + ); + assert_eq!( + PeerClient::from_peer_id(create_peer_id(b"-TR1212-k/asdh3")), + PeerClient::Transmission("1.21".into()) + ); + assert_eq!( + PeerClient::from_peer_id(create_peer_id(b"-WW0102-k/asdh3")), + PeerClient::WebTorrent("1.2".into()) + ); + assert_eq!( + PeerClient::from_peer_id(create_peer_id(b"-WW1302-k/asdh3")), + PeerClient::WebTorrent("13.2".into()) + ); + assert_eq!( + PeerClient::from_peer_id(create_peer_id(b"-WW1324-k/asdh3")), + PeerClient::WebTorrent("13.24".into()) + ); assert_eq!( PeerClient::from_peer_id(create_peer_id(b"M1-2-3--k/asdh3")), PeerClient::Mainline("1.2.3".into()) From a74d6aa45885f49cd9868dfb0656cf247dc09ca0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 6 Jun 2023 01:04:37 +0200 Subject: [PATCH 04/20] udp: initial support for listing peer clients --- Cargo.lock | 4 + aquatic_peer_id/Cargo.toml | 4 +- aquatic_peer_id/src/lib.rs | 48 +++++++++--- aquatic_udp/Cargo.toml | 1 + aquatic_udp/src/common.rs | 2 + aquatic_udp/src/config.rs | 5 +- .../src/workers/statistics/collector.rs | 1 + aquatic_udp/src/workers/statistics/mod.rs | 66 +++++++++++++++- aquatic_udp/src/workers/swarm/mod.rs | 26 +++---- aquatic_udp/src/workers/swarm/storage.rs | 77 +++++++++++++++---- aquatic_udp/templates/statistics.html | 25 ++++++ aquatic_udp_protocol/Cargo.toml | 2 + aquatic_udp_protocol/src/common.rs | 18 +---- aquatic_udp_protocol/src/request.rs | 2 + 14 files changed, 213 insertions(+), 68 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b6d2301..5a64467 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -184,6 +184,8 @@ name = "aquatic_peer_id" version = "0.8.0" dependencies = [ "compact_str", + "hex", + "quickcheck", "regex", "serde", ] @@ -218,6 +220,7 @@ dependencies = [ "aquatic_udp_protocol", "blake3", "cfg-if", + "compact_str", "constant_time_eq", "crossbeam-channel", "getrandom", @@ -284,6 +287,7 @@ dependencies = [ name = "aquatic_udp_protocol" version = "0.8.0" dependencies = [ + "aquatic_peer_id", "byteorder", "either", "quickcheck", diff --git a/aquatic_peer_id/Cargo.toml b/aquatic_peer_id/Cargo.toml index 3074e6b..36f5553 100644 --- a/aquatic_peer_id/Cargo.toml +++ b/aquatic_peer_id/Cargo.toml @@ -14,5 +14,7 @@ name = "aquatic_peer_id" [dependencies] compact_str = "0.7" +hex = "0.4" regex = "1" -serde = { version = "1", features = ["derive"] } \ No newline at end of file +serde = { version = "1", features = ["derive"] } +quickcheck = "1" \ No newline at end of file diff --git a/aquatic_peer_id/src/lib.rs b/aquatic_peer_id/src/lib.rs index 8f002c3..b6f5ed4 100644 --- a/aquatic_peer_id/src/lib.rs +++ b/aquatic_peer_id/src/lib.rs @@ -7,6 +7,20 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] pub struct PeerId(pub [u8; 20]); +impl PeerId { + pub fn client(&self) -> PeerClient { + PeerClient::from_peer_id(self) + } + pub fn first_8_bytes_hex(&self) -> CompactString { + let mut buf = [0u8; 16]; + + hex::encode_to_slice(&self.0[..8], &mut buf) + .expect("PeerId.first_8_bytes_hex buffer too small"); + + CompactString::from_utf8_lossy(&buf) + } +} + #[non_exhaustive] #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum PeerClient { @@ -112,7 +126,7 @@ impl PeerClient { } } - pub fn from_peer_id(peer_id: PeerId) -> Self { + pub fn from_peer_id(peer_id: &PeerId) -> Self { static AZ_RE: OnceLock = OnceLock::new(); if let Some(caps) = AZ_RE @@ -178,6 +192,18 @@ impl Display for PeerClient { } } +impl quickcheck::Arbitrary for PeerId { + fn arbitrary(g: &mut quickcheck::Gen) -> Self { + let mut bytes = [0u8; 20]; + + for byte in bytes.iter_mut() { + *byte = u8::arbitrary(g); + } + + Self(bytes) + } +} + #[cfg(test)] mod tests { use super::*; @@ -195,43 +221,43 @@ mod tests { #[test] fn test_client_from_peer_id() { assert_eq!( - PeerClient::from_peer_id(create_peer_id(b"-lt1234-k/asdh3")), + PeerClient::from_peer_id(&create_peer_id(b"-lt1234-k/asdh3")), PeerClient::LibTorrentRakshasa("1.23.4".into()) ); assert_eq!( - PeerClient::from_peer_id(create_peer_id(b"-UT123A-k/asdh3")), + PeerClient::from_peer_id(&create_peer_id(b"-UT123A-k/asdh3")), PeerClient::UTorrent("1.2.3 [Alpha]".into()) ); assert_eq!( - PeerClient::from_peer_id(create_peer_id(b"-TR0012-k/asdh3")), + PeerClient::from_peer_id(&create_peer_id(b"-TR0012-k/asdh3")), PeerClient::Transmission("0.12".into()) ); assert_eq!( - PeerClient::from_peer_id(create_peer_id(b"-TR1212-k/asdh3")), + PeerClient::from_peer_id(&create_peer_id(b"-TR1212-k/asdh3")), PeerClient::Transmission("1.21".into()) ); assert_eq!( - PeerClient::from_peer_id(create_peer_id(b"-WW0102-k/asdh3")), + PeerClient::from_peer_id(&create_peer_id(b"-WW0102-k/asdh3")), PeerClient::WebTorrent("1.2".into()) ); assert_eq!( - PeerClient::from_peer_id(create_peer_id(b"-WW1302-k/asdh3")), + PeerClient::from_peer_id(&create_peer_id(b"-WW1302-k/asdh3")), PeerClient::WebTorrent("13.2".into()) ); assert_eq!( - PeerClient::from_peer_id(create_peer_id(b"-WW1324-k/asdh3")), + PeerClient::from_peer_id(&create_peer_id(b"-WW1324-k/asdh3")), PeerClient::WebTorrent("13.24".into()) ); assert_eq!( - PeerClient::from_peer_id(create_peer_id(b"M1-2-3--k/asdh3")), + PeerClient::from_peer_id(&create_peer_id(b"M1-2-3--k/asdh3")), PeerClient::Mainline("1.2.3".into()) ); assert_eq!( - PeerClient::from_peer_id(create_peer_id(b"M1-23-4-k/asdh3")), + PeerClient::from_peer_id(&create_peer_id(b"M1-23-4-k/asdh3")), PeerClient::Mainline("1.23.4".into()) ); assert_eq!( - PeerClient::from_peer_id(create_peer_id(b"S3-k/asdh3")), + PeerClient::from_peer_id(&create_peer_id(b"S3-k/asdh3")), PeerClient::OtherWithPrefix("S3".into()) ); } diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 88bc11b..4a302c0 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -30,6 +30,7 @@ aquatic_udp_protocol.workspace = true anyhow = "1" blake3 = "1" cfg-if = "1" +compact_str = { version = "0.7", features = ["serde"] } constant_time_eq = "0.2" crossbeam-channel = "0.5" getrandom = "0.2" diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index 49aca5e..5e64376 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -136,6 +136,8 @@ impl PeerStatus { pub enum StatisticsMessage { Ipv4PeerHistogram(Histogram), Ipv6PeerHistogram(Histogram), + PeerAdded(PeerId), + PeerRemoved(PeerId), } pub struct Statistics { diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 6e67938..a61a846 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -161,10 +161,9 @@ impl Default for ProtocolConfig { pub struct StatisticsConfig { /// Collect and print/write statistics this often (seconds) pub interval: u64, - /// Enable extended statistics (on peers per torrent) + /// Enable extended statistics (on peers per torrent and on peer clients) /// - /// Will increase time taken for torrent cleaning, since that's when - /// these statistics are collected. + /// Will increase time taken for request handling and torrent cleaning pub extended: bool, /// Print statistics to standard output pub print_to_stdout: bool, diff --git a/aquatic_udp/src/workers/statistics/collector.rs b/aquatic_udp/src/workers/statistics/collector.rs index 3e3ed97..af25639 100644 --- a/aquatic_udp/src/workers/statistics/collector.rs +++ b/aquatic_udp/src/workers/statistics/collector.rs @@ -2,6 +2,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Instant; +use crossbeam_channel::Receiver; use hdrhistogram::Histogram; use num_format::{Locale, ToFormattedString}; use serde::Serialize; diff --git a/aquatic_udp/src/workers/statistics/mod.rs b/aquatic_udp/src/workers/statistics/mod.rs index f54f08b..8a1d5ad 100644 --- a/aquatic_udp/src/workers/statistics/mod.rs +++ b/aquatic_udp/src/workers/statistics/mod.rs @@ -2,10 +2,12 @@ mod collector; use std::fs::File; use std::io::Write; -use std::time::Duration; +use std::time::{Duration, Instant}; use anyhow::Context; -use aquatic_common::PanicSentinel; +use aquatic_common::{IndexMap, PanicSentinel}; +use aquatic_udp_protocol::PeerClient; +use compact_str::{CompactString, ToCompactString}; use crossbeam_channel::Receiver; use serde::Serialize; use time::format_description::well_known::Rfc2822; @@ -35,6 +37,7 @@ struct TemplateData { ipv6: CollectedStatistics, last_updated: String, peer_update_interval: String, + peer_clients: Vec<(CompactString, CompactString, usize)>, } pub fn run_statistics_worker( @@ -68,13 +71,46 @@ pub fn run_statistics_worker( "6".into(), ); + let mut peer_clients: IndexMap = IndexMap::default(); + loop { - ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); + let start_time = Instant::now(); for message in statistics_receiver.try_iter() { match message { StatisticsMessage::Ipv4PeerHistogram(h) => ipv4_collector.add_histogram(&config, h), StatisticsMessage::Ipv6PeerHistogram(h) => ipv6_collector.add_histogram(&config, h), + StatisticsMessage::PeerAdded(peer_id) => { + peer_clients + .entry(peer_id.client()) + .or_insert((0, peer_id.first_8_bytes_hex())) + .0 += 1; + } + StatisticsMessage::PeerRemoved(peer_id) => { + let client = peer_id.client(); + + if let Some((count, _)) = peer_clients.get_mut(&client) { + if *count == 1 { + drop(count); + + peer_clients.remove(&client); + } else { + *count -= 1; + } + } + } + } + } + + #[cfg(feature = "prometheus")] + if config.statistics.run_prometheus_endpoint && config.statistics.extended { + for (peer_client, (count, first_8_bytes)) in peer_clients.iter() { + ::metrics::gauge!( + "aquatic_peer_clients", + *count as f64, + "client" => peer_client.to_string(), + "peer_id_prefix_hex" => first_8_bytes.to_string(), + ); } } @@ -107,6 +143,23 @@ pub fn run_statistics_worker( } if let Some(tt) = opt_tt.as_ref() { + let mut peer_clients = if config.statistics.extended { + peer_clients + .iter() + .map(|(peer_client, (count, first_8_bytes))| { + ( + peer_client.to_compact_string(), + first_8_bytes.to_owned(), + *count, + ) + }) + .collect() + } else { + Vec::new() + }; + + peer_clients.sort_unstable_by(|a, b| b.2.cmp(&a.2)); + let template_data = TemplateData { stylesheet: STYLESHEET_CONTENTS.to_string(), ipv4_active: config.network.ipv4_active(), @@ -118,12 +171,19 @@ pub fn run_statistics_worker( .format(&Rfc2822) .unwrap_or("(formatting error)".into()), peer_update_interval: format!("{}", config.cleaning.torrent_cleaning_interval), + peer_clients, }; if let Err(err) = save_html_to_file(&config, tt, &template_data) { ::log::error!("Couldn't save statistics to file: {:#}", err) } } + + if let Some(time_remaining) = + Duration::from_secs(config.statistics.interval).checked_sub(start_time.elapsed()) + { + ::std::thread::sleep(time_remaining); + } } } diff --git a/aquatic_udp/src/workers/swarm/mod.rs b/aquatic_udp/src/workers/swarm/mod.rs index bab716e..d4bb3c5 100644 --- a/aquatic_udp/src/workers/swarm/mod.rs +++ b/aquatic_udp/src/workers/swarm/mod.rs @@ -50,6 +50,7 @@ pub fn run_swarm_worker( let response = handle_announce_request( &config, &mut rng, + &statistics_sender, &mut torrents.ipv4, request, ip, @@ -62,6 +63,7 @@ pub fn run_swarm_worker( let response = handle_announce_request( &config, &mut rng, + &statistics_sender, &mut torrents.ipv6, request, ip, @@ -88,28 +90,15 @@ pub fn run_swarm_worker( peer_valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); if now > last_cleaning + cleaning_interval { - let (ipv4, ipv6) = torrents.clean_and_get_statistics( + torrents.clean_and_update_statistics( &config, + &state, + &statistics_sender, &state.access_list, server_start_instant, + worker_index, ); - if config.statistics.active() { - state.statistics_ipv4.peers[worker_index.0].store(ipv4.0, Ordering::Release); - state.statistics_ipv6.peers[worker_index.0].store(ipv6.0, Ordering::Release); - - if let Some(message) = ipv4.1.map(StatisticsMessage::Ipv4PeerHistogram) { - if let Err(err) = statistics_sender.try_send(message) { - ::log::error!("couldn't send statistics message: {:#}", err) - } - } - if let Some(message) = ipv6.1.map(StatisticsMessage::Ipv6PeerHistogram) { - if let Err(err) = statistics_sender.try_send(message) { - ::log::error!("couldn't send statistics message: {:#}", err) - } - } - } - last_cleaning = now; } if config.statistics.active() @@ -131,6 +120,7 @@ pub fn run_swarm_worker( fn handle_announce_request( config: &Config, rng: &mut SmallRng, + statistics_sender: &Sender, torrents: &mut TorrentMap, request: AnnounceRequest, peer_ip: I, @@ -150,6 +140,8 @@ fn handle_announce_request( let peer_status = PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left); torrent_data.update_peer( + config, + statistics_sender, request.peer_id, peer_ip, request.port, diff --git a/aquatic_udp/src/workers/swarm/storage.rs b/aquatic_udp/src/workers/swarm/storage.rs index e96416c..9c78021 100644 --- a/aquatic_udp/src/workers/swarm/storage.rs +++ b/aquatic_udp/src/workers/swarm/storage.rs @@ -1,5 +1,6 @@ use std::net::Ipv4Addr; use std::net::Ipv6Addr; +use std::sync::atomic::Ordering; use std::sync::Arc; use aquatic_common::IndexMap; @@ -11,6 +12,7 @@ use aquatic_common::{ }; use aquatic_udp_protocol::*; +use crossbeam_channel::Sender; use hdrhistogram::Histogram; use rand::prelude::SmallRng; @@ -46,6 +48,8 @@ pub struct TorrentData { impl TorrentData { pub fn update_peer( &mut self, + config: &Config, + statistics_sender: &Sender, peer_id: PeerId, ip_address: I, port: Port, @@ -78,6 +82,20 @@ impl TorrentData { PeerStatus::Stopped => self.peers.remove(&peer_id), }; + if config.statistics.extended { + match (status, opt_removed_peer.is_some()) { + // We added a new peer + (PeerStatus::Leeching | PeerStatus::Seeding, false) => { + statistics_sender.try_send(StatisticsMessage::PeerAdded(peer_id)); + } + // We removed an existing peer + (PeerStatus::Stopped, true) => { + statistics_sender.try_send(StatisticsMessage::PeerRemoved(peer_id)); + } + _ => (), + } + } + if let Some(Peer { is_seeder: true, .. }) = opt_removed_peer @@ -117,12 +135,22 @@ impl TorrentData { } /// Remove inactive peers and reclaim space - fn clean(&mut self, now: SecondsSinceServerStart) { - self.peers.retain(|_, peer| { + fn clean( + &mut self, + config: &Config, + statistics_sender: &Sender, + now: SecondsSinceServerStart, + ) { + self.peers.retain(|peer_id, peer| { let keep = peer.valid_until.valid(now); - if (!keep) & peer.is_seeder { - self.num_seeders -= 1; + if !keep { + if peer.is_seeder { + self.num_seeders -= 1; + } + if config.statistics.extended { + statistics_sender.try_send(StatisticsMessage::PeerRemoved(*peer_id)); + } } keep @@ -151,6 +179,7 @@ impl TorrentMap { fn clean_and_get_statistics( &mut self, config: &Config, + statistics_sender: &Sender, access_list_cache: &mut AccessListCache, access_list_mode: AccessListMode, now: SecondsSinceServerStart, @@ -178,7 +207,7 @@ impl TorrentMap { return false; } - torrent.clean(now); + torrent.clean(config, statistics_sender, now); num_peers += torrent.peers.len(); @@ -225,28 +254,42 @@ impl Default for TorrentMaps { } impl TorrentMaps { - /// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers - pub fn clean_and_get_statistics( + /// Remove forbidden or inactive torrents, reclaim space and update statistics + pub fn clean_and_update_statistics( &mut self, config: &Config, + state: &State, + statistics_sender: &Sender, access_list: &Arc, server_start_instant: ServerStartInstant, - ) -> ( - (usize, Option>), - (usize, Option>), + worker_index: SwarmWorkerIndex, ) { let mut cache = create_access_list_cache(access_list); let mode = config.access_list.mode; let now = server_start_instant.seconds_elapsed(); - let ipv4 = self - .ipv4 - .clean_and_get_statistics(config, &mut cache, mode, now); - let ipv6 = self - .ipv6 - .clean_and_get_statistics(config, &mut cache, mode, now); + let ipv4 = + self.ipv4 + .clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now); + let ipv6 = + self.ipv6 + .clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now); - (ipv4, ipv6) + if config.statistics.active() { + state.statistics_ipv4.peers[worker_index.0].store(ipv4.0, Ordering::Release); + state.statistics_ipv6.peers[worker_index.0].store(ipv6.0, Ordering::Release); + + if let Some(message) = ipv4.1.map(StatisticsMessage::Ipv4PeerHistogram) { + if let Err(err) = statistics_sender.try_send(message) { + ::log::error!("couldn't send statistics message: {:#}", err); + } + } + if let Some(message) = ipv6.1.map(StatisticsMessage::Ipv6PeerHistogram) { + if let Err(err) = statistics_sender.try_send(message) { + ::log::error!("couldn't send statistics message: {:#}", err); + } + } + } } } diff --git a/aquatic_udp/templates/statistics.html b/aquatic_udp/templates/statistics.html index 2007d01..a27ba81 100644 --- a/aquatic_udp/templates/statistics.html +++ b/aquatic_udp/templates/statistics.html @@ -251,5 +251,30 @@ {{ endif }} {{ endif }} + + {{ if extended_active }} + +

Peer clients

+ + + + + + + + + + + {{ for value in peer_clients }} + + + + + + {{ endfor }} + +
ClientPeer ID prefix (hex)Count
{ value.0 }{ value.1 }{ value.2 }
+ + {{ endif }} diff --git a/aquatic_udp_protocol/Cargo.toml b/aquatic_udp_protocol/Cargo.toml index eb78d2b..033b65f 100644 --- a/aquatic_udp_protocol/Cargo.toml +++ b/aquatic_udp_protocol/Cargo.toml @@ -11,6 +11,8 @@ readme.workspace = true rust-version.workspace = true [dependencies] +aquatic_peer_id.workspace = true + byteorder = "1" either = "1" diff --git a/aquatic_udp_protocol/src/common.rs b/aquatic_udp_protocol/src/common.rs index 8aab39f..c99a0cb 100644 --- a/aquatic_udp_protocol/src/common.rs +++ b/aquatic_udp_protocol/src/common.rs @@ -1,6 +1,8 @@ use std::fmt::Debug; use std::net::{Ipv4Addr, Ipv6Addr}; +pub use aquatic_peer_id::{PeerClient, PeerId}; + pub trait Ip: Clone + Copy + Debug + PartialEq + Eq {} impl Ip for Ipv4Addr {} @@ -30,9 +32,6 @@ pub struct NumberOfDownloads(pub i32); #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] pub struct Port(pub u16); -#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, PartialOrd, Ord)] -pub struct PeerId(pub [u8; 20]); - #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] pub struct PeerKey(pub u32); @@ -55,19 +54,6 @@ impl quickcheck::Arbitrary for InfoHash { } } -#[cfg(test)] -impl quickcheck::Arbitrary for PeerId { - fn arbitrary(g: &mut quickcheck::Gen) -> Self { - let mut bytes = [0u8; 20]; - - for byte in bytes.iter_mut() { - *byte = u8::arbitrary(g); - } - - Self(bytes) - } -} - #[cfg(test)] impl quickcheck::Arbitrary for ResponsePeer { fn arbitrary(g: &mut quickcheck::Gen) -> Self { diff --git a/aquatic_udp_protocol/src/request.rs b/aquatic_udp_protocol/src/request.rs index 16be9c8..0ed591c 100644 --- a/aquatic_udp_protocol/src/request.rs +++ b/aquatic_udp_protocol/src/request.rs @@ -5,6 +5,8 @@ use std::net::Ipv4Addr; use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt}; use either::Either; +use aquatic_peer_id::PeerId; + use super::common::*; const PROTOCOL_IDENTIFIER: i64 = 4_497_486_125_440; From c5547c28c022bc7f96020cc28b41bcfeae7e9981 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 6 Jun 2023 01:11:37 +0200 Subject: [PATCH 05/20] aquatic_peer_id: require at least one char for PREFIX_RE --- aquatic_peer_id/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquatic_peer_id/src/lib.rs b/aquatic_peer_id/src/lib.rs index b6f5ed4..454e7c8 100644 --- a/aquatic_peer_id/src/lib.rs +++ b/aquatic_peer_id/src/lib.rs @@ -155,7 +155,7 @@ impl PeerClient { if let Some(caps) = PREFIX_RE .get_or_init(|| { - Regex::new(r"^(?P[a-zA-Z0-9\-]*)\-").expect("compile PREFIX_RE regex") + Regex::new(r"^(?P[a-zA-Z0-9\-]+)\-").expect("compile PREFIX_RE regex") }) .captures(&peer_id.0) { From 4ff65cc6bd43ce39922c508da5a7f095e82f45a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 7 Jun 2023 12:50:23 +0200 Subject: [PATCH 06/20] aquatic_peer_id: shorten som textual client representations --- aquatic_peer_id/src/lib.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/aquatic_peer_id/src/lib.rs b/aquatic_peer_id/src/lib.rs index 454e7c8..1629abf 100644 --- a/aquatic_peer_id/src/lib.rs +++ b/aquatic_peer_id/src/lib.rs @@ -50,8 +50,8 @@ impl PeerClient { pub fn from_prefix_and_version(prefix: &[u8], version: &[u8]) -> Self { fn three_digits_plus_prerelease(v1: char, v2: char, v3: char, v4: char) -> CompactString { let prerelease = match v4 { - 'A' => " [Alpha]", - 'B' => " [Beta]", + 'A' => " alpha", + 'B' => " beta", _ => "", }; @@ -171,14 +171,14 @@ impl Display for PeerClient { match self { Self::BitTorrent(v) => write!(f, "BitTorrent ({})", v.as_str()), Self::Deluge(v) => write!(f, "Deluge ({})", v.as_str()), - Self::LibTorrentRakshasa(v) => write!(f, "libTorrent (rakshasa) ({})", v.as_str()), - Self::LibTorrentRasterbar(v) => write!(f, "libtorrent (rasterbar) ({})", v.as_str()), + Self::LibTorrentRakshasa(v) => write!(f, "lt (rakshasa) ({})", v.as_str()), + Self::LibTorrentRasterbar(v) => write!(f, "lt (rasterbar) ({})", v.as_str()), Self::QBitTorrent(v) => write!(f, "QBitTorrent ({})", v.as_str()), Self::Transmission(v) => write!(f, "Transmission ({})", v.as_str()), - Self::UTorrent(v) => write!(f, "uTorrent ({})", v.as_str()), - Self::UTorrentEmbedded(v) => write!(f, "uTorrent Embedded ({})", v.as_str()), - Self::UTorrentMac(v) => write!(f, "uTorrent Mac ({})", v.as_str()), - Self::UTorrentWeb(v) => write!(f, "uTorrent Web ({})", v.as_str()), + Self::UTorrent(v) => write!(f, "µTorrent ({})", v.as_str()), + Self::UTorrentEmbedded(v) => write!(f, "µTorrent Emb. ({})", v.as_str()), + Self::UTorrentMac(v) => write!(f, "µTorrent Mac ({})", v.as_str()), + Self::UTorrentWeb(v) => write!(f, "µTorrent Web ({})", v.as_str()), Self::Vuze(v) => write!(f, "Vuze ({})", v.as_str()), Self::WebTorrent(v) => write!(f, "WebTorrent ({})", v.as_str()), Self::WebTorrentDesktop(v) => write!(f, "WebTorrent Desktop ({})", v.as_str()), From 08b28c9e1b8a8e35df706b664e288ea15d80a987 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 7 Jun 2023 12:58:41 +0200 Subject: [PATCH 07/20] udp statistics: improve peer client reporting - fix prometheus peer id prefix reporting - don't report peer id prefix in html output (current method was incorrect and output would become huge) --- .../src/workers/statistics/collector.rs | 1 - aquatic_udp/src/workers/statistics/mod.rs | 60 ++++++++++--------- aquatic_udp/templates/statistics.html | 2 - 3 files changed, 33 insertions(+), 30 deletions(-) diff --git a/aquatic_udp/src/workers/statistics/collector.rs b/aquatic_udp/src/workers/statistics/collector.rs index af25639..3e3ed97 100644 --- a/aquatic_udp/src/workers/statistics/collector.rs +++ b/aquatic_udp/src/workers/statistics/collector.rs @@ -2,7 +2,6 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Instant; -use crossbeam_channel::Receiver; use hdrhistogram::Histogram; use num_format::{Locale, ToFormattedString}; use serde::Serialize; diff --git a/aquatic_udp/src/workers/statistics/mod.rs b/aquatic_udp/src/workers/statistics/mod.rs index 8a1d5ad..aa453e1 100644 --- a/aquatic_udp/src/workers/statistics/mod.rs +++ b/aquatic_udp/src/workers/statistics/mod.rs @@ -37,7 +37,7 @@ struct TemplateData { ipv6: CollectedStatistics, last_updated: String, peer_update_interval: String, - peer_clients: Vec<(CompactString, CompactString, usize)>, + peer_clients: Vec<(CompactString, usize)>, } pub fn run_statistics_worker( @@ -71,7 +71,7 @@ pub fn run_statistics_worker( "6".into(), ); - let mut peer_clients: IndexMap = IndexMap::default(); + let mut peer_clients: IndexMap = IndexMap::default(); loop { let start_time = Instant::now(); @@ -81,15 +81,35 @@ pub fn run_statistics_worker( StatisticsMessage::Ipv4PeerHistogram(h) => ipv4_collector.add_histogram(&config, h), StatisticsMessage::Ipv6PeerHistogram(h) => ipv6_collector.add_histogram(&config, h), StatisticsMessage::PeerAdded(peer_id) => { - peer_clients - .entry(peer_id.client()) - .or_insert((0, peer_id.first_8_bytes_hex())) - .0 += 1; + let client = peer_id.client(); + let first_8_bytes_hex = peer_id.first_8_bytes_hex(); + + #[cfg(feature = "prometheus")] + if config.statistics.run_prometheus_endpoint { + ::metrics::increment_gauge!( + "aquatic_peer_clients", + 1.0, + "client" => client.to_string(), + "peer_id_prefix_hex" => first_8_bytes_hex.to_string(), + ); + } + + *peer_clients.entry(client).or_insert(0) += 1; } StatisticsMessage::PeerRemoved(peer_id) => { let client = peer_id.client(); - if let Some((count, _)) = peer_clients.get_mut(&client) { + #[cfg(feature = "prometheus")] + if config.statistics.run_prometheus_endpoint { + ::metrics::decrement_gauge!( + "aquatic_peer_clients", + 1.0, + "client" => client.to_string(), + "peer_id_prefix_hex" => peer_id.first_8_bytes_hex().to_string(), + ); + } + + if let Some(count) = peer_clients.get_mut(&client) { if *count == 1 { drop(count); @@ -102,18 +122,6 @@ pub fn run_statistics_worker( } } - #[cfg(feature = "prometheus")] - if config.statistics.run_prometheus_endpoint && config.statistics.extended { - for (peer_client, (count, first_8_bytes)) in peer_clients.iter() { - ::metrics::gauge!( - "aquatic_peer_clients", - *count as f64, - "client" => peer_client.to_string(), - "peer_id_prefix_hex" => first_8_bytes.to_string(), - ); - } - } - let statistics_ipv4 = ipv4_collector.collect_from_shared( #[cfg(feature = "prometheus")] &config, @@ -146,19 +154,13 @@ pub fn run_statistics_worker( let mut peer_clients = if config.statistics.extended { peer_clients .iter() - .map(|(peer_client, (count, first_8_bytes))| { - ( - peer_client.to_compact_string(), - first_8_bytes.to_owned(), - *count, - ) - }) + .map(|(peer_client, count)| (peer_client.to_compact_string(), *count)) .collect() } else { Vec::new() }; - peer_clients.sort_unstable_by(|a, b| b.2.cmp(&a.2)); + peer_clients.sort_unstable_by(|a, b| b.1.cmp(&a.1)); let template_data = TemplateData { stylesheet: STYLESHEET_CONTENTS.to_string(), @@ -183,6 +185,10 @@ pub fn run_statistics_worker( Duration::from_secs(config.statistics.interval).checked_sub(start_time.elapsed()) { ::std::thread::sleep(time_remaining); + } else { + ::log::warn!( + "statistics interval not long enough to process all data, output may be misleading" + ); } } } diff --git a/aquatic_udp/templates/statistics.html b/aquatic_udp/templates/statistics.html index a27ba81..77463f6 100644 --- a/aquatic_udp/templates/statistics.html +++ b/aquatic_udp/templates/statistics.html @@ -260,7 +260,6 @@ Client - Peer ID prefix (hex) Count @@ -268,7 +267,6 @@ {{ for value in peer_clients }} { value.0 } - { value.1 } { value.2 } {{ endfor }} From 00c4e61ed926bd6e630102e8d4eee2b978efac73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 7 Jun 2023 13:06:44 +0200 Subject: [PATCH 08/20] udp: fix template error --- aquatic_udp/src/workers/swarm/storage.rs | 21 ++++++++++++++++++--- aquatic_udp/templates/statistics.html | 2 +- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/aquatic_udp/src/workers/swarm/storage.rs b/aquatic_udp/src/workers/swarm/storage.rs index 9c78021..2c864e8 100644 --- a/aquatic_udp/src/workers/swarm/storage.rs +++ b/aquatic_udp/src/workers/swarm/storage.rs @@ -86,11 +86,21 @@ impl TorrentData { match (status, opt_removed_peer.is_some()) { // We added a new peer (PeerStatus::Leeching | PeerStatus::Seeding, false) => { - statistics_sender.try_send(StatisticsMessage::PeerAdded(peer_id)); + if let Err(_) = + statistics_sender.try_send(StatisticsMessage::PeerAdded(peer_id)) + { + // Should never happen in practice + ::log::error!("Couldn't send StatisticsMessage::PeerAdded"); + } } // We removed an existing peer (PeerStatus::Stopped, true) => { - statistics_sender.try_send(StatisticsMessage::PeerRemoved(peer_id)); + if let Err(_) = + statistics_sender.try_send(StatisticsMessage::PeerRemoved(peer_id)) + { + // Should never happen in practice + ::log::error!("Couldn't send StatisticsMessage::PeerRemoved"); + } } _ => (), } @@ -149,7 +159,12 @@ impl TorrentData { self.num_seeders -= 1; } if config.statistics.extended { - statistics_sender.try_send(StatisticsMessage::PeerRemoved(*peer_id)); + if let Err(_) = + statistics_sender.try_send(StatisticsMessage::PeerRemoved(*peer_id)) + { + // Should never happen in practice + ::log::error!("Couldn't send StatisticsMessage::PeerRemoved"); + } } } diff --git a/aquatic_udp/templates/statistics.html b/aquatic_udp/templates/statistics.html index 77463f6..0fe8930 100644 --- a/aquatic_udp/templates/statistics.html +++ b/aquatic_udp/templates/statistics.html @@ -267,7 +267,7 @@ {{ for value in peer_clients }} { value.0 } - { value.2 } + { value.1 } {{ endfor }} From 08239dff1fdca701067c9575de6bc8fdc4d911aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 7 Jun 2023 13:14:56 +0200 Subject: [PATCH 09/20] aquatic_peer_id: be stricter about letters in version numbers Also fix failing test --- aquatic_peer_id/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aquatic_peer_id/src/lib.rs b/aquatic_peer_id/src/lib.rs index 1629abf..77f713b 100644 --- a/aquatic_peer_id/src/lib.rs +++ b/aquatic_peer_id/src/lib.rs @@ -131,7 +131,7 @@ impl PeerClient { if let Some(caps) = AZ_RE .get_or_init(|| { - Regex::new(r"^\-(?P[a-zA-Z]{2})(?P[0-9A-Z]{4})") + Regex::new(r"^\-(?P[a-zA-Z]{2})(?P[0-9]{3}[0-9AB])") .expect("compile AZ_RE regex") }) .captures(&peer_id.0) @@ -226,7 +226,7 @@ mod tests { ); assert_eq!( PeerClient::from_peer_id(&create_peer_id(b"-UT123A-k/asdh3")), - PeerClient::UTorrent("1.2.3 [Alpha]".into()) + PeerClient::UTorrent("1.2.3 alpha".into()) ); assert_eq!( PeerClient::from_peer_id(&create_peer_id(b"-TR0012-k/asdh3")), From 47b45f28d51a47509d10d015418becc1021ee894 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 7 Jun 2023 13:28:38 +0200 Subject: [PATCH 10/20] udp: add separate config flag for prometheus peer client reports --- aquatic_udp/src/config.rs | 15 +++++++- aquatic_udp/src/workers/statistics/mod.rs | 45 ++++++++++++----------- 2 files changed, 37 insertions(+), 23 deletions(-) diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index a61a846..1eba011 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -161,9 +161,10 @@ impl Default for ProtocolConfig { pub struct StatisticsConfig { /// Collect and print/write statistics this often (seconds) pub interval: u64, - /// Enable extended statistics (on peers per torrent and on peer clients) + /// Enable extended statistics (on peers per torrent and on peer clients). + /// Also, see `prometheus_peer_clients`. /// - /// Will increase time taken for request handling and torrent cleaning + /// Will increase time taken for request handling and torrent cleaning. pub extended: bool, /// Print statistics to standard output pub print_to_stdout: bool, @@ -177,6 +178,14 @@ pub struct StatisticsConfig { /// Address to run prometheus endpoint on #[cfg(feature = "prometheus")] pub prometheus_endpoint_address: SocketAddr, + /// Serve information on all peer clients on the prometheus endpoint. + /// Requires extended statistics to be activated. + /// + /// NOT RECOMMENDED. May consume lots of CPU and RAM since data on every + /// single peer client will be kept around by the endpoint, even those + /// which are no longer in the swarm. + #[cfg(feature = "prometheus")] + pub prometheus_peer_clients: bool, } impl StatisticsConfig { @@ -206,6 +215,8 @@ impl Default for StatisticsConfig { run_prometheus_endpoint: false, #[cfg(feature = "prometheus")] prometheus_endpoint_address: SocketAddr::from(([0, 0, 0, 0], 9000)), + #[cfg(feature = "prometheus")] + prometheus_peer_clients: false, } } } diff --git a/aquatic_udp/src/workers/statistics/mod.rs b/aquatic_udp/src/workers/statistics/mod.rs index aa453e1..da9a542 100644 --- a/aquatic_udp/src/workers/statistics/mod.rs +++ b/aquatic_udp/src/workers/statistics/mod.rs @@ -81,39 +81,42 @@ pub fn run_statistics_worker( StatisticsMessage::Ipv4PeerHistogram(h) => ipv4_collector.add_histogram(&config, h), StatisticsMessage::Ipv6PeerHistogram(h) => ipv6_collector.add_histogram(&config, h), StatisticsMessage::PeerAdded(peer_id) => { - let client = peer_id.client(); - let first_8_bytes_hex = peer_id.first_8_bytes_hex(); + let peer_client = peer_id.client(); #[cfg(feature = "prometheus")] - if config.statistics.run_prometheus_endpoint { + if config.statistics.run_prometheus_endpoint + && config.statistics.prometheus_peer_clients + { ::metrics::increment_gauge!( "aquatic_peer_clients", 1.0, - "client" => client.to_string(), - "peer_id_prefix_hex" => first_8_bytes_hex.to_string(), - ); - } - - *peer_clients.entry(client).or_insert(0) += 1; - } - StatisticsMessage::PeerRemoved(peer_id) => { - let client = peer_id.client(); - - #[cfg(feature = "prometheus")] - if config.statistics.run_prometheus_endpoint { - ::metrics::decrement_gauge!( - "aquatic_peer_clients", - 1.0, - "client" => client.to_string(), + "client" => peer_client.to_string(), "peer_id_prefix_hex" => peer_id.first_8_bytes_hex().to_string(), ); } - if let Some(count) = peer_clients.get_mut(&client) { + *peer_clients.entry(peer_client).or_insert(0) += 1; + } + StatisticsMessage::PeerRemoved(peer_id) => { + let peer_client = peer_id.client(); + + #[cfg(feature = "prometheus")] + if config.statistics.run_prometheus_endpoint + && config.statistics.prometheus_peer_clients + { + ::metrics::decrement_gauge!( + "aquatic_peer_clients", + 1.0, + "client" => peer_client.to_string(), + "peer_id_prefix_hex" => peer_id.first_8_bytes_hex().to_string(), + ); + } + + if let Some(count) = peer_clients.get_mut(&peer_client) { if *count == 1 { drop(count); - peer_clients.remove(&client); + peer_clients.remove(&peer_client); } else { *count -= 1; } From 467f75e1fb9549274f724a7e7f79140272d19b39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 7 Jun 2023 13:32:46 +0200 Subject: [PATCH 11/20] udp: use prettier formatting for html peer client count output --- Cargo.lock | 1 - aquatic_udp/Cargo.toml | 1 - aquatic_udp/src/workers/statistics/mod.rs | 21 ++++++++++++++------- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5a64467..8c6a434 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -220,7 +220,6 @@ dependencies = [ "aquatic_udp_protocol", "blake3", "cfg-if", - "compact_str", "constant_time_eq", "crossbeam-channel", "getrandom", diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 4a302c0..88bc11b 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -30,7 +30,6 @@ aquatic_udp_protocol.workspace = true anyhow = "1" blake3 = "1" cfg-if = "1" -compact_str = { version = "0.7", features = ["serde"] } constant_time_eq = "0.2" crossbeam-channel = "0.5" getrandom = "0.2" diff --git a/aquatic_udp/src/workers/statistics/mod.rs b/aquatic_udp/src/workers/statistics/mod.rs index da9a542..90a730b 100644 --- a/aquatic_udp/src/workers/statistics/mod.rs +++ b/aquatic_udp/src/workers/statistics/mod.rs @@ -7,8 +7,8 @@ use std::time::{Duration, Instant}; use anyhow::Context; use aquatic_common::{IndexMap, PanicSentinel}; use aquatic_udp_protocol::PeerClient; -use compact_str::{CompactString, ToCompactString}; use crossbeam_channel::Receiver; +use num_format::{Locale, ToFormattedString}; use serde::Serialize; use time::format_description::well_known::Rfc2822; use time::OffsetDateTime; @@ -37,7 +37,7 @@ struct TemplateData { ipv6: CollectedStatistics, last_updated: String, peer_update_interval: String, - peer_clients: Vec<(CompactString, usize)>, + peer_clients: Vec<(String, String)>, } pub fn run_statistics_worker( @@ -155,15 +155,22 @@ pub fn run_statistics_worker( if let Some(tt) = opt_tt.as_ref() { let mut peer_clients = if config.statistics.extended { - peer_clients - .iter() - .map(|(peer_client, count)| (peer_client.to_compact_string(), *count)) - .collect() + peer_clients.iter().collect() } else { Vec::new() }; - peer_clients.sort_unstable_by(|a, b| b.1.cmp(&a.1)); + peer_clients.sort_unstable_by(|a, b| b.1.cmp(a.1)); + + let peer_clients = peer_clients + .into_iter() + .map(|(peer_client, count)| { + ( + peer_client.to_string(), + count.to_formatted_string(&Locale::en), + ) + }) + .collect(); let template_data = TemplateData { stylesheet: STYLESHEET_CONTENTS.to_string(), From 1ddac59feebf09c3a8cc8d84c76137650958fbce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 7 Jun 2023 13:41:20 +0200 Subject: [PATCH 12/20] Update CHANGELOG --- CHANGELOG.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c2a2f0..a05ce30 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,19 @@ # Changelog +## Unreleased + +### General + +#### Added + +* Add `aquatic_peer_id` crate with peer client information logic + +### aquatic_udp + +#### Added + +* Add support for reporting peer client information + ## 0.8.0 - 2023-03-17 ### General From da25d60a5dc11ad300271e958219163ea4ca86af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 8 Jun 2023 01:06:19 +0200 Subject: [PATCH 13/20] udp: improve peer client statistics --- Cargo.lock | 51 ++++++- aquatic_udp/Cargo.toml | 4 +- aquatic_udp/src/config.rs | 29 ++-- aquatic_udp/src/lib.rs | 6 + .../src/workers/statistics/collector.rs | 2 +- aquatic_udp/src/workers/statistics/mod.rs | 135 ++++++++++-------- aquatic_udp/src/workers/swarm/storage.rs | 7 +- 7 files changed, 159 insertions(+), 75 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8c6a434..cea9d22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -40,6 +40,15 @@ dependencies = [ "version_check", ] +[[package]] +name = "aho-corasick" +version = "0.7.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac" +dependencies = [ + "memchr", +] + [[package]] name = "aho-corasick" version = "1.0.1" @@ -220,6 +229,7 @@ dependencies = [ "aquatic_udp_protocol", "blake3", "cfg-if", + "compact_str", "constant_time_eq", "crossbeam-channel", "getrandom", @@ -231,6 +241,7 @@ dependencies = [ "log", "metrics", "metrics-exporter-prometheus", + "metrics-util", "mimalloc", "mio", "num-format", @@ -885,6 +896,12 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + [[package]] name = "env_logger" version = "0.8.4" @@ -1672,12 +1689,16 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "111cb375987443c3de8d503580b536f77dc8416d32db62d9456db5d93bd7ac47" dependencies = [ + "aho-corasick 0.7.20", "crossbeam-epoch", "crossbeam-utils", "hashbrown 0.13.2", + "indexmap", "metrics", "num_cpus", + "ordered-float", "quanta", + "radix_trie", "sketches-ddsketch 0.2.1", ] @@ -1735,6 +1756,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "nix" version = "0.23.2" @@ -1876,6 +1906,15 @@ version = "11.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" +[[package]] +name = "ordered-float" +version = "3.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fc2dbde8f8a79f2102cc474ceb0ad68e3b80b85289ea62389b60e66777e4213" +dependencies = [ + "num-traits", +] + [[package]] name = "os_str_bytes" version = "6.5.0" @@ -2068,6 +2107,16 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.8.5" @@ -2145,7 +2194,7 @@ version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81ca098a9821bd52d6b24fd8b10bd081f47d39c22778cafaa75a2857a62c6390" dependencies = [ - "aho-corasick", + "aho-corasick 1.0.1", "memchr", "regex-syntax", ] diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 88bc11b..7f779bc 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -19,7 +19,7 @@ name = "aquatic_udp" [features] default = ["prometheus"] cpu-pinning = ["aquatic_common/hwloc"] -prometheus = ["metrics", "metrics-exporter-prometheus"] +prometheus = ["metrics", "metrics-util", "metrics-exporter-prometheus"] io-uring = ["dep:io-uring"] [dependencies] @@ -30,6 +30,7 @@ aquatic_udp_protocol.workspace = true anyhow = "1" blake3 = "1" cfg-if = "1" +compact_str = "0.7" constant_time_eq = "0.2" crossbeam-channel = "0.5" getrandom = "0.2" @@ -40,6 +41,7 @@ io-uring = { version = "0.6", optional = true } libc = "0.2" log = "0.4" metrics = { version = "0.21", optional = true } +metrics-util = { version = "0.15", optional = true } metrics-exporter-prometheus = { version = "0.12", optional = true, default-features = false, features = ["http-listener"] } mimalloc = { version = "0.1", default-features = false } mio = { version = "0.8", features = ["net", "os-poll"] } diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 1eba011..2be8729 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -161,11 +161,16 @@ impl Default for ProtocolConfig { pub struct StatisticsConfig { /// Collect and print/write statistics this often (seconds) pub interval: u64, - /// Enable extended statistics (on peers per torrent and on peer clients). - /// Also, see `prometheus_peer_clients`. + /// Collect statistics on number of peers per torrent /// - /// Will increase time taken for request handling and torrent cleaning. - pub extended: bool, + /// Will increase time taken for torrent cleaning. + pub torrent_peer_histograms: bool, + /// Collect statistics on peer clients. + /// + /// Also, see `prometheus_peer_id_prefixes`. + /// + /// Quite costly when it comes to CPU and RAM. + pub peer_clients: bool, /// Print statistics to standard output pub print_to_stdout: bool, /// Save statistics as HTML to a file @@ -178,14 +183,13 @@ pub struct StatisticsConfig { /// Address to run prometheus endpoint on #[cfg(feature = "prometheus")] pub prometheus_endpoint_address: SocketAddr, - /// Serve information on all peer clients on the prometheus endpoint. - /// Requires extended statistics to be activated. + /// Serve information on all peer id prefixes on the prometheus endpoint. + /// Requires `peer_clients` to be activated. /// - /// NOT RECOMMENDED. May consume lots of CPU and RAM since data on every - /// single peer client will be kept around by the endpoint, even those - /// which are no longer in the swarm. + /// May consume quite a bit of CPU and RAM, since data on every single peer + /// client will be reported continuously on the endpoint #[cfg(feature = "prometheus")] - pub prometheus_peer_clients: bool, + pub prometheus_peer_id_prefixes: bool, } impl StatisticsConfig { @@ -207,7 +211,8 @@ impl Default for StatisticsConfig { fn default() -> Self { Self { interval: 5, - extended: false, + torrent_peer_histograms: false, + peer_clients: false, print_to_stdout: false, write_html_to_file: false, html_file_path: "tmp/statistics.html".into(), @@ -216,7 +221,7 @@ impl Default for StatisticsConfig { #[cfg(feature = "prometheus")] prometheus_endpoint_address: SocketAddr::from(([0, 0, 0, 0], 9000)), #[cfg(feature = "prometheus")] - prometheus_peer_clients: false, + prometheus_peer_id_prefixes: false, } } } diff --git a/aquatic_udp/src/lib.rs b/aquatic_udp/src/lib.rs index a869c4f..6f8ffe6 100644 --- a/aquatic_udp/src/lib.rs +++ b/aquatic_udp/src/lib.rs @@ -4,6 +4,7 @@ pub mod workers; use std::collections::BTreeMap; use std::thread::Builder; +use std::time::Duration; use anyhow::Context; use crossbeam_channel::{bounded, unbounded}; @@ -143,8 +144,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { #[cfg(feature = "prometheus")] if config.statistics.run_prometheus_endpoint { use metrics_exporter_prometheus::PrometheusBuilder; + use metrics_util::MetricKindMask; PrometheusBuilder::new() + .idle_timeout( + MetricKindMask::ALL, + Some(Duration::from_secs(config.statistics.interval * 2)), + ) .with_http_listener(config.statistics.prometheus_endpoint_address) .install() .with_context(|| { diff --git a/aquatic_udp/src/workers/statistics/collector.rs b/aquatic_udp/src/workers/statistics/collector.rs index 3e3ed97..33c0a4e 100644 --- a/aquatic_udp/src/workers/statistics/collector.rs +++ b/aquatic_udp/src/workers/statistics/collector.rs @@ -134,7 +134,7 @@ impl StatisticsCollector { ); } - if config.statistics.extended { + if config.statistics.torrent_peer_histograms { self.last_complete_histogram .update_metrics(self.ip_version.clone()); } diff --git a/aquatic_udp/src/workers/statistics/mod.rs b/aquatic_udp/src/workers/statistics/mod.rs index 90a730b..6d1ee2c 100644 --- a/aquatic_udp/src/workers/statistics/mod.rs +++ b/aquatic_udp/src/workers/statistics/mod.rs @@ -6,7 +6,8 @@ use std::time::{Duration, Instant}; use anyhow::Context; use aquatic_common::{IndexMap, PanicSentinel}; -use aquatic_udp_protocol::PeerClient; +use aquatic_udp_protocol::{PeerClient, PeerId}; +use compact_str::CompactString; use crossbeam_channel::Receiver; use num_format::{Locale, ToFormattedString}; use serde::Serialize; @@ -46,6 +47,17 @@ pub fn run_statistics_worker( shared_state: State, statistics_receiver: Receiver, ) { + let process_peer_client_data = { + let mut collect = config.statistics.write_html_to_file; + + #[cfg(feature = "prometheus")] + { + collect |= config.statistics.run_prometheus_endpoint; + } + + collect & config.statistics.peer_clients + }; + let opt_tt = if config.statistics.write_html_to_file { let mut tt = TinyTemplate::new(); @@ -71,7 +83,7 @@ pub fn run_statistics_worker( "6".into(), ); - let mut peer_clients: IndexMap = IndexMap::default(); + let mut peers: IndexMap = IndexMap::default(); loop { let start_time = Instant::now(); @@ -81,45 +93,16 @@ pub fn run_statistics_worker( StatisticsMessage::Ipv4PeerHistogram(h) => ipv4_collector.add_histogram(&config, h), StatisticsMessage::Ipv6PeerHistogram(h) => ipv6_collector.add_histogram(&config, h), StatisticsMessage::PeerAdded(peer_id) => { - let peer_client = peer_id.client(); + if process_peer_client_data { + let peer_client = peer_id.client(); + let prefix = peer_id.first_8_bytes_hex(); - #[cfg(feature = "prometheus")] - if config.statistics.run_prometheus_endpoint - && config.statistics.prometheus_peer_clients - { - ::metrics::increment_gauge!( - "aquatic_peer_clients", - 1.0, - "client" => peer_client.to_string(), - "peer_id_prefix_hex" => peer_id.first_8_bytes_hex().to_string(), - ); + peers.insert(peer_id, (peer_client, prefix)); } - - *peer_clients.entry(peer_client).or_insert(0) += 1; } StatisticsMessage::PeerRemoved(peer_id) => { - let peer_client = peer_id.client(); - - #[cfg(feature = "prometheus")] - if config.statistics.run_prometheus_endpoint - && config.statistics.prometheus_peer_clients - { - ::metrics::decrement_gauge!( - "aquatic_peer_clients", - 1.0, - "client" => peer_client.to_string(), - "peer_id_prefix_hex" => peer_id.first_8_bytes_hex().to_string(), - ); - } - - if let Some(count) = peer_clients.get_mut(&peer_client) { - if *count == 1 { - drop(count); - - peer_clients.remove(&peer_client); - } else { - *count -= 1; - } + if process_peer_client_data { + peers.remove(&peer_id); } } } @@ -134,6 +117,60 @@ pub fn run_statistics_worker( &config, ); + let peer_clients = if process_peer_client_data { + let mut clients: IndexMap = IndexMap::default(); + + #[cfg(feature = "prometheus")] + let mut prefixes: IndexMap = IndexMap::default(); + + for (peer_client, prefix) in peers.values() { + *clients.entry(peer_client.to_owned()).or_insert(0) += 1; + + #[cfg(feature = "prometheus")] + if config.statistics.run_prometheus_endpoint + && config.statistics.prometheus_peer_id_prefixes + { + *prefixes.entry(prefix.to_owned()).or_insert(0) += 1; + } + } + + clients.sort_unstable_by(|_, a, _, b| b.cmp(a)); + + #[cfg(feature = "prometheus")] + if config.statistics.run_prometheus_endpoint + && config.statistics.prometheus_peer_id_prefixes + { + for (prefix, count) in prefixes { + ::metrics::gauge!( + "aquatic_peer_id_prefixes", + count as f64, + "prefix_hex" => prefix.to_string(), + ); + } + } + + let mut client_vec = Vec::with_capacity(clients.len()); + + for (client, count) in clients { + if config.statistics.write_html_to_file { + client_vec.push((client.to_string(), count.to_formatted_string(&Locale::en))); + } + + #[cfg(feature = "prometheus")] + if config.statistics.run_prometheus_endpoint { + ::metrics::gauge!( + "aquatic_peer_clients", + count as f64, + "client" => client.to_string(), + ); + } + } + + client_vec + } else { + Vec::new() + }; + if config.statistics.print_to_stdout { println!("General:"); println!( @@ -154,29 +191,11 @@ pub fn run_statistics_worker( } if let Some(tt) = opt_tt.as_ref() { - let mut peer_clients = if config.statistics.extended { - peer_clients.iter().collect() - } else { - Vec::new() - }; - - peer_clients.sort_unstable_by(|a, b| b.1.cmp(a.1)); - - let peer_clients = peer_clients - .into_iter() - .map(|(peer_client, count)| { - ( - peer_client.to_string(), - count.to_formatted_string(&Locale::en), - ) - }) - .collect(); - let template_data = TemplateData { stylesheet: STYLESHEET_CONTENTS.to_string(), ipv4_active: config.network.ipv4_active(), ipv6_active: config.network.ipv6_active(), - extended_active: config.statistics.extended, + extended_active: config.statistics.torrent_peer_histograms, ipv4: statistics_ipv4, ipv6: statistics_ipv6, last_updated: OffsetDateTime::now_utc() @@ -191,6 +210,8 @@ pub fn run_statistics_worker( } } + peers.shrink_to_fit(); + if let Some(time_remaining) = Duration::from_secs(config.statistics.interval).checked_sub(start_time.elapsed()) { @@ -236,7 +257,7 @@ fn print_to_stdout(config: &Config, statistics: &CollectedStatistics) { statistics.num_peers, config.cleaning.torrent_cleaning_interval ); - if config.statistics.extended { + if config.statistics.torrent_peer_histograms { println!( " peers per torrent (updated every {}s)", config.cleaning.torrent_cleaning_interval diff --git a/aquatic_udp/src/workers/swarm/storage.rs b/aquatic_udp/src/workers/swarm/storage.rs index 2c864e8..777dba4 100644 --- a/aquatic_udp/src/workers/swarm/storage.rs +++ b/aquatic_udp/src/workers/swarm/storage.rs @@ -82,7 +82,7 @@ impl TorrentData { PeerStatus::Stopped => self.peers.remove(&peer_id), }; - if config.statistics.extended { + if config.statistics.peer_clients { match (status, opt_removed_peer.is_some()) { // We added a new peer (PeerStatus::Leeching | PeerStatus::Seeding, false) => { @@ -158,7 +158,7 @@ impl TorrentData { if peer.is_seeder { self.num_seeders -= 1; } - if config.statistics.extended { + if config.statistics.peer_clients { if let Err(_) = statistics_sender.try_send(StatisticsMessage::PeerRemoved(*peer_id)) { @@ -201,7 +201,8 @@ impl TorrentMap { ) -> (usize, Option>) { let mut num_peers = 0; - let mut opt_histogram: Option> = if config.statistics.extended { + let mut opt_histogram: Option> = if config.statistics.torrent_peer_histograms + { match Histogram::new(3) { Ok(histogram) => Some(histogram), Err(err) => { From e6b0585372722539f32f757c697243498d20f8a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 8 Jun 2023 01:25:58 +0200 Subject: [PATCH 14/20] udp: update config comments --- aquatic_udp/src/config.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 2be8729..4aac6cb 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -169,7 +169,8 @@ pub struct StatisticsConfig { /// /// Also, see `prometheus_peer_id_prefixes`. /// - /// Quite costly when it comes to CPU and RAM. + /// Expect a certain CPU hit (maybe 5% higher consumption) and a bit higher + /// memory use pub peer_clients: bool, /// Print statistics to standard output pub print_to_stdout: bool, @@ -184,6 +185,7 @@ pub struct StatisticsConfig { #[cfg(feature = "prometheus")] pub prometheus_endpoint_address: SocketAddr, /// Serve information on all peer id prefixes on the prometheus endpoint. + /// /// Requires `peer_clients` to be activated. /// /// May consume quite a bit of CPU and RAM, since data on every single peer From 8323d963c9d8ddad1699a556acee28523a03942e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 10 Jun 2023 00:05:23 +0200 Subject: [PATCH 15/20] udp: don't incorrectly remove peers in client counting --- aquatic_udp/src/workers/statistics/mod.rs | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/aquatic_udp/src/workers/statistics/mod.rs b/aquatic_udp/src/workers/statistics/mod.rs index 6d1ee2c..c950850 100644 --- a/aquatic_udp/src/workers/statistics/mod.rs +++ b/aquatic_udp/src/workers/statistics/mod.rs @@ -83,7 +83,9 @@ pub fn run_statistics_worker( "6".into(), ); - let mut peers: IndexMap = IndexMap::default(); + // Store a count to enable not removing peers from the count completely + // just because they were removed from one torrent + let mut peers: IndexMap = IndexMap::default(); loop { let start_time = Instant::now(); @@ -94,15 +96,21 @@ pub fn run_statistics_worker( StatisticsMessage::Ipv6PeerHistogram(h) => ipv6_collector.add_histogram(&config, h), StatisticsMessage::PeerAdded(peer_id) => { if process_peer_client_data { - let peer_client = peer_id.client(); - let prefix = peer_id.first_8_bytes_hex(); - - peers.insert(peer_id, (peer_client, prefix)); + peers + .entry(peer_id) + .or_insert_with(|| (0, peer_id.client(), peer_id.first_8_bytes_hex())) + .0 += 1; } } StatisticsMessage::PeerRemoved(peer_id) => { if process_peer_client_data { - peers.remove(&peer_id); + if let Some((count, _, _)) = peers.get_mut(&peer_id) { + *count -= 1; + + if *count == 0 { + peers.remove(&peer_id); + } + } } } } @@ -123,7 +131,8 @@ pub fn run_statistics_worker( #[cfg(feature = "prometheus")] let mut prefixes: IndexMap = IndexMap::default(); - for (peer_client, prefix) in peers.values() { + // Only count peer_ids once, even if they are in multiple torrents + for (_, peer_client, prefix) in peers.values() { *clients.entry(peer_client.to_owned()).or_insert(0) += 1; #[cfg(feature = "prometheus")] From 4baba7b34b24d9ad605152a39708330037873e2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 14 Jun 2023 09:44:44 +0200 Subject: [PATCH 16/20] peer_id: fix deluge version parsing --- aquatic_peer_id/src/lib.rs | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/aquatic_peer_id/src/lib.rs b/aquatic_peer_id/src/lib.rs index 77f713b..8c71082 100644 --- a/aquatic_peer_id/src/lib.rs +++ b/aquatic_peer_id/src/lib.rs @@ -1,4 +1,4 @@ -use std::{fmt::Display, sync::OnceLock}; +use std::{borrow::Cow, fmt::Display, sync::OnceLock}; use compact_str::{format_compact, CompactString}; use regex::bytes::Regex; @@ -49,10 +49,13 @@ pub enum PeerClient { impl PeerClient { pub fn from_prefix_and_version(prefix: &[u8], version: &[u8]) -> Self { fn three_digits_plus_prerelease(v1: char, v2: char, v3: char, v4: char) -> CompactString { - let prerelease = match v4 { - 'A' => " alpha", - 'B' => " beta", - _ => "", + let prerelease: Cow = match v4 { + 'd' | 'D' => " dev".into(), + 'a' | 'A' => " alpha".into(), + 'b' | 'B' => " beta".into(), + 'r' | 'R' => " rc".into(), + 's' | 'S' => " stable".into(), + other => format_compact!("{}", other).into(), }; format_compact!("{}.{}.{}{}", v1, v2, v3, prerelease) @@ -80,7 +83,7 @@ impl PeerClient { match prefix { b"AZ" => Self::Vuze(format_compact!("{}.{}.{}.{}", v1, v2, v3, v4)), b"BT" => Self::BitTorrent(three_digits_plus_prerelease(v1, v2, v3, v4)), - b"DE" => Self::Deluge(format_compact!("{}.{}.{}", v1, v2, v3)), + b"DE" => Self::Deluge(three_digits_plus_prerelease(v1, v2, v3, v4)), b"lt" => Self::LibTorrentRakshasa(format_compact!("{}.{}{}.{}", v1, v2, v3, v4)), b"LT" => Self::LibTorrentRasterbar(format_compact!("{}.{}{}.{}", v1, v2, v3, v4)), b"qB" => Self::QBitTorrent(format_compact!("{}.{}.{}", v1, v2, v3)), @@ -131,7 +134,7 @@ impl PeerClient { if let Some(caps) = AZ_RE .get_or_init(|| { - Regex::new(r"^\-(?P[a-zA-Z]{2})(?P[0-9]{3}[0-9AB])") + Regex::new(r"^\-(?P[a-zA-Z]{2})(?P[0-9]{3}[0-9a-zA-Z])") .expect("compile AZ_RE regex") }) .captures(&peer_id.0) @@ -224,6 +227,14 @@ mod tests { PeerClient::from_peer_id(&create_peer_id(b"-lt1234-k/asdh3")), PeerClient::LibTorrentRakshasa("1.23.4".into()) ); + assert_eq!( + PeerClient::from_peer_id(&create_peer_id(b"-DE123s-k/asdh3")), + PeerClient::Deluge("1.2.3 stable".into()) + ); + assert_eq!( + PeerClient::from_peer_id(&create_peer_id(b"-DE123r-k/asdh3")), + PeerClient::Deluge("1.2.3 rc".into()) + ); assert_eq!( PeerClient::from_peer_id(&create_peer_id(b"-UT123A-k/asdh3")), PeerClient::UTorrent("1.2.3 alpha".into()) From bbfe54670ab9df40c607491bf1fe7a9918f0905e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 14 Jun 2023 09:46:22 +0200 Subject: [PATCH 17/20] peer_id: don't surround version with parentheses when displaying --- aquatic_peer_id/src/lib.rs | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/aquatic_peer_id/src/lib.rs b/aquatic_peer_id/src/lib.rs index 8c71082..6d3c790 100644 --- a/aquatic_peer_id/src/lib.rs +++ b/aquatic_peer_id/src/lib.rs @@ -172,20 +172,20 @@ impl PeerClient { impl Display for PeerClient { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::BitTorrent(v) => write!(f, "BitTorrent ({})", v.as_str()), - Self::Deluge(v) => write!(f, "Deluge ({})", v.as_str()), - Self::LibTorrentRakshasa(v) => write!(f, "lt (rakshasa) ({})", v.as_str()), - Self::LibTorrentRasterbar(v) => write!(f, "lt (rasterbar) ({})", v.as_str()), - Self::QBitTorrent(v) => write!(f, "QBitTorrent ({})", v.as_str()), - Self::Transmission(v) => write!(f, "Transmission ({})", v.as_str()), - Self::UTorrent(v) => write!(f, "µTorrent ({})", v.as_str()), - Self::UTorrentEmbedded(v) => write!(f, "µTorrent Emb. ({})", v.as_str()), - Self::UTorrentMac(v) => write!(f, "µTorrent Mac ({})", v.as_str()), - Self::UTorrentWeb(v) => write!(f, "µTorrent Web ({})", v.as_str()), - Self::Vuze(v) => write!(f, "Vuze ({})", v.as_str()), - Self::WebTorrent(v) => write!(f, "WebTorrent ({})", v.as_str()), - Self::WebTorrentDesktop(v) => write!(f, "WebTorrent Desktop ({})", v.as_str()), - Self::Mainline(v) => write!(f, "Mainline ({})", v.as_str()), + Self::BitTorrent(v) => write!(f, "BitTorrent {}", v.as_str()), + Self::Deluge(v) => write!(f, "Deluge {}", v.as_str()), + Self::LibTorrentRakshasa(v) => write!(f, "lt (rakshasa) {}", v.as_str()), + Self::LibTorrentRasterbar(v) => write!(f, "lt (rasterbar) {}", v.as_str()), + Self::QBitTorrent(v) => write!(f, "QBitTorrent {}", v.as_str()), + Self::Transmission(v) => write!(f, "Transmission {}", v.as_str()), + Self::UTorrent(v) => write!(f, "µTorrent {}", v.as_str()), + Self::UTorrentEmbedded(v) => write!(f, "µTorrent Emb. {}", v.as_str()), + Self::UTorrentMac(v) => write!(f, "µTorrent Mac {}", v.as_str()), + Self::UTorrentWeb(v) => write!(f, "µTorrent Web {}", v.as_str()), + Self::Vuze(v) => write!(f, "Vuze {}", v.as_str()), + Self::WebTorrent(v) => write!(f, "WebTorrent {}", v.as_str()), + Self::WebTorrentDesktop(v) => write!(f, "WebTorrent Desktop {}", v.as_str()), + Self::Mainline(v) => write!(f, "Mainline {}", v.as_str()), Self::OtherWithPrefixAndVersion { prefix, version } => { write!(f, "Other ({}) ({})", prefix.as_str(), version.as_str()) } From 32aa34366c1fc6521cb18709917c7616efaa9bcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 14 Jun 2023 09:49:05 +0200 Subject: [PATCH 18/20] peer_id: make quickcheck optional but default feature --- aquatic_peer_id/Cargo.toml | 5 ++++- aquatic_peer_id/src/lib.rs | 2 ++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/aquatic_peer_id/Cargo.toml b/aquatic_peer_id/Cargo.toml index 36f5553..5563c1d 100644 --- a/aquatic_peer_id/Cargo.toml +++ b/aquatic_peer_id/Cargo.toml @@ -12,9 +12,12 @@ rust-version.workspace = true [lib] name = "aquatic_peer_id" +[features] +default = ["quickcheck"] + [dependencies] compact_str = "0.7" hex = "0.4" regex = "1" serde = { version = "1", features = ["derive"] } -quickcheck = "1" \ No newline at end of file +quickcheck = { version = "1", optional = true } \ No newline at end of file diff --git a/aquatic_peer_id/src/lib.rs b/aquatic_peer_id/src/lib.rs index 6d3c790..ce1a463 100644 --- a/aquatic_peer_id/src/lib.rs +++ b/aquatic_peer_id/src/lib.rs @@ -195,6 +195,7 @@ impl Display for PeerClient { } } +#[cfg(feature = "quickcheck")] impl quickcheck::Arbitrary for PeerId { fn arbitrary(g: &mut quickcheck::Gen) -> Self { let mut bytes = [0u8; 20]; @@ -207,6 +208,7 @@ impl quickcheck::Arbitrary for PeerId { } } +#[cfg(feature = "quickcheck")] #[cfg(test)] mod tests { use super::*; From 6675126d08ddf3fe100a8f82898609602f3d457e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 14 Jun 2023 11:09:23 +0200 Subject: [PATCH 19/20] ws: add prometheus peer client metrics --- CHANGELOG.md | 6 +++ Cargo.lock | 2 + aquatic_ws/Cargo.toml | 4 +- aquatic_ws/src/config.rs | 5 +++ aquatic_ws/src/lib.rs | 12 ++++++ aquatic_ws/src/workers/socket.rs | 67 ++++++++++++++++++++++++++++++++ 6 files changed, 95 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a05ce30..3d6e8f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,12 @@ * Add support for reporting peer client information +### aquatic_ws + +#### Added + +* Add support for reporting peer client information + ## 0.8.0 - 2023-03-17 ### General diff --git a/Cargo.lock b/Cargo.lock index cea9d22..5136f89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -310,6 +310,7 @@ version = "0.8.0" dependencies = [ "anyhow", "aquatic_common", + "aquatic_peer_id", "aquatic_toml_config", "aquatic_ws_protocol", "async-tungstenite", @@ -323,6 +324,7 @@ dependencies = [ "log", "metrics", "metrics-exporter-prometheus", + "metrics-util", "mimalloc", "privdrop", "quickcheck", diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index 7a65872..90b656c 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -19,10 +19,11 @@ name = "aquatic_ws" [features] default = ["prometheus"] prometheus = ["metrics", "metrics-exporter-prometheus"] -metrics = ["dep:metrics"] +metrics = ["dep:metrics", "metrics-util"] [dependencies] aquatic_common = { workspace = true, features = ["rustls", "glommio"] } +aquatic_peer_id.workspace = true aquatic_toml_config.workspace = true aquatic_ws_protocol.workspace = true @@ -37,6 +38,7 @@ hashbrown = { version = "0.13", features = ["serde"] } httparse = "1" log = "0.4" metrics = { version = "0.21", optional = true } +metrics-util = { version = "0.15", optional = true } metrics-exporter-prometheus = { version = "0.12", optional = true, default-features = false, features = ["http-listener"] } mimalloc = { version = "0.1", default-features = false } privdrop = "0.5" diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index 42b34f7..36bb828 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -156,6 +156,10 @@ pub struct MetricsConfig { pub prometheus_endpoint_address: SocketAddr, /// Update metrics for torrent count this often (seconds) pub torrent_count_update_interval: u64, + /// Collect data on peer clients. + /// + /// Expect a certain CPU hit + pub peer_clients: bool, } #[cfg(feature = "metrics")] @@ -165,6 +169,7 @@ impl Default for MetricsConfig { run_prometheus_endpoint: false, prometheus_endpoint_address: SocketAddr::from(([0, 0, 0, 0], 9000)), torrent_count_update_interval: 10, + peer_clients: false, } } } diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index 1263175..423b006 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -3,6 +3,7 @@ pub mod config; pub mod workers; use std::sync::Arc; +use std::time::Duration; use anyhow::Context; use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker}; @@ -39,7 +40,18 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { if config.metrics.run_prometheus_endpoint { use metrics_exporter_prometheus::PrometheusBuilder; + let idle_timeout = config + .cleaning + .connection_cleaning_interval + .max(config.cleaning.torrent_cleaning_interval) + .max(config.metrics.torrent_count_update_interval) + * 2; + PrometheusBuilder::new() + .idle_timeout( + metrics_util::MetricKindMask::GAUGE, + Some(Duration::from_secs(idle_timeout)), + ) .with_http_listener(config.metrics.prometheus_endpoint_address) .install() .with_context(|| { diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index f0c76ae..8559612 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -11,6 +11,7 @@ use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, A use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; use aquatic_common::{PanicSentinel, ServerStartInstant}; +use aquatic_peer_id::PeerClient; use aquatic_ws_protocol::*; use async_tungstenite::WebSocketStream; use futures::stream::{SplitSink, SplitStream}; @@ -51,6 +52,7 @@ struct ConnectionReference { valid_until: ValidUntil, announced_info_hashes: HashMap, ip_version: IpVersion, + opt_peer_client: Option, } pub async fn run_socket_worker( @@ -154,6 +156,7 @@ pub async fn run_socket_worker( ), announced_info_hashes: Default::default(), ip_version, + opt_peer_client: None, }); ::log::trace!("accepting stream, assigning id {}", key); @@ -221,6 +224,15 @@ pub async fn run_socket_worker( .await .unwrap(); } + + #[cfg(feature = "prometheus")] + if let Some(peer_client) = reference.opt_peer_client { + ::metrics::decrement_gauge!( + "aquatic_peer_clients", + 1.0, + "client" => peer_client.to_string(), + ); + } } }), tq_regular) .unwrap() @@ -246,6 +258,18 @@ async fn clean_connections( connection_slab.borrow_mut().retain(|_, reference| { if reference.valid_until.valid(now) { + #[cfg(feature = "prometheus")] + if let Some(peer_client) = &reference.opt_peer_client { + // As long as connection is still alive, increment peer client + // gauge by zero to prevent it from being removed due to + // idleness + ::metrics::increment_gauge!( + "aquatic_peer_clients", + 0.0, + "client" => peer_client.to_string(), + ); + } + true } else { if let Some(ref handle) = reference.task_handle { @@ -258,6 +282,31 @@ async fn clean_connections( connection_slab.borrow_mut().shrink_to_fit(); + #[cfg(feature = "metrics")] + { + // Increment gauges by zero to prevent them from being removed due to + // idleness + + let worker_index = WORKER_INDEX.with(|index| index.get()).to_string(); + + if config.network.address.is_ipv4() || !config.network.only_ipv6 { + ::metrics::increment_gauge!( + "aquatic_active_connections", + 0.0, + "ip_version" => "4", + "worker_index" => worker_index.clone(), + ); + } + if config.network.address.is_ipv6() { + ::metrics::increment_gauge!( + "aquatic_active_connections", + 0.0, + "ip_version" => "6", + "worker_index" => worker_index, + ); + } + } + Some(Duration::from_secs( config.cleaning.connection_cleaning_interval, )) @@ -575,6 +624,24 @@ impl ConnectionReader { } } Entry::Vacant(entry) => { + #[cfg(feature = "prometheus")] + if self.config.metrics.run_prometheus_endpoint + && self.config.metrics.peer_clients + && connection_reference.opt_peer_client.is_none() + { + let client = + aquatic_peer_id::PeerId(announce_request.peer_id.0) + .client(); + + ::metrics::increment_gauge!( + "aquatic_peer_clients", + 1.0, + "client" => client.to_string(), + ); + + connection_reference.opt_peer_client = Some(client); + }; + entry.insert(announce_request.peer_id); } } From 0204b6fcc2dea04d85f667fb0e321d9a0e0c7c52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 14 Jun 2023 11:36:08 +0200 Subject: [PATCH 20/20] ws: support serving metrics on peer id prefixes --- aquatic_ws/src/config.rs | 9 ++++++- aquatic_ws/src/workers/socket.rs | 42 ++++++++++++++++++++++++++------ 2 files changed, 42 insertions(+), 9 deletions(-) diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index 36bb828..25651e1 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -156,10 +156,16 @@ pub struct MetricsConfig { pub prometheus_endpoint_address: SocketAddr, /// Update metrics for torrent count this often (seconds) pub torrent_count_update_interval: u64, - /// Collect data on peer clients. + /// Serve information on peer clients /// /// Expect a certain CPU hit pub peer_clients: bool, + /// Serve information on all peer id prefixes + /// + /// Requires `peer_clients` to be activated. + /// + /// Expect a certain CPU hit + pub peer_id_prefixes: bool, } #[cfg(feature = "metrics")] @@ -170,6 +176,7 @@ impl Default for MetricsConfig { prometheus_endpoint_address: SocketAddr::from(([0, 0, 0, 0], 9000)), torrent_count_update_interval: 10, peer_clients: false, + peer_id_prefixes: false, } } } diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index 8559612..09c1884 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -52,7 +52,7 @@ struct ConnectionReference { valid_until: ValidUntil, announced_info_hashes: HashMap, ip_version: IpVersion, - opt_peer_client: Option, + opt_peer_client: Option<(PeerClient, String)>, } pub async fn run_socket_worker( @@ -226,12 +226,20 @@ pub async fn run_socket_worker( } #[cfg(feature = "prometheus")] - if let Some(peer_client) = reference.opt_peer_client { + if let Some((peer_client, prefix)) = reference.opt_peer_client { ::metrics::decrement_gauge!( "aquatic_peer_clients", 1.0, "client" => peer_client.to_string(), ); + + if config.metrics.peer_id_prefixes { + ::metrics::decrement_gauge!( + "aquatic_peer_id_prefixes", + 1.0, + "prefix_hex" => prefix.to_string(), + ); + } } } }), tq_regular) @@ -259,15 +267,24 @@ async fn clean_connections( connection_slab.borrow_mut().retain(|_, reference| { if reference.valid_until.valid(now) { #[cfg(feature = "prometheus")] - if let Some(peer_client) = &reference.opt_peer_client { + if let Some((peer_client, prefix)) = &reference.opt_peer_client { // As long as connection is still alive, increment peer client - // gauge by zero to prevent it from being removed due to + // gauges by zero to prevent them from being removed due to // idleness + ::metrics::increment_gauge!( "aquatic_peer_clients", 0.0, "client" => peer_client.to_string(), ); + + if config.metrics.peer_id_prefixes { + ::metrics::increment_gauge!( + "aquatic_peer_id_prefixes", + 0.0, + "prefix_hex" => prefix.to_string(), + ); + } } true @@ -629,9 +646,10 @@ impl ConnectionReader { && self.config.metrics.peer_clients && connection_reference.opt_peer_client.is_none() { - let client = - aquatic_peer_id::PeerId(announce_request.peer_id.0) - .client(); + let peer_id = + aquatic_peer_id::PeerId(announce_request.peer_id.0); + let client = peer_id.client(); + let prefix = peer_id.first_8_bytes_hex().to_string(); ::metrics::increment_gauge!( "aquatic_peer_clients", @@ -639,7 +657,15 @@ impl ConnectionReader { "client" => client.to_string(), ); - connection_reference.opt_peer_client = Some(client); + if self.config.metrics.peer_id_prefixes { + ::metrics::increment_gauge!( + "aquatic_peer_id_prefixes", + 1.0, + "prefix_hex" => prefix.to_string(), + ); + } + + connection_reference.opt_peer_client = Some((client, prefix)); }; entry.insert(announce_request.peer_id);