From 15642914716246cb1ac7603740385534ed298934 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 14:56:00 +0200 Subject: [PATCH] Run rustfmt --- aquatic_udp/src/lib/common/handlers.rs | 2 +- aquatic_udp/src/lib/glommio/handlers.rs | 48 ++++++++--------- aquatic_udp/src/lib/glommio/network.rs | 54 ++++++++++++-------- aquatic_udp/src/lib/mio/handlers/announce.rs | 2 +- aquatic_udp/src/lib/mio/handlers/mod.rs | 7 ++- aquatic_udp/src/lib/mio/network.rs | 2 +- aquatic_udp_bench/src/announce.rs | 2 +- aquatic_udp_bench/src/scrape.rs | 2 +- 8 files changed, 66 insertions(+), 53 deletions(-) diff --git a/aquatic_udp/src/lib/common/handlers.rs b/aquatic_udp/src/lib/common/handlers.rs index 41d7728..ce6446f 100644 --- a/aquatic_udp/src/lib/common/handlers.rs +++ b/aquatic_udp/src/lib/common/handlers.rs @@ -2,8 +2,8 @@ use std::net::SocketAddr; use rand::rngs::SmallRng; -use aquatic_common::extract_response_peers; use aquatic_common::convert_ipv4_mapped_ipv6; +use aquatic_common::extract_response_peers; use crate::common::*; diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 88ddc33..809fca0 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -11,8 +11,8 @@ use rand::prelude::SmallRng; use rand::SeedableRng; use crate::common::handlers::handle_announce_request; -use crate::common::*; use crate::common::handlers::*; +use crate::common::*; use crate::config::Config; use crate::glommio::common::update_access_list; @@ -83,29 +83,29 @@ async fn handle_request_stream( while let Some((producer_index, request, addr)) = stream.next().await { let response = match request { - ConnectedRequest::Announce(request) => { - ConnectedResponse::Announce(match addr.ip() { - IpAddr::V4(ip) => handle_announce_request( - &config, - &mut rng, - &mut torrents.borrow_mut().ipv4, - request, - ip, - peer_valid_until.borrow().to_owned(), - ), - IpAddr::V6(ip) => handle_announce_request( - &config, - &mut rng, - &mut torrents.borrow_mut().ipv6, - request, - ip, - peer_valid_until.borrow().to_owned(), - ), - }) - } - ConnectedRequest::Scrape(request) => { - ConnectedResponse::Scrape(handle_scrape_request(&mut torrents.borrow_mut(), addr, request)) - } + ConnectedRequest::Announce(request) => ConnectedResponse::Announce(match addr.ip() { + IpAddr::V4(ip) => handle_announce_request( + &config, + &mut rng, + &mut torrents.borrow_mut().ipv4, + request, + ip, + peer_valid_until.borrow().to_owned(), + ), + IpAddr::V6(ip) => handle_announce_request( + &config, + &mut rng, + &mut torrents.borrow_mut().ipv6, + request, + ip, + peer_valid_until.borrow().to_owned(), + ), + }), + ConnectedRequest::Scrape(request) => ConnectedResponse::Scrape(handle_scrape_request( + &mut torrents.borrow_mut(), + addr, + request, + )), }; ::log::debug!("preparing to send response to channel: {:?}", response); diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index eabbec4..df35f00 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -39,7 +39,12 @@ struct PendingScrapeResponse { struct PendingScrapeResponses(HashMap); impl PendingScrapeResponses { - fn prepare(&mut self, transaction_id: TransactionId, pending_worker_responses: usize, valid_until: ValidUntil) { + fn prepare( + &mut self, + transaction_id: TransactionId, + pending_worker_responses: usize, + valid_until: ValidUntil, + ) { let pending = PendingScrapeResponse { pending_worker_responses, valid_until, @@ -76,9 +81,7 @@ impl PendingScrapeResponses { fn clean(&mut self) { let now = Instant::now(); - self.0.retain(|_, v| { - v.valid_until.0 > now - }); + self.0.retain(|_, v| v.valid_until.0 > now); self.0.shrink_to_fit(); } } @@ -159,7 +162,8 @@ async fn read_requests( let max_connection_age = config.cleaning.max_connection_age; let connection_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_connection_age))); - let pending_scrape_valid_until = Rc::new(RefCell::new(ValidUntil::new(PENDING_SCRAPE_MAX_WAIT))); + let pending_scrape_valid_until = + Rc::new(RefCell::new(ValidUntil::new(PENDING_SCRAPE_MAX_WAIT))); let access_list = Rc::new(RefCell::new(access_list)); let connections = Rc::new(RefCell::new(ConnectionMap::default())); @@ -236,7 +240,11 @@ async fn read_requests( if let Err(err) = request_senders.try_send_to( request_consumer_index, - (response_consumer_index, ConnectedRequest::Announce(request), src), + ( + response_consumer_index, + ConnectedRequest::Announce(request), + src, + ), ) { ::log::warn!("request_sender.try_send failed: {:?}", err) } @@ -252,19 +260,19 @@ async fn read_requests( } Ok(Request::Scrape(request)) => { if connections.borrow().contains(request.connection_id, src) { - let mut consumer_requests: HashMap = HashMap::new(); + let mut consumer_requests: HashMap = + HashMap::new(); for info_hash in request.info_hashes { consumer_requests .entry(calculate_request_consumer_index(&config, info_hash)) - .or_insert( - ScrapeRequest { - transaction_id: request.transaction_id, - connection_id: request.connection_id, - info_hashes: Vec::new(), - } - ) - .info_hashes.push(info_hash); + .or_insert(ScrapeRequest { + transaction_id: request.transaction_id, + connection_id: request.connection_id, + info_hashes: Vec::new(), + }) + .info_hashes + .push(info_hash); } pending_scrape_responses.borrow_mut().prepare( @@ -276,7 +284,11 @@ async fn read_requests( for (consumer_index, request) in consumer_requests { if let Err(err) = request_senders.try_send_to( consumer_index, - (response_consumer_index, ConnectedRequest::Scrape(request), src), + ( + response_consumer_index, + ConnectedRequest::Scrape(request), + src, + ), ) { ::log::warn!("request_sender.try_send failed: {:?}", err) } @@ -326,12 +338,10 @@ async fn handle_shared_responses( while let Some((response, addr)) = stream.next().await { let opt_response = match response { ConnectedResponse::Announce(response) => Some((Response::Announce(response), addr)), - ConnectedResponse::Scrape(response) => { - pending_scrape_responses - .borrow_mut() - .add_and_get_finished(response) - .map(|response| (Response::Scrape(response), addr)) - }, + ConnectedResponse::Scrape(response) => pending_scrape_responses + .borrow_mut() + .add_and_get_finished(response) + .map(|response| (Response::Scrape(response), addr)), }; if let Some((response, addr)) = opt_response { diff --git a/aquatic_udp/src/lib/mio/handlers/announce.rs b/aquatic_udp/src/lib/mio/handlers/announce.rs index b399e3b..11fb19a 100644 --- a/aquatic_udp/src/lib/mio/handlers/announce.rs +++ b/aquatic_udp/src/lib/mio/handlers/announce.rs @@ -8,9 +8,9 @@ use aquatic_common::convert_ipv4_mapped_ipv6; use aquatic_udp_protocol::*; use crate::common::handlers::handle_announce_request; +use crate::common::handlers::*; use crate::common::*; use crate::config::Config; -use crate::common::handlers::*; #[inline] pub fn handle_announce_requests( diff --git a/aquatic_udp/src/lib/mio/handlers/mod.rs b/aquatic_udp/src/lib/mio/handlers/mod.rs index 5afd71b..bd97161 100644 --- a/aquatic_udp/src/lib/mio/handlers/mod.rs +++ b/aquatic_udp/src/lib/mio/handlers/mod.rs @@ -6,8 +6,8 @@ use rand::{rngs::SmallRng, SeedableRng}; use aquatic_udp_protocol::*; -use crate::config::Config; use crate::common::handlers::*; +use crate::config::Config; use crate::mio::common::*; mod announce; @@ -75,7 +75,10 @@ pub fn run_request_worker( ); responses.extend(scrape_requests.drain(..).map(|(request, src)| { - (ConnectedResponse::Scrape(handle_scrape_request(&mut torrents, src, request)), src) + ( + ConnectedResponse::Scrape(handle_scrape_request(&mut torrents, src, request)), + src, + ) })); } diff --git a/aquatic_udp/src/lib/mio/network.rs b/aquatic_udp/src/lib/mio/network.rs index 1739dd7..a510fca 100644 --- a/aquatic_udp/src/lib/mio/network.rs +++ b/aquatic_udp/src/lib/mio/network.rs @@ -16,9 +16,9 @@ use socket2::{Domain, Protocol, Socket, Type}; use aquatic_udp_protocol::{IpVersion, Request, Response}; +use crate::common::handlers::*; use crate::common::network::ConnectionMap; use crate::common::*; -use crate::common::handlers::*; use crate::config::Config; use super::common::*; diff --git a/aquatic_udp_bench/src/announce.rs b/aquatic_udp_bench/src/announce.rs index eb09c3a..5eac23d 100644 --- a/aquatic_udp_bench/src/announce.rs +++ b/aquatic_udp_bench/src/announce.rs @@ -6,9 +6,9 @@ use indicatif::ProgressIterator; use rand::Rng; use rand_distr::Pareto; +use aquatic_udp::common::handlers::*; use aquatic_udp::common::*; use aquatic_udp::config::Config; -use aquatic_udp::common::handlers::*; use crate::common::*; use crate::config::BenchConfig; diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index 450b65e..d5eee1c 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -6,9 +6,9 @@ use indicatif::ProgressIterator; use rand::Rng; use rand_distr::Pareto; +use aquatic_udp::common::handlers::*; use aquatic_udp::common::*; use aquatic_udp::config::Config; -use aquatic_udp::common::handlers::*; use crate::common::*; use crate::config::BenchConfig;