udp: split workers/request.rs into modules

This commit is contained in:
Joakim Frostegård 2022-04-16 00:25:36 +02:00
parent 86fb7f0fb3
commit 313b73daef
2 changed files with 120 additions and 108 deletions

View file

@ -1,15 +1,11 @@
mod storage;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::net::IpAddr; use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::Ipv6Addr;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
use aquatic_common::access_list::create_access_list_cache;
use aquatic_common::access_list::AccessListArcSwap;
use aquatic_common::AmortizedIndexMap;
use aquatic_common::CanonicalSocketAddr; use aquatic_common::CanonicalSocketAddr;
use aquatic_common::PanicSentinel; use aquatic_common::PanicSentinel;
use aquatic_common::ValidUntil; use aquatic_common::ValidUntil;
@ -23,107 +19,7 @@ use aquatic_udp_protocol::*;
use crate::common::*; use crate::common::*;
use crate::config::Config; use crate::config::Config;
#[derive(Clone, Debug)] use storage::{Peer, TorrentMap, TorrentMaps};
struct Peer<I: Ip> {
pub ip_address: I,
pub port: Port,
pub status: PeerStatus,
pub valid_until: ValidUntil,
}
impl<I: Ip> Peer<I> {
fn to_response_peer(&self) -> ResponsePeer<I> {
ResponsePeer {
ip_address: self.ip_address,
port: self.port,
}
}
}
type PeerMap<I> = AmortizedIndexMap<PeerId, Peer<I>>;
struct TorrentData<I: Ip> {
pub peers: PeerMap<I>,
pub num_seeders: usize,
pub num_leechers: usize,
}
impl<I: Ip> Default for TorrentData<I> {
fn default() -> Self {
Self {
peers: Default::default(),
num_seeders: 0,
num_leechers: 0,
}
}
}
type TorrentMap<I> = AmortizedIndexMap<InfoHash, TorrentData<I>>;
#[derive(Default)]
struct TorrentMaps {
pub ipv4: TorrentMap<Ipv4Addr>,
pub ipv6: TorrentMap<Ipv6Addr>,
}
impl TorrentMaps {
/// Remove disallowed and inactive torrents
pub fn clean(&mut self, config: &Config, access_list: &Arc<AccessListArcSwap>) {
let now = Instant::now();
let access_list_mode = config.access_list.mode;
let mut access_list_cache = create_access_list_cache(access_list);
self.ipv4.retain(|info_hash, torrent| {
access_list_cache
.load()
.allows(access_list_mode, &info_hash.0)
&& Self::clean_torrent_and_peers(now, torrent)
});
self.ipv4.shrink_to_fit();
self.ipv6.retain(|info_hash, torrent| {
access_list_cache
.load()
.allows(access_list_mode, &info_hash.0)
&& Self::clean_torrent_and_peers(now, torrent)
});
self.ipv6.shrink_to_fit();
}
/// Returns true if torrent is to be kept
#[inline]
fn clean_torrent_and_peers<I: Ip>(now: Instant, torrent: &mut TorrentData<I>) -> bool {
let num_seeders = &mut torrent.num_seeders;
let num_leechers = &mut torrent.num_leechers;
torrent.peers.retain(|_, peer| {
if peer.valid_until.0 > now {
true
} else {
match peer.status {
PeerStatus::Seeding => {
*num_seeders -= 1;
}
PeerStatus::Leeching => {
*num_leechers -= 1;
}
_ => (),
};
false
}
});
if torrent.peers.is_empty() {
false
} else {
torrent.peers.shrink_to_fit();
true
}
}
}
pub fn run_request_worker( pub fn run_request_worker(
_sentinel: PanicSentinel, _sentinel: PanicSentinel,
@ -388,7 +284,7 @@ mod tests {
let gen_num_peers = data.0 as u32; let gen_num_peers = data.0 as u32;
let req_num_peers = data.1 as usize; let req_num_peers = data.1 as usize;
let mut peer_map: PeerMap<Ipv4Addr> = Default::default(); let mut peer_map: storage::PeerMap<Ipv4Addr> = Default::default();
let mut opt_sender_key = None; let mut opt_sender_key = None;
let mut opt_sender_peer = None; let mut opt_sender_peer = None;

View file

@ -0,0 +1,116 @@
use std::net::Ipv4Addr;
use std::net::Ipv6Addr;
use std::sync::Arc;
use std::time::Instant;
use aquatic_common::access_list::create_access_list_cache;
use aquatic_common::access_list::AccessListArcSwap;
use aquatic_common::AmortizedIndexMap;
use aquatic_common::ValidUntil;
use aquatic_udp_protocol::*;
use crate::common::*;
use crate::config::Config;
#[derive(Clone, Debug)]
pub struct Peer<I: Ip> {
pub ip_address: I,
pub port: Port,
pub status: PeerStatus,
pub valid_until: ValidUntil,
}
impl<I: Ip> Peer<I> {
pub fn to_response_peer(&self) -> ResponsePeer<I> {
ResponsePeer {
ip_address: self.ip_address,
port: self.port,
}
}
}
pub type PeerMap<I> = AmortizedIndexMap<PeerId, Peer<I>>;
pub struct TorrentData<I: Ip> {
pub peers: PeerMap<I>,
pub num_seeders: usize,
pub num_leechers: usize,
}
impl<I: Ip> Default for TorrentData<I> {
fn default() -> Self {
Self {
peers: Default::default(),
num_seeders: 0,
num_leechers: 0,
}
}
}
pub type TorrentMap<I> = AmortizedIndexMap<InfoHash, TorrentData<I>>;
#[derive(Default)]
pub struct TorrentMaps {
pub ipv4: TorrentMap<Ipv4Addr>,
pub ipv6: TorrentMap<Ipv6Addr>,
}
impl TorrentMaps {
/// Remove disallowed and inactive torrents
pub fn clean(&mut self, config: &Config, access_list: &Arc<AccessListArcSwap>) {
let now = Instant::now();
let access_list_mode = config.access_list.mode;
let mut access_list_cache = create_access_list_cache(access_list);
self.ipv4.retain(|info_hash, torrent| {
access_list_cache
.load()
.allows(access_list_mode, &info_hash.0)
&& Self::clean_torrent_and_peers(now, torrent)
});
self.ipv4.shrink_to_fit();
self.ipv6.retain(|info_hash, torrent| {
access_list_cache
.load()
.allows(access_list_mode, &info_hash.0)
&& Self::clean_torrent_and_peers(now, torrent)
});
self.ipv6.shrink_to_fit();
}
/// Returns true if torrent is to be kept
#[inline]
fn clean_torrent_and_peers<I: Ip>(now: Instant, torrent: &mut TorrentData<I>) -> bool {
let num_seeders = &mut torrent.num_seeders;
let num_leechers = &mut torrent.num_leechers;
torrent.peers.retain(|_, peer| {
if peer.valid_until.0 > now {
true
} else {
match peer.status {
PeerStatus::Seeding => {
*num_seeders -= 1;
}
PeerStatus::Leeching => {
*num_leechers -= 1;
}
_ => (),
};
false
}
});
if torrent.peers.is_empty() {
false
} else {
torrent.peers.shrink_to_fit();
true
}
}
}