mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
udp: remove swarm worker and related logic
This commit is contained in:
parent
a2e1dd4eef
commit
7fa143964e
7 changed files with 4 additions and 1082 deletions
|
|
@ -1,13 +1,9 @@
|
||||||
use std::collections::BTreeMap;
|
|
||||||
use std::hash::Hash;
|
|
||||||
use std::iter::repeat_with;
|
use std::iter::repeat_with;
|
||||||
use std::sync::atomic::AtomicUsize;
|
use std::sync::atomic::AtomicUsize;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use crossbeam_channel::{Receiver, SendError, Sender, TrySendError};
|
|
||||||
|
|
||||||
use aquatic_common::access_list::AccessListArcSwap;
|
use aquatic_common::access_list::AccessListArcSwap;
|
||||||
use aquatic_common::{CanonicalSocketAddr, ServerStartInstant};
|
use aquatic_common::ServerStartInstant;
|
||||||
use aquatic_udp_protocol::*;
|
use aquatic_udp_protocol::*;
|
||||||
use crossbeam_utils::CachePadded;
|
use crossbeam_utils::CachePadded;
|
||||||
use hdrhistogram::Histogram;
|
use hdrhistogram::Histogram;
|
||||||
|
|
@ -33,141 +29,6 @@ impl IpVersion {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug)]
|
|
||||||
pub struct SocketWorkerIndex(pub usize);
|
|
||||||
|
|
||||||
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
|
|
||||||
pub struct SwarmWorkerIndex(pub usize);
|
|
||||||
|
|
||||||
impl SwarmWorkerIndex {
|
|
||||||
pub fn from_info_hash(config: &Config, info_hash: InfoHash) -> Self {
|
|
||||||
Self(info_hash.0[0] as usize % config.swarm_workers)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct PendingScrapeRequest {
|
|
||||||
pub slab_key: usize,
|
|
||||||
pub info_hashes: BTreeMap<usize, InfoHash>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct PendingScrapeResponse {
|
|
||||||
pub slab_key: usize,
|
|
||||||
pub torrent_stats: BTreeMap<usize, TorrentScrapeStatistics>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum ConnectedRequest {
|
|
||||||
Announce(AnnounceRequest),
|
|
||||||
Scrape(PendingScrapeRequest),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum ConnectedResponse {
|
|
||||||
AnnounceIpv4(AnnounceResponse<Ipv4AddrBytes>),
|
|
||||||
AnnounceIpv6(AnnounceResponse<Ipv6AddrBytes>),
|
|
||||||
Scrape(PendingScrapeResponse),
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct ConnectedRequestSender {
|
|
||||||
index: SocketWorkerIndex,
|
|
||||||
senders: Vec<Sender<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ConnectedRequestSender {
|
|
||||||
pub fn new(
|
|
||||||
index: SocketWorkerIndex,
|
|
||||||
senders: Vec<Sender<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>>,
|
|
||||||
) -> Self {
|
|
||||||
Self { index, senders }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn try_send_to(
|
|
||||||
&self,
|
|
||||||
index: SwarmWorkerIndex,
|
|
||||||
request: ConnectedRequest,
|
|
||||||
addr: CanonicalSocketAddr,
|
|
||||||
) -> Result<(), (SwarmWorkerIndex, ConnectedRequest, CanonicalSocketAddr)> {
|
|
||||||
match self.senders[index.0].try_send((self.index, request, addr)) {
|
|
||||||
Ok(()) => Ok(()),
|
|
||||||
Err(TrySendError::Full(r)) => Err((index, r.1, r.2)),
|
|
||||||
Err(TrySendError::Disconnected(_)) => {
|
|
||||||
panic!("Request channel {} is disconnected", index.0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct ConnectedResponseSender {
|
|
||||||
senders: Vec<Sender<(CanonicalSocketAddr, ConnectedResponse)>>,
|
|
||||||
to_any_last_index_picked: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ConnectedResponseSender {
|
|
||||||
pub fn new(senders: Vec<Sender<(CanonicalSocketAddr, ConnectedResponse)>>) -> Self {
|
|
||||||
Self {
|
|
||||||
senders,
|
|
||||||
to_any_last_index_picked: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn try_send_to(
|
|
||||||
&self,
|
|
||||||
index: SocketWorkerIndex,
|
|
||||||
addr: CanonicalSocketAddr,
|
|
||||||
response: ConnectedResponse,
|
|
||||||
) -> Result<(), TrySendError<(CanonicalSocketAddr, ConnectedResponse)>> {
|
|
||||||
self.senders[index.0].try_send((addr, response))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn send_to(
|
|
||||||
&self,
|
|
||||||
index: SocketWorkerIndex,
|
|
||||||
addr: CanonicalSocketAddr,
|
|
||||||
response: ConnectedResponse,
|
|
||||||
) -> Result<(), SendError<(CanonicalSocketAddr, ConnectedResponse)>> {
|
|
||||||
self.senders[index.0].send((addr, response))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn send_to_any(
|
|
||||||
&mut self,
|
|
||||||
addr: CanonicalSocketAddr,
|
|
||||||
response: ConnectedResponse,
|
|
||||||
) -> Result<(), SendError<(CanonicalSocketAddr, ConnectedResponse)>> {
|
|
||||||
let start = self.to_any_last_index_picked + 1;
|
|
||||||
|
|
||||||
let mut message = Some((addr, response));
|
|
||||||
|
|
||||||
for i in (start..start + self.senders.len()).map(|i| i % self.senders.len()) {
|
|
||||||
match self.senders[i].try_send(message.take().unwrap()) {
|
|
||||||
Ok(()) => {
|
|
||||||
self.to_any_last_index_picked = i;
|
|
||||||
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
Err(TrySendError::Full(msg)) => {
|
|
||||||
message = Some(msg);
|
|
||||||
}
|
|
||||||
Err(TrySendError::Disconnected(_)) => {
|
|
||||||
panic!("ConnectedResponseReceiver disconnected");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let (addr, response) = message.unwrap();
|
|
||||||
|
|
||||||
self.to_any_last_index_picked = start % self.senders.len();
|
|
||||||
self.send_to(
|
|
||||||
SocketWorkerIndex(self.to_any_last_index_picked),
|
|
||||||
addr,
|
|
||||||
response,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub type ConnectedResponseReceiver = Receiver<(CanonicalSocketAddr, ConnectedResponse)>;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Statistics {
|
pub struct Statistics {
|
||||||
pub socket: Vec<CachePaddedArc<IpVersionStatistics<SocketWorkerStatistics>>>,
|
pub socket: Vec<CachePaddedArc<IpVersionStatistics<SocketWorkerStatistics>>>,
|
||||||
|
|
|
||||||
|
|
@ -3,13 +3,12 @@ pub mod config;
|
||||||
pub mod swarm;
|
pub mod swarm;
|
||||||
pub mod workers;
|
pub mod workers;
|
||||||
|
|
||||||
use std::collections::BTreeMap;
|
|
||||||
use std::thread::{sleep, Builder, JoinHandle};
|
use std::thread::{sleep, Builder, JoinHandle};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use aquatic_common::WorkerType;
|
use aquatic_common::WorkerType;
|
||||||
use crossbeam_channel::{bounded, unbounded};
|
use crossbeam_channel::unbounded;
|
||||||
use signal_hook::consts::SIGUSR1;
|
use signal_hook::consts::SIGUSR1;
|
||||||
use signal_hook::iterator::Signals;
|
use signal_hook::iterator::Signals;
|
||||||
|
|
||||||
|
|
@ -18,14 +17,9 @@ use aquatic_common::access_list::update_access_list;
|
||||||
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
|
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
|
||||||
use aquatic_common::privileges::PrivilegeDropper;
|
use aquatic_common::privileges::PrivilegeDropper;
|
||||||
|
|
||||||
use common::{
|
use common::{State, Statistics};
|
||||||
ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State, Statistics,
|
|
||||||
SwarmWorkerIndex,
|
|
||||||
};
|
|
||||||
use config::Config;
|
use config::Config;
|
||||||
use swarm::TorrentMaps;
|
|
||||||
use workers::socket::ConnectionValidator;
|
use workers::socket::ConnectionValidator;
|
||||||
use workers::swarm::SwarmWorker;
|
|
||||||
|
|
||||||
pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker";
|
pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker";
|
||||||
pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
|
pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,2 @@
|
||||||
pub mod socket;
|
pub mod socket;
|
||||||
pub mod statistics;
|
pub mod statistics;
|
||||||
pub mod swarm;
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,4 @@
|
||||||
mod mio;
|
mod mio;
|
||||||
mod storage;
|
|
||||||
#[cfg(all(target_os = "linux", feature = "io-uring"))]
|
#[cfg(all(target_os = "linux", feature = "io-uring"))]
|
||||||
mod uring;
|
mod uring;
|
||||||
mod validator;
|
mod validator;
|
||||||
|
|
@ -11,8 +10,7 @@ use socket2::{Domain, Protocol, Socket, Type};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
common::{
|
common::{
|
||||||
CachePaddedArc, ConnectedRequestSender, ConnectedResponseReceiver, IpVersionStatistics,
|
CachePaddedArc, IpVersionStatistics, SocketWorkerStatistics, State, StatisticsMessage,
|
||||||
SocketWorkerStatistics, State, StatisticsMessage,
|
|
||||||
},
|
},
|
||||||
config::Config,
|
config::Config,
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -1,218 +0,0 @@
|
||||||
use std::collections::BTreeMap;
|
|
||||||
|
|
||||||
use hashbrown::HashMap;
|
|
||||||
use slab::Slab;
|
|
||||||
|
|
||||||
use aquatic_common::{SecondsSinceServerStart, ValidUntil};
|
|
||||||
use aquatic_udp_protocol::*;
|
|
||||||
|
|
||||||
use crate::common::*;
|
|
||||||
use crate::config::Config;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct PendingScrapeResponseSlabEntry {
|
|
||||||
num_pending: usize,
|
|
||||||
valid_until: ValidUntil,
|
|
||||||
torrent_stats: BTreeMap<usize, TorrentScrapeStatistics>,
|
|
||||||
transaction_id: TransactionId,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct PendingScrapeResponseSlab(Slab<PendingScrapeResponseSlabEntry>);
|
|
||||||
|
|
||||||
impl PendingScrapeResponseSlab {
|
|
||||||
pub fn prepare_split_requests(
|
|
||||||
&mut self,
|
|
||||||
config: &Config,
|
|
||||||
request: ScrapeRequest,
|
|
||||||
valid_until: ValidUntil,
|
|
||||||
) -> impl IntoIterator<Item = (SwarmWorkerIndex, PendingScrapeRequest)> {
|
|
||||||
let capacity = config.swarm_workers.min(request.info_hashes.len());
|
|
||||||
let mut split_requests: HashMap<SwarmWorkerIndex, PendingScrapeRequest> =
|
|
||||||
HashMap::with_capacity(capacity);
|
|
||||||
|
|
||||||
if request.info_hashes.is_empty() {
|
|
||||||
::log::warn!(
|
|
||||||
"Attempted to prepare PendingScrapeResponseSlab entry with zero info hashes"
|
|
||||||
);
|
|
||||||
|
|
||||||
return split_requests;
|
|
||||||
}
|
|
||||||
|
|
||||||
let vacant_entry = self.0.vacant_entry();
|
|
||||||
let slab_key = vacant_entry.key();
|
|
||||||
|
|
||||||
for (i, info_hash) in request.info_hashes.into_iter().enumerate() {
|
|
||||||
let split_request = split_requests
|
|
||||||
.entry(SwarmWorkerIndex::from_info_hash(config, info_hash))
|
|
||||||
.or_insert_with(|| PendingScrapeRequest {
|
|
||||||
slab_key,
|
|
||||||
info_hashes: BTreeMap::new(),
|
|
||||||
});
|
|
||||||
|
|
||||||
split_request.info_hashes.insert(i, info_hash);
|
|
||||||
}
|
|
||||||
|
|
||||||
vacant_entry.insert(PendingScrapeResponseSlabEntry {
|
|
||||||
num_pending: split_requests.len(),
|
|
||||||
valid_until,
|
|
||||||
torrent_stats: Default::default(),
|
|
||||||
transaction_id: request.transaction_id,
|
|
||||||
});
|
|
||||||
|
|
||||||
split_requests
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn add_and_get_finished(
|
|
||||||
&mut self,
|
|
||||||
response: &PendingScrapeResponse,
|
|
||||||
) -> Option<ScrapeResponse> {
|
|
||||||
let finished = if let Some(entry) = self.0.get_mut(response.slab_key) {
|
|
||||||
entry.num_pending -= 1;
|
|
||||||
|
|
||||||
entry.torrent_stats.extend(response.torrent_stats.iter());
|
|
||||||
|
|
||||||
entry.num_pending == 0
|
|
||||||
} else {
|
|
||||||
::log::warn!(
|
|
||||||
"PendingScrapeResponseSlab.add didn't find entry for key {:?}",
|
|
||||||
response.slab_key
|
|
||||||
);
|
|
||||||
|
|
||||||
false
|
|
||||||
};
|
|
||||||
|
|
||||||
if finished {
|
|
||||||
let entry = self.0.remove(response.slab_key);
|
|
||||||
|
|
||||||
Some(ScrapeResponse {
|
|
||||||
transaction_id: entry.transaction_id,
|
|
||||||
torrent_stats: entry.torrent_stats.into_values().collect(),
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn clean(&mut self, now: SecondsSinceServerStart) {
|
|
||||||
self.0.retain(|k, v| {
|
|
||||||
if v.valid_until.valid(now) {
|
|
||||||
true
|
|
||||||
} else {
|
|
||||||
::log::warn!(
|
|
||||||
"Unconsumed PendingScrapeResponseSlab entry. {:?}: {:?}",
|
|
||||||
k,
|
|
||||||
v
|
|
||||||
);
|
|
||||||
|
|
||||||
false
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
self.0.shrink_to_fit();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use aquatic_common::ServerStartInstant;
|
|
||||||
use quickcheck::TestResult;
|
|
||||||
use quickcheck_macros::quickcheck;
|
|
||||||
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[quickcheck]
|
|
||||||
fn test_pending_scrape_response_slab(
|
|
||||||
request_data: Vec<(i32, i64, u8)>,
|
|
||||||
swarm_workers: u8,
|
|
||||||
) -> TestResult {
|
|
||||||
if swarm_workers == 0 {
|
|
||||||
return TestResult::discard();
|
|
||||||
}
|
|
||||||
|
|
||||||
let config = Config {
|
|
||||||
swarm_workers: swarm_workers as usize,
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
let valid_until = ValidUntil::new(ServerStartInstant::new(), 1);
|
|
||||||
|
|
||||||
let mut map = PendingScrapeResponseSlab::default();
|
|
||||||
|
|
||||||
let mut requests = Vec::new();
|
|
||||||
|
|
||||||
for (t, c, b) in request_data {
|
|
||||||
if b == 0 {
|
|
||||||
return TestResult::discard();
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut info_hashes = Vec::new();
|
|
||||||
|
|
||||||
for i in 0..b {
|
|
||||||
let info_hash = InfoHash([i; 20]);
|
|
||||||
|
|
||||||
info_hashes.push(info_hash);
|
|
||||||
}
|
|
||||||
|
|
||||||
let request = ScrapeRequest {
|
|
||||||
transaction_id: TransactionId::new(t),
|
|
||||||
connection_id: ConnectionId::new(c),
|
|
||||||
info_hashes,
|
|
||||||
};
|
|
||||||
|
|
||||||
requests.push(request);
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut all_split_requests = Vec::new();
|
|
||||||
|
|
||||||
for request in requests.iter() {
|
|
||||||
let split_requests =
|
|
||||||
map.prepare_split_requests(&config, request.to_owned(), valid_until);
|
|
||||||
|
|
||||||
all_split_requests.push(
|
|
||||||
split_requests
|
|
||||||
.into_iter()
|
|
||||||
.collect::<Vec<(SwarmWorkerIndex, PendingScrapeRequest)>>(),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
assert_eq!(map.0.len(), requests.len());
|
|
||||||
|
|
||||||
let mut responses = Vec::new();
|
|
||||||
|
|
||||||
for split_requests in all_split_requests {
|
|
||||||
for (worker_index, split_request) in split_requests {
|
|
||||||
assert!(worker_index.0 < swarm_workers as usize);
|
|
||||||
|
|
||||||
let torrent_stats = split_request
|
|
||||||
.info_hashes
|
|
||||||
.into_iter()
|
|
||||||
.map(|(i, info_hash)| {
|
|
||||||
(
|
|
||||||
i,
|
|
||||||
TorrentScrapeStatistics {
|
|
||||||
seeders: NumberOfPeers::new((info_hash.0[0]) as i32),
|
|
||||||
leechers: NumberOfPeers::new(0),
|
|
||||||
completed: NumberOfDownloads::new(0),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let response = PendingScrapeResponse {
|
|
||||||
slab_key: split_request.slab_key,
|
|
||||||
torrent_stats,
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(response) = map.add_and_get_finished(&response) {
|
|
||||||
responses.push(response);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assert!(map.0.is_empty());
|
|
||||||
assert_eq!(responses.len(), requests.len());
|
|
||||||
|
|
||||||
TestResult::from_bool(true)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,149 +0,0 @@
|
||||||
mod storage;
|
|
||||||
|
|
||||||
use std::net::IpAddr;
|
|
||||||
use std::sync::atomic::Ordering;
|
|
||||||
use std::time::Duration;
|
|
||||||
use std::time::Instant;
|
|
||||||
|
|
||||||
use crossbeam_channel::Receiver;
|
|
||||||
use crossbeam_channel::Sender;
|
|
||||||
use rand::{rngs::SmallRng, SeedableRng};
|
|
||||||
|
|
||||||
use aquatic_common::{CanonicalSocketAddr, ValidUntil};
|
|
||||||
|
|
||||||
use crate::common::*;
|
|
||||||
use crate::config::Config;
|
|
||||||
|
|
||||||
use storage::TorrentMaps;
|
|
||||||
|
|
||||||
pub struct SwarmWorker {
|
|
||||||
pub config: Config,
|
|
||||||
pub state: State,
|
|
||||||
pub statistics: CachePaddedArc<IpVersionStatistics<SwarmWorkerStatistics>>,
|
|
||||||
pub request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>,
|
|
||||||
pub response_sender: ConnectedResponseSender,
|
|
||||||
pub statistics_sender: Sender<StatisticsMessage>,
|
|
||||||
pub worker_index: SwarmWorkerIndex,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SwarmWorker {
|
|
||||||
pub fn run(&mut self) -> anyhow::Result<()> {
|
|
||||||
let mut torrents = TorrentMaps::default();
|
|
||||||
let mut rng = SmallRng::from_entropy();
|
|
||||||
|
|
||||||
let timeout = Duration::from_millis(self.config.request_channel_recv_timeout_ms);
|
|
||||||
let mut peer_valid_until = ValidUntil::new(
|
|
||||||
self.state.server_start_instant,
|
|
||||||
self.config.cleaning.max_peer_age,
|
|
||||||
);
|
|
||||||
|
|
||||||
let cleaning_interval = Duration::from_secs(self.config.cleaning.torrent_cleaning_interval);
|
|
||||||
let statistics_update_interval = Duration::from_secs(self.config.statistics.interval);
|
|
||||||
|
|
||||||
let mut last_cleaning = Instant::now();
|
|
||||||
let mut last_statistics_update = Instant::now();
|
|
||||||
|
|
||||||
let mut iter_counter = 0usize;
|
|
||||||
|
|
||||||
loop {
|
|
||||||
if let Ok((sender_index, request, src)) = self.request_receiver.recv_timeout(timeout) {
|
|
||||||
// It is OK to block here as long as we don't also do blocking
|
|
||||||
// sends in socket workers (doing both could cause a deadlock)
|
|
||||||
match (request, src.get().ip()) {
|
|
||||||
(ConnectedRequest::Announce(request), IpAddr::V4(ip)) => {
|
|
||||||
let response = torrents
|
|
||||||
.ipv4
|
|
||||||
.0
|
|
||||||
.entry(request.info_hash)
|
|
||||||
.or_default()
|
|
||||||
.announce(
|
|
||||||
&self.config,
|
|
||||||
&self.statistics_sender,
|
|
||||||
&mut rng,
|
|
||||||
&request,
|
|
||||||
ip.into(),
|
|
||||||
peer_valid_until,
|
|
||||||
);
|
|
||||||
|
|
||||||
// It doesn't matter which socket worker receives announce responses
|
|
||||||
self.response_sender
|
|
||||||
.send_to_any(src, ConnectedResponse::AnnounceIpv4(response))
|
|
||||||
.expect("swarm response channel is closed");
|
|
||||||
}
|
|
||||||
(ConnectedRequest::Announce(request), IpAddr::V6(ip)) => {
|
|
||||||
let response = torrents
|
|
||||||
.ipv6
|
|
||||||
.0
|
|
||||||
.entry(request.info_hash)
|
|
||||||
.or_default()
|
|
||||||
.announce(
|
|
||||||
&self.config,
|
|
||||||
&self.statistics_sender,
|
|
||||||
&mut rng,
|
|
||||||
&request,
|
|
||||||
ip.into(),
|
|
||||||
peer_valid_until,
|
|
||||||
);
|
|
||||||
|
|
||||||
// It doesn't matter which socket worker receives announce responses
|
|
||||||
self.response_sender
|
|
||||||
.send_to_any(src, ConnectedResponse::AnnounceIpv6(response))
|
|
||||||
.expect("swarm response channel is closed");
|
|
||||||
}
|
|
||||||
(ConnectedRequest::Scrape(request), IpAddr::V4(_)) => {
|
|
||||||
let response = torrents.ipv4.scrape(request);
|
|
||||||
|
|
||||||
self.response_sender
|
|
||||||
.send_to(sender_index, src, ConnectedResponse::Scrape(response))
|
|
||||||
.expect("swarm response channel is closed");
|
|
||||||
}
|
|
||||||
(ConnectedRequest::Scrape(request), IpAddr::V6(_)) => {
|
|
||||||
let response = torrents.ipv6.scrape(request);
|
|
||||||
|
|
||||||
self.response_sender
|
|
||||||
.send_to(sender_index, src, ConnectedResponse::Scrape(response))
|
|
||||||
.expect("swarm response channel is closed");
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run periodic tasks
|
|
||||||
if iter_counter % 128 == 0 {
|
|
||||||
let now = Instant::now();
|
|
||||||
|
|
||||||
peer_valid_until = ValidUntil::new(
|
|
||||||
self.state.server_start_instant,
|
|
||||||
self.config.cleaning.max_peer_age,
|
|
||||||
);
|
|
||||||
|
|
||||||
if now > last_cleaning + cleaning_interval {
|
|
||||||
torrents.clean_and_update_statistics(
|
|
||||||
&self.config,
|
|
||||||
&self.state,
|
|
||||||
&self.statistics,
|
|
||||||
&self.statistics_sender,
|
|
||||||
&self.state.access_list,
|
|
||||||
);
|
|
||||||
|
|
||||||
last_cleaning = now;
|
|
||||||
}
|
|
||||||
if self.config.statistics.active()
|
|
||||||
&& now > last_statistics_update + statistics_update_interval
|
|
||||||
{
|
|
||||||
self.statistics
|
|
||||||
.ipv4
|
|
||||||
.torrents
|
|
||||||
.store(torrents.ipv4.num_torrents(), Ordering::Relaxed);
|
|
||||||
self.statistics
|
|
||||||
.ipv6
|
|
||||||
.torrents
|
|
||||||
.store(torrents.ipv6.num_torrents(), Ordering::Relaxed);
|
|
||||||
|
|
||||||
last_statistics_update = now;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
iter_counter = iter_counter.wrapping_add(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,563 +0,0 @@
|
||||||
use std::sync::atomic::Ordering;
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use aquatic_common::IndexMap;
|
|
||||||
use aquatic_common::SecondsSinceServerStart;
|
|
||||||
use aquatic_common::{
|
|
||||||
access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache, AccessListMode},
|
|
||||||
ValidUntil,
|
|
||||||
};
|
|
||||||
|
|
||||||
use aquatic_udp_protocol::*;
|
|
||||||
use arrayvec::ArrayVec;
|
|
||||||
use crossbeam_channel::Sender;
|
|
||||||
use hdrhistogram::Histogram;
|
|
||||||
use rand::prelude::SmallRng;
|
|
||||||
use rand::Rng;
|
|
||||||
|
|
||||||
use crate::common::*;
|
|
||||||
use crate::config::Config;
|
|
||||||
|
|
||||||
const SMALL_PEER_MAP_CAPACITY: usize = 2;
|
|
||||||
|
|
||||||
pub struct TorrentMaps {
|
|
||||||
pub ipv4: TorrentMap<Ipv4AddrBytes>,
|
|
||||||
pub ipv6: TorrentMap<Ipv6AddrBytes>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for TorrentMaps {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
ipv4: TorrentMap(Default::default()),
|
|
||||||
ipv6: TorrentMap(Default::default()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TorrentMaps {
|
|
||||||
/// Remove forbidden or inactive torrents, reclaim space and update statistics
|
|
||||||
pub fn clean_and_update_statistics(
|
|
||||||
&mut self,
|
|
||||||
config: &Config,
|
|
||||||
state: &State,
|
|
||||||
statistics: &CachePaddedArc<IpVersionStatistics<SwarmWorkerStatistics>>,
|
|
||||||
statistics_sender: &Sender<StatisticsMessage>,
|
|
||||||
access_list: &Arc<AccessListArcSwap>,
|
|
||||||
) {
|
|
||||||
let mut cache = create_access_list_cache(access_list);
|
|
||||||
let mode = config.access_list.mode;
|
|
||||||
let now = state.server_start_instant.seconds_elapsed();
|
|
||||||
|
|
||||||
let ipv4 =
|
|
||||||
self.ipv4
|
|
||||||
.clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now);
|
|
||||||
let ipv6 =
|
|
||||||
self.ipv6
|
|
||||||
.clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now);
|
|
||||||
|
|
||||||
if config.statistics.active() {
|
|
||||||
statistics.ipv4.peers.store(ipv4.0, Ordering::Relaxed);
|
|
||||||
statistics.ipv6.peers.store(ipv6.0, Ordering::Relaxed);
|
|
||||||
|
|
||||||
if let Some(message) = ipv4.1.map(StatisticsMessage::Ipv4PeerHistogram) {
|
|
||||||
if let Err(err) = statistics_sender.try_send(message) {
|
|
||||||
::log::error!("couldn't send statistics message: {:#}", err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if let Some(message) = ipv6.1.map(StatisticsMessage::Ipv6PeerHistogram) {
|
|
||||||
if let Err(err) = statistics_sender.try_send(message) {
|
|
||||||
::log::error!("couldn't send statistics message: {:#}", err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct TorrentMap<I: Ip>(pub IndexMap<InfoHash, TorrentData<I>>);
|
|
||||||
|
|
||||||
impl<I: Ip> TorrentMap<I> {
|
|
||||||
pub fn scrape(&mut self, request: PendingScrapeRequest) -> PendingScrapeResponse {
|
|
||||||
let torrent_stats = request
|
|
||||||
.info_hashes
|
|
||||||
.into_iter()
|
|
||||||
.map(|(i, info_hash)| {
|
|
||||||
let stats = self
|
|
||||||
.0
|
|
||||||
.get(&info_hash)
|
|
||||||
.map(|torrent_data| torrent_data.scrape_statistics())
|
|
||||||
.unwrap_or_else(|| TorrentScrapeStatistics {
|
|
||||||
seeders: NumberOfPeers::new(0),
|
|
||||||
leechers: NumberOfPeers::new(0),
|
|
||||||
completed: NumberOfDownloads::new(0),
|
|
||||||
});
|
|
||||||
|
|
||||||
(i, stats)
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
PendingScrapeResponse {
|
|
||||||
slab_key: request.slab_key,
|
|
||||||
torrent_stats,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers
|
|
||||||
fn clean_and_get_statistics(
|
|
||||||
&mut self,
|
|
||||||
config: &Config,
|
|
||||||
statistics_sender: &Sender<StatisticsMessage>,
|
|
||||||
access_list_cache: &mut AccessListCache,
|
|
||||||
access_list_mode: AccessListMode,
|
|
||||||
now: SecondsSinceServerStart,
|
|
||||||
) -> (usize, Option<Histogram<u64>>) {
|
|
||||||
let mut total_num_peers = 0;
|
|
||||||
|
|
||||||
let mut opt_histogram: Option<Histogram<u64>> = if config.statistics.torrent_peer_histograms
|
|
||||||
{
|
|
||||||
match Histogram::new(3) {
|
|
||||||
Ok(histogram) => Some(histogram),
|
|
||||||
Err(err) => {
|
|
||||||
::log::error!("Couldn't create peer histogram: {:#}", err);
|
|
||||||
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
self.0.retain(|info_hash, torrent| {
|
|
||||||
if !access_list_cache
|
|
||||||
.load()
|
|
||||||
.allows(access_list_mode, &info_hash.0)
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
let num_peers = match torrent {
|
|
||||||
TorrentData::Small(peer_map) => {
|
|
||||||
peer_map.clean_and_get_num_peers(config, statistics_sender, now)
|
|
||||||
}
|
|
||||||
TorrentData::Large(peer_map) => {
|
|
||||||
let num_peers =
|
|
||||||
peer_map.clean_and_get_num_peers(config, statistics_sender, now);
|
|
||||||
|
|
||||||
if let Some(peer_map) = peer_map.try_shrink() {
|
|
||||||
*torrent = TorrentData::Small(peer_map);
|
|
||||||
}
|
|
||||||
|
|
||||||
num_peers
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
total_num_peers += num_peers;
|
|
||||||
|
|
||||||
match opt_histogram {
|
|
||||||
Some(ref mut histogram) if num_peers > 0 => {
|
|
||||||
let n = num_peers.try_into().expect("Couldn't fit usize into u64");
|
|
||||||
|
|
||||||
if let Err(err) = histogram.record(n) {
|
|
||||||
::log::error!("Couldn't record {} to histogram: {:#}", n, err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => (),
|
|
||||||
}
|
|
||||||
|
|
||||||
num_peers > 0
|
|
||||||
});
|
|
||||||
|
|
||||||
self.0.shrink_to_fit();
|
|
||||||
|
|
||||||
(total_num_peers, opt_histogram)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn num_torrents(&self) -> usize {
|
|
||||||
self.0.len()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub enum TorrentData<I: Ip> {
|
|
||||||
Small(SmallPeerMap<I>),
|
|
||||||
Large(LargePeerMap<I>),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I: Ip> TorrentData<I> {
|
|
||||||
pub fn announce(
|
|
||||||
&mut self,
|
|
||||||
config: &Config,
|
|
||||||
statistics_sender: &Sender<StatisticsMessage>,
|
|
||||||
rng: &mut SmallRng,
|
|
||||||
request: &AnnounceRequest,
|
|
||||||
ip_address: I,
|
|
||||||
valid_until: ValidUntil,
|
|
||||||
) -> AnnounceResponse<I> {
|
|
||||||
let max_num_peers_to_take: usize = if request.peers_wanted.0.get() <= 0 {
|
|
||||||
config.protocol.max_response_peers
|
|
||||||
} else {
|
|
||||||
::std::cmp::min(
|
|
||||||
config.protocol.max_response_peers,
|
|
||||||
request.peers_wanted.0.get().try_into().unwrap(),
|
|
||||||
)
|
|
||||||
};
|
|
||||||
|
|
||||||
let status =
|
|
||||||
PeerStatus::from_event_and_bytes_left(request.event.into(), request.bytes_left);
|
|
||||||
|
|
||||||
let peer_map_key = ResponsePeer {
|
|
||||||
ip_address,
|
|
||||||
port: request.port,
|
|
||||||
};
|
|
||||||
|
|
||||||
// Create the response before inserting the peer. This means that we
|
|
||||||
// don't have to filter it out from the response peers, and that the
|
|
||||||
// reported number of seeders/leechers will not include it
|
|
||||||
let (response, opt_removed_peer) = match self {
|
|
||||||
Self::Small(peer_map) => {
|
|
||||||
let opt_removed_peer = peer_map.remove(&peer_map_key);
|
|
||||||
|
|
||||||
let (seeders, leechers) = peer_map.num_seeders_leechers();
|
|
||||||
|
|
||||||
let response = AnnounceResponse {
|
|
||||||
fixed: AnnounceResponseFixedData {
|
|
||||||
transaction_id: request.transaction_id,
|
|
||||||
announce_interval: AnnounceInterval::new(
|
|
||||||
config.protocol.peer_announce_interval,
|
|
||||||
),
|
|
||||||
leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)),
|
|
||||||
seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)),
|
|
||||||
},
|
|
||||||
peers: peer_map.extract_response_peers(max_num_peers_to_take),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Convert peer map to large variant if it is full and
|
|
||||||
// announcing peer is not stopped and will therefore be
|
|
||||||
// inserted
|
|
||||||
if peer_map.is_full() && status != PeerStatus::Stopped {
|
|
||||||
*self = Self::Large(peer_map.to_large());
|
|
||||||
}
|
|
||||||
|
|
||||||
(response, opt_removed_peer)
|
|
||||||
}
|
|
||||||
Self::Large(peer_map) => {
|
|
||||||
let opt_removed_peer = peer_map.remove_peer(&peer_map_key);
|
|
||||||
|
|
||||||
let (seeders, leechers) = peer_map.num_seeders_leechers();
|
|
||||||
|
|
||||||
let response = AnnounceResponse {
|
|
||||||
fixed: AnnounceResponseFixedData {
|
|
||||||
transaction_id: request.transaction_id,
|
|
||||||
announce_interval: AnnounceInterval::new(
|
|
||||||
config.protocol.peer_announce_interval,
|
|
||||||
),
|
|
||||||
leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)),
|
|
||||||
seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)),
|
|
||||||
},
|
|
||||||
peers: peer_map.extract_response_peers(rng, max_num_peers_to_take),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Try shrinking the map if announcing peer is stopped and
|
|
||||||
// will therefore not be inserted
|
|
||||||
if status == PeerStatus::Stopped {
|
|
||||||
if let Some(peer_map) = peer_map.try_shrink() {
|
|
||||||
*self = Self::Small(peer_map);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
(response, opt_removed_peer)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
match status {
|
|
||||||
PeerStatus::Leeching | PeerStatus::Seeding => {
|
|
||||||
let peer = Peer {
|
|
||||||
peer_id: request.peer_id,
|
|
||||||
is_seeder: status == PeerStatus::Seeding,
|
|
||||||
valid_until,
|
|
||||||
};
|
|
||||||
|
|
||||||
match self {
|
|
||||||
Self::Small(peer_map) => peer_map.insert(peer_map_key, peer),
|
|
||||||
Self::Large(peer_map) => peer_map.insert(peer_map_key, peer),
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.statistics.peer_clients && opt_removed_peer.is_none() {
|
|
||||||
statistics_sender
|
|
||||||
.try_send(StatisticsMessage::PeerAdded(request.peer_id))
|
|
||||||
.expect("statistics channel should be unbounded");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
PeerStatus::Stopped => {
|
|
||||||
if config.statistics.peer_clients && opt_removed_peer.is_some() {
|
|
||||||
statistics_sender
|
|
||||||
.try_send(StatisticsMessage::PeerRemoved(request.peer_id))
|
|
||||||
.expect("statistics channel should be unbounded");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
response
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn scrape_statistics(&self) -> TorrentScrapeStatistics {
|
|
||||||
let (seeders, leechers) = match self {
|
|
||||||
Self::Small(peer_map) => peer_map.num_seeders_leechers(),
|
|
||||||
Self::Large(peer_map) => peer_map.num_seeders_leechers(),
|
|
||||||
};
|
|
||||||
|
|
||||||
TorrentScrapeStatistics {
|
|
||||||
seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)),
|
|
||||||
leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)),
|
|
||||||
completed: NumberOfDownloads::new(0),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I: Ip> Default for TorrentData<I> {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self::Small(SmallPeerMap(ArrayVec::default()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Store torrents with up to two peers without an extra heap allocation
|
|
||||||
///
|
|
||||||
/// On public open trackers, this is likely to be the majority of torrents.
|
|
||||||
#[derive(Default, Debug)]
|
|
||||||
pub struct SmallPeerMap<I: Ip>(ArrayVec<(ResponsePeer<I>, Peer), SMALL_PEER_MAP_CAPACITY>);
|
|
||||||
|
|
||||||
impl<I: Ip> SmallPeerMap<I> {
|
|
||||||
fn is_full(&self) -> bool {
|
|
||||||
self.0.is_full()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn num_seeders_leechers(&self) -> (usize, usize) {
|
|
||||||
let seeders = self.0.iter().filter(|(_, p)| p.is_seeder).count();
|
|
||||||
let leechers = self.0.len() - seeders;
|
|
||||||
|
|
||||||
(seeders, leechers)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn insert(&mut self, key: ResponsePeer<I>, peer: Peer) {
|
|
||||||
self.0.push((key, peer));
|
|
||||||
}
|
|
||||||
|
|
||||||
fn remove(&mut self, key: &ResponsePeer<I>) -> Option<Peer> {
|
|
||||||
for (i, (k, _)) in self.0.iter().enumerate() {
|
|
||||||
if k == key {
|
|
||||||
return Some(self.0.remove(i).1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
fn extract_response_peers(&self, max_num_peers_to_take: usize) -> Vec<ResponsePeer<I>> {
|
|
||||||
Vec::from_iter(self.0.iter().take(max_num_peers_to_take).map(|(k, _)| *k))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn clean_and_get_num_peers(
|
|
||||||
&mut self,
|
|
||||||
config: &Config,
|
|
||||||
statistics_sender: &Sender<StatisticsMessage>,
|
|
||||||
now: SecondsSinceServerStart,
|
|
||||||
) -> usize {
|
|
||||||
self.0.retain(|(_, peer)| {
|
|
||||||
let keep = peer.valid_until.valid(now);
|
|
||||||
|
|
||||||
if !keep
|
|
||||||
&& config.statistics.peer_clients
|
|
||||||
&& statistics_sender
|
|
||||||
.try_send(StatisticsMessage::PeerRemoved(peer.peer_id))
|
|
||||||
.is_err()
|
|
||||||
{
|
|
||||||
// Should never happen in practice
|
|
||||||
::log::error!("Couldn't send StatisticsMessage::PeerRemoved");
|
|
||||||
}
|
|
||||||
|
|
||||||
keep
|
|
||||||
});
|
|
||||||
|
|
||||||
self.0.len()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn to_large(&self) -> LargePeerMap<I> {
|
|
||||||
let (num_seeders, _) = self.num_seeders_leechers();
|
|
||||||
let peers = self.0.iter().copied().collect();
|
|
||||||
|
|
||||||
LargePeerMap { peers, num_seeders }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct LargePeerMap<I: Ip> {
|
|
||||||
peers: IndexMap<ResponsePeer<I>, Peer>,
|
|
||||||
num_seeders: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I: Ip> LargePeerMap<I> {
|
|
||||||
fn num_seeders_leechers(&self) -> (usize, usize) {
|
|
||||||
(self.num_seeders, self.peers.len() - self.num_seeders)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn insert(&mut self, key: ResponsePeer<I>, peer: Peer) {
|
|
||||||
if peer.is_seeder {
|
|
||||||
self.num_seeders += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.peers.insert(key, peer);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn remove_peer(&mut self, key: &ResponsePeer<I>) -> Option<Peer> {
|
|
||||||
let opt_removed_peer = self.peers.swap_remove(key);
|
|
||||||
|
|
||||||
if let Some(Peer {
|
|
||||||
is_seeder: true, ..
|
|
||||||
}) = opt_removed_peer
|
|
||||||
{
|
|
||||||
self.num_seeders -= 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
opt_removed_peer
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Extract response peers
|
|
||||||
///
|
|
||||||
/// If there are more peers in map than `max_num_peers_to_take`, do a random
|
|
||||||
/// selection of peers from first and second halves of map in order to avoid
|
|
||||||
/// returning too homogeneous peers.
|
|
||||||
///
|
|
||||||
/// Does NOT filter out announcing peer.
|
|
||||||
pub fn extract_response_peers(
|
|
||||||
&self,
|
|
||||||
rng: &mut impl Rng,
|
|
||||||
max_num_peers_to_take: usize,
|
|
||||||
) -> Vec<ResponsePeer<I>> {
|
|
||||||
if self.peers.len() <= max_num_peers_to_take {
|
|
||||||
self.peers.keys().copied().collect()
|
|
||||||
} else {
|
|
||||||
let middle_index = self.peers.len() / 2;
|
|
||||||
let num_to_take_per_half = max_num_peers_to_take / 2;
|
|
||||||
|
|
||||||
let offset_half_one = {
|
|
||||||
let from = 0;
|
|
||||||
let to = usize::max(1, middle_index - num_to_take_per_half);
|
|
||||||
|
|
||||||
rng.gen_range(from..to)
|
|
||||||
};
|
|
||||||
let offset_half_two = {
|
|
||||||
let from = middle_index;
|
|
||||||
let to = usize::max(middle_index + 1, self.peers.len() - num_to_take_per_half);
|
|
||||||
|
|
||||||
rng.gen_range(from..to)
|
|
||||||
};
|
|
||||||
|
|
||||||
let end_half_one = offset_half_one + num_to_take_per_half;
|
|
||||||
let end_half_two = offset_half_two + num_to_take_per_half;
|
|
||||||
|
|
||||||
let mut peers = Vec::with_capacity(max_num_peers_to_take);
|
|
||||||
|
|
||||||
if let Some(slice) = self.peers.get_range(offset_half_one..end_half_one) {
|
|
||||||
peers.extend(slice.keys());
|
|
||||||
}
|
|
||||||
if let Some(slice) = self.peers.get_range(offset_half_two..end_half_two) {
|
|
||||||
peers.extend(slice.keys());
|
|
||||||
}
|
|
||||||
|
|
||||||
peers
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn clean_and_get_num_peers(
|
|
||||||
&mut self,
|
|
||||||
config: &Config,
|
|
||||||
statistics_sender: &Sender<StatisticsMessage>,
|
|
||||||
now: SecondsSinceServerStart,
|
|
||||||
) -> usize {
|
|
||||||
self.peers.retain(|_, peer| {
|
|
||||||
let keep = peer.valid_until.valid(now);
|
|
||||||
|
|
||||||
if !keep {
|
|
||||||
if peer.is_seeder {
|
|
||||||
self.num_seeders -= 1;
|
|
||||||
}
|
|
||||||
if config.statistics.peer_clients
|
|
||||||
&& statistics_sender
|
|
||||||
.try_send(StatisticsMessage::PeerRemoved(peer.peer_id))
|
|
||||||
.is_err()
|
|
||||||
{
|
|
||||||
// Should never happen in practice
|
|
||||||
::log::error!("Couldn't send StatisticsMessage::PeerRemoved");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
keep
|
|
||||||
});
|
|
||||||
|
|
||||||
if !self.peers.is_empty() {
|
|
||||||
self.peers.shrink_to_fit();
|
|
||||||
}
|
|
||||||
|
|
||||||
self.peers.len()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn try_shrink(&mut self) -> Option<SmallPeerMap<I>> {
|
|
||||||
(self.peers.len() <= SMALL_PEER_MAP_CAPACITY).then(|| {
|
|
||||||
SmallPeerMap(ArrayVec::from_iter(
|
|
||||||
self.peers.iter().map(|(k, v)| (*k, *v)),
|
|
||||||
))
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug)]
|
|
||||||
struct Peer {
|
|
||||||
peer_id: PeerId,
|
|
||||||
is_seeder: bool,
|
|
||||||
valid_until: ValidUntil,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
|
|
||||||
pub enum PeerStatus {
|
|
||||||
Seeding,
|
|
||||||
Leeching,
|
|
||||||
Stopped,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl PeerStatus {
|
|
||||||
/// Determine peer status from announce event and number of bytes left.
|
|
||||||
///
|
|
||||||
/// Likely, the last branch will be taken most of the time.
|
|
||||||
#[inline]
|
|
||||||
pub fn from_event_and_bytes_left(event: AnnounceEvent, bytes_left: NumberOfBytes) -> Self {
|
|
||||||
if event == AnnounceEvent::Stopped {
|
|
||||||
Self::Stopped
|
|
||||||
} else if bytes_left.0.get() == 0 {
|
|
||||||
Self::Seeding
|
|
||||||
} else {
|
|
||||||
Self::Leeching
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_peer_status_from_event_and_bytes_left() {
|
|
||||||
use PeerStatus::*;
|
|
||||||
|
|
||||||
let f = PeerStatus::from_event_and_bytes_left;
|
|
||||||
|
|
||||||
assert_eq!(Stopped, f(AnnounceEvent::Stopped, NumberOfBytes::new(0)));
|
|
||||||
assert_eq!(Stopped, f(AnnounceEvent::Stopped, NumberOfBytes::new(1)));
|
|
||||||
|
|
||||||
assert_eq!(Seeding, f(AnnounceEvent::Started, NumberOfBytes::new(0)));
|
|
||||||
assert_eq!(Leeching, f(AnnounceEvent::Started, NumberOfBytes::new(1)));
|
|
||||||
|
|
||||||
assert_eq!(Seeding, f(AnnounceEvent::Completed, NumberOfBytes::new(0)));
|
|
||||||
assert_eq!(Leeching, f(AnnounceEvent::Completed, NumberOfBytes::new(1)));
|
|
||||||
|
|
||||||
assert_eq!(Seeding, f(AnnounceEvent::None, NumberOfBytes::new(0)));
|
|
||||||
assert_eq!(Leeching, f(AnnounceEvent::None, NumberOfBytes::new(1)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue