diff --git a/TODO.md b/TODO.md index 927ff8b..79b4e4e 100644 --- a/TODO.md +++ b/TODO.md @@ -3,11 +3,7 @@ ## High priority * if peer_clients is on, add task to generate prometheus exports on regular - interval to clean up data. should more or less fix prometheus memory leak - -* ws - * try replacing race with futures::future::select - * bug report for glommio regarding memory leak + interval to clean up data * aquatic_bench * Opentracker "slow to get up to speed", is it due to getting faster once diff --git a/crates/ws/src/workers/socket/connection.rs b/crates/ws/src/workers/socket/connection.rs index bc4cd6c..fca6537 100644 --- a/crates/ws/src/workers/socket/connection.rs +++ b/crates/ws/src/workers/socket/connection.rs @@ -17,7 +17,7 @@ use futures::{AsyncWriteExt, StreamExt}; use futures_lite::future::race; use futures_rustls::TlsAcceptor; use glommio::channels::channel_mesh::Senders; -use glommio::channels::local_channel::{LocalReceiver, LocalSender}; +use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; use glommio::net::TcpStream; use glommio::timer::timeout; use glommio::{enclose, prelude::*}; @@ -35,12 +35,17 @@ use crate::workers::socket::calculate_in_message_consumer_index; #[cfg(feature = "metrics")] use crate::workers::socket::{ip_version_to_metrics_str, WORKER_INDEX}; +/// Length of ConnectionReader backpressure channel +/// +/// ConnectionReader awaits a message in a channel before proceeding with +/// reading a request. For each response sent, a message is sent to the +/// channel, up to a maximum of this constant. +const READ_PASS_CHANNEL_LEN: usize = 4; + pub struct ConnectionRunner { pub config: Rc, pub access_list: Arc, pub in_message_senders: Rc>, - pub tq_prioritized: TaskQueueHandle, - pub tq_regular: TaskQueueHandle, pub connection_valid_until: Rc>, pub out_message_sender: Rc>, pub out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>, @@ -75,23 +80,16 @@ impl ConnectionRunner { let config = self.config.clone(); let connection_id = self.connection_id.clone(); - let tq_regular = self.tq_regular; - - let connection_future = spawn_local_into( - enclose!(( - clean_up_data - ) async move { - if let Err(err) = self.run_inner(clean_up_data, stream).await { + race( + async { + if let Err(err) = self.run_inner(clean_up_data.clone(), stream).await { ::log::debug!("connection {:?} closed: {:#}", connection_id, err); } - }), - tq_regular, + }, + async { + close_conn_receiver.recv().await; + }, ) - .unwrap(); - - race(connection_future, async { - close_conn_receiver.recv().await; - }) .await; ::log::debug!("connection {:?} starting clean up", connection_id); @@ -165,69 +163,73 @@ impl ConnectionRunner { ..Default::default() }; let stream = async_tungstenite::accept_async_with_config(stream, Some(ws_config)).await?; - let (ws_out, ws_in) = futures::StreamExt::split(stream); let pending_scrape_slab = Rc::new(RefCell::new(Slab::new())); let access_list_cache = create_access_list_cache(&self.access_list); + let (read_pass_sender, read_pass_receiver) = new_bounded(READ_PASS_CHANNEL_LEN); + + for _ in 0..READ_PASS_CHANNEL_LEN { + if let Err(err) = read_pass_sender.try_send(()) { + panic!( + "couldn't add initial entries to read pass channel: {:#}", + err + ) + }; + } + let config = self.config.clone(); - let reader_handle = spawn_local_into( - enclose!((pending_scrape_slab, clean_up_data) async move { - let mut reader = ConnectionReader { - config: self.config.clone(), - access_list_cache, - in_message_senders: self.in_message_senders, - out_message_sender: self.out_message_sender, - pending_scrape_slab, - out_message_consumer_id: self.out_message_consumer_id, - ws_in, - ip_version: self.ip_version, - connection_id: self.connection_id, - clean_up_data: clean_up_data.clone(), - #[cfg(feature = "metrics")] - total_announce_requests_counter: ::metrics::counter!( - "aquatic_requests_total", - "type" => "announce", - "ip_version" => ip_version_to_metrics_str(self.ip_version), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ), - #[cfg(feature = "metrics")] - total_scrape_requests_counter: ::metrics::counter!( - "aquatic_requests_total", - "type" => "scrape", - "ip_version" => ip_version_to_metrics_str(self.ip_version), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ) - }; + let reader_future = enclose!((pending_scrape_slab, clean_up_data) async move { + let mut reader = ConnectionReader { + config: self.config.clone(), + access_list_cache, + in_message_senders: self.in_message_senders, + out_message_sender: self.out_message_sender, + read_pass_receiver, + pending_scrape_slab, + out_message_consumer_id: self.out_message_consumer_id, + ws_in, + ip_version: self.ip_version, + connection_id: self.connection_id, + clean_up_data: clean_up_data.clone(), + #[cfg(feature = "metrics")] + total_announce_requests_counter: ::metrics::counter!( + "aquatic_requests_total", + "type" => "announce", + "ip_version" => ip_version_to_metrics_str(self.ip_version), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ), + #[cfg(feature = "metrics")] + total_scrape_requests_counter: ::metrics::counter!( + "aquatic_requests_total", + "type" => "scrape", + "ip_version" => ip_version_to_metrics_str(self.ip_version), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ) + }; - reader.run_in_message_loop().await - }), - self.tq_regular, - ) - .unwrap(); + reader.run_in_message_loop().await + }); - let writer_handle = spawn_local_into( - async move { - let mut writer = ConnectionWriter { - config, - out_message_receiver: self.out_message_receiver, - connection_valid_until: self.connection_valid_until, - ws_out, - pending_scrape_slab, - server_start_instant: self.server_start_instant, - ip_version: self.ip_version, - clean_up_data, - }; + let writer_future = async move { + let mut writer = ConnectionWriter { + config, + out_message_receiver: self.out_message_receiver, + read_pass_sender, + connection_valid_until: self.connection_valid_until, + ws_out, + pending_scrape_slab, + server_start_instant: self.server_start_instant, + ip_version: self.ip_version, + clean_up_data, + }; - writer.run_out_message_loop().await - }, - self.tq_prioritized, - ) - .unwrap(); + writer.run_out_message_loop().await + }; - race(reader_handle, writer_handle).await + race(reader_future, writer_future).await } } @@ -236,6 +238,7 @@ struct ConnectionReader { access_list_cache: AccessListCache, in_message_senders: Rc>, out_message_sender: Rc>, + read_pass_receiver: LocalReceiver<()>, pending_scrape_slab: Rc>>, out_message_consumer_id: ConsumerId, ws_in: SplitStream>, @@ -251,6 +254,11 @@ struct ConnectionReader { impl ConnectionReader { async fn run_in_message_loop(&mut self) -> anyhow::Result<()> { loop { + self.read_pass_receiver + .recv() + .await + .ok_or_else(|| anyhow::anyhow!("read pass channel closed"))?; + let message = self .ws_in .next() @@ -488,6 +496,7 @@ impl ConnectionReader { struct ConnectionWriter { config: Rc, out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>, + read_pass_sender: LocalSender<()>, connection_valid_until: Rc>, ws_out: SplitSink, tungstenite::Message>, pending_scrape_slab: Rc>>, @@ -539,6 +548,12 @@ impl ConnectionWriter { self.send_out_message(&out_message).await?; } }; + + if let Err(GlommioError::Closed(_)) = self.read_pass_sender.try_send(()) { + return Err(anyhow::anyhow!("read pass channel closed")); + } + + yield_if_needed().await; } } diff --git a/crates/ws/src/workers/socket/mod.rs b/crates/ws/src/workers/socket/mod.rs index 492722c..eb284ad 100644 --- a/crates/ws/src/workers/socket/mod.rs +++ b/crates/ws/src/workers/socket/mod.rs @@ -173,8 +173,6 @@ pub async fn run_socket_worker( config, access_list, in_message_senders, - tq_prioritized, - tq_regular, connection_valid_until, out_message_sender, out_message_receiver, @@ -189,7 +187,7 @@ pub async fn run_socket_worker( connection_handles.borrow_mut().remove(connection_id); }), - tq_prioritized, + tq_regular, ) .unwrap() .detach();