Run cargo fmt

This commit is contained in:
Joakim Frostegård 2021-10-27 12:24:27 +02:00
parent 92fc427665
commit c9233726ab
2 changed files with 24 additions and 11 deletions

View file

@ -1,6 +1,9 @@
use std::net::SocketAddr;
use aquatic_http_protocol::{common::InfoHash, request::{AnnounceRequest, ScrapeRequest}, response::{AnnounceResponse, ScrapeResponse, ScrapeStatistics}};
use aquatic_http_protocol::{
request::{AnnounceRequest, ScrapeRequest},
response::{AnnounceResponse, ScrapeResponse},
};
#[derive(Copy, Clone, Debug)]
pub struct ConsumerId(pub usize);

View file

@ -8,7 +8,9 @@ use std::sync::Arc;
use aquatic_http_protocol::common::InfoHash;
use aquatic_http_protocol::request::{AnnounceRequest, Request, RequestParseError, ScrapeRequest};
use aquatic_http_protocol::response::{FailureResponse, Response, ScrapeResponse, ScrapeStatistics};
use aquatic_http_protocol::response::{
FailureResponse, Response, ScrapeResponse, ScrapeStatistics,
};
use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender};
@ -275,11 +277,13 @@ impl Connection {
peer_addr,
};
let consumer_index =
calculate_request_consumer_index(&self.config, info_hash);
let consumer_index = calculate_request_consumer_index(&self.config, info_hash);
// Only fails when receiver is closed
self.request_senders.send_to(consumer_index, request).await.unwrap();
self.request_senders
.send_to(consumer_index, request)
.await
.unwrap();
}
Request::Scrape(ScrapeRequest { info_hashes }) => {
let mut info_hashes_by_worker: BTreeMap<usize, Vec<InfoHash>> = BTreeMap::new();
@ -306,7 +310,10 @@ impl Connection {
};
// Only fails when receiver is closed
self.request_senders.send_to(consumer_index, request).await.unwrap();
self.request_senders
.send_to(consumer_index, request)
.await
.unwrap();
}
}
}
@ -326,7 +333,7 @@ impl Connection {
ChannelResponse::Announce { response, .. } => {
break Response::Announce(response);
}
ChannelResponse::Scrape { response, .. } => {
ChannelResponse::Scrape { response, .. } => {
if let Some(mut pending) = self.pending_scrape_response.take() {
pending.stats.extend(response.files);
pending.pending_worker_responses -= 1;
@ -341,13 +348,17 @@ impl Connection {
self.pending_scrape_response = Some(pending);
}
} else {
return Err(anyhow::anyhow!("received channel scrape response without pending scrape response"));
return Err(anyhow::anyhow!(
"received channel scrape response without pending scrape response"
));
}
}
};
} else {
// TODO: this is a serious error condition and should maybe be handled differently
return Err(anyhow::anyhow!("response receiver can't receive - sender is closed"));
return Err(anyhow::anyhow!(
"response receiver can't receive - sender is closed"
));
}
};
@ -382,8 +393,7 @@ impl Connection {
}
fn get_peer_addr(&self) -> anyhow::Result<SocketAddr> {
self
.stream
self.stream
.peer_addr()
.map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))
}