diff --git a/Cargo.lock b/Cargo.lock index b358509..6eb446c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,7 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +version = 3 + [[package]] name = "addr2line" version = "0.16.0" @@ -72,8 +74,11 @@ dependencies = [ name = "aquatic_common" version = "0.1.0" dependencies = [ + "anyhow", + "hashbrown 0.11.2", "indexmap", "rand", + "serde", ] [[package]] diff --git a/aquatic_common/Cargo.toml b/aquatic_common/Cargo.toml index b74988f..79c4d86 100644 --- a/aquatic_common/Cargo.toml +++ b/aquatic_common/Cargo.toml @@ -11,5 +11,8 @@ repository = "https://github.com/greatest-ape/aquatic" name = "aquatic_common" [dependencies] +anyhow = "1" +hashbrown = "0.11.2" indexmap = "1" rand = { version = "0.8", features = ["small_rng"] } +serde = { version = "1", features = ["derive"] } diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs index 7ba33c1..1b97684 100644 --- a/aquatic_common/src/lib.rs +++ b/aquatic_common/src/lib.rs @@ -1,8 +1,74 @@ +use std::fs::File; +use std::io::{BufRead, BufReader}; use std::net::IpAddr; +use std::path::PathBuf; use std::time::{Duration, Instant}; +use hashbrown::HashSet; use indexmap::IndexMap; use rand::Rng; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum AccessListType { + Allow, + Deny, + Ignore +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct AccessListConfig { + pub path: PathBuf, + pub list_type: AccessListType, +} + +impl Default for AccessListConfig { + fn default() -> Self { + Self { + path: "".into(), + list_type: AccessListType::Ignore, + } + } +} + +#[derive(Default)] +pub struct AccessList(HashSet<[u8; 20]>); + +impl AccessList { + fn parse_line_to_info_hash(line: String) -> anyhow::Result<[u8; 20]> { + let mut count = 0usize; + let mut bytes = [0u8; 20]; + + for (byte, c) in bytes.iter_mut().zip(line.chars()) { + *byte = c as u8; + count += 1; + } + + if count == 20 { + Ok(bytes) + } else { + Err(anyhow::anyhow!("Info hash length only {} bytes: {}", count, line)) + } + } + + pub fn update_from_path(&mut self, path: &PathBuf) -> anyhow::Result<()> { + let file = File::open(path)?; + let reader = BufReader::new(file); + + self.0.clear(); + + for line in reader.lines() { + self.0.insert(Self::parse_line_to_info_hash(line?)?); + } + + + Ok(()) + } + + pub fn contains(&self, info_hash_bytes: &[u8; 20]) -> bool { + self.0.contains(info_hash_bytes) + } +} /// Peer or connection valid until this instant /// diff --git a/aquatic_udp/src/lib/common.rs b/aquatic_udp/src/lib/common.rs index 83e3385..b7eba08 100644 --- a/aquatic_udp/src/lib/common.rs +++ b/aquatic_udp/src/lib/common.rs @@ -6,7 +6,7 @@ use hashbrown::HashMap; use indexmap::IndexMap; use parking_lot::Mutex; -pub use aquatic_common::ValidUntil; +pub use aquatic_common::{AccessList, ValidUntil}; pub use aquatic_udp_protocol::*; pub const MAX_PACKET_SIZE: usize = 4096; @@ -121,6 +121,7 @@ pub struct Statistics { pub struct State { pub connections: Arc>, pub torrents: Arc>, + pub access_list: Arc>, pub statistics: Arc, } @@ -129,6 +130,7 @@ impl Default for State { Self { connections: Arc::new(Mutex::new(HashMap::new())), torrents: Arc::new(Mutex::new(TorrentMaps::default())), + access_list: Arc::new(Mutex::new(AccessList::default())), statistics: Arc::new(Statistics::default()), } } diff --git a/aquatic_udp/src/lib/config.rs b/aquatic_udp/src/lib/config.rs index cb75ea9..3db9156 100644 --- a/aquatic_udp/src/lib/config.rs +++ b/aquatic_udp/src/lib/config.rs @@ -1,5 +1,6 @@ use std::net::SocketAddr; +use aquatic_common::AccessListConfig; use serde::{Deserialize, Serialize}; use aquatic_cli_helpers::LogLevel; @@ -21,6 +22,7 @@ pub struct Config { pub statistics: StatisticsConfig, pub cleaning: CleaningConfig, pub privileges: PrivilegeConfig, + pub access_list: AccessListConfig, } impl aquatic_cli_helpers::Config for Config { @@ -113,6 +115,7 @@ impl Default for Config { statistics: StatisticsConfig::default(), cleaning: CleaningConfig::default(), privileges: PrivilegeConfig::default(), + access_list: AccessListConfig::default(), } } } diff --git a/aquatic_udp/src/lib/handlers.rs b/aquatic_udp/src/lib/handlers.rs index 52f800b..d2e93aa 100644 --- a/aquatic_udp/src/lib/handlers.rs +++ b/aquatic_udp/src/lib/handlers.rs @@ -79,6 +79,8 @@ pub fn run_request_worker( &mut responses, ); + // Check announce and scrape requests for valid connection + announce_requests.retain(|(request, src)| { let connection_key = ConnectionKey { connection_id: request.connection_id, @@ -95,7 +97,7 @@ pub fn run_request_worker( responses.push((response.into(), *src)); - false + return false; } }); @@ -121,6 +123,46 @@ pub fn run_request_worker( ::std::mem::drop(connections); + // Check announce requests for allowed info hash + + let access_list: MutexGuard = state.access_list.lock(); + + announce_requests.retain(|(request, src)| { + match config.access_list.list_type { + aquatic_common::AccessListType::Allow => { + if !access_list.contains(&request.info_hash.0) { + let response = ErrorResponse { + transaction_id: request.transaction_id, + message: "Forbidden info hash".to_string(), + }; + + responses.push((response.into(), *src)); + + return false; + } + }, + aquatic_common::AccessListType::Deny => { + if access_list.contains(&request.info_hash.0) { + let response = ErrorResponse { + transaction_id: request.transaction_id, + message: "Forbidden info hash".to_string(), + }; + + responses.push((response.into(), *src)); + + return false; + } + }, + aquatic_common::AccessListType::Ignore => {}, + } + + true + }); + + ::std::mem::drop(access_list); + + // Handle announce and scrape requests + if !(announce_requests.is_empty() && scrape_requests.is_empty()) { let mut torrents = state.torrents.lock(); diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index bdc2159..4322006 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -31,8 +31,8 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { if sockets == config.socket_workers { PrivDrop::default() - .chroot(config.privileges.chroot_path) - .user(config.privileges.user) + .chroot(config.privileges.chroot_path.clone()) + .user(config.privileges.user.clone()) .apply()?; break; @@ -45,7 +45,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { loop { ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); - tasks::clean_connections_and_torrents(&state); + tasks::clean_connections_and_torrents(&config, &state); } } diff --git a/aquatic_udp/src/lib/tasks.rs b/aquatic_udp/src/lib/tasks.rs index 8a1a153..795d033 100644 --- a/aquatic_udp/src/lib/tasks.rs +++ b/aquatic_udp/src/lib/tasks.rs @@ -3,10 +3,12 @@ use std::time::Instant; use histogram::Histogram; +use aquatic_common::AccessListType; + use crate::common::*; use crate::config::Config; -pub fn clean_connections_and_torrents(state: &State) { +pub fn clean_connections_and_torrents(config: &Config, state: &State) { let now = Instant::now(); { @@ -16,14 +18,47 @@ pub fn clean_connections_and_torrents(state: &State) { connections.shrink_to_fit(); } - let mut torrents = state.torrents.lock(); + match config.access_list.list_type { + AccessListType::Allow => { + let mut access_list = state.access_list.lock(); - clean_torrent_map(&mut torrents.ipv4, now); - clean_torrent_map(&mut torrents.ipv6, now); + access_list.update_from_path(&config.access_list.path); + + let mut torrents = state.torrents.lock(); + + torrents.ipv4.retain(|info_hash, _| access_list.contains(&info_hash.0)); + clean_torrent_map(&mut torrents.ipv4, now); + + torrents.ipv6.retain(|info_hash, _| access_list.contains(&info_hash.0)); + clean_torrent_map(&mut torrents.ipv6, now); + }, + AccessListType::Deny => { + let mut access_list = state.access_list.lock(); + + access_list.update_from_path(&config.access_list.path); + + let mut torrents = state.torrents.lock(); + + torrents.ipv4.retain(|info_hash, _| !access_list.contains(&info_hash.0)); + clean_torrent_map(&mut torrents.ipv4, now); + + torrents.ipv6.retain(|info_hash, _| !access_list.contains(&info_hash.0)); + clean_torrent_map(&mut torrents.ipv6, now); + }, + AccessListType::Ignore => { + let mut torrents = state.torrents.lock(); + + clean_torrent_map(&mut torrents.ipv4, now); + clean_torrent_map(&mut torrents.ipv6, now); + } + } } #[inline] -fn clean_torrent_map(torrents: &mut TorrentMap, now: Instant) { +fn clean_torrent_map( + torrents: &mut TorrentMap, + now: Instant +) { torrents.retain(|_, torrent| { let num_seeders = &mut torrent.num_seeders; let num_leechers = &mut torrent.num_leechers;