From f65bcd7f5620fa9028bb5e4d760b33d025e3dafb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 15 Oct 2021 23:35:05 +0200 Subject: [PATCH] aquatic_udp: simplify access list handling in tasks, request workers --- aquatic_udp/src/lib/common.rs | 33 +++++-------- aquatic_udp/src/lib/handlers.rs | 82 ++++++++++++++------------------- aquatic_udp/src/lib/lib.rs | 19 ++++---- aquatic_udp/src/lib/tasks.rs | 35 ++++++-------- 4 files changed, 66 insertions(+), 103 deletions(-) diff --git a/aquatic_udp/src/lib/common.rs b/aquatic_udp/src/lib/common.rs index ec78716..cccd553 100644 --- a/aquatic_udp/src/lib/common.rs +++ b/aquatic_udp/src/lib/common.rs @@ -3,7 +3,6 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::{atomic::AtomicUsize, Arc}; use std::time::Instant; -use aquatic_common::access_list::AccessListMode; use hashbrown::HashMap; use indexmap::IndexMap; use parking_lot::Mutex; @@ -11,6 +10,8 @@ use parking_lot::Mutex; pub use aquatic_common::{access_list::AccessList, ValidUntil}; pub use aquatic_udp_protocol::*; +use crate::config::Config; + pub const MAX_PACKET_SIZE: usize = 4096; pub trait Ip: Hash + PartialEq + Eq + Clone + Copy { @@ -108,35 +109,25 @@ pub type TorrentMap = HashMap>; pub struct TorrentMaps { pub ipv4: TorrentMap, pub ipv6: TorrentMap, + pub access_list: AccessList, } impl TorrentMaps { - /// Remove inactive torrents - pub fn clean(&mut self, now: Instant) { - self.ipv4 - .retain(|_, torrent| Self::clean_torrent_and_peers(now, torrent)); - self.ipv4.shrink_to_fit(); - - self.ipv6 - .retain(|_, torrent| Self::clean_torrent_and_peers(now, torrent)); - self.ipv6.shrink_to_fit(); - } - /// Remove disallowed and inactive torrents - pub fn clean_with_access_list( - &mut self, - access_list_type: AccessListMode, - access_list: &AccessList, - now: Instant, - ) { + pub fn clean(&mut self, config: &Config) { + let now = Instant::now(); + + let access_list = &self.access_list; + let access_list_mode = config.access_list.mode; + self.ipv4.retain(|info_hash, torrent| { - access_list.allows(access_list_type, &info_hash.0) + access_list.allows(access_list_mode, &info_hash.0) && Self::clean_torrent_and_peers(now, torrent) }); self.ipv4.shrink_to_fit(); self.ipv6.retain(|info_hash, torrent| { - access_list.allows(access_list_type, &info_hash.0) + access_list.allows(access_list_mode, &info_hash.0) && Self::clean_torrent_and_peers(now, torrent) }); self.ipv6.shrink_to_fit(); @@ -183,7 +174,6 @@ pub struct Statistics { pub struct State { pub connections: Arc>, pub torrents: Arc>, - pub access_list: Arc>, pub statistics: Arc, } @@ -192,7 +182,6 @@ impl Default for State { Self { connections: Arc::new(Mutex::new(HashMap::new())), torrents: Arc::new(Mutex::new(TorrentMaps::default())), - access_list: Arc::new(Mutex::new(AccessList::default())), statistics: Arc::new(Statistics::default()), } } diff --git a/aquatic_udp/src/lib/handlers.rs b/aquatic_udp/src/lib/handlers.rs index 36ceb14..ecfeae5 100644 --- a/aquatic_udp/src/lib/handlers.rs +++ b/aquatic_udp/src/lib/handlers.rs @@ -9,9 +9,7 @@ use rand::{ Rng, SeedableRng, }; -use aquatic_common::{ - access_list::AccessListMode, convert_ipv4_mapped_ipv6, extract_response_peers, -}; +use aquatic_common::{convert_ipv4_mapped_ipv6, extract_response_peers}; use aquatic_udp_protocol::*; use crate::common::*; @@ -81,7 +79,7 @@ pub fn run_request_worker( &mut responses, ); - // Check announce and scrape requests for valid connection + // Check announce and scrape requests for valid connections announce_requests.retain(|(request, src)| { let connection_key = ConnectionKey { @@ -125,31 +123,7 @@ pub fn run_request_worker( ::std::mem::drop(connections); - // Check announce requests for allowed info hashes - - match config.access_list.mode { - access_list_type @ (AccessListMode::Require | AccessListMode::Forbid) => { - let access_list: MutexGuard = state.access_list.lock(); - - announce_requests.retain(|(request, src)| { - if !access_list.allows(access_list_type, &request.info_hash.0) { - let response = ErrorResponse { - transaction_id: request.transaction_id, - message: "Info hash not allowed".to_string(), - }; - - responses.push((response.into(), *src)); - - return false; - } - - true - }); - } - AccessListMode::Ignore => {} - }; - - // Handle announce and scrape requests + // Generate responses for announce and scrape requests if !(announce_requests.is_empty() && scrape_requests.is_empty()) { let mut torrents = state.torrents.lock(); @@ -210,30 +184,42 @@ pub fn handle_announce_requests( responses: &mut Vec<(Response, SocketAddr)>, ) { let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); + let access_list_mode = config.access_list.mode; responses.extend(requests.map(|(request, src)| { - let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); + let info_hash_allowed = torrents.access_list.allows(access_list_mode, &request.info_hash.0); - let response = match peer_ip { - IpAddr::V4(ip) => handle_announce_request( - config, - rng, - &mut torrents.ipv4, - request, - ip, - peer_valid_until, - ), - IpAddr::V6(ip) => handle_announce_request( - config, - rng, - &mut torrents.ipv6, - request, - ip, - peer_valid_until, - ), + let response = if info_hash_allowed { + let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); + + let response = match peer_ip { + IpAddr::V4(ip) => handle_announce_request( + config, + rng, + &mut torrents.ipv4, + request, + ip, + peer_valid_until, + ), + IpAddr::V6(ip) => handle_announce_request( + config, + rng, + &mut torrents.ipv6, + request, + ip, + peer_valid_until, + ), + }; + + Response::Announce(response) + } else { + Response::Error(ErrorResponse { + transaction_id: request.transaction_id, + message: "Info hash not allowed".to_string(), + }) }; - (response.into(), src) + (response, src) })); } diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index 6e5fd77..3f099c8 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -6,7 +6,6 @@ use std::thread::Builder; use std::time::Duration; use anyhow::Context; -use aquatic_common::access_list::AccessListMode; use crossbeam_channel::unbounded; use privdrop::PrivDrop; @@ -24,15 +23,7 @@ pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub fn run(config: Config) -> ::anyhow::Result<()> { let state = State::default(); - match config.access_list.mode { - AccessListMode::Require | AccessListMode::Forbid => { - state - .access_list - .lock() - .update_from_path(&config.access_list.path)?; - } - AccessListMode::Ignore => {} - } + tasks::update_access_list(&config, &mut state.torrents.lock()); let num_bound_sockets = start_workers(config.clone(), state.clone())?; @@ -63,8 +54,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { loop { ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); + + tasks::clean_connections(&state); - tasks::clean_connections_and_torrents(&config, &state); + let mut torrent_maps = state.torrents.lock(); + + tasks::update_access_list(&config, &mut torrent_maps); + + torrent_maps.clean(&config); } } diff --git a/aquatic_udp/src/lib/tasks.rs b/aquatic_udp/src/lib/tasks.rs index 04216bb..4f35aec 100644 --- a/aquatic_udp/src/lib/tasks.rs +++ b/aquatic_udp/src/lib/tasks.rs @@ -8,36 +8,27 @@ use aquatic_common::access_list::AccessListMode; use crate::common::*; use crate::config::Config; -pub fn clean_connections_and_torrents(config: &Config, state: &State) { - let now = Instant::now(); - - { - let mut connections = state.connections.lock(); - - connections.retain(|_, v| v.0 > now); - connections.shrink_to_fit(); - } - +pub fn update_access_list(config: &Config, torrent_maps: &mut TorrentMaps){ match config.access_list.mode { AccessListMode::Require | AccessListMode::Forbid => { - let mut access_list = state.access_list.lock(); - - if let Err(err) = access_list.update_from_path(&config.access_list.path) { + if let Err(err) = torrent_maps.access_list.update_from_path(&config.access_list.path) { ::log::error!("Update access list from path: {:?}", err); } - - state.torrents.lock().clean_with_access_list( - config.access_list.mode, - &access_list, - now, - ); - } - AccessListMode::Ignore => { - state.torrents.lock().clean(now); } + AccessListMode::Ignore => { } } } +pub fn clean_connections(state: &State) { + let now = Instant::now(); + + let mut connections = state.connections.lock(); + + connections.retain(|_, v| v.0 > now); + connections.shrink_to_fit(); +} + + pub fn gather_and_print_statistics(state: &State, config: &Config) { let interval = config.statistics.interval;