From b6f6a2d73b9d2c70e9845dcc94df5d5e4fe12ee8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 16 Nov 2021 01:59:48 +0100 Subject: [PATCH] udp: uring: attempt to send more responses per syscall --- aquatic_udp/src/lib/network_uring.rs | 55 +++++++++++++--------------- 1 file changed, 26 insertions(+), 29 deletions(-) diff --git a/aquatic_udp/src/lib/network_uring.rs b/aquatic_udp/src/lib/network_uring.rs index 63be9cc..c0c2c64 100644 --- a/aquatic_udp/src/lib/network_uring.rs +++ b/aquatic_udp/src/lib/network_uring.rs @@ -175,8 +175,10 @@ pub fn run_socket_worker( }) .collect(); - let timeout = Timespec::new().nsec(500_000_000); - let mut timeout_set = false; + let timeout = Timespec::new().sec(1); + + let mut force_send_responses = false; + let mut timeout_queued = false; let mut recv_entries = Slab::with_capacity(MAX_RECV_EVENTS); let mut send_entries = Slab::with_capacity(MAX_SEND_EVENTS); @@ -275,7 +277,8 @@ pub fn run_socket_worker( } } UserData::Timeout => { - timeout_set = false; + force_send_responses = true; + timeout_queued = false; } } } @@ -295,8 +298,8 @@ pub fn run_socket_worker( } } - if !timeout_set { - // Setup timer to occasionally check if there are pending responses + if !timeout_queued { + // Setup timer to occasionally force sending of responses let user_data = UserData::Timeout; let timespec_ptr: *const Timespec = &timeout; @@ -309,32 +312,10 @@ pub fn run_socket_worker( sq.push(&entry).unwrap(); } - timeout_set = true; + timeout_queued = true; } - let num_local_to_queue = (MAX_SEND_EVENTS - send_entries.len()).min(local_responses.len()); - - for (response, addr) in local_responses.drain(local_responses.len() - num_local_to_queue..) - { - queue_response( - &config, - &mut sq, - fd, - &mut send_entries, - &mut buffers, - &mut iovs, - &mut sockaddrs_ipv4, - &mut sockaddrs_ipv6, - &mut msghdrs, - response, - addr, - ); - } - - for (response, addr) in response_receiver - .try_iter() - .take(MAX_SEND_EVENTS - send_entries.len()) - { + for (response, addr) in response_receiver.try_iter() { let opt_response = match response { ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r), ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)), @@ -342,6 +323,18 @@ pub fn run_socket_worker( }; if let Some(response) = opt_response { + local_responses.push((response, addr)); + } + } + + let space_in_send_queue = MAX_SEND_EVENTS - send_entries.len(); + + if force_send_responses | (local_responses.len() >= space_in_send_queue) { + let num_to_queue = (space_in_send_queue).min(local_responses.len()); + let drain_from_index = local_responses.len() - num_to_queue; + + for (response, addr) in local_responses.drain(drain_from_index..) + { queue_response( &config, &mut sq, @@ -356,6 +349,10 @@ pub fn run_socket_worker( addr, ); } + + if local_responses.is_empty() { + force_send_responses = false; + } } if iter_counter % 32 == 0 {