udp: uring: attempt to send more responses per syscall

This commit is contained in:
Joakim Frostegård 2021-11-16 01:59:48 +01:00
parent b617ff9d09
commit b6f6a2d73b

View file

@ -175,8 +175,10 @@ pub fn run_socket_worker(
}) })
.collect(); .collect();
let timeout = Timespec::new().nsec(500_000_000); let timeout = Timespec::new().sec(1);
let mut timeout_set = false;
let mut force_send_responses = false;
let mut timeout_queued = false;
let mut recv_entries = Slab::with_capacity(MAX_RECV_EVENTS); let mut recv_entries = Slab::with_capacity(MAX_RECV_EVENTS);
let mut send_entries = Slab::with_capacity(MAX_SEND_EVENTS); let mut send_entries = Slab::with_capacity(MAX_SEND_EVENTS);
@ -275,7 +277,8 @@ pub fn run_socket_worker(
} }
} }
UserData::Timeout => { UserData::Timeout => {
timeout_set = false; force_send_responses = true;
timeout_queued = false;
} }
} }
} }
@ -295,8 +298,8 @@ pub fn run_socket_worker(
} }
} }
if !timeout_set { if !timeout_queued {
// Setup timer to occasionally check if there are pending responses // Setup timer to occasionally force sending of responses
let user_data = UserData::Timeout; let user_data = UserData::Timeout;
let timespec_ptr: *const Timespec = &timeout; let timespec_ptr: *const Timespec = &timeout;
@ -309,32 +312,10 @@ pub fn run_socket_worker(
sq.push(&entry).unwrap(); sq.push(&entry).unwrap();
} }
timeout_set = true; timeout_queued = true;
} }
let num_local_to_queue = (MAX_SEND_EVENTS - send_entries.len()).min(local_responses.len()); for (response, addr) in response_receiver.try_iter() {
for (response, addr) in local_responses.drain(local_responses.len() - num_local_to_queue..)
{
queue_response(
&config,
&mut sq,
fd,
&mut send_entries,
&mut buffers,
&mut iovs,
&mut sockaddrs_ipv4,
&mut sockaddrs_ipv6,
&mut msghdrs,
response,
addr,
);
}
for (response, addr) in response_receiver
.try_iter()
.take(MAX_SEND_EVENTS - send_entries.len())
{
let opt_response = match response { let opt_response = match response {
ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r), ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r),
ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)), ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)),
@ -342,6 +323,18 @@ pub fn run_socket_worker(
}; };
if let Some(response) = opt_response { if let Some(response) = opt_response {
local_responses.push((response, addr));
}
}
let space_in_send_queue = MAX_SEND_EVENTS - send_entries.len();
if force_send_responses | (local_responses.len() >= space_in_send_queue) {
let num_to_queue = (space_in_send_queue).min(local_responses.len());
let drain_from_index = local_responses.len() - num_to_queue;
for (response, addr) in local_responses.drain(drain_from_index..)
{
queue_response( queue_response(
&config, &config,
&mut sq, &mut sq,
@ -356,6 +349,10 @@ pub fn run_socket_worker(
addr, addr,
); );
} }
if local_responses.is_empty() {
force_send_responses = false;
}
} }
if iter_counter % 32 == 0 { if iter_counter % 32 == 0 {