From 63ae98f67c26787599f18125f99f9fdff4455ea4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 24 Jan 2024 22:51:53 +0100 Subject: [PATCH 01/12] http: index peer map by IP and port instead of IP and peer id --- CHANGELOG.md | 5 ++++ crates/http/src/workers/swarm/storage.rs | 37 +++++------------------- crates/http_protocol/src/response.rs | 2 +- 3 files changed, 14 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a8def4..4e62a12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,11 @@ * Support running without TLS * Support running behind reverse proxy +#### Changed + +* Index peers by packet source IP and provided port instead of by source ip + and peer id. This is likely slightly faster. + #### Fixed * Fix bug where clean up after closing connections wasn't always done diff --git a/crates/http/src/workers/swarm/storage.rs b/crates/http/src/workers/swarm/storage.rs index d9ff867..fd8bacf 100644 --- a/crates/http/src/workers/swarm/storage.rs +++ b/crates/http/src/workers/swarm/storage.rs @@ -245,33 +245,29 @@ impl TorrentData { let peer_status = PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left)); - let peer_map_key = PeerMapKey { - peer_id: request.peer_id, - ip: peer_ip_address, + let peer_map_key = ResponsePeer { + ip_address: peer_ip_address, + port: request.port, }; let opt_removed_peer = match peer_status { PeerStatus::Leeching => { let peer = Peer { - ip_address: peer_ip_address, - port: request.port, valid_until, seeder: false, }; - self.peers.insert(peer_map_key.clone(), peer) + self.peers.insert(peer_map_key, peer) } PeerStatus::Seeding => { self.num_seeders += 1; let peer = Peer { - ip_address: peer_ip_address, - port: request.port, valid_until, seeder: true, }; - self.peers.insert(peer_map_key.clone(), peer) + self.peers.insert(peer_map_key, peer) } PeerStatus::Stopped => self.peers.remove(&peer_map_key), }; @@ -314,7 +310,7 @@ impl TorrentData { &self.peers, max_num_peers_to_take, peer_map_key, - Peer::to_response_peer, + |k, _| *k, ) }; @@ -322,31 +318,14 @@ impl TorrentData { } } -type PeerMap = IndexMap, Peer>; - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -struct PeerMapKey { - pub peer_id: PeerId, - pub ip: I, -} +type PeerMap = IndexMap, Peer>; #[derive(Debug, Clone, Copy)] -struct Peer { - pub ip_address: I, - pub port: u16, +struct Peer { pub valid_until: ValidUntil, pub seeder: bool, } -impl Peer { - fn to_response_peer(_: &PeerMapKey, peer: &Self) -> ResponsePeer { - ResponsePeer { - ip_address: peer.ip_address, - port: peer.port, - } - } -} - #[derive(PartialEq, Eq, Clone, Copy, Debug)] enum PeerStatus { Seeding, diff --git a/crates/http_protocol/src/response.rs b/crates/http_protocol/src/response.rs index cc1ee92..ed36b73 100644 --- a/crates/http_protocol/src/response.rs +++ b/crates/http_protocol/src/response.rs @@ -8,7 +8,7 @@ use std::collections::BTreeMap; use super::common::*; use super::utils::*; -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)] pub struct ResponsePeer { pub ip_address: I, pub port: u16, From d346cf97aa9e627f2938d47b825f8dad4ce3015a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 24 Jan 2024 23:04:26 +0100 Subject: [PATCH 02/12] Update TODO --- TODO.md | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/TODO.md b/TODO.md index 79b4e4e..96631fc 100644 --- a/TODO.md +++ b/TODO.md @@ -2,22 +2,28 @@ ## High priority -* if peer_clients is on, add task to generate prometheus exports on regular - interval to clean up data +* udp + * if peer_clients is on, add task to generate prometheus exports on regular + interval to clean up data + +* http + * extract response peers while peer is removed, as in udp implementation + * consider storing small number of peers without extra heap allocation + * add CI transfer test for http without TLS + +* ws + * extract offer receiver peers while peer is removed, as in udp implementation + * consider storing small number of peers without extra heap allocation + * if peer_clients is on, add task to generate prometheus exports on regular + interval to clean up data * aquatic_bench * Opentracker "slow to get up to speed", is it due to getting faster once inserts are rarely needed since most ip-port combinations have been sent? In that case, a shorter duration (e.g., 30 seconds) would be a good idea. - * Maybe investigate aquatic memory use. - * Would it use significantly less memory to store peers in an ArrayVec if - there are only, say, 2 of them? -* CI transfer test - * add HTTP without TLS - -* http - * panic sentinel not working +* general + * panic sentinel not working? at least seemingly not in http? ## Medium priority @@ -42,10 +48,6 @@ * aquatic_ws * Add cleaning task for ConnectionHandle.announced_info_hashes? - * RES memory still high after traffic stops, even if torrent maps and connection slabs go down to 0 len and capacity - * replacing indexmap_amortized / simd_json with equivalents doesn't help - * SinkExt::send maybe doesn't wake up properly? - * related to https://github.com/sdroege/async-tungstenite/blob/master/src/compat.rs#L18 ? * Performance hyperoptimization (receive interrupts on correct core) * If there is no network card RSS support, do eBPF XDP CpuMap redirect based on packet info, to @@ -63,7 +65,6 @@ * thiserror? * CI * uring load test? - * what poll event capacity is actually needed? * load test * move additional request sending to for each received response, maybe with probability 0.2 From 73eeb22f662b1aa3e771c082922e53a5a9f12eaa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 24 Jan 2024 23:33:15 +0100 Subject: [PATCH 03/12] http: extract response peers while announcing peer removed This improves performance by avoiding lots of comparisons --- TODO.md | 1 - crates/http/src/workers/swarm/storage.rs | 157 ++++++++++++++--------- 2 files changed, 94 insertions(+), 64 deletions(-) diff --git a/TODO.md b/TODO.md index 96631fc..2886dca 100644 --- a/TODO.md +++ b/TODO.md @@ -7,7 +7,6 @@ interval to clean up data * http - * extract response peers while peer is removed, as in udp implementation * consider storing small number of peers without extra heap allocation * add CI transfer test for http without TLS diff --git a/crates/http/src/workers/swarm/storage.rs b/crates/http/src/workers/swarm/storage.rs index fd8bacf..3bf17de 100644 --- a/crates/http/src/workers/swarm/storage.rs +++ b/crates/http/src/workers/swarm/storage.rs @@ -6,8 +6,7 @@ use rand::Rng; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::{ - extract_response_peers, CanonicalSocketAddr, IndexMap, SecondsSinceServerStart, - ServerStartInstant, ValidUntil, + CanonicalSocketAddr, IndexMap, SecondsSinceServerStart, ServerStartInstant, ValidUntil, }; use aquatic_http_protocol::common::*; use aquatic_http_protocol::request::*; @@ -212,7 +211,7 @@ impl TorrentMaps { pub type TorrentMap = IndexMap>; pub struct TorrentData { - peers: PeerMap, + peers: IndexMap, Peer>, num_seeders: usize, } @@ -240,8 +239,6 @@ impl TorrentData { request: AnnounceRequest, valid_until: ValidUntil, ) -> (usize, usize, Vec>) { - // Insert/update/remove peer who sent this request - let peer_status = PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left)); @@ -250,75 +247,109 @@ impl TorrentData { port: request.port, }; - let opt_removed_peer = match peer_status { - PeerStatus::Leeching => { - let peer = Peer { - valid_until, - seeder: false, - }; + let opt_removed_peer = self.peers.remove(&peer_map_key); - self.peers.insert(peer_map_key, peer) - } - PeerStatus::Seeding => { - self.num_seeders += 1; - - let peer = Peer { - valid_until, - seeder: true, - }; - - self.peers.insert(peer_map_key, peer) - } - PeerStatus::Stopped => self.peers.remove(&peer_map_key), - }; - - if let Some(&Peer { seeder: true, .. }) = opt_removed_peer.as_ref() { + if let Some(Peer { seeder: true, .. }) = opt_removed_peer.as_ref() { self.num_seeders -= 1; } - #[cfg(feature = "metrics")] - match peer_status { - PeerStatus::Stopped if opt_removed_peer.is_some() => { - ::metrics::decrement_gauge!( - "aquatic_peers", - 1.0, - "ip_version" => I::ip_version_str(), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); - } - PeerStatus::Leeching | PeerStatus::Seeding if opt_removed_peer.is_none() => { - ::metrics::increment_gauge!( - "aquatic_peers", - 1.0, - "ip_version" => I::ip_version_str(), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); - } - _ => {} - } + let response_peers = match peer_status { + PeerStatus::Seeding | PeerStatus::Leeching => { + #[cfg(feature = "metrics")] + if opt_removed_peer.is_none() { + ::metrics::increment_gauge!( + "aquatic_peers", + 1.0, + "ip_version" => I::ip_version_str(), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); + } - let response_peers = if let PeerStatus::Stopped = peer_status { - Vec::new() - } else { - let max_num_peers_to_take = match request.numwant { - Some(0) | None => config.protocol.max_peers, - Some(numwant) => numwant.min(config.protocol.max_peers), - }; + let max_num_peers_to_take = match request.numwant { + Some(0) | None => config.protocol.max_peers, + Some(numwant) => numwant.min(config.protocol.max_peers), + }; - extract_response_peers( - rng, - &self.peers, - max_num_peers_to_take, - peer_map_key, - |k, _| *k, - ) + let response_peers = self.extract_response_peers(rng, max_num_peers_to_take); + + let peer = Peer { + valid_until, + seeder: peer_status == PeerStatus::Seeding, + }; + + self.peers.insert(peer_map_key, peer); + + if peer_status == PeerStatus::Seeding { + self.num_seeders += 1; + } + + response_peers + } + PeerStatus::Stopped => { + #[cfg(feature = "metrics")] + if opt_removed_peer.is_some() { + ::metrics::decrement_gauge!( + "aquatic_peers", + 1.0, + "ip_version" => I::ip_version_str(), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); + } + + Vec::new() + } }; (self.num_seeders, self.num_leechers(), response_peers) } -} -type PeerMap = IndexMap, Peer>; + /// Extract response peers + /// + /// If there are more peers in map than `max_num_peers_to_take`, do a random + /// selection of peers from first and second halves of map in order to avoid + /// returning too homogeneous peers. + /// + /// Does NOT filter out announcing peer. + pub fn extract_response_peers( + &self, + rng: &mut impl Rng, + max_num_peers_to_take: usize, + ) -> Vec> { + if self.peers.len() <= max_num_peers_to_take { + self.peers.keys().copied().collect() + } else { + let middle_index = self.peers.len() / 2; + let num_to_take_per_half = max_num_peers_to_take / 2; + + let offset_half_one = { + let from = 0; + let to = usize::max(1, middle_index - num_to_take_per_half); + + rng.gen_range(from..to) + }; + let offset_half_two = { + let from = middle_index; + let to = usize::max(middle_index + 1, self.peers.len() - num_to_take_per_half); + + rng.gen_range(from..to) + }; + + let end_half_one = offset_half_one + num_to_take_per_half; + let end_half_two = offset_half_two + num_to_take_per_half; + + let mut peers = Vec::with_capacity(max_num_peers_to_take); + + if let Some(slice) = self.peers.get_range(offset_half_one..end_half_one) { + peers.extend(slice.keys()); + } + if let Some(slice) = self.peers.get_range(offset_half_two..end_half_two) { + peers.extend(slice.keys()); + } + + peers + } + } +} #[derive(Debug, Clone, Copy)] struct Peer { From fe6a7ef8b5ad381c71c4c7f8b7a2f90411825020 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 24 Jan 2024 23:37:35 +0100 Subject: [PATCH 04/12] http: update crate description --- crates/http/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index 5b98dcb..60930a8 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aquatic_http" -description = "High-performance open BitTorrent tracker (HTTP over TLS)" +description = "High-performance open HTTP BitTorrent tracker (with optional TLS)" keywords = ["http", "server", "peer-to-peer", "torrent", "bittorrent"] version.workspace = true authors.workspace = true From c7f7f010ca9f50c11588e0f73a53dc59f5c0f4bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 25 Jan 2024 00:03:05 +0100 Subject: [PATCH 05/12] http: upgrade metrics crate to 0.22 --- Cargo.lock | 116 +++---------------- crates/http/Cargo.toml | 4 +- crates/http/src/workers/socket/connection.rs | 30 +++-- crates/http/src/workers/socket/mod.rs | 19 +-- crates/http/src/workers/swarm/mod.rs | 14 +-- crates/http/src/workers/swarm/storage.rs | 68 +++++++---- 6 files changed, 90 insertions(+), 161 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 53a4b60..adfcd6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -194,8 +194,8 @@ dependencies = [ "libc", "log", "memchr", - "metrics 0.21.1", - "metrics-exporter-prometheus 0.12.2", + "metrics", + "metrics-exporter-prometheus", "mimalloc", "once_cell", "privdrop", @@ -306,9 +306,9 @@ dependencies = [ "io-uring", "libc", "log", - "metrics 0.22.0", - "metrics-exporter-prometheus 0.13.0", - "metrics-util 0.16.0", + "metrics", + "metrics-exporter-prometheus", + "metrics-util", "mimalloc", "mio", "num-format", @@ -376,9 +376,9 @@ dependencies = [ "httparse", "indexmap 2.1.0", "log", - "metrics 0.22.0", - "metrics-exporter-prometheus 0.13.0", - "metrics-util 0.16.0", + "metrics", + "metrics-exporter-prometheus", + "metrics-util", "mimalloc", "privdrop", "quickcheck", @@ -1696,15 +1696,6 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" -[[package]] -name = "mach2" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19b955cdeb2a02b9117f121ce63aa52d08ade45de53e48fe6a38b39c10f6f709" -dependencies = [ - "libc", -] - [[package]] name = "memchr" version = "2.7.1" @@ -1738,17 +1729,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "metrics" -version = "0.21.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fde3af1a009ed76a778cb84fdef9e7dbbdf5775ae3e4cc1f434a6a307f6f76c5" -dependencies = [ - "ahash 0.8.7", - "metrics-macros", - "portable-atomic", -] - [[package]] name = "metrics" version = "0.22.0" @@ -1759,23 +1739,6 @@ dependencies = [ "portable-atomic", ] -[[package]] -name = "metrics-exporter-prometheus" -version = "0.12.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d4fa7ce7c4862db464a37b0b31d89bca874562f034bd7993895572783d02950" -dependencies = [ - "base64", - "hyper", - "indexmap 1.9.3", - "ipnet", - "metrics 0.21.1", - "metrics-util 0.15.1", - "quanta 0.11.1", - "thiserror", - "tokio", -] - [[package]] name = "metrics-exporter-prometheus" version = "0.13.0" @@ -1786,39 +1749,13 @@ dependencies = [ "hyper", "indexmap 1.9.3", "ipnet", - "metrics 0.22.0", - "metrics-util 0.16.0", - "quanta 0.12.2", + "metrics", + "metrics-util", + "quanta", "thiserror", "tokio", ] -[[package]] -name = "metrics-macros" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38b4faf00617defe497754acde3024865bc143d44a86799b24e191ecff91354f" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.48", -] - -[[package]] -name = "metrics-util" -version = "0.15.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4de2ed6e491ed114b40b732e4d1659a9d53992ebd87490c44a6ffe23739d973e" -dependencies = [ - "crossbeam-epoch", - "crossbeam-utils", - "hashbrown 0.13.1", - "metrics 0.21.1", - "num_cpus", - "quanta 0.11.1", - "sketches-ddsketch 0.2.1", -] - [[package]] name = "metrics-util" version = "0.16.0" @@ -1830,10 +1767,10 @@ dependencies = [ "crossbeam-utils", "hashbrown 0.13.1", "indexmap 1.9.3", - "metrics 0.22.0", + "metrics", "num_cpus", "ordered-float", - "quanta 0.12.2", + "quanta", "radix_trie", "sketches-ddsketch 0.2.1", ] @@ -2199,22 +2136,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "quanta" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" -dependencies = [ - "crossbeam-utils", - "libc", - "mach2", - "once_cell", - "raw-cpuid 10.7.0", - "wasi", - "web-sys", - "winapi 0.3.9", -] - [[package]] name = "quanta" version = "0.12.2" @@ -2224,7 +2145,7 @@ dependencies = [ "crossbeam-utils", "libc", "once_cell", - "raw-cpuid 11.0.1", + "raw-cpuid", "wasi", "web-sys", "winapi 0.3.9", @@ -2311,15 +2232,6 @@ dependencies = [ "rand", ] -[[package]] -name = "raw-cpuid" -version = "10.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" -dependencies = [ - "bitflags 1.3.2", -] - [[package]] name = "raw-cpuid" version = "11.0.1" diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index 60930a8..8c0a33b 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -39,8 +39,8 @@ httparse = "1" itoa = "1" libc = "0.2" log = "0.4" -metrics = { version = "0.21", optional = true } -metrics-exporter-prometheus = { version = "0.12", optional = true, default-features = false, features = ["http-listener"] } +metrics = { version = "0.22", optional = true } +metrics-exporter-prometheus = { version = "0.13", optional = true, default-features = false, features = ["http-listener"] } mimalloc = { version = "0.1", default-features = false } memchr = "2" privdrop = "0.5" diff --git a/crates/http/src/workers/socket/connection.rs b/crates/http/src/workers/socket/connection.rs index d44508a..4ab82ab 100644 --- a/crates/http/src/workers/socket/connection.rs +++ b/crates/http/src/workers/socket/connection.rs @@ -26,9 +26,9 @@ use once_cell::sync::Lazy; use crate::common::*; use crate::config::Config; -use super::request::{parse_request, RequestParseError}; #[cfg(feature = "metrics")] -use super::{peer_addr_to_ip_version_str, WORKER_INDEX}; +use super::peer_addr_to_ip_version_str; +use super::request::{parse_request, RequestParseError}; const REQUEST_BUFFER_SIZE: usize = 2048; const RESPONSE_BUFFER_SIZE: usize = 4096; @@ -67,6 +67,7 @@ pub enum ConnectionError { Other(#[from] anyhow::Error), } +#[allow(clippy::too_many_arguments)] pub(super) async fn run_connection( config: Rc, access_list: Arc, @@ -75,6 +76,7 @@ pub(super) async fn run_connection( opt_tls_config: Option>>, valid_until: Rc>, stream: TcpStream, + worker_index: usize, ) -> Result<(), ConnectionError> { let access_list_cache = create_access_list_cache(&access_list); let request_buffer = Box::new([0u8; REQUEST_BUFFER_SIZE]); @@ -114,6 +116,7 @@ pub(super) async fn run_connection( request_buffer_position: 0, response_buffer, stream, + worker_index_string: worker_index.to_string(), }; conn.run().await @@ -130,6 +133,7 @@ pub(super) async fn run_connection( request_buffer_position: 0, response_buffer, stream, + worker_index_string: worker_index.to_string(), }; conn.run().await @@ -148,6 +152,7 @@ struct Connection { request_buffer_position: usize, response_buffer: Box<[u8; RESPONSE_BUFFER_SIZE]>, stream: S, + worker_index_string: String, } impl Connection @@ -244,12 +249,13 @@ where match request { Request::Announce(request) => { #[cfg(feature = "metrics")] - ::metrics::increment_counter!( + ::metrics::counter!( "aquatic_requests_total", "type" => "announce", "ip_version" => peer_addr_to_ip_version_str(&peer_addr), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + "worker_index" => self.worker_index_string.clone(), + ) + .increment(1); let info_hash = request.info_hash; @@ -291,12 +297,13 @@ where } Request::Scrape(ScrapeRequest { info_hashes }) => { #[cfg(feature = "metrics")] - ::metrics::increment_counter!( + ::metrics::counter!( "aquatic_requests_total", "type" => "scrape", "ip_version" => peer_addr_to_ip_version_str(&peer_addr), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + "worker_index" => self.worker_index_string.clone(), + ) + .increment(1); let mut info_hashes_by_worker: BTreeMap> = BTreeMap::new(); @@ -438,12 +445,13 @@ where .opt_peer_addr .expect("peer addr should already have been extracted by now"); - ::metrics::increment_counter!( + ::metrics::counter!( "aquatic_responses_total", "type" => response_type, "ip_version" => peer_addr_to_ip_version_str(&peer_addr), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + "worker_index" => self.worker_index_string.clone(), + ) + .increment(1); } Ok(()) diff --git a/crates/http/src/workers/socket/mod.rs b/crates/http/src/workers/socket/mod.rs index 025b4a4..a289607 100644 --- a/crates/http/src/workers/socket/mod.rs +++ b/crates/http/src/workers/socket/mod.rs @@ -25,9 +25,6 @@ use crate::common::*; use crate::config::Config; use crate::workers::socket::connection::{run_connection, ConnectionError}; -#[cfg(feature = "metrics")] -thread_local! { static WORKER_INDEX: ::std::cell::Cell = Default::default() } - struct ConnectionHandle { close_conn_sender: LocalSender<()>, valid_until: Rc>, @@ -44,9 +41,6 @@ pub async fn run_socket_worker( server_start_instant: ServerStartInstant, worker_index: usize, ) { - #[cfg(feature = "metrics")] - WORKER_INDEX.with(|index| index.set(worker_index)); - let config = Rc::new(config); let access_list = state.access_list; @@ -93,12 +87,14 @@ pub async fn run_socket_worker( ) async move { #[cfg(feature = "metrics")] - ::metrics::increment_gauge!( + let active_connections_gauge = ::metrics::gauge!( "aquatic_active_connections", - 1.0, "worker_index" => worker_index.to_string(), ); + #[cfg(feature = "metrics")] + active_connections_gauge.increment(1.0); + let f1 = async { run_connection( config, access_list, @@ -107,6 +103,7 @@ pub async fn run_socket_worker( opt_tls_config, valid_until.clone(), stream, + worker_index, ).await }; let f2 = async { @@ -118,11 +115,7 @@ pub async fn run_socket_worker( let result = race(f1, f2).await; #[cfg(feature = "metrics")] - ::metrics::decrement_gauge!( - "aquatic_active_connections", - 1.0, - "worker_index" => worker_index.to_string(), - ); + active_connections_gauge.decrement(1.0); match result { Ok(()) => (), diff --git a/crates/http/src/workers/swarm/mod.rs b/crates/http/src/workers/swarm/mod.rs index c93c028..5df1c46 100644 --- a/crates/http/src/workers/swarm/mod.rs +++ b/crates/http/src/workers/swarm/mod.rs @@ -18,9 +18,6 @@ use crate::config::Config; use self::storage::TorrentMaps; -#[cfg(feature = "metrics")] -thread_local! { static WORKER_INDEX: ::std::cell::Cell = Default::default() } - pub async fn run_swarm_worker( _sentinel: PanicSentinel, config: Config, @@ -29,12 +26,9 @@ pub async fn run_swarm_worker( server_start_instant: ServerStartInstant, worker_index: usize, ) { - #[cfg(feature = "metrics")] - WORKER_INDEX.with(|index| index.set(worker_index)); - let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); - let torrents = Rc::new(RefCell::new(TorrentMaps::default())); + let torrents = Rc::new(RefCell::new(TorrentMaps::new(worker_index))); let access_list = state.access_list; // Periodically clean torrents @@ -69,16 +63,14 @@ pub async fn run_swarm_worker( ::metrics::gauge!( "aquatic_torrents", - torrents.ipv4.len() as f64, "ip_version" => "4", "worker_index" => worker_index.to_string(), - ); + ).set(torrents.ipv4.len() as f64); ::metrics::gauge!( "aquatic_torrents", - torrents.ipv6.len() as f64, "ip_version" => "6", "worker_index" => worker_index.to_string(), - ); + ).set(torrents.ipv6.len() as f64); Some(Duration::from_secs(config.metrics.torrent_count_update_interval)) })() diff --git a/crates/http/src/workers/swarm/storage.rs b/crates/http/src/workers/swarm/storage.rs index 3bf17de..b49b1ea 100644 --- a/crates/http/src/workers/swarm/storage.rs +++ b/crates/http/src/workers/swarm/storage.rs @@ -16,7 +16,6 @@ use aquatic_http_protocol::response::*; use crate::config::Config; #[cfg(feature = "metrics")] -use crate::workers::swarm::WORKER_INDEX; pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash { #[cfg(feature = "metrics")] @@ -36,13 +35,35 @@ impl Ip for Ipv6Addr { } } -#[derive(Default)] pub struct TorrentMaps { pub ipv4: TorrentMap, pub ipv6: TorrentMap, + #[cfg(feature = "metrics")] + pub ipv4_peer_gauge: metrics::Gauge, + #[cfg(feature = "metrics")] + pub ipv6_peer_gauge: metrics::Gauge, } impl TorrentMaps { + pub fn new(worker_index: usize) -> Self { + Self { + ipv4: Default::default(), + ipv6: Default::default(), + #[cfg(feature = "metrics")] + ipv4_peer_gauge: ::metrics::gauge!( + "aquatic_peers", + "ip_version" => "4", + "worker_index" => worker_index.to_string(), + ), + #[cfg(feature = "metrics")] + ipv6_peer_gauge: ::metrics::gauge!( + "aquatic_peers", + "ip_version" => "6", + "worker_index" => worker_index.to_string(), + ), + } + } + pub fn handle_announce_request( &mut self, config: &Config, @@ -63,6 +84,8 @@ impl TorrentMaps { peer_ip_address, request, valid_until, + #[cfg(feature = "metrics")] + &self.ipv4_peer_gauge, ); AnnounceResponse { @@ -85,6 +108,8 @@ impl TorrentMaps { peer_ip_address, request, valid_until, + #[cfg(feature = "metrics")] + &self.ipv6_peer_gauge, ); AnnounceResponse { @@ -157,8 +182,20 @@ impl TorrentMaps { let now = server_start_instant.seconds_elapsed(); - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4, now); - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6, now); + Self::clean_torrent_map( + config, + &mut access_list_cache, + &mut self.ipv4, + now, + &self.ipv4_peer_gauge, + ); + Self::clean_torrent_map( + config, + &mut access_list_cache, + &mut self.ipv6, + now, + &self.ipv6_peer_gauge, + ); } fn clean_torrent_map( @@ -166,6 +203,7 @@ impl TorrentMaps { access_list_cache: &mut AccessListCache, torrent_map: &mut TorrentMap, now: SecondsSinceServerStart, + #[cfg(feature = "metrics")] peer_gauge: &::metrics::Gauge, ) { let mut total_num_peers = 0; @@ -197,12 +235,7 @@ impl TorrentMaps { let total_num_peers = total_num_peers as f64; #[cfg(feature = "metrics")] - ::metrics::gauge!( - "aquatic_peers", - total_num_peers, - "ip_version" => I::ip_version_str(), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + peer_gauge.set(total_num_peers); torrent_map.shrink_to_fit(); } @@ -238,6 +271,7 @@ impl TorrentData { peer_ip_address: I, request: AnnounceRequest, valid_until: ValidUntil, + #[cfg(feature = "metrics")] peer_gauge: &::metrics::Gauge, ) -> (usize, usize, Vec>) { let peer_status = PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left)); @@ -257,12 +291,7 @@ impl TorrentData { PeerStatus::Seeding | PeerStatus::Leeching => { #[cfg(feature = "metrics")] if opt_removed_peer.is_none() { - ::metrics::increment_gauge!( - "aquatic_peers", - 1.0, - "ip_version" => I::ip_version_str(), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + peer_gauge.increment(1.0); } let max_num_peers_to_take = match request.numwant { @@ -288,12 +317,7 @@ impl TorrentData { PeerStatus::Stopped => { #[cfg(feature = "metrics")] if opt_removed_peer.is_some() { - ::metrics::decrement_gauge!( - "aquatic_peers", - 1.0, - "ip_version" => I::ip_version_str(), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + peer_gauge.decrement(1.0); } Vec::new() From 09c61b884c28a4a3f921afef19bf8d3d80377982 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 25 Jan 2024 18:59:06 +0100 Subject: [PATCH 06/12] Update TODO --- TODO.md | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/TODO.md b/TODO.md index 2886dca..0464cc9 100644 --- a/TODO.md +++ b/TODO.md @@ -2,20 +2,14 @@ ## High priority -* udp - * if peer_clients is on, add task to generate prometheus exports on regular - interval to clean up data +* general + * add task to generate prometheus exports on regular interval to clean up + data. this is important if peer_clients is activated * http * consider storing small number of peers without extra heap allocation * add CI transfer test for http without TLS -* ws - * extract offer receiver peers while peer is removed, as in udp implementation - * consider storing small number of peers without extra heap allocation - * if peer_clients is on, add task to generate prometheus exports on regular - interval to clean up data - * aquatic_bench * Opentracker "slow to get up to speed", is it due to getting faster once inserts are rarely needed since most ip-port combinations have been sent? From 238cce9b16fca774a7cbdb8a0dcf843ff4d298fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 25 Jan 2024 19:05:27 +0100 Subject: [PATCH 07/12] Move common/extract_response_peers to ws since it is only user --- crates/common/src/lib.rs | 137 ------------------------ crates/ws/src/workers/swarm/storage.rs | 140 ++++++++++++++++++++++++- 2 files changed, 137 insertions(+), 140 deletions(-) diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 6d0dbb6..c8f4243 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -139,140 +139,3 @@ impl CanonicalSocketAddr { self.0.is_ipv4() } } - -/// Extract response peers -/// -/// If there are more peers in map than `max_num_peers_to_take`, do a random -/// selection of peers from first and second halves of map in order to avoid -/// returning too homogeneous peers. -#[inline] -pub fn extract_response_peers( - rng: &mut impl Rng, - peer_map: &IndexMap, - max_num_peers_to_take: usize, - sender_peer_map_key: K, - peer_conversion_function: F, -) -> Vec -where - K: Eq + ::std::hash::Hash, - F: Fn(&K, &V) -> R, -{ - if peer_map.len() <= max_num_peers_to_take + 1 { - // This branch: number of peers in map (minus sender peer) is less than - // or equal to number of peers to take, so return all except sender - // peer. - let mut peers = Vec::with_capacity(peer_map.len()); - - peers.extend(peer_map.iter().filter_map(|(k, v)| { - (*k != sender_peer_map_key).then_some(peer_conversion_function(k, v)) - })); - - // Handle the case when sender peer is not in peer list. Typically, - // this function will not be called when this is the case. - if peers.len() > max_num_peers_to_take { - peers.pop(); - } - - peers - } else { - // Note: if this branch is taken, the peer map contains at least two - // more peers than max_num_peers_to_take - - let middle_index = peer_map.len() / 2; - // Add one to take two extra peers in case sender peer is among - // selected peers and will need to be filtered out - let num_to_take_per_half = (max_num_peers_to_take / 2) + 1; - - let offset_half_one = { - let from = 0; - let to = usize::max(1, middle_index - num_to_take_per_half); - - rng.gen_range(from..to) - }; - let offset_half_two = { - let from = middle_index; - let to = usize::max(middle_index + 1, peer_map.len() - num_to_take_per_half); - - rng.gen_range(from..to) - }; - - let end_half_one = offset_half_one + num_to_take_per_half; - let end_half_two = offset_half_two + num_to_take_per_half; - - let mut peers = Vec::with_capacity(max_num_peers_to_take + 2); - - if let Some(slice) = peer_map.get_range(offset_half_one..end_half_one) { - peers.extend(slice.iter().filter_map(|(k, v)| { - (*k != sender_peer_map_key).then_some(peer_conversion_function(k, v)) - })); - } - if let Some(slice) = peer_map.get_range(offset_half_two..end_half_two) { - peers.extend(slice.iter().filter_map(|(k, v)| { - (*k != sender_peer_map_key).then_some(peer_conversion_function(k, v)) - })); - } - - while peers.len() > max_num_peers_to_take { - peers.pop(); - } - - peers - } -} - -#[cfg(test)] -mod tests { - use ahash::HashSet; - - use rand::{rngs::SmallRng, SeedableRng}; - - use super::*; - - #[test] - fn test_extract_response_peers() { - let mut rng = SmallRng::from_entropy(); - - for num_peers_in_map in 0..50 { - for max_num_peers_to_take in 0..50 { - for sender_peer_map_key in 0..50 { - test_extract_response_peers_helper( - &mut rng, - num_peers_in_map, - max_num_peers_to_take, - sender_peer_map_key, - ); - } - } - } - } - - fn test_extract_response_peers_helper( - rng: &mut SmallRng, - num_peers_in_map: usize, - max_num_peers_to_take: usize, - sender_peer_map_key: usize, - ) { - let peer_map = IndexMap::from_iter((0..num_peers_in_map).map(|i| (i, i))); - - let response_peers = extract_response_peers( - rng, - &peer_map, - max_num_peers_to_take, - sender_peer_map_key, - |_, p| *p, - ); - - if num_peers_in_map > max_num_peers_to_take + 1 { - assert_eq!(response_peers.len(), max_num_peers_to_take); - } else { - assert!(response_peers.len() <= max_num_peers_to_take); - } - - assert!(!response_peers.contains(&sender_peer_map_key)); - - assert_eq!( - response_peers.len(), - HashSet::from_iter(response_peers.iter().copied()).len() - ); - } -} diff --git a/crates/ws/src/workers/swarm/storage.rs b/crates/ws/src/workers/swarm/storage.rs index d579ad9..9eff053 100644 --- a/crates/ws/src/workers/swarm/storage.rs +++ b/crates/ws/src/workers/swarm/storage.rs @@ -10,10 +10,9 @@ use hashbrown::HashMap; use metrics::Gauge; use rand::rngs::SmallRng; -use aquatic_common::{ - extract_response_peers, IndexMap, SecondsSinceServerStart, ServerStartInstant, -}; +use aquatic_common::{IndexMap, SecondsSinceServerStart, ServerStartInstant}; use aquatic_ws_protocol::common::*; +use rand::Rng; use crate::common::*; use crate::config::Config; @@ -505,3 +504,138 @@ impl PeerStatus { } } } + +/// Extract response peers +/// +/// If there are more peers in map than `max_num_peers_to_take`, do a random +/// selection of peers from first and second halves of map in order to avoid +/// returning too homogeneous peers. +#[inline] +pub fn extract_response_peers( + rng: &mut impl Rng, + peer_map: &IndexMap, + max_num_peers_to_take: usize, + sender_peer_map_key: K, + peer_conversion_function: F, +) -> Vec +where + K: Eq + ::std::hash::Hash, + F: Fn(&K, &V) -> R, +{ + if peer_map.len() <= max_num_peers_to_take + 1 { + // This branch: number of peers in map (minus sender peer) is less than + // or equal to number of peers to take, so return all except sender + // peer. + let mut peers = Vec::with_capacity(peer_map.len()); + + peers.extend(peer_map.iter().filter_map(|(k, v)| { + (*k != sender_peer_map_key).then_some(peer_conversion_function(k, v)) + })); + + // Handle the case when sender peer is not in peer list. Typically, + // this function will not be called when this is the case. + if peers.len() > max_num_peers_to_take { + peers.pop(); + } + + peers + } else { + // Note: if this branch is taken, the peer map contains at least two + // more peers than max_num_peers_to_take + + let middle_index = peer_map.len() / 2; + // Add one to take two extra peers in case sender peer is among + // selected peers and will need to be filtered out + let num_to_take_per_half = (max_num_peers_to_take / 2) + 1; + + let offset_half_one = { + let from = 0; + let to = usize::max(1, middle_index - num_to_take_per_half); + + rng.gen_range(from..to) + }; + let offset_half_two = { + let from = middle_index; + let to = usize::max(middle_index + 1, peer_map.len() - num_to_take_per_half); + + rng.gen_range(from..to) + }; + + let end_half_one = offset_half_one + num_to_take_per_half; + let end_half_two = offset_half_two + num_to_take_per_half; + + let mut peers = Vec::with_capacity(max_num_peers_to_take + 2); + + if let Some(slice) = peer_map.get_range(offset_half_one..end_half_one) { + peers.extend(slice.iter().filter_map(|(k, v)| { + (*k != sender_peer_map_key).then_some(peer_conversion_function(k, v)) + })); + } + if let Some(slice) = peer_map.get_range(offset_half_two..end_half_two) { + peers.extend(slice.iter().filter_map(|(k, v)| { + (*k != sender_peer_map_key).then_some(peer_conversion_function(k, v)) + })); + } + + while peers.len() > max_num_peers_to_take { + peers.pop(); + } + + peers + } +} + +#[cfg(test)] +mod tests { + use hashbrown::HashSet; + use rand::{rngs::SmallRng, SeedableRng}; + + use super::*; + + #[test] + fn test_extract_response_peers() { + let mut rng = SmallRng::from_entropy(); + + for num_peers_in_map in 0..50 { + for max_num_peers_to_take in 0..50 { + for sender_peer_map_key in 0..50 { + test_extract_response_peers_helper( + &mut rng, + num_peers_in_map, + max_num_peers_to_take, + sender_peer_map_key, + ); + } + } + } + } + + fn test_extract_response_peers_helper( + rng: &mut SmallRng, + num_peers_in_map: usize, + max_num_peers_to_take: usize, + sender_peer_map_key: usize, + ) { + let peer_map = IndexMap::from_iter((0..num_peers_in_map).map(|i| (i, i))); + + let response_peers = extract_response_peers( + rng, + &peer_map, + max_num_peers_to_take, + sender_peer_map_key, + |_, p| *p, + ); + + if num_peers_in_map > max_num_peers_to_take + 1 { + assert_eq!(response_peers.len(), max_num_peers_to_take); + } else { + assert!(response_peers.len() <= max_num_peers_to_take); + } + + assert!(!response_peers.contains(&sender_peer_map_key)); + + let unique: HashSet<_> = response_peers.iter().copied().collect(); + + assert_eq!(response_peers.len(), unique.len(),); + } +} From e6e663761cb04173c31457a6674a6460b6167a43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 25 Jan 2024 19:35:45 +0100 Subject: [PATCH 08/12] ws: refactor swarm worker storage code for improved readability --- crates/ws/src/workers/swarm/mod.rs | 8 +- crates/ws/src/workers/swarm/storage.rs | 287 +++++++++++++------------ 2 files changed, 152 insertions(+), 143 deletions(-) diff --git a/crates/ws/src/workers/swarm/mod.rs b/crates/ws/src/workers/swarm/mod.rs index f26c34d..f885b97 100644 --- a/crates/ws/src/workers/swarm/mod.rs +++ b/crates/ws/src/workers/swarm/mod.rs @@ -21,9 +21,6 @@ use crate::SHARED_IN_CHANNEL_SIZE; use self::storage::TorrentMaps; -#[cfg(feature = "metrics")] -thread_local! { static WORKER_INDEX: ::std::cell::Cell = Default::default() } - #[allow(clippy::too_many_arguments)] pub async fn run_swarm_worker( _sentinel: PanicSentinel, @@ -35,9 +32,6 @@ pub async fn run_swarm_worker( server_start_instant: ServerStartInstant, worker_index: usize, ) { - #[cfg(feature = "metrics")] - WORKER_INDEX.with(|index| index.set(worker_index)); - let (_, mut control_message_receivers) = control_message_mesh_builder .join(Role::Consumer) .await @@ -48,7 +42,7 @@ pub async fn run_swarm_worker( let out_message_senders = Rc::new(out_message_senders); - let torrents = Rc::new(RefCell::new(TorrentMaps::default())); + let torrents = Rc::new(RefCell::new(TorrentMaps::new(worker_index))); let access_list = state.access_list; // Periodically clean torrents diff --git a/crates/ws/src/workers/swarm/storage.rs b/crates/ws/src/workers/swarm/storage.rs index 9eff053..5ef51b5 100644 --- a/crates/ws/src/workers/swarm/storage.rs +++ b/crates/ws/src/workers/swarm/storage.rs @@ -7,7 +7,6 @@ use aquatic_ws_protocol::outgoing::{ OutMessage, ScrapeResponse, ScrapeStatistics, }; use hashbrown::HashMap; -use metrics::Gauge; use rand::rngs::SmallRng; use aquatic_common::{IndexMap, SecondsSinceServerStart, ServerStartInstant}; @@ -16,50 +15,20 @@ use rand::Rng; use crate::common::*; use crate::config::Config; -use crate::workers::swarm::WORKER_INDEX; - -type TorrentMap = IndexMap; -type PeerMap = IndexMap; pub struct TorrentMaps { ipv4: TorrentMap, ipv6: TorrentMap, - peers_gauge_ipv4: Gauge, - peers_gauge_ipv6: Gauge, - torrents_gauge_ipv4: Gauge, - torrents_gauge_ipv6: Gauge, -} - -impl Default for TorrentMaps { - fn default() -> Self { - Self { - ipv4: Default::default(), - ipv6: Default::default(), - peers_gauge_ipv4: ::metrics::gauge!( - "aquatic_peers", - "ip_version" => "4", - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ), - peers_gauge_ipv6: ::metrics::gauge!( - "aquatic_peers", - "ip_version" => "6", - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ), - torrents_gauge_ipv4: ::metrics::gauge!( - "aquatic_torrents", - "ip_version" => "4", - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ), - torrents_gauge_ipv6: ::metrics::gauge!( - "aquatic_torrents", - "ip_version" => "6", - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ), - } - } } impl TorrentMaps { + pub fn new(worker_index: usize) -> Self { + Self { + ipv4: TorrentMap::new(worker_index, IpVersion::V4), + ipv6: TorrentMap::new(worker_index, IpVersion::V6), + } + } + pub fn handle_announce_request( &mut self, config: &Config, @@ -69,11 +38,121 @@ impl TorrentMaps { request_sender_meta: InMessageMeta, request: AnnounceRequest, ) { - let torrent_data = if let IpVersion::V4 = request_sender_meta.ip_version { - self.ipv4.entry(request.info_hash).or_default() - } else { - self.ipv6.entry(request.info_hash).or_default() + self.get_torrent_map_by_ip_version(request_sender_meta.ip_version) + .handle_announce_request( + config, + rng, + out_messages, + server_start_instant, + request_sender_meta, + request, + ); + } + + pub fn handle_scrape_request( + &mut self, + config: &Config, + out_messages: &mut Vec<(OutMessageMeta, OutMessage)>, + meta: InMessageMeta, + request: ScrapeRequest, + ) { + self.get_torrent_map_by_ip_version(meta.ip_version) + .handle_scrape_request(config, out_messages, meta, request); + } + + pub fn clean( + &mut self, + config: &Config, + access_list: &Arc, + server_start_instant: ServerStartInstant, + ) { + let mut access_list_cache = create_access_list_cache(access_list); + let now = server_start_instant.seconds_elapsed(); + + self.ipv4.clean(config, &mut access_list_cache, now); + self.ipv6.clean(config, &mut access_list_cache, now); + } + + #[cfg(feature = "metrics")] + pub fn update_torrent_count_metrics(&self) { + self.ipv4.update_torrent_gauge(); + self.ipv6.update_torrent_gauge(); + } + + pub fn handle_connection_closed( + &mut self, + info_hash: InfoHash, + peer_id: PeerId, + ip_version: IpVersion, + ) { + self.get_torrent_map_by_ip_version(ip_version) + .handle_connection_closed(info_hash, peer_id); + } + + fn get_torrent_map_by_ip_version(&mut self, ip_version: IpVersion) -> &mut TorrentMap { + match ip_version { + IpVersion::V4 => &mut self.ipv4, + IpVersion::V6 => &mut self.ipv6, + } + } +} + +struct TorrentMap { + torrents: IndexMap, + #[cfg(feature = "metrics")] + torrent_gauge: ::metrics::Gauge, + #[cfg(feature = "metrics")] + peer_gauge: ::metrics::Gauge, +} + +impl TorrentMap { + pub fn new(worker_index: usize, ip_version: IpVersion) -> Self { + #[cfg(feature = "metrics")] + let peer_gauge = match ip_version { + IpVersion::V4 => ::metrics::gauge!( + "aquatic_peers", + "ip_version" => "4", + "worker_index" => worker_index.to_string(), + ), + IpVersion::V6 => ::metrics::gauge!( + "aquatic_peers", + "ip_version" => "6", + "worker_index" => worker_index.to_string(), + ), }; + #[cfg(feature = "metrics")] + let torrent_gauge = match ip_version { + IpVersion::V4 => ::metrics::gauge!( + "aquatic_torrents", + "ip_version" => "4", + "worker_index" => worker_index.to_string(), + ), + IpVersion::V6 => ::metrics::gauge!( + "aquatic_torrents", + "ip_version" => "6", + "worker_index" => worker_index.to_string(), + ), + }; + + Self { + torrents: Default::default(), + #[cfg(feature = "metrics")] + peer_gauge, + #[cfg(feature = "metrics")] + torrent_gauge, + } + } + + pub fn handle_announce_request( + &mut self, + config: &Config, + rng: &mut SmallRng, + out_messages: &mut Vec<(OutMessageMeta, OutMessage)>, + server_start_instant: ServerStartInstant, + request_sender_meta: InMessageMeta, + request: AnnounceRequest, + ) { + let torrent_data = self.torrents.entry(request.info_hash).or_default(); let valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); @@ -126,10 +205,7 @@ impl TorrentMaps { } #[cfg(feature = "metrics")] - match request_sender_meta.ip_version { - IpVersion::V4 => self.peers_gauge_ipv4.decrement(1.0), - IpVersion::V6 => self.peers_gauge_ipv6.decrement(1.0), - } + self.peer_gauge.decrement(1.0); return; } @@ -147,10 +223,7 @@ impl TorrentMaps { entry.insert(peer); #[cfg(feature = "metrics")] - match request_sender_meta.ip_version { - IpVersion::V4 => self.peers_gauge_ipv4.increment(1.0), - IpVersion::V6 => self.peers_gauge_ipv6.increment(1.0), - } + self.peer_gauge.increment(1.0) } PeerStatus::Seeding => { torrent_data.num_seeders += 1; @@ -166,10 +239,7 @@ impl TorrentMaps { entry.insert(peer); #[cfg(feature = "metrics")] - match request_sender_meta.ip_version { - IpVersion::V4 => self.peers_gauge_ipv4.increment(1.0), - IpVersion::V6 => self.peers_gauge_ipv6.increment(1.0), - } + self.peer_gauge.increment(1.0); } PeerStatus::Stopped => return, }, @@ -316,14 +386,8 @@ impl TorrentMaps { files: HashMap::with_capacity(num_to_take), }; - let torrent_map: &mut TorrentMap = if let IpVersion::V4 = meta.ip_version { - &mut self.ipv4 - } else { - &mut self.ipv6 - }; - for info_hash in info_hashes.into_iter().take(num_to_take) { - if let Some(torrent_data) = torrent_map.get(&info_hash) { + if let Some(torrent_data) = self.torrents.get(&info_hash) { let stats = ScrapeStatistics { complete: torrent_data.num_seeders, downloaded: 0, // No implementation planned @@ -337,41 +401,33 @@ impl TorrentMaps { out_messages.push((meta.into(), OutMessage::ScrapeResponse(out_message))); } - pub fn clean( - &mut self, - config: &Config, - access_list: &Arc, - server_start_instant: ServerStartInstant, - ) { - let mut access_list_cache = create_access_list_cache(access_list); - let now = server_start_instant.seconds_elapsed(); + pub fn handle_connection_closed(&mut self, info_hash: InfoHash, peer_id: PeerId) { + if let Some(torrent_data) = self.torrents.get_mut(&info_hash) { + if let Some(peer) = torrent_data.peers.remove(&peer_id) { + if peer.seeder { + torrent_data.num_seeders -= 1; + } - Self::clean_torrent_map( - config, - &mut access_list_cache, - &mut self.ipv4, - now, - &self.peers_gauge_ipv4, - ); - Self::clean_torrent_map( - config, - &mut access_list_cache, - &mut self.ipv6, - now, - &self.peers_gauge_ipv6, - ); + #[cfg(feature = "metrics")] + self.peer_gauge.decrement(1.0); + } + } } - fn clean_torrent_map( + #[cfg(feature = "metrics")] + pub fn update_torrent_gauge(&self) { + self.torrent_gauge.set(self.torrents.len() as f64); + } + + fn clean( + &mut self, config: &Config, access_list_cache: &mut AccessListCache, - torrent_map: &mut TorrentMap, now: SecondsSinceServerStart, - peers_gauge: &Gauge, ) { let mut total_num_peers = 0u64; - torrent_map.retain(|info_hash, torrent_data| { + self.torrents.retain(|info_hash, torrent_data| { if !access_list_cache .load() .allows(config.access_list.mode, &info_hash.0) @@ -402,66 +458,23 @@ impl TorrentMaps { !torrent_data.peers.is_empty() }); - torrent_map.shrink_to_fit(); + self.torrents.shrink_to_fit(); #[cfg(feature = "metrics")] - peers_gauge.set(total_num_peers as f64) - } + self.peer_gauge.set(total_num_peers as f64); - #[cfg(feature = "metrics")] - pub fn update_torrent_count_metrics(&self) { - self.torrents_gauge_ipv4.set(self.ipv4.len() as f64); - self.torrents_gauge_ipv6.set(self.ipv6.len() as f64); - } - - pub fn handle_connection_closed( - &mut self, - info_hash: InfoHash, - peer_id: PeerId, - ip_version: IpVersion, - ) { - ::log::debug!("Removing peer from torrents because connection was closed"); - - if let IpVersion::V4 = ip_version { - if let Some(torrent_data) = self.ipv4.get_mut(&info_hash) { - torrent_data.remove_peer(peer_id); - - #[cfg(feature = "metrics")] - self.peers_gauge_ipv4.decrement(1.0); - } - } else if let Some(torrent_data) = self.ipv6.get_mut(&info_hash) { - torrent_data.remove_peer(peer_id); - - #[cfg(feature = "metrics")] - self.peers_gauge_ipv6.decrement(1.0); - } + #[cfg(feature = "metrics")] + self.update_torrent_gauge(); } } +#[derive(Default)] struct TorrentData { - peers: PeerMap, + peers: IndexMap, num_seeders: usize, } -impl Default for TorrentData { - #[inline] - fn default() -> Self { - Self { - peers: Default::default(), - num_seeders: 0, - } - } -} - impl TorrentData { - fn remove_peer(&mut self, peer_id: PeerId) { - if let Some(peer) = self.peers.remove(&peer_id) { - if peer.seeder { - self.num_seeders -= 1; - } - } - } - fn num_leechers(&self) -> usize { self.peers.len() - self.num_seeders } @@ -510,6 +523,8 @@ impl PeerStatus { /// If there are more peers in map than `max_num_peers_to_take`, do a random /// selection of peers from first and second halves of map in order to avoid /// returning too homogeneous peers. +/// +/// Filters out announcing peer. #[inline] pub fn extract_response_peers( rng: &mut impl Rng, From f050467c2a3d13efbe537acadf06d43fa8ca12b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 25 Jan 2024 23:24:10 +0100 Subject: [PATCH 09/12] ws: further refactor of swarm worker to improve readability --- crates/ws/src/workers/swarm/storage.rs | 420 +++++++++++--------- crates/ws_protocol/src/incoming/announce.rs | 2 +- 2 files changed, 236 insertions(+), 186 deletions(-) diff --git a/crates/ws/src/workers/swarm/storage.rs b/crates/ws/src/workers/swarm/storage.rs index 5ef51b5..2566abb 100644 --- a/crates/ws/src/workers/swarm/storage.rs +++ b/crates/ws/src/workers/swarm/storage.rs @@ -1,7 +1,9 @@ use std::sync::Arc; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; -use aquatic_ws_protocol::incoming::{AnnounceEvent, AnnounceRequest, ScrapeRequest}; +use aquatic_ws_protocol::incoming::{ + AnnounceEvent, AnnounceRequest, AnnounceRequestOffer, ScrapeRequest, +}; use aquatic_ws_protocol::outgoing::{ AnnounceResponse, AnswerOutMessage, ErrorResponse, ErrorResponseAction, OfferOutMessage, OutMessage, ScrapeResponse, ScrapeStatistics, @@ -154,8 +156,6 @@ impl TorrentMap { ) { let torrent_data = self.torrents.entry(request.info_hash).or_default(); - let valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); - // If there is already a peer with this peer_id, check that connection id // is same as that of request sender. Otherwise, ignore request. Since // peers have access to each others peer_id's, they could send requests @@ -168,194 +168,48 @@ impl TorrentMap { ::log::trace!("received request from {:?}", request_sender_meta); - // Insert/update/remove peer who sent this request - { - let peer_status = PeerStatus::from_event_and_bytes_left( - request.event.unwrap_or_default(), - request.bytes_left, - ); + let peer_status = torrent_data.insert_or_update_peer( + config, + server_start_instant, + request_sender_meta, + &request, + &self.peer_gauge, + ); - match torrent_data.peers.entry(request.peer_id) { - ::indexmap::map::Entry::Occupied(mut entry) => match peer_status { - PeerStatus::Leeching => { - let peer = entry.get_mut(); - - if peer.seeder { - torrent_data.num_seeders -= 1; - } - - peer.seeder = false; - peer.valid_until = valid_until; - } - PeerStatus::Seeding => { - let peer = entry.get_mut(); - - if !peer.seeder { - torrent_data.num_seeders += 1; - } - - peer.seeder = true; - peer.valid_until = valid_until; - } - PeerStatus::Stopped => { - let peer = entry.remove(); - - if peer.seeder { - torrent_data.num_seeders -= 1; - } - - #[cfg(feature = "metrics")] - self.peer_gauge.decrement(1.0); - - return; - } - }, - ::indexmap::map::Entry::Vacant(entry) => match peer_status { - PeerStatus::Leeching => { - let peer = Peer { - connection_id: request_sender_meta.connection_id, - consumer_id: request_sender_meta.out_message_consumer_id, - seeder: false, - valid_until, - expecting_answers: Default::default(), - }; - - entry.insert(peer); - - #[cfg(feature = "metrics")] - self.peer_gauge.increment(1.0) - } - PeerStatus::Seeding => { - torrent_data.num_seeders += 1; - - let peer = Peer { - connection_id: request_sender_meta.connection_id, - consumer_id: request_sender_meta.out_message_consumer_id, - seeder: true, - valid_until, - expecting_answers: Default::default(), - }; - - entry.insert(peer); - - #[cfg(feature = "metrics")] - self.peer_gauge.increment(1.0); - } - PeerStatus::Stopped => return, - }, - } - }; - - // If peer sent offers, send them on to random peers - if let Some(offers) = request.offers { - // FIXME: config: also maybe check this when parsing request - let max_num_peers_to_take = offers.len().min(config.protocol.max_offers); - - #[inline] - fn convert_offer_receiver_peer( - peer_id: &PeerId, - peer: &Peer, - ) -> (PeerId, ConnectionId, ConsumerId) { - (*peer_id, peer.connection_id, peer.consumer_id) + if peer_status != PeerStatus::Stopped { + if let Some(offers) = request.offers { + torrent_data.handle_offers( + config, + rng, + server_start_instant, + request.info_hash, + request.peer_id, + offers, + out_messages, + ); } - let offer_receivers: Vec<(PeerId, ConnectionId, ConsumerId)> = extract_response_peers( - rng, - &torrent_data.peers, - max_num_peers_to_take, - request.peer_id, - convert_offer_receiver_peer, - ); + if let (Some(answer), Some(answer_receiver_id), Some(offer_id)) = ( + request.answer, + request.answer_to_peer_id, + request.answer_offer_id, + ) { + let opt_out_message = torrent_data.handle_answer( + request_sender_meta, + request.info_hash, + request.peer_id, + answer_receiver_id, + offer_id, + answer, + ); - if let Some(peer) = torrent_data.peers.get_mut(&request.peer_id) { - for ( - offer, - ( - offer_receiver_peer_id, - offer_receiver_connection_id, - offer_receiver_consumer_id, - ), - ) in offers.into_iter().zip(offer_receivers) - { - let offer_out_message = OfferOutMessage { - action: AnnounceAction::Announce, - info_hash: request.info_hash, - peer_id: request.peer_id, - offer: offer.offer, - offer_id: offer.offer_id, - }; - - let meta = OutMessageMeta { - out_message_consumer_id: offer_receiver_consumer_id, - connection_id: offer_receiver_connection_id, - pending_scrape_id: None, - }; - - out_messages.push((meta, OutMessage::OfferOutMessage(offer_out_message))); - ::log::trace!("sending OfferOutMessage to {:?}", meta); - - peer.expecting_answers.insert( - ExpectingAnswer { - from_peer_id: offer_receiver_peer_id, - regarding_offer_id: offer.offer_id, - }, - ValidUntil::new(server_start_instant, config.cleaning.max_offer_age), - ); + if let Some(out_message) = opt_out_message { + out_messages.push(out_message); } } } - // If peer sent answer, send it on to relevant peer - if let (Some(answer), Some(answer_receiver_id), Some(offer_id)) = ( - request.answer, - request.answer_to_peer_id, - request.answer_offer_id, - ) { - if let Some(answer_receiver) = torrent_data.peers.get_mut(&answer_receiver_id) { - let expecting_answer = ExpectingAnswer { - from_peer_id: request.peer_id, - regarding_offer_id: offer_id, - }; - - if answer_receiver - .expecting_answers - .remove(&expecting_answer) - .is_some() - { - let answer_out_message = AnswerOutMessage { - action: AnnounceAction::Announce, - peer_id: request.peer_id, - info_hash: request.info_hash, - answer, - offer_id, - }; - - let meta = OutMessageMeta { - out_message_consumer_id: answer_receiver.consumer_id, - connection_id: answer_receiver.connection_id, - pending_scrape_id: None, - }; - - out_messages.push((meta, OutMessage::AnswerOutMessage(answer_out_message))); - ::log::trace!("sending AnswerOutMessage to {:?}", meta); - } else { - let error_message = ErrorResponse { - action: Some(ErrorResponseAction::Announce), - info_hash: Some(request.info_hash), - failure_reason: - "Could not find the offer corresponding to your answer. It may have expired." - .into(), - }; - - out_messages.push(( - request_sender_meta.into(), - OutMessage::ErrorResponse(error_message), - )); - } - } - } - - let out_message = OutMessage::AnnounceResponse(AnnounceResponse { + let response = OutMessage::AnnounceResponse(AnnounceResponse { action: AnnounceAction::Announce, info_hash: request.info_hash, complete: torrent_data.num_seeders, @@ -363,7 +217,7 @@ impl TorrentMap { announce_interval: config.protocol.peer_announce_interval, }); - out_messages.push((request_sender_meta.into(), out_message)); + out_messages.push((request_sender_meta.into(), response)); } pub fn handle_scrape_request( @@ -478,6 +332,202 @@ impl TorrentData { fn num_leechers(&self) -> usize { self.peers.len() - self.num_seeders } + + pub fn insert_or_update_peer( + &mut self, + config: &Config, + server_start_instant: ServerStartInstant, + request_sender_meta: InMessageMeta, + request: &AnnounceRequest, + #[cfg(feature = "metrics")] peer_gauge: &::metrics::Gauge, + ) -> PeerStatus { + let valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); + + let peer_status = PeerStatus::from_event_and_bytes_left( + request.event.unwrap_or_default(), + request.bytes_left, + ); + + match self.peers.entry(request.peer_id) { + ::indexmap::map::Entry::Occupied(mut entry) => match peer_status { + PeerStatus::Leeching => { + let peer = entry.get_mut(); + + if peer.seeder { + self.num_seeders -= 1; + } + + peer.seeder = false; + peer.valid_until = valid_until; + } + PeerStatus::Seeding => { + let peer = entry.get_mut(); + + if !peer.seeder { + self.num_seeders += 1; + } + + peer.seeder = true; + peer.valid_until = valid_until; + } + PeerStatus::Stopped => { + let peer = entry.remove(); + + if peer.seeder { + self.num_seeders -= 1; + } + + #[cfg(feature = "metrics")] + peer_gauge.decrement(1.0); + } + }, + ::indexmap::map::Entry::Vacant(entry) => match peer_status { + PeerStatus::Leeching => { + let peer = Peer { + connection_id: request_sender_meta.connection_id, + consumer_id: request_sender_meta.out_message_consumer_id, + seeder: false, + valid_until, + expecting_answers: Default::default(), + }; + + entry.insert(peer); + + #[cfg(feature = "metrics")] + peer_gauge.increment(1.0) + } + PeerStatus::Seeding => { + self.num_seeders += 1; + + let peer = Peer { + connection_id: request_sender_meta.connection_id, + consumer_id: request_sender_meta.out_message_consumer_id, + seeder: true, + valid_until, + expecting_answers: Default::default(), + }; + + entry.insert(peer); + + #[cfg(feature = "metrics")] + peer_gauge.increment(1.0); + } + PeerStatus::Stopped => (), + }, + } + + peer_status + } + + /// Pass on offers to random peers + #[allow(clippy::too_many_arguments)] + pub fn handle_offers( + &mut self, + config: &Config, + rng: &mut SmallRng, + server_start_instant: ServerStartInstant, + info_hash: InfoHash, + sender_peer_id: PeerId, + offers: Vec, + out_messages: &mut Vec<(OutMessageMeta, OutMessage)>, + ) { + let max_num_peers_to_take = offers.len().min(config.protocol.max_offers); + + let offer_receivers: Vec<(PeerId, ConnectionId, ConsumerId)> = extract_response_peers( + rng, + &self.peers, + max_num_peers_to_take, + sender_peer_id, + |peer_id, peer| (*peer_id, peer.connection_id, peer.consumer_id), + ); + + if let Some(peer) = self.peers.get_mut(&sender_peer_id) { + for ( + offer, + (offer_receiver_peer_id, offer_receiver_connection_id, offer_receiver_consumer_id), + ) in offers.into_iter().zip(offer_receivers) + { + peer.expecting_answers.insert( + ExpectingAnswer { + from_peer_id: offer_receiver_peer_id, + regarding_offer_id: offer.offer_id, + }, + ValidUntil::new(server_start_instant, config.cleaning.max_offer_age), + ); + + let offer_out_message = OfferOutMessage { + action: AnnounceAction::Announce, + info_hash, + peer_id: sender_peer_id, + offer: offer.offer, + offer_id: offer.offer_id, + }; + + let meta = OutMessageMeta { + out_message_consumer_id: offer_receiver_consumer_id, + connection_id: offer_receiver_connection_id, + pending_scrape_id: None, + }; + + out_messages.push((meta, OutMessage::OfferOutMessage(offer_out_message))); + } + } + } + + /// Pass on answer to relevant peer + fn handle_answer( + &mut self, + request_sender_meta: InMessageMeta, + info_hash: InfoHash, + peer_id: PeerId, + answer_receiver_id: PeerId, + offer_id: OfferId, + answer: RtcAnswer, + ) -> Option<(OutMessageMeta, OutMessage)> { + if let Some(answer_receiver) = self.peers.get_mut(&answer_receiver_id) { + let expecting_answer = ExpectingAnswer { + from_peer_id: peer_id, + regarding_offer_id: offer_id, + }; + + if answer_receiver + .expecting_answers + .remove(&expecting_answer) + .is_some() + { + let answer_out_message = AnswerOutMessage { + action: AnnounceAction::Announce, + peer_id, + info_hash, + answer, + offer_id, + }; + + let meta = OutMessageMeta { + out_message_consumer_id: answer_receiver.consumer_id, + connection_id: answer_receiver.connection_id, + pending_scrape_id: None, + }; + + Some((meta, OutMessage::AnswerOutMessage(answer_out_message))) + } else { + let error_message = ErrorResponse { + action: Some(ErrorResponseAction::Announce), + info_hash: Some(info_hash), + failure_reason: + "Could not find the offer corresponding to your answer. It may have expired." + .into(), + }; + + Some(( + request_sender_meta.into(), + OutMessage::ErrorResponse(error_message), + )) + } + } else { + None + } + } } #[derive(Clone, Debug)] @@ -523,7 +573,7 @@ impl PeerStatus { /// If there are more peers in map than `max_num_peers_to_take`, do a random /// selection of peers from first and second halves of map in order to avoid /// returning too homogeneous peers. -/// +/// /// Filters out announcing peer. #[inline] pub fn extract_response_peers( diff --git a/crates/ws_protocol/src/incoming/announce.rs b/crates/ws_protocol/src/incoming/announce.rs index 3a93eb9..36aef31 100644 --- a/crates/ws_protocol/src/incoming/announce.rs +++ b/crates/ws_protocol/src/incoming/announce.rs @@ -57,7 +57,7 @@ pub struct AnnounceRequest { pub answer_offer_id: Option, } -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum AnnounceEvent { Started, From e4422cf3ff86f52d96d3aa7c5633ed9d1845ff59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 25 Jan 2024 23:40:19 +0100 Subject: [PATCH 10/12] ws: swarm worker: more readability refactors --- crates/ws/src/workers/swarm/storage.rs | 95 +++++++++++++++----------- 1 file changed, 55 insertions(+), 40 deletions(-) diff --git a/crates/ws/src/workers/swarm/storage.rs b/crates/ws/src/workers/swarm/storage.rs index 2566abb..abb6a22 100644 --- a/crates/ws/src/workers/swarm/storage.rs +++ b/crates/ws/src/workers/swarm/storage.rs @@ -40,15 +40,16 @@ impl TorrentMaps { request_sender_meta: InMessageMeta, request: AnnounceRequest, ) { - self.get_torrent_map_by_ip_version(request_sender_meta.ip_version) - .handle_announce_request( - config, - rng, - out_messages, - server_start_instant, - request_sender_meta, - request, - ); + let torrent_map = self.get_torrent_map_by_ip_version(request_sender_meta.ip_version); + + torrent_map.handle_announce_request( + config, + rng, + out_messages, + server_start_instant, + request_sender_meta, + request, + ); } pub fn handle_scrape_request( @@ -58,8 +59,9 @@ impl TorrentMaps { meta: InMessageMeta, request: ScrapeRequest, ) { - self.get_torrent_map_by_ip_version(meta.ip_version) - .handle_scrape_request(config, out_messages, meta, request); + let torrent_map = self.get_torrent_map_by_ip_version(meta.ip_version); + + torrent_map.handle_scrape_request(config, out_messages, meta, request); } pub fn clean( @@ -87,8 +89,9 @@ impl TorrentMaps { peer_id: PeerId, ip_version: IpVersion, ) { - self.get_torrent_map_by_ip_version(ip_version) - .handle_connection_closed(info_hash, peer_id); + let torrent_map = self.get_torrent_map_by_ip_version(ip_version); + + torrent_map.handle_connection_closed(info_hash, peer_id); } fn get_torrent_map_by_ip_version(&mut self, ip_version: IpVersion) -> &mut TorrentMap { @@ -257,14 +260,7 @@ impl TorrentMap { pub fn handle_connection_closed(&mut self, info_hash: InfoHash, peer_id: PeerId) { if let Some(torrent_data) = self.torrents.get_mut(&info_hash) { - if let Some(peer) = torrent_data.peers.remove(&peer_id) { - if peer.seeder { - torrent_data.num_seeders -= 1; - } - - #[cfg(feature = "metrics")] - self.peer_gauge.decrement(1.0); - } + torrent_data.handle_connection_closed(peer_id, &self.peer_gauge); } } @@ -289,27 +285,11 @@ impl TorrentMap { return false; } - let num_seeders = &mut torrent_data.num_seeders; + let num_peers = torrent_data.clean_and_get_num_peers(now); - torrent_data.peers.retain(|_, peer| { - peer.expecting_answers - .retain(|_, valid_until| valid_until.valid(now)); - peer.expecting_answers.shrink_to_fit(); + total_num_peers += num_peers as u64; - let keep = peer.valid_until.valid(now); - - if (!keep) & peer.seeder { - *num_seeders -= 1; - } - - keep - }); - - total_num_peers += torrent_data.peers.len() as u64; - - torrent_data.peers.shrink_to_fit(); - - !torrent_data.peers.is_empty() + num_peers > 0 }); self.torrents.shrink_to_fit(); @@ -528,6 +508,41 @@ impl TorrentData { None } } + + pub fn handle_connection_closed( + &mut self, + peer_id: PeerId, + #[cfg(feature = "metrics")] peer_gauge: &::metrics::Gauge, + ) { + if let Some(peer) = self.peers.remove(&peer_id) { + if peer.seeder { + self.num_seeders -= 1; + } + + #[cfg(feature = "metrics")] + peer_gauge.decrement(1.0); + } + } + + fn clean_and_get_num_peers(&mut self, now: SecondsSinceServerStart) -> usize { + self.peers.retain(|_, peer| { + peer.expecting_answers + .retain(|_, valid_until| valid_until.valid(now)); + peer.expecting_answers.shrink_to_fit(); + + let keep = peer.valid_until.valid(now); + + if (!keep) & peer.seeder { + self.num_seeders -= 1; + } + + keep + }); + + self.peers.shrink_to_fit(); + + self.peers.len() + } } #[derive(Clone, Debug)] From 9fc7abd568e5a7a1c61a925beef4c645b83443b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 25 Jan 2024 23:43:48 +0100 Subject: [PATCH 11/12] aquatic_common: remove unused import --- crates/common/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index c8f4243..6ef4240 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use std::time::Instant; use ahash::RandomState; -use rand::Rng; pub mod access_list; pub mod cli; From 3b22e9d82979f36e3dfc4360359450cfe2902b86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 25 Jan 2024 23:51:38 +0100 Subject: [PATCH 12/12] Update CHANGELOG --- CHANGELOG.md | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e62a12..ce509a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,10 +15,6 @@ #### Added * Add support for reporting peer client information -* Speed up parsing and serialization of requests and responses with - [zerocopy](https://crates.io/crates/zerocopy) -* Store torrents with up to two peers without an extra heap allocation for the - peers. #### Changed @@ -28,10 +24,16 @@ * Remove support for unbounded worker channels * Add backpressure in socket workers. They will postpone reading from the socket if sending a request to a swarm worker failed +* Avoid a heap allocation for torrents with two or less peers. This can save + a lot of memory if many torrents are tracked +* Improve announce performance by avoiding having to filter response peers +* In announce response statistics, don't include announcing peer * Distribute announce responses from swarm workers over socket workers to decrease performance loss due to underutilized threads * Harden ConnectionValidator to make IP spoofing even more costly * Remove config key `network.poll_event_capacity` (always use 1) +* Speed up parsing and serialization of requests and responses by using + [zerocopy](https://crates.io/crates/zerocopy) ### aquatic_http @@ -45,6 +47,8 @@ * Index peers by packet source IP and provided port instead of by source ip and peer id. This is likely slightly faster. +* Improve announce performance by avoiding having to filter response peers +* In announce response statistics, don't include announcing peer #### Fixed