udp: add optional resending buffer for responses that failed to send

This commit is contained in:
Joakim Frostegård 2022-06-28 00:56:29 +02:00
parent 945ff41ef2
commit 5cfd270ab7
6 changed files with 105 additions and 16 deletions

View file

@ -23,6 +23,8 @@ use requests::read_requests;
use responses::send_responses;
use storage::PendingScrapeResponseSlab;
use self::responses::send_responses_with_resends;
pub fn run_socket_worker(
_sentinel: PanicSentinel,
state: State,
@ -50,6 +52,7 @@ pub fn run_socket_worker(
let mut access_list_cache = create_access_list_cache(&state.access_list);
let mut local_responses: Vec<(Response, CanonicalSocketAddr)> = Vec::new();
let mut resend_buffer: Vec<(Response, CanonicalSocketAddr)> = Vec::new();
let poll_timeout = Duration::from_millis(config.network.poll_timeout_ms);
@ -60,6 +63,7 @@ pub fn run_socket_worker(
let mut last_pending_scrape_cleaning = Instant::now();
let mut iter_counter = 0usize;
let response_resending_active = config.network.resend_buffer_max_len > 0;
loop {
poll.poll(&mut events, Some(poll_timeout))
@ -84,15 +88,28 @@ pub fn run_socket_worker(
}
}
send_responses(
&state,
&config,
&mut socket,
&mut buffer,
&response_receiver,
&mut pending_scrape_responses,
local_responses.drain(..),
);
if response_resending_active {
send_responses_with_resends(
&state,
&config,
&mut socket,
&mut buffer,
&response_receiver,
&mut pending_scrape_responses,
local_responses.drain(..),
&mut resend_buffer,
);
} else {
send_responses(
&state,
&config,
&mut socket,
&mut buffer,
&response_receiver,
&mut pending_scrape_responses,
local_responses.drain(..),
);
}
// Run periodic ValidUntil updates and state cleaning
if iter_counter % 256 == 0 {