From 59e95894b929ca1bbddc054d6b48bc38f02c40d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 19 Nov 2021 12:04:16 +0100 Subject: [PATCH] udp: statistics: show number of torrents and access list len --- TODO.md | 1 - aquatic_common/src/access_list.rs | 4 ++++ aquatic_udp/src/lib/common.rs | 28 ++++++++++++++++++++++++---- aquatic_udp/src/lib/handlers.rs | 17 ++++++++++++++++- aquatic_udp/src/lib/lib.rs | 12 ++++++++++-- aquatic_udp/src/lib/tasks.rs | 22 ++++++++++++++++++++++ aquatic_udp_bench/src/main.rs | 9 ++++++++- 7 files changed, 84 insertions(+), 9 deletions(-) diff --git a/TODO.md b/TODO.md index ed8ba7f..5927ae5 100644 --- a/TODO.md +++ b/TODO.md @@ -19,7 +19,6 @@ * aquatic_udp * look at proper cpu pinning (check that one thread gets bound per core) * then consider so_attach_reuseport_cbpf - * implement statistics for total number of torrents and peers again? * what poll event capacity is actually needed? * stagger connection cleaning intervals? * notes diff --git a/aquatic_common/src/access_list.rs b/aquatic_common/src/access_list.rs index 54cfd21..f5a1076 100644 --- a/aquatic_common/src/access_list.rs +++ b/aquatic_common/src/access_list.rs @@ -77,6 +77,10 @@ impl AccessList { AccessListMode::Off => true, } } + + pub fn len(&self) -> usize { + self.0.len() + } } pub trait AccessListQuery { diff --git a/aquatic_udp/src/lib/common.rs b/aquatic_udp/src/lib/common.rs index e9fc24e..41381ed 100644 --- a/aquatic_udp/src/lib/common.rs +++ b/aquatic_udp/src/lib/common.rs @@ -227,12 +227,32 @@ impl TorrentMaps { } } -#[derive(Default)] pub struct Statistics { pub requests_received: AtomicUsize, pub responses_sent: AtomicUsize, pub bytes_received: AtomicUsize, pub bytes_sent: AtomicUsize, + pub torrents_ipv4: Vec, + pub torrents_ipv6: Vec, +} + +impl Statistics { + pub fn new(num_request_workers: usize) -> Self { + Self { + requests_received: Default::default(), + responses_sent: Default::default(), + bytes_received: Default::default(), + bytes_sent: Default::default(), + torrents_ipv4: Self::create_atomic_usize_vec(num_request_workers), + torrents_ipv6: Self::create_atomic_usize_vec(num_request_workers), + } + } + + fn create_atomic_usize_vec(len: usize) -> Vec { + ::std::iter::repeat_with(|| AtomicUsize::default()) + .take(len) + .collect() + } } #[derive(Clone)] @@ -241,11 +261,11 @@ pub struct State { pub statistics: Arc, } -impl Default for State { - fn default() -> Self { +impl State { + pub fn new(num_request_workers: usize) -> Self { Self { access_list: Arc::new(AccessListArcSwap::default()), - statistics: Arc::new(Statistics::default()), + statistics: Arc::new(Statistics::new(num_request_workers)), } } } diff --git a/aquatic_udp/src/lib/handlers.rs b/aquatic_udp/src/lib/handlers.rs index 5653290..5cc38f0 100644 --- a/aquatic_udp/src/lib/handlers.rs +++ b/aquatic_udp/src/lib/handlers.rs @@ -3,6 +3,7 @@ use std::net::IpAddr; use std::net::Ipv4Addr; use std::net::Ipv6Addr; use std::net::SocketAddr; +use std::sync::atomic::Ordering; use std::time::Duration; use std::time::Instant; @@ -84,6 +85,7 @@ pub fn run_request_worker( state: State, request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, SocketAddr)>, response_sender: ConnectedResponseSender, + worker_index: RequestWorkerIndex, ) { let mut torrents = TorrentMaps::default(); let mut small_rng = SmallRng::from_entropy(); @@ -92,9 +94,12 @@ pub fn run_request_worker( let mut peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); let cleaning_interval = Duration::from_secs(config.cleaning.torrent_cleaning_interval); + let statistics_update_interval = Duration::from_secs(config.statistics.interval); + + let mut last_cleaning = Instant::now(); + let mut last_statistics_update = Instant::now(); let mut iter_counter = 0usize; - let mut last_cleaning = Instant::now(); loop { if let Ok((sender_index, request, src)) = request_receiver.recv_timeout(timeout) { @@ -125,6 +130,16 @@ pub fn run_request_worker( last_cleaning = now; } + if !statistics_update_interval.is_zero() + && now > last_statistics_update + statistics_update_interval + { + state.statistics.torrents_ipv4[worker_index.0] + .store(torrents.ipv4.len(), Ordering::SeqCst); + state.statistics.torrents_ipv6[worker_index.0] + .store(torrents.ipv6.len(), Ordering::SeqCst); + + last_statistics_update = now; + } } iter_counter = iter_counter.wrapping_add(1); diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index f42c377..f9ecb93 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -23,10 +23,12 @@ use signal_hook::iterator::Signals; use common::{ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State}; +use crate::common::RequestWorkerIndex; + pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub fn run(config: Config) -> ::anyhow::Result<()> { - let state = State::default(); + let state = State::new(config.request_workers); update_access_list(&config.access_list, &state.access_list)?; @@ -70,7 +72,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::RequestWorker(i), ); - handlers::run_request_worker(config, state, request_receiver, response_sender) + handlers::run_request_worker( + config, + state, + request_receiver, + response_sender, + RequestWorkerIndex(i), + ) }) .with_context(|| "spawn request worker")?; } diff --git a/aquatic_udp/src/lib/tasks.rs b/aquatic_udp/src/lib/tasks.rs index 8e22560..b175baa 100644 --- a/aquatic_udp/src/lib/tasks.rs +++ b/aquatic_udp/src/lib/tasks.rs @@ -25,6 +25,21 @@ pub fn gather_and_print_statistics(state: &State, config: &Config) { let bytes_received_per_second: f64 = bytes_received / interval as f64; let bytes_sent_per_second: f64 = bytes_sent / interval as f64; + let num_torrents_ipv4: usize = state + .statistics + .torrents_ipv4 + .iter() + .map(|n| n.load(Ordering::SeqCst)) + .sum(); + let num_torrents_ipv6: usize = state + .statistics + .torrents_ipv6 + .iter() + .map(|n| n.load(Ordering::SeqCst)) + .sum(); + + let access_list_len = state.access_list.load().len(); + println!( "stats: {:.2} requests/second, {:.2} responses/second", requests_per_second, responses_per_second @@ -36,5 +51,12 @@ pub fn gather_and_print_statistics(state: &State, config: &Config) { bytes_sent_per_second * 8.0 / 1_000_000.0, ); + println!( + "ipv4 torrents: {}, ipv6 torrents: {}", + num_torrents_ipv4, num_torrents_ipv6, + ); + + println!("access list entries: {}", access_list_len,); + println!(); } diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index 6c77bee..c0258e1 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -50,9 +50,16 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { { let config = aquatic_config.clone(); + let state = State::new(config.request_workers); ::std::thread::spawn(move || { - run_request_worker(config, State::default(), request_receiver, response_sender) + run_request_worker( + config, + state, + request_receiver, + response_sender, + RequestWorkerIndex(0), + ) }); }