udp: reuse response channel mem, add backpressure, faster peer extract

This commit is contained in:
Joakim Frostegård 2023-12-10 12:07:38 +01:00
parent 0e12dd1b13
commit 0c4140165b
15 changed files with 666 additions and 522 deletions

View file

@ -2,6 +2,7 @@ mod buf_ring;
mod recv_helper;
mod send_buffers;
use std::borrow::Cow;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::net::UdpSocket;
@ -12,7 +13,6 @@ use std::sync::atomic::Ordering;
use anyhow::Context;
use aquatic_common::access_list::AccessListCache;
use aquatic_common::ServerStartInstant;
use crossbeam_channel::Receiver;
use io_uring::opcode::Timeout;
use io_uring::types::{Fixed, Timespec};
use io_uring::{IoUring, Probe};
@ -78,7 +78,7 @@ pub struct SocketWorker {
config: Config,
shared_state: State,
request_sender: ConnectedRequestSender,
response_receiver: Receiver<(ConnectedResponse, CanonicalSocketAddr)>,
response_receiver: ConnectedResponseReceiver,
access_list_cache: AccessListCache,
validator: ConnectionValidator,
server_start_instant: ServerStartInstant,
@ -104,7 +104,7 @@ impl SocketWorker {
validator: ConnectionValidator,
server_start_instant: ServerStartInstant,
request_sender: ConnectedRequestSender,
response_receiver: Receiver<(ConnectedResponse, CanonicalSocketAddr)>,
response_receiver: ConnectedResponseReceiver,
priv_dropper: PrivilegeDropper,
) {
let ring_entries = config.network.ring_size.next_power_of_two();
@ -210,14 +210,15 @@ impl SocketWorker {
// Enqueue local responses
for _ in 0..sq_space {
if let Some((response, addr)) = self.local_responses.pop_front() {
match self.send_buffers.prepare_entry(&response, addr) {
match self.send_buffers.prepare_entry(response.into(), addr) {
Ok(entry) => {
unsafe { ring.submission().push(&entry).unwrap() };
num_send_added += 1;
}
Err(send_buffers::Error::NoBuffers) => {
self.local_responses.push_front((response, addr));
Err(send_buffers::Error::NoBuffers(response)) => {
self.local_responses
.push_front((response.into_owned(), addr));
break;
}
@ -232,24 +233,46 @@ impl SocketWorker {
// Enqueue swarm worker responses
for _ in 0..(sq_space - num_send_added) {
if let Some((response, addr)) = self.get_next_swarm_response() {
match self.send_buffers.prepare_entry(&response, addr) {
Ok(entry) => {
unsafe { ring.submission().push(&entry).unwrap() };
num_send_added += 1;
}
Err(send_buffers::Error::NoBuffers) => {
self.local_responses.push_back((response, addr));
break;
}
Err(send_buffers::Error::SerializationFailed(err)) => {
::log::error!("Failed serializing response: {:#}", err);
}
}
let recv_ref = if let Ok(recv_ref) = self.response_receiver.try_recv_ref() {
recv_ref
} else {
break;
};
let response = match recv_ref.kind {
ConnectedResponseKind::AnnounceIpv4 => {
CowResponse::AnnounceIpv4(Cow::Borrowed(&recv_ref.announce_ipv4))
}
ConnectedResponseKind::AnnounceIpv6 => {
CowResponse::AnnounceIpv6(Cow::Borrowed(&recv_ref.announce_ipv6))
}
ConnectedResponseKind::Scrape => {
if let Some(response) = self
.pending_scrape_responses
.add_and_get_finished(&recv_ref.scrape)
{
CowResponse::Scrape(Cow::Owned(response))
} else {
continue;
}
}
};
match self.send_buffers.prepare_entry(response, recv_ref.addr) {
Ok(entry) => {
unsafe { ring.submission().push(&entry).unwrap() };
num_send_added += 1;
}
Err(send_buffers::Error::NoBuffers(response)) => {
self.local_responses
.push_back((response.into_owned(), recv_ref.addr));
break;
}
Err(send_buffers::Error::SerializationFailed(err)) => {
::log::error!("Failed serializing response: {:#}", err);
}
}
}
@ -283,12 +306,6 @@ impl SocketWorker {
self.config.cleaning.max_pending_scrape_age,
);
::log::info!(
"pending responses: {} local, {} swarm",
self.local_responses.len(),
self.response_receiver.len()
);
self.resubmittable_sqe_buf
.push(self.pulse_timeout_sqe.clone());
}
@ -464,11 +481,13 @@ impl SocketWorker {
let worker_index =
SwarmWorkerIndex::from_info_hash(&self.config, request.info_hash);
self.request_sender.try_send_to(
if let Err(_) = self.request_sender.try_send_to(
worker_index,
ConnectedRequest::Announce(request),
src,
);
) {
::log::warn!("request sender full, dropping request");
}
} else {
let response = Response::Error(ErrorResponse {
transaction_id: request.transaction_id,
@ -491,39 +510,18 @@ impl SocketWorker {
);
for (swarm_worker_index, request) in split_requests {
self.request_sender.try_send_to(
if let Err(_) = self.request_sender.try_send_to(
swarm_worker_index,
ConnectedRequest::Scrape(request),
src,
);
) {
::log::warn!("request sender full, dropping request");
}
}
}
}
}
}
fn get_next_swarm_response(&mut self) -> Option<(Response, CanonicalSocketAddr)> {
loop {
match self.response_receiver.try_recv() {
Ok((ConnectedResponse::AnnounceIpv4(response), addr)) => {
return Some((Response::AnnounceIpv4(response), addr));
}
Ok((ConnectedResponse::AnnounceIpv6(response), addr)) => {
return Some((Response::AnnounceIpv6(response), addr));
}
Ok((ConnectedResponse::Scrape(response), addr)) => {
if let Some(response) =
self.pending_scrape_responses.add_and_get_finished(response)
{
return Some((Response::Scrape(response), addr));
}
}
Err(_) => {
return None;
}
}
}
}
}
pub fn supported_on_current_kernel() -> anyhow::Result<()> {