udp: split statistics by ipv4/ipv6

This commit is contained in:
Joakim Frostegård 2021-11-21 20:04:18 +01:00
parent 34f263f6b9
commit f68bbff700
4 changed files with 100 additions and 74 deletions

View file

@ -232,10 +232,8 @@ pub struct Statistics {
pub responses_sent: AtomicUsize,
pub bytes_received: AtomicUsize,
pub bytes_sent: AtomicUsize,
pub torrents_ipv4: Vec<AtomicUsize>,
pub torrents_ipv6: Vec<AtomicUsize>,
pub peers_ipv4: Vec<AtomicUsize>,
pub peers_ipv6: Vec<AtomicUsize>,
pub torrents: Vec<AtomicUsize>,
pub peers: Vec<AtomicUsize>,
}
impl Statistics {
@ -245,10 +243,8 @@ impl Statistics {
responses_sent: Default::default(),
bytes_received: Default::default(),
bytes_sent: Default::default(),
torrents_ipv4: Self::create_atomic_usize_vec(num_request_workers),
torrents_ipv6: Self::create_atomic_usize_vec(num_request_workers),
peers_ipv4: Self::create_atomic_usize_vec(num_request_workers),
peers_ipv6: Self::create_atomic_usize_vec(num_request_workers),
torrents: Self::create_atomic_usize_vec(num_request_workers),
peers: Self::create_atomic_usize_vec(num_request_workers),
}
}
@ -262,14 +258,16 @@ impl Statistics {
#[derive(Clone)]
pub struct State {
pub access_list: Arc<AccessListArcSwap>,
pub statistics: Arc<Statistics>,
pub statistics_ipv4: Arc<Statistics>,
pub statistics_ipv6: Arc<Statistics>,
}
impl State {
pub fn new(num_request_workers: usize) -> Self {
Self {
access_list: Arc::new(AccessListArcSwap::default()),
statistics: Arc::new(Statistics::new(num_request_workers)),
statistics_ipv4: Arc::new(Statistics::new(num_request_workers)),
statistics_ipv6: Arc::new(Statistics::new(num_request_workers)),
}
}
}

View file

@ -132,9 +132,9 @@ pub fn run_request_worker(
let peers_ipv4 = torrents.ipv4.values().map(|t| t.peers.len()).sum();
let peers_ipv6 = torrents.ipv6.values().map(|t| t.peers.len()).sum();
state.statistics.peers_ipv4[worker_index.0]
state.statistics_ipv4.peers[worker_index.0]
.store(peers_ipv4, Ordering::Release);
state.statistics.peers_ipv6[worker_index.0]
state.statistics_ipv6.peers[worker_index.0]
.store(peers_ipv6, Ordering::Release);
}
@ -143,9 +143,9 @@ pub fn run_request_worker(
if !statistics_update_interval.is_zero()
&& now > last_statistics_update + statistics_update_interval
{
state.statistics.torrents_ipv4[worker_index.0]
state.statistics_ipv4.torrents[worker_index.0]
.store(torrents.ipv4.len(), Ordering::Release);
state.statistics.torrents_ipv6[worker_index.0]
state.statistics_ipv6.torrents[worker_index.0]
.store(torrents.ipv6.len(), Ordering::Release);
last_statistics_update = now;

View file

@ -226,8 +226,10 @@ fn read_requests(
connection_valid_until: ValidUntil,
pending_scrape_valid_until: ValidUntil,
) {
let mut requests_received: usize = 0;
let mut bytes_received: usize = 0;
let mut requests_received_ipv4: usize = 0;
let mut requests_received_ipv6: usize = 0;
let mut bytes_received_ipv4: usize = 0;
let mut bytes_received_ipv6 = 0;
let mut access_list_cache = create_access_list_cache(&state.access_list);
@ -237,14 +239,21 @@ fn read_requests(
let res_request =
Request::from_bytes(&buffer[..amt], config.protocol.max_scrape_torrents);
bytes_received += amt;
if res_request.is_ok() {
requests_received += 1;
}
let src = match src {
src @ SocketAddr::V4(_) => {
if res_request.is_ok() {
requests_received_ipv4 += 1;
}
bytes_received_ipv4 += amt;
src
}
SocketAddr::V6(src) => {
if res_request.is_ok() {
requests_received_ipv6 += 1;
}
bytes_received_ipv6 += amt;
match src.ip().octets() {
// Convert IPv4-mapped address (available in std but nightly-only)
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => {
@ -256,7 +265,6 @@ fn read_requests(
_ => src.into(),
}
}
src => src,
};
handle_request(
@ -285,13 +293,21 @@ fn read_requests(
if config.statistics.interval != 0 {
state
.statistics
.statistics_ipv4
.requests_received
.fetch_add(requests_received, Ordering::Release);
.fetch_add(requests_received_ipv4, Ordering::Release);
state
.statistics
.statistics_ipv6
.requests_received
.fetch_add(requests_received_ipv6, Ordering::Release);
state
.statistics_ipv4
.bytes_received
.fetch_add(bytes_received, Ordering::Release);
.fetch_add(bytes_received_ipv4, Ordering::Release);
state
.statistics_ipv6
.bytes_received
.fetch_add(bytes_received_ipv6, Ordering::Release);
}
}
@ -412,16 +428,20 @@ fn send_responses(
pending_scrape_responses: &mut PendingScrapeResponseMap,
local_responses: Drain<(Response, SocketAddr)>,
) {
let mut responses_sent: usize = 0;
let mut bytes_sent: usize = 0;
let mut responses_sent_ipv4: usize = 0;
let mut responses_sent_ipv6: usize = 0;
let mut bytes_sent_ipv4: usize = 0;
let mut bytes_sent_ipv6: usize = 0;
for (response, addr) in local_responses {
send_response(
config,
socket,
buffer,
&mut responses_sent,
&mut bytes_sent,
&mut responses_sent_ipv4,
&mut responses_sent_ipv6,
&mut bytes_sent_ipv4,
&mut bytes_sent_ipv6,
response,
addr,
);
@ -439,8 +459,10 @@ fn send_responses(
config,
socket,
buffer,
&mut responses_sent,
&mut bytes_sent,
&mut responses_sent_ipv4,
&mut responses_sent_ipv6,
&mut bytes_sent_ipv4,
&mut bytes_sent_ipv6,
response,
addr,
);
@ -449,13 +471,21 @@ fn send_responses(
if config.statistics.interval != 0 {
state
.statistics
.statistics_ipv4
.responses_sent
.fetch_add(responses_sent, Ordering::Release);
.fetch_add(responses_sent_ipv4, Ordering::Release);
state
.statistics
.statistics_ipv6
.responses_sent
.fetch_add(responses_sent_ipv6, Ordering::Release);
state
.statistics_ipv4
.bytes_sent
.fetch_add(bytes_sent, Ordering::Release);
.fetch_add(bytes_sent_ipv4, Ordering::Release);
state
.statistics_ipv6
.bytes_sent
.fetch_add(bytes_sent_ipv6, Ordering::Release);
}
}
@ -463,13 +493,17 @@ fn send_response(
config: &Config,
socket: &mut UdpSocket,
buffer: &mut [u8],
responses_sent: &mut usize,
bytes_sent: &mut usize,
responses_sent_ipv4: &mut usize,
responses_sent_ipv6: &mut usize,
bytes_sent_ipv4: &mut usize,
bytes_sent_ipv6: &mut usize,
response: Response,
addr: SocketAddr,
) {
let mut cursor = Cursor::new(buffer);
let addr_is_ipv4 = addr.is_ipv4();
let addr = if config.network.address.is_ipv4() {
if let SocketAddr::V4(addr) = addr {
SocketAddr::V4(addr)
@ -493,8 +527,13 @@ fn send_response(
match socket.send_to(&cursor.get_ref()[..amt], addr) {
Ok(amt) => {
*responses_sent += 1;
*bytes_sent += amt;
if addr_is_ipv4 {
*responses_sent_ipv4 += 1;
*bytes_sent_ipv4 += amt;
} else {
*responses_sent_ipv6 += 1;
*bytes_sent_ipv6 += amt;
}
}
Err(err) => {
::log::info!("send_to error: {}", err);

View file

@ -8,62 +8,51 @@ pub fn run_statistics_worker(config: Config, state: State) {
loop {
::std::thread::sleep(Duration::from_secs(config.statistics.interval));
gather_and_print_statistics(&config, &state);
println!("General:");
println!(" access list entries: {}", state.access_list.load().len());
println!("IPv4:");
gather_and_print_for_protocol(&config, &state.statistics_ipv4);
println!("IPv6:");
gather_and_print_for_protocol(&config, &state.statistics_ipv6);
println!();
}
}
fn gather_and_print_statistics(config: &Config, state: &State) {
fn gather_and_print_for_protocol(config: &Config, statistics: &Statistics) {
let interval = config.statistics.interval;
let requests_received: f64 = state
.statistics
.requests_received
.fetch_and(0, Ordering::AcqRel) as f64;
let responses_sent: f64 = state
.statistics
.responses_sent
.fetch_and(0, Ordering::AcqRel) as f64;
let bytes_received: f64 = state
.statistics
.bytes_received
.fetch_and(0, Ordering::AcqRel) as f64;
let bytes_sent: f64 = state.statistics.bytes_sent.fetch_and(0, Ordering::AcqRel) as f64;
let requests_received: f64 = statistics.requests_received.fetch_and(0, Ordering::AcqRel) as f64;
let responses_sent: f64 = statistics.responses_sent.fetch_and(0, Ordering::AcqRel) as f64;
let bytes_received: f64 = statistics.bytes_received.fetch_and(0, Ordering::AcqRel) as f64;
let bytes_sent: f64 = statistics.bytes_sent.fetch_and(0, Ordering::AcqRel) as f64;
let requests_per_second = requests_received / interval as f64;
let responses_per_second: f64 = responses_sent / interval as f64;
let bytes_received_per_second: f64 = bytes_received / interval as f64;
let bytes_sent_per_second: f64 = bytes_sent / interval as f64;
let num_torrents_ipv4: usize = sum_atomic_usizes(&state.statistics.torrents_ipv4);
let num_torrents_ipv6 = sum_atomic_usizes(&state.statistics.torrents_ipv6);
let num_peers_ipv4 = sum_atomic_usizes(&state.statistics.peers_ipv4);
let num_peers_ipv6 = sum_atomic_usizes(&state.statistics.peers_ipv6);
let access_list_len = state.access_list.load().len();
let num_torrents: usize = sum_atomic_usizes(&statistics.torrents);
let num_peers = sum_atomic_usizes(&statistics.peers);
println!(
"stats: {:.2} requests/second, {:.2} responses/second",
" requests/second: {:10.2}, responses/second: {:10.2}",
requests_per_second, responses_per_second
);
println!(
"bandwidth: {:7.2} Mbit/s in, {:7.2} Mbit/s out",
" bandwidth: {:7.2} Mbit/s in, {:7.2} Mbit/s out",
bytes_received_per_second * 8.0 / 1_000_000.0,
bytes_sent_per_second * 8.0 / 1_000_000.0,
);
println!(" number of torrents: {}", num_torrents);
println!(
"ipv4 torrents: {}, ipv6 torrents: {}",
num_torrents_ipv4, num_torrents_ipv6,
" number of peers: {} (updated every {} seconds)",
num_peers, config.cleaning.torrent_cleaning_interval
);
println!(
"ipv4 peers: {}, ipv6 peers: {} (both updated every {} seconds)",
num_peers_ipv4, num_peers_ipv6, config.cleaning.torrent_cleaning_interval
);
println!("access list entries: {}", access_list_len,);
println!();
}
fn sum_atomic_usizes(values: &[AtomicUsize]) -> usize {