diff --git a/Cargo.lock b/Cargo.lock index 1814d40..1baa87a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,6 +163,7 @@ dependencies = [ "futures-lite", "glommio", "hashbrown 0.11.2", + "hex", "histogram", "indexmap", "log", diff --git a/aquatic_common/src/access_list.rs b/aquatic_common/src/access_list.rs index 93b8373..28e24e3 100644 --- a/aquatic_common/src/access_list.rs +++ b/aquatic_common/src/access_list.rs @@ -18,6 +18,12 @@ pub enum AccessListMode { Off, } +impl AccessListMode { + pub fn is_on(&self) -> bool { + !matches!(self, Self::Off) + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AccessListConfig { pub mode: AccessListMode, @@ -36,54 +42,71 @@ impl Default for AccessListConfig { } } -pub struct AccessList(ArcSwap>); +#[derive(Default)] +pub struct AccessList(HashSet<[u8; 20]>); -impl Default for AccessList { - fn default() -> Self { - Self(ArcSwap::from(Arc::new(HashSet::default()))) +impl AccessList { + pub fn insert_from_line(&mut self, line: &str) -> anyhow::Result<()> { + self.0.insert(parse_info_hash(line)?); + + Ok(()) + } + pub fn allows(&self, mode: AccessListMode, info_hash: &[u8; 20]) -> bool { + match mode { + AccessListMode::White => self.0.contains(info_hash), + AccessListMode::Black => !self.0.contains(info_hash), + AccessListMode::Off => true, + } } } -impl AccessList { - fn parse_info_hash(line: String) -> anyhow::Result<[u8; 20]> { - let mut bytes = [0u8; 20]; +pub trait AccessListQuery { + fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()>; + fn allows(&self, list_mode: AccessListMode, info_hash_bytes: &[u8; 20]) -> bool; +} - hex::decode_to_slice(line, &mut bytes)?; +pub type AccessListArcSwap = ArcSwap; - Ok(bytes) - } - - pub fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()> { +impl AccessListQuery for AccessListArcSwap { + fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()> { let file = File::open(path)?; let reader = BufReader::new(file); let mut new_list = HashSet::new(); for line in reader.lines() { - new_list.insert(Self::parse_info_hash(line?)?); + new_list.insert(parse_info_hash(&line?)?); } - self.0.store(Arc::new(new_list)); + self.store(Arc::new(AccessList(new_list))); Ok(()) } - pub fn allows(&self, list_mode: AccessListMode, info_hash_bytes: &[u8; 20]) -> bool { - match list_mode { - AccessListMode::White => self.0.load().contains(info_hash_bytes), - AccessListMode::Black => !self.0.load().contains(info_hash_bytes), + fn allows(&self, mode: AccessListMode, info_hash_bytes: &[u8; 20]) -> bool { + match mode { + AccessListMode::White => self.load().0.contains(info_hash_bytes), + AccessListMode::Black => !self.load().0.contains(info_hash_bytes), AccessListMode::Off => true, } } } +fn parse_info_hash(line: &str) -> anyhow::Result<[u8; 20]> { + let mut bytes = [0u8; 20]; + + hex::decode_to_slice(line, &mut bytes)?; + + Ok(bytes) +} + #[cfg(test)] mod tests { use super::*; #[test] fn test_parse_info_hash() { - let f = AccessList::parse_info_hash; + let f = parse_info_hash; assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeee".into()).is_ok()); assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeeef".into()).is_err()); diff --git a/aquatic_http/src/lib/common.rs b/aquatic_http/src/lib/common.rs index e36e390..e4a9832 100644 --- a/aquatic_http/src/lib/common.rs +++ b/aquatic_http/src/lib/common.rs @@ -2,7 +2,7 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::Arc; use std::time::Instant; -use aquatic_common::access_list::AccessList; +use aquatic_common::access_list::{AccessList, AccessListArcSwap}; use crossbeam_channel::{Receiver, Sender}; use either::Either; use hashbrown::HashMap; @@ -165,7 +165,7 @@ impl TorrentMaps { #[derive(Clone)] pub struct State { - pub access_list: Arc, + pub access_list: Arc, pub torrent_maps: Arc>, } diff --git a/aquatic_http/src/lib/lib.rs b/aquatic_http/src/lib/lib.rs index 2662b2b..3f96ef2 100644 --- a/aquatic_http/src/lib/lib.rs +++ b/aquatic_http/src/lib/lib.rs @@ -31,7 +31,10 @@ pub fn run(config: Config) -> anyhow::Result<()> { tasks::update_access_list(&config, &state); - state.torrent_maps.lock().clean(&config, &state.access_list); + state + .torrent_maps + .lock() + .clean(&config, &state.access_list.load_full()); } } diff --git a/aquatic_http/src/lib/network/mod.rs b/aquatic_http/src/lib/network/mod.rs index c14ed92..dd7d889 100644 --- a/aquatic_http/src/lib/network/mod.rs +++ b/aquatic_http/src/lib/network/mod.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use std::vec::Drain; +use aquatic_common::access_list::AccessListQuery; use aquatic_http_protocol::request::Request; use hashbrown::HashMap; use log::{debug, error, info}; diff --git a/aquatic_http/src/lib/tasks.rs b/aquatic_http/src/lib/tasks.rs index 3273ebd..341a434 100644 --- a/aquatic_http/src/lib/tasks.rs +++ b/aquatic_http/src/lib/tasks.rs @@ -1,6 +1,6 @@ use histogram::Histogram; -use aquatic_common::access_list::AccessListMode; +use aquatic_common::access_list::{AccessListMode, AccessListQuery}; use crate::{common::*, config::Config}; diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 8eb57c0..a1bd561 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -21,6 +21,7 @@ aquatic_common = "0.1.0" aquatic_udp_protocol = "0.1.0" crossbeam-channel = "0.5" hashbrown = "0.11.2" +hex = "0.4" histogram = "0.6" indexmap = "1" log = "0.4" diff --git a/aquatic_udp/src/lib/common/mod.rs b/aquatic_udp/src/lib/common/mod.rs index 5662e1f..d42951a 100644 --- a/aquatic_udp/src/lib/common/mod.rs +++ b/aquatic_udp/src/lib/common/mod.rs @@ -1,8 +1,10 @@ +use std::borrow::Borrow; use std::hash::Hash; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::sync::{atomic::AtomicUsize, Arc}; use std::time::Instant; +use aquatic_common::access_list::AccessListArcSwap; use hashbrown::HashMap; use indexmap::IndexMap; use parking_lot::Mutex; @@ -127,19 +129,19 @@ pub struct TorrentMaps { impl TorrentMaps { /// Remove disallowed and inactive torrents - pub fn clean(&mut self, config: &Config, access_list: &Arc) { + pub fn clean>(&mut self, config: &Config, access_list: T) { let now = Instant::now(); let access_list_mode = config.access_list.mode; self.ipv4.retain(|info_hash, torrent| { - access_list.allows(access_list_mode, &info_hash.0) + access_list.borrow().allows(access_list_mode, &info_hash.0) && Self::clean_torrent_and_peers(now, torrent) }); self.ipv4.shrink_to_fit(); self.ipv6.retain(|info_hash, torrent| { - access_list.allows(access_list_mode, &info_hash.0) + access_list.borrow().allows(access_list_mode, &info_hash.0) && Self::clean_torrent_and_peers(now, torrent) }); self.ipv6.shrink_to_fit(); @@ -183,7 +185,7 @@ pub struct Statistics { #[derive(Clone)] pub struct State { - pub access_list: Arc, + pub access_list: Arc, pub torrents: Arc>, pub statistics: Arc, } @@ -191,38 +193,13 @@ pub struct State { impl Default for State { fn default() -> Self { Self { - access_list: Arc::new(AccessList::default()), + access_list: Arc::new(AccessListArcSwap::default()), torrents: Arc::new(Mutex::new(TorrentMaps::default())), statistics: Arc::new(Statistics::default()), } } } -#[derive(Default)] -pub struct ConnectionMap(HashMap<(ConnectionId, SocketAddr), ValidUntil>); - -impl ConnectionMap { - pub fn insert( - &mut self, - connection_id: ConnectionId, - socket_addr: SocketAddr, - valid_until: ValidUntil, - ) { - self.0.insert((connection_id, socket_addr), valid_until); - } - - pub fn contains(&mut self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool { - self.0.contains_key(&(connection_id, socket_addr)) - } - - pub fn clean(&mut self) { - let now = Instant::now(); - - self.0.retain(|_, v| v.0 > now); - self.0.shrink_to_fit(); - } -} - #[cfg(test)] mod tests { #[test] diff --git a/aquatic_udp/src/lib/common/network.rs b/aquatic_udp/src/lib/common/network.rs index 8b13789..833c99f 100644 --- a/aquatic_udp/src/lib/common/network.rs +++ b/aquatic_udp/src/lib/common/network.rs @@ -1 +1,30 @@ +use std::{net::SocketAddr, time::Instant}; +pub use aquatic_common::{access_list::AccessList, ValidUntil}; +pub use aquatic_udp_protocol::*; +use hashbrown::HashMap; + +#[derive(Default)] +pub struct ConnectionMap(HashMap<(ConnectionId, SocketAddr), ValidUntil>); + +impl ConnectionMap { + pub fn insert( + &mut self, + connection_id: ConnectionId, + socket_addr: SocketAddr, + valid_until: ValidUntil, + ) { + self.0.insert((connection_id, socket_addr), valid_until); + } + + pub fn contains(&mut self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool { + self.0.contains_key(&(connection_id, socket_addr)) + } + + pub fn clean(&mut self) { + let now = Instant::now(); + + self.0.retain(|_, v| v.0 > now); + self.0.shrink_to_fit(); + } +} diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 9560920..a6770a6 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -3,10 +3,11 @@ use std::net::{IpAddr, SocketAddr}; use std::rc::Rc; use std::time::Duration; -use futures_lite::{Stream, StreamExt}; +use futures_lite::{AsyncBufReadExt, Stream, StreamExt}; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; -use glommio::{enclose, prelude::*}; +use glommio::io::{BufferedFile, StreamReaderBuilder}; use glommio::timer::TimerActionRepeat; +use glommio::{enclose, prelude::*}; use rand::prelude::SmallRng; use rand::SeedableRng; @@ -24,19 +25,34 @@ pub async fn run_request_worker( let response_senders = Rc::new(response_senders); - let torrents= Rc::new(RefCell::new(TorrentMaps::default())); + let torrents = Rc::new(RefCell::new(TorrentMaps::default())); + let access_list = Rc::new(RefCell::new(AccessList::default())); - async fn clean( - config: Config, - torrents: Rc>, - ) -> Option { - torrents.borrow_mut(); // .clean(config, access_list); + TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || { + enclose!((config, torrents, access_list) move || async move { + if config.access_list.mode.is_on(){ + let access_list_file = BufferedFile::open(config.access_list.path).await.unwrap(); - Some(Duration::from_secs(config.cleaning.interval)) - } + let mut reader = StreamReaderBuilder::new(access_list_file).build(); - TimerActionRepeat::repeat(enclose!((config, torrents) move || { - clean(config.clone(), torrents.clone()) + loop { + let mut buf = String::with_capacity(42); + + match reader.read_line(&mut buf).await { + Ok(_) => { + access_list.borrow_mut().insert_from_line(&buf).unwrap() // FIXME + }, + Err(err) => { + + } + } + } + } + + torrents.borrow_mut().clean(&config, &*access_list.borrow()); + + Some(Duration::from_secs(config.cleaning.interval)) + })() })); for (_, receiver) in request_receivers.streams() { diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 427ec61..9f007b0 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -15,6 +15,7 @@ use rand::prelude::{Rng, SeedableRng, StdRng}; use aquatic_udp_protocol::{IpVersion, Request, Response}; +use crate::common::network::ConnectionMap; use crate::common::*; use crate::config::Config; diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index fa4d882..134427c 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use aquatic_common::access_list::{AccessList, AccessListMode}; +use aquatic_common::access_list::{AccessListArcSwap, AccessListMode, AccessListQuery}; pub mod common; pub mod config; @@ -15,7 +15,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { mio::run(config) } -pub fn update_access_list(config: &Config, access_list: &Arc) { +pub fn update_access_list(config: &Config, access_list: &Arc) { match config.access_list.mode { AccessListMode::White | AccessListMode::Black => { if let Err(err) = access_list.update_from_path(&config.access_list.path) { diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index a59f2ec..f75ce9f 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -1,9 +1,12 @@ -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; use std::thread::Builder; use std::time::Duration; +use std::{ + ops::Deref, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; use anyhow::Context; use crossbeam_channel::unbounded; @@ -56,7 +59,10 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { update_access_list(&config, &state.access_list); - state.torrents.lock().clean(&config, &state.access_list); + state + .torrents + .lock() + .clean(&config, state.access_list.load_full().deref()); } } diff --git a/aquatic_udp/src/lib/mio/network.rs b/aquatic_udp/src/lib/mio/network.rs index 7e79b0a..e755fdd 100644 --- a/aquatic_udp/src/lib/mio/network.rs +++ b/aquatic_udp/src/lib/mio/network.rs @@ -7,6 +7,7 @@ use std::sync::{ use std::time::Duration; use std::vec::Drain; +use aquatic_common::access_list::AccessListQuery; use crossbeam_channel::{Receiver, Sender}; use mio::net::UdpSocket; use mio::{Events, Interest, Poll, Token}; @@ -15,6 +16,7 @@ use socket2::{Domain, Protocol, Socket, Type}; use aquatic_udp_protocol::{IpVersion, Request, Response}; +use crate::common::network::ConnectionMap; use crate::common::*; use crate::config::Config; diff --git a/aquatic_ws/src/lib/common.rs b/aquatic_ws/src/lib/common.rs index 4f30d8b..3d6b1df 100644 --- a/aquatic_ws/src/lib/common.rs +++ b/aquatic_ws/src/lib/common.rs @@ -2,7 +2,7 @@ use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use std::time::Instant; -use aquatic_common::access_list::AccessList; +use aquatic_common::access_list::{AccessList, AccessListArcSwap}; use crossbeam_channel::{Receiver, Sender}; use hashbrown::HashMap; use indexmap::IndexMap; @@ -136,7 +136,7 @@ impl TorrentMaps { #[derive(Clone)] pub struct State { - pub access_list: Arc, + pub access_list: Arc, pub torrent_maps: Arc>, } diff --git a/aquatic_ws/src/lib/lib.rs b/aquatic_ws/src/lib/lib.rs index f6ee599..6141466 100644 --- a/aquatic_ws/src/lib/lib.rs +++ b/aquatic_ws/src/lib/lib.rs @@ -33,7 +33,10 @@ pub fn run(config: Config) -> anyhow::Result<()> { tasks::update_access_list(&config, &state); - state.torrent_maps.lock().clean(&config, &state.access_list); + state + .torrent_maps + .lock() + .clean(&config, &state.access_list.load_full()); } } diff --git a/aquatic_ws/src/lib/network/mod.rs b/aquatic_ws/src/lib/network/mod.rs index f12859d..bbb293d 100644 --- a/aquatic_ws/src/lib/network/mod.rs +++ b/aquatic_ws/src/lib/network/mod.rs @@ -2,6 +2,7 @@ use std::io::ErrorKind; use std::time::Duration; use std::vec::Drain; +use aquatic_common::access_list::AccessListQuery; use crossbeam_channel::Receiver; use hashbrown::HashMap; use log::{debug, error, info}; diff --git a/aquatic_ws/src/lib/tasks.rs b/aquatic_ws/src/lib/tasks.rs index 55cb62d..f96426c 100644 --- a/aquatic_ws/src/lib/tasks.rs +++ b/aquatic_ws/src/lib/tasks.rs @@ -1,4 +1,4 @@ -use aquatic_common::access_list::AccessListMode; +use aquatic_common::access_list::{AccessListMode, AccessListQuery}; use histogram::Histogram; use crate::common::*;