ws: add prometheus peer client metrics

This commit is contained in:
Joakim Frostegård 2023-06-14 11:09:23 +02:00
parent 32aa34366c
commit 6675126d08
6 changed files with 95 additions and 1 deletion

View file

@@ -14,6 +14,12 @@
* Add support for reporting peer client information * Add support for reporting peer client information
### aquatic_ws
#### Added
* Add support for reporting peer client information
## 0.8.0 - 2023-03-17 ## 0.8.0 - 2023-03-17
### General ### General

2
Cargo.lock generated
View file

@@ -310,6 +310,7 @@ version = "0.8.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"aquatic_common", "aquatic_common",
"aquatic_peer_id",
"aquatic_toml_config", "aquatic_toml_config",
"aquatic_ws_protocol", "aquatic_ws_protocol",
"async-tungstenite", "async-tungstenite",
@@ -323,6 +324,7 @@ dependencies = [
"log", "log",
"metrics", "metrics",
"metrics-exporter-prometheus", "metrics-exporter-prometheus",
"metrics-util",
"mimalloc", "mimalloc",
"privdrop", "privdrop",
"quickcheck", "quickcheck",

View file

@@ -19,10 +19,11 @@ name = "aquatic_ws"
[features] [features]
default = ["prometheus"] default = ["prometheus"]
prometheus = ["metrics", "metrics-exporter-prometheus"] prometheus = ["metrics", "metrics-exporter-prometheus"]
metrics = ["dep:metrics"] metrics = ["dep:metrics", "metrics-util"]
[dependencies] [dependencies]
aquatic_common = { workspace = true, features = ["rustls", "glommio"] } aquatic_common = { workspace = true, features = ["rustls", "glommio"] }
aquatic_peer_id.workspace = true
aquatic_toml_config.workspace = true aquatic_toml_config.workspace = true
aquatic_ws_protocol.workspace = true aquatic_ws_protocol.workspace = true
@@ -37,6 +38,7 @@ hashbrown = { version = "0.13", features = ["serde"] }
httparse = "1" httparse = "1"
log = "0.4" log = "0.4"
metrics = { version = "0.21", optional = true } metrics = { version = "0.21", optional = true }
metrics-util = { version = "0.15", optional = true }
metrics-exporter-prometheus = { version = "0.12", optional = true, default-features = false, features = ["http-listener"] } metrics-exporter-prometheus = { version = "0.12", optional = true, default-features = false, features = ["http-listener"] }
mimalloc = { version = "0.1", default-features = false } mimalloc = { version = "0.1", default-features = false }
privdrop = "0.5" privdrop = "0.5"

View file

@@ -156,6 +156,10 @@ pub struct MetricsConfig {
pub prometheus_endpoint_address: SocketAddr, pub prometheus_endpoint_address: SocketAddr,
/// Update metrics for torrent count this often (seconds) /// Update metrics for torrent count this often (seconds)
pub torrent_count_update_interval: u64, pub torrent_count_update_interval: u64,
/// Collect data on peer clients.
///
/// Expect a certain CPU hit
pub peer_clients: bool,
} }
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
@@ -165,6 +169,7 @@ impl Default for MetricsConfig {
run_prometheus_endpoint: false, run_prometheus_endpoint: false,
prometheus_endpoint_address: SocketAddr::from(([0, 0, 0, 0], 9000)), prometheus_endpoint_address: SocketAddr::from(([0, 0, 0, 0], 9000)),
torrent_count_update_interval: 10, torrent_count_update_interval: 10,
peer_clients: false,
} }
} }
} }

View file

@@ -3,6 +3,7 @@ pub mod config;
pub mod workers; pub mod workers;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use anyhow::Context; use anyhow::Context;
use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker}; use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker};
@@ -39,7 +40,18 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
if config.metrics.run_prometheus_endpoint { if config.metrics.run_prometheus_endpoint {
use metrics_exporter_prometheus::PrometheusBuilder; use metrics_exporter_prometheus::PrometheusBuilder;
let idle_timeout = config
.cleaning
.connection_cleaning_interval
.max(config.cleaning.torrent_cleaning_interval)
.max(config.metrics.torrent_count_update_interval)
* 2;
PrometheusBuilder::new() PrometheusBuilder::new()
.idle_timeout(
metrics_util::MetricKindMask::GAUGE,
Some(Duration::from_secs(idle_timeout)),
)
.with_http_listener(config.metrics.prometheus_endpoint_address) .with_http_listener(config.metrics.prometheus_endpoint_address)
.install() .install()
.with_context(|| { .with_context(|| {

View file

@@ -11,6 +11,7 @@ use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, A
use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::privileges::PrivilegeDropper;
use aquatic_common::rustls_config::RustlsConfig; use aquatic_common::rustls_config::RustlsConfig;
use aquatic_common::{PanicSentinel, ServerStartInstant}; use aquatic_common::{PanicSentinel, ServerStartInstant};
use aquatic_peer_id::PeerClient;
use aquatic_ws_protocol::*; use aquatic_ws_protocol::*;
use async_tungstenite::WebSocketStream; use async_tungstenite::WebSocketStream;
use futures::stream::{SplitSink, SplitStream}; use futures::stream::{SplitSink, SplitStream};
@@ -51,6 +52,7 @@ struct ConnectionReference {
valid_until: ValidUntil, valid_until: ValidUntil,
announced_info_hashes: HashMap<InfoHash, PeerId>, announced_info_hashes: HashMap<InfoHash, PeerId>,
ip_version: IpVersion, ip_version: IpVersion,
opt_peer_client: Option<PeerClient>,
} }
pub async fn run_socket_worker( pub async fn run_socket_worker(
@@ -154,6 +156,7 @@ pub async fn run_socket_worker(
), ),
announced_info_hashes: Default::default(), announced_info_hashes: Default::default(),
ip_version, ip_version,
opt_peer_client: None,
}); });
::log::trace!("accepting stream, assigning id {}", key); ::log::trace!("accepting stream, assigning id {}", key);
@@ -221,6 +224,15 @@
.await .await
.unwrap(); .unwrap();
} }
#[cfg(feature = "prometheus")]
if let Some(peer_client) = reference.opt_peer_client {
::metrics::decrement_gauge!(
"aquatic_peer_clients",
1.0,
"client" => peer_client.to_string(),
);
}
} }
}), tq_regular) }), tq_regular)
.unwrap() .unwrap()
@@ -246,6 +258,18 @@
connection_slab.borrow_mut().retain(|_, reference| { connection_slab.borrow_mut().retain(|_, reference| {
if reference.valid_until.valid(now) { if reference.valid_until.valid(now) {
#[cfg(feature = "prometheus")]
if let Some(peer_client) = &reference.opt_peer_client {
// As long as connection is still alive, increment peer client
// gauge by zero to prevent it from being removed due to
// idleness
::metrics::increment_gauge!(
"aquatic_peer_clients",
0.0,
"client" => peer_client.to_string(),
);
}
true true
} else { } else {
if let Some(ref handle) = reference.task_handle { if let Some(ref handle) = reference.task_handle {
@@ -258,6 +282,31 @@
connection_slab.borrow_mut().shrink_to_fit(); connection_slab.borrow_mut().shrink_to_fit();
#[cfg(feature = "metrics")]
{
// Increment gauges by zero to prevent them from being removed due to
// idleness
let worker_index = WORKER_INDEX.with(|index| index.get()).to_string();
if config.network.address.is_ipv4() || !config.network.only_ipv6 {
::metrics::increment_gauge!(
"aquatic_active_connections",
0.0,
"ip_version" => "4",
"worker_index" => worker_index.clone(),
);
}
if config.network.address.is_ipv6() {
::metrics::increment_gauge!(
"aquatic_active_connections",
0.0,
"ip_version" => "6",
"worker_index" => worker_index,
);
}
}
Some(Duration::from_secs( Some(Duration::from_secs(
config.cleaning.connection_cleaning_interval, config.cleaning.connection_cleaning_interval,
)) ))
@@ -575,6 +624,24 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
} }
} }
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
#[cfg(feature = "prometheus")]
if self.config.metrics.run_prometheus_endpoint
&& self.config.metrics.peer_clients
&& connection_reference.opt_peer_client.is_none()
{
let client =
aquatic_peer_id::PeerId(announce_request.peer_id.0)
.client();
::metrics::increment_gauge!(
"aquatic_peer_clients",
1.0,
"client" => client.to_string(),
);
connection_reference.opt_peer_client = Some(client);
};
entry.insert(announce_request.peer_id); entry.insert(announce_request.peer_id);
} }
} }