http: upgrade metrics crate to 0.22

This commit is contained in:
Joakim Frostegård 2024-01-25 00:03:05 +01:00
parent fe6a7ef8b5
commit c7f7f010ca
6 changed files with 90 additions and 161 deletions

116
Cargo.lock generated
View file

@ -194,8 +194,8 @@ dependencies = [
"libc", "libc",
"log", "log",
"memchr", "memchr",
"metrics 0.21.1", "metrics",
"metrics-exporter-prometheus 0.12.2", "metrics-exporter-prometheus",
"mimalloc", "mimalloc",
"once_cell", "once_cell",
"privdrop", "privdrop",
@ -306,9 +306,9 @@ dependencies = [
"io-uring", "io-uring",
"libc", "libc",
"log", "log",
"metrics 0.22.0", "metrics",
"metrics-exporter-prometheus 0.13.0", "metrics-exporter-prometheus",
"metrics-util 0.16.0", "metrics-util",
"mimalloc", "mimalloc",
"mio", "mio",
"num-format", "num-format",
@ -376,9 +376,9 @@ dependencies = [
"httparse", "httparse",
"indexmap 2.1.0", "indexmap 2.1.0",
"log", "log",
"metrics 0.22.0", "metrics",
"metrics-exporter-prometheus 0.13.0", "metrics-exporter-prometheus",
"metrics-util 0.16.0", "metrics-util",
"mimalloc", "mimalloc",
"privdrop", "privdrop",
"quickcheck", "quickcheck",
@ -1696,15 +1696,6 @@ version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "mach2"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19b955cdeb2a02b9117f121ce63aa52d08ade45de53e48fe6a38b39c10f6f709"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.7.1" version = "2.7.1"
@ -1738,17 +1729,6 @@ dependencies = [
"autocfg", "autocfg",
] ]
[[package]]
name = "metrics"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fde3af1a009ed76a778cb84fdef9e7dbbdf5775ae3e4cc1f434a6a307f6f76c5"
dependencies = [
"ahash 0.8.7",
"metrics-macros",
"portable-atomic",
]
[[package]] [[package]]
name = "metrics" name = "metrics"
version = "0.22.0" version = "0.22.0"
@ -1759,23 +1739,6 @@ dependencies = [
"portable-atomic", "portable-atomic",
] ]
[[package]]
name = "metrics-exporter-prometheus"
version = "0.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d4fa7ce7c4862db464a37b0b31d89bca874562f034bd7993895572783d02950"
dependencies = [
"base64",
"hyper",
"indexmap 1.9.3",
"ipnet",
"metrics 0.21.1",
"metrics-util 0.15.1",
"quanta 0.11.1",
"thiserror",
"tokio",
]
[[package]] [[package]]
name = "metrics-exporter-prometheus" name = "metrics-exporter-prometheus"
version = "0.13.0" version = "0.13.0"
@ -1786,39 +1749,13 @@ dependencies = [
"hyper", "hyper",
"indexmap 1.9.3", "indexmap 1.9.3",
"ipnet", "ipnet",
"metrics 0.22.0", "metrics",
"metrics-util 0.16.0", "metrics-util",
"quanta 0.12.2", "quanta",
"thiserror", "thiserror",
"tokio", "tokio",
] ]
[[package]]
name = "metrics-macros"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38b4faf00617defe497754acde3024865bc143d44a86799b24e191ecff91354f"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
]
[[package]]
name = "metrics-util"
version = "0.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4de2ed6e491ed114b40b732e4d1659a9d53992ebd87490c44a6ffe23739d973e"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
"hashbrown 0.13.1",
"metrics 0.21.1",
"num_cpus",
"quanta 0.11.1",
"sketches-ddsketch 0.2.1",
]
[[package]] [[package]]
name = "metrics-util" name = "metrics-util"
version = "0.16.0" version = "0.16.0"
@ -1830,10 +1767,10 @@ dependencies = [
"crossbeam-utils", "crossbeam-utils",
"hashbrown 0.13.1", "hashbrown 0.13.1",
"indexmap 1.9.3", "indexmap 1.9.3",
"metrics 0.22.0", "metrics",
"num_cpus", "num_cpus",
"ordered-float", "ordered-float",
"quanta 0.12.2", "quanta",
"radix_trie", "radix_trie",
"sketches-ddsketch 0.2.1", "sketches-ddsketch 0.2.1",
] ]
@ -2199,22 +2136,6 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "quanta"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab"
dependencies = [
"crossbeam-utils",
"libc",
"mach2",
"once_cell",
"raw-cpuid 10.7.0",
"wasi",
"web-sys",
"winapi 0.3.9",
]
[[package]] [[package]]
name = "quanta" name = "quanta"
version = "0.12.2" version = "0.12.2"
@ -2224,7 +2145,7 @@ dependencies = [
"crossbeam-utils", "crossbeam-utils",
"libc", "libc",
"once_cell", "once_cell",
"raw-cpuid 11.0.1", "raw-cpuid",
"wasi", "wasi",
"web-sys", "web-sys",
"winapi 0.3.9", "winapi 0.3.9",
@ -2311,15 +2232,6 @@ dependencies = [
"rand", "rand",
] ]
[[package]]
name = "raw-cpuid"
version = "10.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332"
dependencies = [
"bitflags 1.3.2",
]
[[package]] [[package]]
name = "raw-cpuid" name = "raw-cpuid"
version = "11.0.1" version = "11.0.1"

View file

@ -39,8 +39,8 @@ httparse = "1"
itoa = "1" itoa = "1"
libc = "0.2" libc = "0.2"
log = "0.4" log = "0.4"
metrics = { version = "0.21", optional = true } metrics = { version = "0.22", optional = true }
metrics-exporter-prometheus = { version = "0.12", optional = true, default-features = false, features = ["http-listener"] } metrics-exporter-prometheus = { version = "0.13", optional = true, default-features = false, features = ["http-listener"] }
mimalloc = { version = "0.1", default-features = false } mimalloc = { version = "0.1", default-features = false }
memchr = "2" memchr = "2"
privdrop = "0.5" privdrop = "0.5"

View file

@ -26,9 +26,9 @@ use once_cell::sync::Lazy;
use crate::common::*; use crate::common::*;
use crate::config::Config; use crate::config::Config;
use super::request::{parse_request, RequestParseError};
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
use super::{peer_addr_to_ip_version_str, WORKER_INDEX}; use super::peer_addr_to_ip_version_str;
use super::request::{parse_request, RequestParseError};
const REQUEST_BUFFER_SIZE: usize = 2048; const REQUEST_BUFFER_SIZE: usize = 2048;
const RESPONSE_BUFFER_SIZE: usize = 4096; const RESPONSE_BUFFER_SIZE: usize = 4096;
@ -67,6 +67,7 @@ pub enum ConnectionError {
Other(#[from] anyhow::Error), Other(#[from] anyhow::Error),
} }
#[allow(clippy::too_many_arguments)]
pub(super) async fn run_connection( pub(super) async fn run_connection(
config: Rc<Config>, config: Rc<Config>,
access_list: Arc<AccessListArcSwap>, access_list: Arc<AccessListArcSwap>,
@ -75,6 +76,7 @@ pub(super) async fn run_connection(
opt_tls_config: Option<Arc<ArcSwap<RustlsConfig>>>, opt_tls_config: Option<Arc<ArcSwap<RustlsConfig>>>,
valid_until: Rc<RefCell<ValidUntil>>, valid_until: Rc<RefCell<ValidUntil>>,
stream: TcpStream, stream: TcpStream,
worker_index: usize,
) -> Result<(), ConnectionError> { ) -> Result<(), ConnectionError> {
let access_list_cache = create_access_list_cache(&access_list); let access_list_cache = create_access_list_cache(&access_list);
let request_buffer = Box::new([0u8; REQUEST_BUFFER_SIZE]); let request_buffer = Box::new([0u8; REQUEST_BUFFER_SIZE]);
@ -114,6 +116,7 @@ pub(super) async fn run_connection(
request_buffer_position: 0, request_buffer_position: 0,
response_buffer, response_buffer,
stream, stream,
worker_index_string: worker_index.to_string(),
}; };
conn.run().await conn.run().await
@ -130,6 +133,7 @@ pub(super) async fn run_connection(
request_buffer_position: 0, request_buffer_position: 0,
response_buffer, response_buffer,
stream, stream,
worker_index_string: worker_index.to_string(),
}; };
conn.run().await conn.run().await
@ -148,6 +152,7 @@ struct Connection<S> {
request_buffer_position: usize, request_buffer_position: usize,
response_buffer: Box<[u8; RESPONSE_BUFFER_SIZE]>, response_buffer: Box<[u8; RESPONSE_BUFFER_SIZE]>,
stream: S, stream: S,
worker_index_string: String,
} }
impl<S> Connection<S> impl<S> Connection<S>
@ -244,12 +249,13 @@ where
match request { match request {
Request::Announce(request) => { Request::Announce(request) => {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
::metrics::increment_counter!( ::metrics::counter!(
"aquatic_requests_total", "aquatic_requests_total",
"type" => "announce", "type" => "announce",
"ip_version" => peer_addr_to_ip_version_str(&peer_addr), "ip_version" => peer_addr_to_ip_version_str(&peer_addr),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), "worker_index" => self.worker_index_string.clone(),
); )
.increment(1);
let info_hash = request.info_hash; let info_hash = request.info_hash;
@ -291,12 +297,13 @@ where
} }
Request::Scrape(ScrapeRequest { info_hashes }) => { Request::Scrape(ScrapeRequest { info_hashes }) => {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
::metrics::increment_counter!( ::metrics::counter!(
"aquatic_requests_total", "aquatic_requests_total",
"type" => "scrape", "type" => "scrape",
"ip_version" => peer_addr_to_ip_version_str(&peer_addr), "ip_version" => peer_addr_to_ip_version_str(&peer_addr),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), "worker_index" => self.worker_index_string.clone(),
); )
.increment(1);
let mut info_hashes_by_worker: BTreeMap<usize, Vec<InfoHash>> = BTreeMap::new(); let mut info_hashes_by_worker: BTreeMap<usize, Vec<InfoHash>> = BTreeMap::new();
@ -438,12 +445,13 @@ where
.opt_peer_addr .opt_peer_addr
.expect("peer addr should already have been extracted by now"); .expect("peer addr should already have been extracted by now");
::metrics::increment_counter!( ::metrics::counter!(
"aquatic_responses_total", "aquatic_responses_total",
"type" => response_type, "type" => response_type,
"ip_version" => peer_addr_to_ip_version_str(&peer_addr), "ip_version" => peer_addr_to_ip_version_str(&peer_addr),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), "worker_index" => self.worker_index_string.clone(),
); )
.increment(1);
} }
Ok(()) Ok(())

View file

@ -25,9 +25,6 @@ use crate::common::*;
use crate::config::Config; use crate::config::Config;
use crate::workers::socket::connection::{run_connection, ConnectionError}; use crate::workers::socket::connection::{run_connection, ConnectionError};
#[cfg(feature = "metrics")]
thread_local! { static WORKER_INDEX: ::std::cell::Cell<usize> = Default::default() }
struct ConnectionHandle { struct ConnectionHandle {
close_conn_sender: LocalSender<()>, close_conn_sender: LocalSender<()>,
valid_until: Rc<RefCell<ValidUntil>>, valid_until: Rc<RefCell<ValidUntil>>,
@ -44,9 +41,6 @@ pub async fn run_socket_worker(
server_start_instant: ServerStartInstant, server_start_instant: ServerStartInstant,
worker_index: usize, worker_index: usize,
) { ) {
#[cfg(feature = "metrics")]
WORKER_INDEX.with(|index| index.set(worker_index));
let config = Rc::new(config); let config = Rc::new(config);
let access_list = state.access_list; let access_list = state.access_list;
@ -93,12 +87,14 @@ pub async fn run_socket_worker(
) )
async move { async move {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
::metrics::increment_gauge!( let active_connections_gauge = ::metrics::gauge!(
"aquatic_active_connections", "aquatic_active_connections",
1.0,
"worker_index" => worker_index.to_string(), "worker_index" => worker_index.to_string(),
); );
#[cfg(feature = "metrics")]
active_connections_gauge.increment(1.0);
let f1 = async { run_connection( let f1 = async { run_connection(
config, config,
access_list, access_list,
@ -107,6 +103,7 @@ pub async fn run_socket_worker(
opt_tls_config, opt_tls_config,
valid_until.clone(), valid_until.clone(),
stream, stream,
worker_index,
).await ).await
}; };
let f2 = async { let f2 = async {
@ -118,11 +115,7 @@ pub async fn run_socket_worker(
let result = race(f1, f2).await; let result = race(f1, f2).await;
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
::metrics::decrement_gauge!( active_connections_gauge.decrement(1.0);
"aquatic_active_connections",
1.0,
"worker_index" => worker_index.to_string(),
);
match result { match result {
Ok(()) => (), Ok(()) => (),

View file

@ -18,9 +18,6 @@ use crate::config::Config;
use self::storage::TorrentMaps; use self::storage::TorrentMaps;
#[cfg(feature = "metrics")]
thread_local! { static WORKER_INDEX: ::std::cell::Cell<usize> = Default::default() }
pub async fn run_swarm_worker( pub async fn run_swarm_worker(
_sentinel: PanicSentinel, _sentinel: PanicSentinel,
config: Config, config: Config,
@ -29,12 +26,9 @@ pub async fn run_swarm_worker(
server_start_instant: ServerStartInstant, server_start_instant: ServerStartInstant,
worker_index: usize, worker_index: usize,
) { ) {
#[cfg(feature = "metrics")]
WORKER_INDEX.with(|index| index.set(worker_index));
let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap();
let torrents = Rc::new(RefCell::new(TorrentMaps::default())); let torrents = Rc::new(RefCell::new(TorrentMaps::new(worker_index)));
let access_list = state.access_list; let access_list = state.access_list;
// Periodically clean torrents // Periodically clean torrents
@ -69,16 +63,14 @@ pub async fn run_swarm_worker(
::metrics::gauge!( ::metrics::gauge!(
"aquatic_torrents", "aquatic_torrents",
torrents.ipv4.len() as f64,
"ip_version" => "4", "ip_version" => "4",
"worker_index" => worker_index.to_string(), "worker_index" => worker_index.to_string(),
); ).set(torrents.ipv4.len() as f64);
::metrics::gauge!( ::metrics::gauge!(
"aquatic_torrents", "aquatic_torrents",
torrents.ipv6.len() as f64,
"ip_version" => "6", "ip_version" => "6",
"worker_index" => worker_index.to_string(), "worker_index" => worker_index.to_string(),
); ).set(torrents.ipv6.len() as f64);
Some(Duration::from_secs(config.metrics.torrent_count_update_interval)) Some(Duration::from_secs(config.metrics.torrent_count_update_interval))
})() })()

View file

@ -16,7 +16,6 @@ use aquatic_http_protocol::response::*;
use crate::config::Config; use crate::config::Config;
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
use crate::workers::swarm::WORKER_INDEX;
pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash { pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
@ -36,13 +35,35 @@ impl Ip for Ipv6Addr {
} }
} }
#[derive(Default)]
pub struct TorrentMaps { pub struct TorrentMaps {
pub ipv4: TorrentMap<Ipv4Addr>, pub ipv4: TorrentMap<Ipv4Addr>,
pub ipv6: TorrentMap<Ipv6Addr>, pub ipv6: TorrentMap<Ipv6Addr>,
#[cfg(feature = "metrics")]
pub ipv4_peer_gauge: metrics::Gauge,
#[cfg(feature = "metrics")]
pub ipv6_peer_gauge: metrics::Gauge,
} }
impl TorrentMaps { impl TorrentMaps {
pub fn new(worker_index: usize) -> Self {
Self {
ipv4: Default::default(),
ipv6: Default::default(),
#[cfg(feature = "metrics")]
ipv4_peer_gauge: ::metrics::gauge!(
"aquatic_peers",
"ip_version" => "4",
"worker_index" => worker_index.to_string(),
),
#[cfg(feature = "metrics")]
ipv6_peer_gauge: ::metrics::gauge!(
"aquatic_peers",
"ip_version" => "6",
"worker_index" => worker_index.to_string(),
),
}
}
pub fn handle_announce_request( pub fn handle_announce_request(
&mut self, &mut self,
config: &Config, config: &Config,
@ -63,6 +84,8 @@ impl TorrentMaps {
peer_ip_address, peer_ip_address,
request, request,
valid_until, valid_until,
#[cfg(feature = "metrics")]
&self.ipv4_peer_gauge,
); );
AnnounceResponse { AnnounceResponse {
@ -85,6 +108,8 @@ impl TorrentMaps {
peer_ip_address, peer_ip_address,
request, request,
valid_until, valid_until,
#[cfg(feature = "metrics")]
&self.ipv6_peer_gauge,
); );
AnnounceResponse { AnnounceResponse {
@ -157,8 +182,20 @@ impl TorrentMaps {
let now = server_start_instant.seconds_elapsed(); let now = server_start_instant.seconds_elapsed();
Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4, now); Self::clean_torrent_map(
Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6, now); config,
&mut access_list_cache,
&mut self.ipv4,
now,
&self.ipv4_peer_gauge,
);
Self::clean_torrent_map(
config,
&mut access_list_cache,
&mut self.ipv6,
now,
&self.ipv6_peer_gauge,
);
} }
fn clean_torrent_map<I: Ip>( fn clean_torrent_map<I: Ip>(
@ -166,6 +203,7 @@ impl TorrentMaps {
access_list_cache: &mut AccessListCache, access_list_cache: &mut AccessListCache,
torrent_map: &mut TorrentMap<I>, torrent_map: &mut TorrentMap<I>,
now: SecondsSinceServerStart, now: SecondsSinceServerStart,
#[cfg(feature = "metrics")] peer_gauge: &::metrics::Gauge,
) { ) {
let mut total_num_peers = 0; let mut total_num_peers = 0;
@ -197,12 +235,7 @@ impl TorrentMaps {
let total_num_peers = total_num_peers as f64; let total_num_peers = total_num_peers as f64;
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
::metrics::gauge!( peer_gauge.set(total_num_peers);
"aquatic_peers",
total_num_peers,
"ip_version" => I::ip_version_str(),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
torrent_map.shrink_to_fit(); torrent_map.shrink_to_fit();
} }
@ -238,6 +271,7 @@ impl<I: Ip> TorrentData<I> {
peer_ip_address: I, peer_ip_address: I,
request: AnnounceRequest, request: AnnounceRequest,
valid_until: ValidUntil, valid_until: ValidUntil,
#[cfg(feature = "metrics")] peer_gauge: &::metrics::Gauge,
) -> (usize, usize, Vec<ResponsePeer<I>>) { ) -> (usize, usize, Vec<ResponsePeer<I>>) {
let peer_status = let peer_status =
PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left)); PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left));
@ -257,12 +291,7 @@ impl<I: Ip> TorrentData<I> {
PeerStatus::Seeding | PeerStatus::Leeching => { PeerStatus::Seeding | PeerStatus::Leeching => {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
if opt_removed_peer.is_none() { if opt_removed_peer.is_none() {
::metrics::increment_gauge!( peer_gauge.increment(1.0);
"aquatic_peers",
1.0,
"ip_version" => I::ip_version_str(),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
} }
let max_num_peers_to_take = match request.numwant { let max_num_peers_to_take = match request.numwant {
@ -288,12 +317,7 @@ impl<I: Ip> TorrentData<I> {
PeerStatus::Stopped => { PeerStatus::Stopped => {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
if opt_removed_peer.is_some() { if opt_removed_peer.is_some() {
::metrics::decrement_gauge!( peer_gauge.decrement(1.0);
"aquatic_peers",
1.0,
"ip_version" => I::ip_version_str(),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
} }
Vec::new() Vec::new()