aquatic_http: glommio: panic if request receiver channel is closed

This commit is contained in:
Joakim Frostegård 2021-10-27 12:22:01 +02:00
parent c02f8f228e
commit e7305114ad

View file

@ -144,7 +144,7 @@ impl Connection {
let opt_request = self.read_tls().await?;
if let Some(request) = opt_request {
self.handle_request(request)?;
self.handle_request(request).await?;
self.wait_for_and_send_response().await?;
}
@ -263,7 +263,7 @@ impl Connection {
}
/// Send on request to proper request worker/workers
fn handle_request(&mut self, request: Request) -> anyhow::Result<()> {
async fn handle_request(&mut self, request: Request) -> anyhow::Result<()> {
let peer_addr = self.get_peer_addr()?;
match request {
@ -278,12 +278,8 @@ impl Connection {
let consumer_index =
calculate_request_consumer_index(&self.config, info_hash);
if let Err(err) = self.request_senders.try_send_to(
consumer_index,
request,
) {
::log::warn!("request_sender.try_send failed: {:?}", err);
}
// Only fails when receiver is closed
self.request_senders.send_to(consumer_index, request).await.unwrap();
}
Request::Scrape(ScrapeRequest { info_hashes }) => {
let mut info_hashes_by_worker: BTreeMap<usize, Vec<InfoHash>> = BTreeMap::new();
@ -309,12 +305,8 @@ impl Connection {
connection_id: self.connection_id,
};
if let Err(err) = self.request_senders.try_send_to(
consumer_index,
request,
) {
::log::warn!("request_sender.try_send failed: {:?}", err);
}
// Only fails when receiver is closed
self.request_senders.send_to(consumer_index, request).await.unwrap();
}
}
}