Merge pull request #116 from greatest-ape/prometheus

fix ws metrics; add http metrics; fix http load test issues; run cargo update
This commit is contained in:
Joakim Frostegård 2023-01-18 22:36:26 +01:00 committed by GitHub
commit 5ac8f3727b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 399 additions and 144 deletions

View file

@ -50,6 +50,10 @@
### aquatic_http
#### Added
* Support exposing a Prometheus endpoint for metrics
#### Changed
* Don't return any response peers if announce event is stopped
@ -74,6 +78,7 @@
#### Added
* Add HTTP health check route when running without TLS
* Support exposing a Prometheus endpoint for metrics
#### Changed

100
Cargo.lock generated
View file

@ -77,7 +77,7 @@ dependencies = [
"duplicate",
"git-testament",
"glommio",
"hashbrown 0.13.1",
"hashbrown 0.13.2",
"hex",
"hwloc",
"indexmap",
@ -111,6 +111,8 @@ dependencies = [
"libc",
"log",
"memchr",
"metrics",
"metrics-exporter-prometheus",
"mimalloc",
"once_cell",
"privdrop",
@ -136,7 +138,7 @@ dependencies = [
"futures-lite",
"futures-rustls",
"glommio",
"hashbrown 0.13.1",
"hashbrown 0.13.2",
"log",
"mimalloc",
"quickcheck",
@ -226,7 +228,7 @@ dependencies = [
"constant_time_eq",
"crossbeam-channel",
"getrandom",
"hashbrown 0.13.1",
"hashbrown 0.13.2",
"hdrhistogram",
"hex",
"libc",
@ -271,7 +273,7 @@ dependencies = [
"aquatic_common",
"aquatic_toml_config",
"aquatic_udp_protocol",
"hashbrown 0.13.1",
"hashbrown 0.13.2",
"mimalloc",
"mio",
"quickcheck",
@ -306,7 +308,7 @@ dependencies = [
"futures-lite",
"futures-rustls",
"glommio",
"hashbrown 0.13.1",
"hashbrown 0.13.2",
"httparse",
"log",
"metrics",
@ -355,7 +357,7 @@ version = "0.2.0"
dependencies = [
"anyhow",
"criterion",
"hashbrown 0.13.1",
"hashbrown 0.13.2",
"quickcheck",
"quickcheck_macros",
"serde",
@ -590,9 +592,9 @@ checksum = "8ff9f338986406db85e2b5deb40a9255b796ca03a194c7457403d215173f3fd5"
[[package]]
name = "bumpalo"
version = "3.11.1"
version = "3.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba"
checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535"
[[package]]
name = "byteorder"
@ -663,9 +665,9 @@ dependencies = [
[[package]]
name = "console"
version = "0.15.4"
version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9b6515d269224923b26b5febea2ed42b2d5f2ce37284a4dd670fedd6cb8347a"
checksum = "c3d79fbe8970a77e3e34151cc13d3b3e248aa0faaecb9f6091fa07ebefe5ad60"
dependencies = [
"encode_unicode",
"lazy_static",
@ -706,9 +708,9 @@ dependencies = [
[[package]]
name = "crc-catalog"
version = "2.1.0"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d0165d2900ae6778e36e80bbc4da3b5eefccee9ba939761f9c2882a5d9af3ff"
checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484"
[[package]]
name = "crc32fast"
@ -1260,7 +1262,7 @@ version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e2a3c70a9c00cc1ee87b54e89f9505f73bb17d63f1b25c9a462ba8ef885444f"
dependencies = [
"hashbrown 0.13.1",
"hashbrown 0.13.2",
"serde",
]
@ -1281,9 +1283,9 @@ dependencies = [
[[package]]
name = "hashbrown"
version = "0.13.1"
version = "0.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ff8ae62cd3a9102e5637afc8452c55acf3844001bd5374e0b0bd7b6616c038"
checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e"
dependencies = [
"ahash 0.8.2",
"serde",
@ -1481,9 +1483,9 @@ dependencies = [
[[package]]
name = "indicatif"
version = "0.17.2"
version = "0.17.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4295cbb7573c16d310e99e713cf9e75101eb190ab31fccd35f2d2691b4352b19"
checksum = "cef509aa9bc73864d6756f0d34d35504af3cf0844373afe9b8669a5b8005a729"
dependencies = [
"console",
"number_prefix",
@ -1796,9 +1798,9 @@ dependencies = [
[[package]]
name = "nix"
version = "0.26.1"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46a58d1d356c6597d08cde02c2f09d785b09e28711837b1ed667dc652c08a694"
checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a"
dependencies = [
"bitflags 1.3.2",
"cfg-if",
@ -1816,9 +1818,9 @@ checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
[[package]]
name = "nom"
version = "7.1.2"
version = "7.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5507769c4919c998e69e49c839d9dc6e693ede4cc4290d6ad8b41d4f09c548c"
checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
dependencies = [
"memchr",
"minimal-lexical",
@ -1931,9 +1933,9 @@ checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
[[package]]
name = "object"
version = "0.30.1"
version = "0.30.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d864c91689fdc196779b98dba0aceac6118594c2df6ee5d943eb6a8df4d107a"
checksum = "2b8c786513eb403643f2a88c244c2aaa270ef2153f55094587d0c48a3cf22a83"
dependencies = [
"memchr",
]
@ -1980,7 +1982,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core 0.9.5",
"parking_lot_core 0.9.6",
]
[[package]]
@ -1999,9 +2001,9 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.9.5"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ff9f3fef3968a3ec5945535ed654cb38ff72d7495a25619e2247fb15a2ed9ba"
checksum = "ba1ef8814b5c993410bb3adfad7a5ed269563e4a2f90c41f5d85be7fb47133bf"
dependencies = [
"cfg-if",
"libc",
@ -2138,7 +2140,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81ed9e5437d82d5f2cde999a21571474c5f09b3d76e33eab94bf0e8e42a4fd96"
dependencies = [
"libc",
"nix 0.26.1",
"nix 0.26.2",
]
[[package]]
@ -2167,9 +2169,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.49"
version = "1.0.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57a8eca9f9c4ffde41714334dee777596264c7825420f521abc92b5b5deb63a5"
checksum = "6ef7d57beacfaf2d8aee5937dab7b7f28de3cb8b1828479bb5de2a7106f2bae2"
dependencies = [
"unicode-ident",
]
@ -2376,9 +2378,9 @@ checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342"
[[package]]
name = "rustls"
version = "0.20.7"
version = "0.20.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c"
checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f"
dependencies = [
"log",
"ring",
@ -2679,9 +2681,9 @@ dependencies = [
[[package]]
name = "sqlformat"
version = "0.2.0"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f87e292b4291f154971a43c3774364e2cbcaec599d3f5bf6fa9d122885dbc38a"
checksum = "0c12bc9199d1db8234678b7051747c07f517cdcf019262d1847b94ec8b1aee3e"
dependencies = [
"itertools",
"nom",
@ -2901,9 +2903,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.24.1"
version = "1.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d9f76183f91ecfb55e1d7d5602bd1d979e38a3a522fe900241cf195624d67ae"
checksum = "597a12a59981d9e3c38d216785b0c37399f6e415e8d0712047620f189371b0bb"
dependencies = [
"autocfg",
"bytes",
@ -3340,45 +3342,45 @@ dependencies = [
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.0"
version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e"
checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608"
[[package]]
name = "windows_aarch64_msvc"
version = "0.42.0"
version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4"
checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7"
[[package]]
name = "windows_i686_gnu"
version = "0.42.0"
version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7"
checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640"
[[package]]
name = "windows_i686_msvc"
version = "0.42.0"
version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246"
checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605"
[[package]]
name = "windows_x86_64_gnu"
version = "0.42.0"
version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed"
checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.42.0"
version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028"
checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463"
[[package]]
name = "windows_x86_64_msvc"
version = "0.42.0"
version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5"
checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd"
[[package]]
name = "zeroize"

View file

@ -124,6 +124,26 @@ parsing fails, the program exits. Later failures result in emitting of
an error-level log message, while successful updates of the access list result
in emitting of an info-level log message.
#### Prometheus
`aquatic_http` and `aquatic_ws` support exporting [Prometheus](https://prometheus.io/) metrics.
Pass the `prometheus` feature when building:
```sh
. ./scripts/env-native-cpu-without-avx-512
cargo build --release -p aquatic_ws --features "prometheus"
cargo build --release -p aquatic_http --features "prometheus"
```
Then activate the prometheus endpoint in the configuration file:
```toml
[metrics]
run_prometheus_endpoint = true
prometheus_endpoint_address = "0.0.0.0:9000"
```
### Running
If you're running `aquatic_http` or `aquatic_ws`, please make sure locked memory
@ -232,29 +252,6 @@ proxied to IPv4 requests, and IPv6 requests to IPv6 requests.
More details are available [here](./documents/aquatic-ws-load-test-2022-03-29.pdf). Please note that request workers have been renamed to swarm workers.
#### Prometheus
`aquatic_ws` supports exporting [Prometheus](https://prometheus.io/) metrics.
Pass the `prometheus` feature when building:
```sh
. ./scripts/env-native-cpu-without-avx-512
cargo build --release -p aquatic_ws --features "prometheus"
```
Then activate the prometheus endpoint in the configuration file:
```toml
[metrics]
# Run a prometheus endpoint
run_prometheus_endpoint = true
# Address to run prometheus endpoint on
prometheus_endpoint_address = "0.0.0.0:9000"
# Update metrics for torrent count this often (seconds)
torrent_count_update_interval = 10
```
## Load testing
There are load test binaries for all protocols. They use a CLI structure

View file

@ -16,6 +16,10 @@ name = "aquatic_http"
[[bin]]
name = "aquatic_http"
[features]
prometheus = ["metrics", "metrics-exporter-prometheus"]
metrics = ["dep:metrics"]
[dependencies]
aquatic_common = { workspace = true, features = ["rustls", "glommio"] }
aquatic_http_protocol.workspace = true
@ -31,6 +35,8 @@ glommio = "0.7"
itoa = "1"
libc = "0.2"
log = "0.4"
metrics = { version = "0.20", optional = true }
metrics-exporter-prometheus = { version = "0.11", optional = true, default-features = false, features = ["http-listener"] }
mimalloc = { version = "0.1", default-features = false }
memchr = "2"
privdrop = "0.5"

View file

@ -29,6 +29,8 @@ pub struct Config {
pub privileges: PrivilegeConfig,
pub access_list: AccessListConfig,
pub cpu_pinning: CpuPinningConfigAsc,
#[cfg(feature = "metrics")]
pub metrics: MetricsConfig,
}
impl Default for Config {
@ -43,6 +45,8 @@ impl Default for Config {
privileges: PrivilegeConfig::default(),
access_list: AccessListConfig::default(),
cpu_pinning: Default::default(),
#[cfg(feature = "metrics")]
metrics: Default::default(),
}
}
}
@ -128,6 +132,29 @@ impl Default for CleaningConfig {
}
}
// Configuration for the optional Prometheus metrics endpoint.
// Only compiled in when the "metrics" feature is enabled; fields map 1:1
// to the `[metrics]` section of the TOML configuration file.
#[cfg(feature = "metrics")]
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct MetricsConfig {
/// Run a prometheus endpoint
pub run_prometheus_endpoint: bool,
/// Address to run prometheus endpoint on
pub prometheus_endpoint_address: SocketAddr,
/// Update metrics for torrent count this often (seconds)
pub torrent_count_update_interval: u64,
}
#[cfg(feature = "metrics")]
impl Default for MetricsConfig {
    /// Defaults: endpoint disabled; when enabled it binds 0.0.0.0:9000 and
    /// refreshes the torrent-count gauges every 10 seconds.
    fn default() -> Self {
        MetricsConfig {
            run_prometheus_endpoint: false,
            prometheus_endpoint_address: ([0, 0, 0, 0], 9000).into(),
            torrent_count_update_interval: 10,
        }
    }
}
#[cfg(test)]
mod tests {
use super::Config;

View file

@ -1,3 +1,4 @@
use anyhow::Context;
use aquatic_common::{
access_list::update_access_list,
cpu_pinning::{
@ -30,6 +31,21 @@ const SHARED_CHANNEL_SIZE: usize = 1024;
pub fn run(config: Config) -> ::anyhow::Result<()> {
let mut signals = Signals::new([SIGUSR1, SIGTERM])?;
#[cfg(feature = "prometheus")]
if config.metrics.run_prometheus_endpoint {
use metrics_exporter_prometheus::PrometheusBuilder;
PrometheusBuilder::new()
.with_http_listener(config.metrics.prometheus_endpoint_address)
.install()
.with_context(|| {
format!(
"Install prometheus endpoint on {}",
config.metrics.prometheus_endpoint_address
)
})?;
}
let state = State::default();
update_access_list(&config.access_list, &state.access_list)?;
@ -76,6 +92,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
request_mesh_builder,
priv_dropper,
server_start_instant,
i,
)
.await
})
@ -106,6 +123,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
state,
request_mesh_builder,
server_start_instant,
i,
)
.await
})

View file

@ -39,6 +39,9 @@ const RESPONSE_HEADER_A: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Length: ";
const RESPONSE_HEADER_B: &[u8] = b" ";
const RESPONSE_HEADER_C: &[u8] = b"\r\n\r\n";
#[cfg(feature = "metrics")]
thread_local! { static WORKER_INDEX: ::std::cell::Cell<usize> = Default::default() }
static RESPONSE_HEADER: Lazy<Vec<u8>> =
Lazy::new(|| [RESPONSE_HEADER_A, RESPONSE_HEADER_B, RESPONSE_HEADER_C].concat());
@ -60,7 +63,11 @@ pub async fn run_socket_worker(
request_mesh_builder: MeshBuilder<ChannelRequest, Partial>,
priv_dropper: PrivilegeDropper,
server_start_instant: ServerStartInstant,
worker_index: usize,
) {
#[cfg(feature = "metrics")]
WORKER_INDEX.with(|index| index.set(worker_index));
let config = Rc::new(config);
let access_list = state.access_list;
@ -93,16 +100,49 @@ pub async fn run_socket_worker(
});
let task_handle = spawn_local(enclose!((config, access_list, request_senders, tls_config, connection_slab) async move {
if let Err(err) = Connection::run(
config,
access_list,
request_senders,
server_start_instant,
ConnectionId(key),
tls_config,
connection_slab.clone(),
stream
).await {
let result = match stream.peer_addr() {
Ok(peer_addr) => {
let peer_addr = CanonicalSocketAddr::new(peer_addr);
#[cfg(feature = "metrics")]
let ip_version_str = peer_addr_to_ip_version_str(&peer_addr);
#[cfg(feature = "metrics")]
::metrics::increment_gauge!(
"aquatic_active_connections",
1.0,
"ip_version" => ip_version_str,
"worker_index" => worker_index.to_string(),
);
let result = Connection::run(
config,
access_list,
request_senders,
server_start_instant,
ConnectionId(key),
tls_config,
connection_slab.clone(),
stream,
peer_addr
).await;
#[cfg(feature = "metrics")]
::metrics::decrement_gauge!(
"aquatic_active_connections",
1.0,
"ip_version" => ip_version_str,
"worker_index" => worker_index.to_string(),
);
result
}
Err(err) => {
Err(anyhow::anyhow!("Couldn't get peer addr: {:?}", err))
}
};
if let Err(err) = result {
::log::debug!("Connection::run() error: {:?}", err);
}
@ -171,12 +211,8 @@ impl Connection {
tls_config: Arc<RustlsConfig>,
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
stream: TcpStream,
peer_addr: CanonicalSocketAddr,
) -> anyhow::Result<()> {
let peer_addr = stream
.peer_addr()
.map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))?;
let peer_addr = CanonicalSocketAddr::new(peer_addr);
let tls_acceptor: TlsAcceptor = tls_config.into();
let stream = tls_acceptor.accept(stream).await?;
@ -288,6 +324,14 @@ impl Connection {
match request {
Request::Announce(request) => {
#[cfg(feature = "metrics")]
::metrics::increment_counter!(
"aquatic_requests_total",
"type" => "announce",
"ip_version" => peer_addr_to_ip_version_str(&self.peer_addr),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
let info_hash = request.info_hash;
if self
@ -327,6 +371,14 @@ impl Connection {
}
}
Request::Scrape(ScrapeRequest { info_hashes }) => {
#[cfg(feature = "metrics")]
::metrics::increment_counter!(
"aquatic_requests_total",
"type" => "scrape",
"ip_version" => peer_addr_to_ip_version_str(&self.peer_addr),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
let mut info_hashes_by_worker: BTreeMap<usize, Vec<InfoHash>> = BTreeMap::new();
for info_hash in info_hashes.into_iter() {
@ -454,6 +506,22 @@ impl Connection {
self.stream.write(&self.response_buffer[..position]).await?;
self.stream.flush().await?;
#[cfg(feature = "metrics")]
{
let response_type = match response {
Response::Announce(_) => "announce",
Response::Scrape(_) => "scrape",
Response::Failure(_) => "error",
};
::metrics::increment_counter!(
"aquatic_responses_total",
"type" => response_type,
"ip_version" => peer_addr_to_ip_version_str(&self.peer_addr),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
}
Ok(())
}
}
@ -496,3 +564,12 @@ fn create_tcp_listener(
Ok(unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) })
}
#[cfg(feature = "metrics")]
/// Returns the `ip_version` metrics-label value for a peer address:
/// "4" for IPv4, "6" for IPv6.
fn peer_addr_to_ip_version_str(addr: &CanonicalSocketAddr) -> &'static str {
    match addr.is_ipv4() {
        true => "4",
        false => "6",
    }
}

View file

@ -27,10 +27,26 @@ use aquatic_http_protocol::response::*;
use crate::common::*;
use crate::config::Config;
pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {}
#[cfg(feature = "metrics")]
thread_local! { static WORKER_INDEX: ::std::cell::Cell<usize> = Default::default() }
impl Ip for Ipv4Addr {}
impl Ip for Ipv6Addr {}
/// Marker trait for the two address types a peer may connect with.
///
/// `ip_version_str` supplies the value for the `ip_version` label attached
/// to exported metrics ("4" or "6"); it is only present when the "metrics"
/// feature is enabled.
pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {
#[cfg(feature = "metrics")]
fn ip_version_str() -> &'static str;
}
impl Ip for Ipv4Addr {
#[cfg(feature = "metrics")]
fn ip_version_str() -> &'static str {
"4"
}
}
impl Ip for Ipv6Addr {
#[cfg(feature = "metrics")]
fn ip_version_str() -> &'static str {
"6"
}
}
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum PeerStatus {
@ -59,8 +75,8 @@ impl PeerStatus {
pub struct Peer<I: Ip> {
pub ip_address: I,
pub port: u16,
pub status: PeerStatus,
pub valid_until: ValidUntil,
pub seeder: bool,
}
impl<I: Ip> Peer<I> {
@ -83,7 +99,6 @@ pub type PeerMap<I> = IndexMap<PeerMapKey<I>, Peer<I>>;
pub struct TorrentData<I: Ip> {
pub peers: PeerMap<I>,
pub num_seeders: usize,
pub num_leechers: usize,
}
impl<I: Ip> Default for TorrentData<I> {
@ -92,11 +107,16 @@ impl<I: Ip> Default for TorrentData<I> {
Self {
peers: Default::default(),
num_seeders: 0,
num_leechers: 0,
}
}
}
impl<I: Ip> TorrentData<I> {
// Leecher count is derived (total peers minus seeders) rather than stored,
// so only `num_seeders` needs to be kept in sync on insert/remove.
// Assumes every peer in the map is either a seeder or a leecher.
fn num_leechers(&self) -> usize {
self.peers.len() - self.num_seeders
}
}
pub type TorrentMap<I> = AmortizedIndexMap<InfoHash, TorrentData<I>>;
#[derive(Default)]
@ -126,6 +146,8 @@ impl TorrentMaps {
torrent_map: &mut TorrentMap<I>,
now: SecondsSinceServerStart,
) {
let mut total_num_peers = 0;
torrent_map.retain(|info_hash, torrent_data| {
if !access_list_cache
.load()
@ -135,29 +157,32 @@ impl TorrentMaps {
}
let num_seeders = &mut torrent_data.num_seeders;
let num_leechers = &mut torrent_data.num_leechers;
torrent_data.peers.retain(|_, peer| {
let keep = peer.valid_until.valid(now);
if !keep {
match peer.status {
PeerStatus::Seeding => {
*num_seeders -= 1;
}
PeerStatus::Leeching => {
*num_leechers -= 1;
}
_ => (),
};
if (!keep) & peer.seeder {
*num_seeders -= 1;
}
keep
});
total_num_peers += torrent_data.peers.len() as u64;
!torrent_data.peers.is_empty()
});
let total_num_peers = total_num_peers as f64;
#[cfg(feature = "metrics")]
::metrics::gauge!(
"aquatic_peers",
total_num_peers,
"ip_version" => I::ip_version_str(),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
torrent_map.shrink_to_fit();
}
}
@ -168,7 +193,11 @@ pub async fn run_swarm_worker(
state: State,
request_mesh_builder: MeshBuilder<ChannelRequest, Partial>,
server_start_instant: ServerStartInstant,
worker_index: usize,
) {
#[cfg(feature = "metrics")]
WORKER_INDEX.with(|index| index.set(worker_index));
let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap();
let torrents = Rc::new(RefCell::new(TorrentMaps::default()));
@ -198,6 +227,29 @@ pub async fn run_swarm_worker(
})()
}));
// Periodically update torrent count metrics
#[cfg(feature = "metrics")]
TimerActionRepeat::repeat(enclose!((config, torrents) move || {
enclose!((config, torrents, worker_index) move || async move {
let torrents = torrents.borrow_mut();
::metrics::gauge!(
"aquatic_torrents",
torrents.ipv4.len() as f64,
"ip_version" => "4",
"worker_index" => worker_index.to_string(),
);
::metrics::gauge!(
"aquatic_torrents",
torrents.ipv6.len() as f64,
"ip_version" => "6",
"worker_index" => worker_index.to_string(),
);
Some(Duration::from_secs(config.metrics.torrent_count_update_interval))
})()
}));
let mut handles = Vec::new();
for (_, receiver) in request_receivers.streams() {
@ -337,13 +389,6 @@ pub fn upsert_peer_and_get_response_peers<I: Ip>(
let peer_status =
PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left));
let peer = Peer {
ip_address: peer_ip_address,
port: request.port,
status: peer_status,
valid_until,
};
let ip_or_key = request
.key
.map(Either::Right)
@ -356,24 +401,51 @@ pub fn upsert_peer_and_get_response_peers<I: Ip>(
let opt_removed_peer = match peer_status {
PeerStatus::Leeching => {
torrent_data.num_leechers += 1;
let peer = Peer {
ip_address: peer_ip_address,
port: request.port,
valid_until,
seeder: false,
};
torrent_data.peers.insert(peer_map_key.clone(), peer)
}
PeerStatus::Seeding => {
torrent_data.num_seeders += 1;
let peer = Peer {
ip_address: peer_ip_address,
port: request.port,
valid_until,
seeder: true,
};
torrent_data.peers.insert(peer_map_key.clone(), peer)
}
PeerStatus::Stopped => torrent_data.peers.remove(&peer_map_key),
};
match opt_removed_peer.map(|peer| peer.status) {
Some(PeerStatus::Leeching) => {
torrent_data.num_leechers -= 1;
if let Some(&Peer { seeder: true, .. }) = opt_removed_peer.as_ref() {
torrent_data.num_seeders -= 1;
}
#[cfg(feature = "metrics")]
match peer_status {
PeerStatus::Stopped if opt_removed_peer.is_some() => {
::metrics::decrement_gauge!(
"aquatic_peers",
1.0,
"ip_version" => I::ip_version_str(),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
}
Some(PeerStatus::Seeding) => {
torrent_data.num_seeders -= 1;
PeerStatus::Leeching | PeerStatus::Seeding if opt_removed_peer.is_none() => {
::metrics::increment_gauge!(
"aquatic_peers",
1.0,
"ip_version" => I::ip_version_str(),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
}
_ => {}
}
@ -397,7 +469,7 @@ pub fn upsert_peer_and_get_response_peers<I: Ip>(
(
torrent_data.num_seeders,
torrent_data.num_leechers,
torrent_data.num_leechers(),
response_peers,
)
}
@ -427,7 +499,7 @@ pub fn handle_scrape_request(
let stats = ScrapeStatistics {
complete: torrent_data.num_seeders,
downloaded: 0, // No implementation planned
incomplete: torrent_data.num_leechers,
incomplete: torrent_data.num_leechers(),
};
response.files.insert(info_hash, stats);
@ -439,7 +511,7 @@ pub fn handle_scrape_request(
let stats = ScrapeStatistics {
complete: torrent_data.num_seeders,
downloaded: 0, // No implementation planned
incomplete: torrent_data.num_leechers,
incomplete: torrent_data.num_leechers(),
};
response.files.insert(info_hash, stats);

View file

@ -102,22 +102,23 @@ fn monitor_statistics(state: LoadTestState, config: &Config) {
let statistics = state.statistics.as_ref();
let responses_announce =
statistics.responses_announce.fetch_and(0, Ordering::SeqCst) as f64;
let responses_announce = statistics
.responses_announce
.fetch_and(0, Ordering::Relaxed) as f64;
// let response_peers = statistics.response_peers
// .fetch_and(0, Ordering::SeqCst) as f64;
let requests_per_second =
statistics.requests.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
statistics.requests.fetch_and(0, Ordering::Relaxed) as f64 / interval_f64;
let responses_scrape_per_second =
statistics.responses_scrape.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
statistics.responses_scrape.fetch_and(0, Ordering::Relaxed) as f64 / interval_f64;
let responses_failure_per_second =
statistics.responses_failure.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
statistics.responses_failure.fetch_and(0, Ordering::Relaxed) as f64 / interval_f64;
let bytes_sent_per_second =
statistics.bytes_sent.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
statistics.bytes_sent.fetch_and(0, Ordering::Relaxed) as f64 / interval_f64;
let bytes_received_per_second =
statistics.bytes_received.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
statistics.bytes_received.fetch_and(0, Ordering::Relaxed) as f64 / interval_f64;
let responses_announce_per_second = responses_announce / interval_f64;

View file

@ -230,6 +230,11 @@ impl Connection {
}
}
self.load_test_state
.statistics
.bytes_received
.fetch_add(interesting_bytes.len(), Ordering::Relaxed);
break;
}
Err(err) => {

View file

@ -109,6 +109,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
response_mesh_builder,
priv_dropper,
server_start_instant,
i,
)
.await
})
@ -145,6 +146,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
request_mesh_builder,
response_mesh_builder,
server_start_instant,
i,
)
.await
})

View file

@ -34,6 +34,9 @@ use crate::common::*;
const LOCAL_CHANNEL_SIZE: usize = 16;
#[cfg(feature = "metrics")]
thread_local! { static WORKER_INDEX: ::std::cell::Cell<usize> = Default::default() }
struct PendingScrapeResponse {
pending_worker_out_messages: usize,
stats: HashMap<InfoHash, ScrapeStatistics>,
@ -60,7 +63,11 @@ pub async fn run_socket_worker(
out_message_mesh_builder: MeshBuilder<(OutMessageMeta, OutMessage), Partial>,
priv_dropper: PrivilegeDropper,
server_start_instant: ServerStartInstant,
worker_index: usize,
) {
#[cfg(feature = "metrics")]
WORKER_INDEX.with(|index| index.set(worker_index));
let config = Rc::new(config);
let access_list = state.access_list;
@ -156,7 +163,8 @@ pub async fn run_socket_worker(
::metrics::increment_gauge!(
"aquatic_active_connections",
1.0,
"ip_version" => ip_version_to_metrics_str(ip_version)
"ip_version" => ip_version_to_metrics_str(ip_version),
"worker_index" => worker_index.to_string(),
);
if let Err(err) = run_connection(
@ -184,7 +192,8 @@ pub async fn run_socket_worker(
::metrics::decrement_gauge!(
"aquatic_active_connections",
1.0,
"ip_version" => ip_version_to_metrics_str(ip_version)
"ip_version" => ip_version_to_metrics_str(ip_version),
"worker_index" => worker_index.to_string(),
);
// Remove reference in separate statement to avoid
@ -520,7 +529,8 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
::metrics::increment_counter!(
"aquatic_requests_total",
"type" => "announce",
"ip_version" => ip_version_to_metrics_str(self.ip_version)
"ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
let info_hash = announce_request.info_hash;
@ -597,7 +607,8 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
::metrics::increment_counter!(
"aquatic_requests_total",
"type" => "scrape",
"ip_version" => ip_version_to_metrics_str(self.ip_version)
"ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
let info_hashes = if let Some(info_hashes) = info_hashes {
@ -681,9 +692,10 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
#[cfg(feature = "metrics")]
::metrics::increment_counter!(
"aquatic_requests_total",
"aquatic_responses_total",
"type" => "error",
"ip_version" => ip_version_to_metrics_str(self.ip_version)
"ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
result
@ -784,6 +796,7 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionWriter<S> {
"aquatic_responses_total",
"type" => out_message_type,
"ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
}
@ -805,7 +818,7 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionWriter<S> {
}
Ok(Err(err)) => Err(err.into()),
Err(err) => {
::log::debug!("send_out_message: sending to peer took to long: {}", err);
::log::debug!("send_out_message: sending to peer took too long: {}", err);
Ok(())
}

View file

@ -22,6 +22,9 @@ use crate::common::*;
use crate::config::Config;
use crate::SHARED_IN_CHANNEL_SIZE;
#[cfg(feature = "metrics")]
thread_local! { static WORKER_INDEX: ::std::cell::Cell<usize> = Default::default() }
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
enum PeerStatus {
Seeding,
@ -145,7 +148,12 @@ impl TorrentMaps {
let total_num_peers = total_num_peers as f64;
#[cfg(feature = "metrics")]
::metrics::gauge!("aquatic_peers", total_num_peers, "ip_version" => ip_version);
::metrics::gauge!(
"aquatic_peers",
total_num_peers,
"ip_version" => ip_version,
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
}
}
@ -157,7 +165,11 @@ pub async fn run_swarm_worker(
in_message_mesh_builder: MeshBuilder<(InMessageMeta, InMessage), Partial>,
out_message_mesh_builder: MeshBuilder<(OutMessageMeta, OutMessage), Partial>,
server_start_instant: ServerStartInstant,
worker_index: usize,
) {
#[cfg(feature = "metrics")]
WORKER_INDEX.with(|index| index.set(worker_index));
let (_, mut control_message_receivers) = control_message_mesh_builder
.join(Role::Consumer)
.await
@ -189,12 +201,14 @@ pub async fn run_swarm_worker(
::metrics::gauge!(
"aquatic_torrents",
torrents.ipv4.len() as f64,
"ip_version" => "4"
"ip_version" => "4",
"worker_index" => worker_index.to_string(),
);
::metrics::gauge!(
"aquatic_torrents",
torrents.ipv6.len() as f64,
"ip_version" => "6"
"ip_version" => "6",
"worker_index" => worker_index.to_string(),
);
Some(Duration::from_secs(config.metrics.torrent_count_update_interval))
@ -385,13 +399,29 @@ fn handle_announce_request(
PeerStatus::Stopped => torrent_data.peers.remove(&request.peer_id),
};
if let Some(removed_peer) = opt_removed_peer {
if removed_peer.seeder {
torrent_data.num_seeders -= 1;
if let Some(&Peer { seeder: true, .. }) = opt_removed_peer.as_ref() {
torrent_data.num_seeders -= 1;
}
#[cfg(feature = "metrics")]
match peer_status {
PeerStatus::Stopped if opt_removed_peer.is_some() => {
::metrics::decrement_gauge!(
"aquatic_peers",
1.0,
"ip_version" => ip_version,
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
}
} else {
#[cfg(feature = "metrics")]
::metrics::increment_gauge!("aquatic_peers", 1.0, "ip_version" => ip_version);
PeerStatus::Leeching | PeerStatus::Seeding if opt_removed_peer.is_none() => {
::metrics::increment_gauge!(
"aquatic_peers",
1.0,
"ip_version" => ip_version,
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
}
_ => {}
}
}

View file

@ -2,4 +2,4 @@
. ./scripts/env-native-cpu-without-avx-512
cargo run --profile "release-debug" -p aquatic_http -- $@
cargo run --profile "release-debug" -p aquatic_http --features "prometheus" -- $@