From 53497308f1932783647ee1b45e10a7be9dc21fae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 10 Feb 2024 10:01:45 +0100 Subject: [PATCH] udp: create file with thread-shared torrent map implementation --- Cargo.lock | 33 ++ crates/udp/Cargo.toml | 1 + crates/udp/src/lib.rs | 1 + crates/udp/src/swarm.rs | 700 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 735 insertions(+) create mode 100644 crates/udp/src/swarm.rs diff --git a/Cargo.lock b/Cargo.lock index bc446f5..b82edb2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -315,6 +315,7 @@ dependencies = [ "mimalloc", "mio", "num-format", + "parking_lot", "quickcheck", "quickcheck_macros", "rand", @@ -2001,6 +2002,29 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets 0.48.5", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -2269,6 +2293,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redox_syscall" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "ref-cast" version = "1.0.22" diff --git a/crates/udp/Cargo.toml b/crates/udp/Cargo.toml index 72c1f92..47cfa06 100644 --- a/crates/udp/Cargo.toml +++ b/crates/udp/Cargo.toml @@ -48,6 +48,7 @@ log = "0.4" mimalloc = { version = "0.1", default-features = false } mio = { version = "0.8", features = ["net", "os-poll"] } num-format = "0.4" +parking_lot = "0.12" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } signal-hook = { version = "0.3" } diff --git a/crates/udp/src/lib.rs b/crates/udp/src/lib.rs index cf6a588..49c8aa1 100644 --- a/crates/udp/src/lib.rs +++ b/crates/udp/src/lib.rs @@ -1,5 +1,6 @@ pub mod common; pub mod config; +pub mod swarm; pub mod workers; use std::collections::BTreeMap; diff --git a/crates/udp/src/swarm.rs b/crates/udp/src/swarm.rs new file mode 100644 index 0000000..1c671e1 --- /dev/null +++ b/crates/udp/src/swarm.rs @@ -0,0 +1,700 @@ +use std::iter::repeat_with; +use std::net::IpAddr; +use std::ops::DerefMut; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +use aquatic_common::SecondsSinceServerStart; +use aquatic_common::{ + access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache, AccessListMode}, + ValidUntil, +}; +use aquatic_common::{CanonicalSocketAddr, IndexMap}; + +use aquatic_udp_protocol::*; +use arrayvec::ArrayVec; +use crossbeam_channel::Sender; +use hdrhistogram::Histogram; +use rand::prelude::SmallRng; +use rand::Rng; + +use crate::common::*; +use crate::config::Config; + +const SMALL_PEER_MAP_CAPACITY: usize = 2; + +use aquatic_udp_protocol::InfoHash; +use parking_lot::RwLock; + +type TorrentMapShard = IndexMap>>; + +#[derive(Clone)] +pub struct TorrentMaps { + ipv4: TorrentMapShards, + ipv6: TorrentMapShards, +} + +impl TorrentMaps { + pub fn new(config: &Config) -> Self { + let num_shards = 128usize; + + Self { + ipv4: TorrentMapShards::new(num_shards), + ipv6: TorrentMapShards::new(num_shards), + } + } + + pub fn announce( + &self, + config: &Config, + statistics_sender: &Sender, + rng: &mut SmallRng, + request: &AnnounceRequest, + ip_address: CanonicalSocketAddr, + valid_until: ValidUntil, + ) -> Response { + match ip_address.get().ip() { + IpAddr::V4(ip_address) => Response::AnnounceIpv4(self.ipv4.announce( + config, + statistics_sender, + rng, + request, + ip_address.into(), + valid_until, + )), + IpAddr::V6(ip_address) => Response::AnnounceIpv6(self.ipv6.announce( + config, + statistics_sender, + rng, + request, + ip_address.into(), + valid_until, + )), + } + } + + pub fn scrape(&self, ip_addr: CanonicalSocketAddr, request: ScrapeRequest) -> ScrapeResponse { + if ip_addr.is_ipv4() { + self.ipv4.scrape(request) + } else { + self.ipv6.scrape(request) + } + } + /// Remove forbidden or inactive torrents, reclaim space and update statistics + pub fn clean_and_update_statistics( + &mut self, + config: &Config, + state: &State, + statistics: &CachePaddedArc>, + statistics_sender: &Sender, + access_list: &Arc, + ) { + let mut cache = create_access_list_cache(access_list); + let mode = config.access_list.mode; + let now = state.server_start_instant.seconds_elapsed(); + + let ipv4 = + self.ipv4 + .clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now); + let ipv6 = + self.ipv6 + .clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now); + + if config.statistics.active() { + statistics.ipv4.peers.store(ipv4.0, Ordering::Relaxed); + statistics.ipv6.peers.store(ipv6.0, Ordering::Relaxed); + + if let Some(message) = ipv4.1.map(StatisticsMessage::Ipv4PeerHistogram) { + if let Err(err) = statistics_sender.try_send(message) { + ::log::error!("couldn't send statistics message: {:#}", err); + } + } + if let Some(message) = ipv6.1.map(StatisticsMessage::Ipv6PeerHistogram) { + if let Err(err) = statistics_sender.try_send(message) { + ::log::error!("couldn't send statistics message: {:#}", err); + } + } + } + } +} + +#[derive(Clone)] +pub struct TorrentMapShards(Arc<[RwLock>]>); + +impl TorrentMapShards { + fn new(num_shards: usize) -> Self { + Self( + repeat_with(Default::default) + .take(num_shards) + .collect::>() + .into_boxed_slice() + .into(), + ) + } + + fn announce( + &self, + config: &Config, + statistics_sender: &Sender, + rng: &mut SmallRng, + request: &AnnounceRequest, + ip_address: I, + valid_until: ValidUntil, + ) -> AnnounceResponse { + let torrent_map_shard = self.get_shard(&request.info_hash); + + // Clone Arc here to avoid keeping lock on whole shard + let torrent_data = + if let Some(torrent_data) = torrent_map_shard.read().get(&request.info_hash) { + torrent_data.clone() + } else { + // Don't overwrite entry if created in the meantime + torrent_map_shard + .write() + .entry(request.info_hash) + .or_default() + .clone() + }; + + let mut peer_map = torrent_data.peer_map.write(); + + peer_map.announce( + config, + statistics_sender, + rng, + request, + ip_address, + valid_until, + ) + } + + fn scrape(&self, request: ScrapeRequest) -> ScrapeResponse { + let mut response = ScrapeResponse { + transaction_id: request.transaction_id, + torrent_stats: Vec::with_capacity(request.info_hashes.len()), + }; + + for info_hash in request.info_hashes { + let torrent_map_shard = self.get_shard(&info_hash); + + let statistics = if let Some(torrent_data) = torrent_map_shard.read().get(&info_hash) { + torrent_data.peer_map.read().scrape_statistics() + } else { + TorrentScrapeStatistics { + seeders: NumberOfPeers::new(0), + leechers: NumberOfPeers::new(0), + completed: NumberOfDownloads::new(0), + } + }; + + response.torrent_stats.push(statistics); + } + + response + } + + fn clean_and_get_statistics( + &mut self, + config: &Config, + statistics_sender: &Sender, + access_list_cache: &mut AccessListCache, + access_list_mode: AccessListMode, + now: SecondsSinceServerStart, + ) -> (usize, Option>) { + let mut total_num_peers = 0; + + let mut opt_histogram: Option> = if config.statistics.torrent_peer_histograms + { + match Histogram::new(3) { + Ok(histogram) => Some(histogram), + Err(err) => { + ::log::error!("Couldn't create peer histogram: {:#}", err); + + None + } + } + } else { + None + }; + + for torrent_map_shard in self.0.iter() { + for torrent_data in torrent_map_shard.read().values() { + let mut peer_map = torrent_data.peer_map.write(); + + let num_peers = match peer_map.deref_mut() { + PeerMap::Small(small_peer_map) => { + small_peer_map.clean_and_get_num_peers(config, statistics_sender, now) + } + PeerMap::Large(large_peer_map) => { + let num_peers = + large_peer_map.clean_and_get_num_peers(config, statistics_sender, now); + + if let Some(small_peer_map) = large_peer_map.try_shrink() { + *peer_map = PeerMap::Small(small_peer_map); + } + + num_peers + } + }; + + drop(peer_map); + + match opt_histogram { + Some(ref mut histogram) if num_peers > 0 => { + let n = num_peers.try_into().expect("Couldn't fit usize into u64"); + + if let Err(err) = histogram.record(n) { + ::log::error!("Couldn't record {} to histogram: {:#}", n, err); + } + } + _ => (), + } + + torrent_data + .pending_removal + .store(num_peers == 0, Ordering::Release); + + total_num_peers += num_peers; + } + + let mut torrent_map_shard = torrent_map_shard.write(); + + torrent_map_shard.retain(|info_hash, torrent_data| { + if !access_list_cache + .load() + .allows(access_list_mode, &info_hash.0) + { + return false; + } + + // Only remove if no peers have been added since previous + // cleaning step + if torrent_data + .pending_removal + .fetch_and(false, Ordering::Acquire) + && torrent_data.peer_map.read().is_empty() + { + return false; + } + + true + }); + + torrent_map_shard.shrink_to_fit(); + } + + (total_num_peers, opt_histogram) + } + + fn get_shard(&self, info_hash: &InfoHash) -> &RwLock> { + self.0.get(info_hash.0[0] as usize % self.0.len()).unwrap() + } +} + +pub struct TorrentData { + peer_map: RwLock>, + pending_removal: AtomicBool, +} + +impl Default for TorrentData { + fn default() -> Self { + Self { + peer_map: Default::default(), + pending_removal: Default::default(), + } + } +} + +pub enum PeerMap { + Small(SmallPeerMap), + Large(LargePeerMap), +} + +impl PeerMap { + fn announce( + &mut self, + config: &Config, + statistics_sender: &Sender, + rng: &mut SmallRng, + request: &AnnounceRequest, + ip_address: I, + valid_until: ValidUntil, + ) -> AnnounceResponse { + let max_num_peers_to_take: usize = if request.peers_wanted.0.get() <= 0 { + config.protocol.max_response_peers + } else { + ::std::cmp::min( + config.protocol.max_response_peers, + request.peers_wanted.0.get().try_into().unwrap(), + ) + }; + + let status = + PeerStatus::from_event_and_bytes_left(request.event.into(), request.bytes_left); + + let peer_map_key = ResponsePeer { + ip_address, + port: request.port, + }; + + // Create the response before inserting the peer. This means that we + // don't have to filter it out from the response peers, and that the + // reported number of seeders/leechers will not include it + let (response, opt_removed_peer) = match self { + Self::Small(peer_map) => { + let opt_removed_peer = peer_map.remove(&peer_map_key); + + let (seeders, leechers) = peer_map.num_seeders_leechers(); + + let response = AnnounceResponse { + fixed: AnnounceResponseFixedData { + transaction_id: request.transaction_id, + announce_interval: AnnounceInterval::new( + config.protocol.peer_announce_interval, + ), + leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), + seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), + }, + peers: peer_map.extract_response_peers(max_num_peers_to_take), + }; + + // Convert peer map to large variant if it is full and + // announcing peer is not stopped and will therefore be + // inserted + if peer_map.is_full() && status != PeerStatus::Stopped { + *self = Self::Large(peer_map.to_large()); + } + + (response, opt_removed_peer) + } + Self::Large(peer_map) => { + let opt_removed_peer = peer_map.remove_peer(&peer_map_key); + + let (seeders, leechers) = peer_map.num_seeders_leechers(); + + let response = AnnounceResponse { + fixed: AnnounceResponseFixedData { + transaction_id: request.transaction_id, + announce_interval: AnnounceInterval::new( + config.protocol.peer_announce_interval, + ), + leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), + seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), + }, + peers: peer_map.extract_response_peers(rng, max_num_peers_to_take), + }; + + // Try shrinking the map if announcing peer is stopped and + // will therefore not be inserted + if status == PeerStatus::Stopped { + if let Some(peer_map) = peer_map.try_shrink() { + *self = Self::Small(peer_map); + } + } + + (response, opt_removed_peer) + } + }; + + match status { + PeerStatus::Leeching | PeerStatus::Seeding => { + let peer = Peer { + peer_id: request.peer_id, + is_seeder: status == PeerStatus::Seeding, + valid_until, + }; + + match self { + Self::Small(peer_map) => peer_map.insert(peer_map_key, peer), + Self::Large(peer_map) => peer_map.insert(peer_map_key, peer), + } + + if config.statistics.peer_clients && opt_removed_peer.is_none() { + statistics_sender + .try_send(StatisticsMessage::PeerAdded(request.peer_id)) + .expect("statistics channel should be unbounded"); + } + } + PeerStatus::Stopped => { + if config.statistics.peer_clients && opt_removed_peer.is_some() { + statistics_sender + .try_send(StatisticsMessage::PeerRemoved(request.peer_id)) + .expect("statistics channel should be unbounded"); + } + } + }; + + response + } + + fn scrape_statistics(&self) -> TorrentScrapeStatistics { + let (seeders, leechers) = match self { + Self::Small(peer_map) => peer_map.num_seeders_leechers(), + Self::Large(peer_map) => peer_map.num_seeders_leechers(), + }; + + TorrentScrapeStatistics { + seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), + leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), + completed: NumberOfDownloads::new(0), + } + } + + fn is_empty(&self) -> bool { + match self { + Self::Small(peer_map) => peer_map.0.is_empty(), + Self::Large(peer_map) => peer_map.peers.is_empty(), + } + } +} + +impl Default for PeerMap { + fn default() -> Self { + Self::Small(SmallPeerMap(ArrayVec::default())) + } +} + +/// Store torrents with up to two peers without an extra heap allocation +/// +/// On public open trackers, this is likely to be the majority of torrents. +#[derive(Default, Debug)] +pub struct SmallPeerMap(ArrayVec<(ResponsePeer, Peer), SMALL_PEER_MAP_CAPACITY>); + +impl SmallPeerMap { + fn is_full(&self) -> bool { + self.0.is_full() + } + + fn num_seeders_leechers(&self) -> (usize, usize) { + let seeders = self.0.iter().filter(|(_, p)| p.is_seeder).count(); + let leechers = self.0.len() - seeders; + + (seeders, leechers) + } + + fn insert(&mut self, key: ResponsePeer, peer: Peer) { + self.0.push((key, peer)); + } + + fn remove(&mut self, key: &ResponsePeer) -> Option { + for (i, (k, _)) in self.0.iter().enumerate() { + if k == key { + return Some(self.0.remove(i).1); + } + } + + None + } + + fn extract_response_peers(&self, max_num_peers_to_take: usize) -> Vec> { + Vec::from_iter(self.0.iter().take(max_num_peers_to_take).map(|(k, _)| *k)) + } + + fn clean_and_get_num_peers( + &mut self, + config: &Config, + statistics_sender: &Sender, + now: SecondsSinceServerStart, + ) -> usize { + self.0.retain(|(_, peer)| { + let keep = peer.valid_until.valid(now); + + if !keep + && config.statistics.peer_clients + && statistics_sender + .try_send(StatisticsMessage::PeerRemoved(peer.peer_id)) + .is_err() + { + // Should never happen in practice + ::log::error!("Couldn't send StatisticsMessage::PeerRemoved"); + } + + keep + }); + + self.0.len() + } + + fn to_large(&self) -> LargePeerMap { + let (num_seeders, _) = self.num_seeders_leechers(); + let peers = self.0.iter().copied().collect(); + + LargePeerMap { peers, num_seeders } + } +} + +#[derive(Default)] +pub struct LargePeerMap { + peers: IndexMap, Peer>, + num_seeders: usize, +} + +impl LargePeerMap { + fn num_seeders_leechers(&self) -> (usize, usize) { + (self.num_seeders, self.peers.len() - self.num_seeders) + } + + fn insert(&mut self, key: ResponsePeer, peer: Peer) { + if peer.is_seeder { + self.num_seeders += 1; + } + + self.peers.insert(key, peer); + } + + fn remove_peer(&mut self, key: &ResponsePeer) -> Option { + let opt_removed_peer = self.peers.swap_remove(key); + + if let Some(Peer { + is_seeder: true, .. + }) = opt_removed_peer + { + self.num_seeders -= 1; + } + + opt_removed_peer + } + + /// Extract response peers + /// + /// If there are more peers in map than `max_num_peers_to_take`, do a random + /// selection of peers from first and second halves of map in order to avoid + /// returning too homogeneous peers. + /// + /// Does NOT filter out announcing peer. + fn extract_response_peers( + &self, + rng: &mut impl Rng, + max_num_peers_to_take: usize, + ) -> Vec> { + if self.peers.len() <= max_num_peers_to_take { + self.peers.keys().copied().collect() + } else { + let middle_index = self.peers.len() / 2; + let num_to_take_per_half = max_num_peers_to_take / 2; + + let offset_half_one = { + let from = 0; + let to = usize::max(1, middle_index - num_to_take_per_half); + + rng.gen_range(from..to) + }; + let offset_half_two = { + let from = middle_index; + let to = usize::max(middle_index + 1, self.peers.len() - num_to_take_per_half); + + rng.gen_range(from..to) + }; + + let end_half_one = offset_half_one + num_to_take_per_half; + let end_half_two = offset_half_two + num_to_take_per_half; + + let mut peers = Vec::with_capacity(max_num_peers_to_take); + + if let Some(slice) = self.peers.get_range(offset_half_one..end_half_one) { + peers.extend(slice.keys()); + } + if let Some(slice) = self.peers.get_range(offset_half_two..end_half_two) { + peers.extend(slice.keys()); + } + + peers + } + } + + fn clean_and_get_num_peers( + &mut self, + config: &Config, + statistics_sender: &Sender, + now: SecondsSinceServerStart, + ) -> usize { + self.peers.retain(|_, peer| { + let keep = peer.valid_until.valid(now); + + if !keep { + if peer.is_seeder { + self.num_seeders -= 1; + } + if config.statistics.peer_clients + && statistics_sender + .try_send(StatisticsMessage::PeerRemoved(peer.peer_id)) + .is_err() + { + // Should never happen in practice + ::log::error!("Couldn't send StatisticsMessage::PeerRemoved"); + } + } + + keep + }); + + if !self.peers.is_empty() { + self.peers.shrink_to_fit(); + } + + self.peers.len() + } + + fn try_shrink(&mut self) -> Option> { + (self.peers.len() <= SMALL_PEER_MAP_CAPACITY).then(|| { + SmallPeerMap(ArrayVec::from_iter( + self.peers.iter().map(|(k, v)| (*k, *v)), + )) + }) + } +} + +#[derive(Clone, Copy, Debug)] +struct Peer { + peer_id: PeerId, + is_seeder: bool, + valid_until: ValidUntil, +} + +#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] +pub enum PeerStatus { + Seeding, + Leeching, + Stopped, +} + +impl PeerStatus { + /// Determine peer status from announce event and number of bytes left. + /// + /// Likely, the last branch will be taken most of the time. + #[inline] + pub fn from_event_and_bytes_left(event: AnnounceEvent, bytes_left: NumberOfBytes) -> Self { + if event == AnnounceEvent::Stopped { + Self::Stopped + } else if bytes_left.0.get() == 0 { + Self::Seeding + } else { + Self::Leeching + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_peer_status_from_event_and_bytes_left() { + use PeerStatus::*; + + let f = PeerStatus::from_event_and_bytes_left; + + assert_eq!(Stopped, f(AnnounceEvent::Stopped, NumberOfBytes::new(0))); + assert_eq!(Stopped, f(AnnounceEvent::Stopped, NumberOfBytes::new(1))); + + assert_eq!(Seeding, f(AnnounceEvent::Started, NumberOfBytes::new(0))); + assert_eq!(Leeching, f(AnnounceEvent::Started, NumberOfBytes::new(1))); + + assert_eq!(Seeding, f(AnnounceEvent::Completed, NumberOfBytes::new(0))); + assert_eq!(Leeching, f(AnnounceEvent::Completed, NumberOfBytes::new(1))); + + assert_eq!(Seeding, f(AnnounceEvent::None, NumberOfBytes::new(0))); + assert_eq!(Leeching, f(AnnounceEvent::None, NumberOfBytes::new(1))); + } +}