ws: socket worker: wait for interal close message in priority queue

This commit is contained in:
Joakim Frostegård 2024-01-06 00:01:41 +01:00
parent 579fcb2140
commit 071f088d8b
2 changed files with 27 additions and 18 deletions

View file

@ -42,7 +42,6 @@ pub struct ConnectionRunner {
pub connection_valid_until: Rc<RefCell<ValidUntil>>, pub connection_valid_until: Rc<RefCell<ValidUntil>>,
pub out_message_sender: Rc<LocalSender<(OutMessageMeta, OutMessage)>>, pub out_message_sender: Rc<LocalSender<(OutMessageMeta, OutMessage)>>,
pub out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>, pub out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>,
pub close_conn_receiver: LocalReceiver<()>,
pub server_start_instant: ServerStartInstant, pub server_start_instant: ServerStartInstant,
pub out_message_consumer_id: ConsumerId, pub out_message_consumer_id: ConsumerId,
pub connection_id: ConnectionId, pub connection_id: ConnectionId,
@ -54,6 +53,7 @@ impl ConnectionRunner {
pub async fn run( pub async fn run(
self, self,
control_message_senders: Rc<Senders<SwarmControlMessage>>, control_message_senders: Rc<Senders<SwarmControlMessage>>,
close_conn_receiver: LocalReceiver<()>,
stream: TcpStream, stream: TcpStream,
) { ) {
let clean_up_data = ConnectionCleanupData { let clean_up_data = ConnectionCleanupData {
@ -65,14 +65,34 @@ impl ConnectionRunner {
clean_up_data.before_open(); clean_up_data.before_open();
let config = self.config.clone(); let config = self.config.clone();
let connection_id = self.connection_id.clone();
if let Err(err) = self.run_inner(clean_up_data.clone(), stream).await { let tq_regular = self.tq_regular;
::log::debug!("connection error: {:#}", err);
} let connection_future = spawn_local_into(
enclose!((
clean_up_data
) async move {
if let Err(err) = self.run_inner(clean_up_data, stream).await {
::log::debug!("connection {:?} error: {:#}", connection_id, err);
}
}),
tq_regular,
)
.unwrap();
race(connection_future, async {
close_conn_receiver.recv().await;
})
.await;
::log::debug!("connection {:?} starting clean up", connection_id);
clean_up_data clean_up_data
.after_close(&config, control_message_senders) .after_close(&config, control_message_senders)
.await; .await;
::log::debug!("connection {:?} finished clean up", connection_id);
} }
async fn run_inner( async fn run_inner(
@ -184,17 +204,7 @@ impl ConnectionRunner {
) )
.unwrap(); .unwrap();
let close_conn_future = spawn_local_into( race(reader_handle, writer_handle).await
async move {
self.close_conn_receiver.recv().await;
Ok(())
},
self.tq_prioritized,
)
.unwrap();
race(close_conn_future, race(reader_handle, writer_handle)).await
} }
} }

View file

@ -178,7 +178,6 @@ pub async fn run_socket_worker(
connection_valid_until, connection_valid_until,
out_message_sender, out_message_sender,
out_message_receiver, out_message_receiver,
close_conn_receiver,
server_start_instant, server_start_instant,
out_message_consumer_id, out_message_consumer_id,
connection_id, connection_id,
@ -186,11 +185,11 @@ pub async fn run_socket_worker(
ip_version ip_version
}; };
runner.run(control_message_senders, stream).await; runner.run(control_message_senders, close_conn_receiver, stream).await;
connection_handles.borrow_mut().remove(connection_id); connection_handles.borrow_mut().remove(connection_id);
}), }),
tq_regular, tq_prioritized,
) )
.unwrap() .unwrap()
.detach(); .detach();