aquatic_ws: in handlers, collect out messages in Vec for sending

This commit is contained in:
Joakim Frostegård 2021-11-01 19:20:49 +01:00
parent 24bfaf67c0
commit 786642f486

View file

@ -82,41 +82,45 @@ async fn handle_request_stream<S>(
})() })()
})); }));
let mut out_messages = Vec::new();
while let Some((meta, in_message)) = stream.next().await { while let Some((meta, in_message)) = stream.next().await {
match in_message { match in_message {
InMessage::AnnounceRequest(request) => { InMessage::AnnounceRequest(request) => handle_announce_request(
handle_announce_request( &config,
&config, &mut rng,
&mut rng, &mut torrents.borrow_mut(),
&mut torrents.borrow_mut(), &mut out_messages,
&out_message_senders, peer_valid_until.borrow().to_owned(),
peer_valid_until.borrow().to_owned(), meta,
meta, request,
request, ),
) InMessage::ScrapeRequest(request) => handle_scrape_request(
.await; &config,
} &mut torrents.borrow_mut(),
InMessage::ScrapeRequest(request) => { &mut out_messages,
handle_scrape_request( meta,
&config, request,
&mut torrents.borrow_mut(), ),
&out_message_senders,
meta,
request,
)
.await;
}
}; };
for (meta, out_message) in out_messages.drain(..) {
if let Err(err) =
out_message_senders.try_send_to(meta.out_message_consumer_id.0, (meta, out_message))
{
::log::error!("failed sending out_message to socket worker: {:?}", err)
}
}
yield_if_needed().await; yield_if_needed().await;
} }
} }
pub async fn handle_announce_request( pub fn handle_announce_request(
config: &Config, config: &Config,
rng: &mut impl Rng, rng: &mut impl Rng,
torrent_maps: &mut TorrentMaps, torrent_maps: &mut TorrentMaps,
out_message_senders: &Rc<Senders<(ConnectionMeta, OutMessage)>>, out_messages: &mut Vec<(ConnectionMeta, OutMessage)>,
valid_until: ValidUntil, valid_until: ValidUntil,
request_sender_meta: ConnectionMeta, request_sender_meta: ConnectionMeta,
request: AnnounceRequest, request: AnnounceRequest,
@ -206,15 +210,12 @@ pub async fn handle_announce_request(
offer_id: offer.offer_id, offer_id: offer.offer_id,
}; };
out_message_senders.try_send_to( out_messages.push((
offer_receiver.connection_meta.out_message_consumer_id.0, offer_receiver.connection_meta,
( OutMessage::Offer(middleman_offer),
offer_receiver.connection_meta, ));
OutMessage::Offer(middleman_offer),
),
);
::log::trace!( ::log::trace!(
"sent middleman offer to {:?}", "sending middleman offer to {:?}",
offer_receiver.connection_meta offer_receiver.connection_meta
); );
} }
@ -233,15 +234,12 @@ pub async fn handle_announce_request(
offer_id, offer_id,
}; };
out_message_senders.try_send_to( out_messages.push((
answer_receiver.connection_meta.out_message_consumer_id.0, answer_receiver.connection_meta,
( OutMessage::Answer(middleman_answer),
answer_receiver.connection_meta, ));
OutMessage::Answer(middleman_answer),
),
);
::log::trace!( ::log::trace!(
"sent middleman answer to {:?}", "sending middleman answer to {:?}",
answer_receiver.connection_meta answer_receiver.connection_meta
); );
} }
@ -255,16 +253,13 @@ pub async fn handle_announce_request(
announce_interval: config.protocol.peer_announce_interval, announce_interval: config.protocol.peer_announce_interval,
}); });
out_message_senders.send_to( out_messages.push((request_sender_meta, out_message));
request_sender_meta.out_message_consumer_id.0,
(request_sender_meta, out_message),
);
} }
pub async fn handle_scrape_request( pub fn handle_scrape_request(
config: &Config, config: &Config,
torrent_maps: &mut TorrentMaps, torrent_maps: &mut TorrentMaps,
out_message_senders: &Rc<Senders<(ConnectionMeta, OutMessage)>>, out_messages: &mut Vec<(ConnectionMeta, OutMessage)>,
meta: ConnectionMeta, meta: ConnectionMeta,
request: ScrapeRequest, request: ScrapeRequest,
) { ) {
@ -301,8 +296,5 @@ pub async fn handle_scrape_request(
} }
} }
out_message_senders.try_send_to( out_messages.push((meta, OutMessage::ScrapeResponse(out_message)));
meta.out_message_consumer_id.0,
(meta, OutMessage::ScrapeResponse(out_message)),
);
} }