From 6b1b5bf191614217026fedce15f401c3e379602a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 23 Mar 2022 21:30:03 +0100 Subject: [PATCH] http: move various definitions from common.rs to request.rs --- aquatic_http/src/common.rs | 159 +------------------------ aquatic_http/src/workers/request.rs | 172 ++++++++++++++++++++++++++-- 2 files changed, 164 insertions(+), 167 deletions(-) diff --git a/aquatic_http/src/common.rs b/aquatic_http/src/common.rs index 930f865..5bfa9b1 100644 --- a/aquatic_http/src/common.rs +++ b/aquatic_http/src/common.rs @@ -1,19 +1,10 @@ -use std::net::{Ipv4Addr, Ipv6Addr}; use std::sync::Arc; -use std::time::Instant; -use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; -use aquatic_common::{AmortizedIndexMap, CanonicalSocketAddr}; -use either::Either; -use smartstring::{LazyCompact, SmartString}; +use aquatic_common::access_list::AccessListArcSwap; +use aquatic_common::CanonicalSocketAddr; pub use aquatic_common::ValidUntil; -use aquatic_http_protocol::common::*; -use aquatic_http_protocol::response::ResponsePeer; - -use crate::config::Config; - use aquatic_http_protocol::{ request::{AnnounceRequest, ScrapeRequest}, response::{AnnounceResponse, ScrapeResponse}, @@ -72,152 +63,6 @@ impl ChannelResponse { } } -pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {} - -impl Ip for Ipv4Addr {} -impl Ip for Ipv6Addr {} - -#[derive(Clone, Copy, Debug)] -pub struct ConnectionMeta { - /// Index of socket worker responsible for this connection. Required for - /// sending back response through correct channel to correct worker. - pub response_consumer_id: ConsumerId, - pub peer_addr: CanonicalSocketAddr, - /// Connection id local to socket worker - pub connection_id: ConnectionId, -} - -#[derive(Clone, Copy, Debug)] -pub struct PeerConnectionMeta { - pub response_consumer_id: ConsumerId, - pub connection_id: ConnectionId, - pub peer_ip_address: I, -} - -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -pub enum PeerStatus { - Seeding, - Leeching, - Stopped, -} - -impl PeerStatus { - /// Determine peer status from announce event and number of bytes left. - /// - /// Likely, the last branch will be taken most of the time. - #[inline] - pub fn from_event_and_bytes_left(event: AnnounceEvent, opt_bytes_left: Option) -> Self { - if let AnnounceEvent::Stopped = event { - Self::Stopped - } else if let Some(0) = opt_bytes_left { - Self::Seeding - } else { - Self::Leeching - } - } -} - -#[derive(Debug, Clone, Copy)] -pub struct Peer { - pub connection_meta: PeerConnectionMeta, - pub port: u16, - pub status: PeerStatus, - pub valid_until: ValidUntil, -} - -impl Peer { - pub fn to_response_peer(&self) -> ResponsePeer { - ResponsePeer { - ip_address: self.connection_meta.peer_ip_address, - port: self.port, - } - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct PeerMapKey { - pub peer_id: PeerId, - pub ip_or_key: Either>, -} - -pub type PeerMap = AmortizedIndexMap, Peer>; - -pub struct TorrentData { - pub peers: PeerMap, - pub num_seeders: usize, - pub num_leechers: usize, -} - -impl Default for TorrentData { - #[inline] - fn default() -> Self { - Self { - peers: Default::default(), - num_seeders: 0, - num_leechers: 0, - } - } -} - -pub type TorrentMap = AmortizedIndexMap>; - -#[derive(Default)] -pub struct TorrentMaps { - pub ipv4: TorrentMap, - pub ipv6: TorrentMap, -} - -impl TorrentMaps { - pub fn clean(&mut self, config: &Config, access_list: &Arc) { - let mut access_list_cache = create_access_list_cache(access_list); - - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4); - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6); - } - - fn clean_torrent_map( - config: &Config, - access_list_cache: &mut AccessListCache, - torrent_map: &mut TorrentMap, - ) { - let now = Instant::now(); - - torrent_map.retain(|info_hash, torrent_data| { - if !access_list_cache - .load() - .allows(config.access_list.mode, &info_hash.0) - { - return false; - } - - let num_seeders = &mut torrent_data.num_seeders; - let num_leechers = &mut torrent_data.num_leechers; - - torrent_data.peers.retain(|_, peer| { - let keep = peer.valid_until.0 >= now; - - if !keep { - match peer.status { - PeerStatus::Seeding => { - *num_seeders -= 1; - } - PeerStatus::Leeching => { - *num_leechers -= 1; - } - _ => (), - }; - } - - keep - }); - - !torrent_data.peers.is_empty() - }); - - torrent_map.shrink_to_fit(); - } -} - #[derive(Default, Clone)] pub struct State { pub access_list: Arc, diff --git a/aquatic_http/src/workers/request.rs b/aquatic_http/src/workers/request.rs index 2adc12f..4dd36aa 100644 --- a/aquatic_http/src/workers/request.rs +++ b/aquatic_http/src/workers/request.rs @@ -1,27 +1,179 @@ +use std::cell::RefCell; use std::collections::BTreeMap; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::rc::Rc; +use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; use either::Either; -use rand::Rng; - -use aquatic_common::extract_response_peers; -use aquatic_http_protocol::request::*; -use aquatic_http_protocol::response::*; - -use std::cell::RefCell; -use std::rc::Rc; -use std::time::Duration; - use futures_lite::{Stream, StreamExt}; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::timer::TimerActionRepeat; use glommio::{enclose, prelude::*}; use rand::prelude::SmallRng; +use rand::Rng; use rand::SeedableRng; +use smartstring::{LazyCompact, SmartString}; + +use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; +use aquatic_common::extract_response_peers; +use aquatic_common::ValidUntil; +use aquatic_common::{AmortizedIndexMap, CanonicalSocketAddr}; +use aquatic_http_protocol::common::*; +use aquatic_http_protocol::request::*; +use aquatic_http_protocol::response::ResponsePeer; +use aquatic_http_protocol::response::*; use crate::common::*; use crate::config::Config; +pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {} + +impl Ip for Ipv4Addr {} +impl Ip for Ipv6Addr {} + +#[derive(Clone, Copy, Debug)] +pub struct ConnectionMeta { + /// Index of socket worker responsible for this connection. Required for + /// sending back response through correct channel to correct worker. + pub response_consumer_id: ConsumerId, + pub peer_addr: CanonicalSocketAddr, + /// Connection id local to socket worker + pub connection_id: ConnectionId, +} + +#[derive(Clone, Copy, Debug)] +pub struct PeerConnectionMeta { + pub response_consumer_id: ConsumerId, + pub connection_id: ConnectionId, + pub peer_ip_address: I, +} + +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub enum PeerStatus { + Seeding, + Leeching, + Stopped, +} + +impl PeerStatus { + /// Determine peer status from announce event and number of bytes left. + /// + /// Likely, the last branch will be taken most of the time. + #[inline] + pub fn from_event_and_bytes_left(event: AnnounceEvent, opt_bytes_left: Option) -> Self { + if let AnnounceEvent::Stopped = event { + Self::Stopped + } else if let Some(0) = opt_bytes_left { + Self::Seeding + } else { + Self::Leeching + } + } +} + +#[derive(Debug, Clone, Copy)] +pub struct Peer { + pub connection_meta: PeerConnectionMeta, + pub port: u16, + pub status: PeerStatus, + pub valid_until: ValidUntil, +} + +impl Peer { + pub fn to_response_peer(&self) -> ResponsePeer { + ResponsePeer { + ip_address: self.connection_meta.peer_ip_address, + port: self.port, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct PeerMapKey { + pub peer_id: PeerId, + pub ip_or_key: Either>, +} + +pub type PeerMap = AmortizedIndexMap, Peer>; + +pub struct TorrentData { + pub peers: PeerMap, + pub num_seeders: usize, + pub num_leechers: usize, +} + +impl Default for TorrentData { + #[inline] + fn default() -> Self { + Self { + peers: Default::default(), + num_seeders: 0, + num_leechers: 0, + } + } +} + +pub type TorrentMap = AmortizedIndexMap>; + +#[derive(Default)] +pub struct TorrentMaps { + pub ipv4: TorrentMap, + pub ipv6: TorrentMap, +} + +impl TorrentMaps { + pub fn clean(&mut self, config: &Config, access_list: &Arc) { + let mut access_list_cache = create_access_list_cache(access_list); + + Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4); + Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6); + } + + fn clean_torrent_map( + config: &Config, + access_list_cache: &mut AccessListCache, + torrent_map: &mut TorrentMap, + ) { + let now = Instant::now(); + + torrent_map.retain(|info_hash, torrent_data| { + if !access_list_cache + .load() + .allows(config.access_list.mode, &info_hash.0) + { + return false; + } + + let num_seeders = &mut torrent_data.num_seeders; + let num_leechers = &mut torrent_data.num_leechers; + + torrent_data.peers.retain(|_, peer| { + let keep = peer.valid_until.0 >= now; + + if !keep { + match peer.status { + PeerStatus::Seeding => { + *num_seeders -= 1; + } + PeerStatus::Leeching => { + *num_leechers -= 1; + } + _ => (), + }; + } + + keep + }); + + !torrent_data.peers.is_empty() + }); + + torrent_map.shrink_to_fit(); + } +} + pub async fn run_request_worker( config: Config, state: State,