mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
udp: show separate statistics for all response types
This commit is contained in:
parent
00c4e74374
commit
f0dc7c19f3
5 changed files with 112 additions and 65 deletions
|
|
@ -149,7 +149,10 @@ impl PeerStatus {
|
|||
|
||||
pub struct Statistics {
|
||||
pub requests_received: AtomicUsize,
|
||||
pub responses_sent: AtomicUsize,
|
||||
pub responses_sent_connect: AtomicUsize,
|
||||
pub responses_sent_announce: AtomicUsize,
|
||||
pub responses_sent_scrape: AtomicUsize,
|
||||
pub responses_sent_error: AtomicUsize,
|
||||
pub bytes_received: AtomicUsize,
|
||||
pub bytes_sent: AtomicUsize,
|
||||
pub torrents: Vec<AtomicUsize>,
|
||||
|
|
@ -160,7 +163,10 @@ impl Statistics {
|
|||
pub fn new(num_request_workers: usize) -> Self {
|
||||
Self {
|
||||
requests_received: Default::default(),
|
||||
responses_sent: Default::default(),
|
||||
responses_sent_connect: Default::default(),
|
||||
responses_sent_announce: Default::default(),
|
||||
responses_sent_scrape: Default::default(),
|
||||
responses_sent_error: Default::default(),
|
||||
bytes_received: Default::default(),
|
||||
bytes_sent: Default::default(),
|
||||
torrents: Self::create_atomic_usize_vec(num_request_workers),
|
||||
|
|
|
|||
|
|
@ -458,20 +458,12 @@ fn send_responses(
|
|||
pending_scrape_responses: &mut PendingScrapeResponseSlab,
|
||||
local_responses: Drain<(Response, SocketAddr)>,
|
||||
) {
|
||||
let mut responses_sent_ipv4: usize = 0;
|
||||
let mut responses_sent_ipv6: usize = 0;
|
||||
let mut bytes_sent_ipv4: usize = 0;
|
||||
let mut bytes_sent_ipv6: usize = 0;
|
||||
|
||||
for (response, addr) in local_responses {
|
||||
send_response(
|
||||
state,
|
||||
config,
|
||||
socket,
|
||||
buffer,
|
||||
&mut responses_sent_ipv4,
|
||||
&mut responses_sent_ipv6,
|
||||
&mut bytes_sent_ipv4,
|
||||
&mut bytes_sent_ipv6,
|
||||
response,
|
||||
addr,
|
||||
);
|
||||
|
|
@ -486,47 +478,22 @@ fn send_responses(
|
|||
|
||||
if let Some(response) = opt_response {
|
||||
send_response(
|
||||
state,
|
||||
config,
|
||||
socket,
|
||||
buffer,
|
||||
&mut responses_sent_ipv4,
|
||||
&mut responses_sent_ipv6,
|
||||
&mut bytes_sent_ipv4,
|
||||
&mut bytes_sent_ipv6,
|
||||
response,
|
||||
addr,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if config.statistics.active() {
|
||||
state
|
||||
.statistics_ipv4
|
||||
.responses_sent
|
||||
.fetch_add(responses_sent_ipv4, Ordering::Release);
|
||||
state
|
||||
.statistics_ipv6
|
||||
.responses_sent
|
||||
.fetch_add(responses_sent_ipv6, Ordering::Release);
|
||||
state
|
||||
.statistics_ipv4
|
||||
.bytes_sent
|
||||
.fetch_add(bytes_sent_ipv4, Ordering::Release);
|
||||
state
|
||||
.statistics_ipv6
|
||||
.bytes_sent
|
||||
.fetch_add(bytes_sent_ipv6, Ordering::Release);
|
||||
}
|
||||
}
|
||||
|
||||
fn send_response(
|
||||
state: &State,
|
||||
config: &Config,
|
||||
socket: &mut UdpSocket,
|
||||
buffer: &mut [u8],
|
||||
responses_sent_ipv4: &mut usize,
|
||||
responses_sent_ipv6: &mut usize,
|
||||
bytes_sent_ipv4: &mut usize,
|
||||
bytes_sent_ipv6: &mut usize,
|
||||
response: Response,
|
||||
addr: SocketAddr,
|
||||
) {
|
||||
|
|
@ -556,15 +523,31 @@ fn send_response(
|
|||
let amt = cursor.position() as usize;
|
||||
|
||||
match socket.send_to(&cursor.get_ref()[..amt], addr) {
|
||||
Ok(amt) => {
|
||||
if addr_is_ipv4 {
|
||||
*responses_sent_ipv4 += 1;
|
||||
*bytes_sent_ipv4 += amt;
|
||||
Ok(amt) if config.statistics.active() => {
|
||||
let stats = if addr_is_ipv4 {
|
||||
&state.statistics_ipv4
|
||||
} else {
|
||||
*responses_sent_ipv6 += 1;
|
||||
*bytes_sent_ipv6 += amt;
|
||||
&state.statistics_ipv6
|
||||
};
|
||||
|
||||
stats.bytes_sent.fetch_add(amt, Ordering::Relaxed);
|
||||
|
||||
match response {
|
||||
Response::Connect(_) => {
|
||||
stats.responses_sent_connect.fetch_add(1, Ordering::Relaxed);
|
||||
},
|
||||
Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => {
|
||||
stats.responses_sent_announce.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
Response::Scrape(_) => {
|
||||
stats.responses_sent_scrape.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
Response::Error(_) => {
|
||||
stats.responses_sent_error.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(_) => {},
|
||||
Err(err) => {
|
||||
::log::info!("send_to error: {}", err);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,7 +24,10 @@ const STYLESHEET_CONTENTS: &str = concat!(
|
|||
#[derive(Clone, Copy, Debug)]
|
||||
struct CollectedStatistics {
|
||||
requests_per_second: f64,
|
||||
responses_per_second: f64,
|
||||
responses_per_second_connect: f64,
|
||||
responses_per_second_announce: f64,
|
||||
responses_per_second_scrape: f64,
|
||||
responses_per_second_error: f64,
|
||||
bytes_received_per_second: f64,
|
||||
bytes_sent_per_second: f64,
|
||||
num_torrents: usize,
|
||||
|
|
@ -33,10 +36,13 @@ struct CollectedStatistics {
|
|||
|
||||
impl CollectedStatistics {
|
||||
fn from_shared(statistics: &Arc<Statistics>, last: &mut Instant) -> Self {
|
||||
let requests_received = statistics.requests_received.fetch_and(0, Ordering::AcqRel) as f64;
|
||||
let responses_sent = statistics.responses_sent.fetch_and(0, Ordering::AcqRel) as f64;
|
||||
let bytes_received = statistics.bytes_received.fetch_and(0, Ordering::AcqRel) as f64;
|
||||
let bytes_sent = statistics.bytes_sent.fetch_and(0, Ordering::AcqRel) as f64;
|
||||
let requests_received = statistics.requests_received.fetch_and(0, Ordering::Relaxed) as f64;
|
||||
let responses_sent_connect = statistics.responses_sent_connect.fetch_and(0, Ordering::Relaxed) as f64;
|
||||
let responses_sent_announce = statistics.responses_sent_announce.fetch_and(0, Ordering::Relaxed) as f64;
|
||||
let responses_sent_scrape = statistics.responses_sent_scrape.fetch_and(0, Ordering::Relaxed) as f64;
|
||||
let responses_sent_error = statistics.responses_sent_error.fetch_and(0, Ordering::Relaxed) as f64;
|
||||
let bytes_received = statistics.bytes_received.fetch_and(0, Ordering::Relaxed) as f64;
|
||||
let bytes_sent = statistics.bytes_sent.fetch_and(0, Ordering::Relaxed) as f64;
|
||||
let num_torrents = Self::sum_atomic_usizes(&statistics.torrents);
|
||||
let num_peers = Self::sum_atomic_usizes(&statistics.peers);
|
||||
|
||||
|
|
@ -48,7 +54,10 @@ impl CollectedStatistics {
|
|||
|
||||
Self {
|
||||
requests_per_second: requests_received / elapsed,
|
||||
responses_per_second: responses_sent / elapsed,
|
||||
responses_per_second_connect: responses_sent_connect / elapsed,
|
||||
responses_per_second_announce: responses_sent_announce / elapsed,
|
||||
responses_per_second_scrape: responses_sent_scrape / elapsed,
|
||||
responses_per_second_error: responses_sent_error / elapsed,
|
||||
bytes_received_per_second: bytes_received / elapsed,
|
||||
bytes_sent_per_second: bytes_sent / elapsed,
|
||||
num_torrents,
|
||||
|
|
@ -57,7 +66,7 @@ impl CollectedStatistics {
|
|||
}
|
||||
|
||||
fn sum_atomic_usizes(values: &[AtomicUsize]) -> usize {
|
||||
values.iter().map(|n| n.load(Ordering::Acquire)).sum()
|
||||
values.iter().map(|n| n.load(Ordering::Relaxed)).sum()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -66,10 +75,20 @@ impl Into<FormattedStatistics> for CollectedStatistics {
|
|||
let rx_mbits = self.bytes_received_per_second * 8.0 / 1_000_000.0;
|
||||
let tx_mbits = self.bytes_sent_per_second * 8.0 / 1_000_000.0;
|
||||
|
||||
let responses_per_second_total = self.responses_per_second_connect + self.responses_per_second_announce + self.responses_per_second_scrape + self.responses_per_second_error;
|
||||
|
||||
FormattedStatistics {
|
||||
requests_per_second: (self.requests_per_second as usize)
|
||||
.to_formatted_string(&Locale::en),
|
||||
responses_per_second: (self.responses_per_second as usize)
|
||||
responses_per_second_total: (responses_per_second_total as usize)
|
||||
.to_formatted_string(&Locale::en),
|
||||
responses_per_second_connect: (self.responses_per_second_connect as usize)
|
||||
.to_formatted_string(&Locale::en),
|
||||
responses_per_second_announce: (self.responses_per_second_announce as usize)
|
||||
.to_formatted_string(&Locale::en),
|
||||
responses_per_second_scrape: (self.responses_per_second_scrape as usize)
|
||||
.to_formatted_string(&Locale::en),
|
||||
responses_per_second_error: (self.responses_per_second_error as usize)
|
||||
.to_formatted_string(&Locale::en),
|
||||
rx_mbits: format!("{:.2}", rx_mbits),
|
||||
tx_mbits: format!("{:.2}", tx_mbits),
|
||||
|
|
@ -82,7 +101,11 @@ impl Into<FormattedStatistics> for CollectedStatistics {
|
|||
#[derive(Clone, Debug, Serialize)]
|
||||
struct FormattedStatistics {
|
||||
requests_per_second: String,
|
||||
responses_per_second: String,
|
||||
responses_per_second_total: String,
|
||||
responses_per_second_connect: String,
|
||||
responses_per_second_announce: String,
|
||||
responses_per_second_scrape: String,
|
||||
responses_per_second_error: String,
|
||||
rx_mbits: String,
|
||||
tx_mbits: String,
|
||||
num_torrents: String,
|
||||
|
|
@ -161,10 +184,13 @@ pub fn run_statistics_worker(config: Config, state: State) {
|
|||
}
|
||||
|
||||
fn print_to_stdout(config: &Config, statistics: &FormattedStatistics) {
|
||||
println!(
|
||||
" requests/second: {:>10}, responses/second: {:>10}",
|
||||
statistics.requests_per_second, statistics.responses_per_second
|
||||
);
|
||||
println!(" requests/second: {:>10}", statistics.requests_per_second);
|
||||
println!(" responses/second");
|
||||
println!(" total: {:>10}", statistics.responses_per_second_total);
|
||||
println!(" connect: {:>10}", statistics.responses_per_second_connect);
|
||||
println!(" announce: {:>10}", statistics.responses_per_second_announce);
|
||||
println!(" scrape: {:>10}", statistics.responses_per_second_scrape);
|
||||
println!(" error: {:>10}", statistics.responses_per_second_error);
|
||||
println!(
|
||||
" bandwidth: {:>7} Mbit/s in, {:7} Mbit/s out",
|
||||
statistics.rx_mbits, statistics.tx_mbits,
|
||||
|
|
|
|||
|
|
@ -39,8 +39,24 @@
|
|||
<td>{ ipv4.requests_per_second }</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<th scope="row">Responses / second</th>
|
||||
<td>{ ipv4.responses_per_second }</td>
|
||||
<th scope="row">Total responses / second</th>
|
||||
<td>{ ipv4.responses_per_second_total }</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<th scope="row">Connect responses / second</th>
|
||||
<td>{ ipv4.responses_per_second_connect }</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<th scope="row">Announce responses / second</th>
|
||||
<td>{ ipv4.responses_per_second_announce }</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<th scope="row">Scrape responses / second</th>
|
||||
<td>{ ipv4.responses_per_second_scrape }</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<th scope="row">Error responses / second</th>
|
||||
<td>{ ipv4.responses_per_second_error }</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<th scope="row">Bandwidth (RX)</th>
|
||||
|
|
@ -73,8 +89,24 @@
|
|||
<td>{ ipv6.requests_per_second }</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<th scope="row">Responses / second</th>
|
||||
<td>{ ipv6.responses_per_second }</td>
|
||||
<th scope="row">Total responses / second</th>
|
||||
<td>{ ipv6.responses_per_second_total }</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<th scope="row">Connect responses / second</th>
|
||||
<td>{ ipv6.responses_per_second_connect }</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<th scope="row">Announce responses / second</th>
|
||||
<td>{ ipv6.responses_per_second_announce }</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<th scope="row">Scrape responses / second</th>
|
||||
<td>{ ipv6.responses_per_second_scrape }</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<th scope="row">Error responses / second</th>
|
||||
<td>{ ipv6.responses_per_second_error }</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<th scope="row">Bandwidth (RX)</th>
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue