mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
udp: store torrents with few peers without an extra heap alloc
This commit is contained in:
parent
d8bd964a57
commit
e77c9f46e7
4 changed files with 269 additions and 138 deletions
|
|
@ -21,6 +21,8 @@
|
||||||
* Index peers by packet source IP and provided port, instead of by peer_id.
|
* Index peers by packet source IP and provided port, instead of by peer_id.
|
||||||
This prevents users from impersonating others and is likely also slightly
|
This prevents users from impersonating others and is likely also slightly
|
||||||
faster for IPv4 peers.
|
faster for IPv4 peers.
|
||||||
|
* Store torrents with up to two peers without an extra heap allocation for the
|
||||||
|
peers.
|
||||||
* Remove support for unbounded worker channels
|
* Remove support for unbounded worker channels
|
||||||
* Add backpressure in socket workers. They will postpone reading from the
|
* Add backpressure in socket workers. They will postpone reading from the
|
||||||
socket if sending a request to a swarm worker failed
|
socket if sending a request to a swarm worker failed
|
||||||
|
|
|
||||||
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -293,6 +293,7 @@ dependencies = [
|
||||||
"aquatic_common",
|
"aquatic_common",
|
||||||
"aquatic_toml_config",
|
"aquatic_toml_config",
|
||||||
"aquatic_udp_protocol",
|
"aquatic_udp_protocol",
|
||||||
|
"arrayvec",
|
||||||
"blake3",
|
"blake3",
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
"compact_str",
|
"compact_str",
|
||||||
|
|
|
||||||
|
|
@ -32,6 +32,7 @@ aquatic_toml_config.workspace = true
|
||||||
aquatic_udp_protocol.workspace = true
|
aquatic_udp_protocol.workspace = true
|
||||||
|
|
||||||
anyhow = "1"
|
anyhow = "1"
|
||||||
|
arrayvec = "0.7"
|
||||||
blake3 = "1"
|
blake3 = "1"
|
||||||
cfg-if = "1"
|
cfg-if = "1"
|
||||||
compact_str = "0.7"
|
compact_str = "0.7"
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ use aquatic_common::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use aquatic_udp_protocol::*;
|
use aquatic_udp_protocol::*;
|
||||||
|
use arrayvec::ArrayVec;
|
||||||
use crossbeam_channel::Sender;
|
use crossbeam_channel::Sender;
|
||||||
use hdrhistogram::Histogram;
|
use hdrhistogram::Histogram;
|
||||||
use rand::prelude::SmallRng;
|
use rand::prelude::SmallRng;
|
||||||
|
|
@ -18,6 +19,8 @@ use rand::Rng;
|
||||||
use crate::common::*;
|
use crate::common::*;
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
|
|
||||||
|
const SMALL_PEER_MAP_CAPACITY: usize = 2;
|
||||||
|
|
||||||
pub struct TorrentMaps {
|
pub struct TorrentMaps {
|
||||||
pub ipv4: TorrentMap<Ipv4AddrBytes>,
|
pub ipv4: TorrentMap<Ipv4AddrBytes>,
|
||||||
pub ipv6: TorrentMap<Ipv6AddrBytes>,
|
pub ipv6: TorrentMap<Ipv6AddrBytes>,
|
||||||
|
|
@ -84,7 +87,11 @@ impl<I: Ip> TorrentMap<I> {
|
||||||
.0
|
.0
|
||||||
.get(&info_hash)
|
.get(&info_hash)
|
||||||
.map(|torrent_data| torrent_data.scrape_statistics())
|
.map(|torrent_data| torrent_data.scrape_statistics())
|
||||||
.unwrap_or_else(|| create_torrent_scrape_statistics(0, 0));
|
.unwrap_or_else(|| TorrentScrapeStatistics {
|
||||||
|
seeders: NumberOfPeers::new(0),
|
||||||
|
leechers: NumberOfPeers::new(0),
|
||||||
|
completed: NumberOfDownloads::new(0),
|
||||||
|
});
|
||||||
|
|
||||||
(i, stats)
|
(i, stats)
|
||||||
});
|
});
|
||||||
|
|
@ -100,7 +107,7 @@ impl<I: Ip> TorrentMap<I> {
|
||||||
access_list_mode: AccessListMode,
|
access_list_mode: AccessListMode,
|
||||||
now: SecondsSinceServerStart,
|
now: SecondsSinceServerStart,
|
||||||
) -> (usize, Option<Histogram<u64>>) {
|
) -> (usize, Option<Histogram<u64>>) {
|
||||||
let mut num_peers = 0;
|
let mut total_num_peers = 0;
|
||||||
|
|
||||||
let mut opt_histogram: Option<Histogram<u64>> = if config.statistics.torrent_peer_histograms
|
let mut opt_histogram: Option<Histogram<u64>> = if config.statistics.torrent_peer_histograms
|
||||||
{
|
{
|
||||||
|
|
@ -124,17 +131,27 @@ impl<I: Ip> TorrentMap<I> {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
torrent.clean(config, statistics_sender, now);
|
let num_peers = match torrent {
|
||||||
|
TorrentData::Small(peer_map) => {
|
||||||
|
peer_map.clean_and_get_num_peers(config, statistics_sender, now)
|
||||||
|
}
|
||||||
|
TorrentData::Large(peer_map) => {
|
||||||
|
let num_peers =
|
||||||
|
peer_map.clean_and_get_num_peers(config, statistics_sender, now);
|
||||||
|
|
||||||
num_peers += torrent.peers.len();
|
if let Some(peer_map) = peer_map.try_shrink() {
|
||||||
|
*torrent = TorrentData::Small(peer_map);
|
||||||
|
}
|
||||||
|
|
||||||
|
num_peers
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
total_num_peers += num_peers;
|
||||||
|
|
||||||
match opt_histogram {
|
match opt_histogram {
|
||||||
Some(ref mut histogram) if torrent.peers.len() != 0 => {
|
Some(ref mut histogram) if num_peers > 0 => {
|
||||||
let n = torrent
|
let n = num_peers.try_into().expect("Couldn't fit usize into u64");
|
||||||
.peers
|
|
||||||
.len()
|
|
||||||
.try_into()
|
|
||||||
.expect("Couldn't fit usize into u64");
|
|
||||||
|
|
||||||
if let Err(err) = histogram.record(n) {
|
if let Err(err) = histogram.record(n) {
|
||||||
::log::error!("Couldn't record {} to histogram: {:#}", n, err);
|
::log::error!("Couldn't record {} to histogram: {:#}", n, err);
|
||||||
|
|
@ -143,12 +160,12 @@ impl<I: Ip> TorrentMap<I> {
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
|
|
||||||
!torrent.peers.is_empty()
|
num_peers > 0
|
||||||
});
|
});
|
||||||
|
|
||||||
self.0.shrink_to_fit();
|
self.0.shrink_to_fit();
|
||||||
|
|
||||||
(num_peers, opt_histogram)
|
(total_num_peers, opt_histogram)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn num_torrents(&self) -> usize {
|
pub fn num_torrents(&self) -> usize {
|
||||||
|
|
@ -156,9 +173,9 @@ impl<I: Ip> TorrentMap<I> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct TorrentData<I: Ip> {
|
pub enum TorrentData<I: Ip> {
|
||||||
peers: IndexMap<ResponsePeer<I>, Peer>,
|
Small(SmallPeerMap<I>),
|
||||||
num_seeders: usize,
|
Large(LargePeerMap<I>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<I: Ip> TorrentData<I> {
|
impl<I: Ip> TorrentData<I> {
|
||||||
|
|
@ -189,60 +206,75 @@ impl<I: Ip> TorrentData<I> {
|
||||||
port: request.port,
|
port: request.port,
|
||||||
};
|
};
|
||||||
|
|
||||||
let opt_removed_peer = self.peers.remove(&peer_map_key);
|
|
||||||
|
|
||||||
if let Some(Peer {
|
|
||||||
is_seeder: true, ..
|
|
||||||
}) = opt_removed_peer
|
|
||||||
{
|
|
||||||
self.num_seeders -= 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create the response before inserting the peer. This means that we
|
// Create the response before inserting the peer. This means that we
|
||||||
// don't have to filter it out from the response peers, and that the
|
// don't have to filter it out from the response peers, and that the
|
||||||
// reported number of seeders/leechers will not include it
|
// reported number of seeders/leechers will not include it
|
||||||
|
let opt_removed_peer = match self {
|
||||||
|
Self::Small(peer_map) => {
|
||||||
|
let opt_removed_peer = peer_map.remove(&peer_map_key);
|
||||||
|
|
||||||
|
let (seeders, leechers) = peer_map.num_seeders_leechers();
|
||||||
|
|
||||||
response.fixed = AnnounceResponseFixedData {
|
response.fixed = AnnounceResponseFixedData {
|
||||||
transaction_id: request.transaction_id,
|
transaction_id: request.transaction_id,
|
||||||
announce_interval: AnnounceInterval::new(config.protocol.peer_announce_interval),
|
announce_interval: AnnounceInterval::new(
|
||||||
leechers: NumberOfPeers::new(self.num_leechers().try_into().unwrap_or(i32::MAX)),
|
config.protocol.peer_announce_interval,
|
||||||
seeders: NumberOfPeers::new(self.num_seeders().try_into().unwrap_or(i32::MAX)),
|
),
|
||||||
|
leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)),
|
||||||
|
seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)),
|
||||||
};
|
};
|
||||||
|
|
||||||
extract_response_peers(
|
peer_map.extract_response_peers(max_num_peers_to_take, &mut response.peers);
|
||||||
rng,
|
|
||||||
&self.peers,
|
// Convert peer map to large variant if it is full and
|
||||||
max_num_peers_to_take,
|
// announcing peer is not stopped and will therefore be
|
||||||
|k, _| *k,
|
// inserted
|
||||||
&mut response.peers,
|
if peer_map.is_full() && status != PeerStatus::Stopped {
|
||||||
);
|
*self = Self::Large(peer_map.to_large());
|
||||||
|
}
|
||||||
|
|
||||||
|
opt_removed_peer
|
||||||
|
}
|
||||||
|
Self::Large(peer_map) => {
|
||||||
|
let opt_removed_peer = peer_map.remove_peer(&peer_map_key);
|
||||||
|
|
||||||
|
let (seeders, leechers) = peer_map.num_seeders_leechers();
|
||||||
|
|
||||||
|
response.fixed = AnnounceResponseFixedData {
|
||||||
|
transaction_id: request.transaction_id,
|
||||||
|
announce_interval: AnnounceInterval::new(
|
||||||
|
config.protocol.peer_announce_interval,
|
||||||
|
),
|
||||||
|
leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)),
|
||||||
|
seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)),
|
||||||
|
};
|
||||||
|
|
||||||
|
peer_map.extract_response_peers(rng, max_num_peers_to_take, &mut response.peers);
|
||||||
|
|
||||||
|
// Try shrinking the map if announcing peer is stopped and
|
||||||
|
// will therefore not be inserted
|
||||||
|
if status == PeerStatus::Stopped {
|
||||||
|
if let Some(peer_map) = peer_map.try_shrink() {
|
||||||
|
*self = Self::Small(peer_map);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
opt_removed_peer
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
match status {
|
match status {
|
||||||
PeerStatus::Leeching => {
|
PeerStatus::Leeching | PeerStatus::Seeding => {
|
||||||
let peer = Peer {
|
let peer = Peer {
|
||||||
peer_id: request.peer_id,
|
peer_id: request.peer_id,
|
||||||
is_seeder: false,
|
is_seeder: status == PeerStatus::Seeding,
|
||||||
valid_until,
|
valid_until,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.peers.insert(peer_map_key, peer);
|
match self {
|
||||||
|
Self::Small(peer_map) => peer_map.insert(peer_map_key, peer),
|
||||||
if config.statistics.peer_clients && opt_removed_peer.is_none() {
|
Self::Large(peer_map) => peer_map.insert(peer_map_key, peer),
|
||||||
statistics_sender
|
|
||||||
.try_send(StatisticsMessage::PeerAdded(request.peer_id))
|
|
||||||
.expect("statistics channel should be unbounded");
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
PeerStatus::Seeding => {
|
|
||||||
let peer = Peer {
|
|
||||||
peer_id: request.peer_id,
|
|
||||||
is_seeder: true,
|
|
||||||
valid_until,
|
|
||||||
};
|
|
||||||
|
|
||||||
self.peers.insert(peer_map_key, peer);
|
|
||||||
|
|
||||||
self.num_seeders += 1;
|
|
||||||
|
|
||||||
if config.statistics.peer_clients && opt_removed_peer.is_none() {
|
if config.statistics.peer_clients && opt_removed_peer.is_none() {
|
||||||
statistics_sender
|
statistics_sender
|
||||||
|
|
@ -260,28 +292,180 @@ impl<I: Ip> TorrentData<I> {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn num_leechers(&self) -> usize {
|
|
||||||
self.peers.len() - self.num_seeders
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn num_seeders(&self) -> usize {
|
|
||||||
self.num_seeders
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn scrape_statistics(&self) -> TorrentScrapeStatistics {
|
pub fn scrape_statistics(&self) -> TorrentScrapeStatistics {
|
||||||
create_torrent_scrape_statistics(
|
let (seeders, leechers) = match self {
|
||||||
self.num_seeders.try_into().unwrap_or(i32::MAX),
|
Self::Small(peer_map) => peer_map.num_seeders_leechers(),
|
||||||
self.num_leechers().try_into().unwrap_or(i32::MAX),
|
Self::Large(peer_map) => peer_map.num_seeders_leechers(),
|
||||||
)
|
};
|
||||||
|
|
||||||
|
TorrentScrapeStatistics {
|
||||||
|
seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)),
|
||||||
|
leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)),
|
||||||
|
completed: NumberOfDownloads::new(0),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove inactive peers and reclaim space
|
impl<I: Ip> Default for TorrentData<I> {
|
||||||
fn clean(
|
fn default() -> Self {
|
||||||
|
Self::Small(SmallPeerMap(ArrayVec::default()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Store torrents with up to two peers without an extra heap allocation
|
||||||
|
///
|
||||||
|
/// On public open trackers, this is likely to be the majority of torrents.
|
||||||
|
#[derive(Default, Debug)]
|
||||||
|
pub struct SmallPeerMap<I: Ip>(ArrayVec<(ResponsePeer<I>, Peer), SMALL_PEER_MAP_CAPACITY>);
|
||||||
|
|
||||||
|
impl<I: Ip> SmallPeerMap<I> {
|
||||||
|
fn is_full(&self) -> bool {
|
||||||
|
self.0.is_full()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn num_seeders_leechers(&self) -> (usize, usize) {
|
||||||
|
let seeders = self.0.iter().filter(|(_, p)| p.is_seeder).count();
|
||||||
|
let leechers = self.0.len() - seeders;
|
||||||
|
|
||||||
|
(seeders, leechers)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert(&mut self, key: ResponsePeer<I>, peer: Peer) {
|
||||||
|
self.0.push((key, peer));
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove(&mut self, key: &ResponsePeer<I>) -> Option<Peer> {
|
||||||
|
for (i, (k, _)) in self.0.iter().enumerate() {
|
||||||
|
if k == key {
|
||||||
|
return Some(self.0.remove(i).1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
fn extract_response_peers(
|
||||||
|
&self,
|
||||||
|
max_num_peers_to_take: usize,
|
||||||
|
peers: &mut Vec<ResponsePeer<I>>,
|
||||||
|
) {
|
||||||
|
peers.extend(self.0.iter().take(max_num_peers_to_take).map(|(k, _)| k))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clean_and_get_num_peers(
|
||||||
&mut self,
|
&mut self,
|
||||||
config: &Config,
|
config: &Config,
|
||||||
statistics_sender: &Sender<StatisticsMessage>,
|
statistics_sender: &Sender<StatisticsMessage>,
|
||||||
now: SecondsSinceServerStart,
|
now: SecondsSinceServerStart,
|
||||||
|
) -> usize {
|
||||||
|
self.0.retain(|(_, peer)| {
|
||||||
|
let keep = peer.valid_until.valid(now);
|
||||||
|
|
||||||
|
if !keep && config.statistics.peer_clients {
|
||||||
|
if let Err(_) =
|
||||||
|
statistics_sender.try_send(StatisticsMessage::PeerRemoved(peer.peer_id))
|
||||||
|
{
|
||||||
|
// Should never happen in practice
|
||||||
|
::log::error!("Couldn't send StatisticsMessage::PeerRemoved");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
keep
|
||||||
|
});
|
||||||
|
|
||||||
|
self.0.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn to_large(&self) -> LargePeerMap<I> {
|
||||||
|
let (num_seeders, _) = self.num_seeders_leechers();
|
||||||
|
let peers = self.0.iter().copied().collect();
|
||||||
|
|
||||||
|
LargePeerMap { peers, num_seeders }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct LargePeerMap<I: Ip> {
|
||||||
|
peers: IndexMap<ResponsePeer<I>, Peer>,
|
||||||
|
num_seeders: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<I: Ip> LargePeerMap<I> {
|
||||||
|
fn num_seeders_leechers(&self) -> (usize, usize) {
|
||||||
|
(self.num_seeders, self.peers.len() - self.num_seeders)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert(&mut self, key: ResponsePeer<I>, peer: Peer) {
|
||||||
|
if peer.is_seeder {
|
||||||
|
self.num_seeders += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.peers.insert(key, peer);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_peer(&mut self, key: &ResponsePeer<I>) -> Option<Peer> {
|
||||||
|
let opt_removed_peer = self.peers.remove(key);
|
||||||
|
|
||||||
|
if let Some(Peer {
|
||||||
|
is_seeder: true, ..
|
||||||
|
}) = opt_removed_peer
|
||||||
|
{
|
||||||
|
self.num_seeders -= 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
opt_removed_peer
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Extract response peers
|
||||||
|
///
|
||||||
|
/// If there are more peers in map than `max_num_peers_to_take`, do a random
|
||||||
|
/// selection of peers from first and second halves of map in order to avoid
|
||||||
|
/// returning too homogeneous peers.
|
||||||
|
///
|
||||||
|
/// Does NOT filter out announcing peer.
|
||||||
|
pub fn extract_response_peers(
|
||||||
|
&self,
|
||||||
|
rng: &mut impl Rng,
|
||||||
|
max_num_peers_to_take: usize,
|
||||||
|
peers: &mut Vec<ResponsePeer<I>>,
|
||||||
) {
|
) {
|
||||||
|
if self.peers.len() <= max_num_peers_to_take {
|
||||||
|
peers.extend(self.peers.keys());
|
||||||
|
} else {
|
||||||
|
let middle_index = self.peers.len() / 2;
|
||||||
|
let num_to_take_per_half = max_num_peers_to_take / 2;
|
||||||
|
|
||||||
|
let offset_half_one = {
|
||||||
|
let from = 0;
|
||||||
|
let to = usize::max(1, middle_index - num_to_take_per_half);
|
||||||
|
|
||||||
|
rng.gen_range(from..to)
|
||||||
|
};
|
||||||
|
let offset_half_two = {
|
||||||
|
let from = middle_index;
|
||||||
|
let to = usize::max(middle_index + 1, self.peers.len() - num_to_take_per_half);
|
||||||
|
|
||||||
|
rng.gen_range(from..to)
|
||||||
|
};
|
||||||
|
|
||||||
|
let end_half_one = offset_half_one + num_to_take_per_half;
|
||||||
|
let end_half_two = offset_half_two + num_to_take_per_half;
|
||||||
|
|
||||||
|
if let Some(slice) = self.peers.get_range(offset_half_one..end_half_one) {
|
||||||
|
peers.extend(slice.keys());
|
||||||
|
}
|
||||||
|
if let Some(slice) = self.peers.get_range(offset_half_two..end_half_two) {
|
||||||
|
peers.extend(slice.keys());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clean_and_get_num_peers(
|
||||||
|
&mut self,
|
||||||
|
config: &Config,
|
||||||
|
statistics_sender: &Sender<StatisticsMessage>,
|
||||||
|
now: SecondsSinceServerStart,
|
||||||
|
) -> usize {
|
||||||
self.peers.retain(|_, peer| {
|
self.peers.retain(|_, peer| {
|
||||||
let keep = peer.valid_until.valid(now);
|
let keep = peer.valid_until.valid(now);
|
||||||
|
|
||||||
|
|
@ -305,79 +489,22 @@ impl<I: Ip> TorrentData<I> {
|
||||||
if !self.peers.is_empty() {
|
if !self.peers.is_empty() {
|
||||||
self.peers.shrink_to_fit();
|
self.peers.shrink_to_fit();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
self.peers.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_shrink(&mut self) -> Option<SmallPeerMap<I>> {
|
||||||
|
(self.peers.len() <= SMALL_PEER_MAP_CAPACITY).then(|| {
|
||||||
|
SmallPeerMap(ArrayVec::from_iter(
|
||||||
|
self.peers.iter().map(|(k, v)| (*k, *v)),
|
||||||
|
))
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<I: Ip> Default for TorrentData<I> {
|
#[derive(Clone, Copy, Debug)]
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
peers: Default::default(),
|
|
||||||
num_seeders: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
struct Peer {
|
struct Peer {
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
is_seeder: bool,
|
is_seeder: bool,
|
||||||
valid_until: ValidUntil,
|
valid_until: ValidUntil,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Extract response peers
|
|
||||||
///
|
|
||||||
/// If there are more peers in map than `max_num_peers_to_take`, do a random
|
|
||||||
/// selection of peers from first and second halves of map in order to avoid
|
|
||||||
/// returning too homogeneous peers.
|
|
||||||
///
|
|
||||||
/// Does NOT filter out announcing peer.
|
|
||||||
#[inline]
|
|
||||||
pub fn extract_response_peers<K, V, R, F>(
|
|
||||||
rng: &mut impl Rng,
|
|
||||||
peer_map: &IndexMap<K, V>,
|
|
||||||
max_num_peers_to_take: usize,
|
|
||||||
peer_conversion_function: F,
|
|
||||||
peers: &mut Vec<R>,
|
|
||||||
) where
|
|
||||||
K: Eq + ::std::hash::Hash,
|
|
||||||
F: Fn(&K, &V) -> R,
|
|
||||||
{
|
|
||||||
if peer_map.len() <= max_num_peers_to_take {
|
|
||||||
peers.extend(peer_map.iter().map(|(k, v)| peer_conversion_function(k, v)));
|
|
||||||
} else {
|
|
||||||
let middle_index = peer_map.len() / 2;
|
|
||||||
let num_to_take_per_half = max_num_peers_to_take / 2;
|
|
||||||
|
|
||||||
let offset_half_one = {
|
|
||||||
let from = 0;
|
|
||||||
let to = usize::max(1, middle_index - num_to_take_per_half);
|
|
||||||
|
|
||||||
rng.gen_range(from..to)
|
|
||||||
};
|
|
||||||
let offset_half_two = {
|
|
||||||
let from = middle_index;
|
|
||||||
let to = usize::max(middle_index + 1, peer_map.len() - num_to_take_per_half);
|
|
||||||
|
|
||||||
rng.gen_range(from..to)
|
|
||||||
};
|
|
||||||
|
|
||||||
let end_half_one = offset_half_one + num_to_take_per_half;
|
|
||||||
let end_half_two = offset_half_two + num_to_take_per_half;
|
|
||||||
|
|
||||||
if let Some(slice) = peer_map.get_range(offset_half_one..end_half_one) {
|
|
||||||
peers.extend(slice.iter().map(|(k, v)| peer_conversion_function(k, v)));
|
|
||||||
}
|
|
||||||
if let Some(slice) = peer_map.get_range(offset_half_two..end_half_two) {
|
|
||||||
peers.extend(slice.iter().map(|(k, v)| peer_conversion_function(k, v)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline(always)]
|
|
||||||
fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics {
|
|
||||||
TorrentScrapeStatistics {
|
|
||||||
seeders: NumberOfPeers::new(seeders),
|
|
||||||
completed: NumberOfDownloads::new(0), // No implementation planned
|
|
||||||
leechers: NumberOfPeers::new(leechers),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue