Run cargo fmt, clean up imports

This commit is contained in:
Joakim Frostegård 2021-10-28 01:23:43 +02:00
parent 80447d9d1b
commit 49ed4371e7
13 changed files with 125 additions and 83 deletions

View file

@ -1,4 +1,4 @@
use serde::{Serialize, Deserialize}; use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)] #[serde(default)]

View file

@ -1,7 +1,13 @@
use std::{sync::{Arc, atomic::{AtomicUsize, Ordering}}, time::Duration}; use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use privdrop::PrivDrop; use privdrop::PrivDrop;
use serde::{Serialize, Deserialize}; use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)] #[serde(default)]
@ -56,4 +62,3 @@ pub fn drop_privileges_after_socket_binding(
Ok(()) Ok(())
} }

View file

@ -1,7 +1,7 @@
use std::{net::SocketAddr, path::PathBuf}; use std::{net::SocketAddr, path::PathBuf};
use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig};
use aquatic_common::cpu_pinning::CpuPinningConfig; use aquatic_common::cpu_pinning::CpuPinningConfig;
use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use aquatic_cli_helpers::LogLevel; use aquatic_cli_helpers::LogLevel;

View file

@ -1,7 +1,7 @@
use std::borrow::Borrow; use std::borrow::Borrow;
use std::cell::RefCell; use std::cell::RefCell;
use std::rc::Rc;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::rc::Rc;
use aquatic_common::access_list::AccessList; use aquatic_common::access_list::AccessList;
use futures_lite::AsyncBufReadExt; use futures_lite::AsyncBufReadExt;
@ -66,7 +66,10 @@ impl ChannelResponse {
} }
} }
pub async fn update_access_list<C: Borrow<Config>>(config: C, access_list: Rc<RefCell<AccessList>>) { pub async fn update_access_list<C: Borrow<Config>>(
config: C,
access_list: Rc<RefCell<AccessList>>,
) {
if config.borrow().access_list.mode.is_on() { if config.borrow().access_list.mode.is_on() {
match BufferedFile::open(&config.borrow().access_list.path).await { match BufferedFile::open(&config.borrow().access_list.path).await {
Ok(file) => { Ok(file) => {
@ -104,5 +107,3 @@ pub async fn update_access_list<C: Borrow<Config>>(config: C, access_list: Rc<Re
}; };
} }
} }

View file

@ -77,8 +77,7 @@ pub fn run(config: Config) -> anyhow::Result<()> {
let mut builder = LocalExecutorBuilder::default(); let mut builder = LocalExecutorBuilder::default();
if config.cpu_pinning.active { if config.cpu_pinning.active {
builder = builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i);
builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i);
} }
let executor = builder.spawn(|| async move { let executor = builder.spawn(|| async move {
@ -94,7 +93,12 @@ pub fn run(config: Config) -> anyhow::Result<()> {
executors.push(executor); executors.push(executor);
} }
drop_privileges_after_socket_binding(&config.privileges, num_bound_sockets, config.socket_workers).unwrap(); drop_privileges_after_socket_binding(
&config.privileges,
num_bound_sockets,
config.socket_workers,
)
.unwrap();
for executor in executors { for executor in executors {
executor executor

View file

@ -9,7 +9,7 @@ use std::time::Duration;
use aquatic_common::access_list::AccessList; use aquatic_common::access_list::AccessList;
use aquatic_http_protocol::common::InfoHash; use aquatic_http_protocol::common::InfoHash;
use aquatic_http_protocol::request::{AnnounceRequest, Request, RequestParseError, ScrapeRequest}; use aquatic_http_protocol::request::{Request, RequestParseError, ScrapeRequest};
use aquatic_http_protocol::response::{ use aquatic_http_protocol::response::{
FailureResponse, Response, ScrapeResponse, ScrapeStatistics, FailureResponse, Response, ScrapeResponse, ScrapeStatistics,
}; };
@ -19,9 +19,9 @@ use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender};
use glommio::channels::shared_channel::ConnectedReceiver; use glommio::channels::shared_channel::ConnectedReceiver;
use glommio::net::{TcpListener, TcpStream}; use glommio::net::{TcpListener, TcpStream};
use glommio::{enclose, prelude::*};
use glommio::task::JoinHandle; use glommio::task::JoinHandle;
use glommio::timer::TimerActionRepeat; use glommio::timer::TimerActionRepeat;
use glommio::{enclose, prelude::*};
use rustls::ServerConnection; use rustls::ServerConnection;
use slab::Slab; use slab::Slab;
@ -89,24 +89,26 @@ pub async fn run_socket_worker(
})); }));
// Periodically remove closed connections // Periodically remove closed connections
TimerActionRepeat::repeat(enclose!((config, connection_slab, connections_to_remove) move || { TimerActionRepeat::repeat(
enclose!((config, connection_slab, connections_to_remove) move || async move { enclose!((config, connection_slab, connections_to_remove) move || {
let connections_to_remove = connections_to_remove.replace(Vec::new()); enclose!((config, connection_slab, connections_to_remove) move || async move {
let connections_to_remove = connections_to_remove.replace(Vec::new());
for connection_id in connections_to_remove { for connection_id in connections_to_remove {
if let Some(_) = connection_slab.borrow_mut().try_remove(connection_id) { if let Some(_) = connection_slab.borrow_mut().try_remove(connection_id) {
::log::debug!("removed connection with id {}", connection_id); ::log::debug!("removed connection with id {}", connection_id);
} else { } else {
::log::error!( ::log::error!(
"couldn't remove connection with id {}, it is not in connection slab", "couldn't remove connection with id {}, it is not in connection slab",
connection_id connection_id
); );
}
} }
}
Some(Duration::from_secs(config.cleaning.interval)) Some(Duration::from_secs(config.cleaning.interval))
})() })()
})); }),
);
for (_, response_receiver) in response_receivers.streams() { for (_, response_receiver) in response_receivers.streams() {
spawn_local(receive_responses( spawn_local(receive_responses(
@ -148,7 +150,8 @@ pub async fn run_socket_worker(
} }
connections_to_remove.borrow_mut().push(key); connections_to_remove.borrow_mut().push(key);
}).detach(); })
.detach();
let connection_reference = ConnectionReference { let connection_reference = ConnectionReference {
response_sender, response_sender,
@ -188,15 +191,12 @@ impl Connection {
match self.read_tls().await? { match self.read_tls().await? {
Some(Either::Left(request)) => { Some(Either::Left(request)) => {
let response = match self.handle_request(request).await? { let response = match self.handle_request(request).await? {
Some(Either::Left(response)) => { Some(Either::Left(response)) => response,
response
}
Some(Either::Right(pending_scrape_response)) => { Some(Either::Right(pending_scrape_response)) => {
self.wait_for_response(Some(pending_scrape_response)).await? self.wait_for_response(Some(pending_scrape_response))
}, .await?
None => {
self.wait_for_response(None).await?
} }
None => self.wait_for_response(None).await?,
}; };
self.queue_response(&response)?; self.queue_response(&response)?;
@ -257,7 +257,8 @@ impl Connection {
if end > self.request_buffer.len() { if end > self.request_buffer.len() {
return Err(anyhow::anyhow!("request too large")); return Err(anyhow::anyhow!("request too large"));
} else { } else {
let request_buffer_slice = &mut self.request_buffer[self.request_buffer_position..end]; let request_buffer_slice =
&mut self.request_buffer[self.request_buffer_position..end];
request_buffer_slice.copy_from_slice(&buf[..amt]); request_buffer_slice.copy_from_slice(&buf[..amt]);
@ -341,7 +342,7 @@ impl Connection {
/// relevant request workers, and return PendingScrapeResponse struct. /// relevant request workers, and return PendingScrapeResponse struct.
async fn handle_request( async fn handle_request(
&self, &self,
request: Request request: Request,
) -> anyhow::Result<Option<Either<Response, PendingScrapeResponse>>> { ) -> anyhow::Result<Option<Either<Response, PendingScrapeResponse>>> {
let peer_addr = self.get_peer_addr()?; let peer_addr = self.get_peer_addr()?;
@ -349,8 +350,11 @@ impl Connection {
Request::Announce(request) => { Request::Announce(request) => {
let info_hash = request.info_hash; let info_hash = request.info_hash;
if self.access_list.borrow().allows(self.config.access_list.mode, &info_hash.0) { if self
.access_list
.borrow()
.allows(self.config.access_list.mode, &info_hash.0)
{
let request = ChannelRequest::Announce { let request = ChannelRequest::Announce {
request, request,
connection_id: self.connection_id, connection_id: self.connection_id,
@ -417,7 +421,7 @@ impl Connection {
/// return full response /// return full response
async fn wait_for_response( async fn wait_for_response(
&self, &self,
mut opt_pending_scrape_response: Option<PendingScrapeResponse> mut opt_pending_scrape_response: Option<PendingScrapeResponse>,
) -> anyhow::Result<Response> { ) -> anyhow::Result<Response> {
loop { loop {
if let Some(channel_response) = self.response_receiver.recv().await { if let Some(channel_response) = self.response_receiver.recv().await {

View file

@ -61,9 +61,11 @@ fn run(config: Config) -> ::anyhow::Result<()> {
let tls_config = tls_config.clone(); let tls_config = tls_config.clone();
let state = state.clone(); let state = state.clone();
LocalExecutorBuilder::default().spawn(|| async move { LocalExecutorBuilder::default()
run_socket_thread(config, tls_config, state).await.unwrap(); .spawn(|| async move {
}).unwrap(); run_socket_thread(config, tls_config, state).await.unwrap();
})
.unwrap();
} }
monitor_statistics(state, &config); monitor_statistics(state, &config);
@ -175,8 +177,10 @@ fn create_tls_config() -> anyhow::Result<Arc<rustls::ClientConfig>> {
.with_safe_defaults() .with_safe_defaults()
.with_root_certificates(rustls::RootCertStore::empty()) .with_root_certificates(rustls::RootCertStore::empty())
.with_no_client_auth(); .with_no_client_auth();
config.dangerous().set_certificate_verifier(Arc::new(FakeCertificateVerifier)); config
.dangerous()
.set_certificate_verifier(Arc::new(FakeCertificateVerifier));
Ok(Arc::new(config)) Ok(Arc::new(config))
} }

View file

@ -1,10 +1,17 @@
use std::{cell::RefCell, convert::TryInto, io::{Cursor, ErrorKind, Read}, rc::Rc, sync::{Arc, atomic::Ordering}, time::Duration}; use std::{
cell::RefCell,
convert::TryInto,
io::{Cursor, ErrorKind, Read},
rc::Rc,
sync::{atomic::Ordering, Arc},
time::Duration,
};
use aquatic_http_protocol::response::Response; use aquatic_http_protocol::response::Response;
use futures_lite::{AsyncReadExt, AsyncWriteExt}; use futures_lite::{AsyncReadExt, AsyncWriteExt};
use glommio::{prelude::*, timer::TimerActionRepeat};
use glommio::net::TcpStream; use glommio::net::TcpStream;
use rand::{SeedableRng, prelude::SmallRng}; use glommio::{prelude::*, timer::TimerActionRepeat};
use rand::{prelude::SmallRng, SeedableRng};
use rustls::ClientConnection; use rustls::ClientConnection;
use crate::{common::LoadTestState, config::Config, utils::create_random_request}; use crate::{common::LoadTestState, config::Config, utils::create_random_request};
@ -17,12 +24,14 @@ pub async fn run_socket_thread(
let config = Rc::new(config); let config = Rc::new(config);
let num_active_connections = Rc::new(RefCell::new(0usize)); let num_active_connections = Rc::new(RefCell::new(0usize));
TimerActionRepeat::repeat(move || periodically_open_connections( TimerActionRepeat::repeat(move || {
config.clone(), periodically_open_connections(
tls_config.clone(), config.clone(),
load_test_state.clone(), tls_config.clone(),
num_active_connections.clone()) load_test_state.clone(),
); num_active_connections.clone(),
)
});
futures_lite::future::pending::<bool>().await; futures_lite::future::pending::<bool>().await;
@ -37,10 +46,13 @@ async fn periodically_open_connections(
) -> Option<Duration> { ) -> Option<Duration> {
if *num_active_connections.borrow() < config.num_connections { if *num_active_connections.borrow() < config.num_connections {
spawn_local(async move { spawn_local(async move {
if let Err(err) = Connection::run(config, tls_config, load_test_state, num_active_connections).await { if let Err(err) =
Connection::run(config, tls_config, load_test_state, num_active_connections).await
{
eprintln!("connection creation error: {:?}", err); eprintln!("connection creation error: {:?}", err);
} }
}).detach(); })
.detach();
} }
Some(Duration::from_secs(1)) Some(Duration::from_secs(1))
@ -65,7 +77,8 @@ impl Connection {
load_test_state: LoadTestState, load_test_state: LoadTestState,
num_active_connections: Rc<RefCell<usize>>, num_active_connections: Rc<RefCell<usize>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let stream = TcpStream::connect(config.server_address).await let stream = TcpStream::connect(config.server_address)
.await
.map_err(|err| anyhow::anyhow!("connect: {:?}", err))?; .map_err(|err| anyhow::anyhow!("connect: {:?}", err))?;
let tls = ClientConnection::new(tls_config, "example.com".try_into().unwrap()).unwrap(); let tls = ClientConnection::new(tls_config, "example.com".try_into().unwrap()).unwrap();
let rng = SmallRng::from_entropy(); let rng = SmallRng::from_entropy();
@ -98,7 +111,8 @@ impl Connection {
async fn run_connection_loop(&mut self) -> anyhow::Result<()> { async fn run_connection_loop(&mut self) -> anyhow::Result<()> {
loop { loop {
if self.send_new_request { if self.send_new_request {
let request = create_random_request(&self.config, &self.load_test_state, &mut self.rng); let request =
create_random_request(&self.config, &self.load_test_state, &mut self.rng);
request.write(&mut self.tls.writer())?; request.write(&mut self.tls.writer())?;
self.queued_responses += 1; self.queued_responses += 1;
@ -121,8 +135,7 @@ impl Connection {
return Err(anyhow::anyhow!("Peer has closed connection")); return Err(anyhow::anyhow!("Peer has closed connection"));
} }
self self.load_test_state
.load_test_state
.statistics .statistics
.bytes_received .bytes_received
.fetch_add(bytes_read, Ordering::SeqCst); .fetch_add(bytes_read, Ordering::SeqCst);
@ -145,7 +158,8 @@ impl Connection {
if end > self.response_buffer.len() { if end > self.response_buffer.len() {
return Err(anyhow::anyhow!("response too large")); return Err(anyhow::anyhow!("response too large"));
} else { } else {
let response_buffer_slice = &mut self.response_buffer[self.response_buffer_position..end]; let response_buffer_slice =
&mut self.response_buffer[self.response_buffer_position..end];
response_buffer_slice.copy_from_slice(&buf[..amt]); response_buffer_slice.copy_from_slice(&buf[..amt]);
@ -182,25 +196,21 @@ impl Connection {
match Response::from_bytes(interesting_bytes) { match Response::from_bytes(interesting_bytes) {
Ok(response) => { Ok(response) => {
match response { match response {
Response::Announce(_) => { Response::Announce(_) => {
self self.load_test_state
.load_test_state
.statistics .statistics
.responses_announce .responses_announce
.fetch_add(1, Ordering::SeqCst); .fetch_add(1, Ordering::SeqCst);
} }
Response::Scrape(_) => { Response::Scrape(_) => {
self self.load_test_state
.load_test_state
.statistics .statistics
.responses_scrape .responses_scrape
.fetch_add(1, Ordering::SeqCst); .fetch_add(1, Ordering::SeqCst);
} }
Response::Failure(response) => { Response::Failure(response) => {
self self.load_test_state
.load_test_state
.statistics .statistics
.responses_failure .responses_failure
.fetch_add(1, Ordering::SeqCst); .fetch_add(1, Ordering::SeqCst);
@ -253,18 +263,20 @@ impl Connection {
self.stream.write_all(&buf.into_inner()).await?; self.stream.write_all(&buf.into_inner()).await?;
self.stream.flush().await?; self.stream.flush().await?;
self self.load_test_state
.load_test_state
.statistics .statistics
.bytes_sent .bytes_sent
.fetch_add(len, Ordering::SeqCst); .fetch_add(len, Ordering::SeqCst);
if self.queued_responses != 0 { if self.queued_responses != 0 {
self.load_test_state.statistics.requests.fetch_add(self.queued_responses, Ordering::SeqCst); self.load_test_state
.statistics
.requests
.fetch_add(self.queued_responses, Ordering::SeqCst);
self.queued_responses = 0; self.queued_responses = 0;
} }
Ok(()) Ok(())
} }
} }

View file

@ -1,7 +1,7 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig};
use aquatic_common::cpu_pinning::CpuPinningConfig; use aquatic_common::cpu_pinning::CpuPinningConfig;
use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use aquatic_cli_helpers::LogLevel; use aquatic_cli_helpers::LogLevel;

View file

@ -9,7 +9,10 @@ use glommio::prelude::*;
use crate::common::*; use crate::common::*;
use crate::config::Config; use crate::config::Config;
pub async fn update_access_list<C: Borrow<Config>>(config: C, access_list: Rc<RefCell<AccessList>>) { pub async fn update_access_list<C: Borrow<Config>>(
config: C,
access_list: Rc<RefCell<AccessList>>,
) {
if config.borrow().access_list.mode.is_on() { if config.borrow().access_list.mode.is_on() {
match BufferedFile::open(&config.borrow().access_list.path).await { match BufferedFile::open(&config.borrow().access_list.path).await {
Ok(file) => { Ok(file) => {
@ -46,4 +49,4 @@ pub async fn update_access_list<C: Borrow<Config>>(config: C, access_list: Rc<Re
} }
}; };
} }
} }

View file

@ -71,8 +71,7 @@ pub fn run(config: Config) -> anyhow::Result<()> {
let mut builder = LocalExecutorBuilder::default(); let mut builder = LocalExecutorBuilder::default();
if config.cpu_pinning.active { if config.cpu_pinning.active {
builder = builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i);
builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i);
} }
let executor = builder.spawn(|| async move { let executor = builder.spawn(|| async move {
@ -88,7 +87,12 @@ pub fn run(config: Config) -> anyhow::Result<()> {
executors.push(executor); executors.push(executor);
} }
drop_privileges_after_socket_binding(&config.privileges, num_bound_sockets, config.socket_workers).unwrap(); drop_privileges_after_socket_binding(
&config.privileges,
num_bound_sockets,
config.socket_workers,
)
.unwrap();
for executor in executors { for executor in executors {
executor executor

View file

@ -27,4 +27,4 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
mio::run(config) mio::run(config)
} }
} }
} }

View file

@ -35,7 +35,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
start_workers(config.clone(), state.clone(), num_bound_sockets.clone())?; start_workers(config.clone(), state.clone(), num_bound_sockets.clone())?;
drop_privileges_after_socket_binding(&config.privileges, num_bound_sockets, config.socket_workers).unwrap(); drop_privileges_after_socket_binding(
&config.privileges,
num_bound_sockets,
config.socket_workers,
)
.unwrap();
loop { loop {
::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); ::std::thread::sleep(Duration::from_secs(config.cleaning.interval));