diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs index bae5471..25488f0 100644 --- a/aquatic_common/src/lib.rs +++ b/aquatic_common/src/lib.rs @@ -23,6 +23,9 @@ impl ValidUntil { pub fn new(offset_seconds: u64) -> Self { Self(Instant::now() + Duration::from_secs(offset_seconds)) } + pub fn new_with_now(now: Instant, offset_seconds: u64) -> Self { + Self(now + Duration::from_secs(offset_seconds)) + } } /// Extract response peers diff --git a/aquatic_udp/src/lib/network.rs b/aquatic_udp/src/lib/network.rs index 4fbb97a..c38c311 100644 --- a/aquatic_udp/src/lib/network.rs +++ b/aquatic_udp/src/lib/network.rs @@ -145,6 +145,9 @@ pub fn run_socket_worker( let pending_scrape_cleaning_duration = Duration::from_secs(config.cleaning.pending_scrape_cleaning_interval); + let mut connection_valid_until = ValidUntil::new(config.cleaning.max_connection_age); + let mut pending_scrape_valid_until = ValidUntil::new(config.cleaning.max_pending_scrape_age); + let mut last_connection_cleaning = Instant::now(); let mut last_pending_scrape_cleaning = Instant::now(); @@ -168,6 +171,8 @@ pub fn run_socket_worker( &mut buffer, &request_sender, &mut local_responses, + connection_valid_until, + pending_scrape_valid_until, ); } } @@ -182,9 +187,15 @@ pub fn run_socket_worker( local_responses.drain(..), ); + // Run periodic ValidUntil updates and state cleaning if iter_counter % 32 == 0 { let now = Instant::now(); + connection_valid_until = + ValidUntil::new_with_now(now, config.cleaning.max_connection_age); + pending_scrape_valid_until = + ValidUntil::new_with_now(now, config.cleaning.max_pending_scrape_age); + if now > last_connection_cleaning + connection_cleaning_duration { connections.clean(); @@ -212,13 +223,12 @@ fn read_requests( buffer: &mut [u8], request_sender: &ConnectedRequestSender, local_responses: &mut Vec<(Response, SocketAddr)>, + connection_valid_until: ValidUntil, + pending_scrape_valid_until: ValidUntil, ) { let mut requests_received: usize = 0; let mut bytes_received: usize = 0; - let connection_valid_until = ValidUntil::new(config.cleaning.max_connection_age); - let pending_scrape_valid_until = ValidUntil::new(config.cleaning.max_pending_scrape_age); - let mut access_list_cache = create_access_list_cache(&state.access_list); loop {