diff --git a/aquatic_http/src/lib/common/handlers.rs b/aquatic_http/src/lib/common/handlers.rs index fa7fe3d..01f7bc0 100644 --- a/aquatic_http/src/lib/common/handlers.rs +++ b/aquatic_http/src/lib/common/handlers.rs @@ -2,14 +2,14 @@ use std::collections::BTreeMap; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use either::Either; -use rand::{Rng}; +use rand::Rng; use aquatic_common::extract_response_peers; use aquatic_http_protocol::request::*; use aquatic_http_protocol::response::*; -use crate::config::Config; use super::*; +use crate::config::Config; pub fn handle_announce_request( config: &Config, @@ -215,4 +215,3 @@ pub fn handle_scrape_request( response } - diff --git a/aquatic_http/src/lib/common/mod.rs b/aquatic_http/src/lib/common/mod.rs index 9ff51b1..b9c26b7 100644 --- a/aquatic_http/src/lib/common/mod.rs +++ b/aquatic_http/src/lib/common/mod.rs @@ -1,7 +1,7 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::time::Instant; -use aquatic_common::access_list::{AccessList}; +use aquatic_common::access_list::AccessList; use either::Either; use hashbrown::HashMap; use indexmap::IndexMap; @@ -10,7 +10,7 @@ use smartstring::{LazyCompact, SmartString}; pub use aquatic_common::{convert_ipv4_mapped_ipv6, ValidUntil}; use aquatic_http_protocol::common::*; -use aquatic_http_protocol::response::{ResponsePeer}; +use aquatic_http_protocol::response::ResponsePeer; use crate::config::Config; diff --git a/aquatic_http/src/lib/glommio/common.rs b/aquatic_http/src/lib/glommio/common.rs index 1752deb..0ca0bed 100644 --- a/aquatic_http/src/lib/glommio/common.rs +++ b/aquatic_http/src/lib/glommio/common.rs @@ -1,6 +1,9 @@ use std::net::SocketAddr; -use aquatic_http_protocol::{request::{AnnounceRequest, ScrapeRequest}, response::{AnnounceResponse, ScrapeResponse}}; +use aquatic_http_protocol::{ + request::{AnnounceRequest, ScrapeRequest}, + response::{AnnounceResponse, ScrapeResponse}, +}; #[derive(Copy, Clone, Debug)] pub struct ConsumerId(pub usize); @@ -22,7 +25,7 @@ pub enum ChannelRequest { original_indices: Vec, connection_id: ConnectionId, response_consumer_id: ConsumerId, - } + }, } #[derive(Debug)] @@ -37,7 +40,7 @@ pub enum ChannelResponse { peer_addr: SocketAddr, original_indices: Vec, connection_id: ConnectionId, - } + }, } impl ChannelResponse { @@ -53,4 +56,4 @@ impl ChannelResponse { Self::Scrape { peer_addr, .. } => *peer_addr, } } -} \ No newline at end of file +} diff --git a/aquatic_http/src/lib/glommio/handlers.rs b/aquatic_http/src/lib/glommio/handlers.rs index 6457318..e7abd65 100644 --- a/aquatic_http/src/lib/glommio/handlers.rs +++ b/aquatic_http/src/lib/glommio/handlers.rs @@ -88,7 +88,7 @@ async fn handle_request_stream( request, peer_addr, response_consumer_id, - connection_id + connection_id, } => { let meta = ConnectionMeta { worker_index: response_consumer_id.0, @@ -126,7 +126,8 @@ async fn handle_request_stream( peer_addr, }; - let response = handle_scrape_request(&config, &mut torrents.borrow_mut(), meta, request); + let response = + handle_scrape_request(&config, &mut torrents.borrow_mut(), meta, request); let response = ChannelResponse::Scrape { response, @@ -148,4 +149,3 @@ async fn handle_request_stream( yield_if_needed().await; } } - diff --git a/aquatic_http/src/lib/glommio/mod.rs b/aquatic_http/src/lib/glommio/mod.rs index 531a300..acb3e41 100644 --- a/aquatic_http/src/lib/glommio/mod.rs +++ b/aquatic_http/src/lib/glommio/mod.rs @@ -1,4 +1,8 @@ -use std::{fs::File, io::BufReader, sync::{Arc, atomic::AtomicUsize}}; +use std::{ + fs::File, + io::BufReader, + sync::{atomic::AtomicUsize, Arc}, +}; use aquatic_common::access_list::AccessList; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; @@ -11,9 +15,7 @@ mod network; const SHARED_CHANNEL_SIZE: usize = 1024; -pub fn run( - config: Config, -) -> anyhow::Result<()> { +pub fn run(config: Config) -> anyhow::Result<()> { let access_list = if config.access_list.mode.is_on() { AccessList::create_from_path(&config.access_list.path).expect("Load access list") } else { @@ -94,13 +96,11 @@ pub fn run( .join() .unwrap(); } - + Ok(()) } -fn create_tls_config( - config: &Config, -) -> anyhow::Result { +fn create_tls_config(config: &Config) -> anyhow::Result { let certs = { let f = File::open(&config.network.tls.tls_certificate_path)?; let mut f = BufReader::new(f); @@ -125,6 +125,6 @@ fn create_tls_config( .with_safe_defaults() .with_no_client_auth() .with_single_cert(certs, private_key)?; - + Ok(tls_config) -} \ No newline at end of file +} diff --git a/aquatic_http/src/lib/glommio/network.rs b/aquatic_http/src/lib/glommio/network.rs index 29f1e75..b0097d6 100644 --- a/aquatic_http/src/lib/glommio/network.rs +++ b/aquatic_http/src/lib/glommio/network.rs @@ -1,20 +1,20 @@ use std::cell::RefCell; use std::io::{Cursor, ErrorKind, Read, Write}; use std::rc::Rc; -use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; use aquatic_http_protocol::common::InfoHash; use aquatic_http_protocol::request::{AnnounceRequest, Request, RequestParseError}; use aquatic_http_protocol::response::{FailureResponse, Response}; use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Receivers, Role, Senders}; -use glommio::channels::shared_channel::{ConnectedReceiver, ConnectedSender, SharedSender}; -use glommio::prelude::*; -use glommio::net::{TcpListener, TcpStream}; use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; +use glommio::channels::shared_channel::{ConnectedReceiver, ConnectedSender, SharedSender}; +use glommio::net::{TcpListener, TcpStream}; +use glommio::prelude::*; use glommio::task::JoinHandle; -use rustls::{ServerConnection}; +use rustls::ServerConnection; use slab::Slab; use crate::common::num_digits_in_usize; @@ -24,7 +24,6 @@ use super::common::*; const BUFFER_SIZE: usize = 1024; - struct ConnectionReference { response_sender: LocalSender, handle: JoinHandle<()>, @@ -64,7 +63,11 @@ pub async fn run_socket_worker( let connection_slab = Rc::new(RefCell::new(Slab::new())); for (_, response_receiver) in response_receivers.streams() { - spawn_local(receive_responses(response_receiver, connection_slab.clone())).detach(); + spawn_local(receive_responses( + response_receiver, + connection_slab.clone(), + )) + .detach(); } let mut incoming = listener.incoming(); @@ -103,12 +106,11 @@ pub async fn run_socket_worker( }; entry.insert(connection_reference); - }, + } Err(err) => { ::log::error!("accept connection: {:?}", err); } } - } } @@ -117,7 +119,10 @@ async fn receive_responses( connection_references: Rc>>, ) { while let Some(channel_response) = response_receiver.next().await { - if let Some(reference) = connection_references.borrow().get(channel_response.get_connection_id().0) { + if let Some(reference) = connection_references + .borrow() + .get(channel_response.get_connection_id().0) + { reference.response_sender.try_send(channel_response); } } @@ -129,12 +134,13 @@ impl Connection { let opt_request = self.read_tls().await?; if let Some(request) = opt_request { - let peer_addr = self.stream + let peer_addr = self + .stream .peer_addr() .map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))?; - + match request { - Request::Announce(request@AnnounceRequest { info_hash, .. }) => { + Request::Announce(request @ AnnounceRequest { info_hash, .. }) => { let request = ChannelRequest::Announce { request, connection_id: self.connection_id, @@ -142,12 +148,13 @@ impl Connection { peer_addr, }; - let consumer_index = calculate_request_consumer_index(&self.config, info_hash); + let consumer_index = + calculate_request_consumer_index(&self.config, info_hash); self.request_senders.try_send_to(consumer_index, request); - }, + } Request::Scrape(request) => { // TODO - }, + } } // Wait for response to arrive, then send it @@ -155,12 +162,16 @@ impl Connection { if channel_response.get_peer_addr() != peer_addr { return Err(anyhow::anyhow!("peer addressess didn't match")); } - + let opt_response = match channel_response { - ChannelResponse::Announce { response, .. } => { + ChannelResponse::Announce { response, .. } => { Some(Response::Announce(response)) } - ChannelResponse::Scrape { response, original_indices, .. } => { + ChannelResponse::Scrape { + response, + original_indices, + .. + } => { None // TODO: accumulate scrape requests } }; @@ -219,7 +230,7 @@ impl Connection { self.request_buffer.extend_from_slice(&buf[..amt]); added_plaintext = true; - }, + } Err(err) if err.kind() == ErrorKind::WouldBlock => { break; } @@ -241,7 +252,10 @@ impl Connection { return Ok(Some(request)); } Err(RequestParseError::NeedMoreData) => { - ::log::debug!("need more request data. current data: {:?}", std::str::from_utf8(&self.request_buffer)); + ::log::debug!( + "need more request data. current data: {:?}", + std::str::from_utf8(&self.request_buffer) + ); } Err(RequestParseError::Invalid(err)) => { ::log::debug!("invalid request: {:?}", err); @@ -259,7 +273,7 @@ impl Connection { } if self.tls.wants_write() { - break + break; } } @@ -310,4 +324,4 @@ impl Connection { fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize { (info_hash.0[0] as usize) % config.request_workers -} \ No newline at end of file +} diff --git a/aquatic_http/src/lib/lib.rs b/aquatic_http/src/lib/lib.rs index 7f1456f..10cb683 100644 --- a/aquatic_http/src/lib/lib.rs +++ b/aquatic_http/src/lib/lib.rs @@ -1,12 +1,12 @@ use cfg_if::cfg_if; -pub mod config; pub mod common; +pub mod config; -#[cfg(feature = "with-mio")] -pub mod mio; #[cfg(all(feature = "with-glommio", target_os = "linux"))] pub mod glommio; +#[cfg(feature = "with-mio")] +pub mod mio; pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker"; diff --git a/aquatic_http/src/lib/mio/common.rs b/aquatic_http/src/lib/mio/common.rs index 5c68ca6..ef224d6 100644 --- a/aquatic_http/src/lib/mio/common.rs +++ b/aquatic_http/src/lib/mio/common.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use aquatic_common::access_list::{AccessListArcSwap}; +use aquatic_common::access_list::AccessListArcSwap; use crossbeam_channel::{Receiver, Sender}; use log::error; use mio::Token; @@ -9,7 +9,7 @@ use parking_lot::Mutex; pub use aquatic_common::{convert_ipv4_mapped_ipv6, ValidUntil}; use aquatic_http_protocol::request::Request; -use aquatic_http_protocol::response::{Response}; +use aquatic_http_protocol::response::Response; use crate::common::*; diff --git a/aquatic_http/src/lib/mio/handler.rs b/aquatic_http/src/lib/mio/handler.rs index c42acb1..fcdc4e9 100644 --- a/aquatic_http/src/lib/mio/handler.rs +++ b/aquatic_http/src/lib/mio/handler.rs @@ -8,10 +8,10 @@ use rand::{rngs::SmallRng, SeedableRng}; use aquatic_http_protocol::request::*; use aquatic_http_protocol::response::*; +use super::common::*; use crate::common::handlers::{handle_announce_request, handle_scrape_request}; use crate::common::*; use crate::config::Config; -use super::common::*; pub fn run_request_worker( config: Config, @@ -59,8 +59,7 @@ pub fn run_request_worker( } } - let mut torrent_maps = - opt_torrent_maps.unwrap_or_else(|| state.torrent_maps.lock()); + let mut torrent_maps = opt_torrent_maps.unwrap_or_else(|| state.torrent_maps.lock()); let valid_until = ValidUntil::new(config.cleaning.max_peer_age); @@ -71,7 +70,7 @@ pub fn run_request_worker( &mut torrent_maps, valid_until, meta, - request + request, ); response_channel_sender.send(meta, Response::Announce(response)); @@ -79,12 +78,7 @@ pub fn run_request_worker( } for (meta, request) in scrape_requests.drain(..) { - let response = handle_scrape_request( - &config, - &mut torrent_maps, - meta, - request - ); + let response = handle_scrape_request(&config, &mut torrent_maps, meta, request); response_channel_sender.send(meta, Response::Scrape(response)); wake_socket_workers[meta.worker_index] = true; diff --git a/aquatic_http/src/lib/mio/network/mod.rs b/aquatic_http/src/lib/mio/network/mod.rs index 7887f95..fa0c9bd 100644 --- a/aquatic_http/src/lib/mio/network/mod.rs +++ b/aquatic_http/src/lib/mio/network/mod.rs @@ -13,9 +13,9 @@ use native_tls::TlsAcceptor; use aquatic_http_protocol::response::*; -use crate::mio::common::*; -use crate::config::Config; use crate::common::*; +use crate::config::Config; +use crate::mio::common::*; pub mod connection; pub mod stream; diff --git a/aquatic_http/src/lib/mio/tasks.rs b/aquatic_http/src/lib/mio/tasks.rs index 67c9158..0106e4e 100644 --- a/aquatic_http/src/lib/mio/tasks.rs +++ b/aquatic_http/src/lib/mio/tasks.rs @@ -2,8 +2,8 @@ use histogram::Histogram; use aquatic_common::access_list::{AccessListMode, AccessListQuery}; -use crate::config::Config; use super::common::*; +use crate::config::Config; pub fn update_access_list(config: &Config, state: &State) { match config.access_list.mode {