udp: remove thingbuf in favor of crossbeam channel

thingbuf didn't have obvious performance advantages and is a lot less
mature. Furthermore, it doesn't support anything like crossbeam
Receiver::try_iter, which is prefereable now that announce responses
can be sent to any socket worker.
This commit is contained in:
Joakim Frostegård 2024-01-20 09:41:07 +01:00
parent e77c9f46e7
commit 1a6b4345d4
10 changed files with 163 additions and 329 deletions

View file

@ -1,4 +1,3 @@
use std::borrow::Cow;
use std::io::{Cursor, ErrorKind};
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};
@ -42,7 +41,7 @@ pub struct SocketWorker {
server_start_instant: ServerStartInstant,
pending_scrape_responses: PendingScrapeResponseSlab,
socket: UdpSocket,
opt_resend_buffer: Option<Vec<(Response, CanonicalSocketAddr)>>,
opt_resend_buffer: Option<Vec<(CanonicalSocketAddr, Response)>>,
buffer: [u8; BUFFER_SIZE],
polling_mode: PollMode,
/// Storage for requests that couldn't be sent to swarm worker because channel was full
@ -133,14 +132,14 @@ impl SocketWorker {
// If resend buffer is enabled, send any responses in it
if let Some(resend_buffer) = self.opt_resend_buffer.as_mut() {
for (response, addr) in resend_buffer.drain(..) {
for (addr, response) in resend_buffer.drain(..) {
Self::send_response(
&self.config,
&self.shared_state,
&mut self.socket,
&mut self.buffer,
&mut None,
response.into(),
response,
addr,
);
}
@ -235,7 +234,7 @@ impl SocketWorker {
&mut self.socket,
&mut self.buffer,
&mut self.opt_resend_buffer,
CowResponse::Error(Cow::Owned(response)),
Response::Error(response),
src,
);
}
@ -310,7 +309,7 @@ impl SocketWorker {
&mut self.socket,
&mut self.buffer,
&mut self.opt_resend_buffer,
CowResponse::Connect(Cow::Owned(response)),
Response::Connect(response),
src,
);
@ -346,7 +345,7 @@ impl SocketWorker {
&mut self.socket,
&mut self.buffer,
&mut self.opt_resend_buffer,
CowResponse::Error(Cow::Owned(response)),
Response::Error(response),
src,
);
@ -392,30 +391,20 @@ impl SocketWorker {
}
fn handle_swarm_worker_responses(&mut self) {
loop {
let recv_ref = if let Ok(recv_ref) = self.response_receiver.try_recv_ref() {
recv_ref
} else {
break;
};
let response = match recv_ref.kind {
ConnectedResponseKind::Scrape => {
for (addr, response) in self.response_receiver.try_iter() {
let response = match response {
ConnectedResponse::Scrape(response) => {
if let Some(r) = self
.pending_scrape_responses
.add_and_get_finished(&recv_ref.scrape)
.add_and_get_finished(&response)
{
CowResponse::Scrape(Cow::Owned(r))
Response::Scrape(r)
} else {
continue;
}
}
ConnectedResponseKind::AnnounceIpv4 => {
CowResponse::AnnounceIpv4(Cow::Borrowed(&recv_ref.announce_ipv4))
}
ConnectedResponseKind::AnnounceIpv6 => {
CowResponse::AnnounceIpv6(Cow::Borrowed(&recv_ref.announce_ipv6))
}
ConnectedResponse::AnnounceIpv4(r) => Response::AnnounceIpv4(r),
ConnectedResponse::AnnounceIpv6(r) => Response::AnnounceIpv6(r),
};
Self::send_response(
@ -425,7 +414,7 @@ impl SocketWorker {
&mut self.buffer,
&mut self.opt_resend_buffer,
response,
recv_ref.addr,
addr,
);
}
}
@ -435,8 +424,8 @@ impl SocketWorker {
shared_state: &State,
socket: &mut UdpSocket,
buffer: &mut [u8],
opt_resend_buffer: &mut Option<Vec<(Response, CanonicalSocketAddr)>>,
response: CowResponse,
opt_resend_buffer: &mut Option<Vec<(CanonicalSocketAddr, Response)>>,
response: Response,
canonical_addr: CanonicalSocketAddr,
) {
let mut buffer = Cursor::new(&mut buffer[..]);
@ -478,18 +467,18 @@ impl SocketWorker {
};
match response {
CowResponse::Connect(_) => {
Response::Connect(_) => {
stats.responses_sent_connect.fetch_add(1, Ordering::Relaxed);
}
CowResponse::AnnounceIpv4(_) | CowResponse::AnnounceIpv6(_) => {
Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => {
stats
.responses_sent_announce
.fetch_add(1, Ordering::Relaxed);
}
CowResponse::Scrape(_) => {
Response::Scrape(_) => {
stats.responses_sent_scrape.fetch_add(1, Ordering::Relaxed);
}
CowResponse::Error(_) => {
Response::Error(_) => {
stats.responses_sent_error.fetch_add(1, Ordering::Relaxed);
}
}
@ -503,7 +492,7 @@ impl SocketWorker {
if resend_buffer.len() < config.network.resend_buffer_max_len {
::log::debug!("Adding response to resend queue, since sending it to {} failed with: {:#}", addr, err);
resend_buffer.push((response.into_owned(), canonical_addr));
resend_buffer.push((canonical_addr, response));
} else {
::log::warn!("Response resend buffer full, dropping response");
}