ws: add prometheus support (active connections, requests, responses)

This commit is contained in:
Joakim Frostegård 2023-01-17 20:27:43 +01:00
parent bc4eea1a05
commit 3ac12b947f
6 changed files with 201 additions and 6 deletions

View file

@ -28,6 +28,8 @@ pub struct Config {
pub cleaning: CleaningConfig,
pub privileges: PrivilegeConfig,
pub access_list: AccessListConfig,
#[cfg(feature = "metrics")]
pub metrics: MetricsConfig,
pub cpu_pinning: CpuPinningConfigAsc,
}
@ -42,6 +44,8 @@ impl Default for Config {
cleaning: CleaningConfig::default(),
privileges: PrivilegeConfig::default(),
access_list: AccessListConfig::default(),
#[cfg(feature = "metrics")]
metrics: Default::default(),
cpu_pinning: Default::default(),
}
}
@ -142,6 +146,33 @@ impl Default for CleaningConfig {
}
}
#[cfg(feature = "metrics")]
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct MetricsConfig {
/// Run a prometheus endpoint
pub run_prometheus_endpoint: bool,
/// Address to run prometheus endpoint on
pub prometheus_endpoint_address: SocketAddr,
}
#[cfg(feature = "metrics")]
impl Default for MetricsConfig {
fn default() -> Self {
Self {
run_prometheus_endpoint: false,
prometheus_endpoint_address: SocketAddr::from(([0, 0, 0, 0], 9000)),
}
}
}
#[cfg(feature = "metrics")]
impl MetricsConfig {
pub fn active(&self) -> bool {
self.run_prometheus_endpoint
}
}
#[cfg(test)]
mod tests {
use super::Config;

View file

@ -35,6 +35,21 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let mut signals = Signals::new([SIGUSR1, SIGTERM])?;
#[cfg(feature = "prometheus")]
if config.metrics.run_prometheus_endpoint {
use metrics_exporter_prometheus::PrometheusBuilder;
PrometheusBuilder::new()
.with_http_listener(config.metrics.prometheus_endpoint_address)
.install()
.with_context(|| {
format!(
"Install prometheus endpoint on {}",
config.metrics.prometheus_endpoint_address
)
})?;
}
let state = State::default();
update_access_list(&config.access_list, &state.access_list)?;

View file

@ -152,6 +152,9 @@ pub async fn run_socket_worker(
::log::trace!("accepting stream, assigning id {}", key);
let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, opt_tls_config) async move {
#[cfg(feature = "metrics")]
::metrics::increment_gauge!("aquatic_active_connections", 1.0);
if let Err(err) = run_connection(
config.clone(),
access_list,
@ -173,6 +176,9 @@ pub async fn run_socket_worker(
// Clean up after closed connection
#[cfg(feature = "metrics")]
::metrics::decrement_gauge!("aquatic_active_connections", 1.0);
// Remove reference in separate statement to avoid
// multiple RefCell borrows
let opt_reference = connection_slab.borrow_mut().try_remove(key);
@ -501,6 +507,9 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
async fn handle_in_message(&mut self, in_message: InMessage) -> anyhow::Result<()> {
match in_message {
InMessage::AnnounceRequest(announce_request) => {
#[cfg(feature = "metrics")]
::metrics::increment_counter!("aquatic_requests_total", "type" => "announce");
let info_hash = announce_request.info_hash;
if self
@ -571,6 +580,9 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
}
}
InMessage::ScrapeRequest(ScrapeRequest { info_hashes, .. }) => {
#[cfg(feature = "metrics")]
::metrics::increment_counter!("aquatic_requests_total", "type" => "scrape");
let info_hashes = if let Some(info_hashes) = info_hashes {
info_hashes
} else {
@ -642,10 +654,18 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
info_hash,
});
self.out_message_sender
let result = self
.out_message_sender
.send((self.make_connection_meta(None).into(), out_message))
.await
.map_err(|err| anyhow::anyhow!("ConnectionReader::send_error_response failed: {}", err))
.map_err(|err| {
anyhow::anyhow!("ConnectionReader::send_error_response failed: {}", err)
});
#[cfg(feature = "metrics")]
::metrics::increment_counter!("aquatic_responses_total", "type" => "error");
result
}
fn make_connection_meta(&self, pending_scrape_id: Option<PendingScrapeId>) -> InMessageMeta {
@ -728,6 +748,18 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionWriter<S> {
match result {
Ok(Ok(())) => {
#[cfg(feature = "metrics")]
let out_message_type = match &out_message {
OutMessage::Offer(_) => "offer",
OutMessage::Answer(_) => "offer_answer",
OutMessage::AnnounceResponse(_) => "announce",
OutMessage::ScrapeResponse(_) => "scrape",
OutMessage::ErrorResponse(_) => "error",
};
#[cfg(feature = "metrics")]
::metrics::increment_counter!("aquatic_responses_total", "type" => out_message_type);
self.connection_slab
.borrow_mut()
.get_mut(self.connection_id.0)