diff --git a/CHANGELOG.md b/CHANGELOG.md index 49183b7..1d06801 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,7 +26,6 @@ * Remove support for unbounded worker channels * Add backpressure in socket workers. They will postpone reading from the socket if sending a request to a swarm worker failed -* Reuse allocations in swarm response channel * Remove config key `network.poll_event_capacity` * Harden ConnectionValidator to make IP spoofing even more costly * Distribute announce responses from swarm workers over socket workers to diff --git a/Cargo.lock b/Cargo.lock index 184c491..e3a0a4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -320,7 +320,6 @@ dependencies = [ "slab", "socket2 0.5.5", "tempfile", - "thingbuf", "time", "tinytemplate", ] @@ -2079,29 +2078,6 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" -[[package]] -name = "parking_lot" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" -dependencies = [ - "lock_api", - "parking_lot_core", -] - -[[package]] -name = "parking_lot_core" -version = "0.9.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" -dependencies = [ - "cfg-if", - "libc", - "redox_syscall", - "smallvec", - "windows-targets 0.48.5", -] - [[package]] name = "percent-encoding" version = "2.3.1" @@ -2849,16 +2825,6 @@ version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" -[[package]] -name = "thingbuf" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4706f1bfb859af03f099ada2de3cea3e515843c2d3e93b7893f16d94a37f9415" -dependencies = [ - "parking_lot", - "pin-project", -] - [[package]] name = "thiserror" version = "1.0.56" diff --git a/crates/udp/Cargo.toml b/crates/udp/Cargo.toml index 599fa39..977ae6a 100644 --- a/crates/udp/Cargo.toml +++ b/crates/udp/Cargo.toml @@ -52,7 +52,6 @@ serde = { version = "1", features = ["derive"] } signal-hook = { version = "0.3" } slab = "0.4" socket2 = { version = "0.5", features = ["all"] } -thingbuf = "0.1" time = { version = "0.3", features = ["formatting"] } tinytemplate = "1" diff --git a/crates/udp/src/common.rs b/crates/udp/src/common.rs index 6504f06..dbfc868 100644 --- a/crates/udp/src/common.rs +++ b/crates/udp/src/common.rs @@ -1,68 +1,19 @@ -use std::borrow::Cow; use std::collections::BTreeMap; use std::hash::Hash; -use std::io::Write; -use std::mem::size_of; -use std::net::{SocketAddr, SocketAddrV4}; use std::sync::atomic::AtomicUsize; use std::sync::Arc; -use crossbeam_channel::{Sender, TrySendError}; +use crossbeam_channel::{Receiver, SendError, Sender, TrySendError}; use aquatic_common::access_list::AccessListArcSwap; use aquatic_common::CanonicalSocketAddr; use aquatic_udp_protocol::*; use hdrhistogram::Histogram; -use thingbuf::mpsc::blocking::SendRef; use crate::config::Config; pub const BUFFER_SIZE: usize = 8192; -#[derive(PartialEq, Eq, Clone, Debug)] -pub enum CowResponse<'a> { - Connect(Cow<'a, ConnectResponse>), - AnnounceIpv4(Cow<'a, AnnounceResponse>), - AnnounceIpv6(Cow<'a, AnnounceResponse>), - Scrape(Cow<'a, ScrapeResponse>), - Error(Cow<'a, ErrorResponse>), -} - -impl From for CowResponse<'_> { - fn from(value: Response) -> Self { - match value { - Response::AnnounceIpv4(r) => Self::AnnounceIpv4(Cow::Owned(r)), - Response::AnnounceIpv6(r) => Self::AnnounceIpv6(Cow::Owned(r)), - Response::Connect(r) => Self::Connect(Cow::Owned(r)), - Response::Scrape(r) => Self::Scrape(Cow::Owned(r)), - Response::Error(r) => Self::Error(Cow::Owned(r)), - } - } -} - -impl<'a> CowResponse<'a> { - pub fn into_owned(self) -> Response { - match self { - CowResponse::Connect(r) => Response::Connect(r.into_owned()), - CowResponse::AnnounceIpv4(r) => Response::AnnounceIpv4(r.into_owned()), - CowResponse::AnnounceIpv6(r) => Response::AnnounceIpv6(r.into_owned()), - CowResponse::Scrape(r) => Response::Scrape(r.into_owned()), - CowResponse::Error(r) => Response::Error(r.into_owned()), - } - } - - #[inline] - pub fn write(&self, bytes: &mut impl Write) -> Result<(), ::std::io::Error> { - match self { - Self::Connect(r) => r.write(bytes), - Self::AnnounceIpv4(r) => r.write(bytes), - Self::AnnounceIpv6(r) => r.write(bytes), - Self::Scrape(r) => r.write(bytes), - Self::Error(r) => r.write(bytes), - } - } -} - #[derive(Debug)] pub struct PendingScrapeRequest { pub slab_key: usize, @@ -88,52 +39,6 @@ pub enum ConnectedResponse { Scrape(PendingScrapeResponse), } -pub enum ConnectedResponseKind { - AnnounceIpv4, - AnnounceIpv6, - Scrape, -} - -pub struct ConnectedResponseWithAddr { - pub kind: ConnectedResponseKind, - pub announce_ipv4: AnnounceResponse, - pub announce_ipv6: AnnounceResponse, - pub scrape: PendingScrapeResponse, - pub addr: CanonicalSocketAddr, -} - -impl ConnectedResponseWithAddr { - pub fn estimated_max_size(config: &Config) -> usize { - size_of::() - + config.protocol.max_response_peers - * (size_of::>() - + size_of::>()) - } -} - -pub struct Recycler; - -impl thingbuf::Recycle for Recycler { - fn new_element(&self) -> ConnectedResponseWithAddr { - ConnectedResponseWithAddr { - kind: ConnectedResponseKind::AnnounceIpv4, - announce_ipv4: AnnounceResponse::empty(), - announce_ipv6: AnnounceResponse::empty(), - scrape: PendingScrapeResponse { - slab_key: 0, - torrent_stats: Default::default(), - }, - addr: CanonicalSocketAddr::new(SocketAddr::V4(SocketAddrV4::new(0.into(), 0))), - } - } - fn recycle(&self, element: &mut ConnectedResponseWithAddr) { - element.announce_ipv4.peers.clear(); - element.announce_ipv6.peers.clear(); - element.scrape.torrent_stats.clear(); - element.addr = CanonicalSocketAddr::new(SocketAddr::V4(SocketAddrV4::new(0.into(), 0))); - } -} - #[derive(Clone, Copy, Debug)] pub struct SocketWorkerIndex(pub usize); @@ -180,54 +85,73 @@ impl ConnectedRequestSender { } pub struct ConnectedResponseSender { - senders: Vec>, + senders: Vec>, to_any_last_index_picked: usize, } impl ConnectedResponseSender { - pub fn new( - senders: Vec>, - ) -> Self { + pub fn new(senders: Vec>) -> Self { Self { senders, to_any_last_index_picked: 0, } } - pub fn try_send_ref_to( + pub fn try_send_to( &self, index: SocketWorkerIndex, - ) -> Result, thingbuf::mpsc::errors::TrySendError> { - self.senders[index.0].try_send_ref() + addr: CanonicalSocketAddr, + response: ConnectedResponse, + ) -> Result<(), TrySendError<(CanonicalSocketAddr, ConnectedResponse)>> { + self.senders[index.0].try_send((addr, response)) } - pub fn send_ref_to( + pub fn send_to( &self, index: SocketWorkerIndex, - ) -> Result, thingbuf::mpsc::errors::Closed> { - self.senders[index.0].send_ref() + addr: CanonicalSocketAddr, + response: ConnectedResponse, + ) -> Result<(), SendError<(CanonicalSocketAddr, ConnectedResponse)>> { + self.senders[index.0].send((addr, response)) } - pub fn send_ref_to_any( + pub fn send_to_any( &mut self, - ) -> Result, thingbuf::mpsc::errors::Closed> { + addr: CanonicalSocketAddr, + response: ConnectedResponse, + ) -> Result<(), SendError<(CanonicalSocketAddr, ConnectedResponse)>> { let start = self.to_any_last_index_picked + 1; - for i in (start..start + self.senders.len()).map(|i| i % self.senders.len()) { - if let Ok(sender) = self.senders[i].try_send_ref() { - self.to_any_last_index_picked = i; + let mut message = Some((addr, response)); - return Ok(sender); + for i in (start..start + self.senders.len()).map(|i| i % self.senders.len()) { + match self.senders[i].try_send(message.take().unwrap()) { + Ok(()) => { + self.to_any_last_index_picked = i; + + return Ok(()); + } + Err(TrySendError::Full(msg)) => { + message = Some(msg); + } + Err(TrySendError::Disconnected(_)) => { + panic!("ConnectedResponseReceiver disconnected"); + } } } + let (addr, response) = message.unwrap(); + self.to_any_last_index_picked = start % self.senders.len(); - self.send_ref_to(SocketWorkerIndex(self.to_any_last_index_picked)) + self.send_to( + SocketWorkerIndex(self.to_any_last_index_picked), + addr, + response, + ) } } -pub type ConnectedResponseReceiver = - thingbuf::mpsc::blocking::Receiver; +pub type ConnectedResponseReceiver = Receiver<(CanonicalSocketAddr, ConnectedResponse)>; #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] pub enum PeerStatus { diff --git a/crates/udp/src/lib.rs b/crates/udp/src/lib.rs index c9871ba..1a05df5 100644 --- a/crates/udp/src/lib.rs +++ b/crates/udp/src/lib.rs @@ -3,7 +3,6 @@ pub mod config; pub mod workers; use std::collections::BTreeMap; -use std::mem::size_of; use std::thread::Builder; use std::time::Duration; @@ -16,28 +15,20 @@ use aquatic_common::access_list::update_access_list; #[cfg(feature = "cpu-pinning")] use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::privileges::PrivilegeDropper; -use aquatic_common::{CanonicalSocketAddr, PanicSentinelWatcher, ServerStartInstant}; +use aquatic_common::{PanicSentinelWatcher, ServerStartInstant}; use common::{ - ConnectedRequestSender, ConnectedResponseSender, Recycler, SocketWorkerIndex, State, - SwarmWorkerIndex, + ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State, SwarmWorkerIndex, }; use config::Config; use workers::socket::ConnectionValidator; -use crate::common::{ConnectedRequest, ConnectedResponseWithAddr}; - pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); pub fn run(config: Config) -> ::anyhow::Result<()> { let mut signals = Signals::new([SIGUSR1, SIGTERM])?; - ::log::info!( - "Estimated max channel memory use: {:.02} MB", - est_max_total_channel_memory(&config) - ); - let state = State::new(config.swarm_workers); let connection_validator = ConnectionValidator::new(&config)?; let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); @@ -56,19 +47,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let server_start_instant = ServerStartInstant::new(); for i in 0..config.swarm_workers { - let (request_sender, request_receiver) = if config.worker_channel_size == 0 { - unbounded() - } else { - bounded(config.worker_channel_size) - }; + let (request_sender, request_receiver) = bounded(config.worker_channel_size); request_senders.push(request_sender); request_receivers.insert(i, request_receiver); } for i in 0..config.socket_workers { - let (response_sender, response_receiver) = - thingbuf::mpsc::blocking::with_recycle(config.worker_channel_size, Recycler); + let (response_sender, response_receiver) = bounded(config.worker_channel_size); response_senders.push(response_sender); response_receivers.insert(i, response_receiver); @@ -214,16 +200,3 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { Ok(()) } - -fn est_max_total_channel_memory(config: &Config) -> f64 { - let request_channel_max_size = config.swarm_workers - * config.worker_channel_size - * (size_of::() - + size_of::() - + size_of::()); - let response_channel_max_size = config.socket_workers - * config.worker_channel_size - * ConnectedResponseWithAddr::estimated_max_size(&config); - - (request_channel_max_size as u64 + response_channel_max_size as u64) as f64 / (1024.0 * 1024.0) -} diff --git a/crates/udp/src/workers/socket/mio.rs b/crates/udp/src/workers/socket/mio.rs index 64daf5f..eddef10 100644 --- a/crates/udp/src/workers/socket/mio.rs +++ b/crates/udp/src/workers/socket/mio.rs @@ -1,4 +1,3 @@ -use std::borrow::Cow; use std::io::{Cursor, ErrorKind}; use std::sync::atomic::Ordering; use std::time::{Duration, Instant}; @@ -42,7 +41,7 @@ pub struct SocketWorker { server_start_instant: ServerStartInstant, pending_scrape_responses: PendingScrapeResponseSlab, socket: UdpSocket, - opt_resend_buffer: Option>, + opt_resend_buffer: Option>, buffer: [u8; BUFFER_SIZE], polling_mode: PollMode, /// Storage for requests that couldn't be sent to swarm worker because channel was full @@ -133,14 +132,14 @@ impl SocketWorker { // If resend buffer is enabled, send any responses in it if let Some(resend_buffer) = self.opt_resend_buffer.as_mut() { - for (response, addr) in resend_buffer.drain(..) { + for (addr, response) in resend_buffer.drain(..) { Self::send_response( &self.config, &self.shared_state, &mut self.socket, &mut self.buffer, &mut None, - response.into(), + response, addr, ); } @@ -235,7 +234,7 @@ impl SocketWorker { &mut self.socket, &mut self.buffer, &mut self.opt_resend_buffer, - CowResponse::Error(Cow::Owned(response)), + Response::Error(response), src, ); } @@ -310,7 +309,7 @@ impl SocketWorker { &mut self.socket, &mut self.buffer, &mut self.opt_resend_buffer, - CowResponse::Connect(Cow::Owned(response)), + Response::Connect(response), src, ); @@ -346,7 +345,7 @@ impl SocketWorker { &mut self.socket, &mut self.buffer, &mut self.opt_resend_buffer, - CowResponse::Error(Cow::Owned(response)), + Response::Error(response), src, ); @@ -392,30 +391,20 @@ impl SocketWorker { } fn handle_swarm_worker_responses(&mut self) { - loop { - let recv_ref = if let Ok(recv_ref) = self.response_receiver.try_recv_ref() { - recv_ref - } else { - break; - }; - - let response = match recv_ref.kind { - ConnectedResponseKind::Scrape => { + for (addr, response) in self.response_receiver.try_iter() { + let response = match response { + ConnectedResponse::Scrape(response) => { if let Some(r) = self .pending_scrape_responses - .add_and_get_finished(&recv_ref.scrape) + .add_and_get_finished(&response) { - CowResponse::Scrape(Cow::Owned(r)) + Response::Scrape(r) } else { continue; } } - ConnectedResponseKind::AnnounceIpv4 => { - CowResponse::AnnounceIpv4(Cow::Borrowed(&recv_ref.announce_ipv4)) - } - ConnectedResponseKind::AnnounceIpv6 => { - CowResponse::AnnounceIpv6(Cow::Borrowed(&recv_ref.announce_ipv6)) - } + ConnectedResponse::AnnounceIpv4(r) => Response::AnnounceIpv4(r), + ConnectedResponse::AnnounceIpv6(r) => Response::AnnounceIpv6(r), }; Self::send_response( @@ -425,7 +414,7 @@ impl SocketWorker { &mut self.buffer, &mut self.opt_resend_buffer, response, - recv_ref.addr, + addr, ); } } @@ -435,8 +424,8 @@ impl SocketWorker { shared_state: &State, socket: &mut UdpSocket, buffer: &mut [u8], - opt_resend_buffer: &mut Option>, - response: CowResponse, + opt_resend_buffer: &mut Option>, + response: Response, canonical_addr: CanonicalSocketAddr, ) { let mut buffer = Cursor::new(&mut buffer[..]); @@ -478,18 +467,18 @@ impl SocketWorker { }; match response { - CowResponse::Connect(_) => { + Response::Connect(_) => { stats.responses_sent_connect.fetch_add(1, Ordering::Relaxed); } - CowResponse::AnnounceIpv4(_) | CowResponse::AnnounceIpv6(_) => { + Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => { stats .responses_sent_announce .fetch_add(1, Ordering::Relaxed); } - CowResponse::Scrape(_) => { + Response::Scrape(_) => { stats.responses_sent_scrape.fetch_add(1, Ordering::Relaxed); } - CowResponse::Error(_) => { + Response::Error(_) => { stats.responses_sent_error.fetch_add(1, Ordering::Relaxed); } } @@ -503,7 +492,7 @@ impl SocketWorker { if resend_buffer.len() < config.network.resend_buffer_max_len { ::log::debug!("Adding response to resend queue, since sending it to {} failed with: {:#}", addr, err); - resend_buffer.push((response.into_owned(), canonical_addr)); + resend_buffer.push((canonical_addr, response)); } else { ::log::warn!("Response resend buffer full, dropping response"); } diff --git a/crates/udp/src/workers/socket/uring/mod.rs b/crates/udp/src/workers/socket/uring/mod.rs index 4ddf875..d41aecb 100644 --- a/crates/udp/src/workers/socket/uring/mod.rs +++ b/crates/udp/src/workers/socket/uring/mod.rs @@ -2,7 +2,6 @@ mod buf_ring; mod recv_helper; mod send_buffers; -use std::borrow::Cow; use std::cell::RefCell; use std::collections::VecDeque; use std::net::UdpSocket; @@ -217,8 +216,7 @@ impl SocketWorker { num_send_added += 1; } Err(send_buffers::Error::NoBuffers(response)) => { - self.local_responses - .push_front((response.into_owned(), addr)); + self.local_responses.push_front((response, addr)); break; } @@ -233,40 +231,32 @@ impl SocketWorker { // Enqueue swarm worker responses for _ in 0..(sq_space - num_send_added) { - let recv_ref = if let Ok(recv_ref) = self.response_receiver.try_recv_ref() { - recv_ref + let (addr, response) = if let Ok(r) = self.response_receiver.try_recv() { + r } else { break; }; - let response = match recv_ref.kind { - ConnectedResponseKind::AnnounceIpv4 => { - CowResponse::AnnounceIpv4(Cow::Borrowed(&recv_ref.announce_ipv4)) - } - ConnectedResponseKind::AnnounceIpv6 => { - CowResponse::AnnounceIpv6(Cow::Borrowed(&recv_ref.announce_ipv6)) - } - ConnectedResponseKind::Scrape => { - if let Some(response) = self - .pending_scrape_responses - .add_and_get_finished(&recv_ref.scrape) - { - CowResponse::Scrape(Cow::Owned(response)) + let response = match response { + ConnectedResponse::AnnounceIpv4(r) => Response::AnnounceIpv4(r), + ConnectedResponse::AnnounceIpv6(r) => Response::AnnounceIpv6(r), + ConnectedResponse::Scrape(r) => { + if let Some(r) = self.pending_scrape_responses.add_and_get_finished(&r) { + Response::Scrape(r) } else { continue; } } }; - match self.send_buffers.prepare_entry(response, recv_ref.addr) { + match self.send_buffers.prepare_entry(response, addr) { Ok(entry) => { unsafe { ring.submission().push(&entry).unwrap() }; num_send_added += 1; } Err(send_buffers::Error::NoBuffers(response)) => { - self.local_responses - .push_back((response.into_owned(), recv_ref.addr)); + self.local_responses.push_back((response, addr)); break; } diff --git a/crates/udp/src/workers/socket/uring/send_buffers.rs b/crates/udp/src/workers/socket/uring/send_buffers.rs index b62ca90..dec4843 100644 --- a/crates/udp/src/workers/socket/uring/send_buffers.rs +++ b/crates/udp/src/workers/socket/uring/send_buffers.rs @@ -6,14 +6,15 @@ use std::{ }; use aquatic_common::CanonicalSocketAddr; +use aquatic_udp_protocol::Response; use io_uring::opcode::SendMsg; -use crate::{common::CowResponse, config::Config}; +use crate::config::Config; use super::{RESPONSE_BUF_LEN, SOCKET_IDENTIFIER}; -pub enum Error<'a> { - NoBuffers(CowResponse<'a>), +pub enum Error { + NoBuffers(Response), SerializationFailed(std::io::Error), } @@ -59,9 +60,9 @@ impl SendBuffers { pub fn prepare_entry<'a>( &mut self, - response: CowResponse<'a>, + response: Response, addr: CanonicalSocketAddr, - ) -> Result> { + ) -> Result { let index = if let Some(index) = self.next_free_index() { index } else { @@ -163,7 +164,7 @@ impl SendBuffer { fn prepare_entry( &mut self, - response: CowResponse, + response: Response, addr: CanonicalSocketAddr, socket_is_ipv4: bool, metadata: &mut SendBufferMetadata, @@ -237,12 +238,12 @@ pub enum ResponseType { } impl ResponseType { - fn from_response(response: &CowResponse) -> Self { + fn from_response(response: &Response) -> Self { match response { - CowResponse::Connect(_) => Self::Connect, - CowResponse::AnnounceIpv4(_) | CowResponse::AnnounceIpv6(_) => Self::Announce, - CowResponse::Scrape(_) => Self::Scrape, - CowResponse::Error(_) => Self::Error, + Response::Connect(_) => Self::Connect, + Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => Self::Announce, + Response::Scrape(_) => Self::Scrape, + Response::Error(_) => Self::Error, } } } diff --git a/crates/udp/src/workers/swarm/mod.rs b/crates/udp/src/workers/swarm/mod.rs index ea551f3..9c7e00f 100644 --- a/crates/udp/src/workers/swarm/mod.rs +++ b/crates/udp/src/workers/swarm/mod.rs @@ -47,15 +47,7 @@ pub fn run_swarm_worker( // sends in socket workers (doing both could cause a deadlock) match (request, src.get().ip()) { (ConnectedRequest::Announce(request), IpAddr::V4(ip)) => { - // It doesn't matter which socket worker receives announce responses - let mut send_ref = response_sender - .send_ref_to_any() - .expect("swarm response channel is closed"); - - send_ref.addr = src; - send_ref.kind = ConnectedResponseKind::AnnounceIpv4; - - torrents + let response = torrents .ipv4 .0 .entry(request.info_hash) @@ -67,19 +59,15 @@ pub fn run_swarm_worker( &request, ip.into(), peer_valid_until, - &mut send_ref.announce_ipv4, ); + + // It doesn't matter which socket worker receives announce responses + response_sender + .send_to_any(src, ConnectedResponse::AnnounceIpv4(response)) + .expect("swarm response channel is closed"); } (ConnectedRequest::Announce(request), IpAddr::V6(ip)) => { - // It doesn't matter which socket worker receives announce responses - let mut send_ref = response_sender - .send_ref_to_any() - .expect("swarm response channel is closed"); - - send_ref.addr = src; - send_ref.kind = ConnectedResponseKind::AnnounceIpv6; - - torrents + let response = torrents .ipv6 .0 .entry(request.info_hash) @@ -91,28 +79,26 @@ pub fn run_swarm_worker( &request, ip.into(), peer_valid_until, - &mut send_ref.announce_ipv6, ); + + // It doesn't matter which socket worker receives announce responses + response_sender + .send_to_any(src, ConnectedResponse::AnnounceIpv6(response)) + .expect("swarm response channel is closed"); } (ConnectedRequest::Scrape(request), IpAddr::V4(_)) => { - let mut send_ref = response_sender - .send_ref_to(sender_index) + let response = torrents.ipv4.scrape(request); + + response_sender + .send_to(sender_index, src, ConnectedResponse::Scrape(response)) .expect("swarm response channel is closed"); - - send_ref.addr = src; - send_ref.kind = ConnectedResponseKind::Scrape; - - torrents.ipv4.scrape(request, &mut send_ref.scrape); } (ConnectedRequest::Scrape(request), IpAddr::V6(_)) => { - let mut send_ref = response_sender - .send_ref_to(sender_index) + let response = torrents.ipv6.scrape(request); + + response_sender + .send_to(sender_index, src, ConnectedResponse::Scrape(response)) .expect("swarm response channel is closed"); - - send_ref.addr = src; - send_ref.kind = ConnectedResponseKind::Scrape; - - torrents.ipv6.scrape(request, &mut send_ref.scrape); } }; } diff --git a/crates/udp/src/workers/swarm/storage.rs b/crates/udp/src/workers/swarm/storage.rs index 28984b1..36441b2 100644 --- a/crates/udp/src/workers/swarm/storage.rs +++ b/crates/udp/src/workers/swarm/storage.rs @@ -79,24 +79,29 @@ impl TorrentMaps { pub struct TorrentMap(pub IndexMap>); impl TorrentMap { - pub fn scrape(&mut self, request: PendingScrapeRequest, response: &mut PendingScrapeResponse) { - response.slab_key = request.slab_key; + pub fn scrape(&mut self, request: PendingScrapeRequest) -> PendingScrapeResponse { + let torrent_stats = request + .info_hashes + .into_iter() + .map(|(i, info_hash)| { + let stats = self + .0 + .get(&info_hash) + .map(|torrent_data| torrent_data.scrape_statistics()) + .unwrap_or_else(|| TorrentScrapeStatistics { + seeders: NumberOfPeers::new(0), + leechers: NumberOfPeers::new(0), + completed: NumberOfDownloads::new(0), + }); - let torrent_stats = request.info_hashes.into_iter().map(|(i, info_hash)| { - let stats = self - .0 - .get(&info_hash) - .map(|torrent_data| torrent_data.scrape_statistics()) - .unwrap_or_else(|| TorrentScrapeStatistics { - seeders: NumberOfPeers::new(0), - leechers: NumberOfPeers::new(0), - completed: NumberOfDownloads::new(0), - }); + (i, stats) + }) + .collect(); - (i, stats) - }); - - response.torrent_stats.extend(torrent_stats); + PendingScrapeResponse { + slab_key: request.slab_key, + torrent_stats, + } } /// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers fn clean_and_get_statistics( @@ -187,8 +192,7 @@ impl TorrentData { request: &AnnounceRequest, ip_address: I, valid_until: ValidUntil, - response: &mut AnnounceResponse, - ) { + ) -> AnnounceResponse { let max_num_peers_to_take: usize = if request.peers_wanted.0.get() <= 0 { config.protocol.max_response_peers } else { @@ -209,23 +213,24 @@ impl TorrentData { // Create the response before inserting the peer. This means that we // don't have to filter it out from the response peers, and that the // reported number of seeders/leechers will not include it - let opt_removed_peer = match self { + let (response, opt_removed_peer) = match self { Self::Small(peer_map) => { let opt_removed_peer = peer_map.remove(&peer_map_key); let (seeders, leechers) = peer_map.num_seeders_leechers(); - response.fixed = AnnounceResponseFixedData { - transaction_id: request.transaction_id, - announce_interval: AnnounceInterval::new( - config.protocol.peer_announce_interval, - ), - leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), - seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), + let response = AnnounceResponse { + fixed: AnnounceResponseFixedData { + transaction_id: request.transaction_id, + announce_interval: AnnounceInterval::new( + config.protocol.peer_announce_interval, + ), + leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), + seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), + }, + peers: peer_map.extract_response_peers(max_num_peers_to_take), }; - peer_map.extract_response_peers(max_num_peers_to_take, &mut response.peers); - // Convert peer map to large variant if it is full and // announcing peer is not stopped and will therefore be // inserted @@ -233,24 +238,25 @@ impl TorrentData { *self = Self::Large(peer_map.to_large()); } - opt_removed_peer + (response, opt_removed_peer) } Self::Large(peer_map) => { let opt_removed_peer = peer_map.remove_peer(&peer_map_key); let (seeders, leechers) = peer_map.num_seeders_leechers(); - response.fixed = AnnounceResponseFixedData { - transaction_id: request.transaction_id, - announce_interval: AnnounceInterval::new( - config.protocol.peer_announce_interval, - ), - leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), - seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), + let response = AnnounceResponse { + fixed: AnnounceResponseFixedData { + transaction_id: request.transaction_id, + announce_interval: AnnounceInterval::new( + config.protocol.peer_announce_interval, + ), + leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), + seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), + }, + peers: peer_map.extract_response_peers(rng, max_num_peers_to_take), }; - peer_map.extract_response_peers(rng, max_num_peers_to_take, &mut response.peers); - // Try shrinking the map if announcing peer is stopped and // will therefore not be inserted if status == PeerStatus::Stopped { @@ -259,7 +265,7 @@ impl TorrentData { } } - opt_removed_peer + (response, opt_removed_peer) } }; @@ -290,6 +296,8 @@ impl TorrentData { } } }; + + response } pub fn scrape_statistics(&self) -> TorrentScrapeStatistics { @@ -313,7 +321,7 @@ impl Default for TorrentData { } /// Store torrents with up to two peers without an extra heap allocation -/// +/// /// On public open trackers, this is likely to be the majority of torrents. #[derive(Default, Debug)] pub struct SmallPeerMap(ArrayVec<(ResponsePeer, Peer), SMALL_PEER_MAP_CAPACITY>); @@ -344,12 +352,8 @@ impl SmallPeerMap { None } - fn extract_response_peers( - &self, - max_num_peers_to_take: usize, - peers: &mut Vec>, - ) { - peers.extend(self.0.iter().take(max_num_peers_to_take).map(|(k, _)| k)) + fn extract_response_peers(&self, max_num_peers_to_take: usize) -> Vec> { + Vec::from_iter(self.0.iter().take(max_num_peers_to_take).map(|(k, _)| *k)) } fn clean_and_get_num_peers( @@ -427,10 +431,9 @@ impl LargePeerMap { &self, rng: &mut impl Rng, max_num_peers_to_take: usize, - peers: &mut Vec>, - ) { + ) -> Vec> { if self.peers.len() <= max_num_peers_to_take { - peers.extend(self.peers.keys()); + self.peers.keys().copied().collect() } else { let middle_index = self.peers.len() / 2; let num_to_take_per_half = max_num_peers_to_take / 2; @@ -451,12 +454,16 @@ impl LargePeerMap { let end_half_one = offset_half_one + num_to_take_per_half; let end_half_two = offset_half_two + num_to_take_per_half; + let mut peers = Vec::with_capacity(max_num_peers_to_take); + if let Some(slice) = self.peers.get_range(offset_half_one..end_half_one) { peers.extend(slice.keys()); } if let Some(slice) = self.peers.get_range(offset_half_two..end_half_two) { peers.extend(slice.keys()); } + + peers } }