From 9d1bba5e922adfac83802dd6ed644b66c31d6503 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 20 Jan 2024 10:08:53 +0100 Subject: [PATCH] udp: fix/silence clippy warnings --- crates/udp/src/lib.rs | 11 +- crates/udp/src/workers/socket/mio.rs | 1 + crates/udp/src/workers/socket/mod.rs | 1 + crates/udp/src/workers/socket/storage.rs | 7 +- crates/udp/src/workers/socket/uring/mod.rs | 25 ++- .../src/workers/socket/uring/recv_helper.rs | 1 + .../src/workers/socket/uring/send_buffers.rs | 2 +- crates/udp/src/workers/swarm/mod.rs | 212 +++++++++--------- 8 files changed, 137 insertions(+), 123 deletions(-) diff --git a/crates/udp/src/lib.rs b/crates/udp/src/lib.rs index 1a05df5..50a0439 100644 --- a/crates/udp/src/lib.rs +++ b/crates/udp/src/lib.rs @@ -22,6 +22,7 @@ use common::{ }; use config::Config; use workers::socket::ConnectionValidator; +use workers::swarm::SwarmWorker; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -79,16 +80,18 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::SwarmWorker(i), ); - workers::swarm::run_swarm_worker( - sentinel, + let mut worker = SwarmWorker { + _sentinel: sentinel, config, state, server_start_instant, request_receiver, response_sender, statistics_sender, - SwarmWorkerIndex(i), - ) + worker_index: SwarmWorkerIndex(i), + }; + + worker.run(); }) .with_context(|| "spawn swarm worker")?; } diff --git a/crates/udp/src/workers/socket/mio.rs b/crates/udp/src/workers/socket/mio.rs index beede61..070e00b 100644 --- a/crates/udp/src/workers/socket/mio.rs +++ b/crates/udp/src/workers/socket/mio.rs @@ -49,6 +49,7 @@ pub struct SocketWorker { } impl SocketWorker { + #[allow(clippy::too_many_arguments)] pub fn run( _sentinel: PanicSentinel, shared_state: State, diff --git a/crates/udp/src/workers/socket/mod.rs b/crates/udp/src/workers/socket/mod.rs index b683e13..6889c79 100644 --- a/crates/udp/src/workers/socket/mod.rs +++ b/crates/udp/src/workers/socket/mod.rs @@ -36,6 +36,7 @@ const EXTRA_PACKET_SIZE_IPV4: usize = 8 + 18 + 20 + 8; /// - 8 bit udp header const EXTRA_PACKET_SIZE_IPV6: usize = 8 + 18 + 40 + 8; +#[allow(clippy::too_many_arguments)] pub fn run_socket_worker( sentinel: PanicSentinel, shared_state: State, diff --git a/crates/udp/src/workers/socket/storage.rs b/crates/udp/src/workers/socket/storage.rs index f6cf088..84c11a7 100644 --- a/crates/udp/src/workers/socket/storage.rs +++ b/crates/udp/src/workers/socket/storage.rs @@ -130,9 +130,10 @@ mod tests { return TestResult::discard(); } - let mut config = Config::default(); - - config.swarm_workers = swarm_workers as usize; + let config = Config { + swarm_workers: swarm_workers as usize, + ..Default::default() + }; let valid_until = ValidUntil::new(ServerStartInstant::new(), 1); diff --git a/crates/udp/src/workers/socket/uring/mod.rs b/crates/udp/src/workers/socket/uring/mod.rs index d41aecb..43c78c9 100644 --- a/crates/udp/src/workers/socket/uring/mod.rs +++ b/crates/udp/src/workers/socket/uring/mod.rs @@ -96,6 +96,7 @@ pub struct SocketWorker { } impl SocketWorker { + #[allow(clippy::too_many_arguments)] pub fn run( _sentinel: PanicSentinel, shared_state: State, @@ -136,7 +137,7 @@ impl SocketWorker { .build() .unwrap(); - let recv_sqe = recv_helper.create_entry(buf_ring.bgid().try_into().unwrap()); + let recv_sqe = recv_helper.create_entry(buf_ring.bgid()); // This timeout enables regular updates of pending_scrape_valid_until // and wakes the main loop to send any pending responses in the case @@ -209,7 +210,7 @@ impl SocketWorker { // Enqueue local responses for _ in 0..sq_space { if let Some((response, addr)) = self.local_responses.pop_front() { - match self.send_buffers.prepare_entry(response.into(), addr) { + match self.send_buffers.prepare_entry(response, addr) { Ok(entry) => { unsafe { ring.submission().push(&entry).unwrap() }; @@ -471,11 +472,11 @@ impl SocketWorker { let worker_index = SwarmWorkerIndex::from_info_hash(&self.config, request.info_hash); - if let Err(_) = self.request_sender.try_send_to( - worker_index, - ConnectedRequest::Announce(request), - src, - ) { + if self + .request_sender + .try_send_to(worker_index, ConnectedRequest::Announce(request), src) + .is_err() + { ::log::warn!("request sender full, dropping request"); } } else { @@ -500,11 +501,11 @@ impl SocketWorker { ); for (swarm_worker_index, request) in split_requests { - if let Err(_) = self.request_sender.try_send_to( - swarm_worker_index, - ConnectedRequest::Scrape(request), - src, - ) { + if self + .request_sender + .try_send_to(swarm_worker_index, ConnectedRequest::Scrape(request), src) + .is_err() + { ::log::warn!("request sender full, dropping request"); } } diff --git a/crates/udp/src/workers/socket/uring/recv_helper.rs b/crates/udp/src/workers/socket/uring/recv_helper.rs index ff6cdde..4a485f6 100644 --- a/crates/udp/src/workers/socket/uring/recv_helper.rs +++ b/crates/udp/src/workers/socket/uring/recv_helper.rs @@ -11,6 +11,7 @@ use crate::config::Config; use super::{SOCKET_IDENTIFIER, USER_DATA_RECV}; +#[allow(clippy::enum_variant_names)] pub enum Error { RecvMsgParseError, RecvMsgTruncated, diff --git a/crates/udp/src/workers/socket/uring/send_buffers.rs b/crates/udp/src/workers/socket/uring/send_buffers.rs index dec4843..458d96f 100644 --- a/crates/udp/src/workers/socket/uring/send_buffers.rs +++ b/crates/udp/src/workers/socket/uring/send_buffers.rs @@ -58,7 +58,7 @@ impl SendBuffers { self.likely_next_free_index = 0; } - pub fn prepare_entry<'a>( + pub fn prepare_entry( &mut self, response: Response, addr: CanonicalSocketAddr, diff --git a/crates/udp/src/workers/swarm/mod.rs b/crates/udp/src/workers/swarm/mod.rs index 9c7e00f..d989477 100644 --- a/crates/udp/src/workers/swarm/mod.rs +++ b/crates/udp/src/workers/swarm/mod.rs @@ -17,122 +17,128 @@ use crate::config::Config; use storage::TorrentMaps; -pub fn run_swarm_worker( - _sentinel: PanicSentinel, - config: Config, - state: State, - server_start_instant: ServerStartInstant, - request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, - mut response_sender: ConnectedResponseSender, - statistics_sender: Sender, - worker_index: SwarmWorkerIndex, -) { - let mut torrents = TorrentMaps::default(); - let mut rng = SmallRng::from_entropy(); +pub struct SwarmWorker { + pub _sentinel: PanicSentinel, + pub config: Config, + pub state: State, + pub server_start_instant: ServerStartInstant, + pub request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, + pub response_sender: ConnectedResponseSender, + pub statistics_sender: Sender, + pub worker_index: SwarmWorkerIndex, +} - let timeout = Duration::from_millis(config.request_channel_recv_timeout_ms); - let mut peer_valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); +impl SwarmWorker { + pub fn run(&mut self) { + let mut torrents = TorrentMaps::default(); + let mut rng = SmallRng::from_entropy(); - let cleaning_interval = Duration::from_secs(config.cleaning.torrent_cleaning_interval); - let statistics_update_interval = Duration::from_secs(config.statistics.interval); + let timeout = Duration::from_millis(self.config.request_channel_recv_timeout_ms); + let mut peer_valid_until = + ValidUntil::new(self.server_start_instant, self.config.cleaning.max_peer_age); - let mut last_cleaning = Instant::now(); - let mut last_statistics_update = Instant::now(); + let cleaning_interval = Duration::from_secs(self.config.cleaning.torrent_cleaning_interval); + let statistics_update_interval = Duration::from_secs(self.config.statistics.interval); - let mut iter_counter = 0usize; + let mut last_cleaning = Instant::now(); + let mut last_statistics_update = Instant::now(); - loop { - if let Ok((sender_index, request, src)) = request_receiver.recv_timeout(timeout) { - // It is OK to block here as long as we don't also do blocking - // sends in socket workers (doing both could cause a deadlock) - match (request, src.get().ip()) { - (ConnectedRequest::Announce(request), IpAddr::V4(ip)) => { - let response = torrents - .ipv4 - .0 - .entry(request.info_hash) - .or_default() - .announce( - &config, - &statistics_sender, - &mut rng, - &request, - ip.into(), - peer_valid_until, - ); + let mut iter_counter = 0usize; - // It doesn't matter which socket worker receives announce responses - response_sender - .send_to_any(src, ConnectedResponse::AnnounceIpv4(response)) - .expect("swarm response channel is closed"); - } - (ConnectedRequest::Announce(request), IpAddr::V6(ip)) => { - let response = torrents - .ipv6 - .0 - .entry(request.info_hash) - .or_default() - .announce( - &config, - &statistics_sender, - &mut rng, - &request, - ip.into(), - peer_valid_until, - ); + loop { + if let Ok((sender_index, request, src)) = self.request_receiver.recv_timeout(timeout) { + // It is OK to block here as long as we don't also do blocking + // sends in socket workers (doing both could cause a deadlock) + match (request, src.get().ip()) { + (ConnectedRequest::Announce(request), IpAddr::V4(ip)) => { + let response = torrents + .ipv4 + .0 + .entry(request.info_hash) + .or_default() + .announce( + &self.config, + &self.statistics_sender, + &mut rng, + &request, + ip.into(), + peer_valid_until, + ); - // It doesn't matter which socket worker receives announce responses - response_sender - .send_to_any(src, ConnectedResponse::AnnounceIpv6(response)) - .expect("swarm response channel is closed"); - } - (ConnectedRequest::Scrape(request), IpAddr::V4(_)) => { - let response = torrents.ipv4.scrape(request); + // It doesn't matter which socket worker receives announce responses + self.response_sender + .send_to_any(src, ConnectedResponse::AnnounceIpv4(response)) + .expect("swarm response channel is closed"); + } + (ConnectedRequest::Announce(request), IpAddr::V6(ip)) => { + let response = torrents + .ipv6 + .0 + .entry(request.info_hash) + .or_default() + .announce( + &self.config, + &self.statistics_sender, + &mut rng, + &request, + ip.into(), + peer_valid_until, + ); - response_sender - .send_to(sender_index, src, ConnectedResponse::Scrape(response)) - .expect("swarm response channel is closed"); - } - (ConnectedRequest::Scrape(request), IpAddr::V6(_)) => { - let response = torrents.ipv6.scrape(request); + // It doesn't matter which socket worker receives announce responses + self.response_sender + .send_to_any(src, ConnectedResponse::AnnounceIpv6(response)) + .expect("swarm response channel is closed"); + } + (ConnectedRequest::Scrape(request), IpAddr::V4(_)) => { + let response = torrents.ipv4.scrape(request); - response_sender - .send_to(sender_index, src, ConnectedResponse::Scrape(response)) - .expect("swarm response channel is closed"); - } - }; - } + self.response_sender + .send_to(sender_index, src, ConnectedResponse::Scrape(response)) + .expect("swarm response channel is closed"); + } + (ConnectedRequest::Scrape(request), IpAddr::V6(_)) => { + let response = torrents.ipv6.scrape(request); - // Run periodic tasks - if iter_counter % 128 == 0 { - let now = Instant::now(); - - peer_valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); - - if now > last_cleaning + cleaning_interval { - torrents.clean_and_update_statistics( - &config, - &state, - &statistics_sender, - &state.access_list, - server_start_instant, - worker_index, - ); - - last_cleaning = now; + self.response_sender + .send_to(sender_index, src, ConnectedResponse::Scrape(response)) + .expect("swarm response channel is closed"); + } + }; } - if config.statistics.active() - && now > last_statistics_update + statistics_update_interval - { - state.statistics_ipv4.torrents[worker_index.0] - .store(torrents.ipv4.num_torrents(), Ordering::Release); - state.statistics_ipv6.torrents[worker_index.0] - .store(torrents.ipv6.num_torrents(), Ordering::Release); - last_statistics_update = now; + // Run periodic tasks + if iter_counter % 128 == 0 { + let now = Instant::now(); + + peer_valid_until = + ValidUntil::new(self.server_start_instant, self.config.cleaning.max_peer_age); + + if now > last_cleaning + cleaning_interval { + torrents.clean_and_update_statistics( + &self.config, + &self.state, + &self.statistics_sender, + &self.state.access_list, + self.server_start_instant, + self.worker_index, + ); + + last_cleaning = now; + } + if self.config.statistics.active() + && now > last_statistics_update + statistics_update_interval + { + self.state.statistics_ipv4.torrents[self.worker_index.0] + .store(torrents.ipv4.num_torrents(), Ordering::Release); + self.state.statistics_ipv6.torrents[self.worker_index.0] + .store(torrents.ipv6.num_torrents(), Ordering::Release); + + last_statistics_update = now; + } } - } - iter_counter = iter_counter.wrapping_add(1); + iter_counter = iter_counter.wrapping_add(1); + } } }