diff --git a/aquatic_udp/src/workers/request/mod.rs b/aquatic_udp/src/workers/request/mod.rs index b45b8be..4b47f4a 100644 --- a/aquatic_udp/src/workers/request/mod.rs +++ b/aquatic_udp/src/workers/request/mod.rs @@ -69,22 +69,18 @@ pub fn run_request_worker( response_sender.try_send_to(sender_index, response, src); } + // Run periodic tasks if iter_counter % 128 == 0 { let now = Instant::now(); peer_valid_until = ValidUntil::new_with_now(now, config.cleaning.max_peer_age); if now > last_cleaning + cleaning_interval { - torrents.clean(&config, &state.access_list); + let (ipv4, ipv6) = torrents.clean_and_get_num_peers(&config, &state.access_list); if config.statistics.active() { - let peers_ipv4 = torrents.ipv4.values().map(|t| t.peers.len()).sum(); - let peers_ipv6 = torrents.ipv6.values().map(|t| t.peers.len()).sum(); - - state.statistics_ipv4.peers[worker_index.0] - .store(peers_ipv4, Ordering::Release); - state.statistics_ipv6.peers[worker_index.0] - .store(peers_ipv6, Ordering::Release); + state.statistics_ipv4.peers[worker_index.0].store(ipv4, Ordering::Release); + state.statistics_ipv6.peers[worker_index.0].store(ipv6, Ordering::Release); } last_cleaning = now; @@ -93,9 +89,9 @@ pub fn run_request_worker( && now > last_statistics_update + statistics_update_interval { state.statistics_ipv4.torrents[worker_index.0] - .store(torrents.ipv4.len(), Ordering::Release); + .store(torrents.ipv4.num_torrents(), Ordering::Release); state.statistics_ipv6.torrents[worker_index.0] - .store(torrents.ipv6.len(), Ordering::Release); + .store(torrents.ipv6.num_torrents(), Ordering::Release); last_statistics_update = now; } @@ -122,7 +118,7 @@ fn handle_announce_request( valid_until: peer_valid_until, }; - let torrent_data = torrents.entry(request.info_hash).or_default(); + let torrent_data = torrents.0.entry(request.info_hash).or_default(); let opt_removed_peer = match peer_status { PeerStatus::Leeching => { @@ -190,7 +186,7 @@ fn handle_scrape_request( if src.is_ipv4() { torrent_stats.extend(request.info_hashes.into_iter().map(|(i, info_hash)| { - let s = if let Some(torrent_data) = torrents.ipv4.get(&info_hash) { + let s = if let Some(torrent_data) = torrents.ipv4.0.get(&info_hash) { create_torrent_scrape_statistics( torrent_data.num_seeders as i32, torrent_data.num_leechers as i32, @@ -203,7 +199,7 @@ fn handle_scrape_request( })); } else { torrent_stats.extend(request.info_hashes.into_iter().map(|(i, info_hash)| { - let s = if let Some(torrent_data) = torrents.ipv6.get(&info_hash) { + let s = if let Some(torrent_data) = torrents.ipv6.0.get(&info_hash) { create_torrent_scrape_statistics( torrent_data.num_seeders as i32, torrent_data.num_leechers as i32, diff --git a/aquatic_udp/src/workers/request/storage.rs b/aquatic_udp/src/workers/request/storage.rs index b5eaafd..8280be7 100644 --- a/aquatic_udp/src/workers/request/storage.rs +++ b/aquatic_udp/src/workers/request/storage.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use std::time::Instant; use aquatic_common::{ - access_list::{create_access_list_cache, AccessListArcSwap}, + access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache, AccessListMode}, AmortizedIndexMap, ValidUntil, }; @@ -39,7 +39,8 @@ pub struct TorrentData { } impl TorrentData { - fn clean_and_check_if_has_peers(&mut self, now: Instant) -> bool { + /// Remove inactive peers and reclaim space + fn clean(&mut self, now: Instant) { self.peers.retain(|_, peer| { if peer.valid_until.0 > now { true @@ -58,12 +59,8 @@ impl TorrentData { } }); - if self.peers.is_empty() { - false - } else { + if !self.peers.is_empty() { self.peers.shrink_to_fit(); - - true } } } @@ -78,36 +75,72 @@ impl Default for TorrentData { } } -pub type TorrentMap = AmortizedIndexMap>; - #[derive(Default)] +pub struct TorrentMap(pub AmortizedIndexMap>); + +impl TorrentMap { + /// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers + fn clean_and_get_num_peers( + &mut self, + access_list_cache: &mut AccessListCache, + access_list_mode: AccessListMode, + now: Instant, + ) -> usize { + let mut num_peers = 0; + + self.0.retain(|info_hash, torrent| { + if !access_list_cache + .load() + .allows(access_list_mode, &info_hash.0) + { + return false; + } + + torrent.clean(now); + + num_peers += torrent.peers.len(); + + !torrent.peers.is_empty() + }); + + self.0.shrink_to_fit(); + + num_peers + } + + pub fn num_torrents(&self) -> usize { + self.0.len() + } +} + pub struct TorrentMaps { pub ipv4: TorrentMap, pub ipv6: TorrentMap, } -impl TorrentMaps { - /// Remove disallowed and inactive torrents - pub fn clean(&mut self, config: &Config, access_list: &Arc) { - let now = Instant::now(); - let access_list_mode = config.access_list.mode; - - let mut access_list_cache = create_access_list_cache(access_list); - - self.ipv4.retain(|info_hash, torrent| { - access_list_cache - .load() - .allows(access_list_mode, &info_hash.0) - && torrent.clean_and_check_if_has_peers(now) - }); - self.ipv4.shrink_to_fit(); - - self.ipv6.retain(|info_hash, torrent| { - access_list_cache - .load() - .allows(access_list_mode, &info_hash.0) - && torrent.clean_and_check_if_has_peers(now) - }); - self.ipv6.shrink_to_fit(); +impl Default for TorrentMaps { + fn default() -> Self { + Self { + ipv4: TorrentMap(Default::default()), + ipv6: TorrentMap(Default::default()), + } + } +} + +impl TorrentMaps { + /// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers + pub fn clean_and_get_num_peers( + &mut self, + config: &Config, + access_list: &Arc, + ) -> (usize, usize) { + let mut cache = create_access_list_cache(access_list); + let mode = config.access_list.mode; + let now = Instant::now(); + + let ipv4 = self.ipv4.clean_and_get_num_peers(&mut cache, mode, now); + let ipv6 = self.ipv6.clean_and_get_num_peers(&mut cache, mode, now); + + (ipv4, ipv6) } }