update ws dependencies as well as http rustls dependency

This commit is contained in:
Joakim Frostegård 2024-01-07 10:40:50 +01:00
parent 071f088d8b
commit 3042539101
16 changed files with 499 additions and 295 deletions

View file

@ -9,7 +9,6 @@ use anyhow::Context;
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::rustls_config::RustlsConfig;
use aquatic_common::ServerStartInstant;
use aquatic_peer_id::PeerClient;
use aquatic_ws_protocol::*;
use arc_swap::ArcSwap;
use async_tungstenite::WebSocketStream;
@ -26,6 +25,9 @@ use hashbrown::hash_map::Entry;
use hashbrown::HashMap;
use slab::Slab;
#[cfg(feature = "metrics")]
use metrics::{Counter, Gauge};
use crate::common::*;
use crate::config::Config;
use crate::workers::socket::calculate_in_message_consumer_index;
@ -60,6 +62,12 @@ impl ConnectionRunner {
announced_info_hashes: Default::default(),
ip_version: self.ip_version,
opt_peer_client: Default::default(),
#[cfg(feature = "metrics")]
active_connections_gauge: ::metrics::gauge!(
"aquatic_active_connections",
"ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.get().to_string(),
),
};
clean_up_data.before_open();
@ -74,7 +82,7 @@ impl ConnectionRunner {
clean_up_data
) async move {
if let Err(err) = self.run_inner(clean_up_data, stream).await {
::log::debug!("connection {:?} error: {:#}", connection_id, err);
::log::debug!("connection {:?} closed: {:#}", connection_id, err);
}
}),
tq_regular,
@ -101,7 +109,8 @@ impl ConnectionRunner {
mut stream: TcpStream,
) -> anyhow::Result<()> {
if let Some(tls_config) = self.opt_tls_config.as_ref() {
let tls_acceptor: TlsAcceptor = tls_config.load_full().into();
let tls_config = tls_config.load_full();
let tls_acceptor = TlsAcceptor::from(tls_config);
let stream = tls_acceptor.accept(stream).await?;
@ -177,6 +186,20 @@ impl ConnectionRunner {
ip_version: self.ip_version,
connection_id: self.connection_id,
clean_up_data: clean_up_data.clone(),
#[cfg(feature = "metrics")]
total_announce_requests_counter: ::metrics::counter!(
"aquatic_requests_total",
"type" => "announce",
"ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
),
#[cfg(feature = "metrics")]
total_scrape_requests_counter: ::metrics::counter!(
"aquatic_requests_total",
"type" => "scrape",
"ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
)
};
reader.run_in_message_loop().await
@ -219,6 +242,10 @@ struct ConnectionReader<S> {
ip_version: IpVersion,
connection_id: ConnectionId,
clean_up_data: ConnectionCleanupData,
#[cfg(feature = "metrics")]
total_announce_requests_counter: Counter,
#[cfg(feature = "metrics")]
total_scrape_requests_counter: Counter,
}
impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
@ -270,12 +297,7 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
async fn handle_announce_request(&mut self, request: AnnounceRequest) -> anyhow::Result<()> {
#[cfg(feature = "metrics")]
::metrics::increment_counter!(
"aquatic_requests_total",
"type" => "announce",
"ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
self.total_announce_requests_counter.increment(1);
let info_hash = request.info_hash;
@ -315,24 +337,28 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
&& self.clean_up_data.opt_peer_client.borrow().is_none()
{
let peer_id = aquatic_peer_id::PeerId(request.peer_id.0);
let client = peer_id.client();
let prefix = peer_id.first_8_bytes_hex().to_string();
::metrics::increment_gauge!(
let peer_client_gauge = ::metrics::gauge!(
"aquatic_peer_clients",
1.0,
"client" => client.to_string(),
"client" => peer_id.client().to_string(),
);
if self.config.metrics.peer_id_prefixes {
::metrics::increment_gauge!(
"aquatic_peer_id_prefixes",
1.0,
"prefix_hex" => prefix.to_string(),
);
}
peer_client_gauge.increment(1.0);
*self.clean_up_data.opt_peer_client.borrow_mut() = Some((client, prefix));
let opt_peer_id_prefix_gauge =
self.config.metrics.peer_id_prefixes.then(|| {
let g = ::metrics::gauge!(
"aquatic_peer_id_prefixes",
"prefix_hex" => peer_id.first_8_bytes_hex().to_string(),
);
g.increment(1.0);
g
});
*self.clean_up_data.opt_peer_client.borrow_mut() =
Some((peer_client_gauge, opt_peer_id_prefix_gauge));
};
}
}
@ -370,12 +396,7 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
async fn handle_scrape_request(&mut self, request: ScrapeRequest) -> anyhow::Result<()> {
#[cfg(feature = "metrics")]
::metrics::increment_counter!(
"aquatic_requests_total",
"type" => "scrape",
"ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
self.total_scrape_requests_counter.increment(1);
let info_hashes = if let Some(info_hashes) = request.info_hashes {
info_hashes
@ -548,32 +569,24 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionWriter<S> {
OutMessage::ErrorResponse(_) => "error",
};
::metrics::increment_counter!(
::metrics::counter!(
"aquatic_responses_total",
"type" => out_message_type,
"ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
)
.increment(1);
if let Some((peer_client, prefix)) =
// As long as connection is still alive, increment peer client
// gauges by zero to prevent them from being removed due to
// idleness
if let Some((peer_client_gauge, opt_peer_id_prefix_gauge)) =
self.clean_up_data.opt_peer_client.borrow().as_ref()
{
// As long as connection is still alive, increment peer client
// gauges by zero to prevent them from being removed due to
// idleness
peer_client_gauge.increment(0.0);
::metrics::increment_gauge!(
"aquatic_peer_clients",
0.0,
"client" => peer_client.to_string(),
);
if self.config.metrics.peer_id_prefixes {
::metrics::increment_gauge!(
"aquatic_peer_id_prefixes",
0.0,
"prefix_hex" => prefix.to_string(),
);
if let Some(g) = opt_peer_id_prefix_gauge {
g.increment(0.0);
}
}
}
@ -587,18 +600,15 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionWriter<S> {
struct ConnectionCleanupData {
announced_info_hashes: Rc<RefCell<HashMap<InfoHash, PeerId>>>,
ip_version: IpVersion,
opt_peer_client: Rc<RefCell<Option<(PeerClient, String)>>>,
opt_peer_client: Rc<RefCell<Option<(Gauge, Option<Gauge>)>>>,
#[cfg(feature = "metrics")]
active_connections_gauge: Gauge,
}
impl ConnectionCleanupData {
fn before_open(&self) {
#[cfg(feature = "metrics")]
::metrics::increment_gauge!(
"aquatic_active_connections",
1.0,
"ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.get().to_string(),
);
self.active_connections_gauge.increment(1.0);
}
async fn after_close(
&self,
@ -625,28 +635,14 @@ impl ConnectionCleanupData {
}
#[cfg(feature = "metrics")]
{
::metrics::decrement_gauge!(
"aquatic_active_connections",
1.0,
"ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.get().to_string(),
);
self.active_connections_gauge.decrement(1.0);
if let Some((peer_client, prefix)) = self.opt_peer_client.borrow().as_ref() {
::metrics::decrement_gauge!(
"aquatic_peer_clients",
1.0,
"client" => peer_client.to_string(),
);
#[cfg(feature = "metrics")]
if let Some((peer_client_gauge, opt_peer_id_prefix_gauge)) = self.opt_peer_client.take() {
peer_client_gauge.decrement(1.0);
if config.metrics.peer_id_prefixes {
::metrics::decrement_gauge!(
"aquatic_peer_id_prefixes",
1.0,
"prefix_hex" => prefix.to_string(),
);
}
if let Some(g) = opt_peer_id_prefix_gauge {
g.decrement(1.0);
}
}
}