http: extract response peers while announcing peer removed

This improves performance by avoiding lots of comparisons
This commit is contained in:
Joakim Frostegård 2024-01-24 23:33:15 +01:00
parent d346cf97aa
commit 73eeb22f66
2 changed files with 94 additions and 64 deletions

View file

@ -7,7 +7,6 @@
interval to clean up data interval to clean up data
* http * http
* extract response peers while peer is removed, as in udp implementation
* consider storing small number of peers without extra heap allocation * consider storing small number of peers without extra heap allocation
* add CI transfer test for http without TLS * add CI transfer test for http without TLS

View file

@ -6,8 +6,7 @@ use rand::Rng;
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::{ use aquatic_common::{
extract_response_peers, CanonicalSocketAddr, IndexMap, SecondsSinceServerStart, CanonicalSocketAddr, IndexMap, SecondsSinceServerStart, ServerStartInstant, ValidUntil,
ServerStartInstant, ValidUntil,
}; };
use aquatic_http_protocol::common::*; use aquatic_http_protocol::common::*;
use aquatic_http_protocol::request::*; use aquatic_http_protocol::request::*;
@ -212,7 +211,7 @@ impl TorrentMaps {
pub type TorrentMap<I> = IndexMap<InfoHash, TorrentData<I>>; pub type TorrentMap<I> = IndexMap<InfoHash, TorrentData<I>>;
pub struct TorrentData<I: Ip> { pub struct TorrentData<I: Ip> {
peers: PeerMap<I>, peers: IndexMap<ResponsePeer<I>, Peer>,
num_seeders: usize, num_seeders: usize,
} }
@ -240,8 +239,6 @@ impl<I: Ip> TorrentData<I> {
request: AnnounceRequest, request: AnnounceRequest,
valid_until: ValidUntil, valid_until: ValidUntil,
) -> (usize, usize, Vec<ResponsePeer<I>>) { ) -> (usize, usize, Vec<ResponsePeer<I>>) {
// Insert/update/remove peer who sent this request
let peer_status = let peer_status =
PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left)); PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left));
@ -250,75 +247,109 @@ impl<I: Ip> TorrentData<I> {
port: request.port, port: request.port,
}; };
let opt_removed_peer = match peer_status { let opt_removed_peer = self.peers.remove(&peer_map_key);
PeerStatus::Leeching => {
let peer = Peer {
valid_until,
seeder: false,
};
self.peers.insert(peer_map_key, peer) if let Some(Peer { seeder: true, .. }) = opt_removed_peer.as_ref() {
}
PeerStatus::Seeding => {
self.num_seeders += 1;
let peer = Peer {
valid_until,
seeder: true,
};
self.peers.insert(peer_map_key, peer)
}
PeerStatus::Stopped => self.peers.remove(&peer_map_key),
};
if let Some(&Peer { seeder: true, .. }) = opt_removed_peer.as_ref() {
self.num_seeders -= 1; self.num_seeders -= 1;
} }
#[cfg(feature = "metrics")] let response_peers = match peer_status {
match peer_status { PeerStatus::Seeding | PeerStatus::Leeching => {
PeerStatus::Stopped if opt_removed_peer.is_some() => { #[cfg(feature = "metrics")]
::metrics::decrement_gauge!( if opt_removed_peer.is_none() {
"aquatic_peers", ::metrics::increment_gauge!(
1.0, "aquatic_peers",
"ip_version" => I::ip_version_str(), 1.0,
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), "ip_version" => I::ip_version_str(),
); "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
} );
PeerStatus::Leeching | PeerStatus::Seeding if opt_removed_peer.is_none() => { }
::metrics::increment_gauge!(
"aquatic_peers",
1.0,
"ip_version" => I::ip_version_str(),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
}
_ => {}
}
let response_peers = if let PeerStatus::Stopped = peer_status { let max_num_peers_to_take = match request.numwant {
Vec::new() Some(0) | None => config.protocol.max_peers,
} else { Some(numwant) => numwant.min(config.protocol.max_peers),
let max_num_peers_to_take = match request.numwant { };
Some(0) | None => config.protocol.max_peers,
Some(numwant) => numwant.min(config.protocol.max_peers),
};
extract_response_peers( let response_peers = self.extract_response_peers(rng, max_num_peers_to_take);
rng,
&self.peers, let peer = Peer {
max_num_peers_to_take, valid_until,
peer_map_key, seeder: peer_status == PeerStatus::Seeding,
|k, _| *k, };
)
self.peers.insert(peer_map_key, peer);
if peer_status == PeerStatus::Seeding {
self.num_seeders += 1;
}
response_peers
}
PeerStatus::Stopped => {
#[cfg(feature = "metrics")]
if opt_removed_peer.is_some() {
::metrics::decrement_gauge!(
"aquatic_peers",
1.0,
"ip_version" => I::ip_version_str(),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
}
Vec::new()
}
}; };
(self.num_seeders, self.num_leechers(), response_peers) (self.num_seeders, self.num_leechers(), response_peers)
} }
}
type PeerMap<I> = IndexMap<ResponsePeer<I>, Peer>; /// Extract response peers
///
/// If there are more peers in map than `max_num_peers_to_take`, do a random
/// selection of peers from first and second halves of map in order to avoid
/// returning too homogeneous peers.
///
/// Does NOT filter out announcing peer.
pub fn extract_response_peers(
&self,
rng: &mut impl Rng,
max_num_peers_to_take: usize,
) -> Vec<ResponsePeer<I>> {
if self.peers.len() <= max_num_peers_to_take {
self.peers.keys().copied().collect()
} else {
let middle_index = self.peers.len() / 2;
let num_to_take_per_half = max_num_peers_to_take / 2;
let offset_half_one = {
let from = 0;
let to = usize::max(1, middle_index - num_to_take_per_half);
rng.gen_range(from..to)
};
let offset_half_two = {
let from = middle_index;
let to = usize::max(middle_index + 1, self.peers.len() - num_to_take_per_half);
rng.gen_range(from..to)
};
let end_half_one = offset_half_one + num_to_take_per_half;
let end_half_two = offset_half_two + num_to_take_per_half;
let mut peers = Vec::with_capacity(max_num_peers_to_take);
if let Some(slice) = self.peers.get_range(offset_half_one..end_half_one) {
peers.extend(slice.keys());
}
if let Some(slice) = self.peers.get_range(offset_half_two..end_half_two) {
peers.extend(slice.keys());
}
peers
}
}
}
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
struct Peer { struct Peer {