From 9c919a6ecbefdb8243854ac38aedf6d99e856061 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 19 Nov 2021 01:10:37 +0100 Subject: [PATCH] udp: clean pending scrape map --- aquatic_udp/src/lib/config.rs | 11 +++++++++++ aquatic_udp/src/lib/network.rs | 35 +++++++++++++++++++++++++--------- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/aquatic_udp/src/lib/config.rs b/aquatic_udp/src/lib/config.rs index 3db300d..978344d 100644 --- a/aquatic_udp/src/lib/config.rs +++ b/aquatic_udp/src/lib/config.rs @@ -142,10 +142,19 @@ pub struct CleaningConfig { pub connection_cleaning_interval: u64, /// Clean torrents this often (seconds) pub torrent_cleaning_interval: u64, + /// Clean pending scrape responses this often (seconds) + /// + /// In regular operation, there should be no pending scrape responses + /// lingering for a long time. However, the cleaning also returns unused + /// allocated memory to the OS, so the interval can be configured here. + pub pending_scrape_cleaning_interval: u64, /// Remove connections that are older than this (seconds) pub max_connection_age: u64, /// Remove peers that haven't announced for this long (seconds) pub max_peer_age: u64, + /// Remove pending scrape responses that haven't been returned from request + /// workers for this long (seconds) + pub max_pending_scrape_age: u64, } impl Default for CleaningConfig { @@ -153,8 +162,10 @@ impl Default for CleaningConfig { Self { connection_cleaning_interval: 60, torrent_cleaning_interval: 60 * 2, + pending_scrape_cleaning_interval: 60 * 10, max_connection_age: 60 * 5, max_peer_age: 60 * 20, + max_pending_scrape_age: 60, } } } diff --git a/aquatic_udp/src/lib/network.rs b/aquatic_udp/src/lib/network.rs index 704fb5a..4fbb97a 100644 --- a/aquatic_udp/src/lib/network.rs +++ b/aquatic_udp/src/lib/network.rs @@ -140,10 +140,15 @@ pub fn run_socket_worker( let timeout = Duration::from_millis(50); - let cleaning_duration = Duration::from_secs(config.cleaning.connection_cleaning_interval); + let connection_cleaning_duration = + Duration::from_secs(config.cleaning.connection_cleaning_interval); + let pending_scrape_cleaning_duration = + Duration::from_secs(config.cleaning.pending_scrape_cleaning_interval); + + let mut last_connection_cleaning = Instant::now(); + let mut last_pending_scrape_cleaning = Instant::now(); let mut iter_counter = 0usize; - let mut last_cleaning = Instant::now(); loop { poll.poll(&mut events, Some(timeout)) @@ -180,10 +185,15 @@ pub fn run_socket_worker( if iter_counter % 32 == 0 { let now = Instant::now(); - if now > last_cleaning + cleaning_duration { + if now > last_connection_cleaning + connection_cleaning_duration { connections.clean(); - last_cleaning = now; + last_connection_cleaning = now; + } + if now > last_pending_scrape_cleaning + pending_scrape_cleaning_duration { + pending_scrape_responses.clean(); + + last_pending_scrape_cleaning = now; } } @@ -206,7 +216,8 @@ fn read_requests( let mut requests_received: usize = 0; let mut bytes_received: usize = 0; - let valid_until = ValidUntil::new(config.cleaning.max_connection_age); + let connection_valid_until = ValidUntil::new(config.cleaning.max_connection_age); + let pending_scrape_valid_until = ValidUntil::new(config.cleaning.max_pending_scrape_age); let mut access_list_cache = create_access_list_cache(&state.access_list); @@ -246,7 +257,8 @@ fn read_requests( rng, request_sender, local_responses, - valid_until, + connection_valid_until, + pending_scrape_valid_until, res_request, src, ); @@ -281,7 +293,8 @@ pub fn handle_request( rng: &mut StdRng, request_sender: &ConnectedRequestSender, local_responses: &mut Vec<(Response, SocketAddr)>, - valid_until: ValidUntil, + connection_valid_until: ValidUntil, + pending_scrape_valid_until: ValidUntil, res_request: Result, src: SocketAddr, ) { @@ -291,7 +304,7 @@ pub fn handle_request( Ok(Request::Connect(request)) => { let connection_id = ConnectionId(rng.gen()); - connections.insert(connection_id, src, valid_until); + connections.insert(connection_id, src, connection_valid_until); let response = Response::Connect(ConnectResponse { connection_id, @@ -342,7 +355,11 @@ pub fn handle_request( pending.info_hashes.insert(i, info_hash); } - pending_scrape_responses.prepare(transaction_id, requests.len(), valid_until); + pending_scrape_responses.prepare( + transaction_id, + requests.len(), + pending_scrape_valid_until, + ); for (request_worker_index, request) in requests { request_sender.try_send_to(