From a2e1dd4eef41e3701ff29c58452e969117555c26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 10 Feb 2024 11:35:52 +0100 Subject: [PATCH] udp: use shared swarm state in io uring implementation --- crates/udp/src/workers/socket/mod.rs | 5 +- crates/udp/src/workers/socket/uring/mod.rs | 104 ++++++--------------- 2 files changed, 27 insertions(+), 82 deletions(-) diff --git a/crates/udp/src/workers/socket/mod.rs b/crates/udp/src/workers/socket/mod.rs index 67e6f7f..2d71fba 100644 --- a/crates/udp/src/workers/socket/mod.rs +++ b/crates/udp/src/workers/socket/mod.rs @@ -48,7 +48,6 @@ pub fn run_socket_worker( validator: ConnectionValidator, priv_dropper: PrivilegeDropper, ) -> anyhow::Result<()> { - /* #[cfg(all(target_os = "linux", feature = "io-uring"))] if config.network.use_io_uring { self::uring::supported_on_current_kernel().context("check for io_uring compatibility")?; @@ -57,13 +56,11 @@ pub fn run_socket_worker( config, shared_state, statistics, + statistics_sender, validator, - request_sender, - response_receiver, priv_dropper, ); } - */ self::mio::SocketWorker::run( config, diff --git a/crates/udp/src/workers/socket/uring/mod.rs b/crates/udp/src/workers/socket/uring/mod.rs index fe8b490..7ad26c7 100644 --- a/crates/udp/src/workers/socket/uring/mod.rs +++ b/crates/udp/src/workers/socket/uring/mod.rs @@ -11,6 +11,7 @@ use std::sync::atomic::Ordering; use anyhow::Context; use aquatic_common::access_list::AccessListCache; +use crossbeam_channel::Sender; use io_uring::opcode::Timeout; use io_uring::types::{Fixed, Timespec}; use io_uring::{IoUring, Probe}; @@ -20,6 +21,8 @@ use aquatic_common::{ ValidUntil, }; use aquatic_udp_protocol::*; +use rand::rngs::SmallRng; +use rand::SeedableRng; use crate::common::*; use crate::config::Config; @@ -28,7 +31,6 @@ use self::buf_ring::BufRing; use self::recv_helper::RecvHelper; use self::send_buffers::{ResponseType, SendBuffers}; -use super::storage::PendingScrapeResponseSlab; use super::validator::ConnectionValidator; use super::{create_socket, EXTRA_PACKET_SIZE_IPV4, EXTRA_PACKET_SIZE_IPV6}; @@ -76,13 +78,11 @@ pub struct SocketWorker { config: Config, shared_state: State, statistics: CachePaddedArc>, - request_sender: ConnectedRequestSender, - response_receiver: ConnectedResponseReceiver, + statistics_sender: Sender, access_list_cache: AccessListCache, validator: ConnectionValidator, #[allow(dead_code)] socket: UdpSocket, - pending_scrape_responses: PendingScrapeResponseSlab, buf_ring: BufRing, send_buffers: SendBuffers, recv_helper: RecvHelper, @@ -91,7 +91,8 @@ pub struct SocketWorker { recv_sqe: io_uring::squeue::Entry, pulse_timeout_sqe: io_uring::squeue::Entry, cleaning_timeout_sqe: io_uring::squeue::Entry, - pending_scrape_valid_until: ValidUntil, + peer_valid_until: ValidUntil, + rng: SmallRng, } impl SocketWorker { @@ -99,9 +100,8 @@ impl SocketWorker { config: Config, shared_state: State, statistics: CachePaddedArc>, + statistics_sender: Sender, validator: ConnectionValidator, - request_sender: ConnectedRequestSender, - response_receiver: ConnectedResponseReceiver, priv_dropper: PrivilegeDropper, ) -> anyhow::Result<()> { let ring_entries = config.network.ring_size.next_power_of_two(); @@ -163,20 +163,18 @@ impl SocketWorker { cleaning_timeout_sqe.clone(), ]; - let pending_scrape_valid_until = ValidUntil::new( + let peer_valid_until = ValidUntil::new( shared_state.server_start_instant, - config.cleaning.max_pending_scrape_age, + config.cleaning.max_peer_age, ); let mut worker = Self { config, shared_state, statistics, + statistics_sender, validator, - request_sender, - response_receiver, access_list_cache, - pending_scrape_responses: Default::default(), send_buffers, recv_helper, local_responses: Default::default(), @@ -186,7 +184,8 @@ impl SocketWorker { cleaning_timeout_sqe, resubmittable_sqe_buf, socket, - pending_scrape_valid_until, + peer_valid_until, + rng: SmallRng::from_entropy(), }; CurrentRing::with(|ring| worker.run_inner(ring)); @@ -231,43 +230,6 @@ impl SocketWorker { } } - // Enqueue swarm worker responses - for _ in 0..(sq_space - num_send_added) { - let (addr, response) = if let Ok(r) = self.response_receiver.try_recv() { - r - } else { - break; - }; - - let response = match response { - ConnectedResponse::AnnounceIpv4(r) => Response::AnnounceIpv4(r), - ConnectedResponse::AnnounceIpv6(r) => Response::AnnounceIpv6(r), - ConnectedResponse::Scrape(r) => { - if let Some(r) = self.pending_scrape_responses.add_and_get_finished(&r) { - Response::Scrape(r) - } else { - continue; - } - } - }; - - match self.send_buffers.prepare_entry(response, addr) { - Ok(entry) => { - unsafe { ring.submission().push(&entry).unwrap() }; - - num_send_added += 1; - } - Err(send_buffers::Error::NoBuffers(response)) => { - self.local_responses.push_back((response, addr)); - - break; - } - Err(send_buffers::Error::SerializationFailed(err)) => { - ::log::error!("Failed serializing response: {:#}", err); - } - } - } - // Wait for all sendmsg entries to complete. If none were added, // wait for at least one recvmsg or timeout in order to avoid // busy-polling if there is no incoming data. @@ -293,18 +255,15 @@ impl SocketWorker { } } USER_DATA_PULSE_TIMEOUT => { - self.pending_scrape_valid_until = ValidUntil::new( + self.peer_valid_until = ValidUntil::new( self.shared_state.server_start_instant, - self.config.cleaning.max_pending_scrape_age, + self.config.cleaning.max_peer_age, ); self.resubmittable_sqe_buf .push(self.pulse_timeout_sqe.clone()); } USER_DATA_CLEANING_TIMEOUT => { - self.pending_scrape_responses - .clean(self.shared_state.server_start_instant.seconds_elapsed()); - self.resubmittable_sqe_buf .push(self.cleaning_timeout_sqe.clone()); } @@ -470,16 +429,16 @@ impl SocketWorker { .load() .allows(access_list_mode, &request.info_hash.0) { - let worker_index = - SwarmWorkerIndex::from_info_hash(&self.config, request.info_hash); + let response = self.shared_state.torrent_maps.announce( + &self.config, + &self.statistics_sender, + &mut self.rng, + &request, + src, + self.peer_valid_until, + ); - if self - .request_sender - .try_send_to(worker_index, ConnectedRequest::Announce(request), src) - .is_err() - { - ::log::warn!("request sender full, dropping request"); - } + self.local_responses.push_back((response, src)); } else { let response = Response::Error(ErrorResponse { transaction_id: request.transaction_id, @@ -495,21 +454,10 @@ impl SocketWorker { .validator .connection_id_valid(src, request.connection_id) { - let split_requests = self.pending_scrape_responses.prepare_split_requests( - &self.config, - request, - self.pending_scrape_valid_until, - ); + let response = + Response::Scrape(self.shared_state.torrent_maps.scrape(request, src)); - for (swarm_worker_index, request) in split_requests { - if self - .request_sender - .try_send_to(swarm_worker_index, ConnectedRequest::Scrape(request), src) - .is_err() - { - ::log::warn!("request sender full, dropping request"); - } - } + self.local_responses.push_back((response, src)); } } }