From 786642f486a3eee2d8159bb332ce0d34e563df82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 1 Nov 2021 19:20:49 +0100 Subject: [PATCH] aquatic_ws: in handlers, collect out messages in Vec for sending --- aquatic_ws/src/lib/handlers.rs | 92 ++++++++++++++++------------------ 1 file changed, 42 insertions(+), 50 deletions(-) diff --git a/aquatic_ws/src/lib/handlers.rs b/aquatic_ws/src/lib/handlers.rs index fccd51b..b7bd88f 100644 --- a/aquatic_ws/src/lib/handlers.rs +++ b/aquatic_ws/src/lib/handlers.rs @@ -82,41 +82,45 @@ async fn handle_request_stream( })() })); + let mut out_messages = Vec::new(); + while let Some((meta, in_message)) = stream.next().await { match in_message { - InMessage::AnnounceRequest(request) => { - handle_announce_request( - &config, - &mut rng, - &mut torrents.borrow_mut(), - &out_message_senders, - peer_valid_until.borrow().to_owned(), - meta, - request, - ) - .await; - } - InMessage::ScrapeRequest(request) => { - handle_scrape_request( - &config, - &mut torrents.borrow_mut(), - &out_message_senders, - meta, - request, - ) - .await; - } + InMessage::AnnounceRequest(request) => handle_announce_request( + &config, + &mut rng, + &mut torrents.borrow_mut(), + &mut out_messages, + peer_valid_until.borrow().to_owned(), + meta, + request, + ), + InMessage::ScrapeRequest(request) => handle_scrape_request( + &config, + &mut torrents.borrow_mut(), + &mut out_messages, + meta, + request, + ), }; + for (meta, out_message) in out_messages.drain(..) { + if let Err(err) = + out_message_senders.try_send_to(meta.out_message_consumer_id.0, (meta, out_message)) + { + ::log::error!("failed sending out_message to socket worker: {:?}", err) + } + } + yield_if_needed().await; } } -pub async fn handle_announce_request( +pub fn handle_announce_request( config: &Config, rng: &mut impl Rng, torrent_maps: &mut TorrentMaps, - out_message_senders: &Rc>, + out_messages: &mut Vec<(ConnectionMeta, OutMessage)>, valid_until: ValidUntil, request_sender_meta: ConnectionMeta, request: AnnounceRequest, @@ -206,15 +210,12 @@ pub async fn handle_announce_request( offer_id: offer.offer_id, }; - out_message_senders.try_send_to( - offer_receiver.connection_meta.out_message_consumer_id.0, - ( - offer_receiver.connection_meta, - OutMessage::Offer(middleman_offer), - ), - ); + out_messages.push(( + offer_receiver.connection_meta, + OutMessage::Offer(middleman_offer), + )); ::log::trace!( - "sent middleman offer to {:?}", + "sending middleman offer to {:?}", offer_receiver.connection_meta ); } @@ -233,15 +234,12 @@ pub async fn handle_announce_request( offer_id, }; - out_message_senders.try_send_to( - answer_receiver.connection_meta.out_message_consumer_id.0, - ( - answer_receiver.connection_meta, - OutMessage::Answer(middleman_answer), - ), - ); + out_messages.push(( + answer_receiver.connection_meta, + OutMessage::Answer(middleman_answer), + )); ::log::trace!( - "sent middleman answer to {:?}", + "sending middleman answer to {:?}", answer_receiver.connection_meta ); } @@ -255,16 +253,13 @@ pub async fn handle_announce_request( announce_interval: config.protocol.peer_announce_interval, }); - out_message_senders.send_to( - request_sender_meta.out_message_consumer_id.0, - (request_sender_meta, out_message), - ); + out_messages.push((request_sender_meta, out_message)); } -pub async fn handle_scrape_request( +pub fn handle_scrape_request( config: &Config, torrent_maps: &mut TorrentMaps, - out_message_senders: &Rc>, + out_messages: &mut Vec<(ConnectionMeta, OutMessage)>, meta: ConnectionMeta, request: ScrapeRequest, ) { @@ -301,8 +296,5 @@ pub async fn handle_scrape_request( } } - out_message_senders.try_send_to( - meta.out_message_consumer_id.0, - (meta, OutMessage::ScrapeResponse(out_message)), - ); + out_messages.push((meta, OutMessage::ScrapeResponse(out_message))); }