mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
ws: remove ineffective backpressure implementation
This commit is contained in:
parent
bcd8988ccd
commit
4c831643b1
1 changed files with 1 additions and 32 deletions
|
|
@ -17,7 +17,7 @@ use futures::{AsyncWriteExt, StreamExt};
|
|||
use futures_lite::future::race;
|
||||
use futures_rustls::TlsAcceptor;
|
||||
use glommio::channels::channel_mesh::Senders;
|
||||
use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender};
|
||||
use glommio::channels::local_channel::{LocalReceiver, LocalSender};
|
||||
use glommio::net::TcpStream;
|
||||
use glommio::timer::timeout;
|
||||
use glommio::{enclose, prelude::*};
|
||||
|
|
@ -35,13 +35,6 @@ use crate::workers::socket::calculate_in_message_consumer_index;
|
|||
#[cfg(feature = "metrics")]
|
||||
use crate::workers::socket::{ip_version_to_metrics_str, WORKER_INDEX};
|
||||
|
||||
/// Length of ConnectionReader backpressure channel
|
||||
///
|
||||
/// ConnectionReader awaits a message in a channel before proceeding with
|
||||
/// reading a request. For each response sent, a message is sent to the
|
||||
/// channel, up to a maximum of this constant.
|
||||
const READ_PASS_CHANNEL_LEN: usize = 4;
|
||||
|
||||
pub struct ConnectionRunner {
|
||||
pub config: Rc<Config>,
|
||||
pub access_list: Arc<AccessListArcSwap>,
|
||||
|
|
@ -168,17 +161,6 @@ impl ConnectionRunner {
|
|||
let pending_scrape_slab = Rc::new(RefCell::new(Slab::new()));
|
||||
let access_list_cache = create_access_list_cache(&self.access_list);
|
||||
|
||||
let (read_pass_sender, read_pass_receiver) = new_bounded(READ_PASS_CHANNEL_LEN);
|
||||
|
||||
for _ in 0..READ_PASS_CHANNEL_LEN {
|
||||
if let Err(err) = read_pass_sender.try_send(()) {
|
||||
panic!(
|
||||
"couldn't add initial entries to read pass channel: {:#}",
|
||||
err
|
||||
)
|
||||
};
|
||||
}
|
||||
|
||||
let config = self.config.clone();
|
||||
|
||||
let reader_future = enclose!((pending_scrape_slab, clean_up_data) async move {
|
||||
|
|
@ -187,7 +169,6 @@ impl ConnectionRunner {
|
|||
access_list_cache,
|
||||
in_message_senders: self.in_message_senders,
|
||||
out_message_sender: self.out_message_sender,
|
||||
read_pass_receiver,
|
||||
pending_scrape_slab,
|
||||
out_message_consumer_id: self.out_message_consumer_id,
|
||||
ws_in,
|
||||
|
|
@ -217,7 +198,6 @@ impl ConnectionRunner {
|
|||
let mut writer = ConnectionWriter {
|
||||
config,
|
||||
out_message_receiver: self.out_message_receiver,
|
||||
read_pass_sender,
|
||||
connection_valid_until: self.connection_valid_until,
|
||||
ws_out,
|
||||
pending_scrape_slab,
|
||||
|
|
@ -238,7 +218,6 @@ struct ConnectionReader<S> {
|
|||
access_list_cache: AccessListCache,
|
||||
in_message_senders: Rc<Senders<(InMessageMeta, InMessage)>>,
|
||||
out_message_sender: Rc<LocalSender<(OutMessageMeta, OutMessage)>>,
|
||||
read_pass_receiver: LocalReceiver<()>,
|
||||
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
|
||||
out_message_consumer_id: ConsumerId,
|
||||
ws_in: SplitStream<WebSocketStream<S>>,
|
||||
|
|
@ -254,11 +233,6 @@ struct ConnectionReader<S> {
|
|||
impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
|
||||
async fn run_in_message_loop(&mut self) -> anyhow::Result<()> {
|
||||
loop {
|
||||
self.read_pass_receiver
|
||||
.recv()
|
||||
.await
|
||||
.ok_or_else(|| anyhow::anyhow!("read pass channel closed"))?;
|
||||
|
||||
let message = self
|
||||
.ws_in
|
||||
.next()
|
||||
|
|
@ -496,7 +470,6 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
|
|||
struct ConnectionWriter<S> {
|
||||
config: Rc<Config>,
|
||||
out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>,
|
||||
read_pass_sender: LocalSender<()>,
|
||||
connection_valid_until: Rc<RefCell<ValidUntil>>,
|
||||
ws_out: SplitSink<WebSocketStream<S>, tungstenite::Message>,
|
||||
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
|
||||
|
|
@ -549,10 +522,6 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionWriter<S> {
|
|||
}
|
||||
};
|
||||
|
||||
if let Err(GlommioError::Closed(_)) = self.read_pass_sender.try_send(()) {
|
||||
return Err(anyhow::anyhow!("read pass channel closed"));
|
||||
}
|
||||
|
||||
yield_if_needed().await;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue