udp: uring: store pending_scrape_valid_until in SocketWorker

This commit is contained in:
Joakim Frostegård 2023-03-15 23:32:13 +01:00
parent d6f8adcb53
commit cf08e96a7e

View file

@ -91,6 +91,7 @@ pub struct SocketWorker {
recv_sqe: io_uring::squeue::Entry, recv_sqe: io_uring::squeue::Entry,
pulse_timeout_sqe: io_uring::squeue::Entry, pulse_timeout_sqe: io_uring::squeue::Entry,
cleaning_timeout_sqe: io_uring::squeue::Entry, cleaning_timeout_sqe: io_uring::squeue::Entry,
pending_scrape_valid_until: ValidUntil,
} }
impl SocketWorker { impl SocketWorker {
@ -162,6 +163,9 @@ impl SocketWorker {
cleaning_timeout_sqe.clone(), cleaning_timeout_sqe.clone(),
]; ];
let pending_scrape_valid_until =
ValidUntil::new(server_start_instant, config.cleaning.max_pending_scrape_age);
let mut worker = Self { let mut worker = Self {
config, config,
shared_state, shared_state,
@ -180,17 +184,13 @@ impl SocketWorker {
cleaning_timeout_sqe, cleaning_timeout_sqe,
resubmittable_sqe_buf, resubmittable_sqe_buf,
socket, socket,
pending_scrape_valid_until,
}; };
CurrentRing::with(|ring| worker.run_inner(ring)); CurrentRing::with(|ring| worker.run_inner(ring));
} }
fn run_inner(&mut self, ring: &mut IoUring) { fn run_inner(&mut self, ring: &mut IoUring) {
let mut pending_scrape_valid_until = ValidUntil::new(
self.server_start_instant,
self.config.cleaning.max_pending_scrape_age,
);
loop { loop {
for sqe in self.resubmittable_sqe_buf.drain(..) { for sqe in self.resubmittable_sqe_buf.drain(..) {
unsafe { ring.submission().push(&sqe).unwrap() }; unsafe { ring.submission().push(&sqe).unwrap() };
@ -258,28 +258,24 @@ impl SocketWorker {
.unwrap(); .unwrap();
for cqe in ring.completion() { for cqe in ring.completion() {
self.handle_cqe(&mut pending_scrape_valid_until, cqe); self.handle_cqe(cqe);
} }
self.send_buffers.reset_likely_next_free_index(); self.send_buffers.reset_likely_next_free_index();
} }
} }
fn handle_cqe( fn handle_cqe(&mut self, cqe: io_uring::cqueue::Entry) {
&mut self,
pending_scrape_valid_until: &mut ValidUntil,
cqe: io_uring::cqueue::Entry,
) {
match cqe.user_data() { match cqe.user_data() {
USER_DATA_RECV => { USER_DATA_RECV => {
self.handle_recv_cqe(*pending_scrape_valid_until, &cqe); self.handle_recv_cqe(&cqe);
if !io_uring::cqueue::more(cqe.flags()) { if !io_uring::cqueue::more(cqe.flags()) {
self.resubmittable_sqe_buf.push(self.recv_sqe.clone()); self.resubmittable_sqe_buf.push(self.recv_sqe.clone());
} }
} }
USER_DATA_PULSE_TIMEOUT => { USER_DATA_PULSE_TIMEOUT => {
*pending_scrape_valid_until = ValidUntil::new( self.pending_scrape_valid_until = ValidUntil::new(
self.server_start_instant, self.server_start_instant,
self.config.cleaning.max_pending_scrape_age, self.config.cleaning.max_pending_scrape_age,
); );
@ -344,11 +340,7 @@ impl SocketWorker {
} }
} }
fn handle_recv_cqe( fn handle_recv_cqe(&mut self, cqe: &io_uring::cqueue::Entry) {
&mut self,
pending_scrape_valid_until: ValidUntil,
cqe: &io_uring::cqueue::Entry,
) {
let result = cqe.result(); let result = cqe.result();
if result < 0 { if result < 0 {
@ -384,7 +376,7 @@ impl SocketWorker {
let addr = match self.recv_helper.parse(buffer) { let addr = match self.recv_helper.parse(buffer) {
Ok((request, addr)) => { Ok((request, addr)) => {
self.handle_request(pending_scrape_valid_until, request, addr); self.handle_request(request, addr);
addr addr
} }
@ -439,12 +431,7 @@ impl SocketWorker {
} }
} }
fn handle_request( fn handle_request(&mut self, request: Request, src: CanonicalSocketAddr) {
&mut self,
pending_scrape_valid_until: ValidUntil,
request: Request,
src: CanonicalSocketAddr,
) {
let access_list_mode = self.config.access_list.mode; let access_list_mode = self.config.access_list.mode;
match request { match request {
@ -494,7 +481,7 @@ impl SocketWorker {
let split_requests = self.pending_scrape_responses.prepare_split_requests( let split_requests = self.pending_scrape_responses.prepare_split_requests(
&self.config, &self.config,
request, request,
pending_scrape_valid_until, self.pending_scrape_valid_until,
); );
for (swarm_worker_index, request) in split_requests { for (swarm_worker_index, request) in split_requests {