diff --git a/CHANGELOG.md b/CHANGELOG.md index ce509a0..aa9ff32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,8 @@ * Index peers by packet source IP and provided port instead of by source ip and peer id. This is likely slightly faster. +* Avoid a heap allocation for torrents with four or fewer peers. This can save + a lot of memory if many torrents are tracked. * Improve announce performance by avoiding having to filter response peers * In announce response statistics, don't include announcing peer diff --git a/Cargo.lock b/Cargo.lock index adfcd6d..8caf354 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -183,6 +183,7 @@ dependencies = [ "aquatic_http_protocol", "aquatic_toml_config", "arc-swap", + "arrayvec", "cfg-if", "either", "futures", diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index 8c0a33b..3967ce2 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -28,6 +28,7 @@ aquatic_http_protocol.workspace = true aquatic_toml_config.workspace = true anyhow = "1" +arrayvec = "0.7" arc-swap = "1" cfg-if = "1" either = "1" diff --git a/crates/http/src/workers/swarm/mod.rs b/crates/http/src/workers/swarm/mod.rs index 5df1c46..d50797e 100644 --- a/crates/http/src/workers/swarm/mod.rs +++ b/crates/http/src/workers/swarm/mod.rs @@ -58,19 +58,8 @@ pub async fn run_swarm_worker( // Periodically update torrent count metrics #[cfg(feature = "metrics")] TimerActionRepeat::repeat(enclose!((config, torrents) move || { - enclose!((config, torrents, worker_index) move || async move { - let torrents = torrents.borrow_mut(); - - ::metrics::gauge!( - "aquatic_torrents", - "ip_version" => "4", - "worker_index" => worker_index.to_string(), - ).set(torrents.ipv4.len() as f64); - ::metrics::gauge!( - "aquatic_torrents", - "ip_version" => "6", - "worker_index" => worker_index.to_string(), - ).set(torrents.ipv6.len() as f64); + enclose!((config, torrents) move || async move { + torrents.borrow_mut().update_torrent_metrics(); 
Some(Duration::from_secs(config.metrics.torrent_count_update_interval)) })() diff --git a/crates/http/src/workers/swarm/storage.rs b/crates/http/src/workers/swarm/storage.rs index b49b1ea..eab15be 100644 --- a/crates/http/src/workers/swarm/storage.rs +++ b/crates/http/src/workers/swarm/storage.rs @@ -2,6 +2,7 @@ use std::collections::BTreeMap; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::sync::Arc; +use arrayvec::ArrayVec; use rand::Rng; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; @@ -15,52 +16,23 @@ use aquatic_http_protocol::response::*; use crate::config::Config; -#[cfg(feature = "metrics")] +const SMALL_PEER_MAP_CAPACITY: usize = 4; -pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash { - #[cfg(feature = "metrics")] - fn ip_version_str() -> &'static str; -} +pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {} -impl Ip for Ipv4Addr { - #[cfg(feature = "metrics")] - fn ip_version_str() -> &'static str { - "4" - } -} -impl Ip for Ipv6Addr { - #[cfg(feature = "metrics")] - fn ip_version_str() -> &'static str { - "6" - } -} +impl Ip for Ipv4Addr {} +impl Ip for Ipv6Addr {} pub struct TorrentMaps { pub ipv4: TorrentMap, pub ipv6: TorrentMap, - #[cfg(feature = "metrics")] - pub ipv4_peer_gauge: metrics::Gauge, - #[cfg(feature = "metrics")] - pub ipv6_peer_gauge: metrics::Gauge, } impl TorrentMaps { pub fn new(worker_index: usize) -> Self { Self { - ipv4: Default::default(), - ipv6: Default::default(), - #[cfg(feature = "metrics")] - ipv4_peer_gauge: ::metrics::gauge!( - "aquatic_peers", - "ip_version" => "4", - "worker_index" => worker_index.to_string(), - ), - #[cfg(feature = "metrics")] - ipv6_peer_gauge: ::metrics::gauge!( - "aquatic_peers", - "ip_version" => "6", - "worker_index" => worker_index.to_string(), - ), + ipv4: TorrentMap::new(worker_index, true), + ipv6: TorrentMap::new(worker_index, false), } } @@ -74,18 +46,13 @@ impl TorrentMaps { ) -> AnnounceResponse { match 
peer_addr.get().ip() { IpAddr::V4(peer_ip_address) => { - let (seeders, leechers, response_peers) = self - .ipv4 - .entry(request.info_hash) - .or_default() - .upsert_peer_and_get_response_peers( + let (seeders, leechers, response_peers) = + self.ipv4.upsert_peer_and_get_response_peers( config, rng, + valid_until, peer_ip_address, request, - valid_until, - #[cfg(feature = "metrics")] - &self.ipv4_peer_gauge, ); AnnounceResponse { @@ -98,18 +65,13 @@ impl TorrentMaps { } } IpAddr::V6(peer_ip_address) => { - let (seeders, leechers, response_peers) = self - .ipv6 - .entry(request.info_hash) - .or_default() - .upsert_peer_and_get_response_peers( + let (seeders, leechers, response_peers) = + self.ipv6.upsert_peer_and_get_response_peers( config, rng, + valid_until, peer_ip_address, request, - valid_until, - #[cfg(feature = "metrics")] - &self.ipv6_peer_gauge, ); AnnounceResponse { @@ -130,46 +92,16 @@ impl TorrentMaps { peer_addr: CanonicalSocketAddr, request: ScrapeRequest, ) -> ScrapeResponse { - let num_to_take = request - .info_hashes - .len() - .min(config.protocol.max_scrape_torrents); - - let mut response = ScrapeResponse { - files: BTreeMap::new(), - }; - - let peer_ip = peer_addr.get().ip(); - - // If request.info_hashes is empty, don't return scrape for all - // torrents, even though reference server does it. It is too expensive. 
- if peer_ip.is_ipv4() { - for info_hash in request.info_hashes.into_iter().take(num_to_take) { - if let Some(torrent_data) = self.ipv4.get(&info_hash) { - let stats = ScrapeStatistics { - complete: torrent_data.num_seeders, - downloaded: 0, // No implementation planned - incomplete: torrent_data.num_leechers(), - }; - - response.files.insert(info_hash, stats); - } - } + if peer_addr.get().ip().is_ipv4() { + self.ipv4.handle_scrape_request(config, request) } else { - for info_hash in request.info_hashes.into_iter().take(num_to_take) { - if let Some(torrent_data) = self.ipv6.get(&info_hash) { - let stats = ScrapeStatistics { - complete: torrent_data.num_seeders, - downloaded: 0, // No implementation planned - incomplete: torrent_data.num_leechers(), - }; + self.ipv6.handle_scrape_request(config, request) + } + } - response.files.insert(info_hash, stats); - } - } - }; - - response + pub fn update_torrent_metrics(&self) { + self.ipv4.torrent_gauge.set(self.ipv4.torrents.len() as f64); + self.ipv6.torrent_gauge.set(self.ipv6.torrents.len() as f64); } pub fn clean( @@ -182,32 +114,117 @@ impl TorrentMaps { let now = server_start_instant.seconds_elapsed(); - Self::clean_torrent_map( - config, - &mut access_list_cache, - &mut self.ipv4, - now, - &self.ipv4_peer_gauge, - ); - Self::clean_torrent_map( - config, - &mut access_list_cache, - &mut self.ipv6, - now, - &self.ipv6_peer_gauge, - ); + self.ipv4.clean(config, &mut access_list_cache, now); + self.ipv6.clean(config, &mut access_list_cache, now); + } +} + +pub struct TorrentMap { + torrents: IndexMap>, + #[cfg(feature = "metrics")] + peer_gauge: ::metrics::Gauge, + #[cfg(feature = "metrics")] + torrent_gauge: ::metrics::Gauge, +} + +impl TorrentMap { + fn new(worker_index: usize, ipv4: bool) -> Self { + #[cfg(feature = "metrics")] + let peer_gauge = if ipv4 { + ::metrics::gauge!( + "aquatic_peers", + "ip_version" => "4", + "worker_index" => worker_index.to_string(), + ) + } else { + ::metrics::gauge!( + "aquatic_peers", 
+ "ip_version" => "6", + "worker_index" => worker_index.to_string(), + ) + }; + #[cfg(feature = "metrics")] + let torrent_gauge = if ipv4 { + ::metrics::gauge!( + "aquatic_torrents", + "ip_version" => "4", + "worker_index" => worker_index.to_string(), + ) + } else { + ::metrics::gauge!( + "aquatic_torrents", + "ip_version" => "6", + "worker_index" => worker_index.to_string(), + ) + }; + + Self { + torrents: Default::default(), + #[cfg(feature = "metrics")] + peer_gauge, + #[cfg(feature = "metrics")] + torrent_gauge, + } } - fn clean_torrent_map( + fn upsert_peer_and_get_response_peers( + &mut self, + config: &Config, + rng: &mut impl Rng, + valid_until: ValidUntil, + peer_ip_address: I, + request: AnnounceRequest, + ) -> (usize, usize, Vec>) { + self.torrents + .entry(request.info_hash) + .or_default() + .upsert_peer_and_get_response_peers( + config, + rng, + request, + peer_ip_address, + valid_until, + #[cfg(feature = "metrics")] + &self.peer_gauge, + ) + } + + fn handle_scrape_request(&mut self, config: &Config, request: ScrapeRequest) -> ScrapeResponse { + let num_to_take = request + .info_hashes + .len() + .min(config.protocol.max_scrape_torrents); + + let mut response = ScrapeResponse { + files: BTreeMap::new(), + }; + + for info_hash in request.info_hashes.into_iter().take(num_to_take) { + let stats = self + .torrents + .get(&info_hash) + .map(|torrent_data| torrent_data.scrape_statistics()) + .unwrap_or(ScrapeStatistics { + complete: 0, + incomplete: 0, + downloaded: 0, + }); + + response.files.insert(info_hash, stats); + } + + response + } + + fn clean( + &mut self, config: &Config, access_list_cache: &mut AccessListCache, - torrent_map: &mut TorrentMap, now: SecondsSinceServerStart, - #[cfg(feature = "metrics")] peer_gauge: &::metrics::Gauge, ) { let mut total_num_peers = 0; - torrent_map.retain(|info_hash, torrent_data| { + self.torrents.retain(|info_hash, torrent_data| { if !access_list_cache .load() .allows(config.access_list.mode, &info_hash.0) @@ 
-215,116 +232,216 @@ impl TorrentMaps { return false; } - let num_seeders = &mut torrent_data.num_seeders; + let num_peers = match torrent_data { + TorrentData::Small(t) => t.clean_and_get_num_peers(now), + TorrentData::Large(t) => t.clean_and_get_num_peers(now), + }; - torrent_data.peers.retain(|_, peer| { - let keep = peer.valid_until.valid(now); + total_num_peers += num_peers as u64; - if (!keep) & peer.seeder { - *num_seeders -= 1; - } - - keep - }); - - total_num_peers += torrent_data.peers.len() as u64; - - !torrent_data.peers.is_empty() + num_peers > 0 }); - let total_num_peers = total_num_peers as f64; + self.torrents.shrink_to_fit(); #[cfg(feature = "metrics")] - peer_gauge.set(total_num_peers); - - torrent_map.shrink_to_fit(); + self.peer_gauge.set(total_num_peers as f64); } } -pub type TorrentMap = IndexMap>; - -pub struct TorrentData { - peers: IndexMap, Peer>, - num_seeders: usize, -} - -impl Default for TorrentData { - #[inline] - fn default() -> Self { - Self { - peers: Default::default(), - num_seeders: 0, - } - } +pub enum TorrentData { + Small(SmallPeerMap), + Large(LargePeerMap), } impl TorrentData { - fn num_leechers(&self) -> usize { - self.peers.len() - self.num_seeders - } - - /// Insert/update peer. 
Return num_seeders, num_leechers and response peers fn upsert_peer_and_get_response_peers( &mut self, config: &Config, rng: &mut impl Rng, - peer_ip_address: I, request: AnnounceRequest, + ip_address: I, valid_until: ValidUntil, #[cfg(feature = "metrics")] peer_gauge: &::metrics::Gauge, ) -> (usize, usize, Vec>) { - let peer_status = - PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left)); + let max_num_peers_to_take = match request.numwant { + Some(0) | None => config.protocol.max_peers, + Some(numwant) => numwant.min(config.protocol.max_peers), + }; + + let status = PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left); let peer_map_key = ResponsePeer { - ip_address: peer_ip_address, + ip_address, port: request.port, }; - let opt_removed_peer = self.peers.remove(&peer_map_key); + // Create the response before inserting the peer. This means that we + // don't have to filter it out from the response peers, and that the + // reported number of seeders/leechers will not include it + let (response_data, opt_removed_peer) = match self { + Self::Small(peer_map) => { + let opt_removed_peer = peer_map.remove(&peer_map_key); - if let Some(Peer { seeder: true, .. 
}) = opt_removed_peer.as_ref() { - self.num_seeders -= 1; - } + let (seeders, leechers) = peer_map.num_seeders_leechers(); + let response_peers = peer_map.extract_response_peers(max_num_peers_to_take); - let response_peers = match peer_status { - PeerStatus::Seeding | PeerStatus::Leeching => { + // Convert peer map to large variant if it is full and + // announcing peer is not stopped and will therefore be + // inserted + if peer_map.is_full() && status != PeerStatus::Stopped { + *self = Self::Large(peer_map.to_large()); + } + + ((seeders, leechers, response_peers), opt_removed_peer) + } + Self::Large(peer_map) => { + let opt_removed_peer = peer_map.remove_peer(&peer_map_key); + + let (seeders, leechers) = peer_map.num_seeders_leechers(); + let response_peers = peer_map.extract_response_peers(rng, max_num_peers_to_take); + + // Try shrinking the map if announcing peer is stopped and + // will therefore not be inserted + if status == PeerStatus::Stopped { + if let Some(peer_map) = peer_map.try_shrink() { + *self = Self::Small(peer_map); + } + } + + ((seeders, leechers, response_peers), opt_removed_peer) + } + }; + + match status { + PeerStatus::Leeching | PeerStatus::Seeding => { #[cfg(feature = "metrics")] if opt_removed_peer.is_none() { peer_gauge.increment(1.0); } - let max_num_peers_to_take = match request.numwant { - Some(0) | None => config.protocol.max_peers, - Some(numwant) => numwant.min(config.protocol.max_peers), - }; - - let response_peers = self.extract_response_peers(rng, max_num_peers_to_take); - let peer = Peer { + is_seeder: status == PeerStatus::Seeding, valid_until, - seeder: peer_status == PeerStatus::Seeding, }; - self.peers.insert(peer_map_key, peer); - - if peer_status == PeerStatus::Seeding { - self.num_seeders += 1; + match self { + Self::Small(peer_map) => peer_map.insert(peer_map_key, peer), + Self::Large(peer_map) => peer_map.insert(peer_map_key, peer), } - - response_peers } - PeerStatus::Stopped => { + PeerStatus::Stopped => + { 
#[cfg(feature = "metrics")] if opt_removed_peer.is_some() { peer_gauge.decrement(1.0); } - - Vec::new() } }; - (self.num_seeders, self.num_leechers(), response_peers) + response_data + } + + fn scrape_statistics(&self) -> ScrapeStatistics { + let (seeders, leechers) = match self { + Self::Small(peer_map) => peer_map.num_seeders_leechers(), + Self::Large(peer_map) => peer_map.num_seeders_leechers(), + }; + + ScrapeStatistics { + complete: seeders, + incomplete: leechers, + downloaded: 0, + } + } +} + +impl Default for TorrentData { + fn default() -> Self { + Self::Small(SmallPeerMap(ArrayVec::default())) + } +} + +/// Store torrents with very few peers without an extra heap allocation +/// +/// On public open trackers, this is likely to be the majority of torrents. +#[derive(Default, Debug)] +pub struct SmallPeerMap(ArrayVec<(ResponsePeer, Peer), SMALL_PEER_MAP_CAPACITY>); + +impl SmallPeerMap { + fn is_full(&self) -> bool { + self.0.is_full() + } + + fn num_seeders_leechers(&self) -> (usize, usize) { + let seeders = self.0.iter().filter(|(_, p)| p.is_seeder).count(); + let leechers = self.0.len() - seeders; + + (seeders, leechers) + } + + fn insert(&mut self, key: ResponsePeer, peer: Peer) { + self.0.push((key, peer)); + } + + fn remove(&mut self, key: &ResponsePeer) -> Option { + for (i, (k, _)) in self.0.iter().enumerate() { + if k == key { + return Some(self.0.remove(i).1); + } + } + + None + } + + fn extract_response_peers(&self, max_num_peers_to_take: usize) -> Vec> { + Vec::from_iter(self.0.iter().take(max_num_peers_to_take).map(|(k, _)| *k)) + } + + fn clean_and_get_num_peers(&mut self, now: SecondsSinceServerStart) -> usize { + self.0.retain(|(_, peer)| peer.valid_until.valid(now)); + + self.0.len() + } + + fn to_large(&self) -> LargePeerMap { + let (num_seeders, _) = self.num_seeders_leechers(); + let peers = self.0.iter().copied().collect(); + + LargePeerMap { peers, num_seeders } + } +} + +#[derive(Default)] +pub struct LargePeerMap { + peers: IndexMap, 
Peer>, + num_seeders: usize, +} + +impl LargePeerMap { + fn num_seeders_leechers(&self) -> (usize, usize) { + (self.num_seeders, self.peers.len() - self.num_seeders) + } + + fn insert(&mut self, key: ResponsePeer, peer: Peer) { + if peer.is_seeder { + self.num_seeders += 1; + } + + self.peers.insert(key, peer); + } + + fn remove_peer(&mut self, key: &ResponsePeer) -> Option { + let opt_removed_peer = self.peers.remove(key); + + if let Some(Peer { + is_seeder: true, .. + }) = opt_removed_peer + { + self.num_seeders -= 1; + } + + opt_removed_peer } /// Extract response peers @@ -373,12 +490,36 @@ impl TorrentData { peers } } + + fn clean_and_get_num_peers(&mut self, now: SecondsSinceServerStart) -> usize { + self.peers.retain(|_, peer| { + let keep = peer.valid_until.valid(now); + + if (!keep) & peer.is_seeder { + self.num_seeders -= 1; + } + + keep + }); + + self.peers.shrink_to_fit(); + + self.peers.len() + } + + fn try_shrink(&mut self) -> Option> { + (self.peers.len() <= SMALL_PEER_MAP_CAPACITY).then(|| { + SmallPeerMap(ArrayVec::from_iter( + self.peers.iter().map(|(k, v)| (*k, *v)), + )) + }) + } } #[derive(Debug, Clone, Copy)] struct Peer { pub valid_until: ValidUntil, - pub seeder: bool, + pub is_seeder: bool, } #[derive(PartialEq, Eq, Clone, Copy, Debug)] @@ -389,14 +530,10 @@ enum PeerStatus { } impl PeerStatus { - /// Determine peer status from announce event and number of bytes left. - /// - /// Likely, the last branch will be taken most of the time. - #[inline] - fn from_event_and_bytes_left(event: AnnounceEvent, opt_bytes_left: Option) -> Self { + fn from_event_and_bytes_left(event: AnnounceEvent, bytes_left: usize) -> Self { if let AnnounceEvent::Stopped = event { Self::Stopped - } else if let Some(0) = opt_bytes_left { + } else if bytes_left == 0 { Self::Seeding } else { Self::Leeching