diff --git a/aquatic_udp/src/workers/request/mod.rs b/aquatic_udp/src/workers/request/mod.rs index 78cd2e3..1b8335a 100644 --- a/aquatic_udp/src/workers/request/mod.rs +++ b/aquatic_udp/src/workers/request/mod.rs @@ -8,7 +8,7 @@ use std::time::Instant; use crossbeam_channel::Receiver; use rand::{rngs::SmallRng, SeedableRng}; -use aquatic_common::{extract_response_peers, CanonicalSocketAddr, PanicSentinel, ValidUntil}; +use aquatic_common::{CanonicalSocketAddr, PanicSentinel, ValidUntil}; use aquatic_udp_protocol::*; @@ -117,72 +117,38 @@ fn handle_announce_request( peer_ip: I, peer_valid_until: ValidUntil, ) -> AnnounceResponse { - let peer_status = PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left); + let max_num_peers_to_take = if request.peers_wanted.0 <= 0 { + config.protocol.max_response_peers as usize + } else { + ::std::cmp::min( + config.protocol.max_response_peers as usize, + request.peers_wanted.0.try_into().unwrap(), + ) + }; let peer = Peer { ip_address: peer_ip, port: request.port, - status: peer_status, + status: PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left), valid_until: peer_valid_until, }; let torrent_data = torrents.0.entry(request.info_hash).or_default(); - let opt_removed_peer = match peer_status { - PeerStatus::Leeching => { - torrent_data.num_leechers += 1; + torrent_data.update_peer(request.peer_id, peer); - torrent_data.peers.insert(request.peer_id, peer) - } - PeerStatus::Seeding => { - torrent_data.num_seeders += 1; - - torrent_data.peers.insert(request.peer_id, peer) - } - PeerStatus::Stopped => torrent_data.peers.remove(&request.peer_id), - }; - - match opt_removed_peer.map(|peer| peer.status) { - Some(PeerStatus::Leeching) => { - torrent_data.num_leechers -= 1; - } - Some(PeerStatus::Seeding) => { - torrent_data.num_seeders -= 1; - } - _ => {} - } - - let max_num_peers_to_take = calc_max_num_peers_to_take(config, request.peers_wanted.0); - - let response_peers = extract_response_peers( - rng, - &torrent_data.peers, - max_num_peers_to_take, - request.peer_id, - Peer::to_response_peer, - ); + let response_peers = + torrent_data.extract_response_peers(rng, request.peer_id, max_num_peers_to_take); AnnounceResponse { transaction_id: request.transaction_id, announce_interval: AnnounceInterval(config.protocol.peer_announce_interval), - leechers: NumberOfPeers(torrent_data.num_leechers as i32), - seeders: NumberOfPeers(torrent_data.num_seeders as i32), + leechers: NumberOfPeers(torrent_data.num_leechers() as i32), + seeders: NumberOfPeers(torrent_data.num_seeders() as i32), peers: response_peers, } } -#[inline] -fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize { - if peers_wanted <= 0 { - config.protocol.max_response_peers as usize - } else { - ::std::cmp::min( - config.protocol.max_response_peers as usize, - peers_wanted as usize, - ) - } -} - fn handle_scrape_request( torrents: &mut TorrentMap, request: PendingScrapeRequest, @@ -196,12 +162,7 @@ fn handle_scrape_request( let stats = torrents .0 .get(&info_hash) - .map(|torrent_data| { - create_torrent_scrape_statistics( - torrent_data.num_seeders as i32, - torrent_data.num_leechers as i32, - ) - }) + .map(|torrent_data| torrent_data.scrape_statistics()) .unwrap_or(EMPTY_STATS); (i, stats) @@ -222,95 +183,3 @@ const fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> Torren leechers: NumberOfPeers(leechers), } } - -#[cfg(test)] -mod tests { - use std::collections::HashSet; - use std::net::Ipv4Addr; - - use quickcheck::{quickcheck, TestResult}; - use rand::thread_rng; - - use super::*; - - fn gen_peer_id(i: u32) -> PeerId { - let mut peer_id = PeerId([0; 20]); - - peer_id.0[0..4].copy_from_slice(&i.to_ne_bytes()); - - peer_id - } - fn gen_peer(i: u32) -> Peer { - Peer { - ip_address: Ipv4Addr::from(i.to_be_bytes()), - port: Port(1), - status: PeerStatus::Leeching, - valid_until: ValidUntil::new(0), - } - } - - #[test] - fn test_extract_response_peers() { - fn prop(data: (u16, u16)) -> TestResult { - let gen_num_peers = data.0 as u32; - let req_num_peers = data.1 as usize; - - let mut peer_map: storage::PeerMap = Default::default(); - - let mut opt_sender_key = None; - let mut opt_sender_peer = None; - - for i in 0..gen_num_peers { - let key = gen_peer_id(i); - let peer = gen_peer((i << 16) + i); - - if i == 0 { - opt_sender_key = Some(key); - opt_sender_peer = Some(peer.to_response_peer()); - } - - peer_map.insert(key, peer); - } - - let mut rng = thread_rng(); - - let peers = extract_response_peers( - &mut rng, - &peer_map, - req_num_peers, - opt_sender_key.unwrap_or_else(|| gen_peer_id(1)), - Peer::to_response_peer, - ); - - // Check that number of returned peers is correct - - let mut success = peers.len() <= req_num_peers; - - if req_num_peers >= gen_num_peers as usize { - success &= peers.len() == gen_num_peers as usize - || peers.len() + 1 == gen_num_peers as usize; - } - - // Check that returned peers are unique (no overlap) and that sender - // isn't returned - - let mut ip_addresses = HashSet::with_capacity(peers.len()); - - for peer in peers { - if peer == opt_sender_peer.clone().unwrap() - || ip_addresses.contains(&peer.ip_address) - { - success = false; - - break; - } - - ip_addresses.insert(peer.ip_address); - } - - TestResult::from_bool(success) - } - - quickcheck(prop as fn((u16, u16)) -> TestResult); - } -} diff --git a/aquatic_udp/src/workers/request/storage.rs b/aquatic_udp/src/workers/request/storage.rs index 8280be7..75bc311 100644 --- a/aquatic_udp/src/workers/request/storage.rs +++ b/aquatic_udp/src/workers/request/storage.rs @@ -5,14 +5,17 @@ use std::time::Instant; use aquatic_common::{ access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache, AccessListMode}, - AmortizedIndexMap, ValidUntil, + extract_response_peers, AmortizedIndexMap, ValidUntil, }; use aquatic_udp_protocol::*; +use rand::prelude::SmallRng; use crate::common::*; use crate::config::Config; +use super::create_torrent_scrape_statistics; + #[derive(Clone, Debug)] pub struct Peer { pub ip_address: I, @@ -33,12 +36,68 @@ impl Peer { pub type PeerMap = AmortizedIndexMap>; pub struct TorrentData { - pub peers: PeerMap, - pub num_seeders: usize, - pub num_leechers: usize, + peers: PeerMap, + num_seeders: usize, + num_leechers: usize, } impl TorrentData { + pub fn update_peer(&mut self, peer_id: PeerId, peer: Peer) { + let opt_removed_peer = match peer.status { + PeerStatus::Leeching => { + self.num_leechers += 1; + + self.peers.insert(peer_id, peer) + } + PeerStatus::Seeding => { + self.num_seeders += 1; + + self.peers.insert(peer_id, peer) + } + PeerStatus::Stopped => self.peers.remove(&peer_id), + }; + + match opt_removed_peer.map(|peer| peer.status) { + Some(PeerStatus::Leeching) => { + self.num_leechers -= 1; + } + Some(PeerStatus::Seeding) => { + self.num_seeders -= 1; + } + _ => {} + } + } + + pub fn extract_response_peers( + &self, + rng: &mut SmallRng, + peer_id: PeerId, + max_num_peers_to_take: usize, + ) -> Vec> { + extract_response_peers( + rng, + &self.peers, + max_num_peers_to_take, + peer_id, + Peer::to_response_peer, + ) + } + + pub fn num_leechers(&self) -> usize { + self.num_leechers + } + + pub fn num_seeders(&self) -> usize { + self.num_seeders + } + + pub fn scrape_statistics(&self) -> TorrentScrapeStatistics { + create_torrent_scrape_statistics( + self.num_seeders.try_into().unwrap_or(i32::MAX), + self.num_leechers.try_into().unwrap_or(i32::MAX), + ) + } + /// Remove inactive peers and reclaim space fn clean(&mut self, now: Instant) { self.peers.retain(|_, peer| { @@ -144,3 +203,95 @@ impl TorrentMaps { (ipv4, ipv6) } } + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + use std::net::Ipv4Addr; + + use quickcheck::{quickcheck, TestResult}; + use rand::thread_rng; + + use super::*; + + fn gen_peer_id(i: u32) -> PeerId { + let mut peer_id = PeerId([0; 20]); + + peer_id.0[0..4].copy_from_slice(&i.to_ne_bytes()); + + peer_id + } + fn gen_peer(i: u32) -> Peer { + Peer { + ip_address: Ipv4Addr::from(i.to_be_bytes()), + port: Port(1), + status: PeerStatus::Leeching, + valid_until: ValidUntil::new(0), + } + } + + #[test] + fn test_extract_response_peers() { + fn prop(data: (u16, u16)) -> TestResult { + let gen_num_peers = data.0 as u32; + let req_num_peers = data.1 as usize; + + let mut peer_map: PeerMap = Default::default(); + + let mut opt_sender_key = None; + let mut opt_sender_peer = None; + + for i in 0..gen_num_peers { + let key = gen_peer_id(i); + let peer = gen_peer((i << 16) + i); + + if i == 0 { + opt_sender_key = Some(key); + opt_sender_peer = Some(peer.to_response_peer()); + } + + peer_map.insert(key, peer); + } + + let mut rng = thread_rng(); + + let peers = extract_response_peers( + &mut rng, + &peer_map, + req_num_peers, + opt_sender_key.unwrap_or_else(|| gen_peer_id(1)), + Peer::to_response_peer, + ); + + // Check that number of returned peers is correct + + let mut success = peers.len() <= req_num_peers; + + if req_num_peers >= gen_num_peers as usize { + success &= peers.len() == gen_num_peers as usize + || peers.len() + 1 == gen_num_peers as usize; + } + + // Check that returned peers are unique (no overlap) and that sender + // isn't returned + + let mut ip_addresses = HashSet::with_capacity(peers.len()); + + for peer in peers { + if peer == opt_sender_peer.clone().unwrap() + || ip_addresses.contains(&peer.ip_address) + { + success = false; + + break; + } + + ip_addresses.insert(peer.ip_address); + } + + TestResult::from_bool(success) + } + + quickcheck(prop as fn((u16, u16)) -> TestResult); + } +}