Run cargo fmt

This commit is contained in:
Joakim Frostegård 2021-10-27 00:47:46 +02:00
parent 8f0dabc706
commit 17412868b9
11 changed files with 73 additions and 63 deletions

View file

@ -2,14 +2,14 @@ use std::collections::BTreeMap;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use either::Either; use either::Either;
use rand::{Rng}; use rand::Rng;
use aquatic_common::extract_response_peers; use aquatic_common::extract_response_peers;
use aquatic_http_protocol::request::*; use aquatic_http_protocol::request::*;
use aquatic_http_protocol::response::*; use aquatic_http_protocol::response::*;
use crate::config::Config;
use super::*; use super::*;
use crate::config::Config;
pub fn handle_announce_request( pub fn handle_announce_request(
config: &Config, config: &Config,
@ -215,4 +215,3 @@ pub fn handle_scrape_request(
response response
} }

View file

@ -1,7 +1,7 @@
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::time::Instant; use std::time::Instant;
use aquatic_common::access_list::{AccessList}; use aquatic_common::access_list::AccessList;
use either::Either; use either::Either;
use hashbrown::HashMap; use hashbrown::HashMap;
use indexmap::IndexMap; use indexmap::IndexMap;
@ -10,7 +10,7 @@ use smartstring::{LazyCompact, SmartString};
pub use aquatic_common::{convert_ipv4_mapped_ipv6, ValidUntil}; pub use aquatic_common::{convert_ipv4_mapped_ipv6, ValidUntil};
use aquatic_http_protocol::common::*; use aquatic_http_protocol::common::*;
use aquatic_http_protocol::response::{ResponsePeer}; use aquatic_http_protocol::response::ResponsePeer;
use crate::config::Config; use crate::config::Config;

View file

@ -1,6 +1,9 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use aquatic_http_protocol::{request::{AnnounceRequest, ScrapeRequest}, response::{AnnounceResponse, ScrapeResponse}}; use aquatic_http_protocol::{
request::{AnnounceRequest, ScrapeRequest},
response::{AnnounceResponse, ScrapeResponse},
};
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
pub struct ConsumerId(pub usize); pub struct ConsumerId(pub usize);
@ -22,7 +25,7 @@ pub enum ChannelRequest {
original_indices: Vec<usize>, original_indices: Vec<usize>,
connection_id: ConnectionId, connection_id: ConnectionId,
response_consumer_id: ConsumerId, response_consumer_id: ConsumerId,
} },
} }
#[derive(Debug)] #[derive(Debug)]
@ -37,7 +40,7 @@ pub enum ChannelResponse {
peer_addr: SocketAddr, peer_addr: SocketAddr,
original_indices: Vec<usize>, original_indices: Vec<usize>,
connection_id: ConnectionId, connection_id: ConnectionId,
} },
} }
impl ChannelResponse { impl ChannelResponse {
@ -53,4 +56,4 @@ impl ChannelResponse {
Self::Scrape { peer_addr, .. } => *peer_addr, Self::Scrape { peer_addr, .. } => *peer_addr,
} }
} }
} }

View file

@ -88,7 +88,7 @@ async fn handle_request_stream<S>(
request, request,
peer_addr, peer_addr,
response_consumer_id, response_consumer_id,
connection_id connection_id,
} => { } => {
let meta = ConnectionMeta { let meta = ConnectionMeta {
worker_index: response_consumer_id.0, worker_index: response_consumer_id.0,
@ -126,7 +126,8 @@ async fn handle_request_stream<S>(
peer_addr, peer_addr,
}; };
let response = handle_scrape_request(&config, &mut torrents.borrow_mut(), meta, request); let response =
handle_scrape_request(&config, &mut torrents.borrow_mut(), meta, request);
let response = ChannelResponse::Scrape { let response = ChannelResponse::Scrape {
response, response,
@ -148,4 +149,3 @@ async fn handle_request_stream<S>(
yield_if_needed().await; yield_if_needed().await;
} }
} }

View file

@ -1,4 +1,8 @@
use std::{fs::File, io::BufReader, sync::{Arc, atomic::AtomicUsize}}; use std::{
fs::File,
io::BufReader,
sync::{atomic::AtomicUsize, Arc},
};
use aquatic_common::access_list::AccessList; use aquatic_common::access_list::AccessList;
use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*};
@ -11,9 +15,7 @@ mod network;
const SHARED_CHANNEL_SIZE: usize = 1024; const SHARED_CHANNEL_SIZE: usize = 1024;
pub fn run( pub fn run(config: Config) -> anyhow::Result<()> {
config: Config,
) -> anyhow::Result<()> {
let access_list = if config.access_list.mode.is_on() { let access_list = if config.access_list.mode.is_on() {
AccessList::create_from_path(&config.access_list.path).expect("Load access list") AccessList::create_from_path(&config.access_list.path).expect("Load access list")
} else { } else {
@ -94,13 +96,11 @@ pub fn run(
.join() .join()
.unwrap(); .unwrap();
} }
Ok(()) Ok(())
} }
fn create_tls_config( fn create_tls_config(config: &Config) -> anyhow::Result<rustls::ServerConfig> {
config: &Config,
) -> anyhow::Result<rustls::ServerConfig> {
let certs = { let certs = {
let f = File::open(&config.network.tls.tls_certificate_path)?; let f = File::open(&config.network.tls.tls_certificate_path)?;
let mut f = BufReader::new(f); let mut f = BufReader::new(f);
@ -125,6 +125,6 @@ fn create_tls_config(
.with_safe_defaults() .with_safe_defaults()
.with_no_client_auth() .with_no_client_auth()
.with_single_cert(certs, private_key)?; .with_single_cert(certs, private_key)?;
Ok(tls_config) Ok(tls_config)
} }

View file

@ -1,20 +1,20 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::io::{Cursor, ErrorKind, Read, Write}; use std::io::{Cursor, ErrorKind, Read, Write};
use std::rc::Rc; use std::rc::Rc;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use aquatic_http_protocol::common::InfoHash; use aquatic_http_protocol::common::InfoHash;
use aquatic_http_protocol::request::{AnnounceRequest, Request, RequestParseError}; use aquatic_http_protocol::request::{AnnounceRequest, Request, RequestParseError};
use aquatic_http_protocol::response::{FailureResponse, Response}; use aquatic_http_protocol::response::{FailureResponse, Response};
use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Receivers, Role, Senders}; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Receivers, Role, Senders};
use glommio::channels::shared_channel::{ConnectedReceiver, ConnectedSender, SharedSender};
use glommio::prelude::*;
use glommio::net::{TcpListener, TcpStream};
use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender};
use glommio::channels::shared_channel::{ConnectedReceiver, ConnectedSender, SharedSender};
use glommio::net::{TcpListener, TcpStream};
use glommio::prelude::*;
use glommio::task::JoinHandle; use glommio::task::JoinHandle;
use rustls::{ServerConnection}; use rustls::ServerConnection;
use slab::Slab; use slab::Slab;
use crate::common::num_digits_in_usize; use crate::common::num_digits_in_usize;
@ -24,7 +24,6 @@ use super::common::*;
const BUFFER_SIZE: usize = 1024; const BUFFER_SIZE: usize = 1024;
struct ConnectionReference { struct ConnectionReference {
response_sender: LocalSender<ChannelResponse>, response_sender: LocalSender<ChannelResponse>,
handle: JoinHandle<()>, handle: JoinHandle<()>,
@ -64,7 +63,11 @@ pub async fn run_socket_worker(
let connection_slab = Rc::new(RefCell::new(Slab::new())); let connection_slab = Rc::new(RefCell::new(Slab::new()));
for (_, response_receiver) in response_receivers.streams() { for (_, response_receiver) in response_receivers.streams() {
spawn_local(receive_responses(response_receiver, connection_slab.clone())).detach(); spawn_local(receive_responses(
response_receiver,
connection_slab.clone(),
))
.detach();
} }
let mut incoming = listener.incoming(); let mut incoming = listener.incoming();
@ -103,12 +106,11 @@ pub async fn run_socket_worker(
}; };
entry.insert(connection_reference); entry.insert(connection_reference);
}, }
Err(err) => { Err(err) => {
::log::error!("accept connection: {:?}", err); ::log::error!("accept connection: {:?}", err);
} }
} }
} }
} }
@ -117,7 +119,10 @@ async fn receive_responses(
connection_references: Rc<RefCell<Slab<ConnectionReference>>>, connection_references: Rc<RefCell<Slab<ConnectionReference>>>,
) { ) {
while let Some(channel_response) = response_receiver.next().await { while let Some(channel_response) = response_receiver.next().await {
if let Some(reference) = connection_references.borrow().get(channel_response.get_connection_id().0) { if let Some(reference) = connection_references
.borrow()
.get(channel_response.get_connection_id().0)
{
reference.response_sender.try_send(channel_response); reference.response_sender.try_send(channel_response);
} }
} }
@ -129,12 +134,13 @@ impl Connection {
let opt_request = self.read_tls().await?; let opt_request = self.read_tls().await?;
if let Some(request) = opt_request { if let Some(request) = opt_request {
let peer_addr = self.stream let peer_addr = self
.stream
.peer_addr() .peer_addr()
.map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))?; .map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))?;
match request { match request {
Request::Announce(request@AnnounceRequest { info_hash, .. }) => { Request::Announce(request @ AnnounceRequest { info_hash, .. }) => {
let request = ChannelRequest::Announce { let request = ChannelRequest::Announce {
request, request,
connection_id: self.connection_id, connection_id: self.connection_id,
@ -142,12 +148,13 @@ impl Connection {
peer_addr, peer_addr,
}; };
let consumer_index = calculate_request_consumer_index(&self.config, info_hash); let consumer_index =
calculate_request_consumer_index(&self.config, info_hash);
self.request_senders.try_send_to(consumer_index, request); self.request_senders.try_send_to(consumer_index, request);
}, }
Request::Scrape(request) => { Request::Scrape(request) => {
// TODO // TODO
}, }
} }
// Wait for response to arrive, then send it // Wait for response to arrive, then send it
@ -155,12 +162,16 @@ impl Connection {
if channel_response.get_peer_addr() != peer_addr { if channel_response.get_peer_addr() != peer_addr {
return Err(anyhow::anyhow!("peer addressess didn't match")); return Err(anyhow::anyhow!("peer addressess didn't match"));
} }
let opt_response = match channel_response { let opt_response = match channel_response {
ChannelResponse::Announce { response, .. } => { ChannelResponse::Announce { response, .. } => {
Some(Response::Announce(response)) Some(Response::Announce(response))
} }
ChannelResponse::Scrape { response, original_indices, .. } => { ChannelResponse::Scrape {
response,
original_indices,
..
} => {
None // TODO: accumulate scrape requests None // TODO: accumulate scrape requests
} }
}; };
@ -219,7 +230,7 @@ impl Connection {
self.request_buffer.extend_from_slice(&buf[..amt]); self.request_buffer.extend_from_slice(&buf[..amt]);
added_plaintext = true; added_plaintext = true;
}, }
Err(err) if err.kind() == ErrorKind::WouldBlock => { Err(err) if err.kind() == ErrorKind::WouldBlock => {
break; break;
} }
@ -241,7 +252,10 @@ impl Connection {
return Ok(Some(request)); return Ok(Some(request));
} }
Err(RequestParseError::NeedMoreData) => { Err(RequestParseError::NeedMoreData) => {
::log::debug!("need more request data. current data: {:?}", std::str::from_utf8(&self.request_buffer)); ::log::debug!(
"need more request data. current data: {:?}",
std::str::from_utf8(&self.request_buffer)
);
} }
Err(RequestParseError::Invalid(err)) => { Err(RequestParseError::Invalid(err)) => {
::log::debug!("invalid request: {:?}", err); ::log::debug!("invalid request: {:?}", err);
@ -259,7 +273,7 @@ impl Connection {
} }
if self.tls.wants_write() { if self.tls.wants_write() {
break break;
} }
} }
@ -310,4 +324,4 @@ impl Connection {
fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize { fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize {
(info_hash.0[0] as usize) % config.request_workers (info_hash.0[0] as usize) % config.request_workers
} }

View file

@ -1,12 +1,12 @@
use cfg_if::cfg_if; use cfg_if::cfg_if;
pub mod config;
pub mod common; pub mod common;
pub mod config;
#[cfg(feature = "with-mio")]
pub mod mio;
#[cfg(all(feature = "with-glommio", target_os = "linux"))] #[cfg(all(feature = "with-glommio", target_os = "linux"))]
pub mod glommio; pub mod glommio;
#[cfg(feature = "with-mio")]
pub mod mio;
pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker"; pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker";

View file

@ -1,6 +1,6 @@
use std::sync::Arc; use std::sync::Arc;
use aquatic_common::access_list::{AccessListArcSwap}; use aquatic_common::access_list::AccessListArcSwap;
use crossbeam_channel::{Receiver, Sender}; use crossbeam_channel::{Receiver, Sender};
use log::error; use log::error;
use mio::Token; use mio::Token;
@ -9,7 +9,7 @@ use parking_lot::Mutex;
pub use aquatic_common::{convert_ipv4_mapped_ipv6, ValidUntil}; pub use aquatic_common::{convert_ipv4_mapped_ipv6, ValidUntil};
use aquatic_http_protocol::request::Request; use aquatic_http_protocol::request::Request;
use aquatic_http_protocol::response::{Response}; use aquatic_http_protocol::response::Response;
use crate::common::*; use crate::common::*;

View file

@ -8,10 +8,10 @@ use rand::{rngs::SmallRng, SeedableRng};
use aquatic_http_protocol::request::*; use aquatic_http_protocol::request::*;
use aquatic_http_protocol::response::*; use aquatic_http_protocol::response::*;
use super::common::*;
use crate::common::handlers::{handle_announce_request, handle_scrape_request}; use crate::common::handlers::{handle_announce_request, handle_scrape_request};
use crate::common::*; use crate::common::*;
use crate::config::Config; use crate::config::Config;
use super::common::*;
pub fn run_request_worker( pub fn run_request_worker(
config: Config, config: Config,
@ -59,8 +59,7 @@ pub fn run_request_worker(
} }
} }
let mut torrent_maps = let mut torrent_maps = opt_torrent_maps.unwrap_or_else(|| state.torrent_maps.lock());
opt_torrent_maps.unwrap_or_else(|| state.torrent_maps.lock());
let valid_until = ValidUntil::new(config.cleaning.max_peer_age); let valid_until = ValidUntil::new(config.cleaning.max_peer_age);
@ -71,7 +70,7 @@ pub fn run_request_worker(
&mut torrent_maps, &mut torrent_maps,
valid_until, valid_until,
meta, meta,
request request,
); );
response_channel_sender.send(meta, Response::Announce(response)); response_channel_sender.send(meta, Response::Announce(response));
@ -79,12 +78,7 @@ pub fn run_request_worker(
} }
for (meta, request) in scrape_requests.drain(..) { for (meta, request) in scrape_requests.drain(..) {
let response = handle_scrape_request( let response = handle_scrape_request(&config, &mut torrent_maps, meta, request);
&config,
&mut torrent_maps,
meta,
request
);
response_channel_sender.send(meta, Response::Scrape(response)); response_channel_sender.send(meta, Response::Scrape(response));
wake_socket_workers[meta.worker_index] = true; wake_socket_workers[meta.worker_index] = true;

View file

@ -13,9 +13,9 @@ use native_tls::TlsAcceptor;
use aquatic_http_protocol::response::*; use aquatic_http_protocol::response::*;
use crate::mio::common::*;
use crate::config::Config;
use crate::common::*; use crate::common::*;
use crate::config::Config;
use crate::mio::common::*;
pub mod connection; pub mod connection;
pub mod stream; pub mod stream;

View file

@ -2,8 +2,8 @@ use histogram::Histogram;
use aquatic_common::access_list::{AccessListMode, AccessListQuery}; use aquatic_common::access_list::{AccessListMode, AccessListQuery};
use crate::config::Config;
use super::common::*; use super::common::*;
use crate::config::Config;
pub fn update_access_list(config: &Config, state: &State) { pub fn update_access_list(config: &Config, state: &State) {
match config.access_list.mode { match config.access_list.mode {