diff --git a/CHANGELOG.md b/CHANGELOG.md index 09708fc..49183b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ * Index peers by packet source IP and provided port, instead of by peer_id. This prevents users from impersonating others and is likely also slightly faster for IPv4 peers. +* Store torrents with up to two peers without an extra heap allocation for the + peers. * Remove support for unbounded worker channels * Add backpressure in socket workers. They will postpone reading from the socket if sending a request to a swarm worker failed diff --git a/Cargo.lock b/Cargo.lock index 3968b40..184c491 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -293,6 +293,7 @@ dependencies = [ "aquatic_common", "aquatic_toml_config", "aquatic_udp_protocol", + "arrayvec", "blake3", "cfg-if", "compact_str", diff --git a/crates/udp/Cargo.toml b/crates/udp/Cargo.toml index 4e9eac3..599fa39 100644 --- a/crates/udp/Cargo.toml +++ b/crates/udp/Cargo.toml @@ -32,6 +32,7 @@ aquatic_toml_config.workspace = true aquatic_udp_protocol.workspace = true anyhow = "1" +arrayvec = "0.7" blake3 = "1" cfg-if = "1" compact_str = "0.7" diff --git a/crates/udp/src/workers/swarm/storage.rs b/crates/udp/src/workers/swarm/storage.rs index 4feb80d..28984b1 100644 --- a/crates/udp/src/workers/swarm/storage.rs +++ b/crates/udp/src/workers/swarm/storage.rs @@ -10,6 +10,7 @@ use aquatic_common::{ }; use aquatic_udp_protocol::*; +use arrayvec::ArrayVec; use crossbeam_channel::Sender; use hdrhistogram::Histogram; use rand::prelude::SmallRng; @@ -18,6 +19,8 @@ use rand::Rng; use crate::common::*; use crate::config::Config; +const SMALL_PEER_MAP_CAPACITY: usize = 2; + pub struct TorrentMaps { pub ipv4: TorrentMap, pub ipv6: TorrentMap, @@ -84,7 +87,11 @@ impl TorrentMap { .0 .get(&info_hash) .map(|torrent_data| torrent_data.scrape_statistics()) - .unwrap_or_else(|| create_torrent_scrape_statistics(0, 0)); + .unwrap_or_else(|| TorrentScrapeStatistics { + seeders: NumberOfPeers::new(0), + leechers: NumberOfPeers::new(0), + completed: NumberOfDownloads::new(0), + }); (i, stats) }); @@ -100,7 +107,7 @@ impl TorrentMap { access_list_mode: AccessListMode, now: SecondsSinceServerStart, ) -> (usize, Option>) { - let mut num_peers = 0; + let mut total_num_peers = 0; let mut opt_histogram: Option> = if config.statistics.torrent_peer_histograms { @@ -124,17 +131,27 @@ impl TorrentMap { return false; } - torrent.clean(config, statistics_sender, now); + let num_peers = match torrent { + TorrentData::Small(peer_map) => { + peer_map.clean_and_get_num_peers(config, statistics_sender, now) + } + TorrentData::Large(peer_map) => { + let num_peers = + peer_map.clean_and_get_num_peers(config, statistics_sender, now); - num_peers += torrent.peers.len(); + if let Some(peer_map) = peer_map.try_shrink() { + *torrent = TorrentData::Small(peer_map); + } + + num_peers + } + }; + + total_num_peers += num_peers; match opt_histogram { - Some(ref mut histogram) if torrent.peers.len() != 0 => { - let n = torrent - .peers - .len() - .try_into() - .expect("Couldn't fit usize into u64"); + Some(ref mut histogram) if num_peers > 0 => { + let n = num_peers.try_into().expect("Couldn't fit usize into u64"); if let Err(err) = histogram.record(n) { ::log::error!("Couldn't record {} to histogram: {:#}", n, err); @@ -143,12 +160,12 @@ impl TorrentMap { _ => (), } - !torrent.peers.is_empty() + num_peers > 0 }); self.0.shrink_to_fit(); - (num_peers, opt_histogram) + (total_num_peers, opt_histogram) } pub fn num_torrents(&self) -> usize { @@ -156,9 +173,9 @@ impl TorrentMap { } } -pub struct TorrentData { - peers: IndexMap, Peer>, - num_seeders: usize, +pub enum TorrentData { + Small(SmallPeerMap), + Large(LargePeerMap), } impl TorrentData { @@ -189,60 +206,75 @@ impl TorrentData { port: request.port, }; - let opt_removed_peer = self.peers.remove(&peer_map_key); - - if let Some(Peer { - is_seeder: true, .. - }) = opt_removed_peer - { - self.num_seeders -= 1; - } - // Create the response before inserting the peer. This means that we // don't have to filter it out from the response peers, and that the // reported number of seeders/leechers will not include it + let opt_removed_peer = match self { + Self::Small(peer_map) => { + let opt_removed_peer = peer_map.remove(&peer_map_key); - response.fixed = AnnounceResponseFixedData { - transaction_id: request.transaction_id, - announce_interval: AnnounceInterval::new(config.protocol.peer_announce_interval), - leechers: NumberOfPeers::new(self.num_leechers().try_into().unwrap_or(i32::MAX)), - seeders: NumberOfPeers::new(self.num_seeders().try_into().unwrap_or(i32::MAX)), + let (seeders, leechers) = peer_map.num_seeders_leechers(); + + response.fixed = AnnounceResponseFixedData { + transaction_id: request.transaction_id, + announce_interval: AnnounceInterval::new( + config.protocol.peer_announce_interval, + ), + leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), + seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), + }; + + peer_map.extract_response_peers(max_num_peers_to_take, &mut response.peers); + + // Convert peer map to large variant if it is full and + // announcing peer is not stopped and will therefore be + // inserted + if peer_map.is_full() && status != PeerStatus::Stopped { + *self = Self::Large(peer_map.to_large()); + } + + opt_removed_peer + } + Self::Large(peer_map) => { + let opt_removed_peer = peer_map.remove_peer(&peer_map_key); + + let (seeders, leechers) = peer_map.num_seeders_leechers(); + + response.fixed = AnnounceResponseFixedData { + transaction_id: request.transaction_id, + announce_interval: AnnounceInterval::new( + config.protocol.peer_announce_interval, + ), + leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), + seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), + }; + + peer_map.extract_response_peers(rng, max_num_peers_to_take, &mut response.peers); + + // Try shrinking the map if announcing peer is stopped and + // will therefore not be inserted + if status == PeerStatus::Stopped { + if let Some(peer_map) = peer_map.try_shrink() { + *self = Self::Small(peer_map); + } + } + + opt_removed_peer + } }; - extract_response_peers( - rng, - &self.peers, - max_num_peers_to_take, - |k, _| *k, - &mut response.peers, - ); - match status { - PeerStatus::Leeching => { + PeerStatus::Leeching | PeerStatus::Seeding => { let peer = Peer { peer_id: request.peer_id, - is_seeder: false, + is_seeder: status == PeerStatus::Seeding, valid_until, }; - self.peers.insert(peer_map_key, peer); - - if config.statistics.peer_clients && opt_removed_peer.is_none() { - statistics_sender - .try_send(StatisticsMessage::PeerAdded(request.peer_id)) - .expect("statistics channel should be unbounded"); + match self { + Self::Small(peer_map) => peer_map.insert(peer_map_key, peer), + Self::Large(peer_map) => peer_map.insert(peer_map_key, peer), } - } - PeerStatus::Seeding => { - let peer = Peer { - peer_id: request.peer_id, - is_seeder: true, - valid_until, - }; - - self.peers.insert(peer_map_key, peer); - - self.num_seeders += 1; if config.statistics.peer_clients && opt_removed_peer.is_none() { statistics_sender @@ -260,28 +292,180 @@ impl TorrentData { }; } - pub fn num_leechers(&self) -> usize { - self.peers.len() - self.num_seeders - } - - pub fn num_seeders(&self) -> usize { - self.num_seeders - } - pub fn scrape_statistics(&self) -> TorrentScrapeStatistics { - create_torrent_scrape_statistics( - self.num_seeders.try_into().unwrap_or(i32::MAX), - self.num_leechers().try_into().unwrap_or(i32::MAX), - ) + let (seeders, leechers) = match self { + Self::Small(peer_map) => peer_map.num_seeders_leechers(), + Self::Large(peer_map) => peer_map.num_seeders_leechers(), + }; + + TorrentScrapeStatistics { + seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), + leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), + completed: NumberOfDownloads::new(0), + } + } +} + +impl Default for TorrentData { + fn default() -> Self { + Self::Small(SmallPeerMap(ArrayVec::default())) + } +} + +/// Store torrents with up to two peers without an extra heap allocation +/// +/// On public open trackers, this is likely to be the majority of torrents. +#[derive(Default, Debug)] +pub struct SmallPeerMap(ArrayVec<(ResponsePeer, Peer), SMALL_PEER_MAP_CAPACITY>); + +impl SmallPeerMap { + fn is_full(&self) -> bool { + self.0.is_full() } - /// Remove inactive peers and reclaim space - fn clean( + fn num_seeders_leechers(&self) -> (usize, usize) { + let seeders = self.0.iter().filter(|(_, p)| p.is_seeder).count(); + let leechers = self.0.len() - seeders; + + (seeders, leechers) + } + + fn insert(&mut self, key: ResponsePeer, peer: Peer) { + self.0.push((key, peer)); + } + + fn remove(&mut self, key: &ResponsePeer) -> Option { + for (i, (k, _)) in self.0.iter().enumerate() { + if k == key { + return Some(self.0.remove(i).1); + } + } + + None + } + + fn extract_response_peers( + &self, + max_num_peers_to_take: usize, + peers: &mut Vec>, + ) { + peers.extend(self.0.iter().take(max_num_peers_to_take).map(|(k, _)| k)) + } + + fn clean_and_get_num_peers( &mut self, config: &Config, statistics_sender: &Sender, now: SecondsSinceServerStart, + ) -> usize { + self.0.retain(|(_, peer)| { + let keep = peer.valid_until.valid(now); + + if !keep && config.statistics.peer_clients { + if let Err(_) = + statistics_sender.try_send(StatisticsMessage::PeerRemoved(peer.peer_id)) + { + // Should never happen in practice + ::log::error!("Couldn't send StatisticsMessage::PeerRemoved"); + } + } + + keep + }); + + self.0.len() + } + + fn to_large(&self) -> LargePeerMap { + let (num_seeders, _) = self.num_seeders_leechers(); + let peers = self.0.iter().copied().collect(); + + LargePeerMap { peers, num_seeders } + } +} + +#[derive(Default)] +pub struct LargePeerMap { + peers: IndexMap, Peer>, + num_seeders: usize, +} + +impl LargePeerMap { + fn num_seeders_leechers(&self) -> (usize, usize) { + (self.num_seeders, self.peers.len() - self.num_seeders) + } + + fn insert(&mut self, key: ResponsePeer, peer: Peer) { + if peer.is_seeder { + self.num_seeders += 1; + } + + self.peers.insert(key, peer); + } + + fn remove_peer(&mut self, key: &ResponsePeer) -> Option { + let opt_removed_peer = self.peers.remove(key); + + if let Some(Peer { + is_seeder: true, .. + }) = opt_removed_peer + { + self.num_seeders -= 1; + } + + opt_removed_peer + } + + /// Extract response peers + /// + /// If there are more peers in map than `max_num_peers_to_take`, do a random + /// selection of peers from first and second halves of map in order to avoid + /// returning too homogeneous peers. + /// + /// Does NOT filter out announcing peer. + pub fn extract_response_peers( + &self, + rng: &mut impl Rng, + max_num_peers_to_take: usize, + peers: &mut Vec>, ) { + if self.peers.len() <= max_num_peers_to_take { + peers.extend(self.peers.keys()); + } else { + let middle_index = self.peers.len() / 2; + let num_to_take_per_half = max_num_peers_to_take / 2; + + let offset_half_one = { + let from = 0; + let to = usize::max(1, middle_index - num_to_take_per_half); + + rng.gen_range(from..to) + }; + let offset_half_two = { + let from = middle_index; + let to = usize::max(middle_index + 1, self.peers.len() - num_to_take_per_half); + + rng.gen_range(from..to) + }; + + let end_half_one = offset_half_one + num_to_take_per_half; + let end_half_two = offset_half_two + num_to_take_per_half; + + if let Some(slice) = self.peers.get_range(offset_half_one..end_half_one) { + peers.extend(slice.keys()); + } + if let Some(slice) = self.peers.get_range(offset_half_two..end_half_two) { + peers.extend(slice.keys()); + } + } + } + + fn clean_and_get_num_peers( + &mut self, + config: &Config, + statistics_sender: &Sender, + now: SecondsSinceServerStart, + ) -> usize { self.peers.retain(|_, peer| { let keep = peer.valid_until.valid(now); @@ -305,79 +489,22 @@ impl TorrentData { if !self.peers.is_empty() { self.peers.shrink_to_fit(); } + + self.peers.len() + } + + fn try_shrink(&mut self) -> Option> { + (self.peers.len() <= SMALL_PEER_MAP_CAPACITY).then(|| { + SmallPeerMap(ArrayVec::from_iter( + self.peers.iter().map(|(k, v)| (*k, *v)), + )) + }) } } -impl Default for TorrentData { - fn default() -> Self { - Self { - peers: Default::default(), - num_seeders: 0, - } - } -} - -#[derive(Clone, Debug)] +#[derive(Clone, Copy, Debug)] struct Peer { peer_id: PeerId, is_seeder: bool, valid_until: ValidUntil, } - -/// Extract response peers -/// -/// If there are more peers in map than `max_num_peers_to_take`, do a random -/// selection of peers from first and second halves of map in order to avoid -/// returning too homogeneous peers. -/// -/// Does NOT filter out announcing peer. -#[inline] -pub fn extract_response_peers( - rng: &mut impl Rng, - peer_map: &IndexMap, - max_num_peers_to_take: usize, - peer_conversion_function: F, - peers: &mut Vec, -) where - K: Eq + ::std::hash::Hash, - F: Fn(&K, &V) -> R, -{ - if peer_map.len() <= max_num_peers_to_take { - peers.extend(peer_map.iter().map(|(k, v)| peer_conversion_function(k, v))); - } else { - let middle_index = peer_map.len() / 2; - let num_to_take_per_half = max_num_peers_to_take / 2; - - let offset_half_one = { - let from = 0; - let to = usize::max(1, middle_index - num_to_take_per_half); - - rng.gen_range(from..to) - }; - let offset_half_two = { - let from = middle_index; - let to = usize::max(middle_index + 1, peer_map.len() - num_to_take_per_half); - - rng.gen_range(from..to) - }; - - let end_half_one = offset_half_one + num_to_take_per_half; - let end_half_two = offset_half_two + num_to_take_per_half; - - if let Some(slice) = peer_map.get_range(offset_half_one..end_half_one) { - peers.extend(slice.iter().map(|(k, v)| peer_conversion_function(k, v))); - } - if let Some(slice) = peer_map.get_range(offset_half_two..end_half_two) { - peers.extend(slice.iter().map(|(k, v)| peer_conversion_function(k, v))); - } - } -} - -#[inline(always)] -fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics { - TorrentScrapeStatistics { - seeders: NumberOfPeers::new(seeders), - completed: NumberOfDownloads::new(0), // No implementation planned - leechers: NumberOfPeers::new(leechers), - } -}