From f0dc7c19f32eddc973612ef1cd147d88d8d45548 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 13 Jan 2022 19:16:25 +0100 Subject: [PATCH] udp: show separate statistics for all response types --- aquatic_udp/src/common.rs | 10 +++- aquatic_udp/src/workers/socket.rs | 67 ++++++++++----------------- aquatic_udp/src/workers/statistics.rs | 52 +++++++++++++++------ aquatic_udp/templates/statistics.html | 40 ++++++++++++++-- aquatic_udp_protocol/src/response.rs | 8 ++-- 5 files changed, 112 insertions(+), 65 deletions(-) diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index df26a70..a32981a 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -149,7 +149,10 @@ impl PeerStatus { pub struct Statistics { pub requests_received: AtomicUsize, - pub responses_sent: AtomicUsize, + pub responses_sent_connect: AtomicUsize, + pub responses_sent_announce: AtomicUsize, + pub responses_sent_scrape: AtomicUsize, + pub responses_sent_error: AtomicUsize, pub bytes_received: AtomicUsize, pub bytes_sent: AtomicUsize, pub torrents: Vec, @@ -160,7 +163,10 @@ impl Statistics { pub fn new(num_request_workers: usize) -> Self { Self { requests_received: Default::default(), - responses_sent: Default::default(), + responses_sent_connect: Default::default(), + responses_sent_announce: Default::default(), + responses_sent_scrape: Default::default(), + responses_sent_error: Default::default(), bytes_received: Default::default(), bytes_sent: Default::default(), torrents: Self::create_atomic_usize_vec(num_request_workers), diff --git a/aquatic_udp/src/workers/socket.rs b/aquatic_udp/src/workers/socket.rs index 730c44f..5b2f07c 100644 --- a/aquatic_udp/src/workers/socket.rs +++ b/aquatic_udp/src/workers/socket.rs @@ -458,20 +458,12 @@ fn send_responses( pending_scrape_responses: &mut PendingScrapeResponseSlab, local_responses: Drain<(Response, SocketAddr)>, ) { - let mut responses_sent_ipv4: usize = 0; - let mut responses_sent_ipv6: usize = 0; - let mut bytes_sent_ipv4: usize = 0; - let mut bytes_sent_ipv6: usize = 0; - for (response, addr) in local_responses { send_response( + state, config, socket, buffer, - &mut responses_sent_ipv4, - &mut responses_sent_ipv6, - &mut bytes_sent_ipv4, - &mut bytes_sent_ipv6, response, addr, ); @@ -486,47 +478,22 @@ fn send_responses( if let Some(response) = opt_response { send_response( + state, config, socket, buffer, - &mut responses_sent_ipv4, - &mut responses_sent_ipv6, - &mut bytes_sent_ipv4, - &mut bytes_sent_ipv6, response, addr, ); } } - - if config.statistics.active() { - state - .statistics_ipv4 - .responses_sent - .fetch_add(responses_sent_ipv4, Ordering::Release); - state - .statistics_ipv6 - .responses_sent - .fetch_add(responses_sent_ipv6, Ordering::Release); - state - .statistics_ipv4 - .bytes_sent - .fetch_add(bytes_sent_ipv4, Ordering::Release); - state - .statistics_ipv6 - .bytes_sent - .fetch_add(bytes_sent_ipv6, Ordering::Release); - } } fn send_response( + state: &State, config: &Config, socket: &mut UdpSocket, buffer: &mut [u8], - responses_sent_ipv4: &mut usize, - responses_sent_ipv6: &mut usize, - bytes_sent_ipv4: &mut usize, - bytes_sent_ipv6: &mut usize, response: Response, addr: SocketAddr, ) { @@ -556,15 +523,31 @@ fn send_response( let amt = cursor.position() as usize; match socket.send_to(&cursor.get_ref()[..amt], addr) { - Ok(amt) => { - if addr_is_ipv4 { - *responses_sent_ipv4 += 1; - *bytes_sent_ipv4 += amt; + Ok(amt) if config.statistics.active() => { + let stats = if addr_is_ipv4 { + &state.statistics_ipv4 } else { - *responses_sent_ipv6 += 1; - *bytes_sent_ipv6 += amt; + &state.statistics_ipv6 + }; + + stats.bytes_sent.fetch_add(amt, Ordering::Relaxed); + + match response { + Response::Connect(_) => { + stats.responses_sent_connect.fetch_add(1, Ordering::Relaxed); + }, + Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => { + stats.responses_sent_announce.fetch_add(1, Ordering::Relaxed); + } + Response::Scrape(_) => { + stats.responses_sent_scrape.fetch_add(1, Ordering::Relaxed); + } + Response::Error(_) => { + stats.responses_sent_error.fetch_add(1, Ordering::Relaxed); + } } } + Ok(_) => {}, Err(err) => { ::log::info!("send_to error: {}", err); } diff --git a/aquatic_udp/src/workers/statistics.rs b/aquatic_udp/src/workers/statistics.rs index 52f5f4c..fccff7a 100644 --- a/aquatic_udp/src/workers/statistics.rs +++ b/aquatic_udp/src/workers/statistics.rs @@ -24,7 +24,10 @@ const STYLESHEET_CONTENTS: &str = concat!( #[derive(Clone, Copy, Debug)] struct CollectedStatistics { requests_per_second: f64, - responses_per_second: f64, + responses_per_second_connect: f64, + responses_per_second_announce: f64, + responses_per_second_scrape: f64, + responses_per_second_error: f64, bytes_received_per_second: f64, bytes_sent_per_second: f64, num_torrents: usize, @@ -33,10 +36,13 @@ struct CollectedStatistics { impl CollectedStatistics { fn from_shared(statistics: &Arc, last: &mut Instant) -> Self { - let requests_received = statistics.requests_received.fetch_and(0, Ordering::AcqRel) as f64; - let responses_sent = statistics.responses_sent.fetch_and(0, Ordering::AcqRel) as f64; - let bytes_received = statistics.bytes_received.fetch_and(0, Ordering::AcqRel) as f64; - let bytes_sent = statistics.bytes_sent.fetch_and(0, Ordering::AcqRel) as f64; + let requests_received = statistics.requests_received.fetch_and(0, Ordering::Relaxed) as f64; + let responses_sent_connect = statistics.responses_sent_connect.fetch_and(0, Ordering::Relaxed) as f64; + let responses_sent_announce = statistics.responses_sent_announce.fetch_and(0, Ordering::Relaxed) as f64; + let responses_sent_scrape = statistics.responses_sent_scrape.fetch_and(0, Ordering::Relaxed) as f64; + let responses_sent_error = statistics.responses_sent_error.fetch_and(0, Ordering::Relaxed) as f64; + let bytes_received = statistics.bytes_received.fetch_and(0, Ordering::Relaxed) as f64; + let bytes_sent = statistics.bytes_sent.fetch_and(0, Ordering::Relaxed) as f64; let num_torrents = Self::sum_atomic_usizes(&statistics.torrents); let num_peers = Self::sum_atomic_usizes(&statistics.peers); @@ -48,7 +54,10 @@ impl CollectedStatistics { Self { requests_per_second: requests_received / elapsed, - responses_per_second: responses_sent / elapsed, + responses_per_second_connect: responses_sent_connect / elapsed, + responses_per_second_announce: responses_sent_announce / elapsed, + responses_per_second_scrape: responses_sent_scrape / elapsed, + responses_per_second_error: responses_sent_error / elapsed, bytes_received_per_second: bytes_received / elapsed, bytes_sent_per_second: bytes_sent / elapsed, num_torrents, @@ -57,7 +66,7 @@ impl CollectedStatistics { } fn sum_atomic_usizes(values: &[AtomicUsize]) -> usize { - values.iter().map(|n| n.load(Ordering::Acquire)).sum() + values.iter().map(|n| n.load(Ordering::Relaxed)).sum() } } @@ -66,10 +75,20 @@ impl Into for CollectedStatistics { let rx_mbits = self.bytes_received_per_second * 8.0 / 1_000_000.0; let tx_mbits = self.bytes_sent_per_second * 8.0 / 1_000_000.0; + let responses_per_second_total = self.responses_per_second_connect + self.responses_per_second_announce + self.responses_per_second_scrape + self.responses_per_second_error; + FormattedStatistics { requests_per_second: (self.requests_per_second as usize) .to_formatted_string(&Locale::en), - responses_per_second: (self.responses_per_second as usize) + responses_per_second_total: (responses_per_second_total as usize) + .to_formatted_string(&Locale::en), + responses_per_second_connect: (self.responses_per_second_connect as usize) + .to_formatted_string(&Locale::en), + responses_per_second_announce: (self.responses_per_second_announce as usize) + .to_formatted_string(&Locale::en), + responses_per_second_scrape: (self.responses_per_second_scrape as usize) + .to_formatted_string(&Locale::en), + responses_per_second_error: (self.responses_per_second_error as usize) .to_formatted_string(&Locale::en), rx_mbits: format!("{:.2}", rx_mbits), tx_mbits: format!("{:.2}", tx_mbits), @@ -82,7 +101,11 @@ impl Into for CollectedStatistics { #[derive(Clone, Debug, Serialize)] struct FormattedStatistics { requests_per_second: String, - responses_per_second: String, + responses_per_second_total: String, + responses_per_second_connect: String, + responses_per_second_announce: String, + responses_per_second_scrape: String, + responses_per_second_error: String, rx_mbits: String, tx_mbits: String, num_torrents: String, @@ -161,10 +184,13 @@ pub fn run_statistics_worker(config: Config, state: State) { } fn print_to_stdout(config: &Config, statistics: &FormattedStatistics) { - println!( - " requests/second: {:>10}, responses/second: {:>10}", - statistics.requests_per_second, statistics.responses_per_second - ); + println!(" requests/second: {:>10}", statistics.requests_per_second); + println!(" responses/second"); + println!(" total: {:>10}", statistics.responses_per_second_total); + println!(" connect: {:>10}", statistics.responses_per_second_connect); + println!(" announce: {:>10}", statistics.responses_per_second_announce); + println!(" scrape: {:>10}", statistics.responses_per_second_scrape); + println!(" error: {:>10}", statistics.responses_per_second_error); println!( " bandwidth: {:>7} Mbit/s in, {:7} Mbit/s out", statistics.rx_mbits, statistics.tx_mbits, diff --git a/aquatic_udp/templates/statistics.html b/aquatic_udp/templates/statistics.html index 1f6e808..e62f697 100644 --- a/aquatic_udp/templates/statistics.html +++ b/aquatic_udp/templates/statistics.html @@ -39,8 +39,24 @@ { ipv4.requests_per_second } - Responses / second - { ipv4.responses_per_second } + Total responses / second + { ipv4.responses_per_second_total } + + + Connect responses / second + { ipv4.responses_per_second_connect } + + + Announce responses / second + { ipv4.responses_per_second_announce } + + + Scrape responses / second + { ipv4.responses_per_second_scrape } + + + Error responses / second + { ipv4.responses_per_second_error } Bandwidth (RX) @@ -73,8 +89,24 @@ { ipv6.requests_per_second } - Responses / second - { ipv6.responses_per_second } + Total responses / second + { ipv6.responses_per_second_total } + + + Connect responses / second + { ipv6.responses_per_second_connect } + + + Announce responses / second + { ipv6.responses_per_second_announce } + + + Scrape responses / second + { ipv6.responses_per_second_scrape } + + + Error responses / second + { ipv6.responses_per_second_error } Bandwidth (RX) diff --git a/aquatic_udp_protocol/src/response.rs b/aquatic_udp_protocol/src/response.rs index 99f3afa..8e9a280 100644 --- a/aquatic_udp_protocol/src/response.rs +++ b/aquatic_udp_protocol/src/response.rs @@ -97,7 +97,7 @@ impl Response { /// addresses. Clients seem not to support it very well, but due to a lack /// of alternative solutions, it is implemented here. #[inline] - pub fn write(self, bytes: &mut impl Write) -> Result<(), io::Error> { + pub fn write(&self, bytes: &mut impl Write) -> Result<(), io::Error> { match self { Response::Connect(r) => { bytes.write_i32::(0)?; @@ -111,7 +111,7 @@ impl Response { bytes.write_i32::(r.leechers.0)?; bytes.write_i32::(r.seeders.0)?; - for peer in r.peers { + for peer in r.peers.iter() { bytes.write_all(&peer.ip_address.octets())?; bytes.write_u16::(peer.port.0)?; } @@ -120,7 +120,7 @@ impl Response { bytes.write_i32::(2)?; bytes.write_i32::(r.transaction_id.0)?; - for torrent_stat in r.torrent_stats { + for torrent_stat in r.torrent_stats.iter() { bytes.write_i32::(torrent_stat.seeders.0)?; bytes.write_i32::(torrent_stat.completed.0)?; bytes.write_i32::(torrent_stat.leechers.0)?; @@ -139,7 +139,7 @@ impl Response { bytes.write_i32::(r.leechers.0)?; bytes.write_i32::(r.seeders.0)?; - for peer in r.peers { + for peer in r.peers.iter() { bytes.write_all(&peer.ip_address.octets())?; bytes.write_u16::(peer.port.0)?; }