udp: use shared swarm state in io uring implementation

This commit is contained in:
Joakim Frostegård 2024-02-10 11:35:52 +01:00
parent 2da966098f
commit a2e1dd4eef
2 changed files with 27 additions and 82 deletions

View file

@ -48,7 +48,6 @@ pub fn run_socket_worker(
validator: ConnectionValidator,
priv_dropper: PrivilegeDropper,
) -> anyhow::Result<()> {
/*
#[cfg(all(target_os = "linux", feature = "io-uring"))]
if config.network.use_io_uring {
self::uring::supported_on_current_kernel().context("check for io_uring compatibility")?;
@ -57,13 +56,11 @@ pub fn run_socket_worker(
config,
shared_state,
statistics,
statistics_sender,
validator,
request_sender,
response_receiver,
priv_dropper,
);
}
*/
self::mio::SocketWorker::run(
config,

View file

@ -11,6 +11,7 @@ use std::sync::atomic::Ordering;
use anyhow::Context;
use aquatic_common::access_list::AccessListCache;
use crossbeam_channel::Sender;
use io_uring::opcode::Timeout;
use io_uring::types::{Fixed, Timespec};
use io_uring::{IoUring, Probe};
@ -20,6 +21,8 @@ use aquatic_common::{
ValidUntil,
};
use aquatic_udp_protocol::*;
use rand::rngs::SmallRng;
use rand::SeedableRng;
use crate::common::*;
use crate::config::Config;
@ -28,7 +31,6 @@ use self::buf_ring::BufRing;
use self::recv_helper::RecvHelper;
use self::send_buffers::{ResponseType, SendBuffers};
use super::storage::PendingScrapeResponseSlab;
use super::validator::ConnectionValidator;
use super::{create_socket, EXTRA_PACKET_SIZE_IPV4, EXTRA_PACKET_SIZE_IPV6};
@ -76,13 +78,11 @@ pub struct SocketWorker {
config: Config,
shared_state: State,
statistics: CachePaddedArc<IpVersionStatistics<SocketWorkerStatistics>>,
request_sender: ConnectedRequestSender,
response_receiver: ConnectedResponseReceiver,
statistics_sender: Sender<StatisticsMessage>,
access_list_cache: AccessListCache,
validator: ConnectionValidator,
#[allow(dead_code)]
socket: UdpSocket,
pending_scrape_responses: PendingScrapeResponseSlab,
buf_ring: BufRing,
send_buffers: SendBuffers,
recv_helper: RecvHelper,
@ -91,7 +91,8 @@ pub struct SocketWorker {
recv_sqe: io_uring::squeue::Entry,
pulse_timeout_sqe: io_uring::squeue::Entry,
cleaning_timeout_sqe: io_uring::squeue::Entry,
pending_scrape_valid_until: ValidUntil,
peer_valid_until: ValidUntil,
rng: SmallRng,
}
impl SocketWorker {
@ -99,9 +100,8 @@ impl SocketWorker {
config: Config,
shared_state: State,
statistics: CachePaddedArc<IpVersionStatistics<SocketWorkerStatistics>>,
statistics_sender: Sender<StatisticsMessage>,
validator: ConnectionValidator,
request_sender: ConnectedRequestSender,
response_receiver: ConnectedResponseReceiver,
priv_dropper: PrivilegeDropper,
) -> anyhow::Result<()> {
let ring_entries = config.network.ring_size.next_power_of_two();
@ -163,20 +163,18 @@ impl SocketWorker {
cleaning_timeout_sqe.clone(),
];
let pending_scrape_valid_until = ValidUntil::new(
let peer_valid_until = ValidUntil::new(
shared_state.server_start_instant,
config.cleaning.max_pending_scrape_age,
config.cleaning.max_peer_age,
);
let mut worker = Self {
config,
shared_state,
statistics,
statistics_sender,
validator,
request_sender,
response_receiver,
access_list_cache,
pending_scrape_responses: Default::default(),
send_buffers,
recv_helper,
local_responses: Default::default(),
@ -186,7 +184,8 @@ impl SocketWorker {
cleaning_timeout_sqe,
resubmittable_sqe_buf,
socket,
pending_scrape_valid_until,
peer_valid_until,
rng: SmallRng::from_entropy(),
};
CurrentRing::with(|ring| worker.run_inner(ring));
@ -231,43 +230,6 @@ impl SocketWorker {
}
}
// Enqueue swarm worker responses
for _ in 0..(sq_space - num_send_added) {
let (addr, response) = if let Ok(r) = self.response_receiver.try_recv() {
r
} else {
break;
};
let response = match response {
ConnectedResponse::AnnounceIpv4(r) => Response::AnnounceIpv4(r),
ConnectedResponse::AnnounceIpv6(r) => Response::AnnounceIpv6(r),
ConnectedResponse::Scrape(r) => {
if let Some(r) = self.pending_scrape_responses.add_and_get_finished(&r) {
Response::Scrape(r)
} else {
continue;
}
}
};
match self.send_buffers.prepare_entry(response, addr) {
Ok(entry) => {
unsafe { ring.submission().push(&entry).unwrap() };
num_send_added += 1;
}
Err(send_buffers::Error::NoBuffers(response)) => {
self.local_responses.push_back((response, addr));
break;
}
Err(send_buffers::Error::SerializationFailed(err)) => {
::log::error!("Failed serializing response: {:#}", err);
}
}
}
// Wait for all sendmsg entries to complete. If none were added,
// wait for at least one recvmsg or timeout in order to avoid
// busy-polling if there is no incoming data.
@ -293,18 +255,15 @@ impl SocketWorker {
}
}
USER_DATA_PULSE_TIMEOUT => {
self.pending_scrape_valid_until = ValidUntil::new(
self.peer_valid_until = ValidUntil::new(
self.shared_state.server_start_instant,
self.config.cleaning.max_pending_scrape_age,
self.config.cleaning.max_peer_age,
);
self.resubmittable_sqe_buf
.push(self.pulse_timeout_sqe.clone());
}
USER_DATA_CLEANING_TIMEOUT => {
self.pending_scrape_responses
.clean(self.shared_state.server_start_instant.seconds_elapsed());
self.resubmittable_sqe_buf
.push(self.cleaning_timeout_sqe.clone());
}
@ -470,16 +429,16 @@ impl SocketWorker {
.load()
.allows(access_list_mode, &request.info_hash.0)
{
let worker_index =
SwarmWorkerIndex::from_info_hash(&self.config, request.info_hash);
let response = self.shared_state.torrent_maps.announce(
&self.config,
&self.statistics_sender,
&mut self.rng,
&request,
src,
self.peer_valid_until,
);
if self
.request_sender
.try_send_to(worker_index, ConnectedRequest::Announce(request), src)
.is_err()
{
::log::warn!("request sender full, dropping request");
}
self.local_responses.push_back((response, src));
} else {
let response = Response::Error(ErrorResponse {
transaction_id: request.transaction_id,
@ -495,21 +454,10 @@ impl SocketWorker {
.validator
.connection_id_valid(src, request.connection_id)
{
let split_requests = self.pending_scrape_responses.prepare_split_requests(
&self.config,
request,
self.pending_scrape_valid_until,
);
let response =
Response::Scrape(self.shared_state.torrent_maps.scrape(request, src));
for (swarm_worker_index, request) in split_requests {
if self
.request_sender
.try_send_to(swarm_worker_index, ConnectedRequest::Scrape(request), src)
.is_err()
{
::log::warn!("request sender full, dropping request");
}
}
self.local_responses.push_back((response, src));
}
}
}