ws: add Connection backpressure channel instead of spawning tasks

This appears to fix the memory-leak issue: bounding reads with a backpressure channel prevents unbounded task spawning per connection.
This commit is contained in:
Joakim Frostegård 2024-01-07 11:13:36 +01:00
parent 6e7d36cffc
commit 188da135ab
3 changed files with 86 additions and 77 deletions

View file

@ -3,11 +3,7 @@
## High priority ## High priority
* if peer_clients is on, add task to generate prometheus exports on regular * if peer_clients is on, add task to generate prometheus exports on regular
interval to clean up data. should more or less fix prometheus memory leak interval to clean up data
* ws
* try replacing race with futures::future::select
* bug report for glommio regarding memory leak
* aquatic_bench * aquatic_bench
* Opentracker "slow to get up to speed", is it due to getting faster once * Opentracker "slow to get up to speed", is it due to getting faster once

View file

@ -17,7 +17,7 @@ use futures::{AsyncWriteExt, StreamExt};
use futures_lite::future::race; use futures_lite::future::race;
use futures_rustls::TlsAcceptor; use futures_rustls::TlsAcceptor;
use glommio::channels::channel_mesh::Senders; use glommio::channels::channel_mesh::Senders;
use glommio::channels::local_channel::{LocalReceiver, LocalSender}; use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender};
use glommio::net::TcpStream; use glommio::net::TcpStream;
use glommio::timer::timeout; use glommio::timer::timeout;
use glommio::{enclose, prelude::*}; use glommio::{enclose, prelude::*};
@ -35,12 +35,17 @@ use crate::workers::socket::calculate_in_message_consumer_index;
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
use crate::workers::socket::{ip_version_to_metrics_str, WORKER_INDEX}; use crate::workers::socket::{ip_version_to_metrics_str, WORKER_INDEX};
/// Length of ConnectionReader backpressure channel
///
/// ConnectionReader awaits a message in a channel before proceeding with
/// reading a request. For each response sent, a message is sent to the
/// channel, up to a maximum of this constant.
const READ_PASS_CHANNEL_LEN: usize = 4;
pub struct ConnectionRunner { pub struct ConnectionRunner {
pub config: Rc<Config>, pub config: Rc<Config>,
pub access_list: Arc<AccessListArcSwap>, pub access_list: Arc<AccessListArcSwap>,
pub in_message_senders: Rc<Senders<(InMessageMeta, InMessage)>>, pub in_message_senders: Rc<Senders<(InMessageMeta, InMessage)>>,
pub tq_prioritized: TaskQueueHandle,
pub tq_regular: TaskQueueHandle,
pub connection_valid_until: Rc<RefCell<ValidUntil>>, pub connection_valid_until: Rc<RefCell<ValidUntil>>,
pub out_message_sender: Rc<LocalSender<(OutMessageMeta, OutMessage)>>, pub out_message_sender: Rc<LocalSender<(OutMessageMeta, OutMessage)>>,
pub out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>, pub out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>,
@ -75,23 +80,16 @@ impl ConnectionRunner {
let config = self.config.clone(); let config = self.config.clone();
let connection_id = self.connection_id.clone(); let connection_id = self.connection_id.clone();
let tq_regular = self.tq_regular; race(
async {
let connection_future = spawn_local_into( if let Err(err) = self.run_inner(clean_up_data.clone(), stream).await {
enclose!((
clean_up_data
) async move {
if let Err(err) = self.run_inner(clean_up_data, stream).await {
::log::debug!("connection {:?} closed: {:#}", connection_id, err); ::log::debug!("connection {:?} closed: {:#}", connection_id, err);
} }
}), },
tq_regular, async {
close_conn_receiver.recv().await;
},
) )
.unwrap();
race(connection_future, async {
close_conn_receiver.recv().await;
})
.await; .await;
::log::debug!("connection {:?} starting clean up", connection_id); ::log::debug!("connection {:?} starting clean up", connection_id);
@ -165,69 +163,73 @@ impl ConnectionRunner {
..Default::default() ..Default::default()
}; };
let stream = async_tungstenite::accept_async_with_config(stream, Some(ws_config)).await?; let stream = async_tungstenite::accept_async_with_config(stream, Some(ws_config)).await?;
let (ws_out, ws_in) = futures::StreamExt::split(stream); let (ws_out, ws_in) = futures::StreamExt::split(stream);
let pending_scrape_slab = Rc::new(RefCell::new(Slab::new())); let pending_scrape_slab = Rc::new(RefCell::new(Slab::new()));
let access_list_cache = create_access_list_cache(&self.access_list); let access_list_cache = create_access_list_cache(&self.access_list);
let (read_pass_sender, read_pass_receiver) = new_bounded(READ_PASS_CHANNEL_LEN);
for _ in 0..READ_PASS_CHANNEL_LEN {
if let Err(err) = read_pass_sender.try_send(()) {
panic!(
"couldn't add initial entries to read pass channel: {:#}",
err
)
};
}
let config = self.config.clone(); let config = self.config.clone();
let reader_handle = spawn_local_into( let reader_future = enclose!((pending_scrape_slab, clean_up_data) async move {
enclose!((pending_scrape_slab, clean_up_data) async move { let mut reader = ConnectionReader {
let mut reader = ConnectionReader { config: self.config.clone(),
config: self.config.clone(), access_list_cache,
access_list_cache, in_message_senders: self.in_message_senders,
in_message_senders: self.in_message_senders, out_message_sender: self.out_message_sender,
out_message_sender: self.out_message_sender, read_pass_receiver,
pending_scrape_slab, pending_scrape_slab,
out_message_consumer_id: self.out_message_consumer_id, out_message_consumer_id: self.out_message_consumer_id,
ws_in, ws_in,
ip_version: self.ip_version, ip_version: self.ip_version,
connection_id: self.connection_id, connection_id: self.connection_id,
clean_up_data: clean_up_data.clone(), clean_up_data: clean_up_data.clone(),
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
total_announce_requests_counter: ::metrics::counter!( total_announce_requests_counter: ::metrics::counter!(
"aquatic_requests_total", "aquatic_requests_total",
"type" => "announce", "type" => "announce",
"ip_version" => ip_version_to_metrics_str(self.ip_version), "ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
), ),
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
total_scrape_requests_counter: ::metrics::counter!( total_scrape_requests_counter: ::metrics::counter!(
"aquatic_requests_total", "aquatic_requests_total",
"type" => "scrape", "type" => "scrape",
"ip_version" => ip_version_to_metrics_str(self.ip_version), "ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
) )
}; };
reader.run_in_message_loop().await reader.run_in_message_loop().await
}), });
self.tq_regular,
)
.unwrap();
let writer_handle = spawn_local_into( let writer_future = async move {
async move { let mut writer = ConnectionWriter {
let mut writer = ConnectionWriter { config,
config, out_message_receiver: self.out_message_receiver,
out_message_receiver: self.out_message_receiver, read_pass_sender,
connection_valid_until: self.connection_valid_until, connection_valid_until: self.connection_valid_until,
ws_out, ws_out,
pending_scrape_slab, pending_scrape_slab,
server_start_instant: self.server_start_instant, server_start_instant: self.server_start_instant,
ip_version: self.ip_version, ip_version: self.ip_version,
clean_up_data, clean_up_data,
}; };
writer.run_out_message_loop().await writer.run_out_message_loop().await
}, };
self.tq_prioritized,
)
.unwrap();
race(reader_handle, writer_handle).await race(reader_future, writer_future).await
} }
} }
@ -236,6 +238,7 @@ struct ConnectionReader<S> {
access_list_cache: AccessListCache, access_list_cache: AccessListCache,
in_message_senders: Rc<Senders<(InMessageMeta, InMessage)>>, in_message_senders: Rc<Senders<(InMessageMeta, InMessage)>>,
out_message_sender: Rc<LocalSender<(OutMessageMeta, OutMessage)>>, out_message_sender: Rc<LocalSender<(OutMessageMeta, OutMessage)>>,
read_pass_receiver: LocalReceiver<()>,
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>, pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
out_message_consumer_id: ConsumerId, out_message_consumer_id: ConsumerId,
ws_in: SplitStream<WebSocketStream<S>>, ws_in: SplitStream<WebSocketStream<S>>,
@ -251,6 +254,11 @@ struct ConnectionReader<S> {
impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> { impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
async fn run_in_message_loop(&mut self) -> anyhow::Result<()> { async fn run_in_message_loop(&mut self) -> anyhow::Result<()> {
loop { loop {
self.read_pass_receiver
.recv()
.await
.ok_or_else(|| anyhow::anyhow!("read pass channel closed"))?;
let message = self let message = self
.ws_in .ws_in
.next() .next()
@ -488,6 +496,7 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
struct ConnectionWriter<S> { struct ConnectionWriter<S> {
config: Rc<Config>, config: Rc<Config>,
out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>, out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>,
read_pass_sender: LocalSender<()>,
connection_valid_until: Rc<RefCell<ValidUntil>>, connection_valid_until: Rc<RefCell<ValidUntil>>,
ws_out: SplitSink<WebSocketStream<S>, tungstenite::Message>, ws_out: SplitSink<WebSocketStream<S>, tungstenite::Message>,
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>, pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
@ -539,6 +548,12 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionWriter<S> {
self.send_out_message(&out_message).await?; self.send_out_message(&out_message).await?;
} }
}; };
if let Err(GlommioError::Closed(_)) = self.read_pass_sender.try_send(()) {
return Err(anyhow::anyhow!("read pass channel closed"));
}
yield_if_needed().await;
} }
} }

View file

@ -173,8 +173,6 @@ pub async fn run_socket_worker(
config, config,
access_list, access_list,
in_message_senders, in_message_senders,
tq_prioritized,
tq_regular,
connection_valid_until, connection_valid_until,
out_message_sender, out_message_sender,
out_message_receiver, out_message_receiver,
@ -189,7 +187,7 @@ pub async fn run_socket_worker(
connection_handles.borrow_mut().remove(connection_id); connection_handles.borrow_mut().remove(connection_id);
}), }),
tq_prioritized, tq_regular,
) )
.unwrap() .unwrap()
.detach(); .detach();