From e77c9f46e7b2f2d28117e0565a4113b22c3ba8bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 20 Jan 2024 09:40:09 +0100 Subject: [PATCH 01/13] udp: store torrents with few peers without an extra heap alloc --- CHANGELOG.md | 2 + Cargo.lock | 1 + crates/udp/Cargo.toml | 1 + crates/udp/src/workers/swarm/storage.rs | 403 ++++++++++++++++-------- 4 files changed, 269 insertions(+), 138 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 09708fc..49183b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ * Index peers by packet source IP and provided port, instead of by peer_id. This prevents users from impersonating others and is likely also slightly faster for IPv4 peers. +* Store torrents with up to two peers without an extra heap allocation for the + peers. * Remove support for unbounded worker channels * Add backpressure in socket workers. They will postpone reading from the socket if sending a request to a swarm worker failed diff --git a/Cargo.lock b/Cargo.lock index 3968b40..184c491 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -293,6 +293,7 @@ dependencies = [ "aquatic_common", "aquatic_toml_config", "aquatic_udp_protocol", + "arrayvec", "blake3", "cfg-if", "compact_str", diff --git a/crates/udp/Cargo.toml b/crates/udp/Cargo.toml index 4e9eac3..599fa39 100644 --- a/crates/udp/Cargo.toml +++ b/crates/udp/Cargo.toml @@ -32,6 +32,7 @@ aquatic_toml_config.workspace = true aquatic_udp_protocol.workspace = true anyhow = "1" +arrayvec = "0.7" blake3 = "1" cfg-if = "1" compact_str = "0.7" diff --git a/crates/udp/src/workers/swarm/storage.rs b/crates/udp/src/workers/swarm/storage.rs index 4feb80d..28984b1 100644 --- a/crates/udp/src/workers/swarm/storage.rs +++ b/crates/udp/src/workers/swarm/storage.rs @@ -10,6 +10,7 @@ use aquatic_common::{ }; use aquatic_udp_protocol::*; +use arrayvec::ArrayVec; use crossbeam_channel::Sender; use hdrhistogram::Histogram; use rand::prelude::SmallRng; @@ -18,6 +19,8 @@ use rand::Rng; use crate::common::*; use crate::config::Config; +const SMALL_PEER_MAP_CAPACITY: usize = 2; + pub struct TorrentMaps { pub ipv4: TorrentMap, pub ipv6: TorrentMap, @@ -84,7 +87,11 @@ impl TorrentMap { .0 .get(&info_hash) .map(|torrent_data| torrent_data.scrape_statistics()) - .unwrap_or_else(|| create_torrent_scrape_statistics(0, 0)); + .unwrap_or_else(|| TorrentScrapeStatistics { + seeders: NumberOfPeers::new(0), + leechers: NumberOfPeers::new(0), + completed: NumberOfDownloads::new(0), + }); (i, stats) }); @@ -100,7 +107,7 @@ impl TorrentMap { access_list_mode: AccessListMode, now: SecondsSinceServerStart, ) -> (usize, Option>) { - let mut num_peers = 0; + let mut total_num_peers = 0; let mut opt_histogram: Option> = if config.statistics.torrent_peer_histograms { @@ -124,17 +131,27 @@ impl TorrentMap { return false; } - torrent.clean(config, statistics_sender, now); + let num_peers = match torrent { + TorrentData::Small(peer_map) => { + peer_map.clean_and_get_num_peers(config, statistics_sender, now) + } + TorrentData::Large(peer_map) => { + let num_peers = + peer_map.clean_and_get_num_peers(config, statistics_sender, now); - num_peers += torrent.peers.len(); + if let Some(peer_map) = peer_map.try_shrink() { + *torrent = TorrentData::Small(peer_map); + } + + num_peers + } + }; + + total_num_peers += num_peers; match opt_histogram { - Some(ref mut histogram) if torrent.peers.len() != 0 => { - let n = torrent - .peers - .len() - .try_into() - .expect("Couldn't fit usize into u64"); + Some(ref mut histogram) if num_peers > 0 => { + let n = num_peers.try_into().expect("Couldn't fit usize into u64"); if let Err(err) = histogram.record(n) { ::log::error!("Couldn't record {} to histogram: {:#}", n, err); @@ -143,12 +160,12 @@ impl TorrentMap { _ => (), } - !torrent.peers.is_empty() + num_peers > 0 }); self.0.shrink_to_fit(); - (num_peers, opt_histogram) + (total_num_peers, opt_histogram) } pub fn num_torrents(&self) -> usize { @@ -156,9 +173,9 @@ impl TorrentMap { } } -pub struct TorrentData { - peers: IndexMap, Peer>, - num_seeders: usize, +pub enum TorrentData { + Small(SmallPeerMap), + Large(LargePeerMap), } impl TorrentData { @@ -189,60 +206,75 @@ impl TorrentData { port: request.port, }; - let opt_removed_peer = self.peers.remove(&peer_map_key); - - if let Some(Peer { - is_seeder: true, .. - }) = opt_removed_peer - { - self.num_seeders -= 1; - } - // Create the response before inserting the peer. This means that we // don't have to filter it out from the response peers, and that the // reported number of seeders/leechers will not include it + let opt_removed_peer = match self { + Self::Small(peer_map) => { + let opt_removed_peer = peer_map.remove(&peer_map_key); - response.fixed = AnnounceResponseFixedData { - transaction_id: request.transaction_id, - announce_interval: AnnounceInterval::new(config.protocol.peer_announce_interval), - leechers: NumberOfPeers::new(self.num_leechers().try_into().unwrap_or(i32::MAX)), - seeders: NumberOfPeers::new(self.num_seeders().try_into().unwrap_or(i32::MAX)), + let (seeders, leechers) = peer_map.num_seeders_leechers(); + + response.fixed = AnnounceResponseFixedData { + transaction_id: request.transaction_id, + announce_interval: AnnounceInterval::new( + config.protocol.peer_announce_interval, + ), + leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), + seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), + }; + + peer_map.extract_response_peers(max_num_peers_to_take, &mut response.peers); + + // Convert peer map to large variant if it is full and + // announcing peer is not stopped and will therefore be + // inserted + if peer_map.is_full() && status != PeerStatus::Stopped { + *self = Self::Large(peer_map.to_large()); + } + + opt_removed_peer + } + Self::Large(peer_map) => { + let opt_removed_peer = peer_map.remove_peer(&peer_map_key); + + let (seeders, leechers) = peer_map.num_seeders_leechers(); + + response.fixed = AnnounceResponseFixedData { + transaction_id: request.transaction_id, + announce_interval: AnnounceInterval::new( + config.protocol.peer_announce_interval, + ), + leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), + seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), + }; + + peer_map.extract_response_peers(rng, max_num_peers_to_take, &mut response.peers); + + // Try shrinking the map if announcing peer is stopped and + // will therefore not be inserted + if status == PeerStatus::Stopped { + if let Some(peer_map) = peer_map.try_shrink() { + *self = Self::Small(peer_map); + } + } + + opt_removed_peer + } }; - extract_response_peers( - rng, - &self.peers, - max_num_peers_to_take, - |k, _| *k, - &mut response.peers, - ); - match status { - PeerStatus::Leeching => { + PeerStatus::Leeching | PeerStatus::Seeding => { let peer = Peer { peer_id: request.peer_id, - is_seeder: false, + is_seeder: status == PeerStatus::Seeding, valid_until, }; - self.peers.insert(peer_map_key, peer); - - if config.statistics.peer_clients && opt_removed_peer.is_none() { - statistics_sender - .try_send(StatisticsMessage::PeerAdded(request.peer_id)) - .expect("statistics channel should be unbounded"); + match self { + Self::Small(peer_map) => peer_map.insert(peer_map_key, peer), + Self::Large(peer_map) => peer_map.insert(peer_map_key, peer), } - } - PeerStatus::Seeding => { - let peer = Peer { - peer_id: request.peer_id, - is_seeder: true, - valid_until, - }; - - self.peers.insert(peer_map_key, peer); - - self.num_seeders += 1; if config.statistics.peer_clients && opt_removed_peer.is_none() { statistics_sender @@ -260,28 +292,180 @@ impl TorrentData { }; } - pub fn num_leechers(&self) -> usize { - self.peers.len() - self.num_seeders - } - - pub fn num_seeders(&self) -> usize { - self.num_seeders - } - pub fn scrape_statistics(&self) -> TorrentScrapeStatistics { - create_torrent_scrape_statistics( - self.num_seeders.try_into().unwrap_or(i32::MAX), - self.num_leechers().try_into().unwrap_or(i32::MAX), - ) + let (seeders, leechers) = match self { + Self::Small(peer_map) => peer_map.num_seeders_leechers(), + Self::Large(peer_map) => peer_map.num_seeders_leechers(), + }; + + TorrentScrapeStatistics { + seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), + leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), + completed: NumberOfDownloads::new(0), + } + } +} + +impl Default for TorrentData { + fn default() -> Self { + Self::Small(SmallPeerMap(ArrayVec::default())) + } +} + +/// Store torrents with up to two peers without an extra heap allocation +/// +/// On public open trackers, this is likely to be the majority of torrents. +#[derive(Default, Debug)] +pub struct SmallPeerMap(ArrayVec<(ResponsePeer, Peer), SMALL_PEER_MAP_CAPACITY>); + +impl SmallPeerMap { + fn is_full(&self) -> bool { + self.0.is_full() } - /// Remove inactive peers and reclaim space - fn clean( + fn num_seeders_leechers(&self) -> (usize, usize) { + let seeders = self.0.iter().filter(|(_, p)| p.is_seeder).count(); + let leechers = self.0.len() - seeders; + + (seeders, leechers) + } + + fn insert(&mut self, key: ResponsePeer, peer: Peer) { + self.0.push((key, peer)); + } + + fn remove(&mut self, key: &ResponsePeer) -> Option { + for (i, (k, _)) in self.0.iter().enumerate() { + if k == key { + return Some(self.0.remove(i).1); + } + } + + None + } + + fn extract_response_peers( + &self, + max_num_peers_to_take: usize, + peers: &mut Vec>, + ) { + peers.extend(self.0.iter().take(max_num_peers_to_take).map(|(k, _)| k)) + } + + fn clean_and_get_num_peers( &mut self, config: &Config, statistics_sender: &Sender, now: SecondsSinceServerStart, + ) -> usize { + self.0.retain(|(_, peer)| { + let keep = peer.valid_until.valid(now); + + if !keep && config.statistics.peer_clients { + if let Err(_) = + statistics_sender.try_send(StatisticsMessage::PeerRemoved(peer.peer_id)) + { + // Should never happen in practice + ::log::error!("Couldn't send StatisticsMessage::PeerRemoved"); + } + } + + keep + }); + + self.0.len() + } + + fn to_large(&self) -> LargePeerMap { + let (num_seeders, _) = self.num_seeders_leechers(); + let peers = self.0.iter().copied().collect(); + + LargePeerMap { peers, num_seeders } + } +} + +#[derive(Default)] +pub struct LargePeerMap { + peers: IndexMap, Peer>, + num_seeders: usize, +} + +impl LargePeerMap { + fn num_seeders_leechers(&self) -> (usize, usize) { + (self.num_seeders, self.peers.len() - self.num_seeders) + } + + fn insert(&mut self, key: ResponsePeer, peer: Peer) { + if peer.is_seeder { + self.num_seeders += 1; + } + + self.peers.insert(key, peer); + } + + fn remove_peer(&mut self, key: &ResponsePeer) -> Option { + let opt_removed_peer = self.peers.remove(key); + + if let Some(Peer { + is_seeder: true, .. + }) = opt_removed_peer + { + self.num_seeders -= 1; + } + + opt_removed_peer + } + + /// Extract response peers + /// + /// If there are more peers in map than `max_num_peers_to_take`, do a random + /// selection of peers from first and second halves of map in order to avoid + /// returning too homogeneous peers. + /// + /// Does NOT filter out announcing peer. + pub fn extract_response_peers( + &self, + rng: &mut impl Rng, + max_num_peers_to_take: usize, + peers: &mut Vec>, ) { + if self.peers.len() <= max_num_peers_to_take { + peers.extend(self.peers.keys()); + } else { + let middle_index = self.peers.len() / 2; + let num_to_take_per_half = max_num_peers_to_take / 2; + + let offset_half_one = { + let from = 0; + let to = usize::max(1, middle_index - num_to_take_per_half); + + rng.gen_range(from..to) + }; + let offset_half_two = { + let from = middle_index; + let to = usize::max(middle_index + 1, self.peers.len() - num_to_take_per_half); + + rng.gen_range(from..to) + }; + + let end_half_one = offset_half_one + num_to_take_per_half; + let end_half_two = offset_half_two + num_to_take_per_half; + + if let Some(slice) = self.peers.get_range(offset_half_one..end_half_one) { + peers.extend(slice.keys()); + } + if let Some(slice) = self.peers.get_range(offset_half_two..end_half_two) { + peers.extend(slice.keys()); + } + } + } + + fn clean_and_get_num_peers( + &mut self, + config: &Config, + statistics_sender: &Sender, + now: SecondsSinceServerStart, + ) -> usize { self.peers.retain(|_, peer| { let keep = peer.valid_until.valid(now); @@ -305,79 +489,22 @@ impl TorrentData { if !self.peers.is_empty() { self.peers.shrink_to_fit(); } + + self.peers.len() + } + + fn try_shrink(&mut self) -> Option> { + (self.peers.len() <= SMALL_PEER_MAP_CAPACITY).then(|| { + SmallPeerMap(ArrayVec::from_iter( + self.peers.iter().map(|(k, v)| (*k, *v)), + )) + }) } } -impl Default for TorrentData { - fn default() -> Self { - Self { - peers: Default::default(), - num_seeders: 0, - } - } -} - -#[derive(Clone, Debug)] +#[derive(Clone, Copy, Debug)] struct Peer { peer_id: PeerId, is_seeder: bool, valid_until: ValidUntil, } - -/// Extract response peers -/// -/// If there are more peers in map than `max_num_peers_to_take`, do a random -/// selection of peers from first and second halves of map in order to avoid -/// returning too homogeneous peers. -/// -/// Does NOT filter out announcing peer. -#[inline] -pub fn extract_response_peers( - rng: &mut impl Rng, - peer_map: &IndexMap, - max_num_peers_to_take: usize, - peer_conversion_function: F, - peers: &mut Vec, -) where - K: Eq + ::std::hash::Hash, - F: Fn(&K, &V) -> R, -{ - if peer_map.len() <= max_num_peers_to_take { - peers.extend(peer_map.iter().map(|(k, v)| peer_conversion_function(k, v))); - } else { - let middle_index = peer_map.len() / 2; - let num_to_take_per_half = max_num_peers_to_take / 2; - - let offset_half_one = { - let from = 0; - let to = usize::max(1, middle_index - num_to_take_per_half); - - rng.gen_range(from..to) - }; - let offset_half_two = { - let from = middle_index; - let to = usize::max(middle_index + 1, peer_map.len() - num_to_take_per_half); - - rng.gen_range(from..to) - }; - - let end_half_one = offset_half_one + num_to_take_per_half; - let end_half_two = offset_half_two + num_to_take_per_half; - - if let Some(slice) = peer_map.get_range(offset_half_one..end_half_one) { - peers.extend(slice.iter().map(|(k, v)| peer_conversion_function(k, v))); - } - if let Some(slice) = peer_map.get_range(offset_half_two..end_half_two) { - peers.extend(slice.iter().map(|(k, v)| peer_conversion_function(k, v))); - } - } -} - -#[inline(always)] -fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics { - TorrentScrapeStatistics { - seeders: NumberOfPeers::new(seeders), - completed: NumberOfDownloads::new(0), // No implementation planned - leechers: NumberOfPeers::new(leechers), - } -} From 1a6b4345d44104b5f9bd0edadd6f42cf05e5bc86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 20 Jan 2024 09:41:07 +0100 Subject: [PATCH 02/13] udp: remove thingbuf in favor of crossbeam channel thingbuf didn't have obvious performance advantages and is a lot less mature. Furthermore, it doesn't support anything like crossbeam Receiver::try_iter, which is prefereable now that announce responses can be sent to any socket worker. --- CHANGELOG.md | 1 - Cargo.lock | 34 ---- crates/udp/Cargo.toml | 1 - crates/udp/src/common.rs | 154 +++++------------- crates/udp/src/lib.rs | 35 +--- crates/udp/src/workers/socket/mio.rs | 53 +++--- crates/udp/src/workers/socket/uring/mod.rs | 32 ++-- .../src/workers/socket/uring/send_buffers.rs | 23 +-- crates/udp/src/workers/swarm/mod.rs | 54 +++--- crates/udp/src/workers/swarm/storage.rs | 105 ++++++------ 10 files changed, 163 insertions(+), 329 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 49183b7..1d06801 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,7 +26,6 @@ * Remove support for unbounded worker channels * Add backpressure in socket workers. They will postpone reading from the socket if sending a request to a swarm worker failed -* Reuse allocations in swarm response channel * Remove config key `network.poll_event_capacity` * Harden ConnectionValidator to make IP spoofing even more costly * Distribute announce responses from swarm workers over socket workers to diff --git a/Cargo.lock b/Cargo.lock index 184c491..e3a0a4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -320,7 +320,6 @@ dependencies = [ "slab", "socket2 0.5.5", "tempfile", - "thingbuf", "time", "tinytemplate", ] @@ -2079,29 +2078,6 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" -[[package]] -name = "parking_lot" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" -dependencies = [ - "lock_api", - "parking_lot_core", -] - -[[package]] -name = "parking_lot_core" -version = "0.9.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" -dependencies = [ - "cfg-if", - "libc", - "redox_syscall", - "smallvec", - "windows-targets 0.48.5", -] - [[package]] name = "percent-encoding" version = "2.3.1" @@ -2849,16 +2825,6 @@ version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" -[[package]] -name = "thingbuf" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4706f1bfb859af03f099ada2de3cea3e515843c2d3e93b7893f16d94a37f9415" -dependencies = [ - "parking_lot", - "pin-project", -] - [[package]] name = "thiserror" version = "1.0.56" diff --git a/crates/udp/Cargo.toml b/crates/udp/Cargo.toml index 599fa39..977ae6a 100644 --- a/crates/udp/Cargo.toml +++ b/crates/udp/Cargo.toml @@ -52,7 +52,6 @@ serde = { version = "1", features = ["derive"] } signal-hook = { version = "0.3" } slab = "0.4" socket2 = { version = "0.5", features = ["all"] } -thingbuf = "0.1" time = { version = "0.3", features = ["formatting"] } tinytemplate = "1" diff --git a/crates/udp/src/common.rs b/crates/udp/src/common.rs index 6504f06..dbfc868 100644 --- a/crates/udp/src/common.rs +++ b/crates/udp/src/common.rs @@ -1,68 +1,19 @@ -use std::borrow::Cow; use std::collections::BTreeMap; use std::hash::Hash; -use std::io::Write; -use std::mem::size_of; -use std::net::{SocketAddr, SocketAddrV4}; use std::sync::atomic::AtomicUsize; use std::sync::Arc; -use crossbeam_channel::{Sender, TrySendError}; +use crossbeam_channel::{Receiver, SendError, Sender, TrySendError}; use aquatic_common::access_list::AccessListArcSwap; use aquatic_common::CanonicalSocketAddr; use aquatic_udp_protocol::*; use hdrhistogram::Histogram; -use thingbuf::mpsc::blocking::SendRef; use crate::config::Config; pub const BUFFER_SIZE: usize = 8192; -#[derive(PartialEq, Eq, Clone, Debug)] -pub enum CowResponse<'a> { - Connect(Cow<'a, ConnectResponse>), - AnnounceIpv4(Cow<'a, AnnounceResponse>), - AnnounceIpv6(Cow<'a, AnnounceResponse>), - Scrape(Cow<'a, ScrapeResponse>), - Error(Cow<'a, ErrorResponse>), -} - -impl From for CowResponse<'_> { - fn from(value: Response) -> Self { - match value { - Response::AnnounceIpv4(r) => Self::AnnounceIpv4(Cow::Owned(r)), - Response::AnnounceIpv6(r) => Self::AnnounceIpv6(Cow::Owned(r)), - Response::Connect(r) => Self::Connect(Cow::Owned(r)), - Response::Scrape(r) => Self::Scrape(Cow::Owned(r)), - Response::Error(r) => Self::Error(Cow::Owned(r)), - } - } -} - -impl<'a> CowResponse<'a> { - pub fn into_owned(self) -> Response { - match self { - CowResponse::Connect(r) => Response::Connect(r.into_owned()), - CowResponse::AnnounceIpv4(r) => Response::AnnounceIpv4(r.into_owned()), - CowResponse::AnnounceIpv6(r) => Response::AnnounceIpv6(r.into_owned()), - CowResponse::Scrape(r) => Response::Scrape(r.into_owned()), - CowResponse::Error(r) => Response::Error(r.into_owned()), - } - } - - #[inline] - pub fn write(&self, bytes: &mut impl Write) -> Result<(), ::std::io::Error> { - match self { - Self::Connect(r) => r.write(bytes), - Self::AnnounceIpv4(r) => r.write(bytes), - Self::AnnounceIpv6(r) => r.write(bytes), - Self::Scrape(r) => r.write(bytes), - Self::Error(r) => r.write(bytes), - } - } -} - #[derive(Debug)] pub struct PendingScrapeRequest { pub slab_key: usize, @@ -88,52 +39,6 @@ pub enum ConnectedResponse { Scrape(PendingScrapeResponse), } -pub enum ConnectedResponseKind { - AnnounceIpv4, - AnnounceIpv6, - Scrape, -} - -pub struct ConnectedResponseWithAddr { - pub kind: ConnectedResponseKind, - pub announce_ipv4: AnnounceResponse, - pub announce_ipv6: AnnounceResponse, - pub scrape: PendingScrapeResponse, - pub addr: CanonicalSocketAddr, -} - -impl ConnectedResponseWithAddr { - pub fn estimated_max_size(config: &Config) -> usize { - size_of::() - + config.protocol.max_response_peers - * (size_of::>() - + size_of::>()) - } -} - -pub struct Recycler; - -impl thingbuf::Recycle for Recycler { - fn new_element(&self) -> ConnectedResponseWithAddr { - ConnectedResponseWithAddr { - kind: ConnectedResponseKind::AnnounceIpv4, - announce_ipv4: AnnounceResponse::empty(), - announce_ipv6: AnnounceResponse::empty(), - scrape: PendingScrapeResponse { - slab_key: 0, - torrent_stats: Default::default(), - }, - addr: CanonicalSocketAddr::new(SocketAddr::V4(SocketAddrV4::new(0.into(), 0))), - } - } - fn recycle(&self, element: &mut ConnectedResponseWithAddr) { - element.announce_ipv4.peers.clear(); - element.announce_ipv6.peers.clear(); - element.scrape.torrent_stats.clear(); - element.addr = CanonicalSocketAddr::new(SocketAddr::V4(SocketAddrV4::new(0.into(), 0))); - } -} - #[derive(Clone, Copy, Debug)] pub struct SocketWorkerIndex(pub usize); @@ -180,54 +85,73 @@ impl ConnectedRequestSender { } pub struct ConnectedResponseSender { - senders: Vec>, + senders: Vec>, to_any_last_index_picked: usize, } impl ConnectedResponseSender { - pub fn new( - senders: Vec>, - ) -> Self { + pub fn new(senders: Vec>) -> Self { Self { senders, to_any_last_index_picked: 0, } } - pub fn try_send_ref_to( + pub fn try_send_to( &self, index: SocketWorkerIndex, - ) -> Result, thingbuf::mpsc::errors::TrySendError> { - self.senders[index.0].try_send_ref() + addr: CanonicalSocketAddr, + response: ConnectedResponse, + ) -> Result<(), TrySendError<(CanonicalSocketAddr, ConnectedResponse)>> { + self.senders[index.0].try_send((addr, response)) } - pub fn send_ref_to( + pub fn send_to( &self, index: SocketWorkerIndex, - ) -> Result, thingbuf::mpsc::errors::Closed> { - self.senders[index.0].send_ref() + addr: CanonicalSocketAddr, + response: ConnectedResponse, + ) -> Result<(), SendError<(CanonicalSocketAddr, ConnectedResponse)>> { + self.senders[index.0].send((addr, response)) } - pub fn send_ref_to_any( + pub fn send_to_any( &mut self, - ) -> Result, thingbuf::mpsc::errors::Closed> { + addr: CanonicalSocketAddr, + response: ConnectedResponse, + ) -> Result<(), SendError<(CanonicalSocketAddr, ConnectedResponse)>> { let start = self.to_any_last_index_picked + 1; - for i in (start..start + self.senders.len()).map(|i| i % self.senders.len()) { - if let Ok(sender) = self.senders[i].try_send_ref() { - self.to_any_last_index_picked = i; + let mut message = Some((addr, response)); - return Ok(sender); + for i in (start..start + self.senders.len()).map(|i| i % self.senders.len()) { + match self.senders[i].try_send(message.take().unwrap()) { + Ok(()) => { + self.to_any_last_index_picked = i; + + return Ok(()); + } + Err(TrySendError::Full(msg)) => { + message = Some(msg); + } + Err(TrySendError::Disconnected(_)) => { + panic!("ConnectedResponseReceiver disconnected"); + } } } + let (addr, response) = message.unwrap(); + self.to_any_last_index_picked = start % self.senders.len(); - self.send_ref_to(SocketWorkerIndex(self.to_any_last_index_picked)) + self.send_to( + SocketWorkerIndex(self.to_any_last_index_picked), + addr, + response, + ) } } -pub type ConnectedResponseReceiver = - thingbuf::mpsc::blocking::Receiver; +pub type ConnectedResponseReceiver = Receiver<(CanonicalSocketAddr, ConnectedResponse)>; #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] pub enum PeerStatus { diff --git a/crates/udp/src/lib.rs b/crates/udp/src/lib.rs index c9871ba..1a05df5 100644 --- a/crates/udp/src/lib.rs +++ b/crates/udp/src/lib.rs @@ -3,7 +3,6 @@ pub mod config; pub mod workers; use std::collections::BTreeMap; -use std::mem::size_of; use std::thread::Builder; use std::time::Duration; @@ -16,28 +15,20 @@ use aquatic_common::access_list::update_access_list; #[cfg(feature = "cpu-pinning")] use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::privileges::PrivilegeDropper; -use aquatic_common::{CanonicalSocketAddr, PanicSentinelWatcher, ServerStartInstant}; +use aquatic_common::{PanicSentinelWatcher, ServerStartInstant}; use common::{ - ConnectedRequestSender, ConnectedResponseSender, Recycler, SocketWorkerIndex, State, - SwarmWorkerIndex, + ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State, SwarmWorkerIndex, }; use config::Config; use workers::socket::ConnectionValidator; -use crate::common::{ConnectedRequest, ConnectedResponseWithAddr}; - pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); pub fn run(config: Config) -> ::anyhow::Result<()> { let mut signals = Signals::new([SIGUSR1, SIGTERM])?; - ::log::info!( - "Estimated max channel memory use: {:.02} MB", - est_max_total_channel_memory(&config) - ); - let state = State::new(config.swarm_workers); let connection_validator = ConnectionValidator::new(&config)?; let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); @@ -56,19 +47,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let server_start_instant = ServerStartInstant::new(); for i in 0..config.swarm_workers { - let (request_sender, request_receiver) = if config.worker_channel_size == 0 { - unbounded() - } else { - bounded(config.worker_channel_size) - }; + let (request_sender, request_receiver) = bounded(config.worker_channel_size); request_senders.push(request_sender); request_receivers.insert(i, request_receiver); } for i in 0..config.socket_workers { - let (response_sender, response_receiver) = - thingbuf::mpsc::blocking::with_recycle(config.worker_channel_size, Recycler); + let (response_sender, response_receiver) = bounded(config.worker_channel_size); response_senders.push(response_sender); response_receivers.insert(i, response_receiver); @@ -214,16 +200,3 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { Ok(()) } - -fn est_max_total_channel_memory(config: &Config) -> f64 { - let request_channel_max_size = config.swarm_workers - * config.worker_channel_size - * (size_of::() - + size_of::() - + size_of::()); - let response_channel_max_size = config.socket_workers - * config.worker_channel_size - * ConnectedResponseWithAddr::estimated_max_size(&config); - - (request_channel_max_size as u64 + response_channel_max_size as u64) as f64 / (1024.0 * 1024.0) -} diff --git a/crates/udp/src/workers/socket/mio.rs b/crates/udp/src/workers/socket/mio.rs index 64daf5f..eddef10 100644 --- a/crates/udp/src/workers/socket/mio.rs +++ b/crates/udp/src/workers/socket/mio.rs @@ -1,4 +1,3 @@ -use std::borrow::Cow; use std::io::{Cursor, ErrorKind}; use std::sync::atomic::Ordering; use std::time::{Duration, Instant}; @@ -42,7 +41,7 @@ pub struct SocketWorker { server_start_instant: ServerStartInstant, pending_scrape_responses: PendingScrapeResponseSlab, socket: UdpSocket, - opt_resend_buffer: Option>, + opt_resend_buffer: Option>, buffer: [u8; BUFFER_SIZE], polling_mode: PollMode, /// Storage for requests that couldn't be sent to swarm worker because channel was full @@ -133,14 +132,14 @@ impl SocketWorker { // If resend buffer is enabled, send any responses in it if let Some(resend_buffer) = self.opt_resend_buffer.as_mut() { - for (response, addr) in resend_buffer.drain(..) { + for (addr, response) in resend_buffer.drain(..) { Self::send_response( &self.config, &self.shared_state, &mut self.socket, &mut self.buffer, &mut None, - response.into(), + response, addr, ); } @@ -235,7 +234,7 @@ impl SocketWorker { &mut self.socket, &mut self.buffer, &mut self.opt_resend_buffer, - CowResponse::Error(Cow::Owned(response)), + Response::Error(response), src, ); } @@ -310,7 +309,7 @@ impl SocketWorker { &mut self.socket, &mut self.buffer, &mut self.opt_resend_buffer, - CowResponse::Connect(Cow::Owned(response)), + Response::Connect(response), src, ); @@ -346,7 +345,7 @@ impl SocketWorker { &mut self.socket, &mut self.buffer, &mut self.opt_resend_buffer, - CowResponse::Error(Cow::Owned(response)), + Response::Error(response), src, ); @@ -392,30 +391,20 @@ impl SocketWorker { } fn handle_swarm_worker_responses(&mut self) { - loop { - let recv_ref = if let Ok(recv_ref) = self.response_receiver.try_recv_ref() { - recv_ref - } else { - break; - }; - - let response = match recv_ref.kind { - ConnectedResponseKind::Scrape => { + for (addr, response) in self.response_receiver.try_iter() { + let response = match response { + ConnectedResponse::Scrape(response) => { if let Some(r) = self .pending_scrape_responses - .add_and_get_finished(&recv_ref.scrape) + .add_and_get_finished(&response) { - CowResponse::Scrape(Cow::Owned(r)) + Response::Scrape(r) } else { continue; } } - ConnectedResponseKind::AnnounceIpv4 => { - CowResponse::AnnounceIpv4(Cow::Borrowed(&recv_ref.announce_ipv4)) - } - ConnectedResponseKind::AnnounceIpv6 => { - CowResponse::AnnounceIpv6(Cow::Borrowed(&recv_ref.announce_ipv6)) - } + ConnectedResponse::AnnounceIpv4(r) => Response::AnnounceIpv4(r), + ConnectedResponse::AnnounceIpv6(r) => Response::AnnounceIpv6(r), }; Self::send_response( @@ -425,7 +414,7 @@ impl SocketWorker { &mut self.buffer, &mut self.opt_resend_buffer, response, - recv_ref.addr, + addr, ); } } @@ -435,8 +424,8 @@ impl SocketWorker { shared_state: &State, socket: &mut UdpSocket, buffer: &mut [u8], - opt_resend_buffer: &mut Option>, - response: CowResponse, + opt_resend_buffer: &mut Option>, + response: Response, canonical_addr: CanonicalSocketAddr, ) { let mut buffer = Cursor::new(&mut buffer[..]); @@ -478,18 +467,18 @@ impl SocketWorker { }; match response { - CowResponse::Connect(_) => { + Response::Connect(_) => { stats.responses_sent_connect.fetch_add(1, Ordering::Relaxed); } - CowResponse::AnnounceIpv4(_) | CowResponse::AnnounceIpv6(_) => { + Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => { stats .responses_sent_announce .fetch_add(1, Ordering::Relaxed); } - CowResponse::Scrape(_) => { + Response::Scrape(_) => { stats.responses_sent_scrape.fetch_add(1, Ordering::Relaxed); } - CowResponse::Error(_) => { + Response::Error(_) => { stats.responses_sent_error.fetch_add(1, Ordering::Relaxed); } } @@ -503,7 +492,7 @@ impl SocketWorker { if resend_buffer.len() < config.network.resend_buffer_max_len { ::log::debug!("Adding response to resend queue, since sending it to {} failed with: {:#}", addr, err); - resend_buffer.push((response.into_owned(), canonical_addr)); + resend_buffer.push((canonical_addr, response)); } else { ::log::warn!("Response resend buffer full, dropping response"); } diff --git a/crates/udp/src/workers/socket/uring/mod.rs b/crates/udp/src/workers/socket/uring/mod.rs index 4ddf875..d41aecb 100644 --- a/crates/udp/src/workers/socket/uring/mod.rs +++ b/crates/udp/src/workers/socket/uring/mod.rs @@ -2,7 +2,6 @@ mod buf_ring; mod recv_helper; mod send_buffers; -use std::borrow::Cow; use std::cell::RefCell; use std::collections::VecDeque; use std::net::UdpSocket; @@ -217,8 +216,7 @@ impl SocketWorker { num_send_added += 1; } Err(send_buffers::Error::NoBuffers(response)) => { - self.local_responses - .push_front((response.into_owned(), addr)); + self.local_responses.push_front((response, addr)); break; } @@ -233,40 +231,32 @@ impl SocketWorker { // Enqueue swarm worker responses for _ in 0..(sq_space - num_send_added) { - let recv_ref = if let Ok(recv_ref) = self.response_receiver.try_recv_ref() { - recv_ref + let (addr, response) = if let Ok(r) = self.response_receiver.try_recv() { + r } else { break; }; - let response = match recv_ref.kind { - ConnectedResponseKind::AnnounceIpv4 => { - CowResponse::AnnounceIpv4(Cow::Borrowed(&recv_ref.announce_ipv4)) - } - ConnectedResponseKind::AnnounceIpv6 => { - CowResponse::AnnounceIpv6(Cow::Borrowed(&recv_ref.announce_ipv6)) - } - ConnectedResponseKind::Scrape => { - if let Some(response) = self - .pending_scrape_responses - .add_and_get_finished(&recv_ref.scrape) - { - CowResponse::Scrape(Cow::Owned(response)) + let response = match response { + ConnectedResponse::AnnounceIpv4(r) => Response::AnnounceIpv4(r), + ConnectedResponse::AnnounceIpv6(r) => Response::AnnounceIpv6(r), + ConnectedResponse::Scrape(r) => { + if let Some(r) = self.pending_scrape_responses.add_and_get_finished(&r) { + Response::Scrape(r) } else { continue; } } }; - match self.send_buffers.prepare_entry(response, recv_ref.addr) { + match self.send_buffers.prepare_entry(response, addr) { Ok(entry) => { unsafe { ring.submission().push(&entry).unwrap() }; num_send_added += 1; } Err(send_buffers::Error::NoBuffers(response)) => { - self.local_responses - .push_back((response.into_owned(), recv_ref.addr)); + self.local_responses.push_back((response, addr)); break; } diff --git a/crates/udp/src/workers/socket/uring/send_buffers.rs b/crates/udp/src/workers/socket/uring/send_buffers.rs index b62ca90..dec4843 100644 --- a/crates/udp/src/workers/socket/uring/send_buffers.rs +++ b/crates/udp/src/workers/socket/uring/send_buffers.rs @@ -6,14 +6,15 @@ use std::{ }; use aquatic_common::CanonicalSocketAddr; +use aquatic_udp_protocol::Response; use io_uring::opcode::SendMsg; -use crate::{common::CowResponse, config::Config}; +use crate::config::Config; use super::{RESPONSE_BUF_LEN, SOCKET_IDENTIFIER}; -pub enum Error<'a> { - NoBuffers(CowResponse<'a>), +pub enum Error { + NoBuffers(Response), SerializationFailed(std::io::Error), } @@ -59,9 +60,9 @@ impl SendBuffers { pub fn prepare_entry<'a>( &mut self, - response: CowResponse<'a>, + response: Response, addr: CanonicalSocketAddr, - ) -> Result> { + ) -> Result { let index = if let Some(index) = self.next_free_index() { index } else { @@ -163,7 +164,7 @@ impl SendBuffer { fn prepare_entry( &mut self, - response: CowResponse, + response: Response, addr: CanonicalSocketAddr, socket_is_ipv4: bool, metadata: &mut SendBufferMetadata, @@ -237,12 +238,12 @@ pub enum ResponseType { } impl ResponseType { - fn from_response(response: &CowResponse) -> Self { + fn from_response(response: &Response) -> Self { match response { - CowResponse::Connect(_) => Self::Connect, - CowResponse::AnnounceIpv4(_) | CowResponse::AnnounceIpv6(_) => Self::Announce, - CowResponse::Scrape(_) => Self::Scrape, - CowResponse::Error(_) => Self::Error, + Response::Connect(_) => Self::Connect, + Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => Self::Announce, + Response::Scrape(_) => Self::Scrape, + Response::Error(_) => Self::Error, } } } diff --git a/crates/udp/src/workers/swarm/mod.rs b/crates/udp/src/workers/swarm/mod.rs index ea551f3..9c7e00f 100644 --- a/crates/udp/src/workers/swarm/mod.rs +++ b/crates/udp/src/workers/swarm/mod.rs @@ -47,15 +47,7 @@ pub fn run_swarm_worker( // sends in socket workers (doing both could cause a deadlock) match (request, src.get().ip()) { (ConnectedRequest::Announce(request), IpAddr::V4(ip)) => { - // It doesn't matter which socket worker receives announce responses - let mut send_ref = response_sender - .send_ref_to_any() - .expect("swarm response channel is closed"); - - send_ref.addr = src; - send_ref.kind = ConnectedResponseKind::AnnounceIpv4; - - torrents + let response = torrents .ipv4 .0 .entry(request.info_hash) @@ -67,19 +59,15 @@ pub fn run_swarm_worker( &request, ip.into(), peer_valid_until, - &mut send_ref.announce_ipv4, ); + + // It doesn't matter which socket worker receives announce responses + response_sender + .send_to_any(src, ConnectedResponse::AnnounceIpv4(response)) + .expect("swarm response channel is closed"); } (ConnectedRequest::Announce(request), IpAddr::V6(ip)) => { - // It doesn't matter which socket worker receives announce responses - let mut send_ref = response_sender - .send_ref_to_any() - .expect("swarm response channel is closed"); - - send_ref.addr = src; - send_ref.kind = ConnectedResponseKind::AnnounceIpv6; - - torrents + let response = torrents .ipv6 .0 .entry(request.info_hash) @@ -91,28 +79,26 @@ pub fn run_swarm_worker( &request, ip.into(), peer_valid_until, - &mut send_ref.announce_ipv6, ); + + // It doesn't matter which socket worker receives announce responses + response_sender + .send_to_any(src, ConnectedResponse::AnnounceIpv6(response)) + .expect("swarm response channel is closed"); } (ConnectedRequest::Scrape(request), IpAddr::V4(_)) => { - let mut send_ref = response_sender - .send_ref_to(sender_index) + let response = torrents.ipv4.scrape(request); + + response_sender + .send_to(sender_index, src, ConnectedResponse::Scrape(response)) .expect("swarm response channel is closed"); - - send_ref.addr = src; - send_ref.kind = ConnectedResponseKind::Scrape; - - torrents.ipv4.scrape(request, &mut send_ref.scrape); } (ConnectedRequest::Scrape(request), IpAddr::V6(_)) => { - let mut send_ref = response_sender - .send_ref_to(sender_index) + let response = torrents.ipv6.scrape(request); + + response_sender + .send_to(sender_index, src, ConnectedResponse::Scrape(response)) .expect("swarm response channel is closed"); - - send_ref.addr = src; - send_ref.kind = ConnectedResponseKind::Scrape; - - torrents.ipv6.scrape(request, &mut send_ref.scrape); } }; } diff --git a/crates/udp/src/workers/swarm/storage.rs b/crates/udp/src/workers/swarm/storage.rs index 28984b1..36441b2 100644 --- a/crates/udp/src/workers/swarm/storage.rs +++ b/crates/udp/src/workers/swarm/storage.rs @@ -79,24 +79,29 @@ impl TorrentMaps { pub struct TorrentMap(pub IndexMap>); impl TorrentMap { - pub fn scrape(&mut self, request: PendingScrapeRequest, response: &mut PendingScrapeResponse) { - response.slab_key = request.slab_key; + pub fn scrape(&mut self, request: PendingScrapeRequest) -> PendingScrapeResponse { + let torrent_stats = request + .info_hashes + .into_iter() + .map(|(i, info_hash)| { + let stats = self + .0 + .get(&info_hash) + .map(|torrent_data| torrent_data.scrape_statistics()) + .unwrap_or_else(|| TorrentScrapeStatistics { + seeders: NumberOfPeers::new(0), + leechers: NumberOfPeers::new(0), + completed: NumberOfDownloads::new(0), + }); - let torrent_stats = request.info_hashes.into_iter().map(|(i, info_hash)| { - let stats = self - .0 - .get(&info_hash) - .map(|torrent_data| torrent_data.scrape_statistics()) - .unwrap_or_else(|| TorrentScrapeStatistics { - seeders: NumberOfPeers::new(0), - leechers: NumberOfPeers::new(0), - completed: NumberOfDownloads::new(0), - }); + (i, stats) + }) + .collect(); - (i, stats) - }); - - response.torrent_stats.extend(torrent_stats); + PendingScrapeResponse { + slab_key: request.slab_key, + torrent_stats, + } } /// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers fn clean_and_get_statistics( @@ -187,8 +192,7 @@ impl TorrentData { request: &AnnounceRequest, ip_address: I, valid_until: ValidUntil, - response: &mut AnnounceResponse, - ) { + ) -> AnnounceResponse { let max_num_peers_to_take: usize = if request.peers_wanted.0.get() <= 0 { config.protocol.max_response_peers } else { @@ -209,23 +213,24 @@ impl TorrentData { // Create the response before inserting the peer. This means that we // don't have to filter it out from the response peers, and that the // reported number of seeders/leechers will not include it - let opt_removed_peer = match self { + let (response, opt_removed_peer) = match self { Self::Small(peer_map) => { let opt_removed_peer = peer_map.remove(&peer_map_key); let (seeders, leechers) = peer_map.num_seeders_leechers(); - response.fixed = AnnounceResponseFixedData { - transaction_id: request.transaction_id, - announce_interval: AnnounceInterval::new( - config.protocol.peer_announce_interval, - ), - leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), - seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), + let response = AnnounceResponse { + fixed: AnnounceResponseFixedData { + transaction_id: request.transaction_id, + announce_interval: AnnounceInterval::new( + config.protocol.peer_announce_interval, + ), + leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), + seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), + }, + peers: peer_map.extract_response_peers(max_num_peers_to_take), }; - peer_map.extract_response_peers(max_num_peers_to_take, &mut response.peers); - // Convert peer map to large variant if it is full and // announcing peer is not stopped and will therefore be // inserted @@ -233,24 +238,25 @@ impl TorrentData { *self = Self::Large(peer_map.to_large()); } - opt_removed_peer + (response, opt_removed_peer) } Self::Large(peer_map) => { let opt_removed_peer = peer_map.remove_peer(&peer_map_key); let (seeders, leechers) = peer_map.num_seeders_leechers(); - response.fixed = AnnounceResponseFixedData { - transaction_id: request.transaction_id, - announce_interval: AnnounceInterval::new( - config.protocol.peer_announce_interval, - ), - leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), - seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), + let response = AnnounceResponse { + fixed: AnnounceResponseFixedData { + transaction_id: request.transaction_id, + announce_interval: AnnounceInterval::new( + config.protocol.peer_announce_interval, + ), + leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)), + seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)), + }, + peers: peer_map.extract_response_peers(rng, max_num_peers_to_take), }; - peer_map.extract_response_peers(rng, max_num_peers_to_take, &mut response.peers); - // Try shrinking the map if announcing peer is stopped and // will therefore not be inserted if status == PeerStatus::Stopped { @@ -259,7 +265,7 @@ impl TorrentData { } } - opt_removed_peer + (response, opt_removed_peer) } }; @@ -290,6 +296,8 @@ impl TorrentData { } } }; + + response } pub fn scrape_statistics(&self) -> TorrentScrapeStatistics { @@ -313,7 +321,7 @@ impl Default for TorrentData { } /// Store torrents with up to two peers without an extra heap allocation -/// +/// /// On public open trackers, this is likely to be the majority of torrents. #[derive(Default, Debug)] pub struct SmallPeerMap(ArrayVec<(ResponsePeer, Peer), SMALL_PEER_MAP_CAPACITY>); @@ -344,12 +352,8 @@ impl SmallPeerMap { None } - fn extract_response_peers( - &self, - max_num_peers_to_take: usize, - peers: &mut Vec>, - ) { - peers.extend(self.0.iter().take(max_num_peers_to_take).map(|(k, _)| k)) + fn extract_response_peers(&self, max_num_peers_to_take: usize) -> Vec> { + Vec::from_iter(self.0.iter().take(max_num_peers_to_take).map(|(k, _)| *k)) } fn clean_and_get_num_peers( @@ -427,10 +431,9 @@ impl LargePeerMap { &self, rng: &mut impl Rng, max_num_peers_to_take: usize, - peers: &mut Vec>, - ) { + ) -> Vec> { if self.peers.len() <= max_num_peers_to_take { - peers.extend(self.peers.keys()); + self.peers.keys().copied().collect() } else { let middle_index = self.peers.len() / 2; let num_to_take_per_half = max_num_peers_to_take / 2; @@ -451,12 +454,16 @@ impl LargePeerMap { let end_half_one = offset_half_one + num_to_take_per_half; let end_half_two = offset_half_two + num_to_take_per_half; + let mut peers = Vec::with_capacity(max_num_peers_to_take); + if let Some(slice) = self.peers.get_range(offset_half_one..end_half_one) { peers.extend(slice.keys()); } if let Some(slice) = self.peers.get_range(offset_half_two..end_half_two) { peers.extend(slice.keys()); } + + peers } } From 0c03048ce8a520311bbcfff38257b9d0ed92f245 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 20 Jan 2024 09:42:17 +0100 Subject: [PATCH 03/13] udp: update metrics crate --- Cargo.lock | 21 +-- crates/udp/Cargo.toml | 7 +- .../udp/src/workers/statistics/collector.rs | 145 ++++++------------ crates/udp/src/workers/statistics/mod.rs | 8 +- 4 files changed, 60 insertions(+), 121 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e3a0a4c..f0f4a44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -306,9 +306,9 @@ dependencies = [ "io-uring", "libc", "log", - "metrics 0.21.1", - "metrics-exporter-prometheus 0.12.2", - "metrics-util 0.15.1", + "metrics 0.22.0", + "metrics-exporter-prometheus 0.13.0", + "metrics-util 0.16.0", "mimalloc", "mio", "num-format", @@ -1819,16 +1819,12 @@ version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4de2ed6e491ed114b40b732e4d1659a9d53992ebd87490c44a6ffe23739d973e" dependencies = [ - "aho-corasick", "crossbeam-epoch", "crossbeam-utils", "hashbrown 0.13.1", - "indexmap 1.9.3", "metrics 0.21.1", "num_cpus", - "ordered-float 3.9.2", "quanta 0.11.1", - "radix_trie", "sketches-ddsketch 0.2.1", ] @@ -1845,7 +1841,7 @@ dependencies = [ "indexmap 1.9.3", "metrics 0.22.0", "num_cpus", - "ordered-float 4.2.0", + "ordered-float", "quanta 0.12.2", "radix_trie", "sketches-ddsketch 0.2.1", @@ -2042,15 +2038,6 @@ version = "11.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" -[[package]] -name = "ordered-float" -version = "3.9.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1e1c390732d15f1d48471625cd92d154e66db2c56645e29a9cd26f4699f72dc" -dependencies = [ - "num-traits", -] - [[package]] name = "ordered-float" version = "4.2.0" diff --git a/crates/udp/Cargo.toml b/crates/udp/Cargo.toml index 977ae6a..430ab49 100644 --- a/crates/udp/Cargo.toml +++ b/crates/udp/Cargo.toml @@ -56,15 +56,14 @@ time = { version = "0.3", features = ["formatting"] } tinytemplate = "1" # prometheus feature -metrics = { version = "0.21", optional = true } -metrics-util = { version = "0.15", optional = true } -metrics-exporter-prometheus = { version = "0.12", optional = true, default-features = false, features = ["http-listener"] } +metrics = { version = "0.22", optional = true } +metrics-util = { version = "0.16", optional = true } +metrics-exporter-prometheus = { version = "0.13", optional = true, default-features = false, features = ["http-listener"] } # io-uring feature io-uring = { version = "0.6", optional = true } [dev-dependencies] -hex = "0.4" tempfile = "3" quickcheck = "1" quickcheck_macros = "1" diff --git a/crates/udp/src/workers/statistics/collector.rs b/crates/udp/src/workers/statistics/collector.rs index 33c0a4e..820e0b5 100644 --- a/crates/udp/src/workers/statistics/collector.rs +++ b/crates/udp/src/workers/statistics/collector.rs @@ -9,6 +9,18 @@ use serde::Serialize; use crate::common::Statistics; use crate::config::Config; +#[cfg(feature = "prometheus")] +macro_rules! set_peer_histogram_gauge { + ($ip_version:ident, $data:expr, $type_label:expr) => { + ::metrics::gauge!( + "aquatic_peers_per_torrent", + "type" => $type_label, + "ip_version" => $ip_version.clone(), + ) + .set($data as f64); + }; +} + pub struct StatisticsCollector { shared: Arc, last_update: Instant, @@ -79,59 +91,65 @@ impl StatisticsCollector { if config.statistics.run_prometheus_endpoint { ::metrics::counter!( "aquatic_requests_total", - requests_received.try_into().unwrap(), "ip_version" => self.ip_version.clone(), - ); + ) + .increment(requests_received.try_into().unwrap()); + ::metrics::counter!( "aquatic_responses_total", - responses_sent_connect.try_into().unwrap(), "type" => "connect", "ip_version" => self.ip_version.clone(), - ); + ) + .increment(responses_sent_connect.try_into().unwrap()); + ::metrics::counter!( "aquatic_responses_total", - responses_sent_announce.try_into().unwrap(), "type" => "announce", "ip_version" => self.ip_version.clone(), - ); + ) + .increment(responses_sent_announce.try_into().unwrap()); + ::metrics::counter!( "aquatic_responses_total", - responses_sent_scrape.try_into().unwrap(), "type" => "scrape", "ip_version" => self.ip_version.clone(), - ); + ) + .increment(responses_sent_scrape.try_into().unwrap()); + ::metrics::counter!( "aquatic_responses_total", - responses_sent_error.try_into().unwrap(), "type" => "error", "ip_version" => self.ip_version.clone(), - ); + ) + .increment(responses_sent_error.try_into().unwrap()); + ::metrics::counter!( "aquatic_rx_bytes", - bytes_received.try_into().unwrap(), "ip_version" => self.ip_version.clone(), - ); + ) + .increment(bytes_received.try_into().unwrap()); + ::metrics::counter!( "aquatic_tx_bytes", - bytes_sent.try_into().unwrap(), "ip_version" => self.ip_version.clone(), - ); + ) + .increment(bytes_sent.try_into().unwrap()); for (worker_index, n) in num_torrents_by_worker.iter().copied().enumerate() { ::metrics::gauge!( "aquatic_torrents", - n as f64, "ip_version" => self.ip_version.clone(), "worker_index" => worker_index.to_string(), - ); + ) + .set(n as f64); } for (worker_index, n) in num_peers_by_worker.iter().copied().enumerate() { ::metrics::gauge!( "aquatic_peers", - n as f64, "ip_version" => self.ip_version.clone(), "worker_index" => worker_index.to_string(), - ); + ) + .set(n as f64); } if config.statistics.torrent_peer_histograms { @@ -236,83 +254,18 @@ impl PeerHistogramStatistics { #[cfg(feature = "prometheus")] fn update_metrics(&self, ip_version: String) { - ::metrics::gauge!( - "aquatic_peers_per_torrent", - self.min as f64, - "type" => "max", - "ip_version" => ip_version.clone(), - ); - ::metrics::gauge!( - "aquatic_peers_per_torrent", - self.p10 as f64, - "type" => "p10", - "ip_version" => ip_version.clone(), - ); - ::metrics::gauge!( - "aquatic_peers_per_torrent", - self.p20 as f64, - "type" => "p20", - "ip_version" => ip_version.clone(), - ); - ::metrics::gauge!( - "aquatic_peers_per_torrent", - self.p30 as f64, - "type" => "p30", - "ip_version" => ip_version.clone(), - ); - ::metrics::gauge!( - "aquatic_peers_per_torrent", - self.p40 as f64, - "type" => "p40", - "ip_version" => ip_version.clone(), - ); - ::metrics::gauge!( - "aquatic_peers_per_torrent", - self.p50 as f64, - "type" => "p50", - "ip_version" => ip_version.clone(), - ); - ::metrics::gauge!( - "aquatic_peers_per_torrent", - self.p60 as f64, - "type" => "p60", - "ip_version" => ip_version.clone(), - ); - ::metrics::gauge!( - "aquatic_peers_per_torrent", - self.p70 as f64, - "type" => "p70", - "ip_version" => ip_version.clone(), - ); - ::metrics::gauge!( - "aquatic_peers_per_torrent", - self.p80 as f64, - "type" => "p80", - "ip_version" => ip_version.clone(), - ); - ::metrics::gauge!( - "aquatic_peers_per_torrent", - self.p90 as f64, - "type" => "p90", - "ip_version" => ip_version.clone(), - ); - ::metrics::gauge!( - "aquatic_peers_per_torrent", - self.p99 as f64, - "type" => "p99", - "ip_version" => ip_version.clone(), - ); - ::metrics::gauge!( - "aquatic_peers_per_torrent", - self.p999 as f64, - "type" => "p99.9", - "ip_version" => ip_version.clone(), - ); - ::metrics::gauge!( - "aquatic_peers_per_torrent", - self.max as f64, - "type" => "max", - "ip_version" => ip_version.clone(), - ); + set_peer_histogram_gauge!(ip_version, self.min, "min"); + set_peer_histogram_gauge!(ip_version, self.p10, "p10"); + set_peer_histogram_gauge!(ip_version, self.p20, "p20"); + set_peer_histogram_gauge!(ip_version, self.p30, "p30"); + set_peer_histogram_gauge!(ip_version, self.p40, "p40"); + set_peer_histogram_gauge!(ip_version, self.p50, "p50"); + set_peer_histogram_gauge!(ip_version, self.p60, "p60"); + set_peer_histogram_gauge!(ip_version, self.p70, "p70"); + set_peer_histogram_gauge!(ip_version, self.p80, "p80"); + set_peer_histogram_gauge!(ip_version, self.p90, "p90"); + set_peer_histogram_gauge!(ip_version, self.p99, "p99"); + set_peer_histogram_gauge!(ip_version, self.p999, "p999"); + set_peer_histogram_gauge!(ip_version, self.max, "max"); } } diff --git a/crates/udp/src/workers/statistics/mod.rs b/crates/udp/src/workers/statistics/mod.rs index c950850..e9a8565 100644 --- a/crates/udp/src/workers/statistics/mod.rs +++ b/crates/udp/src/workers/statistics/mod.rs @@ -152,9 +152,9 @@ pub fn run_statistics_worker( for (prefix, count) in prefixes { ::metrics::gauge!( "aquatic_peer_id_prefixes", - count as f64, "prefix_hex" => prefix.to_string(), - ); + ) + .set(count as f64); } } @@ -169,9 +169,9 @@ pub fn run_statistics_worker( if config.statistics.run_prometheus_endpoint { ::metrics::gauge!( "aquatic_peer_clients", - count as f64, "client" => client.to_string(), - ); + ) + .set(count as f64); } } From 1de07ec603745fd4b48cd28f7cf0a11eed6ca1f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 20 Jan 2024 09:43:03 +0100 Subject: [PATCH 04/13] Run cargo update --- Cargo.lock | 149 +++++++++++++++++++++++++---------------------------- 1 file changed, 70 insertions(+), 79 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f0f4a44..53a4b60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -64,9 +64,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" [[package]] name = "anstream" -version = "0.6.5" +version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d664a92ecae85fd0a7392615844904654d1d5f5514837f471ddef4a057aba1b6" +checksum = "6e2e1ebcb11de5c03c67de28a7df593d32191b44939c482e97702baaaa6ab6a5" dependencies = [ "anstyle", "anstyle-parse", @@ -134,7 +134,7 @@ dependencies = [ "anyhow", "aquatic_udp", "aquatic_udp_load_test", - "clap 4.4.13", + "clap 4.4.18", "humanize-bytes", "indexmap 2.1.0", "indoc", @@ -498,9 +498,9 @@ dependencies = [ [[package]] name = "base64" -version = "0.21.5" +version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" [[package]] name = "bendy" @@ -528,9 +528,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.1" +version = "2.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" +checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" [[package]] name = "bitmaps" @@ -661,9 +661,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.13" +version = "4.4.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52bdc885e4cacc7f7c9eedc1ef6da641603180c783c41a15c264944deeaab642" +checksum = "1e578d6ec4194633722ccf9544794b71b1385c3c027efe0c55db226fc880865c" dependencies = [ "clap_builder", "clap_derive", @@ -671,9 +671,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.12" +version = "4.4.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb7fb5e4e979aec3be7791562fcba452f94ad85e954da024396433e0e25a79e9" +checksum = "4df4df40ec50c46000231c914968278b1eb05098cf8f1b3a518a95030e71d1c7" dependencies = [ "anstream", "anstyle", @@ -806,7 +806,7 @@ dependencies = [ "anes", "cast", "ciborium", - "clap 4.4.13", + "clap 4.4.18", "criterion-plot", "is-terminal", "itertools 0.10.5", @@ -835,11 +835,10 @@ dependencies = [ [[package]] name = "crossbeam" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6eb9105919ca8e40d437fc9cbb8f1975d916f1bd28afe795a48aae32a2cc8920" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" dependencies = [ - "cfg-if", "crossbeam-channel", "crossbeam-deque", "crossbeam-epoch", @@ -849,54 +848,46 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.10" +version = "0.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82a9b73a36529d9c47029b9fb3a6f0ea3cc916a261195352ba19e770fc1748b2" +checksum = "176dc175b78f56c0f321911d9c8eb2b77a78a4860b9c19db83835fea1a46649b" dependencies = [ - "cfg-if", "crossbeam-utils", ] [[package]] name = "crossbeam-deque" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fca89a0e215bab21874660c67903c5f143333cab1da83d041c7ded6053774751" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" dependencies = [ - "cfg-if", "crossbeam-epoch", "crossbeam-utils", ] [[package]] name = "crossbeam-epoch" -version = "0.9.17" +version = "0.9.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e3681d554572a651dda4186cd47240627c3d0114d45a95f6ad27f2f22e7548d" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" dependencies = [ - "autocfg", - "cfg-if", "crossbeam-utils", ] [[package]] name = "crossbeam-queue" -version = "0.3.10" +version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adc6598521bb5a83d491e8c1fe51db7296019d2ca3cb93cc6c2a20369a4d78a2" +checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35" dependencies = [ - "cfg-if", "crossbeam-utils", ] [[package]] name = "crossbeam-utils" -version = "0.8.18" +version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3a430a770ebd84726f584a90ee7f020d28db52c6d02138900f22341f866d39c" -dependencies = [ - "cfg-if", -] +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" [[package]] name = "crypto-common" @@ -1158,9 +1149,9 @@ dependencies = [ [[package]] name = "futures-rustls" -version = "0.25.0" +version = "0.25.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3afda89bce8f65072d24f8b99a2127e229462d8008182ca93f1d5d2e5df8f22f" +checksum = "c8d8a2499f0fecc0492eb3e47eab4e92da7875e1028ad2528f214ac3346ca04e" dependencies = [ "futures-io", "rustls", @@ -1209,9 +1200,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.11" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" +checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5" dependencies = [ "cfg-if", "js-sys", @@ -1356,9 +1347,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" +checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f" [[package]] name = "hex" @@ -1534,7 +1525,7 @@ version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455" dependencies = [ - "hermit-abi 0.3.3", + "hermit-abi 0.3.4", "rustix", "windows-sys 0.52.0", ] @@ -1565,9 +1556,9 @@ checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" [[package]] name = "js-sys" -version = "0.3.66" +version = "0.3.67" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cee9c64da59eae3b50095c18d3e74f8b73c0b86d2792824ff01bbce68ba229ca" +checksum = "9a1d36f1235bc969acba30b7f5990b864423a6068a10f7c90ae8f0112e3a59d1" dependencies = [ "wasm-bindgen", ] @@ -1654,9 +1645,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.151" +version = "0.2.152" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" +checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" [[package]] name = "libm" @@ -1676,9 +1667,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.4.12" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" +checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" [[package]] name = "lock_api" @@ -2004,7 +1995,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi 0.3.3", + "hermit-abi 0.3.4", "libc", ] @@ -2105,9 +2096,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkg-config" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" +checksum = "2900ede94e305130c13ddd391e0ab7cbaeb783945ae07a279c268cb05109c6cb" [[package]] name = "plotters" @@ -2335,14 +2326,14 @@ version = "11.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d86a7c4638d42c44551f4791a20e687dbb4c3de1f33c43dd71e355cd429def1" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.4.2", ] [[package]] name = "rayon" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c27db03db7734835b3f53954b534c91069375ce6ccaa2e065441e07d9b6cdb1" +checksum = "fa7237101a77a10773db45d62004a272517633fbcc3df19d96455ede1122e051" dependencies = [ "either", "rayon-core", @@ -2350,9 +2341,9 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.12.0" +version = "1.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ce3fb6ad83f861aac485e76e1985cd109d9a3713802152be56c3b1f0e0658ed" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" dependencies = [ "crossbeam-deque", "crossbeam-utils", @@ -2447,11 +2438,11 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] name = "rustix" -version = "0.38.28" +version = "0.38.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72e572a5e8ca657d7366229cdde4bd14c4eb5499a9573d4d366fe1b599daa316" +checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" dependencies = [ - "bitflags 2.4.1", + "bitflags 2.4.2", "errno 0.3.8", "libc", "linux-raw-sys", @@ -2460,9 +2451,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.22.1" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe6b63262c9fcac8659abfaa96cac103d28166d3ff3eaf8f412e19f3ae9e5a48" +checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41" dependencies = [ "log", "ring", @@ -2623,9 +2614,9 @@ dependencies = [ [[package]] name = "simd-json" -version = "0.13.6" +version = "0.13.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c58001aca67fc467da571f35e7e1dc9c094e91b099cc54bd3cead2962db2432" +checksum = "2faf8f101b9bc484337a6a6b0409cf76c139f2fb70a9e3aee6b6774be7bfbf76" dependencies = [ "getrandom", "halfbrown", @@ -2687,9 +2678,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.2" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" +checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" [[package]] name = "smartstring" @@ -3015,9 +3006,9 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" [[package]] name = "unicode-bidi" -version = "0.3.14" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f2528f27a9eb2b21e69c95319b30bd0efd85d09c379741b0f78ea1d86be2416" +checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" [[package]] name = "unicode-ident" @@ -3120,9 +3111,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.89" +version = "0.2.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ed0d4f68a3015cc185aff4db9506a015f4b96f95303897bfa23f846db54064e" +checksum = "b1223296a201415c7fad14792dbefaace9bd52b62d33453ade1c5b5f07555406" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -3130,9 +3121,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.89" +version = "0.2.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b56f625e64f3a1084ded111c4d5f477df9f8c92df113852fa5a374dbda78826" +checksum = "fcdc935b63408d58a32f8cc9738a0bffd8f05cc7c002086c6ef20b7312ad9dcd" dependencies = [ "bumpalo", "log", @@ -3145,9 +3136,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.89" +version = "0.2.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0162dbf37223cd2afce98f3d0785506dcb8d266223983e4b5b525859e6e182b2" +checksum = "3e4c238561b2d428924c49815533a8b9121c664599558a5d9ec51f8a1740a999" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3155,9 +3146,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.89" +version = "0.2.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" +checksum = "bae1abb6806dc1ad9e560ed242107c0f6c84335f1749dd4e8ddb012ebd5e25a7" dependencies = [ "proc-macro2", "quote", @@ -3168,15 +3159,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.89" +version = "0.2.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f" +checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b" [[package]] name = "web-sys" -version = "0.3.66" +version = "0.3.67" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50c24a44ec86bb68fbecd1b3efed7e85ea5621b39b35ef2766b66cd984f8010f" +checksum = "58cd2333b6e0be7a39605f0e255892fd7418a682d8da8fe042fe25128794d2ed" dependencies = [ "js-sys", "wasm-bindgen", @@ -3359,9 +3350,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "winnow" -version = "0.5.33" +version = "0.5.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7520bbdec7211caa7c4e682eb1fbe07abe20cee6756b6e00f537c82c11816aa" +checksum = "b7cf47b659b318dccbd69cc4797a39ae128f533dce7902a1096044d1967b9c16" dependencies = [ "memchr", ] From 75c14023948ef16fee13083db8ac0c36011f57af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 20 Jan 2024 09:43:39 +0100 Subject: [PATCH 05/13] Update CHANGELOG --- CHANGELOG.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d06801..8a8def4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,32 +15,31 @@ #### Added * Add support for reporting peer client information +* Speed up parsing and serialization of requests and responses with + [zerocopy](https://crates.io/crates/zerocopy) +* Store torrents with up to two peers without an extra heap allocation for the + peers. #### Changed * Index peers by packet source IP and provided port, instead of by peer_id. This prevents users from impersonating others and is likely also slightly faster for IPv4 peers. -* Store torrents with up to two peers without an extra heap allocation for the - peers. * Remove support for unbounded worker channels * Add backpressure in socket workers. They will postpone reading from the socket if sending a request to a swarm worker failed -* Remove config key `network.poll_event_capacity` -* Harden ConnectionValidator to make IP spoofing even more costly * Distribute announce responses from swarm workers over socket workers to decrease performance loss due to underutilized threads +* Harden ConnectionValidator to make IP spoofing even more costly +* Remove config key `network.poll_event_capacity` (always use 1) ### aquatic_http #### Added * Reload TLS certificate (and key) on SIGUSR1 - -#### Changed - -* Allow running without TLS -* Allow running behind reverse proxy +* Support running without TLS +* Support running behind reverse proxy #### Fixed @@ -65,6 +64,7 @@ #### Fixed +* Fix memory leak * Fix bug where clean up after closing connections wasn't always done * Fix double counting of error responses * Actually close connections that are too slow to send responses to From 22e151d0f051161691bbe31b562ea245df170291 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 20 Jan 2024 09:43:55 +0100 Subject: [PATCH 06/13] http: start awaiting conn close message before tls setup --- crates/http/src/workers/socket/connection.rs | 22 +++------------ crates/http/src/workers/socket/mod.rs | 28 +++++++++++++------- 2 files changed, 21 insertions(+), 29 deletions(-) diff --git a/crates/http/src/workers/socket/connection.rs b/crates/http/src/workers/socket/connection.rs index 7db5146..b65487a 100644 --- a/crates/http/src/workers/socket/connection.rs +++ b/crates/http/src/workers/socket/connection.rs @@ -16,11 +16,9 @@ use aquatic_http_protocol::response::{ use arc_swap::ArcSwap; use either::Either; use futures::stream::FuturesUnordered; -use futures_lite::future::race; use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; use futures_rustls::TlsAcceptor; use glommio::channels::channel_mesh::Senders; -use glommio::channels::local_channel::LocalReceiver; use glommio::channels::shared_channel::{self, SharedReceiver}; use glommio::net::TcpStream; use once_cell::sync::Lazy; @@ -76,7 +74,6 @@ pub(super) async fn run_connection( server_start_instant: ServerStartInstant, opt_tls_config: Option>>, valid_until: Rc>, - close_conn_receiver: LocalReceiver<()>, stream: TcpStream, ) -> Result<(), ConnectionError> { let access_list_cache = create_access_list_cache(&access_list); @@ -119,7 +116,7 @@ pub(super) async fn run_connection( stream, }; - conn.run(close_conn_receiver).await?; + conn.run().await } else { let mut conn = Connection { config, @@ -135,10 +132,8 @@ pub(super) async fn run_connection( stream, }; - conn.run(close_conn_receiver).await?; + conn.run().await } - - Ok(()) } struct Connection { @@ -159,18 +154,7 @@ impl Connection where S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static, { - async fn run(&mut self, close_conn_receiver: LocalReceiver<()>) -> Result<(), ConnectionError> { - let f1 = async { self.run_request_response_loop().await }; - let f2 = async { - close_conn_receiver.recv().await; - - Err(ConnectionError::Inactive) - }; - - race(f1, f2).await - } - - async fn run_request_response_loop(&mut self) -> Result<(), ConnectionError> { + async fn run(&mut self) -> Result<(), ConnectionError> { loop { let response = match self.read_request().await? { Either::Left(response) => Response::Failure(response), diff --git a/crates/http/src/workers/socket/mod.rs b/crates/http/src/workers/socket/mod.rs index 0baa68b..9fab395 100644 --- a/crates/http/src/workers/socket/mod.rs +++ b/crates/http/src/workers/socket/mod.rs @@ -12,6 +12,7 @@ use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; use aquatic_common::{CanonicalSocketAddr, PanicSentinel, ServerStartInstant}; use arc_swap::ArcSwap; +use futures_lite::future::race; use futures_lite::StreamExt; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role}; use glommio::channels::local_channel::{new_bounded, LocalSender}; @@ -97,16 +98,23 @@ pub async fn run_socket_worker( "worker_index" => worker_index.to_string(), ); - let result = run_connection( - config, - access_list, - request_senders, - server_start_instant, - opt_tls_config, - valid_until.clone(), - close_conn_receiver, - stream, - ).await; + let f1 = async { run_connection( + config, + access_list, + request_senders, + server_start_instant, + opt_tls_config, + valid_until.clone(), + stream, + ).await + }; + let f2 = async { + close_conn_receiver.recv().await; + + Err(ConnectionError::Inactive) + }; + + let result = race(f1, f2).await; #[cfg(feature = "metrics")] ::metrics::decrement_gauge!( From 5401eaf85f85b7b720e3a76734c99ba8f399e986 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 20 Jan 2024 09:56:14 +0100 Subject: [PATCH 07/13] Run cargo clippy --fix and cargo fmt --- crates/bencher/src/common.rs | 12 ++------ crates/bencher/src/html.rs | 2 +- crates/bencher/src/run.rs | 8 ++---- crates/bencher/src/set.rs | 6 ++-- crates/combined_binary/src/main.rs | 6 ++-- crates/http/src/workers/socket/connection.rs | 8 +++--- crates/http/src/workers/socket/request.rs | 2 +- crates/http/src/workers/swarm/storage.rs | 12 +++----- crates/http_load_test/src/utils.rs | 6 ++-- crates/http_protocol/src/request.rs | 8 +++--- crates/http_protocol/src/utils.rs | 2 +- crates/peer_id/src/lib.rs | 2 +- crates/udp/src/common.rs | 2 +- crates/udp/src/workers/socket/mio.rs | 2 +- crates/udp/src/workers/socket/storage.rs | 2 +- crates/udp/src/workers/socket/validator.rs | 6 ++-- crates/udp/src/workers/swarm/storage.rs | 29 ++++++++++---------- crates/udp/tests/common/mod.rs | 10 +++---- crates/udp_load_test/src/lib.rs | 8 ++---- crates/udp_protocol/src/common.rs | 24 ++++++++-------- crates/udp_protocol/src/response.rs | 2 +- crates/ws/src/common.rs | 10 +++---- crates/ws/src/workers/socket/connection.rs | 4 +-- crates/ws/src/workers/swarm/mod.rs | 4 +-- crates/ws/src/workers/swarm/storage.rs | 16 ++++++----- crates/ws_load_test/src/network.rs | 2 +- crates/ws_protocol/src/common.rs | 2 +- crates/ws_protocol/src/lib.rs | 4 +-- 28 files changed, 94 insertions(+), 107 deletions(-) diff --git a/crates/bencher/src/common.rs b/crates/bencher/src/common.rs index 6d17664..4e5b958 100644 --- a/crates/bencher/src/common.rs +++ b/crates/bencher/src/common.rs @@ -27,17 +27,11 @@ impl TaskSetCpuList { let indicator = self.0.iter().map(|indicator| match indicator { TaskSetCpuIndicator::Single(i) => i.to_string(), TaskSetCpuIndicator::Range(range) => { - format!( - "{}-{}", - range.start, - range.clone().into_iter().last().unwrap() - ) + format!("{}-{}", range.start, range.clone().last().unwrap()) } }); - Itertools::intersperse_with(indicator, || ",".to_string()) - .into_iter() - .collect() + Itertools::intersperse_with(indicator, || ",".to_string()).collect() } pub fn new( @@ -163,7 +157,7 @@ pub fn simple_load_test_runs( workers: &[(usize, Priority)], ) -> Vec<(usize, Priority, TaskSetCpuList)> { workers - .into_iter() + .iter() .copied() .map(|(workers, priority)| { ( diff --git a/crates/bencher/src/html.rs b/crates/bencher/src/html.rs index 15a0b93..03278f3 100644 --- a/crates/bencher/src/html.rs +++ b/crates/bencher/src/html.rs @@ -191,7 +191,7 @@ pub fn html_all_runs(all_results: &[TrackerCoreCountResults]) -> String { load_test_key_names = load_test_key_names.iter() .map(|name| format!("Load test {}", name)) .join("\n"), - body = results.into_iter().map(|r| { + body = results.iter_mut().map(|r| { formatdoc! { " diff --git a/crates/bencher/src/run.rs b/crates/bencher/src/run.rs index 2757404..a28e62f 100644 --- a/crates/bencher/src/run.rs +++ b/crates/bencher/src/run.rs @@ -59,9 +59,7 @@ impl RunConfig { .run(command, &self.tracker_vcpus, &mut tracker_config_file) { Ok(handle) => ChildWrapper(handle), - Err(err) => { - return Err(RunErrorResults::new(self).set_error(err.into(), "run tracker")) - } + Err(err) => return Err(RunErrorResults::new(self).set_error(err, "run tracker")), }; ::std::thread::sleep(Duration::from_secs(1)); @@ -74,7 +72,7 @@ impl RunConfig { Ok(handle) => ChildWrapper(handle), Err(err) => { return Err(RunErrorResults::new(self) - .set_error(err.into(), "run load test") + .set_error(err, "run load test") .set_tracker_outputs(tracker)) } }; @@ -328,7 +326,7 @@ impl FromStr for ProcessStats { type Err = (); fn from_str(s: &str) -> Result { - let mut parts = s.trim().split_whitespace(); + let mut parts = s.split_whitespace(); let avg_cpu_utilization = parts.next().ok_or(())?.parse().map_err(|_| ())?; let peak_rss_kb: f32 = parts.next().ok_or(())?.parse().map_err(|_| ())?; diff --git a/crates/bencher/src/set.rs b/crates/bencher/src/set.rs index a34a3e7..7ddeef2 100644 --- a/crates/bencher/src/set.rs +++ b/crates/bencher/src/set.rs @@ -73,13 +73,13 @@ pub fn run_sets( (minutes / 60, minutes % 60) }; - println!(""); + println!(); println!("Total number of load test runs: {}", total_num_runs); println!( "Estimated duration: {} hours, {} minutes", estimated_hours, estimated_minutes ); - println!(""); + println!(); let results = set_configs .into_iter() @@ -115,7 +115,7 @@ pub fn run_sets( &load_test_gen, load_test_parameters, implementation, - &tracker_run, + tracker_run, tracker_vcpus.clone(), load_test_vcpus, ) diff --git a/crates/combined_binary/src/main.rs b/crates/combined_binary/src/main.rs index 97fba0f..56eeddd 100644 --- a/crates/combined_binary/src/main.rs +++ b/crates/combined_binary/src/main.rs @@ -12,12 +12,12 @@ fn main() { ::std::process::exit(match run() { Ok(()) => 0, Err(None) => { - print_help(|| gen_info(), None); + print_help(gen_info, None); 0 } Err(opt_err @ Some(_)) => { - print_help(|| gen_info(), opt_err); + print_help(gen_info, opt_err); 1 } @@ -62,7 +62,7 @@ fn run() -> Result<(), Option> { arg => { let opt_err = if arg == "-h" || arg == "--help" { None - } else if arg.chars().next() == Some('-') { + } else if arg.starts_with('-') { Some("First argument must be protocol".to_string()) } else { Some("Invalid protocol".to_string()) diff --git a/crates/http/src/workers/socket/connection.rs b/crates/http/src/workers/socket/connection.rs index b65487a..d44508a 100644 --- a/crates/http/src/workers/socket/connection.rs +++ b/crates/http/src/workers/socket/connection.rs @@ -383,7 +383,7 @@ where let body_len = response .write(&mut &mut self.response_buffer[position..]) - .map_err(|err| ConnectionError::ResponseBufferWrite(err))?; + .map_err(ConnectionError::ResponseBufferWrite)?; position += body_len; @@ -391,7 +391,7 @@ where return Err(ConnectionError::ResponseBufferFull); } - (&mut self.response_buffer[position..position + 2]).copy_from_slice(b"\r\n"); + self.response_buffer[position..position + 2].copy_from_slice(b"\r\n"); position += 2; @@ -403,7 +403,7 @@ where let start = RESPONSE_HEADER_A.len(); let end = start + RESPONSE_HEADER_B.len(); - (&mut self.response_buffer[start..end]).copy_from_slice(RESPONSE_HEADER_B); + self.response_buffer[start..end].copy_from_slice(RESPONSE_HEADER_B); } // Set content-len header value @@ -415,7 +415,7 @@ where let start = RESPONSE_HEADER_A.len(); let end = start + content_len_bytes.len(); - (&mut self.response_buffer[start..end]).copy_from_slice(content_len_bytes); + self.response_buffer[start..end].copy_from_slice(content_len_bytes); } // Write buffer to stream diff --git a/crates/http/src/workers/socket/request.rs b/crates/http/src/workers/socket/request.rs index 4412382..ec5f19f 100644 --- a/crates/http/src/workers/socket/request.rs +++ b/crates/http/src/workers/socket/request.rs @@ -52,7 +52,7 @@ fn parse_forwarded_header( header_format: ReverseProxyPeerIpHeaderFormat, headers: &[httparse::Header<'_>], ) -> anyhow::Result { - for header in headers.into_iter().rev() { + for header in headers.iter().rev() { if header.name == header_name { match header_format { ReverseProxyPeerIpHeaderFormat::LastAddress => { diff --git a/crates/http/src/workers/swarm/storage.rs b/crates/http/src/workers/swarm/storage.rs index 1aa9bfe..d9ff867 100644 --- a/crates/http/src/workers/swarm/storage.rs +++ b/crates/http/src/workers/swarm/storage.rs @@ -66,16 +66,14 @@ impl TorrentMaps { valid_until, ); - let response = AnnounceResponse { + AnnounceResponse { complete: seeders, incomplete: leechers, announce_interval: config.protocol.peer_announce_interval, peers: ResponsePeerListV4(response_peers), peers6: ResponsePeerListV6(vec![]), warning_message: None, - }; - - response + } } IpAddr::V6(peer_ip_address) => { let (seeders, leechers, response_peers) = self @@ -90,16 +88,14 @@ impl TorrentMaps { valid_until, ); - let response = AnnounceResponse { + AnnounceResponse { complete: seeders, incomplete: leechers, announce_interval: config.protocol.peer_announce_interval, peers: ResponsePeerListV4(vec![]), peers6: ResponsePeerListV6(response_peers), warning_message: None, - }; - - response + } } } } diff --git a/crates/http_load_test/src/utils.rs b/crates/http_load_test/src/utils.rs index c3dab09..d22c2c1 100644 --- a/crates/http_load_test/src/utils.rs +++ b/crates/http_load_test/src/utils.rs @@ -19,7 +19,7 @@ pub fn create_random_request( let items = [RequestType::Announce, RequestType::Scrape]; - let dist = WeightedIndex::new(&weights).expect("random request weighted index"); + let dist = WeightedIndex::new(weights).expect("random request weighted index"); match items[dist.sample(rng)] { RequestType::Announce => create_announce_request(config, state, rng), @@ -37,7 +37,7 @@ fn create_announce_request(config: &Config, state: &LoadTestState, rng: &mut imp } }; - let info_hash_index = select_info_hash_index(config, &state, rng); + let info_hash_index = select_info_hash_index(config, state, rng); Request::Announce(AnnounceRequest { info_hash: state.info_hashes[info_hash_index], @@ -57,7 +57,7 @@ fn create_scrape_request(config: &Config, state: &LoadTestState, rng: &mut impl let mut scrape_hashes = Vec::with_capacity(5); for _ in 0..5 { - let info_hash_index = select_info_hash_index(config, &state, rng); + let info_hash_index = select_info_hash_index(config, state, rng); scrape_hashes.push(state.info_hashes[info_hash_index]); } diff --git a/crates/http_protocol/src/request.rs b/crates/http_protocol/src/request.rs index 865a448..2fd175c 100644 --- a/crates/http_protocol/src/request.rs +++ b/crates/http_protocol/src/request.rs @@ -86,7 +86,7 @@ impl AnnounceRequest { let mut position = 0usize; for equal_sign_index in ::memchr::memchr_iter(b'=', query_string_bytes) { - let segment_end = ampersand_iter.next().unwrap_or_else(|| query_string.len()); + let segment_end = ampersand_iter.next().unwrap_or(query_string.len()); let key = query_string .get(position..equal_sign_index) @@ -207,7 +207,7 @@ impl ScrapeRequest { let mut position = 0usize; for equal_sign_index in ::memchr::memchr_iter(b'=', query_string_bytes) { - let segment_end = ampersand_iter.next().unwrap_or_else(|| query_string.len()); + let segment_end = ampersand_iter.next().unwrap_or(query_string.len()); let key = query_string .get(position..equal_sign_index) @@ -348,7 +348,7 @@ mod tests { let mut bytes = Vec::new(); bytes.extend_from_slice(b"GET "); - bytes.extend_from_slice(&ANNOUNCE_REQUEST_PATH.as_bytes()); + bytes.extend_from_slice(ANNOUNCE_REQUEST_PATH.as_bytes()); bytes.extend_from_slice(b" HTTP/1.1\r\n\r\n"); let parsed_request = Request::from_bytes(&bytes[..]).unwrap().unwrap(); @@ -362,7 +362,7 @@ mod tests { let mut bytes = Vec::new(); bytes.extend_from_slice(b"GET "); - bytes.extend_from_slice(&SCRAPE_REQUEST_PATH.as_bytes()); + bytes.extend_from_slice(SCRAPE_REQUEST_PATH.as_bytes()); bytes.extend_from_slice(b" HTTP/1.1\r\n\r\n"); let parsed_request = Request::from_bytes(&bytes[..]).unwrap().unwrap(); diff --git a/crates/http_protocol/src/utils.rs b/crates/http_protocol/src/utils.rs index 618eb02..51b2ac4 100644 --- a/crates/http_protocol/src/utils.rs +++ b/crates/http_protocol/src/utils.rs @@ -43,7 +43,7 @@ pub fn urldecode_20_bytes(value: &str) -> anyhow::Result<[u8; 20]> { let hex = [first as u8, second as u8]; - hex::decode_to_slice(&hex, &mut out_arr[i..i + 1]) + hex::decode_to_slice(hex, &mut out_arr[i..i + 1]) .map_err(|err| anyhow::anyhow!("hex decode error: {:?}", err))?; } else { out_arr[i] = c as u8; diff --git a/crates/peer_id/src/lib.rs b/crates/peer_id/src/lib.rs index dbc7fcf..304e1e5 100644 --- a/crates/peer_id/src/lib.rs +++ b/crates/peer_id/src/lib.rs @@ -234,7 +234,7 @@ mod tests { let len = bytes.len(); - (&mut peer_id.0[..len]).copy_from_slice(bytes); + peer_id.0[..len].copy_from_slice(bytes); peer_id } diff --git a/crates/udp/src/common.rs b/crates/udp/src/common.rs index dbfc868..f803ea5 100644 --- a/crates/udp/src/common.rs +++ b/crates/udp/src/common.rs @@ -211,7 +211,7 @@ impl Statistics { } fn create_atomic_usize_vec(len: usize) -> Vec { - ::std::iter::repeat_with(|| AtomicUsize::default()) + ::std::iter::repeat_with(AtomicUsize::default) .take(len) .collect() } diff --git a/crates/udp/src/workers/socket/mio.rs b/crates/udp/src/workers/socket/mio.rs index eddef10..beede61 100644 --- a/crates/udp/src/workers/socket/mio.rs +++ b/crates/udp/src/workers/socket/mio.rs @@ -205,7 +205,7 @@ impl SocketWorker { if let Err(HandleRequestError::RequestChannelFull(failed_requests)) = self.handle_request(pending_scrape_valid_until, request, src) { - self.pending_requests.extend(failed_requests.into_iter()); + self.pending_requests.extend(failed_requests); self.polling_mode = PollMode::SkipReceiving; break; diff --git a/crates/udp/src/workers/socket/storage.rs b/crates/udp/src/workers/socket/storage.rs index 5ded3ac..f6cf088 100644 --- a/crates/udp/src/workers/socket/storage.rs +++ b/crates/udp/src/workers/socket/storage.rs @@ -44,7 +44,7 @@ impl PendingScrapeResponseSlab { for (i, info_hash) in request.info_hashes.into_iter().enumerate() { let split_request = split_requests - .entry(SwarmWorkerIndex::from_info_hash(&config, info_hash)) + .entry(SwarmWorkerIndex::from_info_hash(config, info_hash)) .or_insert_with(|| PendingScrapeRequest { slab_key, info_hashes: BTreeMap::new(), diff --git a/crates/udp/src/workers/socket/validator.rs b/crates/udp/src/workers/socket/validator.rs index 4b5fe56..c68d1ef 100644 --- a/crates/udp/src/workers/socket/validator.rs +++ b/crates/udp/src/workers/socket/validator.rs @@ -59,8 +59,8 @@ impl ConnectionValidator { let mut connection_id_bytes = [0u8; 8]; - (&mut connection_id_bytes[..4]).copy_from_slice(&elapsed); - (&mut connection_id_bytes[4..]).copy_from_slice(&hash); + connection_id_bytes[..4].copy_from_slice(&elapsed); + connection_id_bytes[4..].copy_from_slice(&hash); ConnectionId::new(i64::from_ne_bytes(connection_id_bytes)) } @@ -78,7 +78,7 @@ impl ConnectionValidator { return false; } - let tracker_elapsed = u64::from(self.start_time.elapsed().as_secs()); + let tracker_elapsed = self.start_time.elapsed().as_secs(); let client_elapsed = u64::from(u32::from_ne_bytes(elapsed)); let client_expiration_time = client_elapsed + self.max_connection_age; diff --git a/crates/udp/src/workers/swarm/storage.rs b/crates/udp/src/workers/swarm/storage.rs index 36441b2..0c1dcc2 100644 --- a/crates/udp/src/workers/swarm/storage.rs +++ b/crates/udp/src/workers/swarm/storage.rs @@ -365,13 +365,14 @@ impl SmallPeerMap { self.0.retain(|(_, peer)| { let keep = peer.valid_until.valid(now); - if !keep && config.statistics.peer_clients { - if let Err(_) = - statistics_sender.try_send(StatisticsMessage::PeerRemoved(peer.peer_id)) - { - // Should never happen in practice - ::log::error!("Couldn't send StatisticsMessage::PeerRemoved"); - } + if !keep + && config.statistics.peer_clients + && statistics_sender + .try_send(StatisticsMessage::PeerRemoved(peer.peer_id)) + .is_err() + { + // Should never happen in practice + ::log::error!("Couldn't send StatisticsMessage::PeerRemoved"); } keep @@ -480,13 +481,13 @@ impl LargePeerMap { if peer.is_seeder { self.num_seeders -= 1; } - if config.statistics.peer_clients { - if let Err(_) = - statistics_sender.try_send(StatisticsMessage::PeerRemoved(peer.peer_id)) - { - // Should never happen in practice - ::log::error!("Couldn't send StatisticsMessage::PeerRemoved"); - } + if config.statistics.peer_clients + && statistics_sender + .try_send(StatisticsMessage::PeerRemoved(peer.peer_id)) + .is_err() + { + // Should never happen in practice + ::log::error!("Couldn't send StatisticsMessage::PeerRemoved"); } } diff --git a/crates/udp/tests/common/mod.rs b/crates/udp/tests/common/mod.rs index 832d027..ee8e365 100644 --- a/crates/udp/tests/common/mod.rs +++ b/crates/udp/tests/common/mod.rs @@ -29,7 +29,7 @@ pub fn connect(socket: &UdpSocket, tracker_addr: SocketAddr) -> anyhow::Result ::anyhow::Result<()> { let ip = if config.server_address.is_ipv6() { Ipv6Addr::LOCALHOST.into() + } else if config.network.multiple_client_ipv4s { + Ipv4Addr::new(127, 0, 0, 1 + i).into() } else { - if config.network.multiple_client_ipv4s { - Ipv4Addr::new(127, 0, 0, 1 + i).into() - } else { - Ipv4Addr::LOCALHOST.into() - } + Ipv4Addr::LOCALHOST.into() }; let addr = SocketAddr::new(ip, port); diff --git a/crates/udp_protocol/src/common.rs b/crates/udp_protocol/src/common.rs index a0043df..6c54cb2 100644 --- a/crates/udp_protocol/src/common.rs +++ b/crates/udp_protocol/src/common.rs @@ -104,15 +104,15 @@ pub struct Ipv4AddrBytes(pub [u8; 4]); impl Ip for Ipv4AddrBytes {} -impl Into for Ipv4AddrBytes { - fn into(self) -> Ipv4Addr { - Ipv4Addr::from(self.0) +impl From for Ipv4Addr { + fn from(val: Ipv4AddrBytes) -> Self { + Ipv4Addr::from(val.0) } } -impl Into for Ipv4Addr { - fn into(self) -> Ipv4AddrBytes { - Ipv4AddrBytes(self.octets()) +impl From for Ipv4AddrBytes { + fn from(val: Ipv4Addr) -> Self { + Ipv4AddrBytes(val.octets()) } } @@ -122,15 +122,15 @@ pub struct Ipv6AddrBytes(pub [u8; 16]); impl Ip for Ipv6AddrBytes {} -impl Into for Ipv6AddrBytes { - fn into(self) -> Ipv6Addr { - Ipv6Addr::from(self.0) +impl From for Ipv6Addr { + fn from(val: Ipv6AddrBytes) -> Self { + Ipv6Addr::from(val.0) } } -impl Into for Ipv6Addr { - fn into(self) -> Ipv6AddrBytes { - Ipv6AddrBytes(self.octets()) +impl From for Ipv6AddrBytes { + fn from(val: Ipv6Addr) -> Self { + Ipv6AddrBytes(val.octets()) } } diff --git a/crates/udp_protocol/src/response.rs b/crates/udp_protocol/src/response.rs index 882a928..4e3353f 100644 --- a/crates/udp_protocol/src/response.rs +++ b/crates/udp_protocol/src/response.rs @@ -85,7 +85,7 @@ impl Response { // Error 3 => { let transaction_id = read_i32_ne(&mut bytes).map(TransactionId)?; - let message = String::from_utf8_lossy(&bytes).into_owned().into(); + let message = String::from_utf8_lossy(bytes).into_owned().into(); Ok((ErrorResponse { transaction_id, diff --git a/crates/ws/src/common.rs b/crates/ws/src/common.rs index 3240546..aced74a 100644 --- a/crates/ws/src/common.rs +++ b/crates/ws/src/common.rs @@ -57,12 +57,12 @@ pub struct OutMessageMeta { pub pending_scrape_id: Option, } -impl Into for InMessageMeta { - fn into(self) -> OutMessageMeta { +impl From for OutMessageMeta { + fn from(val: InMessageMeta) -> Self { OutMessageMeta { - out_message_consumer_id: self.out_message_consumer_id, - connection_id: self.connection_id, - pending_scrape_id: self.pending_scrape_id, + out_message_consumer_id: val.out_message_consumer_id, + connection_id: val.connection_id, + pending_scrape_id: val.pending_scrape_id, } } } diff --git a/crates/ws/src/workers/socket/connection.rs b/crates/ws/src/workers/socket/connection.rs index 82d93ef..5250b79 100644 --- a/crates/ws/src/workers/socket/connection.rs +++ b/crates/ws/src/workers/socket/connection.rs @@ -77,7 +77,7 @@ impl ConnectionRunner { clean_up_data.before_open(); let config = self.config.clone(); - let connection_id = self.connection_id.clone(); + let connection_id = self.connection_id; race( async { @@ -608,7 +608,7 @@ impl ConnectionCleanupData { let mut announced_info_hashes = HashMap::new(); for (info_hash, peer_id) in self.announced_info_hashes.take().into_iter() { - let consumer_index = calculate_in_message_consumer_index(&config, info_hash); + let consumer_index = calculate_in_message_consumer_index(config, info_hash); announced_info_hashes .entry(consumer_index) diff --git a/crates/ws/src/workers/swarm/mod.rs b/crates/ws/src/workers/swarm/mod.rs index 7788d12..7863abe 100644 --- a/crates/ws/src/workers/swarm/mod.rs +++ b/crates/ws/src/workers/swarm/mod.rs @@ -140,7 +140,7 @@ async fn handle_request_stream( match in_message { InMessage::AnnounceRequest(request) => { torrents.borrow_mut().handle_announce_request( - &config, + config, &mut rng.borrow_mut(), &mut out_messages, server_start_instant, @@ -150,7 +150,7 @@ async fn handle_request_stream( } InMessage::ScrapeRequest(request) => torrents .borrow_mut() - .handle_scrape_request(&config, &mut out_messages, meta, request), + .handle_scrape_request(config, &mut out_messages, meta, request), }; for (meta, out_message) in out_messages { diff --git a/crates/ws/src/workers/swarm/storage.rs b/crates/ws/src/workers/swarm/storage.rs index 37cf153..d579ad9 100644 --- a/crates/ws/src/workers/swarm/storage.rs +++ b/crates/ws/src/workers/swarm/storage.rs @@ -248,7 +248,11 @@ impl TorrentMaps { regarding_offer_id: offer_id, }; - if let Some(_) = answer_receiver.expecting_answers.remove(&expecting_answer) { + if answer_receiver + .expecting_answers + .remove(&expecting_answer) + .is_some() + { let answer_out_message = AnswerOutMessage { action: AnnounceAction::Announce, peer_id: request.peer_id, @@ -426,13 +430,11 @@ impl TorrentMaps { #[cfg(feature = "metrics")] self.peers_gauge_ipv4.decrement(1.0); } - } else { - if let Some(torrent_data) = self.ipv6.get_mut(&info_hash) { - torrent_data.remove_peer(peer_id); + } else if let Some(torrent_data) = self.ipv6.get_mut(&info_hash) { + torrent_data.remove_peer(peer_id); - #[cfg(feature = "metrics")] - self.peers_gauge_ipv6.decrement(1.0); - } + #[cfg(feature = "metrics")] + self.peers_gauge_ipv6.decrement(1.0); } } } diff --git a/crates/ws_load_test/src/network.rs b/crates/ws_load_test/src/network.rs index 55da243..f4e1744 100644 --- a/crates/ws_load_test/src/network.rs +++ b/crates/ws_load_test/src/network.rs @@ -312,7 +312,7 @@ pub fn random_request_type(config: &Config, rng: &mut impl Rng) -> RequestType { let items = [RequestType::Announce, RequestType::Scrape]; - let dist = WeightedIndex::new(&weights).expect("random request weighted index"); + let dist = WeightedIndex::new(weights).expect("random request weighted index"); items[dist.sample(rng)] } diff --git a/crates/ws_protocol/src/common.rs b/crates/ws_protocol/src/common.rs index f465f04..5c0a430 100644 --- a/crates/ws_protocol/src/common.rs +++ b/crates/ws_protocol/src/common.rs @@ -156,7 +156,7 @@ mod tests { assert!(bytes.len() == 20); - arr.copy_from_slice(&bytes[..]); + arr.copy_from_slice(bytes); InfoHash(arr) } diff --git a/crates/ws_protocol/src/lib.rs b/crates/ws_protocol/src/lib.rs index c61681a..6a4ed40 100644 --- a/crates/ws_protocol/src/lib.rs +++ b/crates/ws_protocol/src/lib.rs @@ -270,7 +270,7 @@ mod tests { assert!(bytes.len() == 20); - arr.copy_from_slice(&bytes[..]); + arr.copy_from_slice(bytes); InfoHash(arr) } @@ -372,8 +372,6 @@ mod tests { let success = info_hashes == deserialized; - if !success {} - success } } From 9d1bba5e922adfac83802dd6ed644b66c31d6503 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 20 Jan 2024 10:08:53 +0100 Subject: [PATCH 08/13] udp: fix/silence clippy warnings --- crates/udp/src/lib.rs | 11 +- crates/udp/src/workers/socket/mio.rs | 1 + crates/udp/src/workers/socket/mod.rs | 1 + crates/udp/src/workers/socket/storage.rs | 7 +- crates/udp/src/workers/socket/uring/mod.rs | 25 ++- .../src/workers/socket/uring/recv_helper.rs | 1 + .../src/workers/socket/uring/send_buffers.rs | 2 +- crates/udp/src/workers/swarm/mod.rs | 212 +++++++++--------- 8 files changed, 137 insertions(+), 123 deletions(-) diff --git a/crates/udp/src/lib.rs b/crates/udp/src/lib.rs index 1a05df5..50a0439 100644 --- a/crates/udp/src/lib.rs +++ b/crates/udp/src/lib.rs @@ -22,6 +22,7 @@ use common::{ }; use config::Config; use workers::socket::ConnectionValidator; +use workers::swarm::SwarmWorker; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -79,16 +80,18 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::SwarmWorker(i), ); - workers::swarm::run_swarm_worker( - sentinel, + let mut worker = SwarmWorker { + _sentinel: sentinel, config, state, server_start_instant, request_receiver, response_sender, statistics_sender, - SwarmWorkerIndex(i), - ) + worker_index: SwarmWorkerIndex(i), + }; + + worker.run(); }) .with_context(|| "spawn swarm worker")?; } diff --git a/crates/udp/src/workers/socket/mio.rs b/crates/udp/src/workers/socket/mio.rs index beede61..070e00b 100644 --- a/crates/udp/src/workers/socket/mio.rs +++ b/crates/udp/src/workers/socket/mio.rs @@ -49,6 +49,7 @@ pub struct SocketWorker { } impl SocketWorker { + #[allow(clippy::too_many_arguments)] pub fn run( _sentinel: PanicSentinel, shared_state: State, diff --git a/crates/udp/src/workers/socket/mod.rs b/crates/udp/src/workers/socket/mod.rs index b683e13..6889c79 100644 --- a/crates/udp/src/workers/socket/mod.rs +++ b/crates/udp/src/workers/socket/mod.rs @@ -36,6 +36,7 @@ const EXTRA_PACKET_SIZE_IPV4: usize = 8 + 18 + 20 + 8; /// - 8 bit udp header const EXTRA_PACKET_SIZE_IPV6: usize = 8 + 18 + 40 + 8; +#[allow(clippy::too_many_arguments)] pub fn run_socket_worker( sentinel: PanicSentinel, shared_state: State, diff --git a/crates/udp/src/workers/socket/storage.rs b/crates/udp/src/workers/socket/storage.rs index f6cf088..84c11a7 100644 --- a/crates/udp/src/workers/socket/storage.rs +++ b/crates/udp/src/workers/socket/storage.rs @@ -130,9 +130,10 @@ mod tests { return TestResult::discard(); } - let mut config = Config::default(); - - config.swarm_workers = swarm_workers as usize; + let config = Config { + swarm_workers: swarm_workers as usize, + ..Default::default() + }; let valid_until = ValidUntil::new(ServerStartInstant::new(), 1); diff --git a/crates/udp/src/workers/socket/uring/mod.rs b/crates/udp/src/workers/socket/uring/mod.rs index d41aecb..43c78c9 100644 --- a/crates/udp/src/workers/socket/uring/mod.rs +++ b/crates/udp/src/workers/socket/uring/mod.rs @@ -96,6 +96,7 @@ pub struct SocketWorker { } impl SocketWorker { + #[allow(clippy::too_many_arguments)] pub fn run( _sentinel: PanicSentinel, shared_state: State, @@ -136,7 +137,7 @@ impl SocketWorker { .build() .unwrap(); - let recv_sqe = recv_helper.create_entry(buf_ring.bgid().try_into().unwrap()); + let recv_sqe = recv_helper.create_entry(buf_ring.bgid()); // This timeout enables regular updates of pending_scrape_valid_until // and wakes the main loop to send any pending responses in the case @@ -209,7 +210,7 @@ impl SocketWorker { // Enqueue local responses for _ in 0..sq_space { if let Some((response, addr)) = self.local_responses.pop_front() { - match self.send_buffers.prepare_entry(response.into(), addr) { + match self.send_buffers.prepare_entry(response, addr) { Ok(entry) => { unsafe { ring.submission().push(&entry).unwrap() }; @@ -471,11 +472,11 @@ impl SocketWorker { let worker_index = SwarmWorkerIndex::from_info_hash(&self.config, request.info_hash); - if let Err(_) = self.request_sender.try_send_to( - worker_index, - ConnectedRequest::Announce(request), - src, - ) { + if self + .request_sender + .try_send_to(worker_index, ConnectedRequest::Announce(request), src) + .is_err() + { ::log::warn!("request sender full, dropping request"); } } else { @@ -500,11 +501,11 @@ impl SocketWorker { ); for (swarm_worker_index, request) in split_requests { - if let Err(_) = self.request_sender.try_send_to( - swarm_worker_index, - ConnectedRequest::Scrape(request), - src, - ) { + if self + .request_sender + .try_send_to(swarm_worker_index, ConnectedRequest::Scrape(request), src) + .is_err() + { ::log::warn!("request sender full, dropping request"); } } diff --git a/crates/udp/src/workers/socket/uring/recv_helper.rs b/crates/udp/src/workers/socket/uring/recv_helper.rs index ff6cdde..4a485f6 100644 --- a/crates/udp/src/workers/socket/uring/recv_helper.rs +++ b/crates/udp/src/workers/socket/uring/recv_helper.rs @@ -11,6 +11,7 @@ use crate::config::Config; use super::{SOCKET_IDENTIFIER, USER_DATA_RECV}; +#[allow(clippy::enum_variant_names)] pub enum Error { RecvMsgParseError, RecvMsgTruncated, diff --git a/crates/udp/src/workers/socket/uring/send_buffers.rs b/crates/udp/src/workers/socket/uring/send_buffers.rs index dec4843..458d96f 100644 --- a/crates/udp/src/workers/socket/uring/send_buffers.rs +++ b/crates/udp/src/workers/socket/uring/send_buffers.rs @@ -58,7 +58,7 @@ impl SendBuffers { self.likely_next_free_index = 0; } - pub fn prepare_entry<'a>( + pub fn prepare_entry( &mut self, response: Response, addr: CanonicalSocketAddr, diff --git a/crates/udp/src/workers/swarm/mod.rs b/crates/udp/src/workers/swarm/mod.rs index 9c7e00f..d989477 100644 --- a/crates/udp/src/workers/swarm/mod.rs +++ b/crates/udp/src/workers/swarm/mod.rs @@ -17,122 +17,128 @@ use crate::config::Config; use storage::TorrentMaps; -pub fn run_swarm_worker( - _sentinel: PanicSentinel, - config: Config, - state: State, - server_start_instant: ServerStartInstant, - request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, - mut response_sender: ConnectedResponseSender, - statistics_sender: Sender, - worker_index: SwarmWorkerIndex, -) { - let mut torrents = TorrentMaps::default(); - let mut rng = SmallRng::from_entropy(); +pub struct SwarmWorker { + pub _sentinel: PanicSentinel, + pub config: Config, + pub state: State, + pub server_start_instant: ServerStartInstant, + pub request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, + pub response_sender: ConnectedResponseSender, + pub statistics_sender: Sender, + pub worker_index: SwarmWorkerIndex, +} - let timeout = Duration::from_millis(config.request_channel_recv_timeout_ms); - let mut peer_valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); +impl SwarmWorker { + pub fn run(&mut self) { + let mut torrents = TorrentMaps::default(); + let mut rng = SmallRng::from_entropy(); - let cleaning_interval = Duration::from_secs(config.cleaning.torrent_cleaning_interval); - let statistics_update_interval = Duration::from_secs(config.statistics.interval); + let timeout = Duration::from_millis(self.config.request_channel_recv_timeout_ms); + let mut peer_valid_until = + ValidUntil::new(self.server_start_instant, self.config.cleaning.max_peer_age); - let mut last_cleaning = Instant::now(); - let mut last_statistics_update = Instant::now(); + let cleaning_interval = Duration::from_secs(self.config.cleaning.torrent_cleaning_interval); + let statistics_update_interval = Duration::from_secs(self.config.statistics.interval); - let mut iter_counter = 0usize; + let mut last_cleaning = Instant::now(); + let mut last_statistics_update = Instant::now(); - loop { - if let Ok((sender_index, request, src)) = request_receiver.recv_timeout(timeout) { - // It is OK to block here as long as we don't also do blocking - // sends in socket workers (doing both could cause a deadlock) - match (request, src.get().ip()) { - (ConnectedRequest::Announce(request), IpAddr::V4(ip)) => { - let response = torrents - .ipv4 - .0 - .entry(request.info_hash) - .or_default() - .announce( - &config, - &statistics_sender, - &mut rng, - &request, - ip.into(), - peer_valid_until, - ); + let mut iter_counter = 0usize; - // It doesn't matter which socket worker receives announce responses - response_sender - .send_to_any(src, ConnectedResponse::AnnounceIpv4(response)) - .expect("swarm response channel is closed"); - } - (ConnectedRequest::Announce(request), IpAddr::V6(ip)) => { - let response = torrents - .ipv6 - .0 - .entry(request.info_hash) - .or_default() - .announce( - &config, - &statistics_sender, - &mut rng, - &request, - ip.into(), - peer_valid_until, - ); + loop { + if let Ok((sender_index, request, src)) = self.request_receiver.recv_timeout(timeout) { + // It is OK to block here as long as we don't also do blocking + // sends in socket workers (doing both could cause a deadlock) + match (request, src.get().ip()) { + (ConnectedRequest::Announce(request), IpAddr::V4(ip)) => { + let response = torrents + .ipv4 + .0 + .entry(request.info_hash) + .or_default() + .announce( + &self.config, + &self.statistics_sender, + &mut rng, + &request, + ip.into(), + peer_valid_until, + ); - // It doesn't matter which socket worker receives announce responses - response_sender - .send_to_any(src, ConnectedResponse::AnnounceIpv6(response)) - .expect("swarm response channel is closed"); - } - (ConnectedRequest::Scrape(request), IpAddr::V4(_)) => { - let response = torrents.ipv4.scrape(request); + // It doesn't matter which socket worker receives announce responses + self.response_sender + .send_to_any(src, ConnectedResponse::AnnounceIpv4(response)) + .expect("swarm response channel is closed"); + } + (ConnectedRequest::Announce(request), IpAddr::V6(ip)) => { + let response = torrents + .ipv6 + .0 + .entry(request.info_hash) + .or_default() + .announce( + &self.config, + &self.statistics_sender, + &mut rng, + &request, + ip.into(), + peer_valid_until, + ); - response_sender - .send_to(sender_index, src, ConnectedResponse::Scrape(response)) - .expect("swarm response channel is closed"); - } - (ConnectedRequest::Scrape(request), IpAddr::V6(_)) => { - let response = torrents.ipv6.scrape(request); + // It doesn't matter which socket worker receives announce responses + self.response_sender + .send_to_any(src, ConnectedResponse::AnnounceIpv6(response)) + .expect("swarm response channel is closed"); + } + (ConnectedRequest::Scrape(request), IpAddr::V4(_)) => { + let response = torrents.ipv4.scrape(request); - response_sender - .send_to(sender_index, src, ConnectedResponse::Scrape(response)) - .expect("swarm response channel is closed"); - } - }; - } + self.response_sender + .send_to(sender_index, src, ConnectedResponse::Scrape(response)) + .expect("swarm response channel is closed"); + } + (ConnectedRequest::Scrape(request), IpAddr::V6(_)) => { + let response = torrents.ipv6.scrape(request); - // Run periodic tasks - if iter_counter % 128 == 0 { - let now = Instant::now(); - - peer_valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); - - if now > last_cleaning + cleaning_interval { - torrents.clean_and_update_statistics( - &config, - &state, - &statistics_sender, - &state.access_list, - server_start_instant, - worker_index, - ); - - last_cleaning = now; + self.response_sender + .send_to(sender_index, src, ConnectedResponse::Scrape(response)) + .expect("swarm response channel is closed"); + } + }; } - if config.statistics.active() - && now > last_statistics_update + statistics_update_interval - { - state.statistics_ipv4.torrents[worker_index.0] - .store(torrents.ipv4.num_torrents(), Ordering::Release); - state.statistics_ipv6.torrents[worker_index.0] - .store(torrents.ipv6.num_torrents(), Ordering::Release); - last_statistics_update = now; + // Run periodic tasks + if iter_counter % 128 == 0 { + let now = Instant::now(); + + peer_valid_until = + ValidUntil::new(self.server_start_instant, self.config.cleaning.max_peer_age); + + if now > last_cleaning + cleaning_interval { + torrents.clean_and_update_statistics( + &self.config, + &self.state, + &self.statistics_sender, + &self.state.access_list, + self.server_start_instant, + self.worker_index, + ); + + last_cleaning = now; + } + if self.config.statistics.active() + && now > last_statistics_update + statistics_update_interval + { + self.state.statistics_ipv4.torrents[self.worker_index.0] + .store(torrents.ipv4.num_torrents(), Ordering::Release); + self.state.statistics_ipv6.torrents[self.worker_index.0] + .store(torrents.ipv6.num_torrents(), Ordering::Release); + + last_statistics_update = now; + } } - } - iter_counter = iter_counter.wrapping_add(1); + iter_counter = iter_counter.wrapping_add(1); + } } } From c32fa7cc2bd172490eb3ddbe8ce4d43a06b839b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 20 Jan 2024 10:27:15 +0100 Subject: [PATCH 09/13] Fix ws clippy warnings --- crates/ws/src/workers/socket/connection.rs | 9 +++++++- crates/ws/src/workers/socket/mod.rs | 1 + crates/ws/src/workers/swarm/mod.rs | 1 + crates/ws_load_test/src/network.rs | 24 +++++++++++++--------- crates/ws_protocol/src/lib.rs | 4 +--- 5 files changed, 25 insertions(+), 14 deletions(-) diff --git a/crates/ws/src/workers/socket/connection.rs b/crates/ws/src/workers/socket/connection.rs index 5250b79..00de5c6 100644 --- a/crates/ws/src/workers/socket/connection.rs +++ b/crates/ws/src/workers/socket/connection.rs @@ -41,6 +41,9 @@ use crate::workers::socket::calculate_in_message_consumer_index; #[cfg(feature = "metrics")] use crate::workers::socket::{ip_version_to_metrics_str, WORKER_INDEX}; +/// Optional second tuple field is for peer id hex representation +type PeerClientGauge = (Gauge, Option); + pub struct ConnectionRunner { pub config: Rc, pub access_list: Arc, @@ -283,6 +286,8 @@ impl ConnectionReader { } } + // Silence RefCell lint due to false positives + #[allow(clippy::await_holding_refcell_ref)] async fn handle_announce_request(&mut self, request: AnnounceRequest) -> anyhow::Result<()> { #[cfg(feature = "metrics")] self.total_announce_requests_counter.increment(1); @@ -485,6 +490,8 @@ struct ConnectionWriter { } impl ConnectionWriter { + // Silence RefCell lint due to false positives + #[allow(clippy::await_holding_refcell_ref)] async fn run_out_message_loop(&mut self) -> anyhow::Result<()> { loop { let (meta, out_message) = self.out_message_receiver.recv().await.ok_or_else(|| { @@ -590,7 +597,7 @@ impl ConnectionWriter { struct ConnectionCleanupData { announced_info_hashes: Rc>>, ip_version: IpVersion, - opt_peer_client: Rc)>>>, + opt_peer_client: Rc>>, #[cfg(feature = "metrics")] active_connections_gauge: Gauge, } diff --git a/crates/ws/src/workers/socket/mod.rs b/crates/ws/src/workers/socket/mod.rs index 03e17c5..040fce7 100644 --- a/crates/ws/src/workers/socket/mod.rs +++ b/crates/ws/src/workers/socket/mod.rs @@ -48,6 +48,7 @@ struct ConnectionHandle { valid_until_after_tls_update: Option, } +#[allow(clippy::too_many_arguments)] pub async fn run_socket_worker( _sentinel: PanicSentinel, config: Config, diff --git a/crates/ws/src/workers/swarm/mod.rs b/crates/ws/src/workers/swarm/mod.rs index 7863abe..f26c34d 100644 --- a/crates/ws/src/workers/swarm/mod.rs +++ b/crates/ws/src/workers/swarm/mod.rs @@ -24,6 +24,7 @@ use self::storage::TorrentMaps; #[cfg(feature = "metrics")] thread_local! { static WORKER_INDEX: ::std::cell::Cell = Default::default() } +#[allow(clippy::too_many_arguments)] pub async fn run_swarm_worker( _sentinel: PanicSentinel, config: Config, diff --git a/crates/ws_load_test/src/network.rs b/crates/ws_load_test/src/network.rs index f4e1744..a0aa125 100644 --- a/crates/ws_load_test/src/network.rs +++ b/crates/ws_load_test/src/network.rs @@ -149,6 +149,19 @@ impl Connection { } async fn send_message(&mut self) -> anyhow::Result<()> { + let request = self.create_request(); + + self.stream.send(request.to_ws_message()).await?; + + self.load_test_state + .statistics + .requests + .fetch_add(1, Ordering::Relaxed); + + Ok(()) + } + + fn create_request(&mut self) -> InMessage { let mut rng = self.rng.borrow_mut(); let request = match random_request_type(&self.config, &mut *rng) { @@ -226,18 +239,9 @@ impl Connection { } }; - drop(rng); - self.can_send_answer = None; - self.stream.send(request.to_ws_message()).await?; - - self.load_test_state - .statistics - .requests - .fetch_add(1, Ordering::Relaxed); - - Ok(()) + request } async fn read_message(&mut self) -> anyhow::Result<()> { diff --git a/crates/ws_protocol/src/lib.rs b/crates/ws_protocol/src/lib.rs index 6a4ed40..2652e86 100644 --- a/crates/ws_protocol/src/lib.rs +++ b/crates/ws_protocol/src/lib.rs @@ -370,8 +370,6 @@ mod tests { ::simd_json::serde::from_str(&mut json).unwrap() }; - let success = info_hashes == deserialized; - - success + info_hashes == deserialized } } From 746aa47ccebe96eec0b3e80f39cceec5e4838d26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 20 Jan 2024 10:29:13 +0100 Subject: [PATCH 10/13] http: silence clippy warnings --- crates/http/src/workers/socket/mod.rs | 1 + crates/http_protocol/src/utils.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/crates/http/src/workers/socket/mod.rs b/crates/http/src/workers/socket/mod.rs index 9fab395..025b4a4 100644 --- a/crates/http/src/workers/socket/mod.rs +++ b/crates/http/src/workers/socket/mod.rs @@ -33,6 +33,7 @@ struct ConnectionHandle { valid_until: Rc>, } +#[allow(clippy::too_many_arguments)] pub async fn run_socket_worker( _sentinel: PanicSentinel, config: Config, diff --git a/crates/http_protocol/src/utils.rs b/crates/http_protocol/src/utils.rs index 51b2ac4..1668acc 100644 --- a/crates/http_protocol/src/utils.rs +++ b/crates/http_protocol/src/utils.rs @@ -280,6 +280,7 @@ mod tests { } } + #[allow(clippy::too_many_arguments)] #[quickcheck] fn test_urlencode_urldecode_20_bytes( a: u8, From 2dd3ab8682f6b9f6f2684f86e5d25e54267302eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 20 Jan 2024 10:37:15 +0100 Subject: [PATCH 11/13] aquatic_common: fix clippy warnings --- crates/common/src/access_list.rs | 11 ++++++----- crates/common/src/cli.rs | 1 + crates/common/src/cpu_pinning.rs | 4 ++-- crates/common/src/lib.rs | 13 ++++++------- crates/common/src/privileges.rs | 16 +++++++--------- crates/common/src/rustls_config.rs | 1 + 6 files changed, 23 insertions(+), 23 deletions(-) diff --git a/crates/common/src/access_list.rs b/crates/common/src/access_list.rs index 459c5ca..5ba7ea8 100644 --- a/crates/common/src/access_list.rs +++ b/crates/common/src/access_list.rs @@ -71,7 +71,7 @@ impl AccessList { } new_list - .insert_from_line(&line) + .insert_from_line(line) .with_context(|| format!("Invalid line in access list: {}", line))?; } @@ -86,6 +86,7 @@ impl AccessList { } } + #[allow(clippy::len_without_is_empty)] pub fn len(&self) -> usize { self.0.len() } @@ -155,10 +156,10 @@ mod tests { fn test_parse_info_hash() { let f = parse_info_hash; - assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeee".into()).is_ok()); - assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeeef".into()).is_err()); - assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeee".into()).is_err()); - assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeeö".into()).is_err()); + assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeee").is_ok()); + assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeeef").is_err()); + assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeee").is_err()); + assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeeö").is_err()); } #[test] diff --git a/crates/common/src/cli.rs b/crates/common/src/cli.rs index db18f09..d935285 100644 --- a/crates/common/src/cli.rs +++ b/crates/common/src/cli.rs @@ -47,6 +47,7 @@ impl Options { { let mut options = Options::default(); + #[allow(clippy::while_let_loop)] // False positive loop { if let Some(arg) = arg_iter.next() { match arg.as_str() { diff --git a/crates/common/src/cpu_pinning.rs b/crates/common/src/cpu_pinning.rs index 870ea59..529f15c 100644 --- a/crates/common/src/cpu_pinning.rs +++ b/crates/common/src/cpu_pinning.rs @@ -296,12 +296,12 @@ pub fn pin_current_if_configured_to( let cpu_set = core_cpu_sets .get(core_index) - .expect(&format!("get cpu set for core {}", core_index)) + .unwrap_or_else(|| panic!("get cpu set for core {}", core_index)) .to_owned(); topology .set_cpubind(cpu_set, CPUBIND_THREAD) - .expect(&format!("bind thread to core {}", core_index)); + .unwrap_or_else(|err| panic!("bind thread to core {}: {:?}", core_index, err)); ::log::info!( "Pinned worker {:?} to cpu core {}", diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 44f25f5..6d0dbb6 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -39,6 +39,7 @@ impl ValidUntil { pub struct ServerStartInstant(Instant); impl ServerStartInstant { + #[allow(clippy::new_without_default)] // I prefer ::new here pub fn new() -> Self { Self(Instant::now()) } @@ -82,13 +83,11 @@ impl Drop for PanicSentinel { if ::std::thread::panicking() { let already_triggered = self.0.fetch_or(true, Ordering::SeqCst); - if !already_triggered { - if unsafe { libc::raise(15) } == -1 { - panic!( - "Could not raise SIGTERM: {:#}", - ::std::io::Error::last_os_error() - ) - } + if !already_triggered && unsafe { libc::raise(15) } == -1 { + panic!( + "Could not raise SIGTERM: {:#}", + ::std::io::Error::last_os_error() + ) } } } diff --git a/crates/common/src/privileges.rs b/crates/common/src/privileges.rs index 907d80b..10731ca 100644 --- a/crates/common/src/privileges.rs +++ b/crates/common/src/privileges.rs @@ -48,15 +48,13 @@ impl PrivilegeDropper { } pub fn after_socket_creation(self) -> anyhow::Result<()> { - if self.config.drop_privileges { - if self.barrier.wait().is_leader() { - PrivDrop::default() - .chroot(self.config.chroot_path.clone()) - .group(self.config.group.clone()) - .user(self.config.user.clone()) - .apply() - .with_context(|| "couldn't drop privileges after socket creation")?; - } + if self.config.drop_privileges && self.barrier.wait().is_leader() { + PrivDrop::default() + .chroot(self.config.chroot_path.clone()) + .group(self.config.group.clone()) + .user(self.config.user.clone()) + .apply() + .with_context(|| "couldn't drop privileges after socket creation")?; } Ok(()) diff --git a/crates/common/src/rustls_config.rs b/crates/common/src/rustls_config.rs index 56b8336..86d2bd0 100644 --- a/crates/common/src/rustls_config.rs +++ b/crates/common/src/rustls_config.rs @@ -46,6 +46,7 @@ pub fn create_rustls_config( .next() .ok_or(anyhow::anyhow!("No private keys in file"))??; + #[allow(clippy::let_and_return)] // Using temporary variable fixes lifetime issue key }; From aeeeda1b2b61f0c20ca86b5a1b6fe48dcdd7b3b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 20 Jan 2024 10:40:28 +0100 Subject: [PATCH 12/13] bencher: fix most clippy warnings --- crates/bencher/src/protocols/udp.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/bencher/src/protocols/udp.rs b/crates/bencher/src/protocols/udp.rs index 5ea7eaf..48ee1bf 100644 --- a/crates/bencher/src/protocols/udp.rs +++ b/crates/bencher/src/protocols/udp.rs @@ -259,6 +259,7 @@ struct AquaticUdpRunner { } impl AquaticUdpRunner { + #[allow(clippy::new_ret_no_self)] fn new( socket_workers: usize, swarm_workers: usize, @@ -275,6 +276,7 @@ impl AquaticUdpRunner { impl ProcessRunner for AquaticUdpRunner { type Command = UdpCommand; + #[allow(clippy::field_reassign_with_default)] fn run( &self, command: &Self::Command, @@ -322,6 +324,7 @@ struct OpenTrackerUdpRunner { } impl OpenTrackerUdpRunner { + #[allow(clippy::new_ret_no_self)] fn new(workers: usize, priority: Priority) -> Rc> { Rc::new(Self { workers, priority }) } @@ -368,6 +371,7 @@ impl ProcessRunner for OpenTrackerUdpRunner { struct ChihayaUdpRunner; impl ChihayaUdpRunner { + #[allow(clippy::new_ret_no_self)] fn new() -> Rc> { Rc::new(Self {}) } @@ -426,6 +430,7 @@ struct AquaticUdpLoadTestRunner { impl ProcessRunner for AquaticUdpLoadTestRunner { type Command = UdpCommand; + #[allow(clippy::field_reassign_with_default)] fn run( &self, command: &Self::Command, From cbbfa9afef68d689b3596a6fcddd64d8a6581ddb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 20 Jan 2024 10:43:39 +0100 Subject: [PATCH 13/13] toml_config_derive: fix clippy warning --- crates/toml_config_derive/src/lib.rs | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/crates/toml_config_derive/src/lib.rs b/crates/toml_config_derive/src/lib.rs index d84e200..89cdff7 100644 --- a/crates/toml_config_derive/src/lib.rs +++ b/crates/toml_config_derive/src/lib.rs @@ -146,21 +146,18 @@ fn extract_comment_string(attrs: Vec) -> TokenStream { } for token_tree in attr.tokens { - match token_tree { - TokenTree::Literal(literal) => { - let mut comment = format!("{}", literal); + if let TokenTree::Literal(literal) = token_tree { + let mut comment = format!("{}", literal); - // Strip leading and trailing quotation marks - comment.remove(comment.len() - 1); - comment.remove(0); + // Strip leading and trailing quotation marks + comment.remove(comment.len() - 1); + comment.remove(0); - // Add toml comment indicator - comment.insert(0, '#'); + // Add toml comment indicator + comment.insert(0, '#'); - output.push_str(&comment); - output.push('\n'); - } - _ => {} + output.push_str(&comment); + output.push('\n'); } } }