From 193ad1689fc53cc3f307f4869bbe3f7090e9677f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 18 Mar 2022 15:45:07 +0100 Subject: [PATCH] ws: move code from common.rs into lib.rs and request.rs --- aquatic_ws/src/common.rs | 148 +----------------------------- aquatic_ws/src/lib.rs | 57 +++++++++--- aquatic_ws/src/workers/request.rs | 114 ++++++++++++++++++++++- 3 files changed, 158 insertions(+), 161 deletions(-) diff --git a/aquatic_ws/src/common.rs b/aquatic_ws/src/common.rs index f7ced95..2d2f834 100644 --- a/aquatic_ws/src/common.rs +++ b/aquatic_ws/src/common.rs @@ -1,17 +1,10 @@ -use std::fs::File; -use std::io::BufReader; use std::sync::Arc; -use std::time::Instant; -use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; -use aquatic_common::{AHashIndexMap, CanonicalSocketAddr}; +use aquatic_common::access_list::AccessListArcSwap; +use aquatic_common::CanonicalSocketAddr; pub use aquatic_common::ValidUntil; -use aquatic_ws_protocol::*; - -use crate::config::Config; - pub type TlsConfig = futures_rustls::rustls::ServerConfig; #[derive(Default, Clone)] @@ -37,140 +30,3 @@ pub struct ConnectionMeta { pub peer_addr: CanonicalSocketAddr, pub pending_scrape_id: Option, } - -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -pub enum PeerStatus { - Seeding, - Leeching, - Stopped, -} - -impl PeerStatus { - /// Determine peer status from announce event and number of bytes left. - /// - /// Likely, the last branch will be taken most of the time. - #[inline] - pub fn from_event_and_bytes_left(event: AnnounceEvent, opt_bytes_left: Option) -> Self { - if let AnnounceEvent::Stopped = event { - Self::Stopped - } else if let Some(0) = opt_bytes_left { - Self::Seeding - } else { - Self::Leeching - } - } -} - -#[derive(Clone, Copy)] -pub struct Peer { - pub connection_meta: ConnectionMeta, - pub status: PeerStatus, - pub valid_until: ValidUntil, -} - -pub type PeerMap = AHashIndexMap; - -pub struct TorrentData { - pub peers: PeerMap, - pub num_seeders: usize, - pub num_leechers: usize, -} - -impl Default for TorrentData { - #[inline] - fn default() -> Self { - Self { - peers: Default::default(), - num_seeders: 0, - num_leechers: 0, - } - } -} - -pub type TorrentMap = AHashIndexMap; - -#[derive(Default)] -pub struct TorrentMaps { - pub ipv4: TorrentMap, - pub ipv6: TorrentMap, -} - -impl TorrentMaps { - pub fn clean(&mut self, config: &Config, access_list: &Arc) { - let mut access_list_cache = create_access_list_cache(access_list); - - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4); - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6); - } - - fn clean_torrent_map( - config: &Config, - access_list_cache: &mut AccessListCache, - torrent_map: &mut TorrentMap, - ) { - let now = Instant::now(); - - torrent_map.retain(|info_hash, torrent_data| { - if !access_list_cache - .load() - .allows(config.access_list.mode, &info_hash.0) - { - return false; - } - - let num_seeders = &mut torrent_data.num_seeders; - let num_leechers = &mut torrent_data.num_leechers; - - torrent_data.peers.retain(|_, peer| { - let keep = peer.valid_until.0 >= now; - - if !keep { - match peer.status { - PeerStatus::Seeding => { - *num_seeders -= 1; - } - PeerStatus::Leeching => { - *num_leechers -= 1; - } - _ => (), - }; - } - - keep - }); - - !torrent_data.peers.is_empty() - }); - - torrent_map.shrink_to_fit(); - } -} - -pub fn create_tls_config(config: &Config) -> anyhow::Result { - let certs = { - let f = File::open(&config.network.tls_certificate_path)?; - let mut f = BufReader::new(f); - - rustls_pemfile::certs(&mut f)? - .into_iter() - .map(|bytes| rustls::Certificate(bytes)) - .collect() - }; - - let private_key = { - let f = File::open(&config.network.tls_private_key_path)?; - let mut f = BufReader::new(f); - - rustls_pemfile::pkcs8_private_keys(&mut f)? - .first() - .map(|bytes| rustls::PrivateKey(bytes.clone())) - .ok_or(anyhow::anyhow!("No private keys in file"))? - }; - - let tls_config = rustls::ServerConfig::builder() - .with_safe_defaults() - .with_no_client_auth() - .with_single_cert(certs, private_key)?; - - Ok(tls_config) -} diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index a9a496c..0e8cd38 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -1,20 +1,22 @@ -use aquatic_common::access_list::update_access_list; -#[cfg(feature = "cpu-pinning")] -use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; -use common::State; -use signal_hook::{consts::SIGUSR1, iterator::Signals}; - -use std::sync::{atomic::AtomicUsize, Arc}; - -use crate::{common::create_tls_config, config::Config}; -use aquatic_common::privileges::drop_privileges_after_socket_binding; - -use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; - pub mod common; pub mod config; pub mod workers; +use std::fs::File; +use std::io::BufReader; +use std::sync::{atomic::AtomicUsize, Arc}; + +use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; +use signal_hook::{consts::SIGUSR1, iterator::Signals}; + +use aquatic_common::access_list::update_access_list; +#[cfg(feature = "cpu-pinning")] +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; +use aquatic_common::privileges::drop_privileges_after_socket_binding; + +use common::*; +use config::Config; + pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker"; pub const SHARED_IN_CHANNEL_SIZE: usize = 1024; @@ -147,3 +149,32 @@ pub fn run_workers(config: Config, state: State) -> anyhow::Result<()> { Ok(()) } + +pub fn create_tls_config(config: &Config) -> anyhow::Result { + let certs = { + let f = File::open(&config.network.tls_certificate_path)?; + let mut f = BufReader::new(f); + + rustls_pemfile::certs(&mut f)? + .into_iter() + .map(|bytes| rustls::Certificate(bytes)) + .collect() + }; + + let private_key = { + let f = File::open(&config.network.tls_private_key_path)?; + let mut f = BufReader::new(f); + + rustls_pemfile::pkcs8_private_keys(&mut f)? + .first() + .map(|bytes| rustls::PrivateKey(bytes.clone())) + .ok_or(anyhow::anyhow!("No private keys in file"))? + }; + + let tls_config = rustls::ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth() + .with_single_cert(certs, private_key)?; + + Ok(tls_config) +} diff --git a/aquatic_ws/src/workers/request.rs b/aquatic_ws/src/workers/request.rs index a5dd22f..6cf2ca0 100644 --- a/aquatic_ws/src/workers/request.rs +++ b/aquatic_ws/src/workers/request.rs @@ -1,7 +1,9 @@ use std::cell::RefCell; use std::rc::Rc; -use std::time::Duration; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use futures::StreamExt; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::enclose; @@ -10,13 +12,121 @@ use glommio::timer::TimerActionRepeat; use hashbrown::HashMap; use rand::{rngs::SmallRng, SeedableRng}; -use aquatic_common::extract_response_peers; +use aquatic_common::{extract_response_peers, AHashIndexMap}; use aquatic_ws_protocol::*; use crate::common::*; use crate::config::Config; use crate::SHARED_IN_CHANNEL_SIZE; +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub enum PeerStatus { + Seeding, + Leeching, + Stopped, +} + +impl PeerStatus { + /// Determine peer status from announce event and number of bytes left. + /// + /// Likely, the last branch will be taken most of the time. + #[inline] + pub fn from_event_and_bytes_left(event: AnnounceEvent, opt_bytes_left: Option) -> Self { + if let AnnounceEvent::Stopped = event { + Self::Stopped + } else if let Some(0) = opt_bytes_left { + Self::Seeding + } else { + Self::Leeching + } + } +} + +#[derive(Clone, Copy)] +pub struct Peer { + pub connection_meta: ConnectionMeta, + pub status: PeerStatus, + pub valid_until: ValidUntil, +} + +pub type PeerMap = AHashIndexMap; + +pub struct TorrentData { + pub peers: PeerMap, + pub num_seeders: usize, + pub num_leechers: usize, +} + +impl Default for TorrentData { + #[inline] + fn default() -> Self { + Self { + peers: Default::default(), + num_seeders: 0, + num_leechers: 0, + } + } +} + +pub type TorrentMap = AHashIndexMap; + +#[derive(Default)] +pub struct TorrentMaps { + pub ipv4: TorrentMap, + pub ipv6: TorrentMap, +} + +impl TorrentMaps { + pub fn clean(&mut self, config: &Config, access_list: &Arc) { + let mut access_list_cache = create_access_list_cache(access_list); + + Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4); + Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6); + } + + fn clean_torrent_map( + config: &Config, + access_list_cache: &mut AccessListCache, + torrent_map: &mut TorrentMap, + ) { + let now = Instant::now(); + + torrent_map.retain(|info_hash, torrent_data| { + if !access_list_cache + .load() + .allows(config.access_list.mode, &info_hash.0) + { + return false; + } + + let num_seeders = &mut torrent_data.num_seeders; + let num_leechers = &mut torrent_data.num_leechers; + + torrent_data.peers.retain(|_, peer| { + let keep = peer.valid_until.0 >= now; + + if !keep { + match peer.status { + PeerStatus::Seeding => { + *num_seeders -= 1; + } + PeerStatus::Leeching => { + *num_leechers -= 1; + } + _ => (), + }; + } + + keep + }); + + !torrent_data.peers.is_empty() + }); + + torrent_map.shrink_to_fit(); + } +} + pub async fn run_request_worker( config: Config, state: State,