diff --git a/aquatic_udp/src/lib/network_uring.rs b/aquatic_udp/src/lib/network_uring.rs index c0c2c64..22263b3 100644 --- a/aquatic_udp/src/lib/network_uring.rs +++ b/aquatic_udp/src/lib/network_uring.rs @@ -175,7 +175,7 @@ pub fn run_socket_worker( }) .collect(); - let timeout = Timespec::new().sec(1); + let timeout = Timespec::new().nsec(100_000_000); let mut force_send_responses = false; let mut timeout_queued = false; @@ -298,23 +298,6 @@ pub fn run_socket_worker( } } - if !timeout_queued { - // Setup timer to occasionally force sending of responses - let user_data = UserData::Timeout; - - let timespec_ptr: *const Timespec = &timeout; - - let entry = io_uring::opcode::Timeout::new(timespec_ptr) - .build() - .user_data(user_data.into()); - - unsafe { - sq.push(&entry).unwrap(); - } - - timeout_queued = true; - } - for (response, addr) in response_receiver.try_iter() { let opt_response = match response { ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r), @@ -355,6 +338,23 @@ pub fn run_socket_worker( } } + if !timeout_queued & !force_send_responses { + // Setup timer to occasionally force sending of responses + let user_data = UserData::Timeout; + + let timespec_ptr: *const Timespec = &timeout; + + let entry = io_uring::opcode::Timeout::new(timespec_ptr) + .build() + .user_data(user_data.into()); + + unsafe { + sq.push(&entry).unwrap(); + } + + timeout_queued = true; + } + if iter_counter % 32 == 0 { let now = Instant::now(); @@ -365,12 +365,10 @@ pub fn run_socket_worker( } } - let all_responses_sent = local_responses.is_empty() & response_receiver.is_empty(); - - let wait_for_num = if all_responses_sent { - send_entries.len() + recv_entries.len() - } else { + let wait_for_num = if force_send_responses { send_entries.len() + } else { + send_entries.len() + recv_entries.len() }; sq.sync();