From 612cc4cf627add2c06a145a051f01c2643aaadde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 10 Mar 2023 00:17:39 +0100 Subject: [PATCH 1/2] udp: uring: refactor SocketWorker, fix Timespec UB --- aquatic_udp/src/workers/socket/uring/mod.rs | 295 +++++++++++--------- 1 file changed, 161 insertions(+), 134 deletions(-) diff --git a/aquatic_udp/src/workers/socket/uring/mod.rs b/aquatic_udp/src/workers/socket/uring/mod.rs index e93ad61..edc8441 100644 --- a/aquatic_udp/src/workers/socket/uring/mod.rs +++ b/aquatic_udp/src/workers/socket/uring/mod.rs @@ -71,15 +71,17 @@ pub struct SocketWorker { access_list_cache: AccessListCache, validator: ConnectionValidator, server_start_instant: ServerStartInstant, + #[allow(dead_code)] + socket: UdpSocket, pending_scrape_responses: PendingScrapeResponseSlab, + buf_ring: BufRing, send_buffers: SendBuffers, recv_helper: RecvHelper, local_responses: VecDeque<(Response, CanonicalSocketAddr)>, - pulse_timeout: Timespec, - cleaning_timeout: Timespec, - buf_ring: BufRing, - #[allow(dead_code)] - socket: UdpSocket, + resubmittable_squeue_buf: Vec, + recv_entry: io_uring::squeue::Entry, + pulse_timeout_entry: io_uring::squeue::Entry, + cleaning_timeout_entry: io_uring::squeue::Entry, } impl SocketWorker { @@ -101,8 +103,6 @@ impl SocketWorker { let access_list_cache = create_access_list_cache(&shared_state.access_list); let send_buffers = SendBuffers::new(&config, send_buffer_entries as usize); let recv_helper = RecvHelper::new(&config); - let cleaning_timeout = - Timespec::new().sec(config.cleaning.pending_scrape_cleaning_interval); let ring = IoUring::builder() .setup_coop_taskrun() @@ -124,6 +124,35 @@ impl SocketWorker { .build() .unwrap(); + let recv_entry = recv_helper.create_entry(buf_ring.bgid().try_into().unwrap()); + + // This timeout enables regular updates of pending_scrape_valid_until + // and wakes the main loop to send any pending responses in the case + // of no incoming requests + let pulse_timeout_entry = { + let timespec_ptr = Box::into_raw(Box::new(Timespec::new().sec(1))) as *const _; + + Timeout::new(timespec_ptr) + .build() + .user_data(USER_DATA_PULSE_TIMEOUT) + }; + + let cleaning_timeout_entry = { + let timespec_ptr = Box::into_raw(Box::new( + Timespec::new().sec(config.cleaning.pending_scrape_cleaning_interval), + )) as *const _; + + Timeout::new(timespec_ptr) + .build() + .user_data(USER_DATA_CLEANING_TIMEOUT) + }; + + let resubmittable_squeue_buf = vec![ + recv_entry.clone(), + pulse_timeout_entry.clone(), + cleaning_timeout_entry.clone(), + ]; + let mut worker = Self { config, shared_state, @@ -136,42 +165,25 @@ impl SocketWorker { send_buffers, recv_helper, local_responses: Default::default(), - pulse_timeout: Timespec::new().sec(1), - cleaning_timeout, buf_ring, + recv_entry, + pulse_timeout_entry, + cleaning_timeout_entry, + resubmittable_squeue_buf, socket, }; CurrentRing::with(|ring| worker.run_inner(ring)); } - pub fn run_inner(&mut self, ring: &mut IoUring) { + fn run_inner(&mut self, ring: &mut IoUring) { let mut pending_scrape_valid_until = ValidUntil::new( self.server_start_instant, self.config.cleaning.max_pending_scrape_age, ); - let recv_entry = self - .recv_helper - .create_entry(self.buf_ring.bgid().try_into().unwrap()); - // This timeout enables regular updates of pending_scrape_valid_until - // and wakes the main loop to send any pending responses in the case - // of no incoming requests - let pulse_timeout_entry = Timeout::new(&self.pulse_timeout as *const _) - .build() - .user_data(USER_DATA_PULSE_TIMEOUT); - let cleaning_timeout_entry = Timeout::new(&self.cleaning_timeout as *const _) - .build() - .user_data(USER_DATA_CLEANING_TIMEOUT); - - let mut squeue_buf = vec![ - recv_entry.clone(), - pulse_timeout_entry.clone(), - cleaning_timeout_entry.clone(), - ]; - loop { - for sqe in squeue_buf.drain(..) { + for sqe in self.resubmittable_squeue_buf.drain(..) { unsafe { ring.submission().push(&sqe).unwrap() }; } @@ -213,42 +225,25 @@ impl SocketWorker { }; // Enqueue swarm worker responses - 'outer: for _ in 0..sq_space { - let (response, addr) = loop { - match self.response_receiver.try_recv() { - Ok((ConnectedResponse::AnnounceIpv4(response), addr)) => { - break (Response::AnnounceIpv4(response), addr); - } - Ok((ConnectedResponse::AnnounceIpv6(response), addr)) => { - break (Response::AnnounceIpv6(response), addr); - } - Ok((ConnectedResponse::Scrape(response), addr)) => { - if let Some(response) = - self.pending_scrape_responses.add_and_get_finished(response) - { - break (Response::Scrape(response), addr); - } - } - Err(_) => { - break 'outer; - } - } - }; + for _ in 0..sq_space { + if let Some((response, addr)) = self.get_next_swarm_response() { + match self.send_buffers.prepare_entry(&response, addr) { + Ok(entry) => { + unsafe { ring.submission().push(&entry).unwrap() }; - match self.send_buffers.prepare_entry(&response, addr) { - Ok(entry) => { - unsafe { ring.submission().push(&entry).unwrap() }; + num_send_added += 1; + } + Err(send_buffers::Error::NoBuffers) => { + self.local_responses.push_back((response, addr)); - num_send_added += 1; - } - Err(send_buffers::Error::NoBuffers) => { - self.local_responses.push_back((response, addr)); - - break; - } - Err(send_buffers::Error::SerializationFailed(err)) => { - ::log::error!("Failed serializing response: {:#}", err); + break; + } + Err(send_buffers::Error::SerializationFailed(err)) => { + ::log::error!("Failed serializing response: {:#}", err); + } } + } else { + break; } } @@ -260,83 +255,115 @@ impl SocketWorker { .unwrap(); for cqe in ring.completion() { - match cqe.user_data() { - USER_DATA_RECV => { - self.handle_recv_cqe(pending_scrape_valid_until, &cqe); - - if !io_uring::cqueue::more(cqe.flags()) { - squeue_buf.push(recv_entry.clone()); - } - } - USER_DATA_PULSE_TIMEOUT => { - pending_scrape_valid_until = ValidUntil::new( - self.server_start_instant, - self.config.cleaning.max_pending_scrape_age, - ); - - ::log::info!( - "pending responses: {} local, {} swarm", - self.local_responses.len(), - self.response_receiver.len() - ); - - squeue_buf.push(pulse_timeout_entry.clone()); - } - USER_DATA_CLEANING_TIMEOUT => { - self.pending_scrape_responses - .clean(self.server_start_instant.seconds_elapsed()); - - squeue_buf.push(cleaning_timeout_entry.clone()); - } - send_buffer_index => { - let result = cqe.result(); - - if result < 0 { - ::log::error!( - "Couldn't send response: {:#}", - ::std::io::Error::from_raw_os_error(-result) - ); - } else if self.config.statistics.active() { - let send_buffer_index = send_buffer_index as usize; - - let (response_type, receiver_is_ipv4) = - self.send_buffers.response_type_and_ipv4(send_buffer_index); - - let (statistics, extra_bytes) = if receiver_is_ipv4 { - (&self.shared_state.statistics_ipv4, EXTRA_PACKET_SIZE_IPV4) - } else { - (&self.shared_state.statistics_ipv6, EXTRA_PACKET_SIZE_IPV6) - }; - - statistics - .bytes_sent - .fetch_add(result as usize + extra_bytes, Ordering::Relaxed); - - let response_counter = match response_type { - ResponseType::Connect => &statistics.responses_sent_connect, - ResponseType::Announce => &statistics.responses_sent_announce, - ResponseType::Scrape => &statistics.responses_sent_scrape, - ResponseType::Error => &statistics.responses_sent_error, - }; - - response_counter.fetch_add(1, Ordering::Relaxed); - } - - // Safety: OK because cqe using buffer has been - // returned and contents will no longer be accessed - // by kernel - unsafe { - self.send_buffers - .mark_buffer_as_free(send_buffer_index as usize); - } - } - } + self.handle_cqe(&mut pending_scrape_valid_until, cqe); } self.send_buffers.reset_index(); } } + fn get_next_swarm_response(&mut self) -> Option<(Response, CanonicalSocketAddr)> { + loop { + match self.response_receiver.try_recv() { + Ok((ConnectedResponse::AnnounceIpv4(response), addr)) => { + return Some((Response::AnnounceIpv4(response), addr)); + } + Ok((ConnectedResponse::AnnounceIpv6(response), addr)) => { + return Some((Response::AnnounceIpv6(response), addr)); + } + Ok((ConnectedResponse::Scrape(response), addr)) => { + if let Some(response) = + self.pending_scrape_responses.add_and_get_finished(response) + { + return Some((Response::Scrape(response), addr)); + } + } + Err(_) => { + return None; + } + } + } + } + + fn handle_cqe( + &mut self, + pending_scrape_valid_until: &mut ValidUntil, + cqe: io_uring::cqueue::Entry, + ) { + match cqe.user_data() { + USER_DATA_RECV => { + self.handle_recv_cqe(*pending_scrape_valid_until, &cqe); + + if !io_uring::cqueue::more(cqe.flags()) { + self.resubmittable_squeue_buf.push(self.recv_entry.clone()); + } + } + USER_DATA_PULSE_TIMEOUT => { + *pending_scrape_valid_until = ValidUntil::new( + self.server_start_instant, + self.config.cleaning.max_pending_scrape_age, + ); + + ::log::info!( + "pending responses: {} local, {} swarm", + self.local_responses.len(), + self.response_receiver.len() + ); + + self.resubmittable_squeue_buf + .push(self.pulse_timeout_entry.clone()); + } + USER_DATA_CLEANING_TIMEOUT => { + self.pending_scrape_responses + .clean(self.server_start_instant.seconds_elapsed()); + + self.resubmittable_squeue_buf + .push(self.cleaning_timeout_entry.clone()); + } + send_buffer_index => { + let result = cqe.result(); + + if result < 0 { + ::log::error!( + "Couldn't send response: {:#}", + ::std::io::Error::from_raw_os_error(-result) + ); + } else if self.config.statistics.active() { + let send_buffer_index = send_buffer_index as usize; + + let (response_type, receiver_is_ipv4) = + self.send_buffers.response_type_and_ipv4(send_buffer_index); + + let (statistics, extra_bytes) = if receiver_is_ipv4 { + (&self.shared_state.statistics_ipv4, EXTRA_PACKET_SIZE_IPV4) + } else { + (&self.shared_state.statistics_ipv6, EXTRA_PACKET_SIZE_IPV6) + }; + + statistics + .bytes_sent + .fetch_add(result as usize + extra_bytes, Ordering::Relaxed); + + let response_counter = match response_type { + ResponseType::Connect => &statistics.responses_sent_connect, + ResponseType::Announce => &statistics.responses_sent_announce, + ResponseType::Scrape => &statistics.responses_sent_scrape, + ResponseType::Error => &statistics.responses_sent_error, + }; + + response_counter.fetch_add(1, Ordering::Relaxed); + } + + // Safety: OK because cqe using buffer has been returned and + // contents will no longer be accessed by kernel + unsafe { + self.send_buffers + .mark_buffer_as_free(send_buffer_index as usize); + } + } + } + } + fn handle_recv_cqe( &mut self, pending_scrape_valid_until: ValidUntil, From d61bc345215333172e6dda2d1f53d6517c1d24bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 10 Mar 2023 00:29:22 +0100 Subject: [PATCH 2/2] udp: uring: minor refactoring --- aquatic_udp/src/workers/socket/uring/mod.rs | 102 +++++++++--------- .../src/workers/socket/uring/send_buffers.rs | 2 +- 2 files changed, 49 insertions(+), 55 deletions(-) diff --git a/aquatic_udp/src/workers/socket/uring/mod.rs b/aquatic_udp/src/workers/socket/uring/mod.rs index edc8441..e457b2e 100644 --- a/aquatic_udp/src/workers/socket/uring/mod.rs +++ b/aquatic_udp/src/workers/socket/uring/mod.rs @@ -78,10 +78,10 @@ pub struct SocketWorker { send_buffers: SendBuffers, recv_helper: RecvHelper, local_responses: VecDeque<(Response, CanonicalSocketAddr)>, - resubmittable_squeue_buf: Vec, - recv_entry: io_uring::squeue::Entry, - pulse_timeout_entry: io_uring::squeue::Entry, - cleaning_timeout_entry: io_uring::squeue::Entry, + resubmittable_sqe_buf: Vec, + recv_sqe: io_uring::squeue::Entry, + pulse_timeout_sqe: io_uring::squeue::Entry, + cleaning_timeout_sqe: io_uring::squeue::Entry, } impl SocketWorker { @@ -124,12 +124,12 @@ impl SocketWorker { .build() .unwrap(); - let recv_entry = recv_helper.create_entry(buf_ring.bgid().try_into().unwrap()); + let recv_sqe = recv_helper.create_entry(buf_ring.bgid().try_into().unwrap()); // This timeout enables regular updates of pending_scrape_valid_until // and wakes the main loop to send any pending responses in the case // of no incoming requests - let pulse_timeout_entry = { + let pulse_timeout_sqe = { let timespec_ptr = Box::into_raw(Box::new(Timespec::new().sec(1))) as *const _; Timeout::new(timespec_ptr) @@ -137,7 +137,7 @@ impl SocketWorker { .user_data(USER_DATA_PULSE_TIMEOUT) }; - let cleaning_timeout_entry = { + let cleaning_timeout_sqe = { let timespec_ptr = Box::into_raw(Box::new( Timespec::new().sec(config.cleaning.pending_scrape_cleaning_interval), )) as *const _; @@ -147,10 +147,10 @@ impl SocketWorker { .user_data(USER_DATA_CLEANING_TIMEOUT) }; - let resubmittable_squeue_buf = vec![ - recv_entry.clone(), - pulse_timeout_entry.clone(), - cleaning_timeout_entry.clone(), + let resubmittable_sqe_buf = vec![ + recv_sqe.clone(), + pulse_timeout_sqe.clone(), + cleaning_timeout_sqe.clone(), ]; let mut worker = Self { @@ -166,10 +166,10 @@ impl SocketWorker { recv_helper, local_responses: Default::default(), buf_ring, - recv_entry, - pulse_timeout_entry, - cleaning_timeout_entry, - resubmittable_squeue_buf, + recv_sqe, + pulse_timeout_sqe, + cleaning_timeout_sqe, + resubmittable_sqe_buf, socket, }; @@ -183,18 +183,18 @@ impl SocketWorker { ); loop { - for sqe in self.resubmittable_squeue_buf.drain(..) { + for sqe in self.resubmittable_sqe_buf.drain(..) { unsafe { ring.submission().push(&sqe).unwrap() }; } - let mut num_send_added = 0; - let sq_space = { let sq = ring.submission(); sq.capacity() - sq.len() }; + let mut num_send_added = 0; + // Enqueue local responses for _ in 0..sq_space { if let Some((response, addr)) = self.local_responses.pop_front() { @@ -218,14 +218,8 @@ impl SocketWorker { } } - let sq_space = { - let sq = ring.submission(); - - sq.capacity() - sq.len() - }; - // Enqueue swarm worker responses - for _ in 0..sq_space { + for _ in 0..(sq_space - num_send_added) { if let Some((response, addr)) = self.get_next_swarm_response() { match self.send_buffers.prepare_entry(&response, addr) { Ok(entry) => { @@ -258,30 +252,7 @@ impl SocketWorker { self.handle_cqe(&mut pending_scrape_valid_until, cqe); } - self.send_buffers.reset_index(); - } - } - - fn get_next_swarm_response(&mut self) -> Option<(Response, CanonicalSocketAddr)> { - loop { - match self.response_receiver.try_recv() { - Ok((ConnectedResponse::AnnounceIpv4(response), addr)) => { - return Some((Response::AnnounceIpv4(response), addr)); - } - Ok((ConnectedResponse::AnnounceIpv6(response), addr)) => { - return Some((Response::AnnounceIpv6(response), addr)); - } - Ok((ConnectedResponse::Scrape(response), addr)) => { - if let Some(response) = - self.pending_scrape_responses.add_and_get_finished(response) - { - return Some((Response::Scrape(response), addr)); - } - } - Err(_) => { - return None; - } - } + self.send_buffers.reset_likely_next_free_index(); } } @@ -295,7 +266,7 @@ impl SocketWorker { self.handle_recv_cqe(*pending_scrape_valid_until, &cqe); if !io_uring::cqueue::more(cqe.flags()) { - self.resubmittable_squeue_buf.push(self.recv_entry.clone()); + self.resubmittable_sqe_buf.push(self.recv_sqe.clone()); } } USER_DATA_PULSE_TIMEOUT => { @@ -310,15 +281,15 @@ impl SocketWorker { self.response_receiver.len() ); - self.resubmittable_squeue_buf - .push(self.pulse_timeout_entry.clone()); + self.resubmittable_sqe_buf + .push(self.pulse_timeout_sqe.clone()); } USER_DATA_CLEANING_TIMEOUT => { self.pending_scrape_responses .clean(self.server_start_instant.seconds_elapsed()); - self.resubmittable_squeue_buf - .push(self.cleaning_timeout_entry.clone()); + self.resubmittable_sqe_buf + .push(self.cleaning_timeout_sqe.clone()); } send_buffer_index => { let result = cqe.result(); @@ -528,6 +499,29 @@ impl SocketWorker { } } } + + fn get_next_swarm_response(&mut self) -> Option<(Response, CanonicalSocketAddr)> { + loop { + match self.response_receiver.try_recv() { + Ok((ConnectedResponse::AnnounceIpv4(response), addr)) => { + return Some((Response::AnnounceIpv4(response), addr)); + } + Ok((ConnectedResponse::AnnounceIpv6(response), addr)) => { + return Some((Response::AnnounceIpv6(response), addr)); + } + Ok((ConnectedResponse::Scrape(response), addr)) => { + if let Some(response) = + self.pending_scrape_responses.add_and_get_finished(response) + { + return Some((Response::Scrape(response), addr)); + } + } + Err(_) => { + return None; + } + } + } + } } pub fn supported_on_current_kernel() -> anyhow::Result<()> { diff --git a/aquatic_udp/src/workers/socket/uring/send_buffers.rs b/aquatic_udp/src/workers/socket/uring/send_buffers.rs index 3ca69a8..3125416 100644 --- a/aquatic_udp/src/workers/socket/uring/send_buffers.rs +++ b/aquatic_udp/src/workers/socket/uring/send_buffers.rs @@ -204,7 +204,7 @@ impl SendBuffers { } /// Call after going through completion queue - pub fn reset_index(&mut self) { + pub fn reset_likely_next_free_index(&mut self) { self.likely_next_free_index = 0; }