From 7fa143964efed7add60c157bfa31c3ee221ccfac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 10 Feb 2024 11:40:11 +0100 Subject: [PATCH] udp: remove swarm worker and related logic --- crates/udp/src/common.rs | 141 +----- crates/udp/src/lib.rs | 10 +- crates/udp/src/workers/mod.rs | 1 - crates/udp/src/workers/socket/mod.rs | 4 +- crates/udp/src/workers/socket/storage.rs | 218 --------- crates/udp/src/workers/swarm/mod.rs | 149 ------ crates/udp/src/workers/swarm/storage.rs | 563 ----------------------- 7 files changed, 4 insertions(+), 1082 deletions(-) delete mode 100644 crates/udp/src/workers/socket/storage.rs delete mode 100644 crates/udp/src/workers/swarm/mod.rs delete mode 100644 crates/udp/src/workers/swarm/storage.rs diff --git a/crates/udp/src/common.rs b/crates/udp/src/common.rs index d26d519..7e3f7a3 100644 --- a/crates/udp/src/common.rs +++ b/crates/udp/src/common.rs @@ -1,13 +1,9 @@ -use std::collections::BTreeMap; -use std::hash::Hash; use std::iter::repeat_with; use std::sync::atomic::AtomicUsize; use std::sync::Arc; -use crossbeam_channel::{Receiver, SendError, Sender, TrySendError}; - use aquatic_common::access_list::AccessListArcSwap; -use aquatic_common::{CanonicalSocketAddr, ServerStartInstant}; +use aquatic_common::ServerStartInstant; use aquatic_udp_protocol::*; use crossbeam_utils::CachePadded; use hdrhistogram::Histogram; @@ -33,141 +29,6 @@ impl IpVersion { } } -#[derive(Clone, Copy, Debug)] -pub struct SocketWorkerIndex(pub usize); - -#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] -pub struct SwarmWorkerIndex(pub usize); - -impl SwarmWorkerIndex { - pub fn from_info_hash(config: &Config, info_hash: InfoHash) -> Self { - Self(info_hash.0[0] as usize % config.swarm_workers) - } -} - -#[derive(Debug)] -pub struct PendingScrapeRequest { - pub slab_key: usize, - pub info_hashes: BTreeMap, -} - -#[derive(Debug)] -pub struct PendingScrapeResponse { - pub slab_key: usize, - pub torrent_stats: BTreeMap, -} - -#[derive(Debug)] -pub enum ConnectedRequest { - Announce(AnnounceRequest), - Scrape(PendingScrapeRequest), -} - -#[derive(Debug)] -pub enum ConnectedResponse { - AnnounceIpv4(AnnounceResponse), - AnnounceIpv6(AnnounceResponse), - Scrape(PendingScrapeResponse), -} - -pub struct ConnectedRequestSender { - index: SocketWorkerIndex, - senders: Vec>, -} - -impl ConnectedRequestSender { - pub fn new( - index: SocketWorkerIndex, - senders: Vec>, - ) -> Self { - Self { index, senders } - } - - pub fn try_send_to( - &self, - index: SwarmWorkerIndex, - request: ConnectedRequest, - addr: CanonicalSocketAddr, - ) -> Result<(), (SwarmWorkerIndex, ConnectedRequest, CanonicalSocketAddr)> { - match self.senders[index.0].try_send((self.index, request, addr)) { - Ok(()) => Ok(()), - Err(TrySendError::Full(r)) => Err((index, r.1, r.2)), - Err(TrySendError::Disconnected(_)) => { - panic!("Request channel {} is disconnected", index.0); - } - } - } -} - -pub struct ConnectedResponseSender { - senders: Vec>, - to_any_last_index_picked: usize, -} - -impl ConnectedResponseSender { - pub fn new(senders: Vec>) -> Self { - Self { - senders, - to_any_last_index_picked: 0, - } - } - - pub fn try_send_to( - &self, - index: SocketWorkerIndex, - addr: CanonicalSocketAddr, - response: ConnectedResponse, - ) -> Result<(), TrySendError<(CanonicalSocketAddr, ConnectedResponse)>> { - self.senders[index.0].try_send((addr, response)) - } - - pub fn send_to( - &self, - index: SocketWorkerIndex, - addr: CanonicalSocketAddr, - response: ConnectedResponse, - ) -> Result<(), SendError<(CanonicalSocketAddr, ConnectedResponse)>> { - self.senders[index.0].send((addr, response)) - } - - pub fn send_to_any( - &mut self, - addr: CanonicalSocketAddr, - response: ConnectedResponse, - ) -> Result<(), SendError<(CanonicalSocketAddr, ConnectedResponse)>> { - let start = self.to_any_last_index_picked + 1; - - let mut message = Some((addr, response)); - - for i in (start..start + self.senders.len()).map(|i| i % self.senders.len()) { - match self.senders[i].try_send(message.take().unwrap()) { - Ok(()) => { - self.to_any_last_index_picked = i; - - return Ok(()); - } - Err(TrySendError::Full(msg)) => { - message = Some(msg); - } - Err(TrySendError::Disconnected(_)) => { - panic!("ConnectedResponseReceiver disconnected"); - } - } - } - - let (addr, response) = message.unwrap(); - - self.to_any_last_index_picked = start % self.senders.len(); - self.send_to( - SocketWorkerIndex(self.to_any_last_index_picked), - addr, - response, - ) - } -} - -pub type ConnectedResponseReceiver = Receiver<(CanonicalSocketAddr, ConnectedResponse)>; - #[derive(Clone)] pub struct Statistics { pub socket: Vec>>, diff --git a/crates/udp/src/lib.rs b/crates/udp/src/lib.rs index f2a7ec8..6e27fc4 100644 --- a/crates/udp/src/lib.rs +++ b/crates/udp/src/lib.rs @@ -3,13 +3,12 @@ pub mod config; pub mod swarm; pub mod workers; -use std::collections::BTreeMap; use std::thread::{sleep, Builder, JoinHandle}; use std::time::Duration; use anyhow::Context; use aquatic_common::WorkerType; -use crossbeam_channel::{bounded, unbounded}; +use crossbeam_channel::unbounded; use signal_hook::consts::SIGUSR1; use signal_hook::iterator::Signals; @@ -18,14 +17,9 @@ use aquatic_common::access_list::update_access_list; use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::privileges::PrivilegeDropper; -use common::{ - ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State, Statistics, - SwarmWorkerIndex, -}; +use common::{State, Statistics}; use config::Config; -use swarm::TorrentMaps; use workers::socket::ConnectionValidator; -use workers::swarm::SwarmWorker; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/crates/udp/src/workers/mod.rs b/crates/udp/src/workers/mod.rs index 5446a1f..02af829 100644 --- a/crates/udp/src/workers/mod.rs +++ b/crates/udp/src/workers/mod.rs @@ -1,3 +1,2 @@ pub mod socket; pub mod statistics; -pub mod swarm; diff --git a/crates/udp/src/workers/socket/mod.rs b/crates/udp/src/workers/socket/mod.rs index 2d71fba..ef1adea 100644 --- a/crates/udp/src/workers/socket/mod.rs +++ b/crates/udp/src/workers/socket/mod.rs @@ -1,5 +1,4 @@ mod mio; -mod storage; #[cfg(all(target_os = "linux", feature = "io-uring"))] mod uring; mod validator; @@ -11,8 +10,7 @@ use socket2::{Domain, Protocol, Socket, Type}; use crate::{ common::{ - CachePaddedArc, ConnectedRequestSender, ConnectedResponseReceiver, IpVersionStatistics, - SocketWorkerStatistics, State, StatisticsMessage, + CachePaddedArc, IpVersionStatistics, SocketWorkerStatistics, State, StatisticsMessage, }, config::Config, }; diff --git a/crates/udp/src/workers/socket/storage.rs b/crates/udp/src/workers/socket/storage.rs deleted file mode 100644 index 84c11a7..0000000 --- a/crates/udp/src/workers/socket/storage.rs +++ /dev/null @@ -1,218 +0,0 @@ -use std::collections::BTreeMap; - -use hashbrown::HashMap; -use slab::Slab; - -use aquatic_common::{SecondsSinceServerStart, ValidUntil}; -use aquatic_udp_protocol::*; - -use crate::common::*; -use crate::config::Config; - -#[derive(Debug)] -pub struct PendingScrapeResponseSlabEntry { - num_pending: usize, - valid_until: ValidUntil, - torrent_stats: BTreeMap, - transaction_id: TransactionId, -} - -#[derive(Default)] -pub struct PendingScrapeResponseSlab(Slab); - -impl PendingScrapeResponseSlab { - pub fn prepare_split_requests( - &mut self, - config: &Config, - request: ScrapeRequest, - valid_until: ValidUntil, - ) -> impl IntoIterator { - let capacity = config.swarm_workers.min(request.info_hashes.len()); - let mut split_requests: HashMap = - HashMap::with_capacity(capacity); - - if request.info_hashes.is_empty() { - ::log::warn!( - "Attempted to prepare PendingScrapeResponseSlab entry with zero info hashes" - ); - - return split_requests; - } - - let vacant_entry = self.0.vacant_entry(); - let slab_key = vacant_entry.key(); - - for (i, info_hash) in request.info_hashes.into_iter().enumerate() { - let split_request = split_requests - .entry(SwarmWorkerIndex::from_info_hash(config, info_hash)) - .or_insert_with(|| PendingScrapeRequest { - slab_key, - info_hashes: BTreeMap::new(), - }); - - split_request.info_hashes.insert(i, info_hash); - } - - vacant_entry.insert(PendingScrapeResponseSlabEntry { - num_pending: split_requests.len(), - valid_until, - torrent_stats: Default::default(), - transaction_id: request.transaction_id, - }); - - split_requests - } - - pub fn add_and_get_finished( - &mut self, - response: &PendingScrapeResponse, - ) -> Option { - let finished = if let Some(entry) = self.0.get_mut(response.slab_key) { - entry.num_pending -= 1; - - entry.torrent_stats.extend(response.torrent_stats.iter()); - - entry.num_pending == 0 - } else { - ::log::warn!( - "PendingScrapeResponseSlab.add didn't find entry for key {:?}", - response.slab_key - ); - - false - }; - - if finished { - let entry = self.0.remove(response.slab_key); - - Some(ScrapeResponse { - transaction_id: entry.transaction_id, - torrent_stats: entry.torrent_stats.into_values().collect(), - }) - } else { - None - } - } - - pub fn clean(&mut self, now: SecondsSinceServerStart) { - self.0.retain(|k, v| { - if v.valid_until.valid(now) { - true - } else { - ::log::warn!( - "Unconsumed PendingScrapeResponseSlab entry. {:?}: {:?}", - k, - v - ); - - false - } - }); - - self.0.shrink_to_fit(); - } -} - -#[cfg(test)] -mod tests { - use aquatic_common::ServerStartInstant; - use quickcheck::TestResult; - use quickcheck_macros::quickcheck; - - use super::*; - - #[quickcheck] - fn test_pending_scrape_response_slab( - request_data: Vec<(i32, i64, u8)>, - swarm_workers: u8, - ) -> TestResult { - if swarm_workers == 0 { - return TestResult::discard(); - } - - let config = Config { - swarm_workers: swarm_workers as usize, - ..Default::default() - }; - - let valid_until = ValidUntil::new(ServerStartInstant::new(), 1); - - let mut map = PendingScrapeResponseSlab::default(); - - let mut requests = Vec::new(); - - for (t, c, b) in request_data { - if b == 0 { - return TestResult::discard(); - } - - let mut info_hashes = Vec::new(); - - for i in 0..b { - let info_hash = InfoHash([i; 20]); - - info_hashes.push(info_hash); - } - - let request = ScrapeRequest { - transaction_id: TransactionId::new(t), - connection_id: ConnectionId::new(c), - info_hashes, - }; - - requests.push(request); - } - - let mut all_split_requests = Vec::new(); - - for request in requests.iter() { - let split_requests = - map.prepare_split_requests(&config, request.to_owned(), valid_until); - - all_split_requests.push( - split_requests - .into_iter() - .collect::>(), - ); - } - - assert_eq!(map.0.len(), requests.len()); - - let mut responses = Vec::new(); - - for split_requests in all_split_requests { - for (worker_index, split_request) in split_requests { - assert!(worker_index.0 < swarm_workers as usize); - - let torrent_stats = split_request - .info_hashes - .into_iter() - .map(|(i, info_hash)| { - ( - i, - TorrentScrapeStatistics { - seeders: NumberOfPeers::new((info_hash.0[0]) as i32), - leechers: NumberOfPeers::new(0), - completed: NumberOfDownloads::new(0), - }, - ) - }) - .collect(); - - let response = PendingScrapeResponse { - slab_key: split_request.slab_key, - torrent_stats, - }; - - if let Some(response) = map.add_and_get_finished(&response) { - responses.push(response); - } - } - } - - assert!(map.0.is_empty()); - assert_eq!(responses.len(), requests.len()); - - TestResult::from_bool(true) - } -} diff --git a/crates/udp/src/workers/swarm/mod.rs b/crates/udp/src/workers/swarm/mod.rs deleted file mode 100644 index ccdab8a..0000000 --- a/crates/udp/src/workers/swarm/mod.rs +++ /dev/null @@ -1,149 +0,0 @@ -mod storage; - -use std::net::IpAddr; -use std::sync::atomic::Ordering; -use std::time::Duration; -use std::time::Instant; - -use crossbeam_channel::Receiver; -use crossbeam_channel::Sender; -use rand::{rngs::SmallRng, SeedableRng}; - -use aquatic_common::{CanonicalSocketAddr, ValidUntil}; - -use crate::common::*; -use crate::config::Config; - -use storage::TorrentMaps; - -pub struct SwarmWorker { - pub config: Config, - pub state: State, - pub statistics: CachePaddedArc>, - pub request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, - pub response_sender: ConnectedResponseSender, - pub statistics_sender: Sender, - pub worker_index: SwarmWorkerIndex, -} - -impl SwarmWorker { - pub fn run(&mut self) -> anyhow::Result<()> { - let mut torrents = TorrentMaps::default(); - let mut rng = SmallRng::from_entropy(); - - let timeout = Duration::from_millis(self.config.request_channel_recv_timeout_ms); - let mut peer_valid_until = ValidUntil::new( - self.state.server_start_instant, - self.config.cleaning.max_peer_age, - ); - - let cleaning_interval = Duration::from_secs(self.config.cleaning.torrent_cleaning_interval); - let statistics_update_interval = Duration::from_secs(self.config.statistics.interval); - - let mut last_cleaning = Instant::now(); - let mut last_statistics_update = Instant::now(); - - let mut iter_counter = 0usize; - - loop { - if let Ok((sender_index, request, src)) = self.request_receiver.recv_timeout(timeout) { - // It is OK to block here as long as we don't also do blocking - // sends in socket workers (doing both could cause a deadlock) - match (request, src.get().ip()) { - (ConnectedRequest::Announce(request), IpAddr::V4(ip)) => { - let response = torrents - .ipv4 - .0 - .entry(request.info_hash) - .or_default() - .announce( - &self.config, - &self.statistics_sender, - &mut rng, - &request, - ip.into(), - peer_valid_until, - ); - - // It doesn't matter which socket worker receives announce responses - self.response_sender - .send_to_any(src, ConnectedResponse::AnnounceIpv4(response)) - .expect("swarm response channel is closed"); - } - (ConnectedRequest::Announce(request), IpAddr::V6(ip)) => { - let response = torrents - .ipv6 - .0 - .entry(request.info_hash) - .or_default() - .announce( - &self.config, - &self.statistics_sender, - &mut rng, - &request, - ip.into(), - peer_valid_until, - ); - - // It doesn't matter which socket worker receives announce responses - self.response_sender - .send_to_any(src, ConnectedResponse::AnnounceIpv6(response)) - .expect("swarm response channel is closed"); - } - (ConnectedRequest::Scrape(request), IpAddr::V4(_)) => { - let response = torrents.ipv4.scrape(request); - - self.response_sender - .send_to(sender_index, src, ConnectedResponse::Scrape(response)) - .expect("swarm response channel is closed"); - } - (ConnectedRequest::Scrape(request), IpAddr::V6(_)) => { - let response = torrents.ipv6.scrape(request); - - self.response_sender - .send_to(sender_index, src, ConnectedResponse::Scrape(response)) - .expect("swarm response channel is closed"); - } - }; - } - - // Run periodic tasks - if iter_counter % 128 == 0 { - let now = Instant::now(); - - peer_valid_until = ValidUntil::new( - self.state.server_start_instant, - self.config.cleaning.max_peer_age, - ); - - if now > last_cleaning + cleaning_interval { - torrents.clean_and_update_statistics( - &self.config, - &self.state, - &self.statistics, - &self.statistics_sender, - &self.state.access_list, - ); - - last_cleaning = now; - } - if self.config.statistics.active() - && now > last_statistics_update + statistics_update_interval - { - self.statistics - .ipv4 - .torrents - .store(torrents.ipv4.num_torrents(), Ordering::Relaxed); - self.statistics - .ipv6 - .torrents - .store(torrents.ipv6.num_torrents(), Ordering::Relaxed); - - last_statistics_update = now; - } - } - - iter_counter = iter_counter.wrapping_add(1); - } - } -} diff --git a/crates/udp/src/workers/swarm/storage.rs b/crates/udp/src/workers/swarm/storage.rs deleted file mode 100644 index 3b042ea..0000000 --- a/crates/udp/src/workers/swarm/storage.rs +++ /dev/null @@ -1,563 +0,0 @@ -use std::sync::atomic::Ordering; -use std::sync::Arc; - -use aquatic_common::IndexMap; -use aquatic_common::SecondsSinceServerStart; -use aquatic_common::{ - access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache, AccessListMode}, - ValidUntil, -}; - -use aquatic_udp_protocol::*; -use arrayvec::ArrayVec; -use crossbeam_channel::Sender; -use hdrhistogram::Histogram; -use rand::prelude::SmallRng; -use rand::Rng; - -use crate::common::*; -use crate::config::Config; - -const SMALL_PEER_MAP_CAPACITY: usize = 2; - -pub struct TorrentMaps { - pub ipv4: TorrentMap, - pub ipv6: TorrentMap, -} - -impl Default for TorrentMaps { - fn default() -> Self { - Self { - ipv4: TorrentMap(Default::default()), - ipv6: TorrentMap(Default::default()), - } - } -} - -impl TorrentMaps { - /// Remove forbidden or inactive torrents, reclaim space and update statistics - pub fn clean_and_update_statistics( - &mut self, - config: &Config, - state: &State, - statistics: &CachePaddedArc>, - statistics_sender: &Sender, - access_list: &Arc, - ) { - let mut cache = create_access_list_cache(access_list); - let mode = config.access_list.mode; - let now = state.server_start_instant.seconds_elapsed(); - - let ipv4 = - self.ipv4 - .clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now); - let ipv6 = - self.ipv6 - .clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now); - - if config.statistics.active() { - statistics.ipv4.peers.store(ipv4.0, Ordering::Relaxed); - statistics.ipv6.peers.store(ipv6.0, Ordering::Relaxed); - - if let Some(message) = ipv4.1.map(StatisticsMessage::Ipv4PeerHistogram) { - if let Err(err) = statistics_sender.try_send(message) { - ::log::error!("couldn't send statistics message: {:#}", err); - } - } - if let Some(message) = ipv6.1.map(StatisticsMessage::Ipv6PeerHistogram) { - if let Err(err) = statistics_sender.try_send(message) { - ::log::error!("couldn't send statistics message: {:#}", err); - } - } - } - } -} - -#[derive(Default)] -pub struct TorrentMap(pub IndexMap>); - -impl TorrentMap { - pub fn scrape(&mut self, request: PendingScrapeRequest) -> PendingScrapeResponse { - let torrent_stats = request - .info_hashes - .into_iter() - .map(|(i, info_hash)| { - let stats = self - .0 - .get(&info_hash) - .map(|torrent_data| torrent_data.scrape_statistics()) - .unwrap_or_else(|| TorrentScrapeStatistics { - seeders: NumberOfPeers::new(0), - leechers: NumberOfPeers::new(0), - completed: NumberOfDownloads::new(0), - }); - - (i, stats) - }) - .collect(); - - PendingScrapeResponse { - slab_key: request.slab_key, - torrent_stats, - } - } - /// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers - fn clean_and_get_statistics( - &mut self, - config: &Config, - statistics_sender: &Sender, - access_list_cache: &mut AccessListCache, - access_list_mode: AccessListMode, - now: SecondsSinceServerStart, - ) -> (usize, Option>) { - let mut total_num_peers = 0; - - let mut opt_histogram: Option> = if config.statistics.torrent_peer_histograms - { - match Histogram::new(3) { - Ok(histogram) => Some(histogram), - Err(err) => { - ::log::error!("Couldn't create peer histogram: {:#}", err); - - None - } - } - } else { - None - }; - - self.0.retain(|info_hash, torrent| { - if !access_list_cache - .load() - .allows(access_list_mode, &info_hash.0) - { - return false; - } - - let num_peers = match torrent { - TorrentData::Small(peer_map) => { - peer_map.clean_and_get_num_peers(config, statistics_sender, now) - } - TorrentData::Large(peer_map) => { - let num_peers = - peer_map.clean_and_get_num_peers(config, statistics_sender, now); - - if let Some(peer_map) = peer_map.try_shrink() { - *torrent = TorrentData::Small(peer_map); - } - - num_peers - } - }; - - total_num_peers += num_peers; - - match opt_histogram { - Some(ref mut histogram) if num_peers > 0 => { - let n = num_peers.try_into().expect("Couldn't fit usize into u64"); - - if let Err(err) = histogram.record(n) { - ::log::error!("Couldn't record {} to histogram: {:#}", n, err); - } - } - _ => (), - } - - num_peers > 0 - }); - - self.0.shrink_to_fit(); - - (total_num_peers, opt_histogram) - } - - pub fn num_torrents(&self) -> usize { - self.0.len() - } -} - -pub enum TorrentData { - Small(SmallPeerMap), - Large(LargePeerMap), -} - -impl TorrentData { - pub fn announce( - &mut self, - config: &Config, - statistics_sender: &Sender, - rng: &mut SmallRng, - request: &AnnounceRequest, - ip_address: I, - valid_until: ValidUntil, - ) -> AnnounceResponse { - let max_num_peers_to_take: usize = if request.peers_wanted.0.get() <= 0 { - config.protocol.max_response_peers - } else { - ::std::cmp::min( - config.protocol.max_response_peers, - request.peers_wanted.0.get().try_into().unwrap(), - ) - }; - - let status = - PeerStatus::from_event_and_bytes_left(request.event.into(), request.bytes_left); - - let peer_map_key = ResponsePeer { - ip_address, - port: request.port, - }; - - // Create the response before inserting the peer. This means that we - // don't have to filter it out from the response peers, and that the - // reported number of seeders/leechers will not include it - let (response, opt_removed_peer) = match self { - Self::Small(peer_map) => { - let opt_removed_peer = peer_map.remove(&peer_map_key); - - let (seeders, leechers) = peer_map.num_seeders_leechers(); - - let response = AnnounceResponse { - fixed: AnnounceResponseFixedData { - transaction_id: request.transaction_id, - announce_interval: AnnounceInterval::new( - config.protocol.peer_announce_interval, - ), - leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), - seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), - }, - peers: peer_map.extract_response_peers(max_num_peers_to_take), - }; - - // Convert peer map to large variant if it is full and - // announcing peer is not stopped and will therefore be - // inserted - if peer_map.is_full() && status != PeerStatus::Stopped { - *self = Self::Large(peer_map.to_large()); - } - - (response, opt_removed_peer) - } - Self::Large(peer_map) => { - let opt_removed_peer = peer_map.remove_peer(&peer_map_key); - - let (seeders, leechers) = peer_map.num_seeders_leechers(); - - let response = AnnounceResponse { - fixed: AnnounceResponseFixedData { - transaction_id: request.transaction_id, - announce_interval: AnnounceInterval::new( - config.protocol.peer_announce_interval, - ), - leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), - seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), - }, - peers: peer_map.extract_response_peers(rng, max_num_peers_to_take), - }; - - // Try shrinking the map if announcing peer is stopped and - // will therefore not be inserted - if status == PeerStatus::Stopped { - if let Some(peer_map) = peer_map.try_shrink() { - *self = Self::Small(peer_map); - } - } - - (response, opt_removed_peer) - } - }; - - match status { - PeerStatus::Leeching | PeerStatus::Seeding => { - let peer = Peer { - peer_id: request.peer_id, - is_seeder: status == PeerStatus::Seeding, - valid_until, - }; - - match self { - Self::Small(peer_map) => peer_map.insert(peer_map_key, peer), - Self::Large(peer_map) => peer_map.insert(peer_map_key, peer), - } - - if config.statistics.peer_clients && opt_removed_peer.is_none() { - statistics_sender - .try_send(StatisticsMessage::PeerAdded(request.peer_id)) - .expect("statistics channel should be unbounded"); - } - } - PeerStatus::Stopped => { - if config.statistics.peer_clients && opt_removed_peer.is_some() { - statistics_sender - .try_send(StatisticsMessage::PeerRemoved(request.peer_id)) - .expect("statistics channel should be unbounded"); - } - } - }; - - response - } - - pub fn scrape_statistics(&self) -> TorrentScrapeStatistics { - let (seeders, leechers) = match self { - Self::Small(peer_map) => peer_map.num_seeders_leechers(), - Self::Large(peer_map) => peer_map.num_seeders_leechers(), - }; - - TorrentScrapeStatistics { - seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), - leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), - completed: NumberOfDownloads::new(0), - } - } -} - -impl Default for TorrentData { - fn default() -> Self { - Self::Small(SmallPeerMap(ArrayVec::default())) - } -} - -/// Store torrents with up to two peers without an extra heap allocation -/// -/// On public open trackers, this is likely to be the majority of torrents. -#[derive(Default, Debug)] -pub struct SmallPeerMap(ArrayVec<(ResponsePeer, Peer), SMALL_PEER_MAP_CAPACITY>); - -impl SmallPeerMap { - fn is_full(&self) -> bool { - self.0.is_full() - } - - fn num_seeders_leechers(&self) -> (usize, usize) { - let seeders = self.0.iter().filter(|(_, p)| p.is_seeder).count(); - let leechers = self.0.len() - seeders; - - (seeders, leechers) - } - - fn insert(&mut self, key: ResponsePeer, peer: Peer) { - self.0.push((key, peer)); - } - - fn remove(&mut self, key: &ResponsePeer) -> Option { - for (i, (k, _)) in self.0.iter().enumerate() { - if k == key { - return Some(self.0.remove(i).1); - } - } - - None - } - - fn extract_response_peers(&self, max_num_peers_to_take: usize) -> Vec> { - Vec::from_iter(self.0.iter().take(max_num_peers_to_take).map(|(k, _)| *k)) - } - - fn clean_and_get_num_peers( - &mut self, - config: &Config, - statistics_sender: &Sender, - now: SecondsSinceServerStart, - ) -> usize { - self.0.retain(|(_, peer)| { - let keep = peer.valid_until.valid(now); - - if !keep - && config.statistics.peer_clients - && statistics_sender - .try_send(StatisticsMessage::PeerRemoved(peer.peer_id)) - .is_err() - { - // Should never happen in practice - ::log::error!("Couldn't send StatisticsMessage::PeerRemoved"); - } - - keep - }); - - self.0.len() - } - - fn to_large(&self) -> LargePeerMap { - let (num_seeders, _) = self.num_seeders_leechers(); - let peers = self.0.iter().copied().collect(); - - LargePeerMap { peers, num_seeders } - } -} - -#[derive(Default)] -pub struct LargePeerMap { - peers: IndexMap, Peer>, - num_seeders: usize, -} - -impl LargePeerMap { - fn num_seeders_leechers(&self) -> (usize, usize) { - (self.num_seeders, self.peers.len() - self.num_seeders) - } - - fn insert(&mut self, key: ResponsePeer, peer: Peer) { - if peer.is_seeder { - self.num_seeders += 1; - } - - self.peers.insert(key, peer); - } - - fn remove_peer(&mut self, key: &ResponsePeer) -> Option { - let opt_removed_peer = self.peers.swap_remove(key); - - if let Some(Peer { - is_seeder: true, .. - }) = opt_removed_peer - { - self.num_seeders -= 1; - } - - opt_removed_peer - } - - /// Extract response peers - /// - /// If there are more peers in map than `max_num_peers_to_take`, do a random - /// selection of peers from first and second halves of map in order to avoid - /// returning too homogeneous peers. - /// - /// Does NOT filter out announcing peer. - pub fn extract_response_peers( - &self, - rng: &mut impl Rng, - max_num_peers_to_take: usize, - ) -> Vec> { - if self.peers.len() <= max_num_peers_to_take { - self.peers.keys().copied().collect() - } else { - let middle_index = self.peers.len() / 2; - let num_to_take_per_half = max_num_peers_to_take / 2; - - let offset_half_one = { - let from = 0; - let to = usize::max(1, middle_index - num_to_take_per_half); - - rng.gen_range(from..to) - }; - let offset_half_two = { - let from = middle_index; - let to = usize::max(middle_index + 1, self.peers.len() - num_to_take_per_half); - - rng.gen_range(from..to) - }; - - let end_half_one = offset_half_one + num_to_take_per_half; - let end_half_two = offset_half_two + num_to_take_per_half; - - let mut peers = Vec::with_capacity(max_num_peers_to_take); - - if let Some(slice) = self.peers.get_range(offset_half_one..end_half_one) { - peers.extend(slice.keys()); - } - if let Some(slice) = self.peers.get_range(offset_half_two..end_half_two) { - peers.extend(slice.keys()); - } - - peers - } - } - - fn clean_and_get_num_peers( - &mut self, - config: &Config, - statistics_sender: &Sender, - now: SecondsSinceServerStart, - ) -> usize { - self.peers.retain(|_, peer| { - let keep = peer.valid_until.valid(now); - - if !keep { - if peer.is_seeder { - self.num_seeders -= 1; - } - if config.statistics.peer_clients - && statistics_sender - .try_send(StatisticsMessage::PeerRemoved(peer.peer_id)) - .is_err() - { - // Should never happen in practice - ::log::error!("Couldn't send StatisticsMessage::PeerRemoved"); - } - } - - keep - }); - - if !self.peers.is_empty() { - self.peers.shrink_to_fit(); - } - - self.peers.len() - } - - fn try_shrink(&mut self) -> Option> { - (self.peers.len() <= SMALL_PEER_MAP_CAPACITY).then(|| { - SmallPeerMap(ArrayVec::from_iter( - self.peers.iter().map(|(k, v)| (*k, *v)), - )) - }) - } -} - -#[derive(Clone, Copy, Debug)] -struct Peer { - peer_id: PeerId, - is_seeder: bool, - valid_until: ValidUntil, -} - -#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] -pub enum PeerStatus { - Seeding, - Leeching, - Stopped, -} - -impl PeerStatus { - /// Determine peer status from announce event and number of bytes left. - /// - /// Likely, the last branch will be taken most of the time. - #[inline] - pub fn from_event_and_bytes_left(event: AnnounceEvent, bytes_left: NumberOfBytes) -> Self { - if event == AnnounceEvent::Stopped { - Self::Stopped - } else if bytes_left.0.get() == 0 { - Self::Seeding - } else { - Self::Leeching - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_peer_status_from_event_and_bytes_left() { - use PeerStatus::*; - - let f = PeerStatus::from_event_and_bytes_left; - - assert_eq!(Stopped, f(AnnounceEvent::Stopped, NumberOfBytes::new(0))); - assert_eq!(Stopped, f(AnnounceEvent::Stopped, NumberOfBytes::new(1))); - - assert_eq!(Seeding, f(AnnounceEvent::Started, NumberOfBytes::new(0))); - assert_eq!(Leeching, f(AnnounceEvent::Started, NumberOfBytes::new(1))); - - assert_eq!(Seeding, f(AnnounceEvent::Completed, NumberOfBytes::new(0))); - assert_eq!(Leeching, f(AnnounceEvent::Completed, NumberOfBytes::new(1))); - - assert_eq!(Seeding, f(AnnounceEvent::None, NumberOfBytes::new(0))); - assert_eq!(Leeching, f(AnnounceEvent::None, NumberOfBytes::new(1))); - } -}