From 76bd8951592ec83e281438c0dbc46428e3d264b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 1 Nov 2021 09:52:08 +0100 Subject: [PATCH] http glommio: wait_for_response: panic if response sender is closed --- aquatic_http/src/lib/network.rs | 71 +++++++++++++++++---------------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/aquatic_http/src/lib/network.rs b/aquatic_http/src/lib/network.rs index 2fecd6a..cebdeb9 100644 --- a/aquatic_http/src/lib/network.rs +++ b/aquatic_http/src/lib/network.rs @@ -281,7 +281,9 @@ impl Connection { Err(RequestParseError::NeedMoreData) => { ::log::debug!( "need more request data. current data: {:?}", - std::str::from_utf8(&self.request_buffer[..self.request_buffer_position]) + std::str::from_utf8( + &self.request_buffer[..self.request_buffer_position] + ) ); } } @@ -374,42 +376,41 @@ impl Connection { mut opt_pending_scrape_response: Option, ) -> anyhow::Result { loop { - if let Some(channel_response) = self.response_receiver.recv().await { - if channel_response.get_peer_addr() != self.peer_addr { - return Err(anyhow::anyhow!("peer addresses didn't match")); - } + let channel_response = self + .response_receiver + .recv() + .await + .expect("wait_for_response: can't receive response, sender is closed"); - match channel_response { - ChannelResponse::Announce { response, .. } => { - break Ok(Response::Announce(response)); - } - ChannelResponse::Scrape { response, .. } => { - if let Some(mut pending) = opt_pending_scrape_response.take() { - pending.stats.extend(response.files); - pending.pending_worker_responses -= 1; - - if pending.pending_worker_responses == 0 { - let response = Response::Scrape(ScrapeResponse { - files: pending.stats, - }); - - break Ok(response); - } else { - opt_pending_scrape_response = Some(pending); - } - } else { - return Err(anyhow::anyhow!( - "received channel scrape response without pending scrape response" - )); - } - } - }; - } else { - // TODO: this is a serious error condition and should maybe be handled differently - return Err(anyhow::anyhow!( - "response receiver can't receive - sender is closed" - )); + if channel_response.get_peer_addr() != self.peer_addr { + return Err(anyhow::anyhow!("peer addresses didn't match")); } + + match channel_response { + ChannelResponse::Announce { response, .. } => { + break Ok(Response::Announce(response)); + } + ChannelResponse::Scrape { response, .. } => { + if let Some(mut pending) = opt_pending_scrape_response.take() { + pending.stats.extend(response.files); + pending.pending_worker_responses -= 1; + + if pending.pending_worker_responses == 0 { + let response = Response::Scrape(ScrapeResponse { + files: pending.stats, + }); + + break Ok(response); + } else { + opt_pending_scrape_response = Some(pending); + } + } else { + return Err(anyhow::anyhow!( + "received channel scrape response without pending scrape response" + )); + } + } + }; } }