From cb39eb69c8e9e9cdcb61438784e12a30eeb63527 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 10 Dec 2023 12:13:56 +0100 Subject: [PATCH] udp: reorder code in swarm storage --- crates/udp/src/workers/swarm/storage.rs | 287 ++++++++++++------------ 1 file changed, 143 insertions(+), 144 deletions(-) diff --git a/crates/udp/src/workers/swarm/storage.rs b/crates/udp/src/workers/swarm/storage.rs index acc83bd..00452ec 100644 --- a/crates/udp/src/workers/swarm/storage.rs +++ b/crates/udp/src/workers/swarm/storage.rs @@ -18,27 +18,146 @@ use rand::Rng; use crate::common::*; use crate::config::Config; -#[derive(Clone, Debug)] -struct Peer { - ip_address: I, - port: Port, - is_seeder: bool, - valid_until: ValidUntil, +pub struct TorrentMaps { + pub ipv4: TorrentMap, + pub ipv6: TorrentMap, } -impl Peer { - fn to_response_peer(_: &PeerId, peer: &Self) -> ResponsePeer { - ResponsePeer { - ip_address: peer.ip_address, - port: peer.port, +impl Default for TorrentMaps { + fn default() -> Self { + Self { + ipv4: TorrentMap(Default::default()), + ipv6: TorrentMap(Default::default()), } } } -type PeerMap = IndexMap>; +impl TorrentMaps { + /// Remove forbidden or inactive torrents, reclaim space and update statistics + pub fn clean_and_update_statistics( + &mut self, + config: &Config, + state: &State, + statistics_sender: &Sender, + access_list: &Arc, + server_start_instant: ServerStartInstant, + worker_index: SwarmWorkerIndex, + ) { + let mut cache = create_access_list_cache(access_list); + let mode = config.access_list.mode; + let now = server_start_instant.seconds_elapsed(); + + let ipv4 = + self.ipv4 + .clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now); + let ipv6 = + self.ipv6 + .clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now); + + if config.statistics.active() { + state.statistics_ipv4.peers[worker_index.0].store(ipv4.0, Ordering::Release); + state.statistics_ipv6.peers[worker_index.0].store(ipv6.0, Ordering::Release); + + if let Some(message) = ipv4.1.map(StatisticsMessage::Ipv4PeerHistogram) { + if let Err(err) = statistics_sender.try_send(message) { + ::log::error!("couldn't send statistics message: {:#}", err); + } + } + if let Some(message) = ipv6.1.map(StatisticsMessage::Ipv6PeerHistogram) { + if let Err(err) = statistics_sender.try_send(message) { + ::log::error!("couldn't send statistics message: {:#}", err); + } + } + } + } +} + +#[derive(Default)] +pub struct TorrentMap(pub IndexMap>); + +impl TorrentMap { + pub fn scrape(&mut self, request: PendingScrapeRequest, response: &mut PendingScrapeResponse) { + response.slab_key = request.slab_key; + + let torrent_stats = request.info_hashes.into_iter().map(|(i, info_hash)| { + let stats = self + .0 + .get(&info_hash) + .map(|torrent_data| torrent_data.scrape_statistics()) + .unwrap_or_else(|| create_torrent_scrape_statistics(0, 0)); + + (i, stats) + }); + + response.torrent_stats.extend(torrent_stats); + } + /// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers + fn clean_and_get_statistics( + &mut self, + config: &Config, + statistics_sender: &Sender, + access_list_cache: &mut AccessListCache, + access_list_mode: AccessListMode, + now: SecondsSinceServerStart, + ) -> (usize, Option>) { + let mut num_peers = 0; + + let mut opt_histogram: Option> = if config.statistics.torrent_peer_histograms + { + match Histogram::new(3) { + Ok(histogram) => Some(histogram), + Err(err) => { + ::log::error!("Couldn't create peer histogram: {:#}", err); + + None + } + } + } else { + None + }; + + self.0.retain(|info_hash, torrent| { + if !access_list_cache + .load() + .allows(access_list_mode, &info_hash.0) + { + return false; + } + + torrent.clean(config, statistics_sender, now); + + num_peers += torrent.peers.len(); + + match opt_histogram { + Some(ref mut histogram) if torrent.peers.len() != 0 => { + let n = torrent + .peers + .len() + .try_into() + .expect("Couldn't fit usize into u64"); + + if let Err(err) = histogram.record(n) { + ::log::error!("Couldn't record {} to histogram: {:#}", n, err); + } + } + _ => (), + } + + !torrent.peers.is_empty() + }); + + self.0.shrink_to_fit(); + + (num_peers, opt_histogram) + } + + pub fn num_torrents(&self) -> usize { + self.0.len() + } +} pub struct TorrentData { - peers: PeerMap, + peers: IndexMap>, num_seeders: usize, } @@ -195,143 +314,23 @@ impl Default for TorrentData { } } -#[derive(Default)] -pub struct TorrentMap(pub IndexMap>); - -impl TorrentMap { - pub fn scrape(&mut self, request: PendingScrapeRequest, response: &mut PendingScrapeResponse) { - response.slab_key = request.slab_key; - - let torrent_stats = request.info_hashes.into_iter().map(|(i, info_hash)| { - let stats = self - .0 - .get(&info_hash) - .map(|torrent_data| torrent_data.scrape_statistics()) - .unwrap_or_else(|| create_torrent_scrape_statistics(0, 0)); - - (i, stats) - }); - - response.torrent_stats.extend(torrent_stats); - } - /// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers - fn clean_and_get_statistics( - &mut self, - config: &Config, - statistics_sender: &Sender, - access_list_cache: &mut AccessListCache, - access_list_mode: AccessListMode, - now: SecondsSinceServerStart, - ) -> (usize, Option>) { - let mut num_peers = 0; - - let mut opt_histogram: Option> = if config.statistics.torrent_peer_histograms - { - match Histogram::new(3) { - Ok(histogram) => Some(histogram), - Err(err) => { - ::log::error!("Couldn't create peer histogram: {:#}", err); - - None - } - } - } else { - None - }; - - self.0.retain(|info_hash, torrent| { - if !access_list_cache - .load() - .allows(access_list_mode, &info_hash.0) - { - return false; - } - - torrent.clean(config, statistics_sender, now); - - num_peers += torrent.peers.len(); - - match opt_histogram { - Some(ref mut histogram) if torrent.peers.len() != 0 => { - let n = torrent - .peers - .len() - .try_into() - .expect("Couldn't fit usize into u64"); - - if let Err(err) = histogram.record(n) { - ::log::error!("Couldn't record {} to histogram: {:#}", n, err); - } - } - _ => (), - } - - !torrent.peers.is_empty() - }); - - self.0.shrink_to_fit(); - - (num_peers, opt_histogram) - } - - pub fn num_torrents(&self) -> usize { - self.0.len() - } +#[derive(Clone, Debug)] +struct Peer { + ip_address: I, + port: Port, + is_seeder: bool, + valid_until: ValidUntil, } -pub struct TorrentMaps { - pub ipv4: TorrentMap, - pub ipv6: TorrentMap, -} - -impl Default for TorrentMaps { - fn default() -> Self { - Self { - ipv4: TorrentMap(Default::default()), - ipv6: TorrentMap(Default::default()), +impl Peer { + fn to_response_peer(_: &PeerId, peer: &Self) -> ResponsePeer { + ResponsePeer { + ip_address: peer.ip_address, + port: peer.port, } } } -impl TorrentMaps { - /// Remove forbidden or inactive torrents, reclaim space and update statistics - pub fn clean_and_update_statistics( - &mut self, - config: &Config, - state: &State, - statistics_sender: &Sender, - access_list: &Arc, - server_start_instant: ServerStartInstant, - worker_index: SwarmWorkerIndex, - ) { - let mut cache = create_access_list_cache(access_list); - let mode = config.access_list.mode; - let now = server_start_instant.seconds_elapsed(); - - let ipv4 = - self.ipv4 - .clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now); - let ipv6 = - self.ipv6 - .clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now); - - if config.statistics.active() { - state.statistics_ipv4.peers[worker_index.0].store(ipv4.0, Ordering::Release); - state.statistics_ipv6.peers[worker_index.0].store(ipv6.0, Ordering::Release); - - if let Some(message) = ipv4.1.map(StatisticsMessage::Ipv4PeerHistogram) { - if let Err(err) = statistics_sender.try_send(message) { - ::log::error!("couldn't send statistics message: {:#}", err); - } - } - if let Some(message) = ipv6.1.map(StatisticsMessage::Ipv6PeerHistogram) { - if let Err(err) = statistics_sender.try_send(message) { - ::log::error!("couldn't send statistics message: {:#}", err); - } - } - } - } -} /// Extract response peers /// /// If there are more peers in map than `max_num_peers_to_take`, do a random