diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 0ef3ae4..eabbec4 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -6,7 +6,7 @@ use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; -use std::time::Duration; +use std::time::{Duration, Instant}; use futures_lite::{Stream, StreamExt}; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; @@ -27,6 +27,8 @@ use crate::common::network::ConnectionMap; use crate::common::*; use crate::config::Config; +const PENDING_SCRAPE_MAX_WAIT: u64 = 30; + struct PendingScrapeResponse { pending_worker_responses: usize, valid_until: ValidUntil, @@ -70,6 +72,15 @@ impl PendingScrapeResponses { None } } + + fn clean(&mut self) { + let now = Instant::now(); + + self.0.retain(|_, v| { + v.valid_until.0 > now + }); + self.0.shrink_to_fit(); + } } pub async fn run_socket_worker( @@ -99,9 +110,17 @@ pub async fn run_socket_worker( let response_consumer_index = response_receivers.consumer_id().unwrap(); - // FIXME: needs cleaning let pending_scrape_responses = Rc::new(RefCell::new(PendingScrapeResponses::default())); + // Periodically clean pending_scrape_responses + TimerActionRepeat::repeat(enclose!((config, pending_scrape_responses) move || { + enclose!((config, pending_scrape_responses) move || async move { + pending_scrape_responses.borrow_mut().clean(); + + Some(Duration::from_secs(config.cleaning.interval)) + })() + })); + spawn_local(enclose!((pending_scrape_responses) read_requests( config.clone(), request_senders, @@ -140,6 +159,7 @@ async fn read_requests( let max_connection_age = config.cleaning.max_connection_age; let connection_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_connection_age))); + let pending_scrape_valid_until = Rc::new(RefCell::new(ValidUntil::new(PENDING_SCRAPE_MAX_WAIT))); let access_list = Rc::new(RefCell::new(access_list)); let connections = Rc::new(RefCell::new(ConnectionMap::default())); @@ -152,6 +172,15 @@ async fn read_requests( })() })); + // Periodically update pending_scrape_valid_until + TimerActionRepeat::repeat(enclose!((pending_scrape_valid_until) move || { + enclose!((pending_scrape_valid_until) move || async move { + *pending_scrape_valid_until.borrow_mut() = ValidUntil::new(PENDING_SCRAPE_MAX_WAIT); + + Some(Duration::from_secs(10)) + })() + })); + // Periodically update access list TimerActionRepeat::repeat(enclose!((config, access_list) move || { enclose!((config, access_list) move || async move { @@ -241,7 +270,7 @@ async fn read_requests( pending_scrape_responses.borrow_mut().prepare( request.transaction_id, consumer_requests.len(), - connection_valid_until.borrow().to_owned(), // FIXME: use seperate ValidUntil + pending_scrape_valid_until.borrow().to_owned(), ); for (consumer_index, request) in consumer_requests {