aquatic_udp: glommio: use signals for access list update

This commit is contained in:
Joakim Frostegård 2021-11-02 22:23:47 +01:00
parent a4d131359c
commit fdaafae4b7
7 changed files with 97 additions and 105 deletions

View file

@ -9,6 +9,7 @@ use std::sync::{
};
use std::time::{Duration, Instant};
use aquatic_common::access_list::create_access_list_cache;
use futures_lite::{Stream, StreamExt};
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::channels::local_channel::{new_unbounded, LocalSender};
@ -21,7 +22,7 @@ use rand::prelude::{Rng, SeedableRng, StdRng};
use aquatic_udp_protocol::{IpVersion, Request, Response};
use super::common::update_access_list;
use super::common::State;
use crate::common::handlers::*;
use crate::common::network::ConnectionMap;
@ -99,10 +100,10 @@ impl PendingScrapeResponses {
pub async fn run_socket_worker(
config: Config,
state: State,
request_mesh_builder: MeshBuilder<(usize, ConnectedRequest, SocketAddr), Partial>,
response_mesh_builder: MeshBuilder<(ConnectedResponse, SocketAddr), Partial>,
num_bound_sockets: Arc<AtomicUsize>,
access_list: AccessList,
) {
let (local_sender, local_receiver) = new_unbounded();
@ -136,12 +137,12 @@ pub async fn run_socket_worker(
spawn_local(enclose!((pending_scrape_responses) read_requests(
config.clone(),
state,
request_senders,
response_consumer_index,
local_sender,
socket.clone(),
pending_scrape_responses,
access_list,
)))
.detach();
@ -159,12 +160,12 @@ pub async fn run_socket_worker(
async fn read_requests(
config: Config,
state: State,
request_senders: Senders<(usize, ConnectedRequest, SocketAddr)>,
response_consumer_index: usize,
local_sender: LocalSender<(Response, SocketAddr)>,
socket: Rc<UdpSocket>,
pending_scrape_responses: Rc<RefCell<PendingScrapeResponses>>,
access_list: AccessList,
) {
let mut rng = StdRng::from_entropy();
@ -174,8 +175,8 @@ async fn read_requests(
let connection_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_connection_age)));
let pending_scrape_valid_until =
Rc::new(RefCell::new(ValidUntil::new(PENDING_SCRAPE_MAX_WAIT)));
let access_list = Rc::new(RefCell::new(access_list));
let connections = Rc::new(RefCell::new(ConnectionMap::default()));
let mut access_list_cache = create_access_list_cache(&state.access_list);
// Periodically update connection_valid_until
TimerActionRepeat::repeat(enclose!((connection_valid_until) move || {
@ -195,15 +196,6 @@ async fn read_requests(
})()
}));
// Periodically update access list
TimerActionRepeat::repeat(enclose!((config, access_list) move || {
enclose!((config, access_list) move || async move {
update_access_list(config.clone(), access_list.clone()).await;
Some(Duration::from_secs(config.cleaning.interval))
})()
}));
// Periodically clean connections
TimerActionRepeat::repeat(enclose!((config, connections) move || {
enclose!((config, connections) move || async move {
@ -241,8 +233,8 @@ async fn read_requests(
}
Ok(Request::Announce(request)) => {
if connections.borrow().contains(request.connection_id, src) {
if access_list
.borrow()
if access_list_cache
.load()
.allows(access_list_mode, &request.info_hash.0)
{
let request_consumer_index =