aquatic_udp: simplify access list handling in tasks, request workers

This commit is contained in:
Joakim Frostegård 2021-10-15 23:35:05 +02:00
parent ddb1f394a1
commit f65bcd7f56
4 changed files with 66 additions and 103 deletions

View file

@ -3,7 +3,6 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::{atomic::AtomicUsize, Arc}; use std::sync::{atomic::AtomicUsize, Arc};
use std::time::Instant; use std::time::Instant;
use aquatic_common::access_list::AccessListMode;
use hashbrown::HashMap; use hashbrown::HashMap;
use indexmap::IndexMap; use indexmap::IndexMap;
use parking_lot::Mutex; use parking_lot::Mutex;
@ -11,6 +10,8 @@ use parking_lot::Mutex;
pub use aquatic_common::{access_list::AccessList, ValidUntil}; pub use aquatic_common::{access_list::AccessList, ValidUntil};
pub use aquatic_udp_protocol::*; pub use aquatic_udp_protocol::*;
use crate::config::Config;
pub const MAX_PACKET_SIZE: usize = 4096; pub const MAX_PACKET_SIZE: usize = 4096;
pub trait Ip: Hash + PartialEq + Eq + Clone + Copy { pub trait Ip: Hash + PartialEq + Eq + Clone + Copy {
@ -108,35 +109,25 @@ pub type TorrentMap<I> = HashMap<InfoHash, TorrentData<I>>;
pub struct TorrentMaps { pub struct TorrentMaps {
pub ipv4: TorrentMap<Ipv4Addr>, pub ipv4: TorrentMap<Ipv4Addr>,
pub ipv6: TorrentMap<Ipv6Addr>, pub ipv6: TorrentMap<Ipv6Addr>,
pub access_list: AccessList,
} }
impl TorrentMaps { impl TorrentMaps {
/// Remove inactive torrents
pub fn clean(&mut self, now: Instant) {
self.ipv4
.retain(|_, torrent| Self::clean_torrent_and_peers(now, torrent));
self.ipv4.shrink_to_fit();
self.ipv6
.retain(|_, torrent| Self::clean_torrent_and_peers(now, torrent));
self.ipv6.shrink_to_fit();
}
/// Remove disallowed and inactive torrents /// Remove disallowed and inactive torrents
pub fn clean_with_access_list( pub fn clean(&mut self, config: &Config) {
&mut self, let now = Instant::now();
access_list_type: AccessListMode,
access_list: &AccessList, let access_list = &self.access_list;
now: Instant, let access_list_mode = config.access_list.mode;
) {
self.ipv4.retain(|info_hash, torrent| { self.ipv4.retain(|info_hash, torrent| {
access_list.allows(access_list_type, &info_hash.0) access_list.allows(access_list_mode, &info_hash.0)
&& Self::clean_torrent_and_peers(now, torrent) && Self::clean_torrent_and_peers(now, torrent)
}); });
self.ipv4.shrink_to_fit(); self.ipv4.shrink_to_fit();
self.ipv6.retain(|info_hash, torrent| { self.ipv6.retain(|info_hash, torrent| {
access_list.allows(access_list_type, &info_hash.0) access_list.allows(access_list_mode, &info_hash.0)
&& Self::clean_torrent_and_peers(now, torrent) && Self::clean_torrent_and_peers(now, torrent)
}); });
self.ipv6.shrink_to_fit(); self.ipv6.shrink_to_fit();
@ -183,7 +174,6 @@ pub struct Statistics {
pub struct State { pub struct State {
pub connections: Arc<Mutex<ConnectionMap>>, pub connections: Arc<Mutex<ConnectionMap>>,
pub torrents: Arc<Mutex<TorrentMaps>>, pub torrents: Arc<Mutex<TorrentMaps>>,
pub access_list: Arc<Mutex<AccessList>>,
pub statistics: Arc<Statistics>, pub statistics: Arc<Statistics>,
} }
@ -192,7 +182,6 @@ impl Default for State {
Self { Self {
connections: Arc::new(Mutex::new(HashMap::new())), connections: Arc::new(Mutex::new(HashMap::new())),
torrents: Arc::new(Mutex::new(TorrentMaps::default())), torrents: Arc::new(Mutex::new(TorrentMaps::default())),
access_list: Arc::new(Mutex::new(AccessList::default())),
statistics: Arc::new(Statistics::default()), statistics: Arc::new(Statistics::default()),
} }
} }

View file

@ -9,9 +9,7 @@ use rand::{
Rng, SeedableRng, Rng, SeedableRng,
}; };
use aquatic_common::{ use aquatic_common::{convert_ipv4_mapped_ipv6, extract_response_peers};
access_list::AccessListMode, convert_ipv4_mapped_ipv6, extract_response_peers,
};
use aquatic_udp_protocol::*; use aquatic_udp_protocol::*;
use crate::common::*; use crate::common::*;
@ -81,7 +79,7 @@ pub fn run_request_worker(
&mut responses, &mut responses,
); );
// Check announce and scrape requests for valid connection // Check announce and scrape requests for valid connections
announce_requests.retain(|(request, src)| { announce_requests.retain(|(request, src)| {
let connection_key = ConnectionKey { let connection_key = ConnectionKey {
@ -125,31 +123,7 @@ pub fn run_request_worker(
::std::mem::drop(connections); ::std::mem::drop(connections);
// Check announce requests for allowed info hashes // Generate responses for announce and scrape requests
match config.access_list.mode {
access_list_type @ (AccessListMode::Require | AccessListMode::Forbid) => {
let access_list: MutexGuard<AccessList> = state.access_list.lock();
announce_requests.retain(|(request, src)| {
if !access_list.allows(access_list_type, &request.info_hash.0) {
let response = ErrorResponse {
transaction_id: request.transaction_id,
message: "Info hash not allowed".to_string(),
};
responses.push((response.into(), *src));
return false;
}
true
});
}
AccessListMode::Ignore => {}
};
// Handle announce and scrape requests
if !(announce_requests.is_empty() && scrape_requests.is_empty()) { if !(announce_requests.is_empty() && scrape_requests.is_empty()) {
let mut torrents = state.torrents.lock(); let mut torrents = state.torrents.lock();
@ -210,30 +184,42 @@ pub fn handle_announce_requests(
responses: &mut Vec<(Response, SocketAddr)>, responses: &mut Vec<(Response, SocketAddr)>,
) { ) {
let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age);
let access_list_mode = config.access_list.mode;
responses.extend(requests.map(|(request, src)| { responses.extend(requests.map(|(request, src)| {
let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); let info_hash_allowed = torrents.access_list.allows(access_list_mode, &request.info_hash.0);
let response = match peer_ip { let response = if info_hash_allowed {
IpAddr::V4(ip) => handle_announce_request( let peer_ip = convert_ipv4_mapped_ipv6(src.ip());
config,
rng, let response = match peer_ip {
&mut torrents.ipv4, IpAddr::V4(ip) => handle_announce_request(
request, config,
ip, rng,
peer_valid_until, &mut torrents.ipv4,
), request,
IpAddr::V6(ip) => handle_announce_request( ip,
config, peer_valid_until,
rng, ),
&mut torrents.ipv6, IpAddr::V6(ip) => handle_announce_request(
request, config,
ip, rng,
peer_valid_until, &mut torrents.ipv6,
), request,
ip,
peer_valid_until,
),
};
Response::Announce(response)
} else {
Response::Error(ErrorResponse {
transaction_id: request.transaction_id,
message: "Info hash not allowed".to_string(),
})
}; };
(response.into(), src) (response, src)
})); }));
} }

View file

@ -6,7 +6,6 @@ use std::thread::Builder;
use std::time::Duration; use std::time::Duration;
use anyhow::Context; use anyhow::Context;
use aquatic_common::access_list::AccessListMode;
use crossbeam_channel::unbounded; use crossbeam_channel::unbounded;
use privdrop::PrivDrop; use privdrop::PrivDrop;
@ -24,15 +23,7 @@ pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker";
pub fn run(config: Config) -> ::anyhow::Result<()> { pub fn run(config: Config) -> ::anyhow::Result<()> {
let state = State::default(); let state = State::default();
match config.access_list.mode { tasks::update_access_list(&config, &mut state.torrents.lock());
AccessListMode::Require | AccessListMode::Forbid => {
state
.access_list
.lock()
.update_from_path(&config.access_list.path)?;
}
AccessListMode::Ignore => {}
}
let num_bound_sockets = start_workers(config.clone(), state.clone())?; let num_bound_sockets = start_workers(config.clone(), state.clone())?;
@ -64,7 +55,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
loop { loop {
::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); ::std::thread::sleep(Duration::from_secs(config.cleaning.interval));
tasks::clean_connections_and_torrents(&config, &state); tasks::clean_connections(&state);
let mut torrent_maps = state.torrents.lock();
tasks::update_access_list(&config, &mut torrent_maps);
torrent_maps.clean(&config);
} }
} }

View file

@ -8,36 +8,27 @@ use aquatic_common::access_list::AccessListMode;
use crate::common::*; use crate::common::*;
use crate::config::Config; use crate::config::Config;
pub fn clean_connections_and_torrents(config: &Config, state: &State) { pub fn update_access_list(config: &Config, torrent_maps: &mut TorrentMaps){
let now = Instant::now();
{
let mut connections = state.connections.lock();
connections.retain(|_, v| v.0 > now);
connections.shrink_to_fit();
}
match config.access_list.mode { match config.access_list.mode {
AccessListMode::Require | AccessListMode::Forbid => { AccessListMode::Require | AccessListMode::Forbid => {
let mut access_list = state.access_list.lock(); if let Err(err) = torrent_maps.access_list.update_from_path(&config.access_list.path) {
if let Err(err) = access_list.update_from_path(&config.access_list.path) {
::log::error!("Update access list from path: {:?}", err); ::log::error!("Update access list from path: {:?}", err);
} }
state.torrents.lock().clean_with_access_list(
config.access_list.mode,
&access_list,
now,
);
}
AccessListMode::Ignore => {
state.torrents.lock().clean(now);
} }
AccessListMode::Ignore => { }
} }
} }
pub fn clean_connections(state: &State) {
let now = Instant::now();
let mut connections = state.connections.lock();
connections.retain(|_, v| v.0 > now);
connections.shrink_to_fit();
}
pub fn gather_and_print_statistics(state: &State, config: &Config) { pub fn gather_and_print_statistics(state: &State, config: &Config) {
let interval = config.statistics.interval; let interval = config.statistics.interval;