diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index 06c1e75..941dab0 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -3,13 +3,10 @@ use std::hash::Hash; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::atomic::AtomicUsize; use std::sync::Arc; -use std::time::Instant; use crossbeam_channel::{Sender, TrySendError}; -use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap}; -use aquatic_common::AHashIndexMap; -use aquatic_common::ValidUntil; +use aquatic_common::access_list::AccessListArcSwap; use aquatic_udp_protocol::*; use crate::config::Config; @@ -150,95 +147,6 @@ impl PeerStatus { } } -#[derive(Clone, Debug)] -pub struct Peer { - pub ip_address: I, - pub port: Port, - pub status: PeerStatus, - pub valid_until: ValidUntil, -} - -pub type PeerMap = AHashIndexMap>; - -pub struct TorrentData { - pub peers: PeerMap, - pub num_seeders: usize, - pub num_leechers: usize, -} - -impl Default for TorrentData { - fn default() -> Self { - Self { - peers: Default::default(), - num_seeders: 0, - num_leechers: 0, - } - } -} - -pub type TorrentMap = AHashIndexMap>; - -#[derive(Default)] -pub struct TorrentMaps { - pub ipv4: TorrentMap, - pub ipv6: TorrentMap, -} - -impl TorrentMaps { - /// Remove disallowed and inactive torrents - pub fn clean(&mut self, config: &Config, access_list: &Arc) { - let now = Instant::now(); - let access_list_mode = config.access_list.mode; - - let mut access_list_cache = create_access_list_cache(access_list); - - self.ipv4.retain(|info_hash, torrent| { - access_list_cache - .load() - .allows(access_list_mode, &info_hash.0) - && Self::clean_torrent_and_peers(now, torrent) - }); - self.ipv4.shrink_to_fit(); - - self.ipv6.retain(|info_hash, torrent| { - access_list_cache - .load() - .allows(access_list_mode, &info_hash.0) - && Self::clean_torrent_and_peers(now, torrent) - }); - self.ipv6.shrink_to_fit(); - } - - /// Returns true if torrent is to be kept - #[inline] - fn clean_torrent_and_peers(now: Instant, torrent: &mut TorrentData) -> bool { - let num_seeders = &mut torrent.num_seeders; - let num_leechers = &mut torrent.num_leechers; - - torrent.peers.retain(|_, peer| { - let keep = peer.valid_until.0 > now; - - if !keep { - match peer.status { - PeerStatus::Seeding => { - *num_seeders -= 1; - } - PeerStatus::Leeching => { - *num_leechers -= 1; - } - _ => (), - }; - } - - keep - }); - - torrent.peers.shrink_to_fit(); - - !torrent.peers.is_empty() - } -} - pub struct Statistics { pub requests_received: AtomicUsize, pub responses_sent: AtomicUsize, diff --git a/aquatic_udp/src/workers/request.rs b/aquatic_udp/src/workers/request.rs index ea81d29..44163ab 100644 --- a/aquatic_udp/src/workers/request.rs +++ b/aquatic_udp/src/workers/request.rs @@ -4,9 +4,13 @@ use std::net::Ipv4Addr; use std::net::Ipv6Addr; use std::net::SocketAddr; use std::sync::atomic::Ordering; +use std::sync::Arc; use std::time::Duration; use std::time::Instant; +use aquatic_common::access_list::create_access_list_cache; +use aquatic_common::access_list::AccessListArcSwap; +use aquatic_common::AHashIndexMap; use aquatic_common::ValidUntil; use crossbeam_channel::Receiver; use rand::{rngs::SmallRng, SeedableRng}; @@ -18,6 +22,95 @@ use aquatic_udp_protocol::*; use crate::common::*; use crate::config::Config; +#[derive(Clone, Debug)] +struct Peer { + pub ip_address: I, + pub port: Port, + pub status: PeerStatus, + pub valid_until: ValidUntil, +} + +type PeerMap = AHashIndexMap>; + +struct TorrentData { + pub peers: PeerMap, + pub num_seeders: usize, + pub num_leechers: usize, +} + +impl Default for TorrentData { + fn default() -> Self { + Self { + peers: Default::default(), + num_seeders: 0, + num_leechers: 0, + } + } +} + +type TorrentMap = AHashIndexMap>; + +#[derive(Default)] +struct TorrentMaps { + pub ipv4: TorrentMap, + pub ipv6: TorrentMap, +} + +impl TorrentMaps { + /// Remove disallowed and inactive torrents + pub fn clean(&mut self, config: &Config, access_list: &Arc) { + let now = Instant::now(); + let access_list_mode = config.access_list.mode; + + let mut access_list_cache = create_access_list_cache(access_list); + + self.ipv4.retain(|info_hash, torrent| { + access_list_cache + .load() + .allows(access_list_mode, &info_hash.0) + && Self::clean_torrent_and_peers(now, torrent) + }); + self.ipv4.shrink_to_fit(); + + self.ipv6.retain(|info_hash, torrent| { + access_list_cache + .load() + .allows(access_list_mode, &info_hash.0) + && Self::clean_torrent_and_peers(now, torrent) + }); + self.ipv6.shrink_to_fit(); + } + + /// Returns true if torrent is to be kept + #[inline] + fn clean_torrent_and_peers(now: Instant, torrent: &mut TorrentData) -> bool { + let num_seeders = &mut torrent.num_seeders; + let num_leechers = &mut torrent.num_leechers; + + torrent.peers.retain(|_, peer| { + let keep = peer.valid_until.0 > now; + + if !keep { + match peer.status { + PeerStatus::Seeding => { + *num_seeders -= 1; + } + PeerStatus::Leeching => { + *num_leechers -= 1; + } + _ => (), + }; + } + + keep + }); + + torrent.peers.shrink_to_fit(); + + !torrent.peers.is_empty() + } +} + #[derive(Clone, PartialEq, Debug)] pub struct ProtocolResponsePeer { pub ip_address: I, @@ -156,7 +249,7 @@ pub fn run_request_worker( } } -pub fn handle_announce_request( +fn handle_announce_request( config: &Config, rng: &mut SmallRng, torrents: &mut TorrentMaps, @@ -260,7 +353,7 @@ fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize { } } -pub fn handle_scrape_request( +fn handle_scrape_request( torrents: &mut TorrentMaps, src: SocketAddr, request: PendingScrapeRequest,