http: add metrics for connection count, requests and responses

This commit is contained in:
Joakim Frostegård 2023-01-18 20:48:59 +01:00
parent 3a6661afd7
commit 5c04245cbe
3 changed files with 88 additions and 15 deletions

2
Cargo.lock generated
View file

@ -111,6 +111,8 @@ dependencies = [
"libc",
"log",
"memchr",
"metrics",
"metrics-exporter-prometheus",
"mimalloc",
"once_cell",
"privdrop",

View file

@ -16,6 +16,10 @@ name = "aquatic_http"
[[bin]]
name = "aquatic_http"
[features]
prometheus = ["metrics", "metrics-exporter-prometheus"]
metrics = ["dep:metrics"]
[dependencies]
aquatic_common = { workspace = true, features = ["rustls", "glommio"] }
aquatic_http_protocol.workspace = true
@ -31,6 +35,8 @@ glommio = "0.7"
itoa = "1"
libc = "0.2"
log = "0.4"
metrics = { version = "0.20", optional = true }
metrics-exporter-prometheus = { version = "0.11", optional = true, default-features = false, features = ["http-listener"] }
mimalloc = { version = "0.1", default-features = false }
memchr = "2"
privdrop = "0.5"

View file

@ -93,16 +93,47 @@ pub async fn run_socket_worker(
});
let task_handle = spawn_local(enclose!((config, access_list, request_senders, tls_config, connection_slab) async move {
if let Err(err) = Connection::run(
config,
access_list,
request_senders,
server_start_instant,
ConnectionId(key),
tls_config,
connection_slab.clone(),
stream
).await {
let result = match stream.peer_addr() {
Ok(peer_addr) => {
let peer_addr = CanonicalSocketAddr::new(peer_addr);
#[cfg(feature = "metrics")]
let ip_version_str = peer_addr_to_ip_version_str(&peer_addr);
#[cfg(feature = "metrics")]
::metrics::increment_gauge!(
"aquatic_active_connections",
1.0,
"ip_version" => ip_version_str,
);
let result = Connection::run(
config,
access_list,
request_senders,
server_start_instant,
ConnectionId(key),
tls_config,
connection_slab.clone(),
stream,
peer_addr
).await;
#[cfg(feature = "metrics")]
::metrics::decrement_gauge!(
"aquatic_active_connections",
1.0,
"ip_version" => ip_version_str,
);
result
}
Err(err) => {
Err(anyhow::anyhow!("Couldn't get peer addr: {:?}", err))
}
};
if let Err(err) = result {
::log::debug!("Connection::run() error: {:?}", err);
}
@ -171,12 +202,8 @@ impl Connection {
tls_config: Arc<RustlsConfig>,
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
stream: TcpStream,
peer_addr: CanonicalSocketAddr,
) -> anyhow::Result<()> {
let peer_addr = stream
.peer_addr()
.map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))?;
let peer_addr = CanonicalSocketAddr::new(peer_addr);
let tls_acceptor: TlsAcceptor = tls_config.into();
let stream = tls_acceptor.accept(stream).await?;
@ -288,6 +315,13 @@ impl Connection {
match request {
Request::Announce(request) => {
#[cfg(feature = "metrics")]
::metrics::increment_counter!(
"aquatic_requests_total",
"type" => "announce",
"ip_version" => peer_addr_to_ip_version_str(&self.peer_addr)
);
let info_hash = request.info_hash;
if self
@ -327,6 +361,13 @@ impl Connection {
}
}
Request::Scrape(ScrapeRequest { info_hashes }) => {
#[cfg(feature = "metrics")]
::metrics::increment_counter!(
"aquatic_requests_total",
"type" => "scrape",
"ip_version" => peer_addr_to_ip_version_str(&self.peer_addr)
);
let mut info_hashes_by_worker: BTreeMap<usize, Vec<InfoHash>> = BTreeMap::new();
for info_hash in info_hashes.into_iter() {
@ -454,6 +495,21 @@ impl Connection {
self.stream.write(&self.response_buffer[..position]).await?;
self.stream.flush().await?;
#[cfg(feature = "metrics")]
{
let response_type = match response {
Response::Announce(_) => "announce",
Response::Scrape(_) => "scrape",
Response::Failure(_) => "error",
};
::metrics::increment_counter!(
"aquatic_responses_total",
"type" => response_type,
"ip_version" => peer_addr_to_ip_version_str(&self.peer_addr),
);
}
Ok(())
}
}
@ -496,3 +552,12 @@ fn create_tcp_listener(
Ok(unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) })
}
#[cfg(feature = "metrics")]
fn peer_addr_to_ip_version_str(addr: &CanonicalSocketAddr) -> &'static str {
if addr.is_ipv4() {
"4"
} else {
"6"
}
}