diff --git a/Cargo.lock b/Cargo.lock index 1f2308c..742e3c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -112,6 +112,7 @@ dependencies = [ "rand", "rustls-pemfile", "serde", + "signal-hook", "slab", "smartstring", ] @@ -258,6 +259,7 @@ dependencies = [ "rand", "rustls-pemfile", "serde", + "signal-hook", "slab", "tungstenite", ] diff --git a/README.md b/README.md index ef7075e..5b317aa 100644 --- a/README.md +++ b/README.md @@ -13,11 +13,11 @@ of sub-implementations for different protocols: [mio]: https://github.com/tokio-rs/mio [glommio]: https://github.com/DataDog/glommio -| Name | Protocol | OS requirements | -|--------------|--------------------------------------------|-----------------------------------------------------------------| -| aquatic_udp | [BitTorrent over UDP] | Cross-platform with [mio] (default) / Linux 5.8+ with [glommio] | -| aquatic_http | [BitTorrent over HTTP] with TLS ([rustls]) | Linux 5.8+ | -| aquatic_ws | [WebTorrent] with TLS (rustls) | Linux 5.8+ | +| Name | Protocol | OS requirements | +|--------------|--------------------------------------------|------------------------------------------------------------| +| aquatic_udp | [BitTorrent over UDP] | Unix-like with [mio] (default) / Linux 5.8+ with [glommio] | +| aquatic_http | [BitTorrent over HTTP] with TLS ([rustls]) | Linux 5.8+ | +| aquatic_ws | [WebTorrent] with TLS (rustls) | Linux 5.8+ | ## Usage @@ -98,6 +98,8 @@ mode = 'off' # Change to 'black' (blacklist) or 'white' (whitelist) path = '' # Path to text file with newline-delimited hex-encoded info hashes ``` +The file is read on start and when the program receives `SIGUSR1`. + #### More information More documentation of the various configuration options might be available diff --git a/TODO.md b/TODO.md index a4b2b1c..8b6e237 100644 --- a/TODO.md +++ b/TODO.md @@ -1,19 +1,18 @@ # TODO * readme - * document privilige dropping, cpu pinning + * document privilege dropping and cpu pinning * socket_recv_size and ipv6_only in glommio implementations * config: fail on unrecognized keys -* access lists: - * use signals to reload, use arcswap everywhere - * use arc-swap Cache? - * add CI tests +* CI + * test both aquatic_udp implementations + * test access lists? + * cargo-deny * aquatic_udp - * CI for both implementations * glommio * consider sending local responses immediately * consider adding ConnectedScrapeRequest::Scrape(PendingScrapeRequest) @@ -21,26 +20,22 @@ response * aquatic_http: - * optimize? - * get_peer_addr only once (takes 1.2% of runtime) - * queue response: allocating takes 2.8% of runtime * clean out connections regularly * Rc> which get set on successful request parsing and successful response sending. Clone kept in connection slab which gets cleaned periodically (= cancel tasks). Means that task handle will need to be stored in slab. Config vars kill_idle_connections: bool, max_idle_connection_time. Remove keepalive. * handle panicked/cancelled tasks? - * load test: use futures-rustls + * optimize? + * get_peer_addr only once (takes 1.2% of runtime) + * queue response: allocating takes 2.8% of runtime + * use futures-rustls for load test * consider better error type for request parsing, so that better error messages can be sent back (e.g., "full scrapes are not supported") - * Scrape: should stats with only zeroes be sent back for non-registered info hashes? - Relevant for mio implementation too. * aquatic_ws * load test cpu pinning * test with multiple socket and request workers - * should it send back error on message parse error, or does that - just indicate that not enough data has been received yet? ## Less important @@ -92,30 +87,7 @@ # Don't do -## General - profile-guided optimization - -Doesn't seem to improve performance, possibly because I only got it to compile -with thin LTO which could have impacted performance. Running non-pgo version -without AVX-512 seems to be the fastest, although the presence of a ctrl-c handler -(meaning the addition of a thread) might have worsed performance in pgo version -(unlikely). - -Benchmarks of aquatic_udp with and without PGO. On hetzer 16x vCPU. 8 workers -just like best results in last benchmark, multiple client ips=true: - -### target-cpu=native (probably with avx512 since such features are listed in /proc/cpuinfo), all with thin lto -* With PGO on aquatic_udp: 370k, without 363k responses per second -* With PGO on both aquatic_udp and aquatic_udp_load_test: 368k - -### with target-cpu=skylake, all with thin lto -* with pgo on aquatic_udp: 400k -* with no pgo: 394k - -### checkout master (no pgo, no thin lto, no ctrlc handler) - -* target-cpu=native: 394k -* target-cpu=skylake: 439k -* no target-cpu set: 388k +* general: PGO didn't seem to help way back ## aquatic_http * request from path: @@ -126,24 +98,7 @@ just like best results in last benchmark, multiple client ips=true: there. then iter over space newlines/just take relevant data. Not faster than httparse and a lot worse -## aquatic_http / aquatic_ws -* Shared state for HTTP with and without TLS. Peers who announce over TLS - should be able to expect that someone snooping on the connection can't - connect them to a info hash. If someone receives their IP in a response - while announcing without TLS, this expectation would be broken. - -## aquatic_udp -* Other HashMap hashers (such as SeaHash): seemingly not worthwhile, see - `https://github.com/tkaitchuck/aHash` -* `sendmmsg`: can't send to multiple socket addresses, so doesn't help -* Config behind Arc in state: it is likely better to be able to pass it around - without state -* Responses: make vectors iterator references so we dont have run .collect(). - Doesn't work since it means conversion to bytes must be done while holding - readable reference to entry in torrent map, hurting concurrency. - ## aquatic_udp_protocol * Use `bytes` crate: seems to worsen performance somewhat * Zerocopy (https://docs.rs/zerocopy/0.3.0/zerocopy/index.html) for requests and responses? Doesn't work on Vec etc -* New array buffer each time in response_to_bytes: doesn't help performance diff --git a/aquatic_common/src/access_list.rs b/aquatic_common/src/access_list.rs index f71f967..54cfd21 100644 --- a/aquatic_common/src/access_list.rs +++ b/aquatic_common/src/access_list.rs @@ -4,7 +4,7 @@ use std::path::PathBuf; use std::sync::Arc; use anyhow::Context; -use arc_swap::ArcSwap; +use arc_swap::{ArcSwap, Cache}; use hashbrown::HashSet; use serde::{Deserialize, Serialize}; @@ -85,6 +85,7 @@ pub trait AccessListQuery { } pub type AccessListArcSwap = ArcSwap; +pub type AccessListCache = Cache, Arc>; impl AccessListQuery for AccessListArcSwap { fn update(&self, config: &AccessListConfig) -> anyhow::Result<()> { @@ -102,6 +103,30 @@ impl AccessListQuery for AccessListArcSwap { } } +pub fn create_access_list_cache(arc_swap: &Arc) -> AccessListCache { + Cache::from(Arc::clone(arc_swap)) +} + +pub fn update_access_list( + config: &AccessListConfig, + access_list: &Arc, +) -> anyhow::Result<()> { + if config.mode.is_on() { + match access_list.update(config) { + Ok(()) => { + ::log::info!("Access list updated") + } + Err(err) => { + ::log::error!("Updating access list failed: {:#}", err); + + return Err(err); + } + } + } + + Ok(()) +} + fn parse_info_hash(line: &str) -> anyhow::Result<[u8; 20]> { let mut bytes = [0u8; 20]; @@ -123,4 +148,37 @@ mod tests { assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeee".into()).is_err()); assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeeö".into()).is_err()); } + + #[test] + fn test_cache_allows() { + let mut access_list = AccessList::default(); + + let a = parse_info_hash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap(); + let b = parse_info_hash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap(); + let c = parse_info_hash("cccccccccccccccccccccccccccccccccccccccc").unwrap(); + + access_list.0.insert(a); + access_list.0.insert(b); + + let access_list = Arc::new(ArcSwap::new(Arc::new(access_list))); + + let mut access_list_cache = Cache::new(Arc::clone(&access_list)); + + assert!(access_list_cache.load().allows(AccessListMode::White, &a)); + assert!(access_list_cache.load().allows(AccessListMode::White, &b)); + assert!(!access_list_cache.load().allows(AccessListMode::White, &c)); + + assert!(!access_list_cache.load().allows(AccessListMode::Black, &a)); + assert!(!access_list_cache.load().allows(AccessListMode::Black, &b)); + assert!(access_list_cache.load().allows(AccessListMode::Black, &c)); + + assert!(access_list_cache.load().allows(AccessListMode::Off, &a)); + assert!(access_list_cache.load().allows(AccessListMode::Off, &b)); + assert!(access_list_cache.load().allows(AccessListMode::Off, &c)); + + access_list.store(Arc::new(AccessList::default())); + + assert!(access_list_cache.load().allows(AccessListMode::Black, &a)); + assert!(access_list_cache.load().allows(AccessListMode::Black, &b)); + } } diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index b3f9dca..bc0c43b 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -37,6 +37,7 @@ privdrop = "0.5" rand = { version = "0.8", features = ["small_rng"] } rustls-pemfile = "0.2" serde = { version = "1", features = ["derive"] } +signal-hook = { version = "0.3" } slab = "0.4" smartstring = "0.2" diff --git a/aquatic_http/src/lib/common.rs b/aquatic_http/src/lib/common.rs index df1dfce..19e552f 100644 --- a/aquatic_http/src/lib/common.rs +++ b/aquatic_http/src/lib/common.rs @@ -1,7 +1,8 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::sync::Arc; use std::time::Instant; -use aquatic_common::access_list::AccessList; +use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use either::Either; use hashbrown::HashMap; use indexmap::IndexMap; @@ -14,14 +15,6 @@ use aquatic_http_protocol::response::ResponsePeer; use crate::config::Config; -use std::borrow::Borrow; -use std::cell::RefCell; -use std::rc::Rc; - -use futures_lite::AsyncBufReadExt; -use glommio::io::{BufferedFile, StreamReaderBuilder}; -use glommio::prelude::*; - use aquatic_http_protocol::{ request::{AnnounceRequest, ScrapeRequest}, response::{AnnounceResponse, ScrapeResponse}, @@ -80,48 +73,6 @@ impl ChannelResponse { } } -pub async fn update_access_list>( - config: C, - access_list: Rc>, -) { - if config.borrow().access_list.mode.is_on() { - match BufferedFile::open(&config.borrow().access_list.path).await { - Ok(file) => { - let mut reader = StreamReaderBuilder::new(file).build(); - let mut new_access_list = AccessList::default(); - - loop { - let mut buf = String::with_capacity(42); - - match reader.read_line(&mut buf).await { - Ok(_) => { - if let Err(err) = new_access_list.insert_from_line(&buf) { - ::log::error!( - "Couln't parse access list line '{}': {:?}", - buf, - err - ); - } - } - Err(err) => { - ::log::error!("Couln't read access list line {:?}", err); - - break; - } - } - - yield_if_needed().await; - } - - *access_list.borrow_mut() = new_access_list; - } - Err(err) => { - ::log::error!("Couldn't open access list file: {:?}", err) - } - }; - } -} - pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {} impl Ip for Ipv4Addr {} @@ -218,20 +169,25 @@ pub struct TorrentMaps { } impl TorrentMaps { - pub fn clean(&mut self, config: &Config, access_list: &AccessList) { - Self::clean_torrent_map(config, access_list, &mut self.ipv4); - Self::clean_torrent_map(config, access_list, &mut self.ipv6); + pub fn clean(&mut self, config: &Config, access_list: &Arc) { + let mut access_list_cache = create_access_list_cache(access_list); + + Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4); + Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6); } fn clean_torrent_map( config: &Config, - access_list: &AccessList, + access_list_cache: &mut AccessListCache, torrent_map: &mut TorrentMap, ) { let now = Instant::now(); torrent_map.retain(|info_hash, torrent_data| { - if !access_list.allows(config.access_list.mode, &info_hash.0) { + if !access_list_cache + .load() + .allows(config.access_list.mode, &info_hash.0) + { return false; } @@ -263,6 +219,11 @@ impl TorrentMaps { } } +#[derive(Default, Clone)] +pub struct State { + pub access_list: Arc, +} + pub fn num_digits_in_usize(mut number: usize) -> usize { let mut num_digits = 1usize; diff --git a/aquatic_http/src/lib/config.rs b/aquatic_http/src/lib/config.rs index e2e3131..09e690c 100644 --- a/aquatic_http/src/lib/config.rs +++ b/aquatic_http/src/lib/config.rs @@ -57,11 +57,9 @@ pub struct ProtocolConfig { #[serde(default)] pub struct CleaningConfig { /// Clean peers this often (seconds) - pub interval: u64, + pub torrent_cleaning_interval: u64, /// Remove peers that haven't announced for this long (seconds) pub max_peer_age: u64, - /// Remove connections that are older than this (seconds) - pub max_connection_age: u64, } impl Default for Config { @@ -105,9 +103,8 @@ impl Default for ProtocolConfig { impl Default for CleaningConfig { fn default() -> Self { Self { - interval: 30, + torrent_cleaning_interval: 30, max_peer_age: 1800, - max_connection_age: 1800, } } } diff --git a/aquatic_http/src/lib/handlers.rs b/aquatic_http/src/lib/handlers.rs index d30fe64..bdd9239 100644 --- a/aquatic_http/src/lib/handlers.rs +++ b/aquatic_http/src/lib/handlers.rs @@ -12,7 +12,6 @@ use std::cell::RefCell; use std::rc::Rc; use std::time::Duration; -use aquatic_common::access_list::AccessList; use futures_lite::{Stream, StreamExt}; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::timer::TimerActionRepeat; @@ -25,9 +24,9 @@ use crate::config::Config; pub async fn run_request_worker( config: Config, + state: State, request_mesh_builder: MeshBuilder, response_mesh_builder: MeshBuilder, - access_list: AccessList, ) { let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap(); @@ -35,16 +34,14 @@ pub async fn run_request_worker( let response_senders = Rc::new(response_senders); let torrents = Rc::new(RefCell::new(TorrentMaps::default())); - let access_list = Rc::new(RefCell::new(access_list)); + let access_list = state.access_list; - // Periodically clean torrents and update access list + // Periodically clean torrents TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || { enclose!((config, torrents, access_list) move || async move { - update_access_list(&config, access_list.clone()).await; + torrents.borrow_mut().clean(&config, &access_list); - torrents.borrow_mut().clean(&config, &*access_list.borrow()); - - Some(Duration::from_secs(config.cleaning.interval)) + Some(Duration::from_secs(config.cleaning.torrent_cleaning_interval)) })() })); diff --git a/aquatic_http/src/lib/lib.rs b/aquatic_http/src/lib/lib.rs index 690df06..ef584e5 100644 --- a/aquatic_http/src/lib/lib.rs +++ b/aquatic_http/src/lib/lib.rs @@ -4,9 +4,12 @@ use std::{ sync::{atomic::AtomicUsize, Arc}, }; -use aquatic_common::{access_list::AccessList, privileges::drop_privileges_after_socket_binding}; -use common::TlsConfig; +use aquatic_common::{ + access_list::update_access_list, privileges::drop_privileges_after_socket_binding, +}; +use common::{State, TlsConfig}; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; +use signal_hook::{consts::SIGUSR1, iterator::Signals}; use crate::config::Config; @@ -19,18 +22,44 @@ pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker"; const SHARED_CHANNEL_SIZE: usize = 1024; -pub fn run(config: Config) -> anyhow::Result<()> { +pub fn run(config: Config) -> ::anyhow::Result<()> { if config.cpu_pinning.active { core_affinity::set_for_current(core_affinity::CoreId { id: config.cpu_pinning.offset, }); } - let access_list = if config.access_list.mode.is_on() { - AccessList::create_from_path(&config.access_list.path).expect("Load access list") - } else { - AccessList::default() - }; + let state = State::default(); + + update_access_list(&config.access_list, &state.access_list)?; + + let mut signals = Signals::new(::std::iter::once(SIGUSR1))?; + + { + let config = config.clone(); + let state = state.clone(); + + ::std::thread::spawn(move || run_inner(config, state)); + } + + for signal in &mut signals { + match signal { + SIGUSR1 => { + let _ = update_access_list(&config.access_list, &state.access_list); + } + _ => unreachable!(), + } + } + + Ok(()) +} + +pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { + if config.cpu_pinning.active { + core_affinity::set_for_current(core_affinity::CoreId { + id: config.cpu_pinning.offset, + }); + } let num_peers = config.socket_workers + config.request_workers; @@ -45,11 +74,11 @@ pub fn run(config: Config) -> anyhow::Result<()> { for i in 0..(config.socket_workers) { let config = config.clone(); + let state = state.clone(); let tls_config = tls_config.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); let num_bound_sockets = num_bound_sockets.clone(); - let access_list = access_list.clone(); let mut builder = LocalExecutorBuilder::default(); @@ -60,11 +89,11 @@ pub fn run(config: Config) -> anyhow::Result<()> { let executor = builder.spawn(|| async move { network::run_socket_worker( config, + state, tls_config, request_mesh_builder, response_mesh_builder, num_bound_sockets, - access_list, ) .await }); @@ -74,9 +103,9 @@ pub fn run(config: Config) -> anyhow::Result<()> { for i in 0..(config.request_workers) { let config = config.clone(); + let state = state.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); - let access_list = access_list.clone(); let mut builder = LocalExecutorBuilder::default(); @@ -85,13 +114,8 @@ pub fn run(config: Config) -> anyhow::Result<()> { } let executor = builder.spawn(|| async move { - handlers::run_request_worker( - config, - request_mesh_builder, - response_mesh_builder, - access_list, - ) - .await + handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) + .await }); executors.push(executor); diff --git a/aquatic_http/src/lib/network.rs b/aquatic_http/src/lib/network.rs index 6970343..51ed339 100644 --- a/aquatic_http/src/lib/network.rs +++ b/aquatic_http/src/lib/network.rs @@ -6,7 +6,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; -use aquatic_common::access_list::AccessList; +use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_http_protocol::common::InfoHash; use aquatic_http_protocol::request::{Request, RequestParseError, ScrapeRequest}; use aquatic_http_protocol::response::{ @@ -41,29 +41,16 @@ struct ConnectionReference { response_sender: LocalSender, } -struct Connection { - config: Rc, - access_list: Rc>, - request_senders: Rc>, - response_receiver: LocalReceiver, - response_consumer_id: ConsumerId, - stream: TlsStream, - peer_addr: SocketAddr, - connection_id: ConnectionId, - request_buffer: [u8; MAX_REQUEST_SIZE], - request_buffer_position: usize, -} - pub async fn run_socket_worker( config: Config, + state: State, tls_config: Arc, request_mesh_builder: MeshBuilder, response_mesh_builder: MeshBuilder, num_bound_sockets: Arc, - access_list: AccessList, ) { let config = Rc::new(config); - let access_list = Rc::new(RefCell::new(access_list)); + let access_list = state.access_list; let listener = TcpListener::bind(config.network.address).expect("bind socket"); num_bound_sockets.fetch_add(1, Ordering::SeqCst); @@ -77,15 +64,6 @@ pub async fn run_socket_worker( let connection_slab = Rc::new(RefCell::new(Slab::new())); let connections_to_remove = Rc::new(RefCell::new(Vec::new())); - // Periodically update access list - TimerActionRepeat::repeat(enclose!((config, access_list) move || { - enclose!((config, access_list) move || async move { - update_access_list(config.clone(), access_list.clone()).await; - - Some(Duration::from_secs(config.cleaning.interval)) - })() - })); - // Periodically remove closed connections TimerActionRepeat::repeat( enclose!((config, connection_slab, connections_to_remove) move || { @@ -158,7 +136,9 @@ async fn remove_closed_connections( } } - Some(Duration::from_secs(config.cleaning.interval)) + Some(Duration::from_secs( + config.cleaning.torrent_cleaning_interval, + )) } async fn receive_responses( @@ -177,10 +157,23 @@ async fn receive_responses( } } +struct Connection { + config: Rc, + access_list_cache: AccessListCache, + request_senders: Rc>, + response_receiver: LocalReceiver, + response_consumer_id: ConsumerId, + stream: TlsStream, + peer_addr: SocketAddr, + connection_id: ConnectionId, + request_buffer: [u8; MAX_REQUEST_SIZE], + request_buffer_position: usize, +} + impl Connection { async fn run( config: Rc, - access_list: Rc>, + access_list: Arc, request_senders: Rc>, response_receiver: LocalReceiver, response_consumer_id: ConsumerId, @@ -197,7 +190,7 @@ impl Connection { let mut conn = Connection { config: config.clone(), - access_list: access_list.clone(), + access_list_cache: create_access_list_cache(&access_list), request_senders: request_senders.clone(), response_receiver, response_consumer_id, @@ -297,14 +290,14 @@ impl Connection { /// response /// - If it is a scrape requests, split it up, pass on the parts to /// relevant request workers and await a response - async fn handle_request(&self, request: Request) -> anyhow::Result { + async fn handle_request(&mut self, request: Request) -> anyhow::Result { match request { Request::Announce(request) => { let info_hash = request.info_hash; if self - .access_list - .borrow() + .access_list_cache + .load() .allows(self.config.access_list.mode, &info_hash.0) { let request = ChannelRequest::Announce { diff --git a/aquatic_udp/src/lib/common/mod.rs b/aquatic_udp/src/lib/common/mod.rs index cf47c8c..d448f5a 100644 --- a/aquatic_udp/src/lib/common/mod.rs +++ b/aquatic_udp/src/lib/common/mod.rs @@ -1,8 +1,9 @@ -use std::borrow::Borrow; use std::hash::Hash; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::sync::Arc; use std::time::Instant; +use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap}; use hashbrown::HashMap; use indexmap::IndexMap; @@ -107,19 +108,24 @@ pub struct TorrentMaps { impl TorrentMaps { /// Remove disallowed and inactive torrents - pub fn clean>(&mut self, config: &Config, access_list: T) { + pub fn clean(&mut self, config: &Config, access_list: &Arc) { let now = Instant::now(); - let access_list_mode = config.access_list.mode; + let mut access_list_cache = create_access_list_cache(access_list); + self.ipv4.retain(|info_hash, torrent| { - access_list.borrow().allows(access_list_mode, &info_hash.0) + access_list_cache + .load() + .allows(access_list_mode, &info_hash.0) && Self::clean_torrent_and_peers(now, torrent) }); self.ipv4.shrink_to_fit(); self.ipv6.retain(|info_hash, torrent| { - access_list.borrow().allows(access_list_mode, &info_hash.0) + access_list_cache + .load() + .allows(access_list_mode, &info_hash.0) && Self::clean_torrent_and_peers(now, torrent) }); self.ipv6.shrink_to_fit(); @@ -149,6 +155,8 @@ impl TorrentMaps { keep }); + torrent.peers.shrink_to_fit(); + !torrent.peers.is_empty() } } diff --git a/aquatic_udp/src/lib/config.rs b/aquatic_udp/src/lib/config.rs index dbda176..58e8446 100644 --- a/aquatic_udp/src/lib/config.rs +++ b/aquatic_udp/src/lib/config.rs @@ -91,12 +91,14 @@ pub struct StatisticsConfig { #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct CleaningConfig { - /// Update access list and clean torrents this often (seconds) - pub interval: u64, - /// Remove peers that haven't announced for this long (seconds) - pub max_peer_age: u64, + /// Clean connections this often (seconds) + pub connection_cleaning_interval: u64, + /// Clean torrents this often (seconds) + pub torrent_cleaning_interval: u64, /// Remove connections that are older than this (seconds) pub max_connection_age: u64, + /// Remove peers that haven't announced for this long (seconds) + pub max_peer_age: u64, } impl Default for Config { @@ -160,9 +162,10 @@ impl Default for StatisticsConfig { impl Default for CleaningConfig { fn default() -> Self { Self { - interval: 30, - max_peer_age: 60 * 20, + connection_cleaning_interval: 60, + torrent_cleaning_interval: 60 * 2, max_connection_age: 60 * 5, + max_peer_age: 60 * 20, } } } diff --git a/aquatic_udp/src/lib/glommio/common.rs b/aquatic_udp/src/lib/glommio/common.rs index 4b3d80d..3506b09 100644 --- a/aquatic_udp/src/lib/glommio/common.rs +++ b/aquatic_udp/src/lib/glommio/common.rs @@ -1,52 +1,8 @@ -use std::borrow::Borrow; -use std::cell::RefCell; -use std::rc::Rc; +use std::sync::Arc; -use futures_lite::AsyncBufReadExt; -use glommio::io::{BufferedFile, StreamReaderBuilder}; -use glommio::prelude::*; +use aquatic_common::access_list::AccessListArcSwap; -use crate::common::*; -use crate::config::Config; - -pub async fn update_access_list>( - config: C, - access_list: Rc>, -) { - if config.borrow().access_list.mode.is_on() { - match BufferedFile::open(&config.borrow().access_list.path).await { - Ok(file) => { - let mut reader = StreamReaderBuilder::new(file).build(); - let mut new_access_list = AccessList::default(); - - loop { - let mut buf = String::with_capacity(42); - - match reader.read_line(&mut buf).await { - Ok(_) => { - if let Err(err) = new_access_list.insert_from_line(&buf) { - ::log::error!( - "Couln't parse access list line '{}': {:?}", - buf, - err - ); - } - } - Err(err) => { - ::log::error!("Couln't read access list line {:?}", err); - - break; - } - } - - yield_if_needed().await; - } - - *access_list.borrow_mut() = new_access_list; - } - Err(err) => { - ::log::error!("Couldn't open access list file: {:?}", err) - } - }; - } +#[derive(Default, Clone)] +pub struct State { + pub access_list: Arc, } diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 3bfbc0d..fbfbcb2 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -14,30 +14,27 @@ use crate::common::handlers::handle_announce_request; use crate::common::handlers::*; use crate::common::*; use crate::config::Config; -use crate::glommio::common::update_access_list; + +use super::common::State; pub async fn run_request_worker( config: Config, + state: State, request_mesh_builder: MeshBuilder<(usize, ConnectedRequest, SocketAddr), Partial>, response_mesh_builder: MeshBuilder<(ConnectedResponse, SocketAddr), Partial>, - access_list: AccessList, ) { let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap(); - let response_senders = Rc::new(response_senders); let torrents = Rc::new(RefCell::new(TorrentMaps::default())); - let access_list = Rc::new(RefCell::new(access_list)); - // Periodically clean torrents and update access list - TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || { - enclose!((config, torrents, access_list) move || async move { - update_access_list(config.clone(), access_list.clone()).await; + // Periodically clean torrents + TimerActionRepeat::repeat(enclose!((config, torrents, state) move || { + enclose!((config, torrents, state) move || async move { + torrents.borrow_mut().clean(&config, &state.access_list); - torrents.borrow_mut().clean(&config, &*access_list.borrow()); - - Some(Duration::from_secs(config.cleaning.interval)) + Some(Duration::from_secs(config.cleaning.torrent_cleaning_interval)) })() })); diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 72a8eb8..6836ed2 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -1,30 +1,60 @@ use std::sync::{atomic::AtomicUsize, Arc}; -use aquatic_common::access_list::AccessList; +use aquatic_common::access_list::update_access_list; use aquatic_common::privileges::drop_privileges_after_socket_binding; use glommio::channels::channel_mesh::MeshBuilder; use glommio::prelude::*; +use signal_hook::consts::SIGUSR1; +use signal_hook::iterator::Signals; use crate::config::Config; +use self::common::State; + mod common; pub mod handlers; pub mod network; pub const SHARED_CHANNEL_SIZE: usize = 4096; -pub fn run(config: Config) -> anyhow::Result<()> { +pub fn run(config: Config) -> ::anyhow::Result<()> { if config.cpu_pinning.active { core_affinity::set_for_current(core_affinity::CoreId { id: config.cpu_pinning.offset, }); } - let access_list = if config.access_list.mode.is_on() { - AccessList::create_from_path(&config.access_list.path).expect("Load access list") - } else { - AccessList::default() - }; + let state = State::default(); + + update_access_list(&config.access_list, &state.access_list)?; + + let mut signals = Signals::new(::std::iter::once(SIGUSR1))?; + + { + let config = config.clone(); + let state = state.clone(); + + ::std::thread::spawn(move || run_inner(config, state)); + } + + for signal in &mut signals { + match signal { + SIGUSR1 => { + let _ = update_access_list(&config.access_list, &state.access_list); + } + _ => unreachable!(), + } + } + + Ok(()) +} + +pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { + if config.cpu_pinning.active { + core_affinity::set_for_current(core_affinity::CoreId { + id: config.cpu_pinning.offset, + }); + } let num_peers = config.socket_workers + config.request_workers; @@ -37,10 +67,10 @@ pub fn run(config: Config) -> anyhow::Result<()> { for i in 0..(config.socket_workers) { let config = config.clone(); + let state = state.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); let num_bound_sockets = num_bound_sockets.clone(); - let access_list = access_list.clone(); let mut builder = LocalExecutorBuilder::default(); @@ -51,10 +81,10 @@ pub fn run(config: Config) -> anyhow::Result<()> { let executor = builder.spawn(|| async move { network::run_socket_worker( config, + state, request_mesh_builder, response_mesh_builder, num_bound_sockets, - access_list, ) .await }); @@ -64,9 +94,9 @@ pub fn run(config: Config) -> anyhow::Result<()> { for i in 0..(config.request_workers) { let config = config.clone(); + let state = state.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); - let access_list = access_list.clone(); let mut builder = LocalExecutorBuilder::default(); @@ -75,13 +105,8 @@ pub fn run(config: Config) -> anyhow::Result<()> { } let executor = builder.spawn(|| async move { - handlers::run_request_worker( - config, - request_mesh_builder, - response_mesh_builder, - access_list, - ) - .await + handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) + .await }); executors.push(executor); diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index cb046c0..df5d186 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -9,6 +9,7 @@ use std::sync::{ }; use std::time::{Duration, Instant}; +use aquatic_common::access_list::create_access_list_cache; use futures_lite::{Stream, StreamExt}; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::channels::local_channel::{new_unbounded, LocalSender}; @@ -21,7 +22,7 @@ use rand::prelude::{Rng, SeedableRng, StdRng}; use aquatic_udp_protocol::{IpVersion, Request, Response}; -use super::common::update_access_list; +use super::common::State; use crate::common::handlers::*; use crate::common::network::ConnectionMap; @@ -99,10 +100,10 @@ impl PendingScrapeResponses { pub async fn run_socket_worker( config: Config, + state: State, request_mesh_builder: MeshBuilder<(usize, ConnectedRequest, SocketAddr), Partial>, response_mesh_builder: MeshBuilder<(ConnectedResponse, SocketAddr), Partial>, num_bound_sockets: Arc, - access_list: AccessList, ) { let (local_sender, local_receiver) = new_unbounded(); @@ -126,22 +127,22 @@ pub async fn run_socket_worker( let pending_scrape_responses = Rc::new(RefCell::new(PendingScrapeResponses::default())); // Periodically clean pending_scrape_responses - TimerActionRepeat::repeat(enclose!((config, pending_scrape_responses) move || { - enclose!((config, pending_scrape_responses) move || async move { + TimerActionRepeat::repeat(enclose!((pending_scrape_responses) move || { + enclose!((pending_scrape_responses) move || async move { pending_scrape_responses.borrow_mut().clean(); - Some(Duration::from_secs(config.cleaning.interval)) + Some(Duration::from_secs(120)) })() })); spawn_local(enclose!((pending_scrape_responses) read_requests( config.clone(), + state, request_senders, response_consumer_index, local_sender, socket.clone(), pending_scrape_responses, - access_list, ))) .detach(); @@ -159,12 +160,12 @@ pub async fn run_socket_worker( async fn read_requests( config: Config, + state: State, request_senders: Senders<(usize, ConnectedRequest, SocketAddr)>, response_consumer_index: usize, local_sender: LocalSender<(Response, SocketAddr)>, socket: Rc, pending_scrape_responses: Rc>, - access_list: AccessList, ) { let mut rng = StdRng::from_entropy(); @@ -174,8 +175,8 @@ async fn read_requests( let connection_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_connection_age))); let pending_scrape_valid_until = Rc::new(RefCell::new(ValidUntil::new(PENDING_SCRAPE_MAX_WAIT))); - let access_list = Rc::new(RefCell::new(access_list)); let connections = Rc::new(RefCell::new(ConnectionMap::default())); + let mut access_list_cache = create_access_list_cache(&state.access_list); // Periodically update connection_valid_until TimerActionRepeat::repeat(enclose!((connection_valid_until) move || { @@ -195,21 +196,12 @@ async fn read_requests( })() })); - // Periodically update access list - TimerActionRepeat::repeat(enclose!((config, access_list) move || { - enclose!((config, access_list) move || async move { - update_access_list(config.clone(), access_list.clone()).await; - - Some(Duration::from_secs(config.cleaning.interval)) - })() - })); - // Periodically clean connections TimerActionRepeat::repeat(enclose!((config, connections) move || { enclose!((config, connections) move || async move { connections.borrow_mut().clean(); - Some(Duration::from_secs(config.cleaning.interval)) + Some(Duration::from_secs(config.cleaning.connection_cleaning_interval)) })() })); @@ -241,8 +233,8 @@ async fn read_requests( } Ok(Request::Announce(request)) => { if connections.borrow().contains(request.connection_id, src) { - if access_list - .borrow() + if access_list_cache + .load() .allows(access_list_mode, &request.info_hash.0) { let request_consumer_index = diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index 4104d47..f21f9e7 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -1,15 +1,12 @@ +use std::sync::{atomic::AtomicUsize, Arc}; use std::thread::Builder; use std::time::Duration; -use std::{ - ops::Deref, - sync::{atomic::AtomicUsize, Arc}, -}; use anyhow::Context; use aquatic_common::privileges::drop_privileges_after_socket_binding; use crossbeam_channel::unbounded; -use aquatic_common::access_list::AccessListQuery; +use aquatic_common::access_list::update_access_list; use signal_hook::consts::SIGUSR1; use signal_hook::iterator::Signals; @@ -31,7 +28,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let state = State::default(); - update_access_list(&config, &state)?; + update_access_list(&config.access_list, &state.access_list)?; let mut signals = Signals::new(::std::iter::once(SIGUSR1))?; @@ -45,7 +42,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { for signal in &mut signals { match signal { SIGUSR1 => { - let _ = update_access_list(&config, &state); + let _ = update_access_list(&config.access_list, &state.access_list); } _ => unreachable!(), } @@ -144,28 +141,10 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { .unwrap(); loop { - ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); + ::std::thread::sleep(Duration::from_secs( + config.cleaning.torrent_cleaning_interval, + )); - state - .torrents - .lock() - .clean(&config, state.access_list.load_full().deref()); + state.torrents.lock().clean(&config, &state.access_list); } } - -fn update_access_list(config: &Config, state: &State) -> anyhow::Result<()> { - if config.access_list.mode.is_on() { - match state.access_list.update(&config.access_list) { - Ok(()) => { - ::log::info!("Access list updated") - } - Err(err) => { - ::log::error!("Updating access list failed: {:#}", err); - - return Err(err); - } - } - } - - Ok(()) -} diff --git a/aquatic_udp/src/lib/mio/network.rs b/aquatic_udp/src/lib/mio/network.rs index 388d5f3..dfe00d6 100644 --- a/aquatic_udp/src/lib/mio/network.rs +++ b/aquatic_udp/src/lib/mio/network.rs @@ -7,7 +7,7 @@ use std::sync::{ use std::time::{Duration, Instant}; use std::vec::Drain; -use aquatic_common::access_list::AccessListQuery; +use aquatic_common::access_list::create_access_list_cache; use crossbeam_channel::{Receiver, Sender}; use mio::net::UdpSocket; use mio::{Events, Interest, Poll, Token}; @@ -52,6 +52,8 @@ pub fn run_socket_worker( let timeout = Duration::from_millis(50); + let cleaning_duration = Duration::from_secs(config.cleaning.connection_cleaning_interval); + let mut iter_counter = 0usize; let mut last_cleaning = Instant::now(); @@ -88,7 +90,7 @@ pub fn run_socket_worker( if iter_counter % 32 == 0 { let now = Instant::now(); - if last_cleaning + Duration::from_secs(config.cleaning.interval) > now { + if now > last_cleaning + cleaning_duration { connections.clean(); last_cleaning = now; @@ -149,6 +151,8 @@ fn read_requests( let valid_until = ValidUntil::new(config.cleaning.max_connection_age); let access_list_mode = config.access_list.mode; + let mut access_list_cache = create_access_list_cache(&state.access_list); + loop { match socket.recv_from(&mut buffer[..]) { Ok((amt, src)) => { @@ -176,8 +180,8 @@ fn read_requests( } Ok(Request::Announce(request)) => { if connections.contains(request.connection_id, src) { - if state - .access_list + if access_list_cache + .load() .allows(access_list_mode, &request.info_hash.0) { if let Err(err) = request_sender diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index af69a0a..f2a41bb 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -35,6 +35,7 @@ privdrop = "0.5" rand = { version = "0.8", features = ["small_rng"] } rustls-pemfile = "0.2" serde = { version = "1", features = ["derive"] } +signal-hook = { version = "0.3" } slab = "0.4" tungstenite = "0.15" diff --git a/aquatic_ws/src/lib/common.rs b/aquatic_ws/src/lib/common.rs index 9ee675c..1b136d9 100644 --- a/aquatic_ws/src/lib/common.rs +++ b/aquatic_ws/src/lib/common.rs @@ -1,13 +1,8 @@ -use std::borrow::Borrow; -use std::cell::RefCell; use std::net::{IpAddr, SocketAddr}; -use std::rc::Rc; +use std::sync::Arc; use std::time::Instant; -use aquatic_common::access_list::AccessList; -use futures_lite::AsyncBufReadExt; -use glommio::io::{BufferedFile, StreamReaderBuilder}; -use glommio::yield_if_needed; +use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use hashbrown::HashMap; use indexmap::IndexMap; @@ -99,16 +94,25 @@ pub struct TorrentMaps { } impl TorrentMaps { - pub fn clean(&mut self, config: &Config, access_list: &AccessList) { - Self::clean_torrent_map(config, access_list, &mut self.ipv4); - Self::clean_torrent_map(config, access_list, &mut self.ipv6); + pub fn clean(&mut self, config: &Config, access_list: &Arc) { + let mut access_list_cache = create_access_list_cache(access_list); + + Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4); + Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6); } - fn clean_torrent_map(config: &Config, access_list: &AccessList, torrent_map: &mut TorrentMap) { + fn clean_torrent_map( + config: &Config, + access_list_cache: &mut AccessListCache, + torrent_map: &mut TorrentMap, + ) { let now = Instant::now(); torrent_map.retain(|info_hash, torrent_data| { - if !access_list.allows(config.access_list.mode, &info_hash.0) { + if !access_list_cache + .load() + .allows(config.access_list.mode, &info_hash.0) + { return false; } @@ -140,44 +144,7 @@ impl TorrentMaps { } } -pub async fn update_access_list>( - config: C, - access_list: Rc>, -) { - if config.borrow().access_list.mode.is_on() { - match BufferedFile::open(&config.borrow().access_list.path).await { - Ok(file) => { - let mut reader = StreamReaderBuilder::new(file).build(); - let mut new_access_list = AccessList::default(); - - loop { - let mut buf = String::with_capacity(42); - - match reader.read_line(&mut buf).await { - Ok(_) => { - if let Err(err) = new_access_list.insert_from_line(&buf) { - ::log::error!( - "Couln't parse access list line '{}': {:?}", - buf, - err - ); - } - } - Err(err) => { - ::log::error!("Couln't read access list line {:?}", err); - - break; - } - } - - yield_if_needed().await; - } - - *access_list.borrow_mut() = new_access_list; - } - Err(err) => { - ::log::error!("Couldn't open access list file: {:?}", err) - } - }; - } +#[derive(Default, Clone)] +pub struct State { + pub access_list: Arc, } diff --git a/aquatic_ws/src/lib/config.rs b/aquatic_ws/src/lib/config.rs index ed9dd94..a73e870 100644 --- a/aquatic_ws/src/lib/config.rs +++ b/aquatic_ws/src/lib/config.rs @@ -59,7 +59,7 @@ pub struct ProtocolConfig { #[serde(default)] pub struct CleaningConfig { /// Clean peers this often (seconds) - pub interval: u64, + pub torrent_cleaning_interval: u64, /// Remove peers that haven't announced for this long (seconds) pub max_peer_age: u64, } @@ -106,7 +106,7 @@ impl Default for ProtocolConfig { impl Default for CleaningConfig { fn default() -> Self { Self { - interval: 30, + torrent_cleaning_interval: 30, max_peer_age: 1800, } } diff --git a/aquatic_ws/src/lib/handlers.rs b/aquatic_ws/src/lib/handlers.rs index 31e0388..cd64de3 100644 --- a/aquatic_ws/src/lib/handlers.rs +++ b/aquatic_ws/src/lib/handlers.rs @@ -2,7 +2,6 @@ use std::cell::RefCell; use std::rc::Rc; use std::time::Duration; -use aquatic_common::access_list::AccessList; use aquatic_common::extract_response_peers; use futures_lite::StreamExt; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; @@ -19,9 +18,9 @@ use crate::config::Config; pub async fn run_request_worker( config: Config, + state: State, in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>, out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>, - access_list: AccessList, ) { let (_, mut in_message_receivers) = in_message_mesh_builder.join(Role::Consumer).await.unwrap(); let (out_message_senders, _) = out_message_mesh_builder.join(Role::Producer).await.unwrap(); @@ -29,16 +28,14 @@ pub async fn run_request_worker( let out_message_senders = Rc::new(out_message_senders); let torrents = Rc::new(RefCell::new(TorrentMaps::default())); - let access_list = Rc::new(RefCell::new(access_list)); + let access_list = state.access_list; - // Periodically clean torrents and update access list + // Periodically clean torrents TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || { enclose!((config, torrents, access_list) move || async move { - update_access_list(&config, access_list.clone()).await; + torrents.borrow_mut().clean(&config, &access_list); - torrents.borrow_mut().clean(&config, &*access_list.borrow()); - - Some(Duration::from_secs(config.cleaning.interval)) + Some(Duration::from_secs(config.cleaning.torrent_cleaning_interval)) })() })); diff --git a/aquatic_ws/src/lib/lib.rs b/aquatic_ws/src/lib/lib.rs index c1de1d2..b367bab 100644 --- a/aquatic_ws/src/lib/lib.rs +++ b/aquatic_ws/src/lib/lib.rs @@ -4,9 +4,12 @@ use std::{ sync::{atomic::AtomicUsize, Arc}, }; -use aquatic_common::{access_list::AccessList, privileges::drop_privileges_after_socket_binding}; -use common::TlsConfig; +use aquatic_common::{ + access_list::update_access_list, privileges::drop_privileges_after_socket_binding, +}; +use common::{State, TlsConfig}; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; +use signal_hook::{consts::SIGUSR1, iterator::Signals}; use crate::config::Config; @@ -19,18 +22,44 @@ pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker"; const SHARED_CHANNEL_SIZE: usize = 1024; -pub fn run(config: Config) -> anyhow::Result<()> { +pub fn run(config: Config) -> ::anyhow::Result<()> { if config.cpu_pinning.active { core_affinity::set_for_current(core_affinity::CoreId { id: config.cpu_pinning.offset, }); } - let access_list = if config.access_list.mode.is_on() { - AccessList::create_from_path(&config.access_list.path).expect("Load access list") - } else { - AccessList::default() - }; + let state = State::default(); + + update_access_list(&config.access_list, &state.access_list)?; + + let mut signals = Signals::new(::std::iter::once(SIGUSR1))?; + + { + let config = config.clone(); + let state = state.clone(); + + ::std::thread::spawn(move || run_inner(config, state)); + } + + for signal in &mut signals { + match signal { + SIGUSR1 => { + let _ = update_access_list(&config.access_list, &state.access_list); + } + _ => unreachable!(), + } + } + + Ok(()) +} + +pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { + if config.cpu_pinning.active { + core_affinity::set_for_current(core_affinity::CoreId { + id: config.cpu_pinning.offset, + }); + } let num_peers = config.socket_workers + config.request_workers; @@ -45,11 +74,11 @@ pub fn run(config: Config) -> anyhow::Result<()> { for i in 0..(config.socket_workers) { let config = config.clone(); + let state = state.clone(); let tls_config = tls_config.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); let num_bound_sockets = num_bound_sockets.clone(); - let access_list = access_list.clone(); let mut builder = LocalExecutorBuilder::default(); @@ -60,11 +89,11 @@ pub fn run(config: Config) -> anyhow::Result<()> { let executor = builder.spawn(|| async move { network::run_socket_worker( config, + state, tls_config, request_mesh_builder, response_mesh_builder, num_bound_sockets, - access_list, ) .await }); @@ -74,9 +103,9 @@ pub fn run(config: Config) -> anyhow::Result<()> { for i in 0..(config.request_workers) { let config = config.clone(); + let state = state.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); - let access_list = access_list.clone(); let mut builder = LocalExecutorBuilder::default(); @@ -85,13 +114,8 @@ pub fn run(config: Config) -> anyhow::Result<()> { } let executor = builder.spawn(|| async move { - handlers::run_request_worker( - config, - request_mesh_builder, - response_mesh_builder, - access_list, - ) - .await + handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) + .await }); executors.push(executor); diff --git a/aquatic_ws/src/lib/network.rs b/aquatic_ws/src/lib/network.rs index d0e9679..f4e71ca 100644 --- a/aquatic_ws/src/lib/network.rs +++ b/aquatic_ws/src/lib/network.rs @@ -7,7 +7,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; -use aquatic_common::access_list::AccessList; +use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::convert_ipv4_mapped_ipv6; use aquatic_ws_protocol::*; use async_tungstenite::WebSocketStream; @@ -40,14 +40,14 @@ struct ConnectionReference { pub async fn run_socket_worker( config: Config, + state: State, tls_config: Arc, in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>, out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>, num_bound_sockets: Arc, - access_list: AccessList, ) { let config = Rc::new(config); - let access_list = Rc::new(RefCell::new(access_list)); + let access_list = state.access_list; let listener = TcpListener::bind(config.network.address).expect("bind socket"); num_bound_sockets.fetch_add(1, Ordering::SeqCst); @@ -62,15 +62,6 @@ pub async fn run_socket_worker( let connection_slab = Rc::new(RefCell::new(Slab::new())); let connections_to_remove = Rc::new(RefCell::new(Vec::new())); - // Periodically update access list - TimerActionRepeat::repeat(enclose!((config, access_list) move || { - enclose!((config, access_list) move || async move { - update_access_list(config.clone(), access_list.clone()).await; - - Some(Duration::from_secs(config.cleaning.interval)) - })() - })); - // Periodically remove closed connections TimerActionRepeat::repeat( enclose!((config, connection_slab, connections_to_remove) move || { @@ -146,7 +137,9 @@ async fn remove_closed_connections( } } - Some(Duration::from_secs(config.cleaning.interval)) + Some(Duration::from_secs( + config.cleaning.torrent_cleaning_interval, + )) } async fn receive_out_messages( @@ -176,7 +169,7 @@ struct Connection; impl Connection { async fn run( config: Rc, - access_list: Rc>, + access_list: Arc, in_message_senders: Rc>, out_message_sender: Rc>, out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>, @@ -201,11 +194,12 @@ impl Connection { let (ws_out, ws_in) = futures::StreamExt::split(stream); let pending_scrape_slab = Rc::new(RefCell::new(Slab::new())); + let access_list_cache = create_access_list_cache(&access_list); let reader_handle = spawn_local(enclose!((pending_scrape_slab) async move { let mut reader = ConnectionReader { config, - access_list, + access_list_cache, in_message_senders, out_message_sender, pending_scrape_slab, @@ -237,7 +231,7 @@ impl Connection { struct ConnectionReader { config: Rc, - access_list: Rc>, + access_list_cache: AccessListCache, in_message_senders: Rc>, out_message_sender: Rc>, pending_scrape_slab: Rc>>, @@ -275,8 +269,8 @@ impl ConnectionReader { let info_hash = announce_request.info_hash; if self - .access_list - .borrow() + .access_list_cache + .load() .allows(self.config.access_list.mode, &info_hash.0) { let in_message = InMessage::AnnounceRequest(announce_request);