Run rustfmt

This commit is contained in:
Joakim Frostegård 2021-10-23 14:56:00 +02:00
parent 3355822422
commit 1564291471
8 changed files with 66 additions and 53 deletions

View file

@ -2,8 +2,8 @@ use std::net::SocketAddr;
use rand::rngs::SmallRng; use rand::rngs::SmallRng;
use aquatic_common::extract_response_peers;
use aquatic_common::convert_ipv4_mapped_ipv6; use aquatic_common::convert_ipv4_mapped_ipv6;
use aquatic_common::extract_response_peers;
use crate::common::*; use crate::common::*;

View file

@ -11,8 +11,8 @@ use rand::prelude::SmallRng;
use rand::SeedableRng; use rand::SeedableRng;
use crate::common::handlers::handle_announce_request; use crate::common::handlers::handle_announce_request;
use crate::common::*;
use crate::common::handlers::*; use crate::common::handlers::*;
use crate::common::*;
use crate::config::Config; use crate::config::Config;
use crate::glommio::common::update_access_list; use crate::glommio::common::update_access_list;
@ -83,29 +83,29 @@ async fn handle_request_stream<S>(
while let Some((producer_index, request, addr)) = stream.next().await { while let Some((producer_index, request, addr)) = stream.next().await {
let response = match request { let response = match request {
ConnectedRequest::Announce(request) => { ConnectedRequest::Announce(request) => ConnectedResponse::Announce(match addr.ip() {
ConnectedResponse::Announce(match addr.ip() { IpAddr::V4(ip) => handle_announce_request(
IpAddr::V4(ip) => handle_announce_request( &config,
&config, &mut rng,
&mut rng, &mut torrents.borrow_mut().ipv4,
&mut torrents.borrow_mut().ipv4, request,
request, ip,
ip, peer_valid_until.borrow().to_owned(),
peer_valid_until.borrow().to_owned(), ),
), IpAddr::V6(ip) => handle_announce_request(
IpAddr::V6(ip) => handle_announce_request( &config,
&config, &mut rng,
&mut rng, &mut torrents.borrow_mut().ipv6,
&mut torrents.borrow_mut().ipv6, request,
request, ip,
ip, peer_valid_until.borrow().to_owned(),
peer_valid_until.borrow().to_owned(), ),
), }),
}) ConnectedRequest::Scrape(request) => ConnectedResponse::Scrape(handle_scrape_request(
} &mut torrents.borrow_mut(),
ConnectedRequest::Scrape(request) => { addr,
ConnectedResponse::Scrape(handle_scrape_request(&mut torrents.borrow_mut(), addr, request)) request,
} )),
}; };
::log::debug!("preparing to send response to channel: {:?}", response); ::log::debug!("preparing to send response to channel: {:?}", response);

View file

@ -39,7 +39,12 @@ struct PendingScrapeResponse {
struct PendingScrapeResponses(HashMap<TransactionId, PendingScrapeResponse>); struct PendingScrapeResponses(HashMap<TransactionId, PendingScrapeResponse>);
impl PendingScrapeResponses { impl PendingScrapeResponses {
fn prepare(&mut self, transaction_id: TransactionId, pending_worker_responses: usize, valid_until: ValidUntil) { fn prepare(
&mut self,
transaction_id: TransactionId,
pending_worker_responses: usize,
valid_until: ValidUntil,
) {
let pending = PendingScrapeResponse { let pending = PendingScrapeResponse {
pending_worker_responses, pending_worker_responses,
valid_until, valid_until,
@ -76,9 +81,7 @@ impl PendingScrapeResponses {
fn clean(&mut self) { fn clean(&mut self) {
let now = Instant::now(); let now = Instant::now();
self.0.retain(|_, v| { self.0.retain(|_, v| v.valid_until.0 > now);
v.valid_until.0 > now
});
self.0.shrink_to_fit(); self.0.shrink_to_fit();
} }
} }
@ -159,7 +162,8 @@ async fn read_requests(
let max_connection_age = config.cleaning.max_connection_age; let max_connection_age = config.cleaning.max_connection_age;
let connection_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_connection_age))); let connection_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_connection_age)));
let pending_scrape_valid_until = Rc::new(RefCell::new(ValidUntil::new(PENDING_SCRAPE_MAX_WAIT))); let pending_scrape_valid_until =
Rc::new(RefCell::new(ValidUntil::new(PENDING_SCRAPE_MAX_WAIT)));
let access_list = Rc::new(RefCell::new(access_list)); let access_list = Rc::new(RefCell::new(access_list));
let connections = Rc::new(RefCell::new(ConnectionMap::default())); let connections = Rc::new(RefCell::new(ConnectionMap::default()));
@ -236,7 +240,11 @@ async fn read_requests(
if let Err(err) = request_senders.try_send_to( if let Err(err) = request_senders.try_send_to(
request_consumer_index, request_consumer_index,
(response_consumer_index, ConnectedRequest::Announce(request), src), (
response_consumer_index,
ConnectedRequest::Announce(request),
src,
),
) { ) {
::log::warn!("request_sender.try_send failed: {:?}", err) ::log::warn!("request_sender.try_send failed: {:?}", err)
} }
@ -252,19 +260,19 @@ async fn read_requests(
} }
Ok(Request::Scrape(request)) => { Ok(Request::Scrape(request)) => {
if connections.borrow().contains(request.connection_id, src) { if connections.borrow().contains(request.connection_id, src) {
let mut consumer_requests: HashMap<usize, ScrapeRequest> = HashMap::new(); let mut consumer_requests: HashMap<usize, ScrapeRequest> =
HashMap::new();
for info_hash in request.info_hashes { for info_hash in request.info_hashes {
consumer_requests consumer_requests
.entry(calculate_request_consumer_index(&config, info_hash)) .entry(calculate_request_consumer_index(&config, info_hash))
.or_insert( .or_insert(ScrapeRequest {
ScrapeRequest { transaction_id: request.transaction_id,
transaction_id: request.transaction_id, connection_id: request.connection_id,
connection_id: request.connection_id, info_hashes: Vec::new(),
info_hashes: Vec::new(), })
} .info_hashes
) .push(info_hash);
.info_hashes.push(info_hash);
} }
pending_scrape_responses.borrow_mut().prepare( pending_scrape_responses.borrow_mut().prepare(
@ -276,7 +284,11 @@ async fn read_requests(
for (consumer_index, request) in consumer_requests { for (consumer_index, request) in consumer_requests {
if let Err(err) = request_senders.try_send_to( if let Err(err) = request_senders.try_send_to(
consumer_index, consumer_index,
(response_consumer_index, ConnectedRequest::Scrape(request), src), (
response_consumer_index,
ConnectedRequest::Scrape(request),
src,
),
) { ) {
::log::warn!("request_sender.try_send failed: {:?}", err) ::log::warn!("request_sender.try_send failed: {:?}", err)
} }
@ -326,12 +338,10 @@ async fn handle_shared_responses<S>(
while let Some((response, addr)) = stream.next().await { while let Some((response, addr)) = stream.next().await {
let opt_response = match response { let opt_response = match response {
ConnectedResponse::Announce(response) => Some((Response::Announce(response), addr)), ConnectedResponse::Announce(response) => Some((Response::Announce(response), addr)),
ConnectedResponse::Scrape(response) => { ConnectedResponse::Scrape(response) => pending_scrape_responses
pending_scrape_responses .borrow_mut()
.borrow_mut() .add_and_get_finished(response)
.add_and_get_finished(response) .map(|response| (Response::Scrape(response), addr)),
.map(|response| (Response::Scrape(response), addr))
},
}; };
if let Some((response, addr)) = opt_response { if let Some((response, addr)) = opt_response {

View file

@ -8,9 +8,9 @@ use aquatic_common::convert_ipv4_mapped_ipv6;
use aquatic_udp_protocol::*; use aquatic_udp_protocol::*;
use crate::common::handlers::handle_announce_request; use crate::common::handlers::handle_announce_request;
use crate::common::handlers::*;
use crate::common::*; use crate::common::*;
use crate::config::Config; use crate::config::Config;
use crate::common::handlers::*;
#[inline] #[inline]
pub fn handle_announce_requests( pub fn handle_announce_requests(

View file

@ -6,8 +6,8 @@ use rand::{rngs::SmallRng, SeedableRng};
use aquatic_udp_protocol::*; use aquatic_udp_protocol::*;
use crate::config::Config;
use crate::common::handlers::*; use crate::common::handlers::*;
use crate::config::Config;
use crate::mio::common::*; use crate::mio::common::*;
mod announce; mod announce;
@ -75,7 +75,10 @@ pub fn run_request_worker(
); );
responses.extend(scrape_requests.drain(..).map(|(request, src)| { responses.extend(scrape_requests.drain(..).map(|(request, src)| {
(ConnectedResponse::Scrape(handle_scrape_request(&mut torrents, src, request)), src) (
ConnectedResponse::Scrape(handle_scrape_request(&mut torrents, src, request)),
src,
)
})); }));
} }

View file

@ -16,9 +16,9 @@ use socket2::{Domain, Protocol, Socket, Type};
use aquatic_udp_protocol::{IpVersion, Request, Response}; use aquatic_udp_protocol::{IpVersion, Request, Response};
use crate::common::handlers::*;
use crate::common::network::ConnectionMap; use crate::common::network::ConnectionMap;
use crate::common::*; use crate::common::*;
use crate::common::handlers::*;
use crate::config::Config; use crate::config::Config;
use super::common::*; use super::common::*;

View file

@ -6,9 +6,9 @@ use indicatif::ProgressIterator;
use rand::Rng; use rand::Rng;
use rand_distr::Pareto; use rand_distr::Pareto;
use aquatic_udp::common::handlers::*;
use aquatic_udp::common::*; use aquatic_udp::common::*;
use aquatic_udp::config::Config; use aquatic_udp::config::Config;
use aquatic_udp::common::handlers::*;
use crate::common::*; use crate::common::*;
use crate::config::BenchConfig; use crate::config::BenchConfig;

View file

@ -6,9 +6,9 @@ use indicatif::ProgressIterator;
use rand::Rng; use rand::Rng;
use rand_distr::Pareto; use rand_distr::Pareto;
use aquatic_udp::common::handlers::*;
use aquatic_udp::common::*; use aquatic_udp::common::*;
use aquatic_udp::config::Config; use aquatic_udp::config::Config;
use aquatic_udp::common::handlers::*;
use crate::common::*; use crate::common::*;
use crate::config::BenchConfig; use crate::config::BenchConfig;