From cf08e96a7edc869a14a460d9325e794f7934c4c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 15 Mar 2023 23:32:13 +0100 Subject: [PATCH] udp: uring: store pending_scrape_valid_until in SocketWorker --- aquatic_udp/src/workers/socket/uring/mod.rs | 39 +++++++-------------- 1 file changed, 13 insertions(+), 26 deletions(-) diff --git a/aquatic_udp/src/workers/socket/uring/mod.rs b/aquatic_udp/src/workers/socket/uring/mod.rs index 283d0c3..dd2b728 100644 --- a/aquatic_udp/src/workers/socket/uring/mod.rs +++ b/aquatic_udp/src/workers/socket/uring/mod.rs @@ -91,6 +91,7 @@ pub struct SocketWorker { recv_sqe: io_uring::squeue::Entry, pulse_timeout_sqe: io_uring::squeue::Entry, cleaning_timeout_sqe: io_uring::squeue::Entry, + pending_scrape_valid_until: ValidUntil, } impl SocketWorker { @@ -162,6 +163,9 @@ impl SocketWorker { cleaning_timeout_sqe.clone(), ]; + let pending_scrape_valid_until = + ValidUntil::new(server_start_instant, config.cleaning.max_pending_scrape_age); + let mut worker = Self { config, shared_state, @@ -180,17 +184,13 @@ impl SocketWorker { cleaning_timeout_sqe, resubmittable_sqe_buf, socket, + pending_scrape_valid_until, }; CurrentRing::with(|ring| worker.run_inner(ring)); } fn run_inner(&mut self, ring: &mut IoUring) { - let mut pending_scrape_valid_until = ValidUntil::new( - self.server_start_instant, - self.config.cleaning.max_pending_scrape_age, - ); - loop { for sqe in self.resubmittable_sqe_buf.drain(..) { unsafe { ring.submission().push(&sqe).unwrap() }; @@ -258,28 +258,24 @@ impl SocketWorker { .unwrap(); for cqe in ring.completion() { - self.handle_cqe(&mut pending_scrape_valid_until, cqe); + self.handle_cqe(cqe); } self.send_buffers.reset_likely_next_free_index(); } } - fn handle_cqe( - &mut self, - pending_scrape_valid_until: &mut ValidUntil, - cqe: io_uring::cqueue::Entry, - ) { + fn handle_cqe(&mut self, cqe: io_uring::cqueue::Entry) { match cqe.user_data() { USER_DATA_RECV => { - self.handle_recv_cqe(*pending_scrape_valid_until, &cqe); + self.handle_recv_cqe(&cqe); if !io_uring::cqueue::more(cqe.flags()) { self.resubmittable_sqe_buf.push(self.recv_sqe.clone()); } } USER_DATA_PULSE_TIMEOUT => { - *pending_scrape_valid_until = ValidUntil::new( + self.pending_scrape_valid_until = ValidUntil::new( self.server_start_instant, self.config.cleaning.max_pending_scrape_age, ); @@ -344,11 +340,7 @@ impl SocketWorker { } } - fn handle_recv_cqe( - &mut self, - pending_scrape_valid_until: ValidUntil, - cqe: &io_uring::cqueue::Entry, - ) { + fn handle_recv_cqe(&mut self, cqe: &io_uring::cqueue::Entry) { let result = cqe.result(); if result < 0 { @@ -384,7 +376,7 @@ impl SocketWorker { let addr = match self.recv_helper.parse(buffer) { Ok((request, addr)) => { - self.handle_request(pending_scrape_valid_until, request, addr); + self.handle_request(request, addr); addr } @@ -439,12 +431,7 @@ impl SocketWorker { } } - fn handle_request( - &mut self, - pending_scrape_valid_until: ValidUntil, - request: Request, - src: CanonicalSocketAddr, - ) { + fn handle_request(&mut self, request: Request, src: CanonicalSocketAddr) { let access_list_mode = self.config.access_list.mode; match request { @@ -494,7 +481,7 @@ impl SocketWorker { let split_requests = self.pending_scrape_responses.prepare_split_requests( &self.config, request, - pending_scrape_valid_until, + self.pending_scrape_valid_until, ); for (swarm_worker_index, request) in split_requests {