From 73eeb22f662b1aa3e771c082922e53a5a9f12eaa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 24 Jan 2024 23:33:15 +0100 Subject: [PATCH] http: extract response peers while announcing peer removed This improves performance by avoiding lots of comparisons --- TODO.md | 1 - crates/http/src/workers/swarm/storage.rs | 157 ++++++++++++++--------- 2 files changed, 94 insertions(+), 64 deletions(-) diff --git a/TODO.md b/TODO.md index 96631fc..2886dca 100644 --- a/TODO.md +++ b/TODO.md @@ -7,7 +7,6 @@ interval to clean up data * http - * extract response peers while peer is removed, as in udp implementation * consider storing small number of peers without extra heap allocation * add CI transfer test for http without TLS diff --git a/crates/http/src/workers/swarm/storage.rs b/crates/http/src/workers/swarm/storage.rs index fd8bacf..3bf17de 100644 --- a/crates/http/src/workers/swarm/storage.rs +++ b/crates/http/src/workers/swarm/storage.rs @@ -6,8 +6,7 @@ use rand::Rng; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::{ - extract_response_peers, CanonicalSocketAddr, IndexMap, SecondsSinceServerStart, - ServerStartInstant, ValidUntil, + CanonicalSocketAddr, IndexMap, SecondsSinceServerStart, ServerStartInstant, ValidUntil, }; use aquatic_http_protocol::common::*; use aquatic_http_protocol::request::*; @@ -212,7 +211,7 @@ impl TorrentMaps { pub type TorrentMap = IndexMap>; pub struct TorrentData { - peers: PeerMap, + peers: IndexMap, Peer>, num_seeders: usize, } @@ -240,8 +239,6 @@ impl TorrentData { request: AnnounceRequest, valid_until: ValidUntil, ) -> (usize, usize, Vec>) { - // Insert/update/remove peer who sent this request - let peer_status = PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left)); @@ -250,75 +247,109 @@ impl TorrentData { port: request.port, }; - let opt_removed_peer = match peer_status { - PeerStatus::Leeching => { - let peer = Peer { - valid_until, - seeder: false, - }; + let opt_removed_peer = self.peers.remove(&peer_map_key); - self.peers.insert(peer_map_key, peer) - } - PeerStatus::Seeding => { - self.num_seeders += 1; - - let peer = Peer { - valid_until, - seeder: true, - }; - - self.peers.insert(peer_map_key, peer) - } - PeerStatus::Stopped => self.peers.remove(&peer_map_key), - }; - - if let Some(&Peer { seeder: true, .. }) = opt_removed_peer.as_ref() { + if let Some(Peer { seeder: true, .. }) = opt_removed_peer.as_ref() { self.num_seeders -= 1; } - #[cfg(feature = "metrics")] - match peer_status { - PeerStatus::Stopped if opt_removed_peer.is_some() => { - ::metrics::decrement_gauge!( - "aquatic_peers", - 1.0, - "ip_version" => I::ip_version_str(), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); - } - PeerStatus::Leeching | PeerStatus::Seeding if opt_removed_peer.is_none() => { - ::metrics::increment_gauge!( - "aquatic_peers", - 1.0, - "ip_version" => I::ip_version_str(), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); - } - _ => {} - } + let response_peers = match peer_status { + PeerStatus::Seeding | PeerStatus::Leeching => { + #[cfg(feature = "metrics")] + if opt_removed_peer.is_none() { + ::metrics::increment_gauge!( + "aquatic_peers", + 1.0, + "ip_version" => I::ip_version_str(), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); + } - let response_peers = if let PeerStatus::Stopped = peer_status { - Vec::new() - } else { - let max_num_peers_to_take = match request.numwant { - Some(0) | None => config.protocol.max_peers, - Some(numwant) => numwant.min(config.protocol.max_peers), - }; + let max_num_peers_to_take = match request.numwant { + Some(0) | None => config.protocol.max_peers, + Some(numwant) => numwant.min(config.protocol.max_peers), + }; - extract_response_peers( - rng, - &self.peers, - max_num_peers_to_take, - peer_map_key, - |k, _| *k, - ) + let response_peers = self.extract_response_peers(rng, max_num_peers_to_take); + + let peer = Peer { + valid_until, + seeder: peer_status == PeerStatus::Seeding, + }; + + self.peers.insert(peer_map_key, peer); + + if peer_status == PeerStatus::Seeding { + self.num_seeders += 1; + } + + response_peers + } + PeerStatus::Stopped => { + #[cfg(feature = "metrics")] + if opt_removed_peer.is_some() { + ::metrics::decrement_gauge!( + "aquatic_peers", + 1.0, + "ip_version" => I::ip_version_str(), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); + } + + Vec::new() + } }; (self.num_seeders, self.num_leechers(), response_peers) } -} -type PeerMap = IndexMap, Peer>; + /// Extract response peers + /// + /// If there are more peers in map than `max_num_peers_to_take`, do a random + /// selection of peers from first and second halves of map in order to avoid + /// returning too homogeneous peers. + /// + /// Does NOT filter out announcing peer. + pub fn extract_response_peers( + &self, + rng: &mut impl Rng, + max_num_peers_to_take: usize, + ) -> Vec> { + if self.peers.len() <= max_num_peers_to_take { + self.peers.keys().copied().collect() + } else { + let middle_index = self.peers.len() / 2; + let num_to_take_per_half = max_num_peers_to_take / 2; + + let offset_half_one = { + let from = 0; + let to = usize::max(1, middle_index - num_to_take_per_half); + + rng.gen_range(from..to) + }; + let offset_half_two = { + let from = middle_index; + let to = usize::max(middle_index + 1, self.peers.len() - num_to_take_per_half); + + rng.gen_range(from..to) + }; + + let end_half_one = offset_half_one + num_to_take_per_half; + let end_half_two = offset_half_two + num_to_take_per_half; + + let mut peers = Vec::with_capacity(max_num_peers_to_take); + + if let Some(slice) = self.peers.get_range(offset_half_one..end_half_one) { + peers.extend(slice.keys()); + } + if let Some(slice) = self.peers.get_range(offset_half_two..end_half_two) { + peers.extend(slice.keys()); + } + + peers + } + } +} #[derive(Debug, Clone, Copy)] struct Peer {