From 00c4e74374a929b9575b0b6f96745b151fa792a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 13 Jan 2022 18:35:41 +0100 Subject: [PATCH 1/3] udp: use slab for pending scrape responses to fix potential issue Peers sometimes send multiple scrape requests with the same transaction id, which would previously cause warnings due to replacing the PendingScrapeMapEntry and later not finding it. --- aquatic_udp/src/common.rs | 6 +- aquatic_udp/src/workers/request.rs | 3 +- aquatic_udp/src/workers/socket.rs | 89 ++++++++++++------------------ aquatic_udp_bench/src/scrape.rs | 3 +- 4 files changed, 39 insertions(+), 62 deletions(-) diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index 0b56c17..df26a70 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -31,15 +31,13 @@ impl Ip for Ipv6Addr { #[derive(Debug)] pub struct PendingScrapeRequest { - pub connection_id: ConnectionId, - pub transaction_id: TransactionId, + pub slab_key: usize, pub info_hashes: BTreeMap, } #[derive(Debug)] pub struct PendingScrapeResponse { - pub connection_id: ConnectionId, - pub transaction_id: TransactionId, + pub slab_key: usize, pub torrent_stats: BTreeMap, } diff --git a/aquatic_udp/src/workers/request.rs b/aquatic_udp/src/workers/request.rs index d3158c8..92f4319 100644 --- a/aquatic_udp/src/workers/request.rs +++ b/aquatic_udp/src/workers/request.rs @@ -391,8 +391,7 @@ fn handle_scrape_request( } PendingScrapeResponse { - connection_id: request.connection_id, - transaction_id: request.transaction_id, + slab_key: request.slab_key, torrent_stats, } } diff --git a/aquatic_udp/src/workers/socket.rs b/aquatic_udp/src/workers/socket.rs index 2ba7a23..730c44f 100644 --- a/aquatic_udp/src/workers/socket.rs +++ b/aquatic_udp/src/workers/socket.rs @@ -12,6 +12,7 @@ use crossbeam_channel::Receiver; use mio::net::UdpSocket; use mio::{Events, Interest, Poll, Token}; use rand::prelude::{Rng, SeedableRng, StdRng}; +use slab::Slab; use aquatic_common::access_list::create_access_list_cache; use aquatic_common::access_list::AccessListCache; @@ -49,23 +50,23 @@ impl ConnectionMap { } #[derive(Debug)] -pub struct PendingScrapeResponseMapEntry { +pub struct PendingScrapeResponseSlabEntry { num_pending: usize, valid_until: ValidUntil, torrent_stats: BTreeMap, + transaction_id: TransactionId, } #[derive(Default)] -pub struct PendingScrapeResponseMap( - AHashIndexMap<(ConnectionId, TransactionId, SocketAddr), PendingScrapeResponseMapEntry>, +pub struct PendingScrapeResponseSlab( + Slab, ); -impl PendingScrapeResponseMap { +impl PendingScrapeResponseSlab { pub fn prepare_split_requests( &mut self, config: &Config, request: ScrapeRequest, - addr: SocketAddr, valid_until: ValidUntil, ) -> impl IntoIterator { let mut split_requests: AHashIndexMap = @@ -73,54 +74,38 @@ impl PendingScrapeResponseMap { if request.info_hashes.is_empty() { ::log::warn!( - "Attempted to prepare PendingScrapeResponseMap entry with zero info hashes" + "Attempted to prepare PendingScrapeResponseSlab entry with zero info hashes" ); return split_requests; } - let connection_id = request.connection_id; - let transaction_id = request.transaction_id; + let vacant_entry = self.0.vacant_entry(); + let slab_key = vacant_entry.key(); for (i, info_hash) in request.info_hashes.into_iter().enumerate() { let split_request = split_requests .entry(RequestWorkerIndex::from_info_hash(&config, info_hash)) .or_insert_with(|| PendingScrapeRequest { - connection_id, - transaction_id, + slab_key, info_hashes: BTreeMap::new(), }); split_request.info_hashes.insert(i, info_hash); } - let key = (connection_id, transaction_id, addr); - - let entry = PendingScrapeResponseMapEntry { + vacant_entry.insert(PendingScrapeResponseSlabEntry { num_pending: split_requests.len(), valid_until, torrent_stats: Default::default(), - }; - - if let Some(previous_entry) = self.0.insert(key, entry) { - ::log::warn!( - "PendingScrapeResponseMap.prepare replaced previous entry {:?} for key {:?}", - previous_entry, - key - ); - } + transaction_id: request.transaction_id, + }); split_requests } - pub fn add_and_get_finished( - &mut self, - response: PendingScrapeResponse, - addr: SocketAddr, - ) -> Option { - let key = (response.connection_id, response.transaction_id, addr); - - let finished = if let Some(entry) = self.0.get_mut(&key) { + pub fn add_and_get_finished(&mut self, response: PendingScrapeResponse) -> Option { + let finished = if let Some(entry) = self.0.get_mut(response.slab_key) { entry.num_pending -= 1; entry @@ -130,18 +115,18 @@ impl PendingScrapeResponseMap { entry.num_pending == 0 } else { ::log::warn!( - "PendingScrapeResponseMap.add didn't find entry for key {:?}", - key + "PendingScrapeResponseSlab.add didn't find entry for key {:?}", + response.slab_key ); false }; if finished { - let entry = self.0.remove(&key).unwrap(); + let entry = self.0.remove(response.slab_key); Some(Response::Scrape(ScrapeResponse { - transaction_id: response.transaction_id, + transaction_id: entry.transaction_id, torrent_stats: entry.torrent_stats.into_values().collect(), })) } else { @@ -157,7 +142,7 @@ impl PendingScrapeResponseMap { if !keep { ::log::warn!( - "Removing PendingScrapeResponseMap entry while cleaning. {:?}: {:?}", + "Removing PendingScrapeResponseSlab entry while cleaning. {:?}: {:?}", k, v ); @@ -193,7 +178,7 @@ pub fn run_socket_worker( let mut events = Events::with_capacity(config.network.poll_event_capacity); let mut connections = ConnectionMap::default(); - let mut pending_scrape_responses = PendingScrapeResponseMap::default(); + let mut pending_scrape_responses = PendingScrapeResponseSlab::default(); let mut access_list_cache = create_access_list_cache(&state.access_list); let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new(); @@ -278,7 +263,7 @@ fn read_requests( config: &Config, state: &State, connections: &mut ConnectionMap, - pending_scrape_responses: &mut PendingScrapeResponseMap, + pending_scrape_responses: &mut PendingScrapeResponseSlab, access_list_cache: &mut AccessListCache, rng: &mut StdRng, socket: &mut UdpSocket, @@ -375,7 +360,7 @@ fn read_requests( pub fn handle_request( config: &Config, connections: &mut ConnectionMap, - pending_scrape_responses: &mut PendingScrapeResponseMap, + pending_scrape_responses: &mut PendingScrapeResponseSlab, access_list_cache: &mut AccessListCache, rng: &mut StdRng, request_sender: &ConnectedRequestSender, @@ -429,7 +414,6 @@ pub fn handle_request( let split_requests = pending_scrape_responses.prepare_split_requests( config, request, - src, pending_scrape_valid_until, ); @@ -471,7 +455,7 @@ fn send_responses( socket: &mut UdpSocket, buffer: &mut [u8], response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>, - pending_scrape_responses: &mut PendingScrapeResponseMap, + pending_scrape_responses: &mut PendingScrapeResponseSlab, local_responses: Drain<(Response, SocketAddr)>, ) { let mut responses_sent_ipv4: usize = 0; @@ -495,7 +479,7 @@ fn send_responses( for (response, addr) in response_receiver.try_iter() { let opt_response = match response { - ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r, addr), + ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r), ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)), ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)), }; @@ -638,7 +622,7 @@ mod tests { #[quickcheck] fn test_pending_scrape_response_map( - request_data: Vec<(i32, i64, SocketAddr, u8)>, + request_data: Vec<(i32, i64, u8)>, request_workers: u8, ) -> TestResult { if request_workers == 0 { @@ -651,11 +635,11 @@ mod tests { let valid_until = ValidUntil::new(1); - let mut map = PendingScrapeResponseMap::default(); + let mut map = PendingScrapeResponseSlab::default(); let mut requests = Vec::new(); - for (t, c, a, b) in request_data { + for (t, c, b) in request_data { if b == 0 { return TestResult::discard(); } @@ -674,32 +658,30 @@ mod tests { info_hashes, }; - requests.push((request, a)); + requests.push(request); } let mut all_split_requests = Vec::new(); - for (request, addr) in requests.iter() { + for request in requests.iter() { let split_requests = map.prepare_split_requests( &config, request.to_owned(), - addr.to_owned(), valid_until, ); - all_split_requests.push(( - addr, + all_split_requests.push( split_requests .into_iter() .collect::>(), - )); + ); } assert_eq!(map.0.len(), requests.len()); let mut responses = Vec::new(); - for (addr, split_requests) in all_split_requests { + for split_requests in all_split_requests { for (worker_index, split_request) in split_requests { assert!(worker_index.0 < request_workers as usize); @@ -719,12 +701,11 @@ mod tests { .collect(); let response = PendingScrapeResponse { - transaction_id: split_request.transaction_id, - connection_id: split_request.connection_id, + slab_key: split_request.slab_key, torrent_stats, }; - if let Some(response) = map.add_and_get_finished(response, addr.to_owned()) { + if let Some(response) = map.add_and_get_finished(response) { responses.push(response); } } diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index 09fe3e5..1062ade 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -41,8 +41,7 @@ pub fn bench_scrape_handler( for request_chunk in requests.chunks(p) { for (request, src) in request_chunk { let request = ConnectedRequest::Scrape(PendingScrapeRequest { - connection_id: request.connection_id, - transaction_id: request.transaction_id, + slab_key: 0, info_hashes: request .info_hashes .clone() From f0dc7c19f32eddc973612ef1cd147d88d8d45548 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 13 Jan 2022 19:16:25 +0100 Subject: [PATCH 2/3] udp: show separate statistics for all response types --- aquatic_udp/src/common.rs | 10 +++- aquatic_udp/src/workers/socket.rs | 67 ++++++++++----------------- aquatic_udp/src/workers/statistics.rs | 52 +++++++++++++++------ aquatic_udp/templates/statistics.html | 40 ++++++++++++++-- aquatic_udp_protocol/src/response.rs | 8 ++-- 5 files changed, 112 insertions(+), 65 deletions(-) diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index df26a70..a32981a 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -149,7 +149,10 @@ impl PeerStatus { pub struct Statistics { pub requests_received: AtomicUsize, - pub responses_sent: AtomicUsize, + pub responses_sent_connect: AtomicUsize, + pub responses_sent_announce: AtomicUsize, + pub responses_sent_scrape: AtomicUsize, + pub responses_sent_error: AtomicUsize, pub bytes_received: AtomicUsize, pub bytes_sent: AtomicUsize, pub torrents: Vec, @@ -160,7 +163,10 @@ impl Statistics { pub fn new(num_request_workers: usize) -> Self { Self { requests_received: Default::default(), - responses_sent: Default::default(), + responses_sent_connect: Default::default(), + responses_sent_announce: Default::default(), + responses_sent_scrape: Default::default(), + responses_sent_error: Default::default(), bytes_received: Default::default(), bytes_sent: Default::default(), torrents: Self::create_atomic_usize_vec(num_request_workers), diff --git a/aquatic_udp/src/workers/socket.rs b/aquatic_udp/src/workers/socket.rs index 730c44f..5b2f07c 100644 --- a/aquatic_udp/src/workers/socket.rs +++ b/aquatic_udp/src/workers/socket.rs @@ -458,20 +458,12 @@ fn send_responses( pending_scrape_responses: &mut PendingScrapeResponseSlab, local_responses: Drain<(Response, SocketAddr)>, ) { - let mut responses_sent_ipv4: usize = 0; - let mut responses_sent_ipv6: usize = 0; - let mut bytes_sent_ipv4: usize = 0; - let mut bytes_sent_ipv6: usize = 0; - for (response, addr) in local_responses { send_response( + state, config, socket, buffer, - &mut responses_sent_ipv4, - &mut responses_sent_ipv6, - &mut bytes_sent_ipv4, - &mut bytes_sent_ipv6, response, addr, ); @@ -486,47 +478,22 @@ fn send_responses( if let Some(response) = opt_response { send_response( + state, config, socket, buffer, - &mut responses_sent_ipv4, - &mut responses_sent_ipv6, - &mut bytes_sent_ipv4, - &mut bytes_sent_ipv6, response, addr, ); } } - - if config.statistics.active() { - state - .statistics_ipv4 - .responses_sent - .fetch_add(responses_sent_ipv4, Ordering::Release); - state - .statistics_ipv6 - .responses_sent - .fetch_add(responses_sent_ipv6, Ordering::Release); - state - .statistics_ipv4 - .bytes_sent - .fetch_add(bytes_sent_ipv4, Ordering::Release); - state - .statistics_ipv6 - .bytes_sent - .fetch_add(bytes_sent_ipv6, Ordering::Release); - } } fn send_response( + state: &State, config: &Config, socket: &mut UdpSocket, buffer: &mut [u8], - responses_sent_ipv4: &mut usize, - responses_sent_ipv6: &mut usize, - bytes_sent_ipv4: &mut usize, - bytes_sent_ipv6: &mut usize, response: Response, addr: SocketAddr, ) { @@ -556,15 +523,31 @@ fn send_response( let amt = cursor.position() as usize; match socket.send_to(&cursor.get_ref()[..amt], addr) { - Ok(amt) => { - if addr_is_ipv4 { - *responses_sent_ipv4 += 1; - *bytes_sent_ipv4 += amt; + Ok(amt) if config.statistics.active() => { + let stats = if addr_is_ipv4 { + &state.statistics_ipv4 } else { - *responses_sent_ipv6 += 1; - *bytes_sent_ipv6 += amt; + &state.statistics_ipv6 + }; + + stats.bytes_sent.fetch_add(amt, Ordering::Relaxed); + + match response { + Response::Connect(_) => { + stats.responses_sent_connect.fetch_add(1, Ordering::Relaxed); + }, + Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => { + stats.responses_sent_announce.fetch_add(1, Ordering::Relaxed); + } + Response::Scrape(_) => { + stats.responses_sent_scrape.fetch_add(1, Ordering::Relaxed); + } + Response::Error(_) => { + stats.responses_sent_error.fetch_add(1, Ordering::Relaxed); + } } } + Ok(_) => {}, Err(err) => { ::log::info!("send_to error: {}", err); } diff --git a/aquatic_udp/src/workers/statistics.rs b/aquatic_udp/src/workers/statistics.rs index 52f5f4c..fccff7a 100644 --- a/aquatic_udp/src/workers/statistics.rs +++ b/aquatic_udp/src/workers/statistics.rs @@ -24,7 +24,10 @@ const STYLESHEET_CONTENTS: &str = concat!( #[derive(Clone, Copy, Debug)] struct CollectedStatistics { requests_per_second: f64, - responses_per_second: f64, + responses_per_second_connect: f64, + responses_per_second_announce: f64, + responses_per_second_scrape: f64, + responses_per_second_error: f64, bytes_received_per_second: f64, bytes_sent_per_second: f64, num_torrents: usize, @@ -33,10 +36,13 @@ struct CollectedStatistics { impl CollectedStatistics { fn from_shared(statistics: &Arc, last: &mut Instant) -> Self { - let requests_received = statistics.requests_received.fetch_and(0, Ordering::AcqRel) as f64; - let responses_sent = statistics.responses_sent.fetch_and(0, Ordering::AcqRel) as f64; - let bytes_received = statistics.bytes_received.fetch_and(0, Ordering::AcqRel) as f64; - let bytes_sent = statistics.bytes_sent.fetch_and(0, Ordering::AcqRel) as f64; + let requests_received = statistics.requests_received.fetch_and(0, Ordering::Relaxed) as f64; + let responses_sent_connect = statistics.responses_sent_connect.fetch_and(0, Ordering::Relaxed) as f64; + let responses_sent_announce = statistics.responses_sent_announce.fetch_and(0, Ordering::Relaxed) as f64; + let responses_sent_scrape = statistics.responses_sent_scrape.fetch_and(0, Ordering::Relaxed) as f64; + let responses_sent_error = statistics.responses_sent_error.fetch_and(0, Ordering::Relaxed) as f64; + let bytes_received = statistics.bytes_received.fetch_and(0, Ordering::Relaxed) as f64; + let bytes_sent = statistics.bytes_sent.fetch_and(0, Ordering::Relaxed) as f64; let num_torrents = Self::sum_atomic_usizes(&statistics.torrents); let num_peers = Self::sum_atomic_usizes(&statistics.peers); @@ -48,7 +54,10 @@ impl CollectedStatistics { Self { requests_per_second: requests_received / elapsed, - responses_per_second: responses_sent / elapsed, + responses_per_second_connect: responses_sent_connect / elapsed, + responses_per_second_announce: responses_sent_announce / elapsed, + responses_per_second_scrape: responses_sent_scrape / elapsed, + responses_per_second_error: responses_sent_error / elapsed, bytes_received_per_second: bytes_received / elapsed, bytes_sent_per_second: bytes_sent / elapsed, num_torrents, @@ -57,7 +66,7 @@ impl CollectedStatistics { } fn sum_atomic_usizes(values: &[AtomicUsize]) -> usize { - values.iter().map(|n| n.load(Ordering::Acquire)).sum() + values.iter().map(|n| n.load(Ordering::Relaxed)).sum() } } @@ -66,10 +75,20 @@ impl Into for CollectedStatistics { let rx_mbits = self.bytes_received_per_second * 8.0 / 1_000_000.0; let tx_mbits = self.bytes_sent_per_second * 8.0 / 1_000_000.0; + let responses_per_second_total = self.responses_per_second_connect + self.responses_per_second_announce + self.responses_per_second_scrape + self.responses_per_second_error; + FormattedStatistics { requests_per_second: (self.requests_per_second as usize) .to_formatted_string(&Locale::en), - responses_per_second: (self.responses_per_second as usize) + responses_per_second_total: (responses_per_second_total as usize) + .to_formatted_string(&Locale::en), + responses_per_second_connect: (self.responses_per_second_connect as usize) + .to_formatted_string(&Locale::en), + responses_per_second_announce: (self.responses_per_second_announce as usize) + .to_formatted_string(&Locale::en), + responses_per_second_scrape: (self.responses_per_second_scrape as usize) + .to_formatted_string(&Locale::en), + responses_per_second_error: (self.responses_per_second_error as usize) .to_formatted_string(&Locale::en), rx_mbits: format!("{:.2}", rx_mbits), tx_mbits: format!("{:.2}", tx_mbits), @@ -82,7 +101,11 @@ impl Into for CollectedStatistics { #[derive(Clone, Debug, Serialize)] struct FormattedStatistics { requests_per_second: String, - responses_per_second: String, + responses_per_second_total: String, + responses_per_second_connect: String, + responses_per_second_announce: String, + responses_per_second_scrape: String, + responses_per_second_error: String, rx_mbits: String, tx_mbits: String, num_torrents: String, @@ -161,10 +184,13 @@ pub fn run_statistics_worker(config: Config, state: State) { } fn print_to_stdout(config: &Config, statistics: &FormattedStatistics) { - println!( - " requests/second: {:>10}, responses/second: {:>10}", - statistics.requests_per_second, statistics.responses_per_second - ); + println!(" requests/second: {:>10}", statistics.requests_per_second); + println!(" responses/second"); + println!(" total: {:>10}", statistics.responses_per_second_total); + println!(" connect: {:>10}", statistics.responses_per_second_connect); + println!(" announce: {:>10}", statistics.responses_per_second_announce); + println!(" scrape: {:>10}", statistics.responses_per_second_scrape); + println!(" error: {:>10}", statistics.responses_per_second_error); println!( " bandwidth: {:>7} Mbit/s in, {:7} Mbit/s out", statistics.rx_mbits, statistics.tx_mbits, diff --git a/aquatic_udp/templates/statistics.html b/aquatic_udp/templates/statistics.html index 1f6e808..e62f697 100644 --- a/aquatic_udp/templates/statistics.html +++ b/aquatic_udp/templates/statistics.html @@ -39,8 +39,24 @@ { ipv4.requests_per_second } - Responses / second - { ipv4.responses_per_second } + Total responses / second + { ipv4.responses_per_second_total } + + + Connect responses / second + { ipv4.responses_per_second_connect } + + + Announce responses / second + { ipv4.responses_per_second_announce } + + + Scrape responses / second + { ipv4.responses_per_second_scrape } + + + Error responses / second + { ipv4.responses_per_second_error } Bandwidth (RX) @@ -73,8 +89,24 @@ { ipv6.requests_per_second } - Responses / second - { ipv6.responses_per_second } + Total responses / second + { ipv6.responses_per_second_total } + + + Connect responses / second + { ipv6.responses_per_second_connect } + + + Announce responses / second + { ipv6.responses_per_second_announce } + + + Scrape responses / second + { ipv6.responses_per_second_scrape } + + + Error responses / second + { ipv6.responses_per_second_error } Bandwidth (RX) diff --git a/aquatic_udp_protocol/src/response.rs b/aquatic_udp_protocol/src/response.rs index 99f3afa..8e9a280 100644 --- a/aquatic_udp_protocol/src/response.rs +++ b/aquatic_udp_protocol/src/response.rs @@ -97,7 +97,7 @@ impl Response { /// addresses. Clients seem not to support it very well, but due to a lack /// of alternative solutions, it is implemented here. #[inline] - pub fn write(self, bytes: &mut impl Write) -> Result<(), io::Error> { + pub fn write(&self, bytes: &mut impl Write) -> Result<(), io::Error> { match self { Response::Connect(r) => { bytes.write_i32::(0)?; @@ -111,7 +111,7 @@ impl Response { bytes.write_i32::(r.leechers.0)?; bytes.write_i32::(r.seeders.0)?; - for peer in r.peers { + for peer in r.peers.iter() { bytes.write_all(&peer.ip_address.octets())?; bytes.write_u16::(peer.port.0)?; } @@ -120,7 +120,7 @@ impl Response { bytes.write_i32::(2)?; bytes.write_i32::(r.transaction_id.0)?; - for torrent_stat in r.torrent_stats { + for torrent_stat in r.torrent_stats.iter() { bytes.write_i32::(torrent_stat.seeders.0)?; bytes.write_i32::(torrent_stat.completed.0)?; bytes.write_i32::(torrent_stat.leechers.0)?; @@ -139,7 +139,7 @@ impl Response { bytes.write_i32::(r.leechers.0)?; bytes.write_i32::(r.seeders.0)?; - for peer in r.peers { + for peer in r.peers.iter() { bytes.write_all(&peer.ip_address.octets())?; bytes.write_u16::(peer.port.0)?; } From 0d3c6111ca39562d0d0a95e6a36b0963340f66c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 13 Jan 2022 19:21:53 +0100 Subject: [PATCH 3/3] udp: run cargo fmt --- aquatic_udp/src/workers/socket.rs | 37 ++++++--------------- aquatic_udp/src/workers/statistics.rs | 46 +++++++++++++++++++++------ 2 files changed, 46 insertions(+), 37 deletions(-) diff --git a/aquatic_udp/src/workers/socket.rs b/aquatic_udp/src/workers/socket.rs index 5b2f07c..130e11b 100644 --- a/aquatic_udp/src/workers/socket.rs +++ b/aquatic_udp/src/workers/socket.rs @@ -58,9 +58,7 @@ pub struct PendingScrapeResponseSlabEntry { } #[derive(Default)] -pub struct PendingScrapeResponseSlab( - Slab, -); +pub struct PendingScrapeResponseSlab(Slab); impl PendingScrapeResponseSlab { pub fn prepare_split_requests( @@ -459,14 +457,7 @@ fn send_responses( local_responses: Drain<(Response, SocketAddr)>, ) { for (response, addr) in local_responses { - send_response( - state, - config, - socket, - buffer, - response, - addr, - ); + send_response(state, config, socket, buffer, response, addr); } for (response, addr) in response_receiver.try_iter() { @@ -477,14 +468,7 @@ fn send_responses( }; if let Some(response) = opt_response { - send_response( - state, - config, - socket, - buffer, - response, - addr, - ); + send_response(state, config, socket, buffer, response, addr); } } } @@ -535,9 +519,11 @@ fn send_response( match response { Response::Connect(_) => { stats.responses_sent_connect.fetch_add(1, Ordering::Relaxed); - }, + } Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => { - stats.responses_sent_announce.fetch_add(1, Ordering::Relaxed); + stats + .responses_sent_announce + .fetch_add(1, Ordering::Relaxed); } Response::Scrape(_) => { stats.responses_sent_scrape.fetch_add(1, Ordering::Relaxed); @@ -547,7 +533,7 @@ fn send_response( } } } - Ok(_) => {}, + Ok(_) => {} Err(err) => { ::log::info!("send_to error: {}", err); } @@ -647,11 +633,8 @@ mod tests { let mut all_split_requests = Vec::new(); for request in requests.iter() { - let split_requests = map.prepare_split_requests( - &config, - request.to_owned(), - valid_until, - ); + let split_requests = + map.prepare_split_requests(&config, request.to_owned(), valid_until); all_split_requests.push( split_requests diff --git a/aquatic_udp/src/workers/statistics.rs b/aquatic_udp/src/workers/statistics.rs index fccff7a..4477662 100644 --- a/aquatic_udp/src/workers/statistics.rs +++ b/aquatic_udp/src/workers/statistics.rs @@ -37,10 +37,18 @@ struct CollectedStatistics { impl CollectedStatistics { fn from_shared(statistics: &Arc, last: &mut Instant) -> Self { let requests_received = statistics.requests_received.fetch_and(0, Ordering::Relaxed) as f64; - let responses_sent_connect = statistics.responses_sent_connect.fetch_and(0, Ordering::Relaxed) as f64; - let responses_sent_announce = statistics.responses_sent_announce.fetch_and(0, Ordering::Relaxed) as f64; - let responses_sent_scrape = statistics.responses_sent_scrape.fetch_and(0, Ordering::Relaxed) as f64; - let responses_sent_error = statistics.responses_sent_error.fetch_and(0, Ordering::Relaxed) as f64; + let responses_sent_connect = statistics + .responses_sent_connect + .fetch_and(0, Ordering::Relaxed) as f64; + let responses_sent_announce = statistics + .responses_sent_announce + .fetch_and(0, Ordering::Relaxed) as f64; + let responses_sent_scrape = statistics + .responses_sent_scrape + .fetch_and(0, Ordering::Relaxed) as f64; + let responses_sent_error = statistics + .responses_sent_error + .fetch_and(0, Ordering::Relaxed) as f64; let bytes_received = statistics.bytes_received.fetch_and(0, Ordering::Relaxed) as f64; let bytes_sent = statistics.bytes_sent.fetch_and(0, Ordering::Relaxed) as f64; let num_torrents = Self::sum_atomic_usizes(&statistics.torrents); @@ -75,7 +83,10 @@ impl Into for CollectedStatistics { let rx_mbits = self.bytes_received_per_second * 8.0 / 1_000_000.0; let tx_mbits = self.bytes_sent_per_second * 8.0 / 1_000_000.0; - let responses_per_second_total = self.responses_per_second_connect + self.responses_per_second_announce + self.responses_per_second_scrape + self.responses_per_second_error; + let responses_per_second_total = self.responses_per_second_connect + + self.responses_per_second_announce + + self.responses_per_second_scrape + + self.responses_per_second_error; FormattedStatistics { requests_per_second: (self.requests_per_second as usize) @@ -186,11 +197,26 @@ pub fn run_statistics_worker(config: Config, state: State) { fn print_to_stdout(config: &Config, statistics: &FormattedStatistics) { println!(" requests/second: {:>10}", statistics.requests_per_second); println!(" responses/second"); - println!(" total: {:>10}", statistics.responses_per_second_total); - println!(" connect: {:>10}", statistics.responses_per_second_connect); - println!(" announce: {:>10}", statistics.responses_per_second_announce); - println!(" scrape: {:>10}", statistics.responses_per_second_scrape); - println!(" error: {:>10}", statistics.responses_per_second_error); + println!( + " total: {:>10}", + statistics.responses_per_second_total + ); + println!( + " connect: {:>10}", + statistics.responses_per_second_connect + ); + println!( + " announce: {:>10}", + statistics.responses_per_second_announce + ); + println!( + " scrape: {:>10}", + statistics.responses_per_second_scrape + ); + println!( + " error: {:>10}", + statistics.responses_per_second_error + ); println!( " bandwidth: {:>7} Mbit/s in, {:7} Mbit/s out", statistics.rx_mbits, statistics.tx_mbits,