diff --git a/aquatic_common/src/access_list.rs b/aquatic_common/src/access_list.rs index f71f967..20b9e61 100644 --- a/aquatic_common/src/access_list.rs +++ b/aquatic_common/src/access_list.rs @@ -4,7 +4,7 @@ use std::path::PathBuf; use std::sync::Arc; use anyhow::Context; -use arc_swap::ArcSwap; +use arc_swap::{ArcSwap, Cache}; use hashbrown::HashSet; use serde::{Deserialize, Serialize}; @@ -85,6 +85,7 @@ pub trait AccessListQuery { } pub type AccessListArcSwap = ArcSwap; +pub type AccessListCache = Cache, Arc>; impl AccessListQuery for AccessListArcSwap { fn update(&self, config: &AccessListConfig) -> anyhow::Result<()> { @@ -102,6 +103,10 @@ impl AccessListQuery for AccessListArcSwap { } } +pub fn create_access_list_cache(arc_swap: &Arc) -> AccessListCache { + Cache::from(Arc::clone(arc_swap)) +} + fn parse_info_hash(line: &str) -> anyhow::Result<[u8; 20]> { let mut bytes = [0u8; 20]; diff --git a/aquatic_udp/src/lib/common/mod.rs b/aquatic_udp/src/lib/common/mod.rs index cf47c8c..59af31c 100644 --- a/aquatic_udp/src/lib/common/mod.rs +++ b/aquatic_udp/src/lib/common/mod.rs @@ -1,8 +1,9 @@ -use std::borrow::Borrow; use std::hash::Hash; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::sync::Arc; use std::time::Instant; +use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap}; use hashbrown::HashMap; use indexmap::IndexMap; @@ -107,19 +108,24 @@ pub struct TorrentMaps { impl TorrentMaps { /// Remove disallowed and inactive torrents - pub fn clean>(&mut self, config: &Config, access_list: T) { + pub fn clean(&mut self, config: &Config, access_list: &Arc) { let now = Instant::now(); + let mut access_list_cache = create_access_list_cache(access_list); let access_list_mode = config.access_list.mode; self.ipv4.retain(|info_hash, torrent| { - access_list.borrow().allows(access_list_mode, &info_hash.0) + access_list_cache + .load() + .allows(access_list_mode, &info_hash.0) && Self::clean_torrent_and_peers(now, torrent) }); self.ipv4.shrink_to_fit(); self.ipv6.retain(|info_hash, torrent| { - access_list.borrow().allows(access_list_mode, &info_hash.0) + access_list_cache + .load() + .allows(access_list_mode, &info_hash.0) && Self::clean_torrent_and_peers(now, torrent) }); self.ipv6.shrink_to_fit(); diff --git a/aquatic_udp/src/lib/glommio/common.rs b/aquatic_udp/src/lib/glommio/common.rs index 4b3d80d..3506b09 100644 --- a/aquatic_udp/src/lib/glommio/common.rs +++ b/aquatic_udp/src/lib/glommio/common.rs @@ -1,52 +1,8 @@ -use std::borrow::Borrow; -use std::cell::RefCell; -use std::rc::Rc; +use std::sync::Arc; -use futures_lite::AsyncBufReadExt; -use glommio::io::{BufferedFile, StreamReaderBuilder}; -use glommio::prelude::*; +use aquatic_common::access_list::AccessListArcSwap; -use crate::common::*; -use crate::config::Config; - -pub async fn update_access_list>( - config: C, - access_list: Rc>, -) { - if config.borrow().access_list.mode.is_on() { - match BufferedFile::open(&config.borrow().access_list.path).await { - Ok(file) => { - let mut reader = StreamReaderBuilder::new(file).build(); - let mut new_access_list = AccessList::default(); - - loop { - let mut buf = String::with_capacity(42); - - match reader.read_line(&mut buf).await { - Ok(_) => { - if let Err(err) = new_access_list.insert_from_line(&buf) { - ::log::error!( - "Couln't parse access list line '{}': {:?}", - buf, - err - ); - } - } - Err(err) => { - ::log::error!("Couln't read access list line {:?}", err); - - break; - } - } - - yield_if_needed().await; - } - - *access_list.borrow_mut() = new_access_list; - } - Err(err) => { - ::log::error!("Couldn't open access list file: {:?}", err) - } - }; - } +#[derive(Default, Clone)] +pub struct State { + pub access_list: Arc, } diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 3bfbc0d..aefe701 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -14,28 +14,25 @@ use crate::common::handlers::handle_announce_request; use crate::common::handlers::*; use crate::common::*; use crate::config::Config; -use crate::glommio::common::update_access_list; + +use super::common::State; pub async fn run_request_worker( config: Config, + state: State, request_mesh_builder: MeshBuilder<(usize, ConnectedRequest, SocketAddr), Partial>, response_mesh_builder: MeshBuilder<(ConnectedResponse, SocketAddr), Partial>, - access_list: AccessList, ) { let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap(); - let response_senders = Rc::new(response_senders); let torrents = Rc::new(RefCell::new(TorrentMaps::default())); - let access_list = Rc::new(RefCell::new(access_list)); - // Periodically clean torrents and update access list - TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || { - enclose!((config, torrents, access_list) move || async move { - update_access_list(config.clone(), access_list.clone()).await; - - torrents.borrow_mut().clean(&config, &*access_list.borrow()); + // Periodically clean torrents + TimerActionRepeat::repeat(enclose!((config, torrents, state) move || { + enclose!((config, torrents, state) move || async move { + torrents.borrow_mut().clean(&config, &state.access_list); Some(Duration::from_secs(config.cleaning.interval)) })() diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 72a8eb8..e550c4e 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -1,30 +1,60 @@ use std::sync::{atomic::AtomicUsize, Arc}; -use aquatic_common::access_list::AccessList; +use aquatic_common::access_list::AccessListQuery; use aquatic_common::privileges::drop_privileges_after_socket_binding; use glommio::channels::channel_mesh::MeshBuilder; use glommio::prelude::*; +use signal_hook::consts::SIGUSR1; +use signal_hook::iterator::Signals; use crate::config::Config; +use self::common::State; + mod common; pub mod handlers; pub mod network; pub const SHARED_CHANNEL_SIZE: usize = 4096; -pub fn run(config: Config) -> anyhow::Result<()> { +pub fn run(config: Config) -> ::anyhow::Result<()> { if config.cpu_pinning.active { core_affinity::set_for_current(core_affinity::CoreId { id: config.cpu_pinning.offset, }); } - let access_list = if config.access_list.mode.is_on() { - AccessList::create_from_path(&config.access_list.path).expect("Load access list") - } else { - AccessList::default() - }; + let state = State::default(); + + update_access_list(&config, &state)?; + + let mut signals = Signals::new(::std::iter::once(SIGUSR1))?; + + { + let config = config.clone(); + let state = state.clone(); + + ::std::thread::spawn(move || run_inner(config, state)); + } + + for signal in &mut signals { + match signal { + SIGUSR1 => { + let _ = update_access_list(&config, &state); + } + _ => unreachable!(), + } + } + + Ok(()) +} + +pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { + if config.cpu_pinning.active { + core_affinity::set_for_current(core_affinity::CoreId { + id: config.cpu_pinning.offset, + }); + } let num_peers = config.socket_workers + config.request_workers; @@ -37,10 +67,10 @@ pub fn run(config: Config) -> anyhow::Result<()> { for i in 0..(config.socket_workers) { let config = config.clone(); + let state = state.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); let num_bound_sockets = num_bound_sockets.clone(); - let access_list = access_list.clone(); let mut builder = LocalExecutorBuilder::default(); @@ -51,10 +81,10 @@ pub fn run(config: Config) -> anyhow::Result<()> { let executor = builder.spawn(|| async move { network::run_socket_worker( config, + state, request_mesh_builder, response_mesh_builder, num_bound_sockets, - access_list, ) .await }); @@ -64,9 +94,9 @@ pub fn run(config: Config) -> anyhow::Result<()> { for i in 0..(config.request_workers) { let config = config.clone(); + let state = state.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); - let access_list = access_list.clone(); let mut builder = LocalExecutorBuilder::default(); @@ -75,13 +105,8 @@ pub fn run(config: Config) -> anyhow::Result<()> { } let executor = builder.spawn(|| async move { - handlers::run_request_worker( - config, - request_mesh_builder, - response_mesh_builder, - access_list, - ) - .await + handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) + .await }); executors.push(executor); @@ -103,3 +128,20 @@ pub fn run(config: Config) -> anyhow::Result<()> { Ok(()) } + +fn update_access_list(config: &Config, state: &State) -> anyhow::Result<()> { + if config.access_list.mode.is_on() { + match state.access_list.update(&config.access_list) { + Ok(()) => { + ::log::info!("Access list updated") + } + Err(err) => { + ::log::error!("Updating access list failed: {:#}", err); + + return Err(err); + } + } + } + + Ok(()) +} diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index cb046c0..8daf0d0 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -9,6 +9,7 @@ use std::sync::{ }; use std::time::{Duration, Instant}; +use aquatic_common::access_list::create_access_list_cache; use futures_lite::{Stream, StreamExt}; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::channels::local_channel::{new_unbounded, LocalSender}; @@ -21,7 +22,7 @@ use rand::prelude::{Rng, SeedableRng, StdRng}; use aquatic_udp_protocol::{IpVersion, Request, Response}; -use super::common::update_access_list; +use super::common::State; use crate::common::handlers::*; use crate::common::network::ConnectionMap; @@ -99,10 +100,10 @@ impl PendingScrapeResponses { pub async fn run_socket_worker( config: Config, + state: State, request_mesh_builder: MeshBuilder<(usize, ConnectedRequest, SocketAddr), Partial>, response_mesh_builder: MeshBuilder<(ConnectedResponse, SocketAddr), Partial>, num_bound_sockets: Arc, - access_list: AccessList, ) { let (local_sender, local_receiver) = new_unbounded(); @@ -136,12 +137,12 @@ pub async fn run_socket_worker( spawn_local(enclose!((pending_scrape_responses) read_requests( config.clone(), + state, request_senders, response_consumer_index, local_sender, socket.clone(), pending_scrape_responses, - access_list, ))) .detach(); @@ -159,12 +160,12 @@ pub async fn run_socket_worker( async fn read_requests( config: Config, + state: State, request_senders: Senders<(usize, ConnectedRequest, SocketAddr)>, response_consumer_index: usize, local_sender: LocalSender<(Response, SocketAddr)>, socket: Rc, pending_scrape_responses: Rc>, - access_list: AccessList, ) { let mut rng = StdRng::from_entropy(); @@ -174,8 +175,8 @@ async fn read_requests( let connection_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_connection_age))); let pending_scrape_valid_until = Rc::new(RefCell::new(ValidUntil::new(PENDING_SCRAPE_MAX_WAIT))); - let access_list = Rc::new(RefCell::new(access_list)); let connections = Rc::new(RefCell::new(ConnectionMap::default())); + let mut access_list_cache = create_access_list_cache(&state.access_list); // Periodically update connection_valid_until TimerActionRepeat::repeat(enclose!((connection_valid_until) move || { @@ -195,15 +196,6 @@ async fn read_requests( })() })); - // Periodically update access list - TimerActionRepeat::repeat(enclose!((config, access_list) move || { - enclose!((config, access_list) move || async move { - update_access_list(config.clone(), access_list.clone()).await; - - Some(Duration::from_secs(config.cleaning.interval)) - })() - })); - // Periodically clean connections TimerActionRepeat::repeat(enclose!((config, connections) move || { enclose!((config, connections) move || async move { @@ -241,8 +233,8 @@ async fn read_requests( } Ok(Request::Announce(request)) => { if connections.borrow().contains(request.connection_id, src) { - if access_list - .borrow() + if access_list_cache + .load() .allows(access_list_mode, &request.info_hash.0) { let request_consumer_index = diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index 4104d47..ffe2c1e 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -1,9 +1,6 @@ +use std::sync::{atomic::AtomicUsize, Arc}; use std::thread::Builder; use std::time::Duration; -use std::{ - ops::Deref, - sync::{atomic::AtomicUsize, Arc}, -}; use anyhow::Context; use aquatic_common::privileges::drop_privileges_after_socket_binding; @@ -146,10 +143,7 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { loop { ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); - state - .torrents - .lock() - .clean(&config, state.access_list.load_full().deref()); + state.torrents.lock().clean(&config, &state.access_list); } }