diff --git a/aquatic_udp/src/lib/common.rs b/aquatic_udp/src/lib/common.rs index 2a87296..b0f9f2e 100644 --- a/aquatic_udp/src/lib/common.rs +++ b/aquatic_udp/src/lib/common.rs @@ -232,10 +232,8 @@ pub struct Statistics { pub responses_sent: AtomicUsize, pub bytes_received: AtomicUsize, pub bytes_sent: AtomicUsize, - pub torrents_ipv4: Vec, - pub torrents_ipv6: Vec, - pub peers_ipv4: Vec, - pub peers_ipv6: Vec, + pub torrents: Vec, + pub peers: Vec, } impl Statistics { @@ -245,10 +243,8 @@ impl Statistics { responses_sent: Default::default(), bytes_received: Default::default(), bytes_sent: Default::default(), - torrents_ipv4: Self::create_atomic_usize_vec(num_request_workers), - torrents_ipv6: Self::create_atomic_usize_vec(num_request_workers), - peers_ipv4: Self::create_atomic_usize_vec(num_request_workers), - peers_ipv6: Self::create_atomic_usize_vec(num_request_workers), + torrents: Self::create_atomic_usize_vec(num_request_workers), + peers: Self::create_atomic_usize_vec(num_request_workers), } } @@ -262,14 +258,16 @@ impl Statistics { #[derive(Clone)] pub struct State { pub access_list: Arc, - pub statistics: Arc, + pub statistics_ipv4: Arc, + pub statistics_ipv6: Arc, } impl State { pub fn new(num_request_workers: usize) -> Self { Self { access_list: Arc::new(AccessListArcSwap::default()), - statistics: Arc::new(Statistics::new(num_request_workers)), + statistics_ipv4: Arc::new(Statistics::new(num_request_workers)), + statistics_ipv6: Arc::new(Statistics::new(num_request_workers)), } } } diff --git a/aquatic_udp/src/lib/workers/request.rs b/aquatic_udp/src/lib/workers/request.rs index 0bc85bb..00d0d53 100644 --- a/aquatic_udp/src/lib/workers/request.rs +++ b/aquatic_udp/src/lib/workers/request.rs @@ -132,9 +132,9 @@ pub fn run_request_worker( let peers_ipv4 = torrents.ipv4.values().map(|t| t.peers.len()).sum(); let peers_ipv6 = torrents.ipv6.values().map(|t| t.peers.len()).sum(); - state.statistics.peers_ipv4[worker_index.0] + state.statistics_ipv4.peers[worker_index.0] .store(peers_ipv4, Ordering::Release); - state.statistics.peers_ipv6[worker_index.0] + state.statistics_ipv6.peers[worker_index.0] .store(peers_ipv6, Ordering::Release); } @@ -143,9 +143,9 @@ pub fn run_request_worker( if !statistics_update_interval.is_zero() && now > last_statistics_update + statistics_update_interval { - state.statistics.torrents_ipv4[worker_index.0] + state.statistics_ipv4.torrents[worker_index.0] .store(torrents.ipv4.len(), Ordering::Release); - state.statistics.torrents_ipv6[worker_index.0] + state.statistics_ipv6.torrents[worker_index.0] .store(torrents.ipv6.len(), Ordering::Release); last_statistics_update = now; diff --git a/aquatic_udp/src/lib/workers/socket.rs b/aquatic_udp/src/lib/workers/socket.rs index af1f56b..db359d6 100644 --- a/aquatic_udp/src/lib/workers/socket.rs +++ b/aquatic_udp/src/lib/workers/socket.rs @@ -226,8 +226,10 @@ fn read_requests( connection_valid_until: ValidUntil, pending_scrape_valid_until: ValidUntil, ) { - let mut requests_received: usize = 0; - let mut bytes_received: usize = 0; + let mut requests_received_ipv4: usize = 0; + let mut requests_received_ipv6: usize = 0; + let mut bytes_received_ipv4: usize = 0; + let mut bytes_received_ipv6 = 0; let mut access_list_cache = create_access_list_cache(&state.access_list); @@ -237,14 +239,21 @@ fn read_requests( let res_request = Request::from_bytes(&buffer[..amt], config.protocol.max_scrape_torrents); - bytes_received += amt; - - if res_request.is_ok() { - requests_received += 1; - } - let src = match src { + src @ SocketAddr::V4(_) => { + if res_request.is_ok() { + requests_received_ipv4 += 1; + } + bytes_received_ipv4 += amt; + + src + } SocketAddr::V6(src) => { + if res_request.is_ok() { + requests_received_ipv6 += 1; + } + bytes_received_ipv6 += amt; + match src.ip().octets() { // Convert IPv4-mapped address (available in std but nightly-only) [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => { @@ -256,7 +265,6 @@ fn read_requests( _ => src.into(), } } - src => src, }; handle_request( @@ -285,13 +293,21 @@ fn read_requests( if config.statistics.interval != 0 { state - .statistics + .statistics_ipv4 .requests_received - .fetch_add(requests_received, Ordering::Release); + .fetch_add(requests_received_ipv4, Ordering::Release); state - .statistics + .statistics_ipv6 + .requests_received + .fetch_add(requests_received_ipv6, Ordering::Release); + state + .statistics_ipv4 .bytes_received - .fetch_add(bytes_received, Ordering::Release); + .fetch_add(bytes_received_ipv4, Ordering::Release); + state + .statistics_ipv6 + .bytes_received + .fetch_add(bytes_received_ipv6, Ordering::Release); } } @@ -412,16 +428,20 @@ fn send_responses( pending_scrape_responses: &mut PendingScrapeResponseMap, local_responses: Drain<(Response, SocketAddr)>, ) { - let mut responses_sent: usize = 0; - let mut bytes_sent: usize = 0; + let mut responses_sent_ipv4: usize = 0; + let mut responses_sent_ipv6: usize = 0; + let mut bytes_sent_ipv4: usize = 0; + let mut bytes_sent_ipv6: usize = 0; for (response, addr) in local_responses { send_response( config, socket, buffer, - &mut responses_sent, - &mut bytes_sent, + &mut responses_sent_ipv4, + &mut responses_sent_ipv6, + &mut bytes_sent_ipv4, + &mut bytes_sent_ipv6, response, addr, ); @@ -439,8 +459,10 @@ fn send_responses( config, socket, buffer, - &mut responses_sent, - &mut bytes_sent, + &mut responses_sent_ipv4, + &mut responses_sent_ipv6, + &mut bytes_sent_ipv4, + &mut bytes_sent_ipv6, response, addr, ); @@ -449,13 +471,21 @@ fn send_responses( if config.statistics.interval != 0 { state - .statistics + .statistics_ipv4 .responses_sent - .fetch_add(responses_sent, Ordering::Release); + .fetch_add(responses_sent_ipv4, Ordering::Release); state - .statistics + .statistics_ipv6 + .responses_sent + .fetch_add(responses_sent_ipv6, Ordering::Release); + state + .statistics_ipv4 .bytes_sent - .fetch_add(bytes_sent, Ordering::Release); + .fetch_add(bytes_sent_ipv4, Ordering::Release); + state + .statistics_ipv6 + .bytes_sent + .fetch_add(bytes_sent_ipv6, Ordering::Release); } } @@ -463,13 +493,17 @@ fn send_response( config: &Config, socket: &mut UdpSocket, buffer: &mut [u8], - responses_sent: &mut usize, - bytes_sent: &mut usize, + responses_sent_ipv4: &mut usize, + responses_sent_ipv6: &mut usize, + bytes_sent_ipv4: &mut usize, + bytes_sent_ipv6: &mut usize, response: Response, addr: SocketAddr, ) { let mut cursor = Cursor::new(buffer); + let addr_is_ipv4 = addr.is_ipv4(); + let addr = if config.network.address.is_ipv4() { if let SocketAddr::V4(addr) = addr { SocketAddr::V4(addr) @@ -493,8 +527,13 @@ fn send_response( match socket.send_to(&cursor.get_ref()[..amt], addr) { Ok(amt) => { - *responses_sent += 1; - *bytes_sent += amt; + if addr_is_ipv4 { + *responses_sent_ipv4 += 1; + *bytes_sent_ipv4 += amt; + } else { + *responses_sent_ipv6 += 1; + *bytes_sent_ipv6 += amt; + } } Err(err) => { ::log::info!("send_to error: {}", err); diff --git a/aquatic_udp/src/lib/workers/statistics.rs b/aquatic_udp/src/lib/workers/statistics.rs index fc8e8d8..7ca16f4 100644 --- a/aquatic_udp/src/lib/workers/statistics.rs +++ b/aquatic_udp/src/lib/workers/statistics.rs @@ -8,62 +8,51 @@ pub fn run_statistics_worker(config: Config, state: State) { loop { ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); - gather_and_print_statistics(&config, &state); + println!("General:"); + println!(" access list entries: {}", state.access_list.load().len()); + + println!("IPv4:"); + gather_and_print_for_protocol(&config, &state.statistics_ipv4); + + println!("IPv6:"); + gather_and_print_for_protocol(&config, &state.statistics_ipv6); + + println!(); } } -fn gather_and_print_statistics(config: &Config, state: &State) { +fn gather_and_print_for_protocol(config: &Config, statistics: &Statistics) { let interval = config.statistics.interval; - let requests_received: f64 = state - .statistics - .requests_received - .fetch_and(0, Ordering::AcqRel) as f64; - let responses_sent: f64 = state - .statistics - .responses_sent - .fetch_and(0, Ordering::AcqRel) as f64; - let bytes_received: f64 = state - .statistics - .bytes_received - .fetch_and(0, Ordering::AcqRel) as f64; - let bytes_sent: f64 = state.statistics.bytes_sent.fetch_and(0, Ordering::AcqRel) as f64; + let requests_received: f64 = statistics.requests_received.fetch_and(0, Ordering::AcqRel) as f64; + let responses_sent: f64 = statistics.responses_sent.fetch_and(0, Ordering::AcqRel) as f64; + let bytes_received: f64 = statistics.bytes_received.fetch_and(0, Ordering::AcqRel) as f64; + let bytes_sent: f64 = statistics.bytes_sent.fetch_and(0, Ordering::AcqRel) as f64; let requests_per_second = requests_received / interval as f64; let responses_per_second: f64 = responses_sent / interval as f64; let bytes_received_per_second: f64 = bytes_received / interval as f64; let bytes_sent_per_second: f64 = bytes_sent / interval as f64; - let num_torrents_ipv4: usize = sum_atomic_usizes(&state.statistics.torrents_ipv4); - let num_torrents_ipv6 = sum_atomic_usizes(&state.statistics.torrents_ipv6); - let num_peers_ipv4 = sum_atomic_usizes(&state.statistics.peers_ipv4); - let num_peers_ipv6 = sum_atomic_usizes(&state.statistics.peers_ipv6); - - let access_list_len = state.access_list.load().len(); + let num_torrents: usize = sum_atomic_usizes(&statistics.torrents); + let num_peers = sum_atomic_usizes(&statistics.peers); println!( - "stats: {:.2} requests/second, {:.2} responses/second", + " requests/second: {:10.2}, responses/second: {:10.2}", requests_per_second, responses_per_second ); println!( - "bandwidth: {:7.2} Mbit/s in, {:7.2} Mbit/s out", + " bandwidth: {:7.2} Mbit/s in, {:7.2} Mbit/s out", bytes_received_per_second * 8.0 / 1_000_000.0, bytes_sent_per_second * 8.0 / 1_000_000.0, ); + println!(" number of torrents: {}", num_torrents); println!( - "ipv4 torrents: {}, ipv6 torrents: {}", - num_torrents_ipv4, num_torrents_ipv6, + " number of peers: {} (updated every {} seconds)", + num_peers, config.cleaning.torrent_cleaning_interval ); - println!( - "ipv4 peers: {}, ipv6 peers: {} (both updated every {} seconds)", - num_peers_ipv4, num_peers_ipv6, config.cleaning.torrent_cleaning_interval - ); - - println!("access list entries: {}", access_list_len,); - - println!(); } fn sum_atomic_usizes(values: &[AtomicUsize]) -> usize {