mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
Merge pull request #36 from greatest-ape/fixes-2021-11-29
small udp refactor, update README and TODO
This commit is contained in:
commit
e1bffae42c
4 changed files with 100 additions and 98 deletions
|
|
@ -106,7 +106,7 @@ in emitting of an info-level log message.
|
|||
#### More information
|
||||
|
||||
More documentation of the various configuration options might be available
|
||||
under `src/lib/config.rs` in directories `aquatic_udp`, `aquatic_http` and
|
||||
under `src/config.rs` in directories `aquatic_udp`, `aquatic_http` and
|
||||
`aquatic_ws`.
|
||||
|
||||
## Details on implementations
|
||||
|
|
|
|||
5
TODO.md
5
TODO.md
|
|
@ -17,12 +17,13 @@
|
|||
* cargo-deny
|
||||
|
||||
* aquatic_udp
|
||||
* consider bounded channels with size 0 meaning unbounded
|
||||
* check config field and group names, including in load tester
|
||||
* look at proper cpu pinning (check that one thread gets bound per core)
|
||||
* then consider so_attach_reuseport_cbpf
|
||||
* what poll event capacity is actually needed?
|
||||
* stagger connection cleaning intervals?
|
||||
* load test
|
||||
* move additional request sending to for each received response, maybe
|
||||
with probability 0.2
|
||||
|
||||
* aquatic_http:
|
||||
* clean out connections regularly
|
||||
|
|
|
|||
|
|
@ -3,13 +3,10 @@ use std::hash::Hash;
|
|||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use crossbeam_channel::{Sender, TrySendError};
|
||||
|
||||
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap};
|
||||
use aquatic_common::AHashIndexMap;
|
||||
use aquatic_common::ValidUntil;
|
||||
use aquatic_common::access_list::AccessListArcSwap;
|
||||
use aquatic_udp_protocol::*;
|
||||
|
||||
use crate::config::Config;
|
||||
|
|
@ -150,95 +147,6 @@ impl PeerStatus {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Peer<I: Ip> {
|
||||
pub ip_address: I,
|
||||
pub port: Port,
|
||||
pub status: PeerStatus,
|
||||
pub valid_until: ValidUntil,
|
||||
}
|
||||
|
||||
pub type PeerMap<I> = AHashIndexMap<PeerId, Peer<I>>;
|
||||
|
||||
pub struct TorrentData<I: Ip> {
|
||||
pub peers: PeerMap<I>,
|
||||
pub num_seeders: usize,
|
||||
pub num_leechers: usize,
|
||||
}
|
||||
|
||||
impl<I: Ip> Default for TorrentData<I> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
peers: Default::default(),
|
||||
num_seeders: 0,
|
||||
num_leechers: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type TorrentMap<I> = AHashIndexMap<InfoHash, TorrentData<I>>;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct TorrentMaps {
|
||||
pub ipv4: TorrentMap<Ipv4Addr>,
|
||||
pub ipv6: TorrentMap<Ipv6Addr>,
|
||||
}
|
||||
|
||||
impl TorrentMaps {
|
||||
/// Remove disallowed and inactive torrents
|
||||
pub fn clean(&mut self, config: &Config, access_list: &Arc<AccessListArcSwap>) {
|
||||
let now = Instant::now();
|
||||
let access_list_mode = config.access_list.mode;
|
||||
|
||||
let mut access_list_cache = create_access_list_cache(access_list);
|
||||
|
||||
self.ipv4.retain(|info_hash, torrent| {
|
||||
access_list_cache
|
||||
.load()
|
||||
.allows(access_list_mode, &info_hash.0)
|
||||
&& Self::clean_torrent_and_peers(now, torrent)
|
||||
});
|
||||
self.ipv4.shrink_to_fit();
|
||||
|
||||
self.ipv6.retain(|info_hash, torrent| {
|
||||
access_list_cache
|
||||
.load()
|
||||
.allows(access_list_mode, &info_hash.0)
|
||||
&& Self::clean_torrent_and_peers(now, torrent)
|
||||
});
|
||||
self.ipv6.shrink_to_fit();
|
||||
}
|
||||
|
||||
/// Returns true if torrent is to be kept
|
||||
#[inline]
|
||||
fn clean_torrent_and_peers<I: Ip>(now: Instant, torrent: &mut TorrentData<I>) -> bool {
|
||||
let num_seeders = &mut torrent.num_seeders;
|
||||
let num_leechers = &mut torrent.num_leechers;
|
||||
|
||||
torrent.peers.retain(|_, peer| {
|
||||
let keep = peer.valid_until.0 > now;
|
||||
|
||||
if !keep {
|
||||
match peer.status {
|
||||
PeerStatus::Seeding => {
|
||||
*num_seeders -= 1;
|
||||
}
|
||||
PeerStatus::Leeching => {
|
||||
*num_leechers -= 1;
|
||||
}
|
||||
_ => (),
|
||||
};
|
||||
}
|
||||
|
||||
keep
|
||||
});
|
||||
|
||||
torrent.peers.shrink_to_fit();
|
||||
|
||||
!torrent.peers.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Statistics {
|
||||
pub requests_received: AtomicUsize,
|
||||
pub responses_sent: AtomicUsize,
|
||||
|
|
|
|||
|
|
@ -4,9 +4,13 @@ use std::net::Ipv4Addr;
|
|||
use std::net::Ipv6Addr;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use aquatic_common::access_list::create_access_list_cache;
|
||||
use aquatic_common::access_list::AccessListArcSwap;
|
||||
use aquatic_common::AHashIndexMap;
|
||||
use aquatic_common::ValidUntil;
|
||||
use crossbeam_channel::Receiver;
|
||||
use rand::{rngs::SmallRng, SeedableRng};
|
||||
|
|
@ -18,6 +22,95 @@ use aquatic_udp_protocol::*;
|
|||
use crate::common::*;
|
||||
use crate::config::Config;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct Peer<I: Ip> {
|
||||
pub ip_address: I,
|
||||
pub port: Port,
|
||||
pub status: PeerStatus,
|
||||
pub valid_until: ValidUntil,
|
||||
}
|
||||
|
||||
type PeerMap<I> = AHashIndexMap<PeerId, Peer<I>>;
|
||||
|
||||
struct TorrentData<I: Ip> {
|
||||
pub peers: PeerMap<I>,
|
||||
pub num_seeders: usize,
|
||||
pub num_leechers: usize,
|
||||
}
|
||||
|
||||
impl<I: Ip> Default for TorrentData<I> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
peers: Default::default(),
|
||||
num_seeders: 0,
|
||||
num_leechers: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type TorrentMap<I> = AHashIndexMap<InfoHash, TorrentData<I>>;
|
||||
|
||||
#[derive(Default)]
|
||||
struct TorrentMaps {
|
||||
pub ipv4: TorrentMap<Ipv4Addr>,
|
||||
pub ipv6: TorrentMap<Ipv6Addr>,
|
||||
}
|
||||
|
||||
impl TorrentMaps {
|
||||
/// Remove disallowed and inactive torrents
|
||||
pub fn clean(&mut self, config: &Config, access_list: &Arc<AccessListArcSwap>) {
|
||||
let now = Instant::now();
|
||||
let access_list_mode = config.access_list.mode;
|
||||
|
||||
let mut access_list_cache = create_access_list_cache(access_list);
|
||||
|
||||
self.ipv4.retain(|info_hash, torrent| {
|
||||
access_list_cache
|
||||
.load()
|
||||
.allows(access_list_mode, &info_hash.0)
|
||||
&& Self::clean_torrent_and_peers(now, torrent)
|
||||
});
|
||||
self.ipv4.shrink_to_fit();
|
||||
|
||||
self.ipv6.retain(|info_hash, torrent| {
|
||||
access_list_cache
|
||||
.load()
|
||||
.allows(access_list_mode, &info_hash.0)
|
||||
&& Self::clean_torrent_and_peers(now, torrent)
|
||||
});
|
||||
self.ipv6.shrink_to_fit();
|
||||
}
|
||||
|
||||
/// Returns true if torrent is to be kept
|
||||
#[inline]
|
||||
fn clean_torrent_and_peers<I: Ip>(now: Instant, torrent: &mut TorrentData<I>) -> bool {
|
||||
let num_seeders = &mut torrent.num_seeders;
|
||||
let num_leechers = &mut torrent.num_leechers;
|
||||
|
||||
torrent.peers.retain(|_, peer| {
|
||||
let keep = peer.valid_until.0 > now;
|
||||
|
||||
if !keep {
|
||||
match peer.status {
|
||||
PeerStatus::Seeding => {
|
||||
*num_seeders -= 1;
|
||||
}
|
||||
PeerStatus::Leeching => {
|
||||
*num_leechers -= 1;
|
||||
}
|
||||
_ => (),
|
||||
};
|
||||
}
|
||||
|
||||
keep
|
||||
});
|
||||
|
||||
torrent.peers.shrink_to_fit();
|
||||
|
||||
!torrent.peers.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Debug)]
|
||||
pub struct ProtocolResponsePeer<I> {
|
||||
pub ip_address: I,
|
||||
|
|
@ -156,7 +249,7 @@ pub fn run_request_worker(
|
|||
}
|
||||
}
|
||||
|
||||
pub fn handle_announce_request(
|
||||
fn handle_announce_request(
|
||||
config: &Config,
|
||||
rng: &mut SmallRng,
|
||||
torrents: &mut TorrentMaps,
|
||||
|
|
@ -260,7 +353,7 @@ fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn handle_scrape_request(
|
||||
fn handle_scrape_request(
|
||||
torrents: &mut TorrentMaps,
|
||||
src: SocketAddr,
|
||||
request: PendingScrapeRequest,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue