udp: clean pending scrape map

This commit is contained in:
Joakim Frostegård 2021-11-19 01:10:37 +01:00
parent 2e7c8ac904
commit 9c919a6ecb
2 changed files with 37 additions and 9 deletions

View file

@ -142,10 +142,19 @@ pub struct CleaningConfig {
pub connection_cleaning_interval: u64,
/// Clean torrents this often (seconds)
pub torrent_cleaning_interval: u64,
/// Clean pending scrape responses this often (seconds)
///
/// In regular operation, there should be no pending scrape responses
/// lingering for a long time. However, the cleaning also returns unused
/// allocated memory to the OS, so the interval can be configured here.
pub pending_scrape_cleaning_interval: u64,
/// Remove connections that are older than this (seconds)
pub max_connection_age: u64,
/// Remove peers that haven't announced for this long (seconds)
pub max_peer_age: u64,
/// Remove pending scrape responses that haven't been returned from request
/// workers for this long (seconds)
pub max_pending_scrape_age: u64,
}
impl Default for CleaningConfig {
@ -153,8 +162,10 @@ impl Default for CleaningConfig {
Self {
connection_cleaning_interval: 60,
torrent_cleaning_interval: 60 * 2,
pending_scrape_cleaning_interval: 60 * 10,
max_connection_age: 60 * 5,
max_peer_age: 60 * 20,
max_pending_scrape_age: 60,
}
}
}

View file

@ -140,10 +140,15 @@ pub fn run_socket_worker(
let timeout = Duration::from_millis(50);
let cleaning_duration = Duration::from_secs(config.cleaning.connection_cleaning_interval);
let connection_cleaning_duration =
Duration::from_secs(config.cleaning.connection_cleaning_interval);
let pending_scrape_cleaning_duration =
Duration::from_secs(config.cleaning.pending_scrape_cleaning_interval);
let mut last_connection_cleaning = Instant::now();
let mut last_pending_scrape_cleaning = Instant::now();
let mut iter_counter = 0usize;
let mut last_cleaning = Instant::now();
loop {
poll.poll(&mut events, Some(timeout))
@ -180,10 +185,15 @@ pub fn run_socket_worker(
if iter_counter % 32 == 0 {
let now = Instant::now();
if now > last_cleaning + cleaning_duration {
if now > last_connection_cleaning + connection_cleaning_duration {
connections.clean();
last_cleaning = now;
last_connection_cleaning = now;
}
if now > last_pending_scrape_cleaning + pending_scrape_cleaning_duration {
pending_scrape_responses.clean();
last_pending_scrape_cleaning = now;
}
}
@ -206,7 +216,8 @@ fn read_requests(
let mut requests_received: usize = 0;
let mut bytes_received: usize = 0;
let valid_until = ValidUntil::new(config.cleaning.max_connection_age);
let connection_valid_until = ValidUntil::new(config.cleaning.max_connection_age);
let pending_scrape_valid_until = ValidUntil::new(config.cleaning.max_pending_scrape_age);
let mut access_list_cache = create_access_list_cache(&state.access_list);
@ -246,7 +257,8 @@ fn read_requests(
rng,
request_sender,
local_responses,
valid_until,
connection_valid_until,
pending_scrape_valid_until,
res_request,
src,
);
@ -281,7 +293,8 @@ pub fn handle_request(
rng: &mut StdRng,
request_sender: &ConnectedRequestSender,
local_responses: &mut Vec<(Response, SocketAddr)>,
valid_until: ValidUntil,
connection_valid_until: ValidUntil,
pending_scrape_valid_until: ValidUntil,
res_request: Result<Request, RequestParseError>,
src: SocketAddr,
) {
@ -291,7 +304,7 @@ pub fn handle_request(
Ok(Request::Connect(request)) => {
let connection_id = ConnectionId(rng.gen());
connections.insert(connection_id, src, valid_until);
connections.insert(connection_id, src, connection_valid_until);
let response = Response::Connect(ConnectResponse {
connection_id,
@ -342,7 +355,11 @@ pub fn handle_request(
pending.info_hashes.insert(i, info_hash);
}
pending_scrape_responses.prepare(transaction_id, requests.len(), valid_until);
pending_scrape_responses.prepare(
transaction_id,
requests.len(),
pending_scrape_valid_until,
);
for (request_worker_index, request) in requests {
request_sender.try_send_to(