mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 09:45:31 +00:00
http: avoid heap allocation for torrents with few peers; refactor
This commit is contained in:
parent
91f5289b2b
commit
352194e0bd
5 changed files with 322 additions and 192 deletions
|
|
@ -47,6 +47,8 @@
|
|||
|
||||
* Index peers by packet source IP and provided port instead of by source ip
|
||||
and peer id. This is likely slightly faster.
|
||||
* Avoid a heap allocation for torrents with four or less peers. This can save
|
||||
a lot of memory if many torrents are tracked
|
||||
* Improve announce performance by avoiding having to filter response peers
|
||||
* In announce response statistics, don't include announcing peer
|
||||
|
||||
|
|
|
|||
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -183,6 +183,7 @@ dependencies = [
|
|||
"aquatic_http_protocol",
|
||||
"aquatic_toml_config",
|
||||
"arc-swap",
|
||||
"arrayvec",
|
||||
"cfg-if",
|
||||
"either",
|
||||
"futures",
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ aquatic_http_protocol.workspace = true
|
|||
aquatic_toml_config.workspace = true
|
||||
|
||||
anyhow = "1"
|
||||
arrayvec = "0.7"
|
||||
arc-swap = "1"
|
||||
cfg-if = "1"
|
||||
either = "1"
|
||||
|
|
|
|||
|
|
@ -58,19 +58,8 @@ pub async fn run_swarm_worker(
|
|||
// Periodically update torrent count metrics
|
||||
#[cfg(feature = "metrics")]
|
||||
TimerActionRepeat::repeat(enclose!((config, torrents) move || {
|
||||
enclose!((config, torrents, worker_index) move || async move {
|
||||
let torrents = torrents.borrow_mut();
|
||||
|
||||
::metrics::gauge!(
|
||||
"aquatic_torrents",
|
||||
"ip_version" => "4",
|
||||
"worker_index" => worker_index.to_string(),
|
||||
).set(torrents.ipv4.len() as f64);
|
||||
::metrics::gauge!(
|
||||
"aquatic_torrents",
|
||||
"ip_version" => "6",
|
||||
"worker_index" => worker_index.to_string(),
|
||||
).set(torrents.ipv6.len() as f64);
|
||||
enclose!((config, torrents) move || async move {
|
||||
torrents.borrow_mut().update_torrent_metrics();
|
||||
|
||||
Some(Duration::from_secs(config.metrics.torrent_count_update_interval))
|
||||
})()
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ use std::collections::BTreeMap;
|
|||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrayvec::ArrayVec;
|
||||
use rand::Rng;
|
||||
|
||||
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
|
||||
|
|
@ -15,52 +16,23 @@ use aquatic_http_protocol::response::*;
|
|||
|
||||
use crate::config::Config;
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
const SMALL_PEER_MAP_CAPACITY: usize = 4;
|
||||
|
||||
pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {
|
||||
#[cfg(feature = "metrics")]
|
||||
fn ip_version_str() -> &'static str;
|
||||
}
|
||||
pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {}
|
||||
|
||||
impl Ip for Ipv4Addr {
|
||||
#[cfg(feature = "metrics")]
|
||||
fn ip_version_str() -> &'static str {
|
||||
"4"
|
||||
}
|
||||
}
|
||||
impl Ip for Ipv6Addr {
|
||||
#[cfg(feature = "metrics")]
|
||||
fn ip_version_str() -> &'static str {
|
||||
"6"
|
||||
}
|
||||
}
|
||||
impl Ip for Ipv4Addr {}
|
||||
impl Ip for Ipv6Addr {}
|
||||
|
||||
pub struct TorrentMaps {
|
||||
pub ipv4: TorrentMap<Ipv4Addr>,
|
||||
pub ipv6: TorrentMap<Ipv6Addr>,
|
||||
#[cfg(feature = "metrics")]
|
||||
pub ipv4_peer_gauge: metrics::Gauge,
|
||||
#[cfg(feature = "metrics")]
|
||||
pub ipv6_peer_gauge: metrics::Gauge,
|
||||
}
|
||||
|
||||
impl TorrentMaps {
|
||||
pub fn new(worker_index: usize) -> Self {
|
||||
Self {
|
||||
ipv4: Default::default(),
|
||||
ipv6: Default::default(),
|
||||
#[cfg(feature = "metrics")]
|
||||
ipv4_peer_gauge: ::metrics::gauge!(
|
||||
"aquatic_peers",
|
||||
"ip_version" => "4",
|
||||
"worker_index" => worker_index.to_string(),
|
||||
),
|
||||
#[cfg(feature = "metrics")]
|
||||
ipv6_peer_gauge: ::metrics::gauge!(
|
||||
"aquatic_peers",
|
||||
"ip_version" => "6",
|
||||
"worker_index" => worker_index.to_string(),
|
||||
),
|
||||
ipv4: TorrentMap::new(worker_index, true),
|
||||
ipv6: TorrentMap::new(worker_index, false),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -74,18 +46,13 @@ impl TorrentMaps {
|
|||
) -> AnnounceResponse {
|
||||
match peer_addr.get().ip() {
|
||||
IpAddr::V4(peer_ip_address) => {
|
||||
let (seeders, leechers, response_peers) = self
|
||||
.ipv4
|
||||
.entry(request.info_hash)
|
||||
.or_default()
|
||||
.upsert_peer_and_get_response_peers(
|
||||
let (seeders, leechers, response_peers) =
|
||||
self.ipv4.upsert_peer_and_get_response_peers(
|
||||
config,
|
||||
rng,
|
||||
valid_until,
|
||||
peer_ip_address,
|
||||
request,
|
||||
valid_until,
|
||||
#[cfg(feature = "metrics")]
|
||||
&self.ipv4_peer_gauge,
|
||||
);
|
||||
|
||||
AnnounceResponse {
|
||||
|
|
@ -98,18 +65,13 @@ impl TorrentMaps {
|
|||
}
|
||||
}
|
||||
IpAddr::V6(peer_ip_address) => {
|
||||
let (seeders, leechers, response_peers) = self
|
||||
.ipv6
|
||||
.entry(request.info_hash)
|
||||
.or_default()
|
||||
.upsert_peer_and_get_response_peers(
|
||||
let (seeders, leechers, response_peers) =
|
||||
self.ipv6.upsert_peer_and_get_response_peers(
|
||||
config,
|
||||
rng,
|
||||
valid_until,
|
||||
peer_ip_address,
|
||||
request,
|
||||
valid_until,
|
||||
#[cfg(feature = "metrics")]
|
||||
&self.ipv6_peer_gauge,
|
||||
);
|
||||
|
||||
AnnounceResponse {
|
||||
|
|
@ -130,46 +92,16 @@ impl TorrentMaps {
|
|||
peer_addr: CanonicalSocketAddr,
|
||||
request: ScrapeRequest,
|
||||
) -> ScrapeResponse {
|
||||
let num_to_take = request
|
||||
.info_hashes
|
||||
.len()
|
||||
.min(config.protocol.max_scrape_torrents);
|
||||
|
||||
let mut response = ScrapeResponse {
|
||||
files: BTreeMap::new(),
|
||||
};
|
||||
|
||||
let peer_ip = peer_addr.get().ip();
|
||||
|
||||
// If request.info_hashes is empty, don't return scrape for all
|
||||
// torrents, even though reference server does it. It is too expensive.
|
||||
if peer_ip.is_ipv4() {
|
||||
for info_hash in request.info_hashes.into_iter().take(num_to_take) {
|
||||
if let Some(torrent_data) = self.ipv4.get(&info_hash) {
|
||||
let stats = ScrapeStatistics {
|
||||
complete: torrent_data.num_seeders,
|
||||
downloaded: 0, // No implementation planned
|
||||
incomplete: torrent_data.num_leechers(),
|
||||
};
|
||||
|
||||
response.files.insert(info_hash, stats);
|
||||
}
|
||||
}
|
||||
if peer_addr.get().ip().is_ipv4() {
|
||||
self.ipv4.handle_scrape_request(config, request)
|
||||
} else {
|
||||
for info_hash in request.info_hashes.into_iter().take(num_to_take) {
|
||||
if let Some(torrent_data) = self.ipv6.get(&info_hash) {
|
||||
let stats = ScrapeStatistics {
|
||||
complete: torrent_data.num_seeders,
|
||||
downloaded: 0, // No implementation planned
|
||||
incomplete: torrent_data.num_leechers(),
|
||||
};
|
||||
self.ipv6.handle_scrape_request(config, request)
|
||||
}
|
||||
}
|
||||
|
||||
response.files.insert(info_hash, stats);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
response
|
||||
pub fn update_torrent_metrics(&self) {
|
||||
self.ipv4.torrent_gauge.set(self.ipv4.torrents.len() as f64);
|
||||
self.ipv6.torrent_gauge.set(self.ipv6.torrents.len() as f64);
|
||||
}
|
||||
|
||||
pub fn clean(
|
||||
|
|
@ -182,32 +114,117 @@ impl TorrentMaps {
|
|||
|
||||
let now = server_start_instant.seconds_elapsed();
|
||||
|
||||
Self::clean_torrent_map(
|
||||
config,
|
||||
&mut access_list_cache,
|
||||
&mut self.ipv4,
|
||||
now,
|
||||
&self.ipv4_peer_gauge,
|
||||
);
|
||||
Self::clean_torrent_map(
|
||||
config,
|
||||
&mut access_list_cache,
|
||||
&mut self.ipv6,
|
||||
now,
|
||||
&self.ipv6_peer_gauge,
|
||||
);
|
||||
self.ipv4.clean(config, &mut access_list_cache, now);
|
||||
self.ipv6.clean(config, &mut access_list_cache, now);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TorrentMap<I: Ip> {
|
||||
torrents: IndexMap<InfoHash, TorrentData<I>>,
|
||||
#[cfg(feature = "metrics")]
|
||||
peer_gauge: ::metrics::Gauge,
|
||||
#[cfg(feature = "metrics")]
|
||||
torrent_gauge: ::metrics::Gauge,
|
||||
}
|
||||
|
||||
impl<I: Ip> TorrentMap<I> {
|
||||
fn new(worker_index: usize, ipv4: bool) -> Self {
|
||||
#[cfg(feature = "metrics")]
|
||||
let peer_gauge = if ipv4 {
|
||||
::metrics::gauge!(
|
||||
"aquatic_peers",
|
||||
"ip_version" => "4",
|
||||
"worker_index" => worker_index.to_string(),
|
||||
)
|
||||
} else {
|
||||
::metrics::gauge!(
|
||||
"aquatic_peers",
|
||||
"ip_version" => "6",
|
||||
"worker_index" => worker_index.to_string(),
|
||||
)
|
||||
};
|
||||
#[cfg(feature = "metrics")]
|
||||
let torrent_gauge = if ipv4 {
|
||||
::metrics::gauge!(
|
||||
"aquatic_torrents",
|
||||
"ip_version" => "4",
|
||||
"worker_index" => worker_index.to_string(),
|
||||
)
|
||||
} else {
|
||||
::metrics::gauge!(
|
||||
"aquatic_torrents",
|
||||
"ip_version" => "6",
|
||||
"worker_index" => worker_index.to_string(),
|
||||
)
|
||||
};
|
||||
|
||||
Self {
|
||||
torrents: Default::default(),
|
||||
#[cfg(feature = "metrics")]
|
||||
peer_gauge,
|
||||
#[cfg(feature = "metrics")]
|
||||
torrent_gauge,
|
||||
}
|
||||
}
|
||||
|
||||
fn clean_torrent_map<I: Ip>(
|
||||
fn upsert_peer_and_get_response_peers(
|
||||
&mut self,
|
||||
config: &Config,
|
||||
rng: &mut impl Rng,
|
||||
valid_until: ValidUntil,
|
||||
peer_ip_address: I,
|
||||
request: AnnounceRequest,
|
||||
) -> (usize, usize, Vec<ResponsePeer<I>>) {
|
||||
self.torrents
|
||||
.entry(request.info_hash)
|
||||
.or_default()
|
||||
.upsert_peer_and_get_response_peers(
|
||||
config,
|
||||
rng,
|
||||
request,
|
||||
peer_ip_address,
|
||||
valid_until,
|
||||
#[cfg(feature = "metrics")]
|
||||
&self.peer_gauge,
|
||||
)
|
||||
}
|
||||
|
||||
fn handle_scrape_request(&mut self, config: &Config, request: ScrapeRequest) -> ScrapeResponse {
|
||||
let num_to_take = request
|
||||
.info_hashes
|
||||
.len()
|
||||
.min(config.protocol.max_scrape_torrents);
|
||||
|
||||
let mut response = ScrapeResponse {
|
||||
files: BTreeMap::new(),
|
||||
};
|
||||
|
||||
for info_hash in request.info_hashes.into_iter().take(num_to_take) {
|
||||
let stats = self
|
||||
.torrents
|
||||
.get(&info_hash)
|
||||
.map(|torrent_data| torrent_data.scrape_statistics())
|
||||
.unwrap_or(ScrapeStatistics {
|
||||
complete: 0,
|
||||
incomplete: 0,
|
||||
downloaded: 0,
|
||||
});
|
||||
|
||||
response.files.insert(info_hash, stats);
|
||||
}
|
||||
|
||||
response
|
||||
}
|
||||
|
||||
fn clean(
|
||||
&mut self,
|
||||
config: &Config,
|
||||
access_list_cache: &mut AccessListCache,
|
||||
torrent_map: &mut TorrentMap<I>,
|
||||
now: SecondsSinceServerStart,
|
||||
#[cfg(feature = "metrics")] peer_gauge: &::metrics::Gauge,
|
||||
) {
|
||||
let mut total_num_peers = 0;
|
||||
|
||||
torrent_map.retain(|info_hash, torrent_data| {
|
||||
self.torrents.retain(|info_hash, torrent_data| {
|
||||
if !access_list_cache
|
||||
.load()
|
||||
.allows(config.access_list.mode, &info_hash.0)
|
||||
|
|
@ -215,116 +232,216 @@ impl TorrentMaps {
|
|||
return false;
|
||||
}
|
||||
|
||||
let num_seeders = &mut torrent_data.num_seeders;
|
||||
let num_peers = match torrent_data {
|
||||
TorrentData::Small(t) => t.clean_and_get_num_peers(now),
|
||||
TorrentData::Large(t) => t.clean_and_get_num_peers(now),
|
||||
};
|
||||
|
||||
torrent_data.peers.retain(|_, peer| {
|
||||
let keep = peer.valid_until.valid(now);
|
||||
total_num_peers += num_peers as u64;
|
||||
|
||||
if (!keep) & peer.seeder {
|
||||
*num_seeders -= 1;
|
||||
}
|
||||
|
||||
keep
|
||||
});
|
||||
|
||||
total_num_peers += torrent_data.peers.len() as u64;
|
||||
|
||||
!torrent_data.peers.is_empty()
|
||||
num_peers > 0
|
||||
});
|
||||
|
||||
let total_num_peers = total_num_peers as f64;
|
||||
self.torrents.shrink_to_fit();
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
peer_gauge.set(total_num_peers);
|
||||
|
||||
torrent_map.shrink_to_fit();
|
||||
self.peer_gauge.set(total_num_peers as f64);
|
||||
}
|
||||
}
|
||||
|
||||
pub type TorrentMap<I> = IndexMap<InfoHash, TorrentData<I>>;
|
||||
|
||||
pub struct TorrentData<I: Ip> {
|
||||
peers: IndexMap<ResponsePeer<I>, Peer>,
|
||||
num_seeders: usize,
|
||||
}
|
||||
|
||||
impl<I: Ip> Default for TorrentData<I> {
|
||||
#[inline]
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
peers: Default::default(),
|
||||
num_seeders: 0,
|
||||
}
|
||||
}
|
||||
pub enum TorrentData<I: Ip> {
|
||||
Small(SmallPeerMap<I>),
|
||||
Large(LargePeerMap<I>),
|
||||
}
|
||||
|
||||
impl<I: Ip> TorrentData<I> {
|
||||
fn num_leechers(&self) -> usize {
|
||||
self.peers.len() - self.num_seeders
|
||||
}
|
||||
|
||||
/// Insert/update peer. Return num_seeders, num_leechers and response peers
|
||||
fn upsert_peer_and_get_response_peers(
|
||||
&mut self,
|
||||
config: &Config,
|
||||
rng: &mut impl Rng,
|
||||
peer_ip_address: I,
|
||||
request: AnnounceRequest,
|
||||
ip_address: I,
|
||||
valid_until: ValidUntil,
|
||||
#[cfg(feature = "metrics")] peer_gauge: &::metrics::Gauge,
|
||||
) -> (usize, usize, Vec<ResponsePeer<I>>) {
|
||||
let peer_status =
|
||||
PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left));
|
||||
let max_num_peers_to_take = match request.numwant {
|
||||
Some(0) | None => config.protocol.max_peers,
|
||||
Some(numwant) => numwant.min(config.protocol.max_peers),
|
||||
};
|
||||
|
||||
let status = PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left);
|
||||
|
||||
let peer_map_key = ResponsePeer {
|
||||
ip_address: peer_ip_address,
|
||||
ip_address,
|
||||
port: request.port,
|
||||
};
|
||||
|
||||
let opt_removed_peer = self.peers.remove(&peer_map_key);
|
||||
// Create the response before inserting the peer. This means that we
|
||||
// don't have to filter it out from the response peers, and that the
|
||||
// reported number of seeders/leechers will not include it
|
||||
let (response_data, opt_removed_peer) = match self {
|
||||
Self::Small(peer_map) => {
|
||||
let opt_removed_peer = peer_map.remove(&peer_map_key);
|
||||
|
||||
if let Some(Peer { seeder: true, .. }) = opt_removed_peer.as_ref() {
|
||||
self.num_seeders -= 1;
|
||||
}
|
||||
let (seeders, leechers) = peer_map.num_seeders_leechers();
|
||||
let response_peers = peer_map.extract_response_peers(max_num_peers_to_take);
|
||||
|
||||
let response_peers = match peer_status {
|
||||
PeerStatus::Seeding | PeerStatus::Leeching => {
|
||||
// Convert peer map to large variant if it is full and
|
||||
// announcing peer is not stopped and will therefore be
|
||||
// inserted
|
||||
if peer_map.is_full() && status != PeerStatus::Stopped {
|
||||
*self = Self::Large(peer_map.to_large());
|
||||
}
|
||||
|
||||
((seeders, leechers, response_peers), opt_removed_peer)
|
||||
}
|
||||
Self::Large(peer_map) => {
|
||||
let opt_removed_peer = peer_map.remove_peer(&peer_map_key);
|
||||
|
||||
let (seeders, leechers) = peer_map.num_seeders_leechers();
|
||||
let response_peers = peer_map.extract_response_peers(rng, max_num_peers_to_take);
|
||||
|
||||
// Try shrinking the map if announcing peer is stopped and
|
||||
// will therefore not be inserted
|
||||
if status == PeerStatus::Stopped {
|
||||
if let Some(peer_map) = peer_map.try_shrink() {
|
||||
*self = Self::Small(peer_map);
|
||||
}
|
||||
}
|
||||
|
||||
((seeders, leechers, response_peers), opt_removed_peer)
|
||||
}
|
||||
};
|
||||
|
||||
match status {
|
||||
PeerStatus::Leeching | PeerStatus::Seeding => {
|
||||
#[cfg(feature = "metrics")]
|
||||
if opt_removed_peer.is_none() {
|
||||
peer_gauge.increment(1.0);
|
||||
}
|
||||
|
||||
let max_num_peers_to_take = match request.numwant {
|
||||
Some(0) | None => config.protocol.max_peers,
|
||||
Some(numwant) => numwant.min(config.protocol.max_peers),
|
||||
};
|
||||
|
||||
let response_peers = self.extract_response_peers(rng, max_num_peers_to_take);
|
||||
|
||||
let peer = Peer {
|
||||
is_seeder: status == PeerStatus::Seeding,
|
||||
valid_until,
|
||||
seeder: peer_status == PeerStatus::Seeding,
|
||||
};
|
||||
|
||||
self.peers.insert(peer_map_key, peer);
|
||||
|
||||
if peer_status == PeerStatus::Seeding {
|
||||
self.num_seeders += 1;
|
||||
match self {
|
||||
Self::Small(peer_map) => peer_map.insert(peer_map_key, peer),
|
||||
Self::Large(peer_map) => peer_map.insert(peer_map_key, peer),
|
||||
}
|
||||
|
||||
response_peers
|
||||
}
|
||||
PeerStatus::Stopped => {
|
||||
PeerStatus::Stopped =>
|
||||
{
|
||||
#[cfg(feature = "metrics")]
|
||||
if opt_removed_peer.is_some() {
|
||||
peer_gauge.decrement(1.0);
|
||||
}
|
||||
|
||||
Vec::new()
|
||||
}
|
||||
};
|
||||
|
||||
(self.num_seeders, self.num_leechers(), response_peers)
|
||||
response_data
|
||||
}
|
||||
|
||||
fn scrape_statistics(&self) -> ScrapeStatistics {
|
||||
let (seeders, leechers) = match self {
|
||||
Self::Small(peer_map) => peer_map.num_seeders_leechers(),
|
||||
Self::Large(peer_map) => peer_map.num_seeders_leechers(),
|
||||
};
|
||||
|
||||
ScrapeStatistics {
|
||||
complete: seeders,
|
||||
incomplete: leechers,
|
||||
downloaded: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<I: Ip> Default for TorrentData<I> {
|
||||
fn default() -> Self {
|
||||
Self::Small(SmallPeerMap(ArrayVec::default()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Store torrents with very few peers without an extra heap allocation
|
||||
///
|
||||
/// On public open trackers, this is likely to be the majority of torrents.
|
||||
#[derive(Default, Debug)]
|
||||
pub struct SmallPeerMap<I: Ip>(ArrayVec<(ResponsePeer<I>, Peer), SMALL_PEER_MAP_CAPACITY>);
|
||||
|
||||
impl<I: Ip> SmallPeerMap<I> {
|
||||
fn is_full(&self) -> bool {
|
||||
self.0.is_full()
|
||||
}
|
||||
|
||||
fn num_seeders_leechers(&self) -> (usize, usize) {
|
||||
let seeders = self.0.iter().filter(|(_, p)| p.is_seeder).count();
|
||||
let leechers = self.0.len() - seeders;
|
||||
|
||||
(seeders, leechers)
|
||||
}
|
||||
|
||||
fn insert(&mut self, key: ResponsePeer<I>, peer: Peer) {
|
||||
self.0.push((key, peer));
|
||||
}
|
||||
|
||||
fn remove(&mut self, key: &ResponsePeer<I>) -> Option<Peer> {
|
||||
for (i, (k, _)) in self.0.iter().enumerate() {
|
||||
if k == key {
|
||||
return Some(self.0.remove(i).1);
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
fn extract_response_peers(&self, max_num_peers_to_take: usize) -> Vec<ResponsePeer<I>> {
|
||||
Vec::from_iter(self.0.iter().take(max_num_peers_to_take).map(|(k, _)| *k))
|
||||
}
|
||||
|
||||
fn clean_and_get_num_peers(&mut self, now: SecondsSinceServerStart) -> usize {
|
||||
self.0.retain(|(_, peer)| peer.valid_until.valid(now));
|
||||
|
||||
self.0.len()
|
||||
}
|
||||
|
||||
fn to_large(&self) -> LargePeerMap<I> {
|
||||
let (num_seeders, _) = self.num_seeders_leechers();
|
||||
let peers = self.0.iter().copied().collect();
|
||||
|
||||
LargePeerMap { peers, num_seeders }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct LargePeerMap<I: Ip> {
|
||||
peers: IndexMap<ResponsePeer<I>, Peer>,
|
||||
num_seeders: usize,
|
||||
}
|
||||
|
||||
impl<I: Ip> LargePeerMap<I> {
|
||||
fn num_seeders_leechers(&self) -> (usize, usize) {
|
||||
(self.num_seeders, self.peers.len() - self.num_seeders)
|
||||
}
|
||||
|
||||
fn insert(&mut self, key: ResponsePeer<I>, peer: Peer) {
|
||||
if peer.is_seeder {
|
||||
self.num_seeders += 1;
|
||||
}
|
||||
|
||||
self.peers.insert(key, peer);
|
||||
}
|
||||
|
||||
fn remove_peer(&mut self, key: &ResponsePeer<I>) -> Option<Peer> {
|
||||
let opt_removed_peer = self.peers.remove(key);
|
||||
|
||||
if let Some(Peer {
|
||||
is_seeder: true, ..
|
||||
}) = opt_removed_peer
|
||||
{
|
||||
self.num_seeders -= 1;
|
||||
}
|
||||
|
||||
opt_removed_peer
|
||||
}
|
||||
|
||||
/// Extract response peers
|
||||
|
|
@ -373,12 +490,36 @@ impl<I: Ip> TorrentData<I> {
|
|||
peers
|
||||
}
|
||||
}
|
||||
|
||||
fn clean_and_get_num_peers(&mut self, now: SecondsSinceServerStart) -> usize {
|
||||
self.peers.retain(|_, peer| {
|
||||
let keep = peer.valid_until.valid(now);
|
||||
|
||||
if (!keep) & peer.is_seeder {
|
||||
self.num_seeders -= 1;
|
||||
}
|
||||
|
||||
keep
|
||||
});
|
||||
|
||||
self.peers.shrink_to_fit();
|
||||
|
||||
self.peers.len()
|
||||
}
|
||||
|
||||
fn try_shrink(&mut self) -> Option<SmallPeerMap<I>> {
|
||||
(self.peers.len() <= SMALL_PEER_MAP_CAPACITY).then(|| {
|
||||
SmallPeerMap(ArrayVec::from_iter(
|
||||
self.peers.iter().map(|(k, v)| (*k, *v)),
|
||||
))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
struct Peer {
|
||||
pub valid_until: ValidUntil,
|
||||
pub seeder: bool,
|
||||
pub is_seeder: bool,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
|
||||
|
|
@ -389,14 +530,10 @@ enum PeerStatus {
|
|||
}
|
||||
|
||||
impl PeerStatus {
|
||||
/// Determine peer status from announce event and number of bytes left.
|
||||
///
|
||||
/// Likely, the last branch will be taken most of the time.
|
||||
#[inline]
|
||||
fn from_event_and_bytes_left(event: AnnounceEvent, opt_bytes_left: Option<usize>) -> Self {
|
||||
fn from_event_and_bytes_left(event: AnnounceEvent, bytes_left: usize) -> Self {
|
||||
if let AnnounceEvent::Stopped = event {
|
||||
Self::Stopped
|
||||
} else if let Some(0) = opt_bytes_left {
|
||||
} else if bytes_left == 0 {
|
||||
Self::Seeding
|
||||
} else {
|
||||
Self::Leeching
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue