aquatic_udp: first access list implementation

This commit is contained in:
Joakim Frostegård 2021-10-15 02:08:57 +02:00
parent 1c94e201de
commit 8639f380f4
8 changed files with 166 additions and 10 deletions

5
Cargo.lock generated
View file

@ -1,5 +1,7 @@
# This file is automatically @generated by Cargo. # This file is automatically @generated by Cargo.
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3
[[package]] [[package]]
name = "addr2line" name = "addr2line"
version = "0.16.0" version = "0.16.0"
@ -72,8 +74,11 @@ dependencies = [
name = "aquatic_common" name = "aquatic_common"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"hashbrown 0.11.2",
"indexmap", "indexmap",
"rand", "rand",
"serde",
] ]
[[package]] [[package]]

View file

@ -11,5 +11,8 @@ repository = "https://github.com/greatest-ape/aquatic"
name = "aquatic_common" name = "aquatic_common"
[dependencies] [dependencies]
anyhow = "1"
hashbrown = "0.11.2"
indexmap = "1" indexmap = "1"
rand = { version = "0.8", features = ["small_rng"] } rand = { version = "0.8", features = ["small_rng"] }
serde = { version = "1", features = ["derive"] }

View file

@ -1,8 +1,74 @@
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::net::IpAddr; use std::net::IpAddr;
use std::path::PathBuf;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use hashbrown::HashSet;
use indexmap::IndexMap; use indexmap::IndexMap;
use rand::Rng; use rand::Rng;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum AccessListType {
Allow,
Deny,
Ignore
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AccessListConfig {
pub path: PathBuf,
pub list_type: AccessListType,
}
impl Default for AccessListConfig {
fn default() -> Self {
Self {
path: "".into(),
list_type: AccessListType::Ignore,
}
}
}
#[derive(Default)]
pub struct AccessList(HashSet<[u8; 20]>);
impl AccessList {
fn parse_line_to_info_hash(line: String) -> anyhow::Result<[u8; 20]> {
let mut count = 0usize;
let mut bytes = [0u8; 20];
for (byte, c) in bytes.iter_mut().zip(line.chars()) {
*byte = c as u8;
count += 1;
}
if count == 20 {
Ok(bytes)
} else {
Err(anyhow::anyhow!("Info hash length only {} bytes: {}", count, line))
}
}
pub fn update_from_path(&mut self, path: &PathBuf) -> anyhow::Result<()> {
let file = File::open(path)?;
let reader = BufReader::new(file);
self.0.clear();
for line in reader.lines() {
self.0.insert(Self::parse_line_to_info_hash(line?)?);
}
Ok(())
}
pub fn contains(&self, info_hash_bytes: &[u8; 20]) -> bool {
self.0.contains(info_hash_bytes)
}
}
/// Peer or connection valid until this instant /// Peer or connection valid until this instant
/// ///

View file

@ -6,7 +6,7 @@ use hashbrown::HashMap;
use indexmap::IndexMap; use indexmap::IndexMap;
use parking_lot::Mutex; use parking_lot::Mutex;
pub use aquatic_common::ValidUntil; pub use aquatic_common::{AccessList, ValidUntil};
pub use aquatic_udp_protocol::*; pub use aquatic_udp_protocol::*;
pub const MAX_PACKET_SIZE: usize = 4096; pub const MAX_PACKET_SIZE: usize = 4096;
@ -121,6 +121,7 @@ pub struct Statistics {
pub struct State { pub struct State {
pub connections: Arc<Mutex<ConnectionMap>>, pub connections: Arc<Mutex<ConnectionMap>>,
pub torrents: Arc<Mutex<TorrentMaps>>, pub torrents: Arc<Mutex<TorrentMaps>>,
pub access_list: Arc<Mutex<AccessList>>,
pub statistics: Arc<Statistics>, pub statistics: Arc<Statistics>,
} }
@ -129,6 +130,7 @@ impl Default for State {
Self { Self {
connections: Arc::new(Mutex::new(HashMap::new())), connections: Arc::new(Mutex::new(HashMap::new())),
torrents: Arc::new(Mutex::new(TorrentMaps::default())), torrents: Arc::new(Mutex::new(TorrentMaps::default())),
access_list: Arc::new(Mutex::new(AccessList::default())),
statistics: Arc::new(Statistics::default()), statistics: Arc::new(Statistics::default()),
} }
} }

View file

@ -1,5 +1,6 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use aquatic_common::AccessListConfig;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use aquatic_cli_helpers::LogLevel; use aquatic_cli_helpers::LogLevel;
@ -21,6 +22,7 @@ pub struct Config {
pub statistics: StatisticsConfig, pub statistics: StatisticsConfig,
pub cleaning: CleaningConfig, pub cleaning: CleaningConfig,
pub privileges: PrivilegeConfig, pub privileges: PrivilegeConfig,
pub access_list: AccessListConfig,
} }
impl aquatic_cli_helpers::Config for Config { impl aquatic_cli_helpers::Config for Config {
@ -113,6 +115,7 @@ impl Default for Config {
statistics: StatisticsConfig::default(), statistics: StatisticsConfig::default(),
cleaning: CleaningConfig::default(), cleaning: CleaningConfig::default(),
privileges: PrivilegeConfig::default(), privileges: PrivilegeConfig::default(),
access_list: AccessListConfig::default(),
} }
} }
} }

View file

@ -79,6 +79,8 @@ pub fn run_request_worker(
&mut responses, &mut responses,
); );
// Check announce and scrape requests for valid connection
announce_requests.retain(|(request, src)| { announce_requests.retain(|(request, src)| {
let connection_key = ConnectionKey { let connection_key = ConnectionKey {
connection_id: request.connection_id, connection_id: request.connection_id,
@ -95,7 +97,7 @@ pub fn run_request_worker(
responses.push((response.into(), *src)); responses.push((response.into(), *src));
false return false;
} }
}); });
@ -121,6 +123,46 @@ pub fn run_request_worker(
::std::mem::drop(connections); ::std::mem::drop(connections);
// Check announce requests for allowed info hash
let access_list: MutexGuard<AccessList> = state.access_list.lock();
announce_requests.retain(|(request, src)| {
match config.access_list.list_type {
aquatic_common::AccessListType::Allow => {
if !access_list.contains(&request.info_hash.0) {
let response = ErrorResponse {
transaction_id: request.transaction_id,
message: "Forbidden info hash".to_string(),
};
responses.push((response.into(), *src));
return false;
}
},
aquatic_common::AccessListType::Deny => {
if access_list.contains(&request.info_hash.0) {
let response = ErrorResponse {
transaction_id: request.transaction_id,
message: "Forbidden info hash".to_string(),
};
responses.push((response.into(), *src));
return false;
}
},
aquatic_common::AccessListType::Ignore => {},
}
true
});
::std::mem::drop(access_list);
// Handle announce and scrape requests
if !(announce_requests.is_empty() && scrape_requests.is_empty()) { if !(announce_requests.is_empty() && scrape_requests.is_empty()) {
let mut torrents = state.torrents.lock(); let mut torrents = state.torrents.lock();

View file

@ -31,8 +31,8 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
if sockets == config.socket_workers { if sockets == config.socket_workers {
PrivDrop::default() PrivDrop::default()
.chroot(config.privileges.chroot_path) .chroot(config.privileges.chroot_path.clone())
.user(config.privileges.user) .user(config.privileges.user.clone())
.apply()?; .apply()?;
break; break;
@ -45,7 +45,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
loop { loop {
::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); ::std::thread::sleep(Duration::from_secs(config.cleaning.interval));
tasks::clean_connections_and_torrents(&state); tasks::clean_connections_and_torrents(&config, &state);
} }
} }

View file

@ -3,10 +3,12 @@ use std::time::Instant;
use histogram::Histogram; use histogram::Histogram;
use aquatic_common::AccessListType;
use crate::common::*; use crate::common::*;
use crate::config::Config; use crate::config::Config;
pub fn clean_connections_and_torrents(state: &State) { pub fn clean_connections_and_torrents(config: &Config, state: &State) {
let now = Instant::now(); let now = Instant::now();
{ {
@ -16,14 +18,47 @@ pub fn clean_connections_and_torrents(state: &State) {
connections.shrink_to_fit(); connections.shrink_to_fit();
} }
match config.access_list.list_type {
AccessListType::Allow => {
let mut access_list = state.access_list.lock();
access_list.update_from_path(&config.access_list.path);
let mut torrents = state.torrents.lock();
torrents.ipv4.retain(|info_hash, _| access_list.contains(&info_hash.0));
clean_torrent_map(&mut torrents.ipv4, now);
torrents.ipv6.retain(|info_hash, _| access_list.contains(&info_hash.0));
clean_torrent_map(&mut torrents.ipv6, now);
},
AccessListType::Deny => {
let mut access_list = state.access_list.lock();
access_list.update_from_path(&config.access_list.path);
let mut torrents = state.torrents.lock();
torrents.ipv4.retain(|info_hash, _| !access_list.contains(&info_hash.0));
clean_torrent_map(&mut torrents.ipv4, now);
torrents.ipv6.retain(|info_hash, _| !access_list.contains(&info_hash.0));
clean_torrent_map(&mut torrents.ipv6, now);
},
AccessListType::Ignore => {
let mut torrents = state.torrents.lock(); let mut torrents = state.torrents.lock();
clean_torrent_map(&mut torrents.ipv4, now); clean_torrent_map(&mut torrents.ipv4, now);
clean_torrent_map(&mut torrents.ipv6, now); clean_torrent_map(&mut torrents.ipv6, now);
}
}
} }
#[inline] #[inline]
fn clean_torrent_map<I: Ip>(torrents: &mut TorrentMap<I>, now: Instant) { fn clean_torrent_map<I: Ip>(
torrents: &mut TorrentMap<I>,
now: Instant
) {
torrents.retain(|_, torrent| { torrents.retain(|_, torrent| {
let num_seeders = &mut torrent.num_seeders; let num_seeders = &mut torrent.num_seeders;
let num_leechers = &mut torrent.num_leechers; let num_leechers = &mut torrent.num_leechers;