diff --git a/aquatic_udp/src/workers/socket/uring/mod.rs b/aquatic_udp/src/workers/socket/uring/mod.rs index edc8441..e457b2e 100644 --- a/aquatic_udp/src/workers/socket/uring/mod.rs +++ b/aquatic_udp/src/workers/socket/uring/mod.rs @@ -78,10 +78,10 @@ pub struct SocketWorker { send_buffers: SendBuffers, recv_helper: RecvHelper, local_responses: VecDeque<(Response, CanonicalSocketAddr)>, - resubmittable_squeue_buf: Vec, - recv_entry: io_uring::squeue::Entry, - pulse_timeout_entry: io_uring::squeue::Entry, - cleaning_timeout_entry: io_uring::squeue::Entry, + resubmittable_sqe_buf: Vec, + recv_sqe: io_uring::squeue::Entry, + pulse_timeout_sqe: io_uring::squeue::Entry, + cleaning_timeout_sqe: io_uring::squeue::Entry, } impl SocketWorker { @@ -124,12 +124,12 @@ impl SocketWorker { .build() .unwrap(); - let recv_entry = recv_helper.create_entry(buf_ring.bgid().try_into().unwrap()); + let recv_sqe = recv_helper.create_entry(buf_ring.bgid().try_into().unwrap()); // This timeout enables regular updates of pending_scrape_valid_until // and wakes the main loop to send any pending responses in the case // of no incoming requests - let pulse_timeout_entry = { + let pulse_timeout_sqe = { let timespec_ptr = Box::into_raw(Box::new(Timespec::new().sec(1))) as *const _; Timeout::new(timespec_ptr) @@ -137,7 +137,7 @@ impl SocketWorker { .user_data(USER_DATA_PULSE_TIMEOUT) }; - let cleaning_timeout_entry = { + let cleaning_timeout_sqe = { let timespec_ptr = Box::into_raw(Box::new( Timespec::new().sec(config.cleaning.pending_scrape_cleaning_interval), )) as *const _; @@ -147,10 +147,10 @@ impl SocketWorker { .user_data(USER_DATA_CLEANING_TIMEOUT) }; - let resubmittable_squeue_buf = vec![ - recv_entry.clone(), - pulse_timeout_entry.clone(), - cleaning_timeout_entry.clone(), + let resubmittable_sqe_buf = vec![ + recv_sqe.clone(), + pulse_timeout_sqe.clone(), + cleaning_timeout_sqe.clone(), ]; let mut worker = Self { @@ -166,10 +166,10 @@ impl SocketWorker { recv_helper, local_responses: Default::default(), buf_ring, - recv_entry, - pulse_timeout_entry, - cleaning_timeout_entry, - resubmittable_squeue_buf, + recv_sqe, + pulse_timeout_sqe, + cleaning_timeout_sqe, + resubmittable_sqe_buf, socket, }; @@ -183,18 +183,18 @@ impl SocketWorker { ); loop { - for sqe in self.resubmittable_squeue_buf.drain(..) { + for sqe in self.resubmittable_sqe_buf.drain(..) { unsafe { ring.submission().push(&sqe).unwrap() }; } - let mut num_send_added = 0; - let sq_space = { let sq = ring.submission(); sq.capacity() - sq.len() }; + let mut num_send_added = 0; + // Enqueue local responses for _ in 0..sq_space { if let Some((response, addr)) = self.local_responses.pop_front() { @@ -218,14 +218,8 @@ impl SocketWorker { } } - let sq_space = { - let sq = ring.submission(); - - sq.capacity() - sq.len() - }; - // Enqueue swarm worker responses - for _ in 0..sq_space { + for _ in 0..(sq_space - num_send_added) { if let Some((response, addr)) = self.get_next_swarm_response() { match self.send_buffers.prepare_entry(&response, addr) { Ok(entry) => { @@ -258,30 +252,7 @@ impl SocketWorker { self.handle_cqe(&mut pending_scrape_valid_until, cqe); } - self.send_buffers.reset_index(); - } - } - - fn get_next_swarm_response(&mut self) -> Option<(Response, CanonicalSocketAddr)> { - loop { - match self.response_receiver.try_recv() { - Ok((ConnectedResponse::AnnounceIpv4(response), addr)) => { - return Some((Response::AnnounceIpv4(response), addr)); - } - Ok((ConnectedResponse::AnnounceIpv6(response), addr)) => { - return Some((Response::AnnounceIpv6(response), addr)); - } - Ok((ConnectedResponse::Scrape(response), addr)) => { - if let Some(response) = - self.pending_scrape_responses.add_and_get_finished(response) - { - return Some((Response::Scrape(response), addr)); - } - } - Err(_) => { - return None; - } - } + self.send_buffers.reset_likely_next_free_index(); } } @@ -295,7 +266,7 @@ impl SocketWorker { self.handle_recv_cqe(*pending_scrape_valid_until, &cqe); if !io_uring::cqueue::more(cqe.flags()) { - self.resubmittable_squeue_buf.push(self.recv_entry.clone()); + self.resubmittable_sqe_buf.push(self.recv_sqe.clone()); } } USER_DATA_PULSE_TIMEOUT => { @@ -310,15 +281,15 @@ impl SocketWorker { self.response_receiver.len() ); - self.resubmittable_squeue_buf - .push(self.pulse_timeout_entry.clone()); + self.resubmittable_sqe_buf + .push(self.pulse_timeout_sqe.clone()); } USER_DATA_CLEANING_TIMEOUT => { self.pending_scrape_responses .clean(self.server_start_instant.seconds_elapsed()); - self.resubmittable_squeue_buf - .push(self.cleaning_timeout_entry.clone()); + self.resubmittable_sqe_buf + .push(self.cleaning_timeout_sqe.clone()); } send_buffer_index => { let result = cqe.result(); @@ -528,6 +499,29 @@ impl SocketWorker { } } } + + fn get_next_swarm_response(&mut self) -> Option<(Response, CanonicalSocketAddr)> { + loop { + match self.response_receiver.try_recv() { + Ok((ConnectedResponse::AnnounceIpv4(response), addr)) => { + return Some((Response::AnnounceIpv4(response), addr)); + } + Ok((ConnectedResponse::AnnounceIpv6(response), addr)) => { + return Some((Response::AnnounceIpv6(response), addr)); + } + Ok((ConnectedResponse::Scrape(response), addr)) => { + if let Some(response) = + self.pending_scrape_responses.add_and_get_finished(response) + { + return Some((Response::Scrape(response), addr)); + } + } + Err(_) => { + return None; + } + } + } + } } pub fn supported_on_current_kernel() -> anyhow::Result<()> { diff --git a/aquatic_udp/src/workers/socket/uring/send_buffers.rs b/aquatic_udp/src/workers/socket/uring/send_buffers.rs index 3ca69a8..3125416 100644 --- a/aquatic_udp/src/workers/socket/uring/send_buffers.rs +++ b/aquatic_udp/src/workers/socket/uring/send_buffers.rs @@ -204,7 +204,7 @@ impl SendBuffers { } /// Call after going through completion queue - pub fn reset_index(&mut self) { + pub fn reset_likely_next_free_index(&mut self) { self.likely_next_free_index = 0; }