mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-04-02 02:35:31 +00:00
Merge pull request #134 from greatest-ape/work-2023-03-08_2
udp: uring: refactor, fix UB
This commit is contained in:
commit
c290062feb
2 changed files with 165 additions and 144 deletions
|
|
@ -71,15 +71,17 @@ pub struct SocketWorker {
|
||||||
access_list_cache: AccessListCache,
|
access_list_cache: AccessListCache,
|
||||||
validator: ConnectionValidator,
|
validator: ConnectionValidator,
|
||||||
server_start_instant: ServerStartInstant,
|
server_start_instant: ServerStartInstant,
|
||||||
|
#[allow(dead_code)]
|
||||||
|
socket: UdpSocket,
|
||||||
pending_scrape_responses: PendingScrapeResponseSlab,
|
pending_scrape_responses: PendingScrapeResponseSlab,
|
||||||
|
buf_ring: BufRing,
|
||||||
send_buffers: SendBuffers,
|
send_buffers: SendBuffers,
|
||||||
recv_helper: RecvHelper,
|
recv_helper: RecvHelper,
|
||||||
local_responses: VecDeque<(Response, CanonicalSocketAddr)>,
|
local_responses: VecDeque<(Response, CanonicalSocketAddr)>,
|
||||||
pulse_timeout: Timespec,
|
resubmittable_sqe_buf: Vec<io_uring::squeue::Entry>,
|
||||||
cleaning_timeout: Timespec,
|
recv_sqe: io_uring::squeue::Entry,
|
||||||
buf_ring: BufRing,
|
pulse_timeout_sqe: io_uring::squeue::Entry,
|
||||||
#[allow(dead_code)]
|
cleaning_timeout_sqe: io_uring::squeue::Entry,
|
||||||
socket: UdpSocket,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SocketWorker {
|
impl SocketWorker {
|
||||||
|
|
@ -101,8 +103,6 @@ impl SocketWorker {
|
||||||
let access_list_cache = create_access_list_cache(&shared_state.access_list);
|
let access_list_cache = create_access_list_cache(&shared_state.access_list);
|
||||||
let send_buffers = SendBuffers::new(&config, send_buffer_entries as usize);
|
let send_buffers = SendBuffers::new(&config, send_buffer_entries as usize);
|
||||||
let recv_helper = RecvHelper::new(&config);
|
let recv_helper = RecvHelper::new(&config);
|
||||||
let cleaning_timeout =
|
|
||||||
Timespec::new().sec(config.cleaning.pending_scrape_cleaning_interval);
|
|
||||||
|
|
||||||
let ring = IoUring::builder()
|
let ring = IoUring::builder()
|
||||||
.setup_coop_taskrun()
|
.setup_coop_taskrun()
|
||||||
|
|
@ -124,6 +124,35 @@ impl SocketWorker {
|
||||||
.build()
|
.build()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
let recv_sqe = recv_helper.create_entry(buf_ring.bgid().try_into().unwrap());
|
||||||
|
|
||||||
|
// This timeout enables regular updates of pending_scrape_valid_until
|
||||||
|
// and wakes the main loop to send any pending responses in the case
|
||||||
|
// of no incoming requests
|
||||||
|
let pulse_timeout_sqe = {
|
||||||
|
let timespec_ptr = Box::into_raw(Box::new(Timespec::new().sec(1))) as *const _;
|
||||||
|
|
||||||
|
Timeout::new(timespec_ptr)
|
||||||
|
.build()
|
||||||
|
.user_data(USER_DATA_PULSE_TIMEOUT)
|
||||||
|
};
|
||||||
|
|
||||||
|
let cleaning_timeout_sqe = {
|
||||||
|
let timespec_ptr = Box::into_raw(Box::new(
|
||||||
|
Timespec::new().sec(config.cleaning.pending_scrape_cleaning_interval),
|
||||||
|
)) as *const _;
|
||||||
|
|
||||||
|
Timeout::new(timespec_ptr)
|
||||||
|
.build()
|
||||||
|
.user_data(USER_DATA_CLEANING_TIMEOUT)
|
||||||
|
};
|
||||||
|
|
||||||
|
let resubmittable_sqe_buf = vec![
|
||||||
|
recv_sqe.clone(),
|
||||||
|
pulse_timeout_sqe.clone(),
|
||||||
|
cleaning_timeout_sqe.clone(),
|
||||||
|
];
|
||||||
|
|
||||||
let mut worker = Self {
|
let mut worker = Self {
|
||||||
config,
|
config,
|
||||||
shared_state,
|
shared_state,
|
||||||
|
|
@ -136,53 +165,36 @@ impl SocketWorker {
|
||||||
send_buffers,
|
send_buffers,
|
||||||
recv_helper,
|
recv_helper,
|
||||||
local_responses: Default::default(),
|
local_responses: Default::default(),
|
||||||
pulse_timeout: Timespec::new().sec(1),
|
|
||||||
cleaning_timeout,
|
|
||||||
buf_ring,
|
buf_ring,
|
||||||
|
recv_sqe,
|
||||||
|
pulse_timeout_sqe,
|
||||||
|
cleaning_timeout_sqe,
|
||||||
|
resubmittable_sqe_buf,
|
||||||
socket,
|
socket,
|
||||||
};
|
};
|
||||||
|
|
||||||
CurrentRing::with(|ring| worker.run_inner(ring));
|
CurrentRing::with(|ring| worker.run_inner(ring));
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run_inner(&mut self, ring: &mut IoUring) {
|
fn run_inner(&mut self, ring: &mut IoUring) {
|
||||||
let mut pending_scrape_valid_until = ValidUntil::new(
|
let mut pending_scrape_valid_until = ValidUntil::new(
|
||||||
self.server_start_instant,
|
self.server_start_instant,
|
||||||
self.config.cleaning.max_pending_scrape_age,
|
self.config.cleaning.max_pending_scrape_age,
|
||||||
);
|
);
|
||||||
|
|
||||||
let recv_entry = self
|
|
||||||
.recv_helper
|
|
||||||
.create_entry(self.buf_ring.bgid().try_into().unwrap());
|
|
||||||
// This timeout enables regular updates of pending_scrape_valid_until
|
|
||||||
// and wakes the main loop to send any pending responses in the case
|
|
||||||
// of no incoming requests
|
|
||||||
let pulse_timeout_entry = Timeout::new(&self.pulse_timeout as *const _)
|
|
||||||
.build()
|
|
||||||
.user_data(USER_DATA_PULSE_TIMEOUT);
|
|
||||||
let cleaning_timeout_entry = Timeout::new(&self.cleaning_timeout as *const _)
|
|
||||||
.build()
|
|
||||||
.user_data(USER_DATA_CLEANING_TIMEOUT);
|
|
||||||
|
|
||||||
let mut squeue_buf = vec![
|
|
||||||
recv_entry.clone(),
|
|
||||||
pulse_timeout_entry.clone(),
|
|
||||||
cleaning_timeout_entry.clone(),
|
|
||||||
];
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
for sqe in squeue_buf.drain(..) {
|
for sqe in self.resubmittable_sqe_buf.drain(..) {
|
||||||
unsafe { ring.submission().push(&sqe).unwrap() };
|
unsafe { ring.submission().push(&sqe).unwrap() };
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut num_send_added = 0;
|
|
||||||
|
|
||||||
let sq_space = {
|
let sq_space = {
|
||||||
let sq = ring.submission();
|
let sq = ring.submission();
|
||||||
|
|
||||||
sq.capacity() - sq.len()
|
sq.capacity() - sq.len()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let mut num_send_added = 0;
|
||||||
|
|
||||||
// Enqueue local responses
|
// Enqueue local responses
|
||||||
for _ in 0..sq_space {
|
for _ in 0..sq_space {
|
||||||
if let Some((response, addr)) = self.local_responses.pop_front() {
|
if let Some((response, addr)) = self.local_responses.pop_front() {
|
||||||
|
|
@ -206,49 +218,26 @@ impl SocketWorker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let sq_space = {
|
|
||||||
let sq = ring.submission();
|
|
||||||
|
|
||||||
sq.capacity() - sq.len()
|
|
||||||
};
|
|
||||||
|
|
||||||
// Enqueue swarm worker responses
|
// Enqueue swarm worker responses
|
||||||
'outer: for _ in 0..sq_space {
|
for _ in 0..(sq_space - num_send_added) {
|
||||||
let (response, addr) = loop {
|
if let Some((response, addr)) = self.get_next_swarm_response() {
|
||||||
match self.response_receiver.try_recv() {
|
match self.send_buffers.prepare_entry(&response, addr) {
|
||||||
Ok((ConnectedResponse::AnnounceIpv4(response), addr)) => {
|
Ok(entry) => {
|
||||||
break (Response::AnnounceIpv4(response), addr);
|
unsafe { ring.submission().push(&entry).unwrap() };
|
||||||
}
|
|
||||||
Ok((ConnectedResponse::AnnounceIpv6(response), addr)) => {
|
|
||||||
break (Response::AnnounceIpv6(response), addr);
|
|
||||||
}
|
|
||||||
Ok((ConnectedResponse::Scrape(response), addr)) => {
|
|
||||||
if let Some(response) =
|
|
||||||
self.pending_scrape_responses.add_and_get_finished(response)
|
|
||||||
{
|
|
||||||
break (Response::Scrape(response), addr);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(_) => {
|
|
||||||
break 'outer;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
match self.send_buffers.prepare_entry(&response, addr) {
|
num_send_added += 1;
|
||||||
Ok(entry) => {
|
}
|
||||||
unsafe { ring.submission().push(&entry).unwrap() };
|
Err(send_buffers::Error::NoBuffers) => {
|
||||||
|
self.local_responses.push_back((response, addr));
|
||||||
|
|
||||||
num_send_added += 1;
|
break;
|
||||||
}
|
}
|
||||||
Err(send_buffers::Error::NoBuffers) => {
|
Err(send_buffers::Error::SerializationFailed(err)) => {
|
||||||
self.local_responses.push_back((response, addr));
|
::log::error!("Failed serializing response: {:#}", err);
|
||||||
|
}
|
||||||
break;
|
|
||||||
}
|
|
||||||
Err(send_buffers::Error::SerializationFailed(err)) => {
|
|
||||||
::log::error!("Failed serializing response: {:#}", err);
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -260,80 +249,89 @@ impl SocketWorker {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
for cqe in ring.completion() {
|
for cqe in ring.completion() {
|
||||||
match cqe.user_data() {
|
self.handle_cqe(&mut pending_scrape_valid_until, cqe);
|
||||||
USER_DATA_RECV => {
|
|
||||||
self.handle_recv_cqe(pending_scrape_valid_until, &cqe);
|
|
||||||
|
|
||||||
if !io_uring::cqueue::more(cqe.flags()) {
|
|
||||||
squeue_buf.push(recv_entry.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
USER_DATA_PULSE_TIMEOUT => {
|
|
||||||
pending_scrape_valid_until = ValidUntil::new(
|
|
||||||
self.server_start_instant,
|
|
||||||
self.config.cleaning.max_pending_scrape_age,
|
|
||||||
);
|
|
||||||
|
|
||||||
::log::info!(
|
|
||||||
"pending responses: {} local, {} swarm",
|
|
||||||
self.local_responses.len(),
|
|
||||||
self.response_receiver.len()
|
|
||||||
);
|
|
||||||
|
|
||||||
squeue_buf.push(pulse_timeout_entry.clone());
|
|
||||||
}
|
|
||||||
USER_DATA_CLEANING_TIMEOUT => {
|
|
||||||
self.pending_scrape_responses
|
|
||||||
.clean(self.server_start_instant.seconds_elapsed());
|
|
||||||
|
|
||||||
squeue_buf.push(cleaning_timeout_entry.clone());
|
|
||||||
}
|
|
||||||
send_buffer_index => {
|
|
||||||
let result = cqe.result();
|
|
||||||
|
|
||||||
if result < 0 {
|
|
||||||
::log::error!(
|
|
||||||
"Couldn't send response: {:#}",
|
|
||||||
::std::io::Error::from_raw_os_error(-result)
|
|
||||||
);
|
|
||||||
} else if self.config.statistics.active() {
|
|
||||||
let send_buffer_index = send_buffer_index as usize;
|
|
||||||
|
|
||||||
let (response_type, receiver_is_ipv4) =
|
|
||||||
self.send_buffers.response_type_and_ipv4(send_buffer_index);
|
|
||||||
|
|
||||||
let (statistics, extra_bytes) = if receiver_is_ipv4 {
|
|
||||||
(&self.shared_state.statistics_ipv4, EXTRA_PACKET_SIZE_IPV4)
|
|
||||||
} else {
|
|
||||||
(&self.shared_state.statistics_ipv6, EXTRA_PACKET_SIZE_IPV6)
|
|
||||||
};
|
|
||||||
|
|
||||||
statistics
|
|
||||||
.bytes_sent
|
|
||||||
.fetch_add(result as usize + extra_bytes, Ordering::Relaxed);
|
|
||||||
|
|
||||||
let response_counter = match response_type {
|
|
||||||
ResponseType::Connect => &statistics.responses_sent_connect,
|
|
||||||
ResponseType::Announce => &statistics.responses_sent_announce,
|
|
||||||
ResponseType::Scrape => &statistics.responses_sent_scrape,
|
|
||||||
ResponseType::Error => &statistics.responses_sent_error,
|
|
||||||
};
|
|
||||||
|
|
||||||
response_counter.fetch_add(1, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Safety: OK because cqe using buffer has been
|
|
||||||
// returned and contents will no longer be accessed
|
|
||||||
// by kernel
|
|
||||||
unsafe {
|
|
||||||
self.send_buffers
|
|
||||||
.mark_buffer_as_free(send_buffer_index as usize);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
self.send_buffers.reset_index();
|
self.send_buffers.reset_likely_next_free_index();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_cqe(
|
||||||
|
&mut self,
|
||||||
|
pending_scrape_valid_until: &mut ValidUntil,
|
||||||
|
cqe: io_uring::cqueue::Entry,
|
||||||
|
) {
|
||||||
|
match cqe.user_data() {
|
||||||
|
USER_DATA_RECV => {
|
||||||
|
self.handle_recv_cqe(*pending_scrape_valid_until, &cqe);
|
||||||
|
|
||||||
|
if !io_uring::cqueue::more(cqe.flags()) {
|
||||||
|
self.resubmittable_sqe_buf.push(self.recv_sqe.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
USER_DATA_PULSE_TIMEOUT => {
|
||||||
|
*pending_scrape_valid_until = ValidUntil::new(
|
||||||
|
self.server_start_instant,
|
||||||
|
self.config.cleaning.max_pending_scrape_age,
|
||||||
|
);
|
||||||
|
|
||||||
|
::log::info!(
|
||||||
|
"pending responses: {} local, {} swarm",
|
||||||
|
self.local_responses.len(),
|
||||||
|
self.response_receiver.len()
|
||||||
|
);
|
||||||
|
|
||||||
|
self.resubmittable_sqe_buf
|
||||||
|
.push(self.pulse_timeout_sqe.clone());
|
||||||
|
}
|
||||||
|
USER_DATA_CLEANING_TIMEOUT => {
|
||||||
|
self.pending_scrape_responses
|
||||||
|
.clean(self.server_start_instant.seconds_elapsed());
|
||||||
|
|
||||||
|
self.resubmittable_sqe_buf
|
||||||
|
.push(self.cleaning_timeout_sqe.clone());
|
||||||
|
}
|
||||||
|
send_buffer_index => {
|
||||||
|
let result = cqe.result();
|
||||||
|
|
||||||
|
if result < 0 {
|
||||||
|
::log::error!(
|
||||||
|
"Couldn't send response: {:#}",
|
||||||
|
::std::io::Error::from_raw_os_error(-result)
|
||||||
|
);
|
||||||
|
} else if self.config.statistics.active() {
|
||||||
|
let send_buffer_index = send_buffer_index as usize;
|
||||||
|
|
||||||
|
let (response_type, receiver_is_ipv4) =
|
||||||
|
self.send_buffers.response_type_and_ipv4(send_buffer_index);
|
||||||
|
|
||||||
|
let (statistics, extra_bytes) = if receiver_is_ipv4 {
|
||||||
|
(&self.shared_state.statistics_ipv4, EXTRA_PACKET_SIZE_IPV4)
|
||||||
|
} else {
|
||||||
|
(&self.shared_state.statistics_ipv6, EXTRA_PACKET_SIZE_IPV6)
|
||||||
|
};
|
||||||
|
|
||||||
|
statistics
|
||||||
|
.bytes_sent
|
||||||
|
.fetch_add(result as usize + extra_bytes, Ordering::Relaxed);
|
||||||
|
|
||||||
|
let response_counter = match response_type {
|
||||||
|
ResponseType::Connect => &statistics.responses_sent_connect,
|
||||||
|
ResponseType::Announce => &statistics.responses_sent_announce,
|
||||||
|
ResponseType::Scrape => &statistics.responses_sent_scrape,
|
||||||
|
ResponseType::Error => &statistics.responses_sent_error,
|
||||||
|
};
|
||||||
|
|
||||||
|
response_counter.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Safety: OK because cqe using buffer has been returned and
|
||||||
|
// contents will no longer be accessed by kernel
|
||||||
|
unsafe {
|
||||||
|
self.send_buffers
|
||||||
|
.mark_buffer_as_free(send_buffer_index as usize);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -501,6 +499,29 @@ impl SocketWorker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_next_swarm_response(&mut self) -> Option<(Response, CanonicalSocketAddr)> {
|
||||||
|
loop {
|
||||||
|
match self.response_receiver.try_recv() {
|
||||||
|
Ok((ConnectedResponse::AnnounceIpv4(response), addr)) => {
|
||||||
|
return Some((Response::AnnounceIpv4(response), addr));
|
||||||
|
}
|
||||||
|
Ok((ConnectedResponse::AnnounceIpv6(response), addr)) => {
|
||||||
|
return Some((Response::AnnounceIpv6(response), addr));
|
||||||
|
}
|
||||||
|
Ok((ConnectedResponse::Scrape(response), addr)) => {
|
||||||
|
if let Some(response) =
|
||||||
|
self.pending_scrape_responses.add_and_get_finished(response)
|
||||||
|
{
|
||||||
|
return Some((Response::Scrape(response), addr));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn supported_on_current_kernel() -> anyhow::Result<()> {
|
pub fn supported_on_current_kernel() -> anyhow::Result<()> {
|
||||||
|
|
|
||||||
|
|
@ -204,7 +204,7 @@ impl SendBuffers {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Call after going through completion queue
|
/// Call after going through completion queue
|
||||||
pub fn reset_index(&mut self) {
|
pub fn reset_likely_next_free_index(&mut self) {
|
||||||
self.likely_next_free_index = 0;
|
self.likely_next_free_index = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue