udp: uring: refactor SocketWorker, fix Timespec UB

This commit is contained in:
Joakim Frostegård 2023-03-10 00:17:39 +01:00
parent a0c0e85122
commit 612cc4cf62

View file

@ -71,15 +71,17 @@ pub struct SocketWorker {
access_list_cache: AccessListCache, access_list_cache: AccessListCache,
validator: ConnectionValidator, validator: ConnectionValidator,
server_start_instant: ServerStartInstant, server_start_instant: ServerStartInstant,
#[allow(dead_code)]
socket: UdpSocket,
pending_scrape_responses: PendingScrapeResponseSlab, pending_scrape_responses: PendingScrapeResponseSlab,
buf_ring: BufRing,
send_buffers: SendBuffers, send_buffers: SendBuffers,
recv_helper: RecvHelper, recv_helper: RecvHelper,
local_responses: VecDeque<(Response, CanonicalSocketAddr)>, local_responses: VecDeque<(Response, CanonicalSocketAddr)>,
pulse_timeout: Timespec, resubmittable_squeue_buf: Vec<io_uring::squeue::Entry>,
cleaning_timeout: Timespec, recv_entry: io_uring::squeue::Entry,
buf_ring: BufRing, pulse_timeout_entry: io_uring::squeue::Entry,
#[allow(dead_code)] cleaning_timeout_entry: io_uring::squeue::Entry,
socket: UdpSocket,
} }
impl SocketWorker { impl SocketWorker {
@ -101,8 +103,6 @@ impl SocketWorker {
let access_list_cache = create_access_list_cache(&shared_state.access_list); let access_list_cache = create_access_list_cache(&shared_state.access_list);
let send_buffers = SendBuffers::new(&config, send_buffer_entries as usize); let send_buffers = SendBuffers::new(&config, send_buffer_entries as usize);
let recv_helper = RecvHelper::new(&config); let recv_helper = RecvHelper::new(&config);
let cleaning_timeout =
Timespec::new().sec(config.cleaning.pending_scrape_cleaning_interval);
let ring = IoUring::builder() let ring = IoUring::builder()
.setup_coop_taskrun() .setup_coop_taskrun()
@ -124,6 +124,35 @@ impl SocketWorker {
.build() .build()
.unwrap(); .unwrap();
let recv_entry = recv_helper.create_entry(buf_ring.bgid().try_into().unwrap());
// This timeout enables regular updates of pending_scrape_valid_until
// and wakes the main loop to send any pending responses in the case
// of no incoming requests
let pulse_timeout_entry = {
let timespec_ptr = Box::into_raw(Box::new(Timespec::new().sec(1))) as *const _;
Timeout::new(timespec_ptr)
.build()
.user_data(USER_DATA_PULSE_TIMEOUT)
};
let cleaning_timeout_entry = {
let timespec_ptr = Box::into_raw(Box::new(
Timespec::new().sec(config.cleaning.pending_scrape_cleaning_interval),
)) as *const _;
Timeout::new(timespec_ptr)
.build()
.user_data(USER_DATA_CLEANING_TIMEOUT)
};
let resubmittable_squeue_buf = vec![
recv_entry.clone(),
pulse_timeout_entry.clone(),
cleaning_timeout_entry.clone(),
];
let mut worker = Self { let mut worker = Self {
config, config,
shared_state, shared_state,
@ -136,42 +165,25 @@ impl SocketWorker {
send_buffers, send_buffers,
recv_helper, recv_helper,
local_responses: Default::default(), local_responses: Default::default(),
pulse_timeout: Timespec::new().sec(1),
cleaning_timeout,
buf_ring, buf_ring,
recv_entry,
pulse_timeout_entry,
cleaning_timeout_entry,
resubmittable_squeue_buf,
socket, socket,
}; };
CurrentRing::with(|ring| worker.run_inner(ring)); CurrentRing::with(|ring| worker.run_inner(ring));
} }
pub fn run_inner(&mut self, ring: &mut IoUring) { fn run_inner(&mut self, ring: &mut IoUring) {
let mut pending_scrape_valid_until = ValidUntil::new( let mut pending_scrape_valid_until = ValidUntil::new(
self.server_start_instant, self.server_start_instant,
self.config.cleaning.max_pending_scrape_age, self.config.cleaning.max_pending_scrape_age,
); );
let recv_entry = self
.recv_helper
.create_entry(self.buf_ring.bgid().try_into().unwrap());
// This timeout enables regular updates of pending_scrape_valid_until
// and wakes the main loop to send any pending responses in the case
// of no incoming requests
let pulse_timeout_entry = Timeout::new(&self.pulse_timeout as *const _)
.build()
.user_data(USER_DATA_PULSE_TIMEOUT);
let cleaning_timeout_entry = Timeout::new(&self.cleaning_timeout as *const _)
.build()
.user_data(USER_DATA_CLEANING_TIMEOUT);
let mut squeue_buf = vec![
recv_entry.clone(),
pulse_timeout_entry.clone(),
cleaning_timeout_entry.clone(),
];
loop { loop {
for sqe in squeue_buf.drain(..) { for sqe in self.resubmittable_squeue_buf.drain(..) {
unsafe { ring.submission().push(&sqe).unwrap() }; unsafe { ring.submission().push(&sqe).unwrap() };
} }
@ -213,28 +225,8 @@ impl SocketWorker {
}; };
// Enqueue swarm worker responses // Enqueue swarm worker responses
'outer: for _ in 0..sq_space { for _ in 0..sq_space {
let (response, addr) = loop { if let Some((response, addr)) = self.get_next_swarm_response() {
match self.response_receiver.try_recv() {
Ok((ConnectedResponse::AnnounceIpv4(response), addr)) => {
break (Response::AnnounceIpv4(response), addr);
}
Ok((ConnectedResponse::AnnounceIpv6(response), addr)) => {
break (Response::AnnounceIpv6(response), addr);
}
Ok((ConnectedResponse::Scrape(response), addr)) => {
if let Some(response) =
self.pending_scrape_responses.add_and_get_finished(response)
{
break (Response::Scrape(response), addr);
}
}
Err(_) => {
break 'outer;
}
}
};
match self.send_buffers.prepare_entry(&response, addr) { match self.send_buffers.prepare_entry(&response, addr) {
Ok(entry) => { Ok(entry) => {
unsafe { ring.submission().push(&entry).unwrap() }; unsafe { ring.submission().push(&entry).unwrap() };
@ -250,6 +242,9 @@ impl SocketWorker {
::log::error!("Failed serializing response: {:#}", err); ::log::error!("Failed serializing response: {:#}", err);
} }
} }
} else {
break;
}
} }
// Wait for all sendmsg entries to complete. If none were added, // Wait for all sendmsg entries to complete. If none were added,
@ -260,16 +255,51 @@ impl SocketWorker {
.unwrap(); .unwrap();
for cqe in ring.completion() { for cqe in ring.completion() {
self.handle_cqe(&mut pending_scrape_valid_until, cqe);
}
self.send_buffers.reset_index();
}
}
fn get_next_swarm_response(&mut self) -> Option<(Response, CanonicalSocketAddr)> {
loop {
match self.response_receiver.try_recv() {
Ok((ConnectedResponse::AnnounceIpv4(response), addr)) => {
return Some((Response::AnnounceIpv4(response), addr));
}
Ok((ConnectedResponse::AnnounceIpv6(response), addr)) => {
return Some((Response::AnnounceIpv6(response), addr));
}
Ok((ConnectedResponse::Scrape(response), addr)) => {
if let Some(response) =
self.pending_scrape_responses.add_and_get_finished(response)
{
return Some((Response::Scrape(response), addr));
}
}
Err(_) => {
return None;
}
}
}
}
fn handle_cqe(
&mut self,
pending_scrape_valid_until: &mut ValidUntil,
cqe: io_uring::cqueue::Entry,
) {
match cqe.user_data() { match cqe.user_data() {
USER_DATA_RECV => { USER_DATA_RECV => {
self.handle_recv_cqe(pending_scrape_valid_until, &cqe); self.handle_recv_cqe(*pending_scrape_valid_until, &cqe);
if !io_uring::cqueue::more(cqe.flags()) { if !io_uring::cqueue::more(cqe.flags()) {
squeue_buf.push(recv_entry.clone()); self.resubmittable_squeue_buf.push(self.recv_entry.clone());
} }
} }
USER_DATA_PULSE_TIMEOUT => { USER_DATA_PULSE_TIMEOUT => {
pending_scrape_valid_until = ValidUntil::new( *pending_scrape_valid_until = ValidUntil::new(
self.server_start_instant, self.server_start_instant,
self.config.cleaning.max_pending_scrape_age, self.config.cleaning.max_pending_scrape_age,
); );
@ -280,13 +310,15 @@ impl SocketWorker {
self.response_receiver.len() self.response_receiver.len()
); );
squeue_buf.push(pulse_timeout_entry.clone()); self.resubmittable_squeue_buf
.push(self.pulse_timeout_entry.clone());
} }
USER_DATA_CLEANING_TIMEOUT => { USER_DATA_CLEANING_TIMEOUT => {
self.pending_scrape_responses self.pending_scrape_responses
.clean(self.server_start_instant.seconds_elapsed()); .clean(self.server_start_instant.seconds_elapsed());
squeue_buf.push(cleaning_timeout_entry.clone()); self.resubmittable_squeue_buf
.push(self.cleaning_timeout_entry.clone());
} }
send_buffer_index => { send_buffer_index => {
let result = cqe.result(); let result = cqe.result();
@ -322,9 +354,8 @@ impl SocketWorker {
response_counter.fetch_add(1, Ordering::Relaxed); response_counter.fetch_add(1, Ordering::Relaxed);
} }
// Safety: OK because cqe using buffer has been // Safety: OK because cqe using buffer has been returned and
// returned and contents will no longer be accessed // contents will no longer be accessed by kernel
// by kernel
unsafe { unsafe {
self.send_buffers self.send_buffers
.mark_buffer_as_free(send_buffer_index as usize); .mark_buffer_as_free(send_buffer_index as usize);
@ -333,10 +364,6 @@ impl SocketWorker {
} }
} }
self.send_buffers.reset_index();
}
}
fn handle_recv_cqe( fn handle_recv_cqe(
&mut self, &mut self,
pending_scrape_valid_until: ValidUntil, pending_scrape_valid_until: ValidUntil,