From 34f263f6b9593182facb8969fd35ac241edd5745 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 21 Nov 2021 19:39:44 +0100 Subject: [PATCH 1/5] udp: new file structure: each worker types is submod of workers mod --- aquatic_udp/src/lib/lib.rs | 15 ++++----------- aquatic_udp/src/lib/workers/mod.rs | 3 +++ .../src/lib/{handlers.rs => workers/request.rs} | 0 .../src/lib/{network.rs => workers/socket.rs} | 0 .../src/lib/{tasks.rs => workers/statistics.rs} | 13 +++++++++++-- aquatic_udp_bench/src/main.rs | 2 +- 6 files changed, 19 insertions(+), 14 deletions(-) create mode 100644 aquatic_udp/src/lib/workers/mod.rs rename aquatic_udp/src/lib/{handlers.rs => workers/request.rs} (100%) rename aquatic_udp/src/lib/{network.rs => workers/socket.rs} (100%) rename aquatic_udp/src/lib/{tasks.rs => workers/statistics.rs} (86%) diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index f9ecb93..1e7e604 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -1,15 +1,12 @@ pub mod common; pub mod config; -pub mod handlers; -pub mod network; -pub mod tasks; +pub mod workers; use config::Config; use std::collections::BTreeMap; use std::sync::{atomic::AtomicUsize, Arc}; use std::thread::Builder; -use std::time::Duration; use anyhow::Context; #[cfg(feature = "cpu-pinning")] @@ -72,7 +69,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::RequestWorker(i), ); - handlers::run_request_worker( + workers::request::run_request_worker( config, state, request_receiver, @@ -101,7 +98,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::SocketWorker(i), ); - network::run_socket_worker( + workers::socket::run_socket_worker( state, config, i, @@ -127,11 +124,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::Other, ); - loop { - ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); - - tasks::gather_and_print_statistics(&state, &config); - } + workers::statistics::run_statistics_worker(config, state); }) .with_context(|| "spawn statistics worker")?; } diff --git a/aquatic_udp/src/lib/workers/mod.rs b/aquatic_udp/src/lib/workers/mod.rs new file mode 100644 index 0000000..668fd84 --- /dev/null +++ b/aquatic_udp/src/lib/workers/mod.rs @@ -0,0 +1,3 @@ +pub mod request; +pub mod socket; +pub mod statistics; diff --git a/aquatic_udp/src/lib/handlers.rs b/aquatic_udp/src/lib/workers/request.rs similarity index 100% rename from aquatic_udp/src/lib/handlers.rs rename to aquatic_udp/src/lib/workers/request.rs diff --git a/aquatic_udp/src/lib/network.rs b/aquatic_udp/src/lib/workers/socket.rs similarity index 100% rename from aquatic_udp/src/lib/network.rs rename to aquatic_udp/src/lib/workers/socket.rs diff --git a/aquatic_udp/src/lib/tasks.rs b/aquatic_udp/src/lib/workers/statistics.rs similarity index 86% rename from aquatic_udp/src/lib/tasks.rs rename to aquatic_udp/src/lib/workers/statistics.rs index 6624a5d..fc8e8d8 100644 --- a/aquatic_udp/src/lib/tasks.rs +++ b/aquatic_udp/src/lib/workers/statistics.rs @@ -1,9 +1,18 @@ use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::Duration; -use super::common::*; +use crate::common::*; use crate::config::Config; -pub fn gather_and_print_statistics(state: &State, config: &Config) { +pub fn run_statistics_worker(config: Config, state: State) { + loop { + ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); + + gather_and_print_statistics(&config, &state); + } +} + +fn gather_and_print_statistics(config: &Config, state: &State) { let interval = config.statistics.interval; let requests_received: f64 = state diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index c0258e1..153ccac 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -7,7 +7,7 @@ //! Scrape: 1 873 545 requests/second, 533.75 ns/request //! ``` -use aquatic_udp::handlers::run_request_worker; +use aquatic_udp::workers::request::run_request_worker; use crossbeam_channel::unbounded; use num_format::{Locale, ToFormattedString}; use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng}; From f68bbff7009faad875c5d9b5f725d2a5ed3cf64a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 21 Nov 2021 20:04:18 +0100 Subject: [PATCH 2/5] udp: split statistics by ipv4/ipv6 --- aquatic_udp/src/lib/common.rs | 18 ++--- aquatic_udp/src/lib/workers/request.rs | 8 +- aquatic_udp/src/lib/workers/socket.rs | 93 ++++++++++++++++------- aquatic_udp/src/lib/workers/statistics.rs | 55 ++++++-------- 4 files changed, 100 insertions(+), 74 deletions(-) diff --git a/aquatic_udp/src/lib/common.rs b/aquatic_udp/src/lib/common.rs index 2a87296..b0f9f2e 100644 --- a/aquatic_udp/src/lib/common.rs +++ b/aquatic_udp/src/lib/common.rs @@ -232,10 +232,8 @@ pub struct Statistics { pub responses_sent: AtomicUsize, pub bytes_received: AtomicUsize, pub bytes_sent: AtomicUsize, - pub torrents_ipv4: Vec, - pub torrents_ipv6: Vec, - pub peers_ipv4: Vec, - pub peers_ipv6: Vec, + pub torrents: Vec, + pub peers: Vec, } impl Statistics { @@ -245,10 +243,8 @@ impl Statistics { responses_sent: Default::default(), bytes_received: Default::default(), bytes_sent: Default::default(), - torrents_ipv4: Self::create_atomic_usize_vec(num_request_workers), - torrents_ipv6: Self::create_atomic_usize_vec(num_request_workers), - peers_ipv4: Self::create_atomic_usize_vec(num_request_workers), - peers_ipv6: Self::create_atomic_usize_vec(num_request_workers), + torrents: Self::create_atomic_usize_vec(num_request_workers), + peers: Self::create_atomic_usize_vec(num_request_workers), } } @@ -262,14 +258,16 @@ impl Statistics { #[derive(Clone)] pub struct State { pub access_list: Arc, - pub statistics: Arc, + pub statistics_ipv4: Arc, + pub statistics_ipv6: Arc, } impl State { pub fn new(num_request_workers: usize) -> Self { Self { access_list: Arc::new(AccessListArcSwap::default()), - statistics: Arc::new(Statistics::new(num_request_workers)), + statistics_ipv4: Arc::new(Statistics::new(num_request_workers)), + statistics_ipv6: Arc::new(Statistics::new(num_request_workers)), } } } diff --git a/aquatic_udp/src/lib/workers/request.rs b/aquatic_udp/src/lib/workers/request.rs index 0bc85bb..00d0d53 100644 --- a/aquatic_udp/src/lib/workers/request.rs +++ b/aquatic_udp/src/lib/workers/request.rs @@ -132,9 +132,9 @@ pub fn run_request_worker( let peers_ipv4 = torrents.ipv4.values().map(|t| t.peers.len()).sum(); let peers_ipv6 = torrents.ipv6.values().map(|t| t.peers.len()).sum(); - state.statistics.peers_ipv4[worker_index.0] + state.statistics_ipv4.peers[worker_index.0] .store(peers_ipv4, Ordering::Release); - state.statistics.peers_ipv6[worker_index.0] + state.statistics_ipv6.peers[worker_index.0] .store(peers_ipv6, Ordering::Release); } @@ -143,9 +143,9 @@ pub fn run_request_worker( if !statistics_update_interval.is_zero() && now > last_statistics_update + statistics_update_interval { - state.statistics.torrents_ipv4[worker_index.0] + state.statistics_ipv4.torrents[worker_index.0] .store(torrents.ipv4.len(), Ordering::Release); - state.statistics.torrents_ipv6[worker_index.0] + state.statistics_ipv6.torrents[worker_index.0] .store(torrents.ipv6.len(), Ordering::Release); last_statistics_update = now; diff --git a/aquatic_udp/src/lib/workers/socket.rs b/aquatic_udp/src/lib/workers/socket.rs index af1f56b..db359d6 100644 --- a/aquatic_udp/src/lib/workers/socket.rs +++ b/aquatic_udp/src/lib/workers/socket.rs @@ -226,8 +226,10 @@ fn read_requests( connection_valid_until: ValidUntil, pending_scrape_valid_until: ValidUntil, ) { - let mut requests_received: usize = 0; - let mut bytes_received: usize = 0; + let mut requests_received_ipv4: usize = 0; + let mut requests_received_ipv6: usize = 0; + let mut bytes_received_ipv4: usize = 0; + let mut bytes_received_ipv6 = 0; let mut access_list_cache = create_access_list_cache(&state.access_list); @@ -237,14 +239,21 @@ fn read_requests( let res_request = Request::from_bytes(&buffer[..amt], config.protocol.max_scrape_torrents); - bytes_received += amt; - - if res_request.is_ok() { - requests_received += 1; - } - let src = match src { + src @ SocketAddr::V4(_) => { + if res_request.is_ok() { + requests_received_ipv4 += 1; + } + bytes_received_ipv4 += amt; + + src + } SocketAddr::V6(src) => { + if res_request.is_ok() { + requests_received_ipv6 += 1; + } + bytes_received_ipv6 += amt; + match src.ip().octets() { // Convert IPv4-mapped address (available in std but nightly-only) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => { @@ -256,7 +265,6 @@ fn read_requests( _ => src.into(), } } - src => src, }; handle_request( @@ -285,13 +293,21 @@ fn read_requests( if config.statistics.interval != 0 { state - .statistics + .statistics_ipv4 .requests_received - .fetch_add(requests_received, Ordering::Release); + .fetch_add(requests_received_ipv4, Ordering::Release); state - .statistics + .statistics_ipv6 + .requests_received + .fetch_add(requests_received_ipv6, Ordering::Release); + state + .statistics_ipv4 .bytes_received - .fetch_add(bytes_received, Ordering::Release); + .fetch_add(bytes_received_ipv4, Ordering::Release); + state + .statistics_ipv6 + .bytes_received + .fetch_add(bytes_received_ipv6, Ordering::Release); } } @@ -412,16 +428,20 @@ fn send_responses( pending_scrape_responses: &mut PendingScrapeResponseMap, local_responses: Drain<(Response, SocketAddr)>, ) { - let mut responses_sent: usize = 0; - let mut bytes_sent: usize = 0; + let mut responses_sent_ipv4: usize = 0; + let mut responses_sent_ipv6: usize = 0; + let mut bytes_sent_ipv4: usize = 0; + let mut bytes_sent_ipv6: usize = 0; for (response, addr) in local_responses { send_response( config, socket, buffer, - &mut responses_sent, - &mut bytes_sent, + &mut responses_sent_ipv4, + &mut responses_sent_ipv6, + &mut bytes_sent_ipv4, + &mut bytes_sent_ipv6, response, addr, ); @@ -439,8 +459,10 @@ fn send_responses( config, socket, buffer, - &mut responses_sent, - &mut bytes_sent, + &mut responses_sent_ipv4, + &mut responses_sent_ipv6, + &mut bytes_sent_ipv4, + &mut bytes_sent_ipv6, response, addr, ); @@ -449,13 +471,21 @@ fn send_responses( if config.statistics.interval != 0 { state - .statistics + .statistics_ipv4 .responses_sent - .fetch_add(responses_sent, Ordering::Release); + .fetch_add(responses_sent_ipv4, Ordering::Release); state - .statistics + .statistics_ipv6 + .responses_sent + .fetch_add(responses_sent_ipv6, Ordering::Release); + state + .statistics_ipv4 .bytes_sent - .fetch_add(bytes_sent, Ordering::Release); + .fetch_add(bytes_sent_ipv4, Ordering::Release); + state + .statistics_ipv6 + .bytes_sent + .fetch_add(bytes_sent_ipv6, Ordering::Release); } } @@ -463,13 +493,17 @@ fn send_response( config: &Config, socket: &mut UdpSocket, buffer: &mut [u8], - responses_sent: &mut usize, - bytes_sent: &mut usize, + responses_sent_ipv4: &mut usize, + responses_sent_ipv6: &mut usize, + bytes_sent_ipv4: &mut usize, + bytes_sent_ipv6: &mut usize, response: Response, addr: SocketAddr, ) { let mut cursor = Cursor::new(buffer); + let addr_is_ipv4 = addr.is_ipv4(); + let addr = if config.network.address.is_ipv4() { if let SocketAddr::V4(addr) = addr { SocketAddr::V4(addr) @@ -493,8 +527,13 @@ fn send_response( match socket.send_to(&cursor.get_ref()[..amt], addr) { Ok(amt) => { - *responses_sent += 1; - *bytes_sent += amt; + if addr_is_ipv4 { + *responses_sent_ipv4 += 1; + *bytes_sent_ipv4 += amt; + } else { + *responses_sent_ipv6 += 1; + *bytes_sent_ipv6 += amt; + } } Err(err) => { ::log::info!("send_to error: {}", err); diff --git a/aquatic_udp/src/lib/workers/statistics.rs b/aquatic_udp/src/lib/workers/statistics.rs index fc8e8d8..7ca16f4 100644 --- a/aquatic_udp/src/lib/workers/statistics.rs +++ b/aquatic_udp/src/lib/workers/statistics.rs @@ -8,62 +8,51 @@ pub fn run_statistics_worker(config: Config, state: State) { loop { ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); - gather_and_print_statistics(&config, &state); + println!("General:"); + println!(" access list entries: {}", state.access_list.load().len()); + + println!("IPv4:"); + gather_and_print_for_protocol(&config, &state.statistics_ipv4); + + println!("IPv6:"); + gather_and_print_for_protocol(&config, &state.statistics_ipv6); + + println!(); } } -fn gather_and_print_statistics(config: &Config, state: &State) { +fn gather_and_print_for_protocol(config: &Config, statistics: &Statistics) { let interval = config.statistics.interval; - let requests_received: f64 = state - .statistics - .requests_received - .fetch_and(0, Ordering::AcqRel) as f64; - let responses_sent: f64 = state - .statistics - .responses_sent - .fetch_and(0, Ordering::AcqRel) as f64; - let bytes_received: f64 = state - .statistics - .bytes_received - .fetch_and(0, Ordering::AcqRel) as f64; - let bytes_sent: f64 = state.statistics.bytes_sent.fetch_and(0, Ordering::AcqRel) as f64; + let requests_received: f64 = statistics.requests_received.fetch_and(0, Ordering::AcqRel) as f64; + let responses_sent: f64 = statistics.responses_sent.fetch_and(0, Ordering::AcqRel) as f64; + let bytes_received: f64 = statistics.bytes_received.fetch_and(0, Ordering::AcqRel) as f64; + let bytes_sent: f64 = statistics.bytes_sent.fetch_and(0, Ordering::AcqRel) as f64; let requests_per_second = requests_received / interval as f64; let responses_per_second: f64 = responses_sent / interval as f64; let bytes_received_per_second: f64 = bytes_received / interval as f64; let bytes_sent_per_second: f64 = bytes_sent / interval as f64; - let num_torrents_ipv4: usize = sum_atomic_usizes(&state.statistics.torrents_ipv4); - let num_torrents_ipv6 = sum_atomic_usizes(&state.statistics.torrents_ipv6); - let num_peers_ipv4 = sum_atomic_usizes(&state.statistics.peers_ipv4); - let num_peers_ipv6 = sum_atomic_usizes(&state.statistics.peers_ipv6); - - let access_list_len = state.access_list.load().len(); + let num_torrents: usize = sum_atomic_usizes(&statistics.torrents); + let num_peers = sum_atomic_usizes(&statistics.peers); println!( - "stats: {:.2} requests/second, {:.2} responses/second", + " requests/second: {:10.2}, responses/second: {:10.2}", requests_per_second, responses_per_second ); println!( - "bandwidth: {:7.2} Mbit/s in, {:7.2} Mbit/s out", + " bandwidth: {:7.2} Mbit/s in, {:7.2} Mbit/s out", bytes_received_per_second * 8.0 / 1_000_000.0, bytes_sent_per_second * 8.0 / 1_000_000.0, ); + println!(" number of torrents: {}", num_torrents); println!( - "ipv4 torrents: {}, ipv6 torrents: {}", - num_torrents_ipv4, num_torrents_ipv6, + " number of peers: {} (updated every {} seconds)", + num_peers, config.cleaning.torrent_cleaning_interval ); - println!( - "ipv4 peers: {}, ipv6 peers: {} (both updated every {} seconds)", - num_peers_ipv4, num_peers_ipv6, config.cleaning.torrent_cleaning_interval - ); - - println!("access list entries: {}", access_list_len,); - - println!(); } fn sum_atomic_usizes(values: &[AtomicUsize]) -> usize { From f001c69dc7cbac5d1fef4ad307feb85cbb4cbe11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 21 Nov 2021 20:09:39 +0100 Subject: [PATCH 3/5] udp: statistics: print only for active protocols --- aquatic_udp/src/lib/workers/statistics.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/aquatic_udp/src/lib/workers/statistics.rs b/aquatic_udp/src/lib/workers/statistics.rs index 7ca16f4..6219e32 100644 --- a/aquatic_udp/src/lib/workers/statistics.rs +++ b/aquatic_udp/src/lib/workers/statistics.rs @@ -5,17 +5,23 @@ use crate::common::*; use crate::config::Config; pub fn run_statistics_worker(config: Config, state: State) { + let ipv4_active = config.network.address.is_ipv4() || !config.network.only_ipv6; + let ipv6_active = config.network.address.is_ipv6(); + loop { ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); println!("General:"); println!(" access list entries: {}", state.access_list.load().len()); - println!("IPv4:"); - gather_and_print_for_protocol(&config, &state.statistics_ipv4); - - println!("IPv6:"); - gather_and_print_for_protocol(&config, &state.statistics_ipv6); + if ipv4_active { + println!("IPv4:"); + gather_and_print_for_protocol(&config, &state.statistics_ipv4); + } + if ipv6_active { + println!("IPv6:"); + gather_and_print_for_protocol(&config, &state.statistics_ipv6); + } println!(); } From 3a4056058bc85c4124e40d5cf7f8e09c90eea8d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 21 Nov 2021 20:16:17 +0100 Subject: [PATCH 4/5] udp: statistics: properly handle ipv4-mapped ipv6 addresses --- aquatic_udp/src/lib/workers/socket.rs | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/aquatic_udp/src/lib/workers/socket.rs b/aquatic_udp/src/lib/workers/socket.rs index db359d6..53b214c 100644 --- a/aquatic_udp/src/lib/workers/socket.rs +++ b/aquatic_udp/src/lib/workers/socket.rs @@ -240,20 +240,8 @@ fn read_requests( Request::from_bytes(&buffer[..amt], config.protocol.max_scrape_torrents); let src = match src { - src @ SocketAddr::V4(_) => { - if res_request.is_ok() { - requests_received_ipv4 += 1; - } - bytes_received_ipv4 += amt; - - src - } + src @ SocketAddr::V4(_) => src, SocketAddr::V6(src) => { - if res_request.is_ok() { - requests_received_ipv6 += 1; - } - bytes_received_ipv6 += amt; - match src.ip().octets() { // Convert IPv4-mapped address (available in std but nightly-only) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => { @@ -267,6 +255,19 @@ fn read_requests( } }; + // Update statistics for converted address + if src.is_ipv4() { + if res_request.is_ok() { + requests_received_ipv4 += 1; + } + bytes_received_ipv4 += amt; + } else { + if res_request.is_ok() { + requests_received_ipv6 += 1; + } + bytes_received_ipv6 += amt; + } + handle_request( config, connections, From 2f15b2cc8e6e2fbac0a375cb345b3b95809c9154 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 21 Nov 2021 20:39:27 +0100 Subject: [PATCH 5/5] udp: make tracker-side statistics more accurate --- aquatic_udp/src/lib/workers/statistics.rs | 27 ++++++++++++++--------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/aquatic_udp/src/lib/workers/statistics.rs b/aquatic_udp/src/lib/workers/statistics.rs index 6219e32..ae3a81a 100644 --- a/aquatic_udp/src/lib/workers/statistics.rs +++ b/aquatic_udp/src/lib/workers/statistics.rs @@ -1,5 +1,5 @@ use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::Duration; +use std::time::{Duration, Instant}; use crate::common::*; use crate::config::Config; @@ -8,6 +8,9 @@ pub fn run_statistics_worker(config: Config, state: State) { let ipv4_active = config.network.address.is_ipv4() || !config.network.only_ipv6; let ipv6_active = config.network.address.is_ipv6(); + let mut last_ipv4 = Instant::now(); + let mut last_ipv6 = Instant::now(); + loop { ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); @@ -16,29 +19,33 @@ pub fn run_statistics_worker(config: Config, state: State) { if ipv4_active { println!("IPv4:"); - gather_and_print_for_protocol(&config, &state.statistics_ipv4); + gather_and_print_for_protocol(&config, &state.statistics_ipv4, &mut last_ipv4); } if ipv6_active { println!("IPv6:"); - gather_and_print_for_protocol(&config, &state.statistics_ipv6); + gather_and_print_for_protocol(&config, &state.statistics_ipv6, &mut last_ipv6); } println!(); } } -fn gather_and_print_for_protocol(config: &Config, statistics: &Statistics) { - let interval = config.statistics.interval; - +fn gather_and_print_for_protocol(config: &Config, statistics: &Statistics, last: &mut Instant) { let requests_received: f64 = statistics.requests_received.fetch_and(0, Ordering::AcqRel) as f64; let responses_sent: f64 = statistics.responses_sent.fetch_and(0, Ordering::AcqRel) as f64; let bytes_received: f64 = statistics.bytes_received.fetch_and(0, Ordering::AcqRel) as f64; let bytes_sent: f64 = statistics.bytes_sent.fetch_and(0, Ordering::AcqRel) as f64; - let requests_per_second = requests_received / interval as f64; - let responses_per_second: f64 = responses_sent / interval as f64; - let bytes_received_per_second: f64 = bytes_received / interval as f64; - let bytes_sent_per_second: f64 = bytes_sent / interval as f64; + let now = Instant::now(); + + let elapsed = (now - *last).as_secs_f64(); + + *last = now; + + let requests_per_second = requests_received / elapsed; + let responses_per_second: f64 = responses_sent / elapsed; + let bytes_received_per_second: f64 = bytes_received / elapsed; + let bytes_sent_per_second: f64 = bytes_sent / elapsed; let num_torrents: usize = sum_atomic_usizes(&statistics.torrents); let num_peers = sum_atomic_usizes(&statistics.peers);