http glommio: wait_for_response: panic if response sender is closed

This commit is contained in:
Joakim Frostegård 2021-11-01 09:52:08 +01:00
parent 90d560c307
commit 76bd895159

View file

@ -281,7 +281,9 @@ impl Connection {
Err(RequestParseError::NeedMoreData) => { Err(RequestParseError::NeedMoreData) => {
::log::debug!( ::log::debug!(
"need more request data. current data: {:?}", "need more request data. current data: {:?}",
std::str::from_utf8(&self.request_buffer[..self.request_buffer_position]) std::str::from_utf8(
&self.request_buffer[..self.request_buffer_position]
)
); );
} }
} }
@ -374,42 +376,41 @@ impl Connection {
mut opt_pending_scrape_response: Option<PendingScrapeResponse>, mut opt_pending_scrape_response: Option<PendingScrapeResponse>,
) -> anyhow::Result<Response> { ) -> anyhow::Result<Response> {
loop { loop {
if let Some(channel_response) = self.response_receiver.recv().await { let channel_response = self
if channel_response.get_peer_addr() != self.peer_addr { .response_receiver
return Err(anyhow::anyhow!("peer addresses didn't match")); .recv()
} .await
.expect("wait_for_response: can't receive response, sender is closed");
match channel_response { if channel_response.get_peer_addr() != self.peer_addr {
ChannelResponse::Announce { response, .. } => { return Err(anyhow::anyhow!("peer addresses didn't match"));
break Ok(Response::Announce(response));
}
ChannelResponse::Scrape { response, .. } => {
if let Some(mut pending) = opt_pending_scrape_response.take() {
pending.stats.extend(response.files);
pending.pending_worker_responses -= 1;
if pending.pending_worker_responses == 0 {
let response = Response::Scrape(ScrapeResponse {
files: pending.stats,
});
break Ok(response);
} else {
opt_pending_scrape_response = Some(pending);
}
} else {
return Err(anyhow::anyhow!(
"received channel scrape response without pending scrape response"
));
}
}
};
} else {
// TODO: this is a serious error condition and should maybe be handled differently
return Err(anyhow::anyhow!(
"response receiver can't receive - sender is closed"
));
} }
match channel_response {
ChannelResponse::Announce { response, .. } => {
break Ok(Response::Announce(response));
}
ChannelResponse::Scrape { response, .. } => {
if let Some(mut pending) = opt_pending_scrape_response.take() {
pending.stats.extend(response.files);
pending.pending_worker_responses -= 1;
if pending.pending_worker_responses == 0 {
let response = Response::Scrape(ScrapeResponse {
files: pending.stats,
});
break Ok(response);
} else {
opt_pending_scrape_response = Some(pending);
}
} else {
return Err(anyhow::anyhow!(
"received channel scrape response without pending scrape response"
));
}
}
};
} }
} }