aquatic_udp: glommio: periodically clean/update pending scrape vars

This commit is contained in:
Joakim Frostegård 2021-10-23 14:50:06 +02:00
parent 96196239f5
commit 3355822422

View file

@ -6,7 +6,7 @@ use std::sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
Arc, Arc,
}; };
use std::time::Duration; use std::time::{Duration, Instant};
use futures_lite::{Stream, StreamExt}; use futures_lite::{Stream, StreamExt};
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
@ -27,6 +27,8 @@ use crate::common::network::ConnectionMap;
use crate::common::*; use crate::common::*;
use crate::config::Config; use crate::config::Config;
const PENDING_SCRAPE_MAX_WAIT: u64 = 30;
struct PendingScrapeResponse { struct PendingScrapeResponse {
pending_worker_responses: usize, pending_worker_responses: usize,
valid_until: ValidUntil, valid_until: ValidUntil,
@ -70,6 +72,15 @@ impl PendingScrapeResponses {
None None
} }
} }
fn clean(&mut self) {
let now = Instant::now();
self.0.retain(|_, v| {
v.valid_until.0 > now
});
self.0.shrink_to_fit();
}
} }
pub async fn run_socket_worker( pub async fn run_socket_worker(
@ -99,9 +110,17 @@ pub async fn run_socket_worker(
let response_consumer_index = response_receivers.consumer_id().unwrap(); let response_consumer_index = response_receivers.consumer_id().unwrap();
// FIXME: needs cleaning
let pending_scrape_responses = Rc::new(RefCell::new(PendingScrapeResponses::default())); let pending_scrape_responses = Rc::new(RefCell::new(PendingScrapeResponses::default()));
// Periodically clean pending_scrape_responses
TimerActionRepeat::repeat(enclose!((config, pending_scrape_responses) move || {
enclose!((config, pending_scrape_responses) move || async move {
pending_scrape_responses.borrow_mut().clean();
Some(Duration::from_secs(config.cleaning.interval))
})()
}));
spawn_local(enclose!((pending_scrape_responses) read_requests( spawn_local(enclose!((pending_scrape_responses) read_requests(
config.clone(), config.clone(),
request_senders, request_senders,
@ -140,6 +159,7 @@ async fn read_requests(
let max_connection_age = config.cleaning.max_connection_age; let max_connection_age = config.cleaning.max_connection_age;
let connection_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_connection_age))); let connection_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_connection_age)));
let pending_scrape_valid_until = Rc::new(RefCell::new(ValidUntil::new(PENDING_SCRAPE_MAX_WAIT)));
let access_list = Rc::new(RefCell::new(access_list)); let access_list = Rc::new(RefCell::new(access_list));
let connections = Rc::new(RefCell::new(ConnectionMap::default())); let connections = Rc::new(RefCell::new(ConnectionMap::default()));
@ -152,6 +172,15 @@ async fn read_requests(
})() })()
})); }));
// Periodically update pending_scrape_valid_until
TimerActionRepeat::repeat(enclose!((pending_scrape_valid_until) move || {
enclose!((pending_scrape_valid_until) move || async move {
*pending_scrape_valid_until.borrow_mut() = ValidUntil::new(PENDING_SCRAPE_MAX_WAIT);
Some(Duration::from_secs(10))
})()
}));
// Periodically update access list // Periodically update access list
TimerActionRepeat::repeat(enclose!((config, access_list) move || { TimerActionRepeat::repeat(enclose!((config, access_list) move || {
enclose!((config, access_list) move || async move { enclose!((config, access_list) move || async move {
@ -241,7 +270,7 @@ async fn read_requests(
pending_scrape_responses.borrow_mut().prepare( pending_scrape_responses.borrow_mut().prepare(
request.transaction_id, request.transaction_id,
consumer_requests.len(), consumer_requests.len(),
connection_valid_until.borrow().to_owned(), // FIXME: use seperate ValidUntil pending_scrape_valid_until.borrow().to_owned(),
); );
for (consumer_index, request) in consumer_requests { for (consumer_index, request) in consumer_requests {