diff --git a/crates/ws/src/workers/socket/connection.rs b/crates/ws/src/workers/socket/connection.rs index fca6537..74908e9 100644 --- a/crates/ws/src/workers/socket/connection.rs +++ b/crates/ws/src/workers/socket/connection.rs @@ -17,7 +17,7 @@ use futures::{AsyncWriteExt, StreamExt}; use futures_lite::future::race; use futures_rustls::TlsAcceptor; use glommio::channels::channel_mesh::Senders; -use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; +use glommio::channels::local_channel::{LocalReceiver, LocalSender}; use glommio::net::TcpStream; use glommio::timer::timeout; use glommio::{enclose, prelude::*}; @@ -35,13 +35,6 @@ use crate::workers::socket::calculate_in_message_consumer_index; #[cfg(feature = "metrics")] use crate::workers::socket::{ip_version_to_metrics_str, WORKER_INDEX}; -/// Length of ConnectionReader backpressure channel -/// -/// ConnectionReader awaits a message in a channel before proceeding with -/// reading a request. For each response sent, a message is sent to the -/// channel, up to a maximum of this constant. -const READ_PASS_CHANNEL_LEN: usize = 4; - pub struct ConnectionRunner { pub config: Rc, pub access_list: Arc, @@ -168,17 +161,6 @@ impl ConnectionRunner { let pending_scrape_slab = Rc::new(RefCell::new(Slab::new())); let access_list_cache = create_access_list_cache(&self.access_list); - let (read_pass_sender, read_pass_receiver) = new_bounded(READ_PASS_CHANNEL_LEN); - - for _ in 0..READ_PASS_CHANNEL_LEN { - if let Err(err) = read_pass_sender.try_send(()) { - panic!( - "couldn't add initial entries to read pass channel: {:#}", - err - ) - }; - } - let config = self.config.clone(); let reader_future = enclose!((pending_scrape_slab, clean_up_data) async move { @@ -187,7 +169,6 @@ impl ConnectionRunner { access_list_cache, in_message_senders: self.in_message_senders, out_message_sender: self.out_message_sender, - read_pass_receiver, pending_scrape_slab, out_message_consumer_id: self.out_message_consumer_id, ws_in, @@ -217,7 +198,6 @@ impl ConnectionRunner { let mut writer = ConnectionWriter { config, out_message_receiver: self.out_message_receiver, - read_pass_sender, connection_valid_until: self.connection_valid_until, ws_out, pending_scrape_slab, @@ -238,7 +218,6 @@ struct ConnectionReader { access_list_cache: AccessListCache, in_message_senders: Rc>, out_message_sender: Rc>, - read_pass_receiver: LocalReceiver<()>, pending_scrape_slab: Rc>>, out_message_consumer_id: ConsumerId, ws_in: SplitStream>, @@ -254,11 +233,6 @@ struct ConnectionReader { impl ConnectionReader { async fn run_in_message_loop(&mut self) -> anyhow::Result<()> { loop { - self.read_pass_receiver - .recv() - .await - .ok_or_else(|| anyhow::anyhow!("read pass channel closed"))?; - let message = self .ws_in .next() @@ -496,7 +470,6 @@ impl ConnectionReader { struct ConnectionWriter { config: Rc, out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>, - read_pass_sender: LocalSender<()>, connection_valid_until: Rc>, ws_out: SplitSink, tungstenite::Message>, pending_scrape_slab: Rc>>, @@ -549,10 +522,6 @@ impl ConnectionWriter { } }; - if let Err(GlommioError::Closed(_)) = self.read_pass_sender.try_send(()) { - return Err(anyhow::anyhow!("read pass channel closed")); - } - yield_if_needed().await; } }