diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs index 8e71191..cef413e 100644 --- a/aquatic_common/src/lib.rs +++ b/aquatic_common/src/lib.rs @@ -26,13 +26,14 @@ impl ValidUntil { /// If there are more peers in map than `max_num_peers_to_take`, do a /// half-random selection of peers from first and second halves of map, /// in order to avoid returning too homogeneous peers. -/// -/// Don't care if we send back announcing peer. +/// +/// Might return one less peer than wanted since sender is filtered out. #[inline] pub fn extract_response_peers( rng: &mut impl Rng, peer_map: &IndexMap, max_num_peers_to_take: usize, + sender_peer_map_key: K, peer_conversion_function: F ) -> Vec where @@ -41,9 +42,15 @@ pub fn extract_response_peers( { let peer_map_len = peer_map.len(); - if peer_map_len <= max_num_peers_to_take { - peer_map.values() - .map(peer_conversion_function) + if peer_map_len <= max_num_peers_to_take + 1 { + peer_map.iter() + .filter_map(|(k, v)|{ + if *k == sender_peer_map_key { + None + } else { + Some(peer_conversion_function(v)) + } + }) .collect() } else { let half_num_to_take = max_num_peers_to_take / 2; @@ -64,17 +71,19 @@ pub fn extract_response_peers( let mut peers: Vec = Vec::with_capacity(max_num_peers_to_take); for i in offset_first_half..end_first_half { - if let Some((_, peer)) = peer_map.get_index(i){ - peers.push(peer_conversion_function(peer)) + if let Some((k, peer)) = peer_map.get_index(i){ + if *k != sender_peer_map_key { + peers.push(peer_conversion_function(peer)) + } } } for i in offset_second_half..end_second_half { - if let Some((_, peer)) = peer_map.get_index(i){ - peers.push(peer_conversion_function(peer)) + if let Some((k, peer)) = peer_map.get_index(i){ + if *k != sender_peer_map_key { + peers.push(peer_conversion_function(peer)) + } } } - - debug_assert_eq!(peers.len(), max_num_peers_to_take); peers } diff --git a/aquatic_http/src/lib/common.rs b/aquatic_http/src/lib/common.rs index fe14c0a..050c712 100644 --- a/aquatic_http/src/lib/common.rs +++ b/aquatic_http/src/lib/common.rs @@ -21,7 +21,7 @@ pub const LISTENER_TOKEN: Token = Token(0); pub const CHANNEL_TOKEN: Token = Token(1); -pub trait Ip: Copy + Eq + ::std::hash::Hash {} +pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {} impl Ip for Ipv4Addr {} impl Ip for Ipv6Addr {} @@ -73,7 +73,7 @@ impl PeerStatus { } -#[derive(Clone, Copy)] +#[derive(Debug, Clone, Copy)] pub struct Peer { pub connection_meta: PeerConnectionMeta, pub port: u16, diff --git a/aquatic_http/src/lib/handler.rs b/aquatic_http/src/lib/handler.rs index ec95986..27592d9 100644 --- a/aquatic_http/src/lib/handler.rs +++ b/aquatic_http/src/lib/handler.rs @@ -194,57 +194,64 @@ fn upsert_peer_and_get_response_peers( valid_until: ValidUntil, ) -> (usize, usize, Vec>) { // Insert/update/remove peer who sent this request - { - let peer_status = PeerStatus::from_event_and_bytes_left( - request.event, - Some(request.bytes_left) + + let peer_status = PeerStatus::from_event_and_bytes_left( + request.event, + Some(request.bytes_left) + ); + + let peer = Peer { + connection_meta: request_sender_meta, + port: request.port, + status: peer_status, + valid_until, + }; + + ::log::debug!("peer: {:?}", peer); + + let ip_or_key = request.key + .map(Either::Right) + .unwrap_or_else(|| + Either::Left(request_sender_meta.peer_ip_address) ); - let peer = Peer { - connection_meta: request_sender_meta, - port: request.port, - status: peer_status, - valid_until, - }; + let peer_map_key = PeerMapKey { + peer_id: request.peer_id, + ip_or_key, + }; - let ip_or_key = request.key - .map(Either::Right) - .unwrap_or_else(|| - Either::Left(request_sender_meta.peer_ip_address) - ); + ::log::debug!("peer map key: {:?}", peer_map_key); - let peer_map_key = PeerMapKey { - peer_id: request.peer_id, - ip_or_key, - }; + let opt_removed_peer = match peer_status { + PeerStatus::Leeching => { + torrent_data.num_leechers += 1; - let opt_removed_peer = match peer_status { - PeerStatus::Leeching => { - torrent_data.num_leechers += 1; + torrent_data.peers.insert(peer_map_key.clone(), peer) + }, + PeerStatus::Seeding => { + torrent_data.num_seeders += 1; - torrent_data.peers.insert(peer_map_key, peer) - }, - PeerStatus::Seeding => { - torrent_data.num_seeders += 1; - - torrent_data.peers.insert(peer_map_key, peer) - }, - PeerStatus::Stopped => { - torrent_data.peers.remove(&peer_map_key) - } - }; - - match opt_removed_peer.map(|peer| peer.status){ - Some(PeerStatus::Leeching) => { - torrent_data.num_leechers -= 1; - }, - Some(PeerStatus::Seeding) => { - torrent_data.num_seeders -= 1; - }, - _ => {} + torrent_data.peers.insert(peer_map_key.clone(), peer) + }, + PeerStatus::Stopped => { + torrent_data.peers.remove(&peer_map_key) } + }; + + ::log::debug!("opt_removed_peer: {:?}", opt_removed_peer); + + match opt_removed_peer.map(|peer| peer.status){ + Some(PeerStatus::Leeching) => { + torrent_data.num_leechers -= 1; + }, + Some(PeerStatus::Seeding) => { + torrent_data.num_seeders -= 1; + }, + _ => {} } + ::log::debug!("peer request numwant: {:?}", request.numwant); + let max_num_peers_to_take = match request.numwant { Some(0) | None => config.protocol.max_peers, Some(numwant) => numwant.min(config.protocol.max_peers), @@ -254,6 +261,7 @@ fn upsert_peer_and_get_response_peers( rng, &torrent_data.peers, max_num_peers_to_take, + peer_map_key, Peer::to_response_peer ); diff --git a/aquatic_udp/src/lib/common.rs b/aquatic_udp/src/lib/common.rs index b23c6d6..d46d26f 100644 --- a/aquatic_udp/src/lib/common.rs +++ b/aquatic_udp/src/lib/common.rs @@ -90,7 +90,7 @@ impl Peer { } -#[derive(PartialEq, Eq, Hash, Clone)] +#[derive(PartialEq, Eq, Hash, Clone, Copy)] pub struct PeerMapKey { pub ip: I, pub peer_id: PeerId diff --git a/aquatic_udp/src/lib/handlers.rs b/aquatic_udp/src/lib/handlers.rs index 3821929..aae5cdd 100644 --- a/aquatic_udp/src/lib/handlers.rs +++ b/aquatic_udp/src/lib/handlers.rs @@ -291,6 +291,7 @@ fn handle_announce_request( rng, &torrent_data.peers, max_num_peers_to_take, + peer_key, Peer::to_response_peer ); @@ -419,9 +420,17 @@ mod tests { let mut peer_map: PeerMap = IndexMap::new(); + let mut opt_sender_key = None; + let mut opt_sender_peer = None; + for i in 0..gen_num_peers { let (key, value) = gen_peer_map_key_and_value(i); + if i == 0 { + opt_sender_key = Some(key); + opt_sender_peer = Some(value.to_response_peer()); + } + peer_map.insert(key, value); } @@ -431,6 +440,7 @@ mod tests { &mut rng, &peer_map, req_num_peers, + opt_sender_key.unwrap_or_else(|| gen_peer_map_key_and_value(1).0), Peer::to_response_peer ); @@ -439,15 +449,17 @@ mod tests { let mut success = peers.len() <= req_num_peers; if req_num_peers >= gen_num_peers as usize { - success &= peers.len() == gen_num_peers as usize; + success &= peers.len() == gen_num_peers as usize || + peers.len() + 1 == gen_num_peers as usize; } - // Check that returned peers are unique (no overlap) + // Check that returned peers are unique (no overlap) and that sender + // isn't returned let mut ip_addresses = HashSet::new(); for peer in peers { - if ip_addresses.contains(&peer.ip_address){ + if peer == opt_sender_peer.clone().unwrap() || ip_addresses.contains(&peer.ip_address){ success = false; break; diff --git a/aquatic_ws/src/lib/handler.rs b/aquatic_ws/src/lib/handler.rs index 8ec0e03..1e53fbf 100644 --- a/aquatic_ws/src/lib/handler.rs +++ b/aquatic_ws/src/lib/handler.rs @@ -178,23 +178,13 @@ pub fn handle_announce_requests( rng, &torrent_data.peers, max_num_peers_to_take, + request.peer_id, f ); for (offer, offer_receiver) in offers.into_iter() .zip(offer_receivers) { - // Avoid sending offer back to requesting peer. This could be - // done in extract_announce_peers, but it would likely hurt - // performance to check all peers there for their socket addr, - // especially if there are thousands of peers. It might be - // possible to write a new version of that function which isn't - // shared with aquatic_udp and goes about it differently - // though. - if request_sender_meta.naive_peer_addr == offer_receiver.connection_meta.naive_peer_addr { - continue; - } - let middleman_offer = MiddlemanOfferToPeer { action: AnnounceAction, info_hash: request.info_hash,