From 49ed4371e7e7784e24aab28f9e6e0e4c46978342 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 28 Oct 2021 01:23:43 +0200 Subject: [PATCH] Run cargo fmt, clean up imports --- aquatic_common/src/cpu_pinning.rs | 2 +- aquatic_common/src/privileges.rs | 11 +++-- aquatic_http/src/lib/config.rs | 2 +- aquatic_http/src/lib/glommio/common.rs | 9 ++-- aquatic_http/src/lib/glommio/mod.rs | 10 ++-- aquatic_http/src/lib/glommio/network.rs | 64 +++++++++++++----------- aquatic_http_load_test/src/main.rs | 16 +++--- aquatic_http_load_test/src/network.rs | 66 +++++++++++++++---------- aquatic_udp/src/lib/config.rs | 2 +- aquatic_udp/src/lib/glommio/common.rs | 7 ++- aquatic_udp/src/lib/glommio/mod.rs | 10 ++-- aquatic_udp/src/lib/lib.rs | 2 +- aquatic_udp/src/lib/mio/mod.rs | 7 ++- 13 files changed, 125 insertions(+), 83 deletions(-) diff --git a/aquatic_common/src/cpu_pinning.rs b/aquatic_common/src/cpu_pinning.rs index 386e52b..a13fc73 100644 --- a/aquatic_common/src/cpu_pinning.rs +++ b/aquatic_common/src/cpu_pinning.rs @@ -1,4 +1,4 @@ -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] diff --git a/aquatic_common/src/privileges.rs b/aquatic_common/src/privileges.rs index 67b9e5b..a898969 100644 --- a/aquatic_common/src/privileges.rs +++ b/aquatic_common/src/privileges.rs @@ -1,7 +1,13 @@ -use std::{sync::{Arc, atomic::{AtomicUsize, Ordering}}, time::Duration}; +use std::{ + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; use privdrop::PrivDrop; -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] @@ -56,4 +62,3 @@ pub fn drop_privileges_after_socket_binding( Ok(()) } - diff --git a/aquatic_http/src/lib/config.rs b/aquatic_http/src/lib/config.rs index 33068a4..1e60db5 100644 --- a/aquatic_http/src/lib/config.rs +++ b/aquatic_http/src/lib/config.rs @@ -1,7 +1,7 @@ use std::{net::SocketAddr, path::PathBuf}; -use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig}; use aquatic_common::cpu_pinning::CpuPinningConfig; +use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig}; use serde::{Deserialize, Serialize}; use aquatic_cli_helpers::LogLevel; diff --git a/aquatic_http/src/lib/glommio/common.rs b/aquatic_http/src/lib/glommio/common.rs index 7495165..3887b9f 100644 --- a/aquatic_http/src/lib/glommio/common.rs +++ b/aquatic_http/src/lib/glommio/common.rs @@ -1,7 +1,7 @@ use std::borrow::Borrow; use std::cell::RefCell; -use std::rc::Rc; use std::net::SocketAddr; +use std::rc::Rc; use aquatic_common::access_list::AccessList; use futures_lite::AsyncBufReadExt; @@ -66,7 +66,10 @@ impl ChannelResponse { } } -pub async fn update_access_list>(config: C, access_list: Rc>) { +pub async fn update_access_list>( + config: C, + access_list: Rc>, +) { if config.borrow().access_list.mode.is_on() { match BufferedFile::open(&config.borrow().access_list.path).await { Ok(file) => { @@ -104,5 +107,3 @@ pub async fn update_access_list>(config: C, access_list: Rc anyhow::Result<()> { let mut builder = LocalExecutorBuilder::default(); if config.cpu_pinning.active { - builder = - builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i); + builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i); } let executor = builder.spawn(|| async move { @@ -94,7 +93,12 @@ pub fn run(config: Config) -> anyhow::Result<()> { executors.push(executor); } - drop_privileges_after_socket_binding(&config.privileges, num_bound_sockets, config.socket_workers).unwrap(); + drop_privileges_after_socket_binding( + &config.privileges, + num_bound_sockets, + config.socket_workers, + ) + .unwrap(); for executor in executors { executor diff --git a/aquatic_http/src/lib/glommio/network.rs b/aquatic_http/src/lib/glommio/network.rs index 085d417..4efbab7 100644 --- a/aquatic_http/src/lib/glommio/network.rs +++ b/aquatic_http/src/lib/glommio/network.rs @@ -9,7 +9,7 @@ use std::time::Duration; use aquatic_common::access_list::AccessList; use aquatic_http_protocol::common::InfoHash; -use aquatic_http_protocol::request::{AnnounceRequest, Request, RequestParseError, ScrapeRequest}; +use aquatic_http_protocol::request::{Request, RequestParseError, ScrapeRequest}; use aquatic_http_protocol::response::{ FailureResponse, Response, ScrapeResponse, ScrapeStatistics, }; @@ -19,9 +19,9 @@ use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; use glommio::channels::shared_channel::ConnectedReceiver; use glommio::net::{TcpListener, TcpStream}; -use glommio::{enclose, prelude::*}; use glommio::task::JoinHandle; use glommio::timer::TimerActionRepeat; +use glommio::{enclose, prelude::*}; use rustls::ServerConnection; use slab::Slab; @@ -89,24 +89,26 @@ pub async fn run_socket_worker( })); // Periodically remove closed connections - TimerActionRepeat::repeat(enclose!((config, connection_slab, connections_to_remove) move || { - enclose!((config, connection_slab, connections_to_remove) move || async move { - let connections_to_remove = connections_to_remove.replace(Vec::new()); + TimerActionRepeat::repeat( + enclose!((config, connection_slab, connections_to_remove) move || { + enclose!((config, connection_slab, connections_to_remove) move || async move { + let connections_to_remove = connections_to_remove.replace(Vec::new()); - for connection_id in connections_to_remove { - if let Some(_) = connection_slab.borrow_mut().try_remove(connection_id) { - ::log::debug!("removed connection with id {}", connection_id); - } else { - ::log::error!( - "couldn't remove connection with id {}, it is not in connection slab", - connection_id - ); + for connection_id in connections_to_remove { + if let Some(_) = connection_slab.borrow_mut().try_remove(connection_id) { + ::log::debug!("removed connection with id {}", connection_id); + } else { + ::log::error!( + "couldn't remove connection with id {}, it is not in connection slab", + connection_id + ); + } } - } - Some(Duration::from_secs(config.cleaning.interval)) - })() - })); + Some(Duration::from_secs(config.cleaning.interval)) + })() + }), + ); for (_, response_receiver) in response_receivers.streams() { spawn_local(receive_responses( @@ -148,7 +150,8 @@ pub async fn run_socket_worker( } connections_to_remove.borrow_mut().push(key); - }).detach(); + }) + .detach(); let connection_reference = ConnectionReference { response_sender, @@ -188,15 +191,12 @@ impl Connection { match self.read_tls().await? { Some(Either::Left(request)) => { let response = match self.handle_request(request).await? { - Some(Either::Left(response)) => { - response - } + Some(Either::Left(response)) => response, Some(Either::Right(pending_scrape_response)) => { - self.wait_for_response(Some(pending_scrape_response)).await? - }, - None => { - self.wait_for_response(None).await? + self.wait_for_response(Some(pending_scrape_response)) + .await? } + None => self.wait_for_response(None).await?, }; self.queue_response(&response)?; @@ -257,7 +257,8 @@ impl Connection { if end > self.request_buffer.len() { return Err(anyhow::anyhow!("request too large")); } else { - let request_buffer_slice = &mut self.request_buffer[self.request_buffer_position..end]; + let request_buffer_slice = + &mut self.request_buffer[self.request_buffer_position..end]; request_buffer_slice.copy_from_slice(&buf[..amt]); @@ -341,7 +342,7 @@ impl Connection { /// relevant request workers, and return PendingScrapeResponse struct. async fn handle_request( &self, - request: Request + request: Request, ) -> anyhow::Result>> { let peer_addr = self.get_peer_addr()?; @@ -349,8 +350,11 @@ impl Connection { Request::Announce(request) => { let info_hash = request.info_hash; - if self.access_list.borrow().allows(self.config.access_list.mode, &info_hash.0) { - + if self + .access_list + .borrow() + .allows(self.config.access_list.mode, &info_hash.0) + { let request = ChannelRequest::Announce { request, connection_id: self.connection_id, @@ -417,7 +421,7 @@ impl Connection { /// return full response async fn wait_for_response( &self, - mut opt_pending_scrape_response: Option + mut opt_pending_scrape_response: Option, ) -> anyhow::Result { loop { if let Some(channel_response) = self.response_receiver.recv().await { diff --git a/aquatic_http_load_test/src/main.rs b/aquatic_http_load_test/src/main.rs index b0d3120..e719f77 100644 --- a/aquatic_http_load_test/src/main.rs +++ b/aquatic_http_load_test/src/main.rs @@ -61,9 +61,11 @@ fn run(config: Config) -> ::anyhow::Result<()> { let tls_config = tls_config.clone(); let state = state.clone(); - LocalExecutorBuilder::default().spawn(|| async move { - run_socket_thread(config, tls_config, state).await.unwrap(); - }).unwrap(); + LocalExecutorBuilder::default() + .spawn(|| async move { + run_socket_thread(config, tls_config, state).await.unwrap(); + }) + .unwrap(); } monitor_statistics(state, &config); @@ -175,8 +177,10 @@ fn create_tls_config() -> anyhow::Result> { .with_safe_defaults() .with_root_certificates(rustls::RootCertStore::empty()) .with_no_client_auth(); - - config.dangerous().set_certificate_verifier(Arc::new(FakeCertificateVerifier)); - + + config + .dangerous() + .set_certificate_verifier(Arc::new(FakeCertificateVerifier)); + Ok(Arc::new(config)) } diff --git a/aquatic_http_load_test/src/network.rs b/aquatic_http_load_test/src/network.rs index 5c38dc2..aaa15c0 100644 --- a/aquatic_http_load_test/src/network.rs +++ b/aquatic_http_load_test/src/network.rs @@ -1,10 +1,17 @@ -use std::{cell::RefCell, convert::TryInto, io::{Cursor, ErrorKind, Read}, rc::Rc, sync::{Arc, atomic::Ordering}, time::Duration}; +use std::{ + cell::RefCell, + convert::TryInto, + io::{Cursor, ErrorKind, Read}, + rc::Rc, + sync::{atomic::Ordering, Arc}, + time::Duration, +}; use aquatic_http_protocol::response::Response; use futures_lite::{AsyncReadExt, AsyncWriteExt}; -use glommio::{prelude::*, timer::TimerActionRepeat}; use glommio::net::TcpStream; -use rand::{SeedableRng, prelude::SmallRng}; +use glommio::{prelude::*, timer::TimerActionRepeat}; +use rand::{prelude::SmallRng, SeedableRng}; use rustls::ClientConnection; use crate::{common::LoadTestState, config::Config, utils::create_random_request}; @@ -17,12 +24,14 @@ pub async fn run_socket_thread( let config = Rc::new(config); let num_active_connections = Rc::new(RefCell::new(0usize)); - TimerActionRepeat::repeat(move || periodically_open_connections( - config.clone(), - tls_config.clone(), - load_test_state.clone(), - num_active_connections.clone()) - ); + TimerActionRepeat::repeat(move || { + periodically_open_connections( + config.clone(), + tls_config.clone(), + load_test_state.clone(), + num_active_connections.clone(), + ) + }); futures_lite::future::pending::().await; @@ -37,10 +46,13 @@ async fn periodically_open_connections( ) -> Option { if *num_active_connections.borrow() < config.num_connections { spawn_local(async move { - if let Err(err) = Connection::run(config, tls_config, load_test_state, num_active_connections).await { + if let Err(err) = + Connection::run(config, tls_config, load_test_state, num_active_connections).await + { eprintln!("connection creation error: {:?}", err); } - }).detach(); + }) + .detach(); } Some(Duration::from_secs(1)) @@ -65,7 +77,8 @@ impl Connection { load_test_state: LoadTestState, num_active_connections: Rc>, ) -> anyhow::Result<()> { - let stream = TcpStream::connect(config.server_address).await + let stream = TcpStream::connect(config.server_address) + .await .map_err(|err| anyhow::anyhow!("connect: {:?}", err))?; let tls = ClientConnection::new(tls_config, "example.com".try_into().unwrap()).unwrap(); let rng = SmallRng::from_entropy(); @@ -98,7 +111,8 @@ impl Connection { async fn run_connection_loop(&mut self) -> anyhow::Result<()> { loop { if self.send_new_request { - let request = create_random_request(&self.config, &self.load_test_state, &mut self.rng); + let request = + create_random_request(&self.config, &self.load_test_state, &mut self.rng); request.write(&mut self.tls.writer())?; self.queued_responses += 1; @@ -121,8 +135,7 @@ impl Connection { return Err(anyhow::anyhow!("Peer has closed connection")); } - self - .load_test_state + self.load_test_state .statistics .bytes_received .fetch_add(bytes_read, Ordering::SeqCst); @@ -145,7 +158,8 @@ impl Connection { if end > self.response_buffer.len() { return Err(anyhow::anyhow!("response too large")); } else { - let response_buffer_slice = &mut self.response_buffer[self.response_buffer_position..end]; + let response_buffer_slice = + &mut self.response_buffer[self.response_buffer_position..end]; response_buffer_slice.copy_from_slice(&buf[..amt]); @@ -182,25 +196,21 @@ impl Connection { match Response::from_bytes(interesting_bytes) { Ok(response) => { - match response { Response::Announce(_) => { - self - .load_test_state + self.load_test_state .statistics .responses_announce .fetch_add(1, Ordering::SeqCst); } Response::Scrape(_) => { - self - .load_test_state + self.load_test_state .statistics .responses_scrape .fetch_add(1, Ordering::SeqCst); } Response::Failure(response) => { - self - .load_test_state + self.load_test_state .statistics .responses_failure .fetch_add(1, Ordering::SeqCst); @@ -253,18 +263,20 @@ impl Connection { self.stream.write_all(&buf.into_inner()).await?; self.stream.flush().await?; - self - .load_test_state + self.load_test_state .statistics .bytes_sent .fetch_add(len, Ordering::SeqCst); if self.queued_responses != 0 { - self.load_test_state.statistics.requests.fetch_add(self.queued_responses, Ordering::SeqCst); + self.load_test_state + .statistics + .requests + .fetch_add(self.queued_responses, Ordering::SeqCst); self.queued_responses = 0; } Ok(()) } -} \ No newline at end of file +} diff --git a/aquatic_udp/src/lib/config.rs b/aquatic_udp/src/lib/config.rs index 2f1540a..dbda176 100644 --- a/aquatic_udp/src/lib/config.rs +++ b/aquatic_udp/src/lib/config.rs @@ -1,7 +1,7 @@ use std::net::SocketAddr; -use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig}; use aquatic_common::cpu_pinning::CpuPinningConfig; +use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig}; use serde::{Deserialize, Serialize}; use aquatic_cli_helpers::LogLevel; diff --git a/aquatic_udp/src/lib/glommio/common.rs b/aquatic_udp/src/lib/glommio/common.rs index 393038d..4b3d80d 100644 --- a/aquatic_udp/src/lib/glommio/common.rs +++ b/aquatic_udp/src/lib/glommio/common.rs @@ -9,7 +9,10 @@ use glommio::prelude::*; use crate::common::*; use crate::config::Config; -pub async fn update_access_list>(config: C, access_list: Rc>) { +pub async fn update_access_list>( + config: C, + access_list: Rc>, +) { if config.borrow().access_list.mode.is_on() { match BufferedFile::open(&config.borrow().access_list.path).await { Ok(file) => { @@ -46,4 +49,4 @@ pub async fn update_access_list>(config: C, access_list: Rc anyhow::Result<()> { let mut builder = LocalExecutorBuilder::default(); if config.cpu_pinning.active { - builder = - builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i); + builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i); } let executor = builder.spawn(|| async move { @@ -88,7 +87,12 @@ pub fn run(config: Config) -> anyhow::Result<()> { executors.push(executor); } - drop_privileges_after_socket_binding(&config.privileges, num_bound_sockets, config.socket_workers).unwrap(); + drop_privileges_after_socket_binding( + &config.privileges, + num_bound_sockets, + config.socket_workers, + ) + .unwrap(); for executor in executors { executor diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index e7796d4..78676c2 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -27,4 +27,4 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { mio::run(config) } } -} \ No newline at end of file +} diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index 3127ce2..1b8e656 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -35,7 +35,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { start_workers(config.clone(), state.clone(), num_bound_sockets.clone())?; - drop_privileges_after_socket_binding(&config.privileges, num_bound_sockets, config.socket_workers).unwrap(); + drop_privileges_after_socket_binding( + &config.privileges, + num_bound_sockets, + config.socket_workers, + ) + .unwrap(); loop { ::std::thread::sleep(Duration::from_secs(config.cleaning.interval));