diff --git a/crates/http/src/workers/socket/connection.rs b/crates/http/src/workers/socket/connection.rs index 7db5146..b65487a 100644 --- a/crates/http/src/workers/socket/connection.rs +++ b/crates/http/src/workers/socket/connection.rs @@ -16,11 +16,9 @@ use aquatic_http_protocol::response::{ use arc_swap::ArcSwap; use either::Either; use futures::stream::FuturesUnordered; -use futures_lite::future::race; use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; use futures_rustls::TlsAcceptor; use glommio::channels::channel_mesh::Senders; -use glommio::channels::local_channel::LocalReceiver; use glommio::channels::shared_channel::{self, SharedReceiver}; use glommio::net::TcpStream; use once_cell::sync::Lazy; @@ -76,7 +74,6 @@ pub(super) async fn run_connection( server_start_instant: ServerStartInstant, opt_tls_config: Option>>, valid_until: Rc>, - close_conn_receiver: LocalReceiver<()>, stream: TcpStream, ) -> Result<(), ConnectionError> { let access_list_cache = create_access_list_cache(&access_list); @@ -119,7 +116,7 @@ pub(super) async fn run_connection( stream, }; - conn.run(close_conn_receiver).await?; + conn.run().await } else { let mut conn = Connection { config, @@ -135,10 +132,8 @@ pub(super) async fn run_connection( stream, }; - conn.run(close_conn_receiver).await?; + conn.run().await } - - Ok(()) } struct Connection { @@ -159,18 +154,7 @@ impl Connection where S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static, { - async fn run(&mut self, close_conn_receiver: LocalReceiver<()>) -> Result<(), ConnectionError> { - let f1 = async { self.run_request_response_loop().await }; - let f2 = async { - close_conn_receiver.recv().await; - - Err(ConnectionError::Inactive) - }; - - race(f1, f2).await - } - - async fn run_request_response_loop(&mut self) -> Result<(), ConnectionError> { + async fn run(&mut self) -> Result<(), ConnectionError> { loop { let response = match self.read_request().await? { Either::Left(response) => Response::Failure(response), diff --git a/crates/http/src/workers/socket/mod.rs b/crates/http/src/workers/socket/mod.rs index 0baa68b..9fab395 100644 --- a/crates/http/src/workers/socket/mod.rs +++ b/crates/http/src/workers/socket/mod.rs @@ -12,6 +12,7 @@ use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; use aquatic_common::{CanonicalSocketAddr, PanicSentinel, ServerStartInstant}; use arc_swap::ArcSwap; +use futures_lite::future::race; use futures_lite::StreamExt; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role}; use glommio::channels::local_channel::{new_bounded, LocalSender}; @@ -97,16 +98,23 @@ pub async fn run_socket_worker( "worker_index" => worker_index.to_string(), ); - let result = run_connection( - config, - access_list, - request_senders, - server_start_instant, - opt_tls_config, - valid_until.clone(), - close_conn_receiver, - stream, - ).await; + let f1 = async { run_connection( + config, + access_list, + request_senders, + server_start_instant, + opt_tls_config, + valid_until.clone(), + stream, + ).await + }; + let f2 = async { + close_conn_receiver.recv().await; + + Err(ConnectionError::Inactive) + }; + + let result = race(f1, f2).await; #[cfg(feature = "metrics")] ::metrics::decrement_gauge!(