mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
udp: add optional peers per torrent statistics
This commit is contained in:
parent
c0ed0eb7db
commit
f0e0a84088
9 changed files with 251 additions and 17 deletions
34
Cargo.lock
generated
34
Cargo.lock
generated
|
|
@ -215,6 +215,7 @@ dependencies = [
|
||||||
"crossbeam-channel",
|
"crossbeam-channel",
|
||||||
"getrandom",
|
"getrandom",
|
||||||
"hashbrown 0.12.3",
|
"hashbrown 0.12.3",
|
||||||
|
"hdrhistogram",
|
||||||
"hex",
|
"hex",
|
||||||
"libc",
|
"libc",
|
||||||
"log",
|
"log",
|
||||||
|
|
@ -702,6 +703,15 @@ version = "2.1.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "2d0165d2900ae6778e36e80bbc4da3b5eefccee9ba939761f9c2882a5d9af3ff"
|
checksum = "2d0165d2900ae6778e36e80bbc4da3b5eefccee9ba939761f9c2882a5d9af3ff"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "crc32fast"
|
||||||
|
version = "1.3.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "criterion"
|
name = "criterion"
|
||||||
version = "0.3.6"
|
version = "0.3.6"
|
||||||
|
|
@ -973,6 +983,16 @@ dependencies = [
|
||||||
"instant",
|
"instant",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "flate2"
|
||||||
|
version = "1.0.24"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6"
|
||||||
|
dependencies = [
|
||||||
|
"crc32fast",
|
||||||
|
"miniz_oxide",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "float-cmp"
|
name = "float-cmp"
|
||||||
version = "0.9.0"
|
version = "0.9.0"
|
||||||
|
|
@ -1275,6 +1295,20 @@ dependencies = [
|
||||||
"hashbrown 0.12.3",
|
"hashbrown 0.12.3",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "hdrhistogram"
|
||||||
|
version = "7.5.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "31672b7011be2c4f7456c4ddbcb40e7e9a4a9fad8efe49a6ebaf5f307d0109c0"
|
||||||
|
dependencies = [
|
||||||
|
"base64",
|
||||||
|
"byteorder",
|
||||||
|
"crossbeam-channel",
|
||||||
|
"flate2",
|
||||||
|
"nom",
|
||||||
|
"num-traits",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "headers"
|
name = "headers"
|
||||||
version = "0.3.7"
|
version = "0.3.7"
|
||||||
|
|
|
||||||
|
|
@ -31,6 +31,7 @@ constant_time_eq = "0.2"
|
||||||
crossbeam-channel = "0.5"
|
crossbeam-channel = "0.5"
|
||||||
getrandom = "0.2"
|
getrandom = "0.2"
|
||||||
hashbrown = { version = "0.12", default-features = false }
|
hashbrown = { version = "0.12", default-features = false }
|
||||||
|
hdrhistogram = "7"
|
||||||
hex = "0.4"
|
hex = "0.4"
|
||||||
libc = "0.2"
|
libc = "0.2"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ use crossbeam_channel::{Sender, TrySendError};
|
||||||
use aquatic_common::access_list::AccessListArcSwap;
|
use aquatic_common::access_list::AccessListArcSwap;
|
||||||
use aquatic_common::CanonicalSocketAddr;
|
use aquatic_common::CanonicalSocketAddr;
|
||||||
use aquatic_udp_protocol::*;
|
use aquatic_udp_protocol::*;
|
||||||
|
use hdrhistogram::Histogram;
|
||||||
|
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
|
|
||||||
|
|
@ -132,6 +133,11 @@ impl PeerStatus {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub enum StatisticsMessage {
|
||||||
|
Ipv4PeerHistogram(Histogram<u64>),
|
||||||
|
Ipv6PeerHistogram(Histogram<u64>),
|
||||||
|
}
|
||||||
|
|
||||||
pub struct Statistics {
|
pub struct Statistics {
|
||||||
pub requests_received: AtomicUsize,
|
pub requests_received: AtomicUsize,
|
||||||
pub responses_sent_connect: AtomicUsize,
|
pub responses_sent_connect: AtomicUsize,
|
||||||
|
|
|
||||||
|
|
@ -142,6 +142,8 @@ impl Default for ProtocolConfig {
|
||||||
pub struct StatisticsConfig {
|
pub struct StatisticsConfig {
|
||||||
/// Collect and print/write statistics this often (seconds)
|
/// Collect and print/write statistics this often (seconds)
|
||||||
pub interval: u64,
|
pub interval: u64,
|
||||||
|
/// Enable extended statistics (on peers per torrent)
|
||||||
|
pub extended: bool,
|
||||||
/// Print statistics to standard output
|
/// Print statistics to standard output
|
||||||
pub print_to_stdout: bool,
|
pub print_to_stdout: bool,
|
||||||
/// Save statistics as HTML to a file
|
/// Save statistics as HTML to a file
|
||||||
|
|
@ -160,6 +162,7 @@ impl Default for StatisticsConfig {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
interval: 5,
|
interval: 5,
|
||||||
|
extended: false,
|
||||||
print_to_stdout: false,
|
print_to_stdout: false,
|
||||||
write_html_to_file: false,
|
write_html_to_file: false,
|
||||||
html_file_path: "tmp/statistics.html".into(),
|
html_file_path: "tmp/statistics.html".into(),
|
||||||
|
|
|
||||||
|
|
@ -41,6 +41,8 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
let mut response_senders = Vec::new();
|
let mut response_senders = Vec::new();
|
||||||
let mut response_receivers = BTreeMap::new();
|
let mut response_receivers = BTreeMap::new();
|
||||||
|
|
||||||
|
let (statistics_sender, statistics_receiver) = unbounded();
|
||||||
|
|
||||||
let server_start_instant = ServerStartInstant::new();
|
let server_start_instant = ServerStartInstant::new();
|
||||||
|
|
||||||
for i in 0..config.swarm_workers {
|
for i in 0..config.swarm_workers {
|
||||||
|
|
@ -71,6 +73,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
let state = state.clone();
|
let state = state.clone();
|
||||||
let request_receiver = request_receivers.remove(&i).unwrap().clone();
|
let request_receiver = request_receivers.remove(&i).unwrap().clone();
|
||||||
let response_sender = ConnectedResponseSender::new(response_senders.clone());
|
let response_sender = ConnectedResponseSender::new(response_senders.clone());
|
||||||
|
let statistics_sender = statistics_sender.clone();
|
||||||
|
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.name(format!("swarm-{:02}", i + 1))
|
.name(format!("swarm-{:02}", i + 1))
|
||||||
|
|
@ -90,6 +93,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
server_start_instant,
|
server_start_instant,
|
||||||
request_receiver,
|
request_receiver,
|
||||||
response_sender,
|
response_sender,
|
||||||
|
statistics_sender,
|
||||||
SwarmWorkerIndex(i),
|
SwarmWorkerIndex(i),
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
|
@ -148,7 +152,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
WorkerIndex::Util,
|
WorkerIndex::Util,
|
||||||
);
|
);
|
||||||
|
|
||||||
workers::statistics::run_statistics_worker(sentinel, config, state);
|
workers::statistics::run_statistics_worker(
|
||||||
|
sentinel,
|
||||||
|
config,
|
||||||
|
state,
|
||||||
|
statistics_receiver,
|
||||||
|
);
|
||||||
})
|
})
|
||||||
.with_context(|| "spawn statistics worker")?;
|
.with_context(|| "spawn statistics worker")?;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
use std::fmt::Display;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
|
|
@ -6,6 +7,8 @@ use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use aquatic_common::PanicSentinel;
|
use aquatic_common::PanicSentinel;
|
||||||
|
use crossbeam_channel::Receiver;
|
||||||
|
use hdrhistogram::Histogram;
|
||||||
use num_format::{Locale, ToFormattedString};
|
use num_format::{Locale, ToFormattedString};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use time::format_description::well_known::Rfc2822;
|
use time::format_description::well_known::Rfc2822;
|
||||||
|
|
@ -23,6 +26,49 @@ const STYLESHEET_CONTENTS: &str = concat!(
|
||||||
"</style>"
|
"</style>"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, Debug, Serialize)]
|
||||||
|
struct PeerHistogramStatistics {
|
||||||
|
p0: u64,
|
||||||
|
p10: u64,
|
||||||
|
p20: u64,
|
||||||
|
p30: u64,
|
||||||
|
p40: u64,
|
||||||
|
p50: u64,
|
||||||
|
p60: u64,
|
||||||
|
p70: u64,
|
||||||
|
p80: u64,
|
||||||
|
p90: u64,
|
||||||
|
p95: u64,
|
||||||
|
p99: u64,
|
||||||
|
p100: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PeerHistogramStatistics {
|
||||||
|
fn new(h: &Histogram<u64>) -> Self {
|
||||||
|
Self {
|
||||||
|
p0: h.value_at_percentile(0.0),
|
||||||
|
p10: h.value_at_percentile(10.0),
|
||||||
|
p20: h.value_at_percentile(20.0),
|
||||||
|
p30: h.value_at_percentile(30.0),
|
||||||
|
p40: h.value_at_percentile(40.0),
|
||||||
|
p50: h.value_at_percentile(50.0),
|
||||||
|
p60: h.value_at_percentile(60.0),
|
||||||
|
p70: h.value_at_percentile(70.0),
|
||||||
|
p80: h.value_at_percentile(80.0),
|
||||||
|
p90: h.value_at_percentile(90.0),
|
||||||
|
p95: h.value_at_percentile(95.0),
|
||||||
|
p99: h.value_at_percentile(99.0),
|
||||||
|
p100: h.value_at_percentile(100.0),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for PeerHistogramStatistics {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(f, "p0: {}, p10: {}, p20: {}, p30: {}, p40: {}, p50: {}, p60: {}, p70: {}, p80: {}, p90: {}, p95: {}, p99: {}, p100: {}", self.p0, self.p10, self.p20, self.p30, self.p40, self.p50, self.p60, self.p70, self.p80, self.p90, self.p95, self.p99, self.p100)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug)]
|
#[derive(Clone, Copy, Debug)]
|
||||||
struct CollectedStatistics {
|
struct CollectedStatistics {
|
||||||
requests_per_second: f64,
|
requests_per_second: f64,
|
||||||
|
|
@ -34,10 +80,15 @@ struct CollectedStatistics {
|
||||||
bytes_sent_per_second: f64,
|
bytes_sent_per_second: f64,
|
||||||
num_torrents: usize,
|
num_torrents: usize,
|
||||||
num_peers: usize,
|
num_peers: usize,
|
||||||
|
peer_histogram: PeerHistogramStatistics,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CollectedStatistics {
|
impl CollectedStatistics {
|
||||||
fn from_shared(statistics: &Arc<Statistics>, last: &mut Instant) -> Self {
|
fn from_shared(
|
||||||
|
statistics: &Arc<Statistics>,
|
||||||
|
peer_histogram: &Histogram<u64>,
|
||||||
|
last: &mut Instant,
|
||||||
|
) -> Self {
|
||||||
let requests_received = statistics.requests_received.fetch_and(0, Ordering::Relaxed) as f64;
|
let requests_received = statistics.requests_received.fetch_and(0, Ordering::Relaxed) as f64;
|
||||||
let responses_sent_connect = statistics
|
let responses_sent_connect = statistics
|
||||||
.responses_sent_connect
|
.responses_sent_connect
|
||||||
|
|
@ -56,6 +107,8 @@ impl CollectedStatistics {
|
||||||
let num_torrents = Self::sum_atomic_usizes(&statistics.torrents);
|
let num_torrents = Self::sum_atomic_usizes(&statistics.torrents);
|
||||||
let num_peers = Self::sum_atomic_usizes(&statistics.peers);
|
let num_peers = Self::sum_atomic_usizes(&statistics.peers);
|
||||||
|
|
||||||
|
let peer_histogram = PeerHistogramStatistics::new(peer_histogram);
|
||||||
|
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
|
|
||||||
let elapsed = (now - *last).as_secs_f64();
|
let elapsed = (now - *last).as_secs_f64();
|
||||||
|
|
@ -72,6 +125,7 @@ impl CollectedStatistics {
|
||||||
bytes_sent_per_second: bytes_sent / elapsed,
|
bytes_sent_per_second: bytes_sent / elapsed,
|
||||||
num_torrents,
|
num_torrents,
|
||||||
num_peers,
|
num_peers,
|
||||||
|
peer_histogram,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -107,6 +161,7 @@ impl Into<FormattedStatistics> for CollectedStatistics {
|
||||||
tx_mbits: format!("{:.2}", tx_mbits),
|
tx_mbits: format!("{:.2}", tx_mbits),
|
||||||
num_torrents: self.num_torrents.to_formatted_string(&Locale::en),
|
num_torrents: self.num_torrents.to_formatted_string(&Locale::en),
|
||||||
num_peers: self.num_peers.to_formatted_string(&Locale::en),
|
num_peers: self.num_peers.to_formatted_string(&Locale::en),
|
||||||
|
peer_histogram: self.peer_histogram,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -123,6 +178,7 @@ struct FormattedStatistics {
|
||||||
tx_mbits: String,
|
tx_mbits: String,
|
||||||
num_torrents: String,
|
num_torrents: String,
|
||||||
num_peers: String,
|
num_peers: String,
|
||||||
|
peer_histogram: PeerHistogramStatistics,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize)]
|
#[derive(Debug, Serialize)]
|
||||||
|
|
@ -130,13 +186,47 @@ struct TemplateData {
|
||||||
stylesheet: String,
|
stylesheet: String,
|
||||||
ipv4_active: bool,
|
ipv4_active: bool,
|
||||||
ipv6_active: bool,
|
ipv6_active: bool,
|
||||||
|
extended_active: bool,
|
||||||
ipv4: FormattedStatistics,
|
ipv4: FormattedStatistics,
|
||||||
ipv6: FormattedStatistics,
|
ipv6: FormattedStatistics,
|
||||||
last_updated: String,
|
last_updated: String,
|
||||||
peer_update_interval: String,
|
peer_update_interval: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run_statistics_worker(_sentinel: PanicSentinel, config: Config, state: State) {
|
struct PeerHistograms {
|
||||||
|
pending: Vec<Histogram<u64>>,
|
||||||
|
last_complete: Histogram<u64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for PeerHistograms {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
pending: Vec::new(),
|
||||||
|
last_complete: Histogram::new(3).expect("create peer histogram"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PeerHistograms {
|
||||||
|
fn update(&mut self, config: &Config, histogram: Histogram<u64>) {
|
||||||
|
self.pending.push(histogram);
|
||||||
|
|
||||||
|
if self.pending.len() == config.swarm_workers {
|
||||||
|
self.last_complete = self.pending.drain(..).sum();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn current(&self) -> &Histogram<u64> {
|
||||||
|
&self.last_complete
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn run_statistics_worker(
|
||||||
|
_sentinel: PanicSentinel,
|
||||||
|
config: Config,
|
||||||
|
state: State,
|
||||||
|
statistics_receiver: Receiver<StatisticsMessage>,
|
||||||
|
) {
|
||||||
let tt = if config.statistics.write_html_to_file {
|
let tt = if config.statistics.write_html_to_file {
|
||||||
let mut tt = TinyTemplate::new();
|
let mut tt = TinyTemplate::new();
|
||||||
|
|
||||||
|
|
@ -154,13 +244,31 @@ pub fn run_statistics_worker(_sentinel: PanicSentinel, config: Config, state: St
|
||||||
let mut last_ipv4 = Instant::now();
|
let mut last_ipv4 = Instant::now();
|
||||||
let mut last_ipv6 = Instant::now();
|
let mut last_ipv6 = Instant::now();
|
||||||
|
|
||||||
|
let mut peer_histograms_ipv4 = PeerHistograms::default();
|
||||||
|
let mut peer_histograms_ipv6 = PeerHistograms::default();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
::std::thread::sleep(Duration::from_secs(config.statistics.interval));
|
::std::thread::sleep(Duration::from_secs(config.statistics.interval));
|
||||||
|
|
||||||
let statistics_ipv4 =
|
for message in statistics_receiver.try_iter() {
|
||||||
CollectedStatistics::from_shared(&state.statistics_ipv4, &mut last_ipv4).into();
|
match message {
|
||||||
let statistics_ipv6 =
|
StatisticsMessage::Ipv4PeerHistogram(h) => peer_histograms_ipv4.update(&config, h),
|
||||||
CollectedStatistics::from_shared(&state.statistics_ipv6, &mut last_ipv6).into();
|
StatisticsMessage::Ipv6PeerHistogram(h) => peer_histograms_ipv6.update(&config, h),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let statistics_ipv4 = CollectedStatistics::from_shared(
|
||||||
|
&state.statistics_ipv4,
|
||||||
|
peer_histograms_ipv4.current(),
|
||||||
|
&mut last_ipv4,
|
||||||
|
)
|
||||||
|
.into();
|
||||||
|
let statistics_ipv6 = CollectedStatistics::from_shared(
|
||||||
|
&state.statistics_ipv6,
|
||||||
|
peer_histograms_ipv6.current(),
|
||||||
|
&mut last_ipv6,
|
||||||
|
)
|
||||||
|
.into();
|
||||||
|
|
||||||
if config.statistics.print_to_stdout {
|
if config.statistics.print_to_stdout {
|
||||||
println!("General:");
|
println!("General:");
|
||||||
|
|
@ -183,6 +291,7 @@ pub fn run_statistics_worker(_sentinel: PanicSentinel, config: Config, state: St
|
||||||
stylesheet: STYLESHEET_CONTENTS.to_string(),
|
stylesheet: STYLESHEET_CONTENTS.to_string(),
|
||||||
ipv4_active: config.network.ipv4_active(),
|
ipv4_active: config.network.ipv4_active(),
|
||||||
ipv6_active: config.network.ipv6_active(),
|
ipv6_active: config.network.ipv6_active(),
|
||||||
|
extended_active: config.statistics.extended,
|
||||||
ipv4: statistics_ipv4,
|
ipv4: statistics_ipv4,
|
||||||
ipv6: statistics_ipv6,
|
ipv6: statistics_ipv6,
|
||||||
last_updated: OffsetDateTime::now_utc()
|
last_updated: OffsetDateTime::now_utc()
|
||||||
|
|
@ -230,6 +339,26 @@ fn print_to_stdout(config: &Config, statistics: &FormattedStatistics) {
|
||||||
" number of peers: {} (updated every {} seconds)",
|
" number of peers: {} (updated every {} seconds)",
|
||||||
statistics.num_peers, config.cleaning.torrent_cleaning_interval
|
statistics.num_peers, config.cleaning.torrent_cleaning_interval
|
||||||
);
|
);
|
||||||
|
|
||||||
|
if config.statistics.extended {
|
||||||
|
println!(
|
||||||
|
" peers per torrent (updated every {} seconds):",
|
||||||
|
config.cleaning.torrent_cleaning_interval
|
||||||
|
);
|
||||||
|
println!(" min {:>10}", statistics.peer_histogram.p0);
|
||||||
|
println!(" p10 {:>10}", statistics.peer_histogram.p10);
|
||||||
|
println!(" p20 {:>10}", statistics.peer_histogram.p20);
|
||||||
|
println!(" p30 {:>10}", statistics.peer_histogram.p30);
|
||||||
|
println!(" p40 {:>10}", statistics.peer_histogram.p40);
|
||||||
|
println!(" p50 {:>10}", statistics.peer_histogram.p50);
|
||||||
|
println!(" p60 {:>10}", statistics.peer_histogram.p60);
|
||||||
|
println!(" p70 {:>10}", statistics.peer_histogram.p70);
|
||||||
|
println!(" p80 {:>10}", statistics.peer_histogram.p80);
|
||||||
|
println!(" p90 {:>10}", statistics.peer_histogram.p90);
|
||||||
|
println!(" p95 {:>10}", statistics.peer_histogram.p95);
|
||||||
|
println!(" p99 {:>10}", statistics.peer_histogram.p99);
|
||||||
|
println!(" max {:>10}", statistics.peer_histogram.p100);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn save_html_to_file(
|
fn save_html_to_file(
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ use std::time::Instant;
|
||||||
|
|
||||||
use aquatic_common::ServerStartInstant;
|
use aquatic_common::ServerStartInstant;
|
||||||
use crossbeam_channel::Receiver;
|
use crossbeam_channel::Receiver;
|
||||||
|
use crossbeam_channel::Sender;
|
||||||
use rand::{rngs::SmallRng, SeedableRng};
|
use rand::{rngs::SmallRng, SeedableRng};
|
||||||
|
|
||||||
use aquatic_common::{CanonicalSocketAddr, PanicSentinel, ValidUntil};
|
use aquatic_common::{CanonicalSocketAddr, PanicSentinel, ValidUntil};
|
||||||
|
|
@ -25,6 +26,7 @@ pub fn run_swarm_worker(
|
||||||
server_start_instant: ServerStartInstant,
|
server_start_instant: ServerStartInstant,
|
||||||
request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>,
|
request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>,
|
||||||
response_sender: ConnectedResponseSender,
|
response_sender: ConnectedResponseSender,
|
||||||
|
statistics_sender: Sender<StatisticsMessage>,
|
||||||
worker_index: SwarmWorkerIndex,
|
worker_index: SwarmWorkerIndex,
|
||||||
) {
|
) {
|
||||||
let mut torrents = TorrentMaps::default();
|
let mut torrents = TorrentMaps::default();
|
||||||
|
|
@ -86,15 +88,26 @@ pub fn run_swarm_worker(
|
||||||
peer_valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age);
|
peer_valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age);
|
||||||
|
|
||||||
if now > last_cleaning + cleaning_interval {
|
if now > last_cleaning + cleaning_interval {
|
||||||
let (ipv4, ipv6) = torrents.clean_and_get_num_peers(
|
let (ipv4, ipv6) = torrents.clean_and_get_statistics(
|
||||||
&config,
|
&config,
|
||||||
&state.access_list,
|
&state.access_list,
|
||||||
server_start_instant,
|
server_start_instant,
|
||||||
);
|
);
|
||||||
|
|
||||||
if config.statistics.active() {
|
if config.statistics.active() {
|
||||||
state.statistics_ipv4.peers[worker_index.0].store(ipv4, Ordering::Release);
|
state.statistics_ipv4.peers[worker_index.0].store(ipv4.0, Ordering::Release);
|
||||||
state.statistics_ipv6.peers[worker_index.0].store(ipv6, Ordering::Release);
|
state.statistics_ipv6.peers[worker_index.0].store(ipv6.0, Ordering::Release);
|
||||||
|
|
||||||
|
if let Some(message) = ipv4.1.map(StatisticsMessage::Ipv4PeerHistogram) {
|
||||||
|
if let Err(err) = statistics_sender.try_send(message) {
|
||||||
|
::log::error!("couldn't send statistics message: {:#}", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(message) = ipv6.1.map(StatisticsMessage::Ipv6PeerHistogram) {
|
||||||
|
if let Err(err) = statistics_sender.try_send(message) {
|
||||||
|
::log::error!("couldn't send statistics message: {:#}", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
last_cleaning = now;
|
last_cleaning = now;
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ use aquatic_common::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use aquatic_udp_protocol::*;
|
use aquatic_udp_protocol::*;
|
||||||
|
use hdrhistogram::Histogram;
|
||||||
use rand::prelude::SmallRng;
|
use rand::prelude::SmallRng;
|
||||||
|
|
||||||
use crate::common::*;
|
use crate::common::*;
|
||||||
|
|
@ -140,14 +141,28 @@ pub struct TorrentMap<I: Ip>(pub AmortizedIndexMap<InfoHash, TorrentData<I>>);
|
||||||
|
|
||||||
impl<I: Ip> TorrentMap<I> {
|
impl<I: Ip> TorrentMap<I> {
|
||||||
/// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers
|
/// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers
|
||||||
fn clean_and_get_num_peers(
|
fn clean_and_get_statistics(
|
||||||
&mut self,
|
&mut self,
|
||||||
|
config: &Config,
|
||||||
access_list_cache: &mut AccessListCache,
|
access_list_cache: &mut AccessListCache,
|
||||||
access_list_mode: AccessListMode,
|
access_list_mode: AccessListMode,
|
||||||
now: SecondsSinceServerStart,
|
now: SecondsSinceServerStart,
|
||||||
) -> usize {
|
) -> (usize, Option<Histogram<u64>>) {
|
||||||
let mut num_peers = 0;
|
let mut num_peers = 0;
|
||||||
|
|
||||||
|
let mut opt_histogram: Option<Histogram<u64>> = if config.statistics.extended {
|
||||||
|
match Histogram::new(3) {
|
||||||
|
Ok(histogram) => Some(histogram),
|
||||||
|
Err(err) => {
|
||||||
|
::log::error!("Couldn't create peer histogram: {:#}", err);
|
||||||
|
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
self.0.retain(|info_hash, torrent| {
|
self.0.retain(|info_hash, torrent| {
|
||||||
if !access_list_cache
|
if !access_list_cache
|
||||||
.load()
|
.load()
|
||||||
|
|
@ -160,12 +175,27 @@ impl<I: Ip> TorrentMap<I> {
|
||||||
|
|
||||||
num_peers += torrent.peers.len();
|
num_peers += torrent.peers.len();
|
||||||
|
|
||||||
|
match opt_histogram {
|
||||||
|
Some(ref mut histogram) if torrent.peers.len() != 0 => {
|
||||||
|
let n = torrent
|
||||||
|
.peers
|
||||||
|
.len()
|
||||||
|
.try_into()
|
||||||
|
.expect("Couldn't fit usize into u64");
|
||||||
|
|
||||||
|
if let Err(err) = histogram.record(n) {
|
||||||
|
::log::error!("Couldn't record {} to histogram: {:#}", n, err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
|
||||||
!torrent.peers.is_empty()
|
!torrent.peers.is_empty()
|
||||||
});
|
});
|
||||||
|
|
||||||
self.0.shrink_to_fit();
|
self.0.shrink_to_fit();
|
||||||
|
|
||||||
num_peers
|
(num_peers, opt_histogram)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn num_torrents(&self) -> usize {
|
pub fn num_torrents(&self) -> usize {
|
||||||
|
|
@ -189,18 +219,25 @@ impl Default for TorrentMaps {
|
||||||
|
|
||||||
impl TorrentMaps {
|
impl TorrentMaps {
|
||||||
/// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers
|
/// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers
|
||||||
pub fn clean_and_get_num_peers(
|
pub fn clean_and_get_statistics(
|
||||||
&mut self,
|
&mut self,
|
||||||
config: &Config,
|
config: &Config,
|
||||||
access_list: &Arc<AccessListArcSwap>,
|
access_list: &Arc<AccessListArcSwap>,
|
||||||
server_start_instant: ServerStartInstant,
|
server_start_instant: ServerStartInstant,
|
||||||
) -> (usize, usize) {
|
) -> (
|
||||||
|
(usize, Option<Histogram<u64>>),
|
||||||
|
(usize, Option<Histogram<u64>>),
|
||||||
|
) {
|
||||||
let mut cache = create_access_list_cache(access_list);
|
let mut cache = create_access_list_cache(access_list);
|
||||||
let mode = config.access_list.mode;
|
let mode = config.access_list.mode;
|
||||||
let now = server_start_instant.seconds_elapsed();
|
let now = server_start_instant.seconds_elapsed();
|
||||||
|
|
||||||
let ipv4 = self.ipv4.clean_and_get_num_peers(&mut cache, mode, now);
|
let ipv4 = self
|
||||||
let ipv6 = self.ipv6.clean_and_get_num_peers(&mut cache, mode, now);
|
.ipv4
|
||||||
|
.clean_and_get_statistics(config, &mut cache, mode, now);
|
||||||
|
let ipv6 = self
|
||||||
|
.ipv6
|
||||||
|
.clean_and_get_statistics(config, &mut cache, mode, now);
|
||||||
|
|
||||||
(ipv4, ipv6)
|
(ipv4, ipv6)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -50,6 +50,7 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> {
|
||||||
let (response_sender, response_receiver) = unbounded();
|
let (response_sender, response_receiver) = unbounded();
|
||||||
|
|
||||||
let response_sender = ConnectedResponseSender::new(vec![response_sender]);
|
let response_sender = ConnectedResponseSender::new(vec![response_sender]);
|
||||||
|
let (statistics_sender, _) = unbounded();
|
||||||
|
|
||||||
let server_start_instant = ServerStartInstant::new();
|
let server_start_instant = ServerStartInstant::new();
|
||||||
|
|
||||||
|
|
@ -65,6 +66,7 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> {
|
||||||
server_start_instant,
|
server_start_instant,
|
||||||
request_receiver,
|
request_receiver,
|
||||||
response_sender,
|
response_sender,
|
||||||
|
statistics_sender,
|
||||||
SwarmWorkerIndex(0),
|
SwarmWorkerIndex(0),
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue