mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
udp: statistics: show number of torrents and access list len
This commit is contained in:
parent
31e44db469
commit
59e95894b9
7 changed files with 84 additions and 9 deletions
1
TODO.md
1
TODO.md
|
|
@ -19,7 +19,6 @@
|
||||||
* aquatic_udp
|
* aquatic_udp
|
||||||
* look at proper cpu pinning (check that one thread gets bound per core)
|
* look at proper cpu pinning (check that one thread gets bound per core)
|
||||||
* then consider so_attach_reuseport_cbpf
|
* then consider so_attach_reuseport_cbpf
|
||||||
* implement statistics for total number of torrents and peers again?
|
|
||||||
* what poll event capacity is actually needed?
|
* what poll event capacity is actually needed?
|
||||||
* stagger connection cleaning intervals?
|
* stagger connection cleaning intervals?
|
||||||
* notes
|
* notes
|
||||||
|
|
|
||||||
|
|
@ -77,6 +77,10 @@ impl AccessList {
|
||||||
AccessListMode::Off => true,
|
AccessListMode::Off => true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
self.0.len()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait AccessListQuery {
|
pub trait AccessListQuery {
|
||||||
|
|
|
||||||
|
|
@ -227,12 +227,32 @@ impl TorrentMaps {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct Statistics {
|
pub struct Statistics {
|
||||||
pub requests_received: AtomicUsize,
|
pub requests_received: AtomicUsize,
|
||||||
pub responses_sent: AtomicUsize,
|
pub responses_sent: AtomicUsize,
|
||||||
pub bytes_received: AtomicUsize,
|
pub bytes_received: AtomicUsize,
|
||||||
pub bytes_sent: AtomicUsize,
|
pub bytes_sent: AtomicUsize,
|
||||||
|
pub torrents_ipv4: Vec<AtomicUsize>,
|
||||||
|
pub torrents_ipv6: Vec<AtomicUsize>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Statistics {
|
||||||
|
pub fn new(num_request_workers: usize) -> Self {
|
||||||
|
Self {
|
||||||
|
requests_received: Default::default(),
|
||||||
|
responses_sent: Default::default(),
|
||||||
|
bytes_received: Default::default(),
|
||||||
|
bytes_sent: Default::default(),
|
||||||
|
torrents_ipv4: Self::create_atomic_usize_vec(num_request_workers),
|
||||||
|
torrents_ipv6: Self::create_atomic_usize_vec(num_request_workers),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_atomic_usize_vec(len: usize) -> Vec<AtomicUsize> {
|
||||||
|
::std::iter::repeat_with(|| AtomicUsize::default())
|
||||||
|
.take(len)
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
|
@ -241,11 +261,11 @@ pub struct State {
|
||||||
pub statistics: Arc<Statistics>,
|
pub statistics: Arc<Statistics>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for State {
|
impl State {
|
||||||
fn default() -> Self {
|
pub fn new(num_request_workers: usize) -> Self {
|
||||||
Self {
|
Self {
|
||||||
access_list: Arc::new(AccessListArcSwap::default()),
|
access_list: Arc::new(AccessListArcSwap::default()),
|
||||||
statistics: Arc::new(Statistics::default()),
|
statistics: Arc::new(Statistics::new(num_request_workers)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ use std::net::IpAddr;
|
||||||
use std::net::Ipv4Addr;
|
use std::net::Ipv4Addr;
|
||||||
use std::net::Ipv6Addr;
|
use std::net::Ipv6Addr;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
use std::sync::atomic::Ordering;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
|
|
@ -84,6 +85,7 @@ pub fn run_request_worker(
|
||||||
state: State,
|
state: State,
|
||||||
request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, SocketAddr)>,
|
request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, SocketAddr)>,
|
||||||
response_sender: ConnectedResponseSender,
|
response_sender: ConnectedResponseSender,
|
||||||
|
worker_index: RequestWorkerIndex,
|
||||||
) {
|
) {
|
||||||
let mut torrents = TorrentMaps::default();
|
let mut torrents = TorrentMaps::default();
|
||||||
let mut small_rng = SmallRng::from_entropy();
|
let mut small_rng = SmallRng::from_entropy();
|
||||||
|
|
@ -92,9 +94,12 @@ pub fn run_request_worker(
|
||||||
let mut peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age);
|
let mut peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age);
|
||||||
|
|
||||||
let cleaning_interval = Duration::from_secs(config.cleaning.torrent_cleaning_interval);
|
let cleaning_interval = Duration::from_secs(config.cleaning.torrent_cleaning_interval);
|
||||||
|
let statistics_update_interval = Duration::from_secs(config.statistics.interval);
|
||||||
|
|
||||||
|
let mut last_cleaning = Instant::now();
|
||||||
|
let mut last_statistics_update = Instant::now();
|
||||||
|
|
||||||
let mut iter_counter = 0usize;
|
let mut iter_counter = 0usize;
|
||||||
let mut last_cleaning = Instant::now();
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if let Ok((sender_index, request, src)) = request_receiver.recv_timeout(timeout) {
|
if let Ok((sender_index, request, src)) = request_receiver.recv_timeout(timeout) {
|
||||||
|
|
@ -125,6 +130,16 @@ pub fn run_request_worker(
|
||||||
|
|
||||||
last_cleaning = now;
|
last_cleaning = now;
|
||||||
}
|
}
|
||||||
|
if !statistics_update_interval.is_zero()
|
||||||
|
&& now > last_statistics_update + statistics_update_interval
|
||||||
|
{
|
||||||
|
state.statistics.torrents_ipv4[worker_index.0]
|
||||||
|
.store(torrents.ipv4.len(), Ordering::SeqCst);
|
||||||
|
state.statistics.torrents_ipv6[worker_index.0]
|
||||||
|
.store(torrents.ipv6.len(), Ordering::SeqCst);
|
||||||
|
|
||||||
|
last_statistics_update = now;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
iter_counter = iter_counter.wrapping_add(1);
|
iter_counter = iter_counter.wrapping_add(1);
|
||||||
|
|
|
||||||
|
|
@ -23,10 +23,12 @@ use signal_hook::iterator::Signals;
|
||||||
|
|
||||||
use common::{ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State};
|
use common::{ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State};
|
||||||
|
|
||||||
|
use crate::common::RequestWorkerIndex;
|
||||||
|
|
||||||
pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker";
|
pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker";
|
||||||
|
|
||||||
pub fn run(config: Config) -> ::anyhow::Result<()> {
|
pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
let state = State::default();
|
let state = State::new(config.request_workers);
|
||||||
|
|
||||||
update_access_list(&config.access_list, &state.access_list)?;
|
update_access_list(&config.access_list, &state.access_list)?;
|
||||||
|
|
||||||
|
|
@ -70,7 +72,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
WorkerIndex::RequestWorker(i),
|
WorkerIndex::RequestWorker(i),
|
||||||
);
|
);
|
||||||
|
|
||||||
handlers::run_request_worker(config, state, request_receiver, response_sender)
|
handlers::run_request_worker(
|
||||||
|
config,
|
||||||
|
state,
|
||||||
|
request_receiver,
|
||||||
|
response_sender,
|
||||||
|
RequestWorkerIndex(i),
|
||||||
|
)
|
||||||
})
|
})
|
||||||
.with_context(|| "spawn request worker")?;
|
.with_context(|| "spawn request worker")?;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,21 @@ pub fn gather_and_print_statistics(state: &State, config: &Config) {
|
||||||
let bytes_received_per_second: f64 = bytes_received / interval as f64;
|
let bytes_received_per_second: f64 = bytes_received / interval as f64;
|
||||||
let bytes_sent_per_second: f64 = bytes_sent / interval as f64;
|
let bytes_sent_per_second: f64 = bytes_sent / interval as f64;
|
||||||
|
|
||||||
|
let num_torrents_ipv4: usize = state
|
||||||
|
.statistics
|
||||||
|
.torrents_ipv4
|
||||||
|
.iter()
|
||||||
|
.map(|n| n.load(Ordering::SeqCst))
|
||||||
|
.sum();
|
||||||
|
let num_torrents_ipv6: usize = state
|
||||||
|
.statistics
|
||||||
|
.torrents_ipv6
|
||||||
|
.iter()
|
||||||
|
.map(|n| n.load(Ordering::SeqCst))
|
||||||
|
.sum();
|
||||||
|
|
||||||
|
let access_list_len = state.access_list.load().len();
|
||||||
|
|
||||||
println!(
|
println!(
|
||||||
"stats: {:.2} requests/second, {:.2} responses/second",
|
"stats: {:.2} requests/second, {:.2} responses/second",
|
||||||
requests_per_second, responses_per_second
|
requests_per_second, responses_per_second
|
||||||
|
|
@ -36,5 +51,12 @@ pub fn gather_and_print_statistics(state: &State, config: &Config) {
|
||||||
bytes_sent_per_second * 8.0 / 1_000_000.0,
|
bytes_sent_per_second * 8.0 / 1_000_000.0,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
println!(
|
||||||
|
"ipv4 torrents: {}, ipv6 torrents: {}",
|
||||||
|
num_torrents_ipv4, num_torrents_ipv6,
|
||||||
|
);
|
||||||
|
|
||||||
|
println!("access list entries: {}", access_list_len,);
|
||||||
|
|
||||||
println!();
|
println!();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -50,9 +50,16 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> {
|
||||||
|
|
||||||
{
|
{
|
||||||
let config = aquatic_config.clone();
|
let config = aquatic_config.clone();
|
||||||
|
let state = State::new(config.request_workers);
|
||||||
|
|
||||||
::std::thread::spawn(move || {
|
::std::thread::spawn(move || {
|
||||||
run_request_worker(config, State::default(), request_receiver, response_sender)
|
run_request_worker(
|
||||||
|
config,
|
||||||
|
state,
|
||||||
|
request_receiver,
|
||||||
|
response_sender,
|
||||||
|
RequestWorkerIndex(0),
|
||||||
|
)
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue