Refactor AccessList; update it periodically in aquatic_udp glommio

This commit is contained in:
Joakim Frostegård 2021-10-19 22:51:05 +02:00
parent cad3618fad
commit 38617c70f4
18 changed files with 141 additions and 77 deletions

View file

@ -1,8 +1,10 @@
use std::borrow::Borrow;
use std::hash::Hash;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::sync::{atomic::AtomicUsize, Arc};
use std::time::Instant;
use aquatic_common::access_list::AccessListArcSwap;
use hashbrown::HashMap;
use indexmap::IndexMap;
use parking_lot::Mutex;
@ -127,19 +129,19 @@ pub struct TorrentMaps {
impl TorrentMaps {
/// Remove disallowed and inactive torrents
pub fn clean(&mut self, config: &Config, access_list: &Arc<AccessList>) {
pub fn clean<T: Borrow<AccessList>>(&mut self, config: &Config, access_list: T) {
let now = Instant::now();
let access_list_mode = config.access_list.mode;
self.ipv4.retain(|info_hash, torrent| {
access_list.allows(access_list_mode, &info_hash.0)
access_list.borrow().allows(access_list_mode, &info_hash.0)
&& Self::clean_torrent_and_peers(now, torrent)
});
self.ipv4.shrink_to_fit();
self.ipv6.retain(|info_hash, torrent| {
access_list.allows(access_list_mode, &info_hash.0)
access_list.borrow().allows(access_list_mode, &info_hash.0)
&& Self::clean_torrent_and_peers(now, torrent)
});
self.ipv6.shrink_to_fit();
@ -183,7 +185,7 @@ pub struct Statistics {
#[derive(Clone)]
pub struct State {
pub access_list: Arc<AccessList>,
pub access_list: Arc<AccessListArcSwap>,
pub torrents: Arc<Mutex<TorrentMaps>>,
pub statistics: Arc<Statistics>,
}
@ -191,38 +193,13 @@ pub struct State {
impl Default for State {
fn default() -> Self {
Self {
access_list: Arc::new(AccessList::default()),
access_list: Arc::new(AccessListArcSwap::default()),
torrents: Arc::new(Mutex::new(TorrentMaps::default())),
statistics: Arc::new(Statistics::default()),
}
}
}
#[derive(Default)]
pub struct ConnectionMap(HashMap<(ConnectionId, SocketAddr), ValidUntil>);
impl ConnectionMap {
pub fn insert(
&mut self,
connection_id: ConnectionId,
socket_addr: SocketAddr,
valid_until: ValidUntil,
) {
self.0.insert((connection_id, socket_addr), valid_until);
}
pub fn contains(&mut self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool {
self.0.contains_key(&(connection_id, socket_addr))
}
pub fn clean(&mut self) {
let now = Instant::now();
self.0.retain(|_, v| v.0 > now);
self.0.shrink_to_fit();
}
}
#[cfg(test)]
mod tests {
#[test]

View file

@ -1 +1,30 @@
use std::{net::SocketAddr, time::Instant};
pub use aquatic_common::{access_list::AccessList, ValidUntil};
pub use aquatic_udp_protocol::*;
use hashbrown::HashMap;
#[derive(Default)]
pub struct ConnectionMap(HashMap<(ConnectionId, SocketAddr), ValidUntil>);
impl ConnectionMap {
pub fn insert(
&mut self,
connection_id: ConnectionId,
socket_addr: SocketAddr,
valid_until: ValidUntil,
) {
self.0.insert((connection_id, socket_addr), valid_until);
}
pub fn contains(&mut self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool {
self.0.contains_key(&(connection_id, socket_addr))
}
pub fn clean(&mut self) {
let now = Instant::now();
self.0.retain(|_, v| v.0 > now);
self.0.shrink_to_fit();
}
}

View file

@ -3,10 +3,11 @@ use std::net::{IpAddr, SocketAddr};
use std::rc::Rc;
use std::time::Duration;
use futures_lite::{Stream, StreamExt};
use futures_lite::{AsyncBufReadExt, Stream, StreamExt};
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::{enclose, prelude::*};
use glommio::io::{BufferedFile, StreamReaderBuilder};
use glommio::timer::TimerActionRepeat;
use glommio::{enclose, prelude::*};
use rand::prelude::SmallRng;
use rand::SeedableRng;
@ -24,19 +25,34 @@ pub async fn run_request_worker(
let response_senders = Rc::new(response_senders);
let torrents= Rc::new(RefCell::new(TorrentMaps::default()));
let torrents = Rc::new(RefCell::new(TorrentMaps::default()));
let access_list = Rc::new(RefCell::new(AccessList::default()));
async fn clean(
config: Config,
torrents: Rc<RefCell<TorrentMaps>>,
) -> Option<Duration> {
torrents.borrow_mut(); // .clean(config, access_list);
TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || {
enclose!((config, torrents, access_list) move || async move {
if config.access_list.mode.is_on(){
let access_list_file = BufferedFile::open(config.access_list.path).await.unwrap();
Some(Duration::from_secs(config.cleaning.interval))
}
let mut reader = StreamReaderBuilder::new(access_list_file).build();
TimerActionRepeat::repeat(enclose!((config, torrents) move || {
clean(config.clone(), torrents.clone())
loop {
let mut buf = String::with_capacity(42);
match reader.read_line(&mut buf).await {
Ok(_) => {
access_list.borrow_mut().insert_from_line(&buf).unwrap() // FIXME
},
Err(err) => {
}
}
}
}
torrents.borrow_mut().clean(&config, &*access_list.borrow());
Some(Duration::from_secs(config.cleaning.interval))
})()
}));
for (_, receiver) in request_receivers.streams() {

View file

@ -15,6 +15,7 @@ use rand::prelude::{Rng, SeedableRng, StdRng};
use aquatic_udp_protocol::{IpVersion, Request, Response};
use crate::common::network::ConnectionMap;
use crate::common::*;
use crate::config::Config;

View file

@ -1,6 +1,6 @@
use std::sync::Arc;
use aquatic_common::access_list::{AccessList, AccessListMode};
use aquatic_common::access_list::{AccessListArcSwap, AccessListMode, AccessListQuery};
pub mod common;
pub mod config;
@ -15,7 +15,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
mio::run(config)
}
pub fn update_access_list(config: &Config, access_list: &Arc<AccessList>) {
pub fn update_access_list(config: &Config, access_list: &Arc<AccessListArcSwap>) {
match config.access_list.mode {
AccessListMode::White | AccessListMode::Black => {
if let Err(err) = access_list.update_from_path(&config.access_list.path) {

View file

@ -1,9 +1,12 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::thread::Builder;
use std::time::Duration;
use std::{
ops::Deref,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use anyhow::Context;
use crossbeam_channel::unbounded;
@ -56,7 +59,10 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
update_access_list(&config, &state.access_list);
state.torrents.lock().clean(&config, &state.access_list);
state
.torrents
.lock()
.clean(&config, state.access_list.load_full().deref());
}
}

View file

@ -7,6 +7,7 @@ use std::sync::{
use std::time::Duration;
use std::vec::Drain;
use aquatic_common::access_list::AccessListQuery;
use crossbeam_channel::{Receiver, Sender};
use mio::net::UdpSocket;
use mio::{Events, Interest, Poll, Token};
@ -15,6 +16,7 @@ use socket2::{Domain, Protocol, Socket, Type};
use aquatic_udp_protocol::{IpVersion, Request, Response};
use crate::common::network::ConnectionMap;
use crate::common::*;
use crate::config::Config;