udp: uring: minor refactoring

This commit is contained in:
Joakim Frostegård 2023-03-10 00:29:22 +01:00
parent 612cc4cf62
commit d61bc34521
2 changed files with 49 additions and 55 deletions

View file

@ -78,10 +78,10 @@ pub struct SocketWorker {
send_buffers: SendBuffers,
recv_helper: RecvHelper,
local_responses: VecDeque<(Response, CanonicalSocketAddr)>,
resubmittable_squeue_buf: Vec<io_uring::squeue::Entry>,
recv_entry: io_uring::squeue::Entry,
pulse_timeout_entry: io_uring::squeue::Entry,
cleaning_timeout_entry: io_uring::squeue::Entry,
resubmittable_sqe_buf: Vec<io_uring::squeue::Entry>,
recv_sqe: io_uring::squeue::Entry,
pulse_timeout_sqe: io_uring::squeue::Entry,
cleaning_timeout_sqe: io_uring::squeue::Entry,
}
impl SocketWorker {
@ -124,12 +124,12 @@ impl SocketWorker {
.build()
.unwrap();
let recv_entry = recv_helper.create_entry(buf_ring.bgid().try_into().unwrap());
let recv_sqe = recv_helper.create_entry(buf_ring.bgid().try_into().unwrap());
// This timeout enables regular updates of pending_scrape_valid_until
// and wakes the main loop to send any pending responses in the case
// of no incoming requests
let pulse_timeout_entry = {
let pulse_timeout_sqe = {
let timespec_ptr = Box::into_raw(Box::new(Timespec::new().sec(1))) as *const _;
Timeout::new(timespec_ptr)
@ -137,7 +137,7 @@ impl SocketWorker {
.user_data(USER_DATA_PULSE_TIMEOUT)
};
let cleaning_timeout_entry = {
let cleaning_timeout_sqe = {
let timespec_ptr = Box::into_raw(Box::new(
Timespec::new().sec(config.cleaning.pending_scrape_cleaning_interval),
)) as *const _;
@ -147,10 +147,10 @@ impl SocketWorker {
.user_data(USER_DATA_CLEANING_TIMEOUT)
};
let resubmittable_squeue_buf = vec![
recv_entry.clone(),
pulse_timeout_entry.clone(),
cleaning_timeout_entry.clone(),
let resubmittable_sqe_buf = vec![
recv_sqe.clone(),
pulse_timeout_sqe.clone(),
cleaning_timeout_sqe.clone(),
];
let mut worker = Self {
@ -166,10 +166,10 @@ impl SocketWorker {
recv_helper,
local_responses: Default::default(),
buf_ring,
recv_entry,
pulse_timeout_entry,
cleaning_timeout_entry,
resubmittable_squeue_buf,
recv_sqe,
pulse_timeout_sqe,
cleaning_timeout_sqe,
resubmittable_sqe_buf,
socket,
};
@ -183,18 +183,18 @@ impl SocketWorker {
);
loop {
for sqe in self.resubmittable_squeue_buf.drain(..) {
for sqe in self.resubmittable_sqe_buf.drain(..) {
unsafe { ring.submission().push(&sqe).unwrap() };
}
let mut num_send_added = 0;
let sq_space = {
let sq = ring.submission();
sq.capacity() - sq.len()
};
let mut num_send_added = 0;
// Enqueue local responses
for _ in 0..sq_space {
if let Some((response, addr)) = self.local_responses.pop_front() {
@ -218,14 +218,8 @@ impl SocketWorker {
}
}
let sq_space = {
let sq = ring.submission();
sq.capacity() - sq.len()
};
// Enqueue swarm worker responses
for _ in 0..sq_space {
for _ in 0..(sq_space - num_send_added) {
if let Some((response, addr)) = self.get_next_swarm_response() {
match self.send_buffers.prepare_entry(&response, addr) {
Ok(entry) => {
@ -258,30 +252,7 @@ impl SocketWorker {
self.handle_cqe(&mut pending_scrape_valid_until, cqe);
}
self.send_buffers.reset_index();
}
}
fn get_next_swarm_response(&mut self) -> Option<(Response, CanonicalSocketAddr)> {
loop {
match self.response_receiver.try_recv() {
Ok((ConnectedResponse::AnnounceIpv4(response), addr)) => {
return Some((Response::AnnounceIpv4(response), addr));
}
Ok((ConnectedResponse::AnnounceIpv6(response), addr)) => {
return Some((Response::AnnounceIpv6(response), addr));
}
Ok((ConnectedResponse::Scrape(response), addr)) => {
if let Some(response) =
self.pending_scrape_responses.add_and_get_finished(response)
{
return Some((Response::Scrape(response), addr));
}
}
Err(_) => {
return None;
}
}
self.send_buffers.reset_likely_next_free_index();
}
}
@ -295,7 +266,7 @@ impl SocketWorker {
self.handle_recv_cqe(*pending_scrape_valid_until, &cqe);
if !io_uring::cqueue::more(cqe.flags()) {
self.resubmittable_squeue_buf.push(self.recv_entry.clone());
self.resubmittable_sqe_buf.push(self.recv_sqe.clone());
}
}
USER_DATA_PULSE_TIMEOUT => {
@ -310,15 +281,15 @@ impl SocketWorker {
self.response_receiver.len()
);
self.resubmittable_squeue_buf
.push(self.pulse_timeout_entry.clone());
self.resubmittable_sqe_buf
.push(self.pulse_timeout_sqe.clone());
}
USER_DATA_CLEANING_TIMEOUT => {
self.pending_scrape_responses
.clean(self.server_start_instant.seconds_elapsed());
self.resubmittable_squeue_buf
.push(self.cleaning_timeout_entry.clone());
self.resubmittable_sqe_buf
.push(self.cleaning_timeout_sqe.clone());
}
send_buffer_index => {
let result = cqe.result();
@ -528,6 +499,29 @@ impl SocketWorker {
}
}
}
fn get_next_swarm_response(&mut self) -> Option<(Response, CanonicalSocketAddr)> {
loop {
match self.response_receiver.try_recv() {
Ok((ConnectedResponse::AnnounceIpv4(response), addr)) => {
return Some((Response::AnnounceIpv4(response), addr));
}
Ok((ConnectedResponse::AnnounceIpv6(response), addr)) => {
return Some((Response::AnnounceIpv6(response), addr));
}
Ok((ConnectedResponse::Scrape(response), addr)) => {
if let Some(response) =
self.pending_scrape_responses.add_and_get_finished(response)
{
return Some((Response::Scrape(response), addr));
}
}
Err(_) => {
return None;
}
}
}
}
}
pub fn supported_on_current_kernel() -> anyhow::Result<()> {

View file

@ -204,7 +204,7 @@ impl SendBuffers {
}
/// Call after going through completion queue
pub fn reset_index(&mut self) {
pub fn reset_likely_next_free_index(&mut self) {
self.likely_next_free_index = 0;
}