diff --git a/aquatic_udp/src/workers/request.rs b/aquatic_udp/src/workers/request/mod.rs similarity index 78% rename from aquatic_udp/src/workers/request.rs rename to aquatic_udp/src/workers/request/mod.rs index d6041f9..052ca4c 100644 --- a/aquatic_udp/src/workers/request.rs +++ b/aquatic_udp/src/workers/request/mod.rs @@ -1,15 +1,11 @@ +mod storage; + use std::collections::BTreeMap; use std::net::IpAddr; -use std::net::Ipv4Addr; -use std::net::Ipv6Addr; use std::sync::atomic::Ordering; -use std::sync::Arc; use std::time::Duration; use std::time::Instant; -use aquatic_common::access_list::create_access_list_cache; -use aquatic_common::access_list::AccessListArcSwap; -use aquatic_common::AmortizedIndexMap; use aquatic_common::CanonicalSocketAddr; use aquatic_common::PanicSentinel; use aquatic_common::ValidUntil; @@ -23,107 +19,7 @@ use aquatic_udp_protocol::*; use crate::common::*; use crate::config::Config; -#[derive(Clone, Debug)] -struct Peer { - pub ip_address: I, - pub port: Port, - pub status: PeerStatus, - pub valid_until: ValidUntil, -} - -impl Peer { - fn to_response_peer(&self) -> ResponsePeer { - ResponsePeer { - ip_address: self.ip_address, - port: self.port, - } - } -} - -type PeerMap = AmortizedIndexMap>; - -struct TorrentData { - pub peers: PeerMap, - pub num_seeders: usize, - pub num_leechers: usize, -} - -impl Default for TorrentData { - fn default() -> Self { - Self { - peers: Default::default(), - num_seeders: 0, - num_leechers: 0, - } - } -} - -type TorrentMap = AmortizedIndexMap>; - -#[derive(Default)] -struct TorrentMaps { - pub ipv4: TorrentMap, - pub ipv6: TorrentMap, -} - -impl TorrentMaps { - /// Remove disallowed and inactive torrents - pub fn clean(&mut self, config: &Config, access_list: &Arc) { - let now = Instant::now(); - let access_list_mode = config.access_list.mode; - - let mut access_list_cache = create_access_list_cache(access_list); - - self.ipv4.retain(|info_hash, torrent| { - access_list_cache - .load() - .allows(access_list_mode, &info_hash.0) - && Self::clean_torrent_and_peers(now, torrent) - }); - self.ipv4.shrink_to_fit(); - - self.ipv6.retain(|info_hash, torrent| { - access_list_cache - .load() - .allows(access_list_mode, &info_hash.0) - && Self::clean_torrent_and_peers(now, torrent) - }); - self.ipv6.shrink_to_fit(); - } - - /// Returns true if torrent is to be kept - #[inline] - fn clean_torrent_and_peers(now: Instant, torrent: &mut TorrentData) -> bool { - let num_seeders = &mut torrent.num_seeders; - let num_leechers = &mut torrent.num_leechers; - - torrent.peers.retain(|_, peer| { - if peer.valid_until.0 > now { - true - } else { - match peer.status { - PeerStatus::Seeding => { - *num_seeders -= 1; - } - PeerStatus::Leeching => { - *num_leechers -= 1; - } - _ => (), - }; - - false - } - }); - - if torrent.peers.is_empty() { - false - } else { - torrent.peers.shrink_to_fit(); - - true - } - } -} +use storage::{Peer, TorrentMap, TorrentMaps}; pub fn run_request_worker( _sentinel: PanicSentinel, @@ -388,7 +284,7 @@ mod tests { let gen_num_peers = data.0 as u32; let req_num_peers = data.1 as usize; - let mut peer_map: PeerMap = Default::default(); + let mut peer_map: storage::PeerMap = Default::default(); let mut opt_sender_key = None; let mut opt_sender_peer = None; diff --git a/aquatic_udp/src/workers/request/storage.rs b/aquatic_udp/src/workers/request/storage.rs new file mode 100644 index 0000000..e558ad7 --- /dev/null +++ b/aquatic_udp/src/workers/request/storage.rs @@ -0,0 +1,116 @@ +use std::net::Ipv4Addr; +use std::net::Ipv6Addr; +use std::sync::Arc; +use std::time::Instant; + +use aquatic_common::access_list::create_access_list_cache; +use aquatic_common::access_list::AccessListArcSwap; +use aquatic_common::AmortizedIndexMap; +use aquatic_common::ValidUntil; + +use aquatic_udp_protocol::*; + +use crate::common::*; +use crate::config::Config; + +#[derive(Clone, Debug)] +pub struct Peer { + pub ip_address: I, + pub port: Port, + pub status: PeerStatus, + pub valid_until: ValidUntil, +} + +impl Peer { + pub fn to_response_peer(&self) -> ResponsePeer { + ResponsePeer { + ip_address: self.ip_address, + port: self.port, + } + } +} + +pub type PeerMap = AmortizedIndexMap>; + +pub struct TorrentData { + pub peers: PeerMap, + pub num_seeders: usize, + pub num_leechers: usize, +} + +impl Default for TorrentData { + fn default() -> Self { + Self { + peers: Default::default(), + num_seeders: 0, + num_leechers: 0, + } + } +} + +pub type TorrentMap = AmortizedIndexMap>; + +#[derive(Default)] +pub struct TorrentMaps { + pub ipv4: TorrentMap, + pub ipv6: TorrentMap, +} + +impl TorrentMaps { + /// Remove disallowed and inactive torrents + pub fn clean(&mut self, config: &Config, access_list: &Arc) { + let now = Instant::now(); + let access_list_mode = config.access_list.mode; + + let mut access_list_cache = create_access_list_cache(access_list); + + self.ipv4.retain(|info_hash, torrent| { + access_list_cache + .load() + .allows(access_list_mode, &info_hash.0) + && Self::clean_torrent_and_peers(now, torrent) + }); + self.ipv4.shrink_to_fit(); + + self.ipv6.retain(|info_hash, torrent| { + access_list_cache + .load() + .allows(access_list_mode, &info_hash.0) + && Self::clean_torrent_and_peers(now, torrent) + }); + self.ipv6.shrink_to_fit(); + } + + /// Returns true if torrent is to be kept + #[inline] + fn clean_torrent_and_peers(now: Instant, torrent: &mut TorrentData) -> bool { + let num_seeders = &mut torrent.num_seeders; + let num_leechers = &mut torrent.num_leechers; + + torrent.peers.retain(|_, peer| { + if peer.valid_until.0 > now { + true + } else { + match peer.status { + PeerStatus::Seeding => { + *num_seeders -= 1; + } + PeerStatus::Leeching => { + *num_leechers -= 1; + } + _ => (), + }; + + false + } + }); + + if torrent.peers.is_empty() { + false + } else { + torrent.peers.shrink_to_fit(); + + true + } + } +}