diff --git a/aquatic_http/src/lib/glommio/common.rs b/aquatic_http/src/lib/glommio/common.rs index 11b2036..7495165 100644 --- a/aquatic_http/src/lib/glommio/common.rs +++ b/aquatic_http/src/lib/glommio/common.rs @@ -1,10 +1,20 @@ +use std::borrow::Borrow; +use std::cell::RefCell; +use std::rc::Rc; use std::net::SocketAddr; +use aquatic_common::access_list::AccessList; +use futures_lite::AsyncBufReadExt; +use glommio::io::{BufferedFile, StreamReaderBuilder}; +use glommio::prelude::*; + use aquatic_http_protocol::{ request::{AnnounceRequest, ScrapeRequest}, response::{AnnounceResponse, ScrapeResponse}, }; +use crate::config::Config; + #[derive(Copy, Clone, Debug)] pub struct ConsumerId(pub usize); @@ -55,3 +65,44 @@ impl ChannelResponse { } } } + +pub async fn update_access_list>(config: C, access_list: Rc>) { + if config.borrow().access_list.mode.is_on() { + match BufferedFile::open(&config.borrow().access_list.path).await { + Ok(file) => { + let mut reader = StreamReaderBuilder::new(file).build(); + let mut new_access_list = AccessList::default(); + + loop { + let mut buf = String::with_capacity(42); + + match reader.read_line(&mut buf).await { + Ok(_) => { + if let Err(err) = new_access_list.insert_from_line(&buf) { + ::log::error!( + "Couln't parse access list line '{}': {:?}", + buf, + err + ); + } + } + Err(err) => { + ::log::error!("Couln't read access list line {:?}", err); + + break; + } + } + + yield_if_needed().await; + } + + *access_list.borrow_mut() = new_access_list; + } + Err(err) => { + ::log::error!("Couldn't open access list file: {:?}", err) + } + }; + } +} + + diff --git a/aquatic_http/src/lib/glommio/handlers.rs b/aquatic_http/src/lib/glommio/handlers.rs index 13ca6b2..21a4daf 100644 --- a/aquatic_http/src/lib/glommio/handlers.rs +++ b/aquatic_http/src/lib/glommio/handlers.rs @@ -34,7 +34,7 @@ pub async fn run_request_worker( // Periodically clean torrents and update access list TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || { enclose!((config, torrents, access_list) move || async move { - // update_access_list(config.clone(), access_list.clone()).await; + update_access_list(&config, access_list.clone()).await; torrents.borrow_mut().clean(&config, &*access_list.borrow()); diff --git a/aquatic_http/src/lib/glommio/network.rs b/aquatic_http/src/lib/glommio/network.rs index 31b613b..a756704 100644 --- a/aquatic_http/src/lib/glommio/network.rs +++ b/aquatic_http/src/lib/glommio/network.rs @@ -79,6 +79,15 @@ pub async fn run_socket_worker( let connection_slab = Rc::new(RefCell::new(Slab::new())); let connections_to_remove = Rc::new(RefCell::new(Vec::new())); + // Periodically update access list + TimerActionRepeat::repeat(enclose!((config, access_list) move || { + enclose!((config, access_list) move || async move { + update_access_list(config.clone(), access_list.clone()).await; + + Some(Duration::from_secs(config.cleaning.interval)) + })() + })); + // Periodically remove closed connections TimerActionRepeat::repeat(enclose!((config, connection_slab, connections_to_remove) move || { enclose!((config, connection_slab, connections_to_remove) move || async move {