aquatic_common: extract response peers: don't return sender

Seems to fix or at least help with some issues with
real clients being slow to initiate transfers
This commit is contained in:
Joakim Frostegård 2020-08-16 23:07:39 +02:00
parent b5452c2954
commit 6ee8ed4895
6 changed files with 89 additions and 70 deletions

View file

@ -26,13 +26,14 @@ impl ValidUntil {
/// If there are more peers in map than `max_num_peers_to_take`, do a /// If there are more peers in map than `max_num_peers_to_take`, do a
/// half-random selection of peers from first and second halves of map, /// half-random selection of peers from first and second halves of map,
/// in order to avoid returning too homogeneous peers. /// in order to avoid returning too homogeneous peers.
/// ///
/// Don't care if we send back announcing peer. /// Might return one less peer than wanted since sender is filtered out.
#[inline] #[inline]
pub fn extract_response_peers<K, V, R, F>( pub fn extract_response_peers<K, V, R, F>(
rng: &mut impl Rng, rng: &mut impl Rng,
peer_map: &IndexMap<K, V>, peer_map: &IndexMap<K, V>,
max_num_peers_to_take: usize, max_num_peers_to_take: usize,
sender_peer_map_key: K,
peer_conversion_function: F peer_conversion_function: F
) -> Vec<R> ) -> Vec<R>
where where
@ -41,9 +42,15 @@ pub fn extract_response_peers<K, V, R, F>(
{ {
let peer_map_len = peer_map.len(); let peer_map_len = peer_map.len();
if peer_map_len <= max_num_peers_to_take { if peer_map_len <= max_num_peers_to_take + 1 {
peer_map.values() peer_map.iter()
.map(peer_conversion_function) .filter_map(|(k, v)|{
if *k == sender_peer_map_key {
None
} else {
Some(peer_conversion_function(v))
}
})
.collect() .collect()
} else { } else {
let half_num_to_take = max_num_peers_to_take / 2; let half_num_to_take = max_num_peers_to_take / 2;
@ -64,17 +71,19 @@ pub fn extract_response_peers<K, V, R, F>(
let mut peers: Vec<R> = Vec::with_capacity(max_num_peers_to_take); let mut peers: Vec<R> = Vec::with_capacity(max_num_peers_to_take);
for i in offset_first_half..end_first_half { for i in offset_first_half..end_first_half {
if let Some((_, peer)) = peer_map.get_index(i){ if let Some((k, peer)) = peer_map.get_index(i){
peers.push(peer_conversion_function(peer)) if *k != sender_peer_map_key {
peers.push(peer_conversion_function(peer))
}
} }
} }
for i in offset_second_half..end_second_half { for i in offset_second_half..end_second_half {
if let Some((_, peer)) = peer_map.get_index(i){ if let Some((k, peer)) = peer_map.get_index(i){
peers.push(peer_conversion_function(peer)) if *k != sender_peer_map_key {
peers.push(peer_conversion_function(peer))
}
} }
} }
debug_assert_eq!(peers.len(), max_num_peers_to_take);
peers peers
} }

View file

@ -21,7 +21,7 @@ pub const LISTENER_TOKEN: Token = Token(0);
pub const CHANNEL_TOKEN: Token = Token(1); pub const CHANNEL_TOKEN: Token = Token(1);
pub trait Ip: Copy + Eq + ::std::hash::Hash {} pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {}
impl Ip for Ipv4Addr {} impl Ip for Ipv4Addr {}
impl Ip for Ipv6Addr {} impl Ip for Ipv6Addr {}
@ -73,7 +73,7 @@ impl PeerStatus {
} }
#[derive(Clone, Copy)] #[derive(Debug, Clone, Copy)]
pub struct Peer<I: Ip> { pub struct Peer<I: Ip> {
pub connection_meta: PeerConnectionMeta<I>, pub connection_meta: PeerConnectionMeta<I>,
pub port: u16, pub port: u16,

View file

@ -194,57 +194,64 @@ fn upsert_peer_and_get_response_peers<I: Ip>(
valid_until: ValidUntil, valid_until: ValidUntil,
) -> (usize, usize, Vec<ResponsePeer<I>>) { ) -> (usize, usize, Vec<ResponsePeer<I>>) {
// Insert/update/remove peer who sent this request // Insert/update/remove peer who sent this request
{
let peer_status = PeerStatus::from_event_and_bytes_left( let peer_status = PeerStatus::from_event_and_bytes_left(
request.event, request.event,
Some(request.bytes_left) Some(request.bytes_left)
);
let peer = Peer {
connection_meta: request_sender_meta,
port: request.port,
status: peer_status,
valid_until,
};
::log::debug!("peer: {:?}", peer);
let ip_or_key = request.key
.map(Either::Right)
.unwrap_or_else(||
Either::Left(request_sender_meta.peer_ip_address)
); );
let peer = Peer { let peer_map_key = PeerMapKey {
connection_meta: request_sender_meta, peer_id: request.peer_id,
port: request.port, ip_or_key,
status: peer_status, };
valid_until,
};
let ip_or_key = request.key ::log::debug!("peer map key: {:?}", peer_map_key);
.map(Either::Right)
.unwrap_or_else(||
Either::Left(request_sender_meta.peer_ip_address)
);
let peer_map_key = PeerMapKey { let opt_removed_peer = match peer_status {
peer_id: request.peer_id, PeerStatus::Leeching => {
ip_or_key, torrent_data.num_leechers += 1;
};
let opt_removed_peer = match peer_status { torrent_data.peers.insert(peer_map_key.clone(), peer)
PeerStatus::Leeching => { },
torrent_data.num_leechers += 1; PeerStatus::Seeding => {
torrent_data.num_seeders += 1;
torrent_data.peers.insert(peer_map_key, peer) torrent_data.peers.insert(peer_map_key.clone(), peer)
}, },
PeerStatus::Seeding => { PeerStatus::Stopped => {
torrent_data.num_seeders += 1; torrent_data.peers.remove(&peer_map_key)
torrent_data.peers.insert(peer_map_key, peer)
},
PeerStatus::Stopped => {
torrent_data.peers.remove(&peer_map_key)
}
};
match opt_removed_peer.map(|peer| peer.status){
Some(PeerStatus::Leeching) => {
torrent_data.num_leechers -= 1;
},
Some(PeerStatus::Seeding) => {
torrent_data.num_seeders -= 1;
},
_ => {}
} }
};
::log::debug!("opt_removed_peer: {:?}", opt_removed_peer);
match opt_removed_peer.map(|peer| peer.status){
Some(PeerStatus::Leeching) => {
torrent_data.num_leechers -= 1;
},
Some(PeerStatus::Seeding) => {
torrent_data.num_seeders -= 1;
},
_ => {}
} }
::log::debug!("peer request numwant: {:?}", request.numwant);
let max_num_peers_to_take = match request.numwant { let max_num_peers_to_take = match request.numwant {
Some(0) | None => config.protocol.max_peers, Some(0) | None => config.protocol.max_peers,
Some(numwant) => numwant.min(config.protocol.max_peers), Some(numwant) => numwant.min(config.protocol.max_peers),
@ -254,6 +261,7 @@ fn upsert_peer_and_get_response_peers<I: Ip>(
rng, rng,
&torrent_data.peers, &torrent_data.peers,
max_num_peers_to_take, max_num_peers_to_take,
peer_map_key,
Peer::to_response_peer Peer::to_response_peer
); );

View file

@ -90,7 +90,7 @@ impl <I: Ip>Peer<I> {
} }
#[derive(PartialEq, Eq, Hash, Clone)] #[derive(PartialEq, Eq, Hash, Clone, Copy)]
pub struct PeerMapKey<I: Ip> { pub struct PeerMapKey<I: Ip> {
pub ip: I, pub ip: I,
pub peer_id: PeerId pub peer_id: PeerId

View file

@ -291,6 +291,7 @@ fn handle_announce_request<I: Ip>(
rng, rng,
&torrent_data.peers, &torrent_data.peers,
max_num_peers_to_take, max_num_peers_to_take,
peer_key,
Peer::to_response_peer Peer::to_response_peer
); );
@ -419,9 +420,17 @@ mod tests {
let mut peer_map: PeerMap<Ipv4Addr> = IndexMap::new(); let mut peer_map: PeerMap<Ipv4Addr> = IndexMap::new();
let mut opt_sender_key = None;
let mut opt_sender_peer = None;
for i in 0..gen_num_peers { for i in 0..gen_num_peers {
let (key, value) = gen_peer_map_key_and_value(i); let (key, value) = gen_peer_map_key_and_value(i);
if i == 0 {
opt_sender_key = Some(key);
opt_sender_peer = Some(value.to_response_peer());
}
peer_map.insert(key, value); peer_map.insert(key, value);
} }
@ -431,6 +440,7 @@ mod tests {
&mut rng, &mut rng,
&peer_map, &peer_map,
req_num_peers, req_num_peers,
opt_sender_key.unwrap_or_else(|| gen_peer_map_key_and_value(1).0),
Peer::to_response_peer Peer::to_response_peer
); );
@ -439,15 +449,17 @@ mod tests {
let mut success = peers.len() <= req_num_peers; let mut success = peers.len() <= req_num_peers;
if req_num_peers >= gen_num_peers as usize { if req_num_peers >= gen_num_peers as usize {
success &= peers.len() == gen_num_peers as usize; success &= peers.len() == gen_num_peers as usize ||
peers.len() + 1 == gen_num_peers as usize;
} }
// Check that returned peers are unique (no overlap) // Check that returned peers are unique (no overlap) and that sender
// isn't returned
let mut ip_addresses = HashSet::new(); let mut ip_addresses = HashSet::new();
for peer in peers { for peer in peers {
if ip_addresses.contains(&peer.ip_address){ if peer == opt_sender_peer.clone().unwrap() || ip_addresses.contains(&peer.ip_address){
success = false; success = false;
break; break;

View file

@ -178,23 +178,13 @@ pub fn handle_announce_requests(
rng, rng,
&torrent_data.peers, &torrent_data.peers,
max_num_peers_to_take, max_num_peers_to_take,
request.peer_id,
f f
); );
for (offer, offer_receiver) in offers.into_iter() for (offer, offer_receiver) in offers.into_iter()
.zip(offer_receivers) .zip(offer_receivers)
{ {
// Avoid sending offer back to requesting peer. This could be
// done in extract_announce_peers, but it would likely hurt
// performance to check all peers there for their socket addr,
// especially if there are thousands of peers. It might be
// possible to write a new version of that function which isn't
// shared with aquatic_udp and goes about it differently
// though.
if request_sender_meta.naive_peer_addr == offer_receiver.connection_meta.naive_peer_addr {
continue;
}
let middleman_offer = MiddlemanOfferToPeer { let middleman_offer = MiddlemanOfferToPeer {
action: AnnounceAction, action: AnnounceAction,
info_hash: request.info_hash, info_hash: request.info_hash,