From 071f088d8bdbbf8f69d8456c35fdf4fc17ba2fa0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 6 Jan 2024 00:01:41 +0100 Subject: [PATCH] ws: socket worker: wait for interal close message in priority queue --- crates/ws/src/workers/socket/connection.rs | 40 ++++++++++++++-------- crates/ws/src/workers/socket/mod.rs | 5 ++- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/crates/ws/src/workers/socket/connection.rs b/crates/ws/src/workers/socket/connection.rs index 67f569c..e620ba4 100644 --- a/crates/ws/src/workers/socket/connection.rs +++ b/crates/ws/src/workers/socket/connection.rs @@ -42,7 +42,6 @@ pub struct ConnectionRunner { pub connection_valid_until: Rc>, pub out_message_sender: Rc>, pub out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>, - pub close_conn_receiver: LocalReceiver<()>, pub server_start_instant: ServerStartInstant, pub out_message_consumer_id: ConsumerId, pub connection_id: ConnectionId, @@ -54,6 +53,7 @@ impl ConnectionRunner { pub async fn run( self, control_message_senders: Rc>, + close_conn_receiver: LocalReceiver<()>, stream: TcpStream, ) { let clean_up_data = ConnectionCleanupData { @@ -65,14 +65,34 @@ impl ConnectionRunner { clean_up_data.before_open(); let config = self.config.clone(); + let connection_id = self.connection_id.clone(); - if let Err(err) = self.run_inner(clean_up_data.clone(), stream).await { - ::log::debug!("connection error: {:#}", err); - } + let tq_regular = self.tq_regular; + + let connection_future = spawn_local_into( + enclose!(( + clean_up_data + ) async move { + if let Err(err) = self.run_inner(clean_up_data, stream).await { + ::log::debug!("connection {:?} error: {:#}", connection_id, err); + } + }), + tq_regular, + ) + .unwrap(); + + race(connection_future, async { + close_conn_receiver.recv().await; + }) + .await; + + ::log::debug!("connection {:?} starting clean up", connection_id); clean_up_data .after_close(&config, control_message_senders) .await; + + ::log::debug!("connection {:?} finished clean up", connection_id); } async fn run_inner( @@ -184,17 +204,7 @@ impl ConnectionRunner { ) .unwrap(); - let close_conn_future = spawn_local_into( - async move { - self.close_conn_receiver.recv().await; - - Ok(()) - }, - self.tq_prioritized, - ) - .unwrap(); - - race(close_conn_future, race(reader_handle, writer_handle)).await + race(reader_handle, writer_handle).await } } diff --git a/crates/ws/src/workers/socket/mod.rs b/crates/ws/src/workers/socket/mod.rs index 7056747..cedb7d2 100644 --- a/crates/ws/src/workers/socket/mod.rs +++ b/crates/ws/src/workers/socket/mod.rs @@ -178,7 +178,6 @@ pub async fn run_socket_worker( connection_valid_until, out_message_sender, out_message_receiver, - close_conn_receiver, server_start_instant, out_message_consumer_id, connection_id, @@ -186,11 +185,11 @@ pub async fn run_socket_worker( ip_version }; - runner.run(control_message_senders, stream).await; + runner.run(control_message_senders, close_conn_receiver, stream).await; connection_handles.borrow_mut().remove(connection_id); }), - tq_regular, + tq_prioritized, ) .unwrap() .detach();