aquatic_udp: glommio: start work on periodic cleaning

This commit is contained in:
Joakim Frostegård 2021-10-19 11:54:00 +02:00
parent b5aa07c21f
commit cad3618fad

View file

@ -1,10 +1,12 @@
use std::cell::RefCell;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::net::{IpAddr, SocketAddr};
use std::rc::Rc;
use std::time::Duration;
use futures_lite::{Stream, StreamExt};
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::prelude::*;
use glommio::{enclose, prelude::*};
use glommio::timer::TimerActionRepeat;
use rand::prelude::SmallRng;
use rand::SeedableRng;
@ -22,15 +24,25 @@ pub async fn run_request_worker(
let response_senders = Rc::new(response_senders);
// Need to be cleaned periodically: use timer?
let torrents_ipv4 = Rc::new(RefCell::new(TorrentMap::<Ipv4Addr>::default()));
let torrents_ipv6 = Rc::new(RefCell::new(TorrentMap::<Ipv6Addr>::default()));
let torrents= Rc::new(RefCell::new(TorrentMaps::default()));
async fn clean(
config: Config,
torrents: Rc<RefCell<TorrentMaps>>,
) -> Option<Duration> {
torrents.borrow_mut(); // .clean(config, access_list);
Some(Duration::from_secs(config.cleaning.interval))
}
TimerActionRepeat::repeat(enclose!((config, torrents) move || {
clean(config.clone(), torrents.clone())
}));
for (_, receiver) in request_receivers.streams() {
handle_request_stream(
&config,
torrents_ipv4.clone(),
torrents_ipv6.clone(),
torrents.clone(),
response_senders.clone(),
receiver,
)
@ -40,8 +52,7 @@ pub async fn run_request_worker(
async fn handle_request_stream<S>(
config: &Config,
torrents_ipv4: Rc<RefCell<TorrentMap<Ipv4Addr>>>,
torrents_ipv6: Rc<RefCell<TorrentMap<Ipv6Addr>>>,
torrents: Rc<RefCell<TorrentMaps>>,
response_senders: Rc<Senders<(AnnounceResponse, SocketAddr)>>,
mut stream: S,
) where
@ -57,7 +68,7 @@ async fn handle_request_stream<S>(
IpAddr::V4(ip) => handle_announce_request(
&config,
&mut rng,
&mut torrents_ipv4.borrow_mut(),
&mut torrents.borrow_mut().ipv4,
request,
ip,
peer_valid_until,
@ -65,7 +76,7 @@ async fn handle_request_stream<S>(
IpAddr::V6(ip) => handle_announce_request(
&config,
&mut rng,
&mut torrents_ipv6.borrow_mut(),
&mut torrents.borrow_mut().ipv6,
request,
ip,
peer_valid_until,