From 1b2009ba6095b2c294e222537c74842f2f09725d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 3 Apr 2022 01:42:46 +0200 Subject: [PATCH] http_private: do more work on request worker --- aquatic_http_private/src/workers/request.rs | 171 +++++++++++++++++++- 1 file changed, 163 insertions(+), 8 deletions(-) diff --git a/aquatic_http_private/src/workers/request.rs b/aquatic_http_private/src/workers/request.rs index 2fcd8b0..4698d4d 100644 --- a/aquatic_http_private/src/workers/request.rs +++ b/aquatic_http_private/src/workers/request.rs @@ -1,10 +1,136 @@ -use tokio::sync::mpsc::Receiver; +use std::cell::RefCell; +use std::net::{Ipv4Addr, Ipv6Addr}; +use std::rc::Rc; +use std::time::Instant; -use aquatic_http_protocol::response::{FailureResponse, Response}; +use tokio::sync::mpsc::Receiver; +use tokio::task::LocalSet; +use tokio::time; + +use aquatic_common::{AmortizedIndexMap, CanonicalSocketAddr, ValidUntil}; +use aquatic_http_protocol::common::{AnnounceEvent, InfoHash, PeerId}; +use aquatic_http_protocol::request::AnnounceRequest; +use aquatic_http_protocol::response::{FailureResponse, Response, ResponsePeer}; use crate::common::ChannelAnnounceRequest; use crate::config::Config; +pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {} + +impl Ip for Ipv4Addr {} +impl Ip for Ipv6Addr {} + +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub enum PeerStatus { + Seeding, + Leeching, + Stopped, +} + +impl PeerStatus { + /// Determine peer status from announce event and number of bytes left. + /// + /// Likely, the last branch will be taken most of the time. + #[inline] + pub fn from_event_and_bytes_left(event: AnnounceEvent, opt_bytes_left: Option) -> Self { + if let AnnounceEvent::Stopped = event { + Self::Stopped + } else if let Some(0) = opt_bytes_left { + Self::Seeding + } else { + Self::Leeching + } + } +} + +#[derive(Debug, Clone, Copy)] +pub struct Peer { + pub ip_address: I, + pub port: u16, + pub status: PeerStatus, + pub valid_until: ValidUntil, +} + +impl Peer { + pub fn to_response_peer(&self) -> ResponsePeer { + ResponsePeer { + ip_address: self.ip_address, + port: self.port, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct PeerMapKey { + pub peer_id: PeerId, + pub ip: I, +} + +pub type PeerMap = AmortizedIndexMap, Peer>; + +pub struct TorrentData { + pub peers: PeerMap, + pub num_seeders: usize, + pub num_leechers: usize, +} + +impl Default for TorrentData { + #[inline] + fn default() -> Self { + Self { + peers: Default::default(), + num_seeders: 0, + num_leechers: 0, + } + } +} + +pub type TorrentMap = AmortizedIndexMap>; + +#[derive(Default)] +pub struct TorrentMaps { + pub ipv4: TorrentMap, + pub ipv6: TorrentMap, +} + +impl TorrentMaps { + pub fn clean(&mut self) { + Self::clean_torrent_map(&mut self.ipv4); + Self::clean_torrent_map(&mut self.ipv6); + } + + fn clean_torrent_map(torrent_map: &mut TorrentMap) { + let now = Instant::now(); + + torrent_map.retain(|_, torrent_data| { + let num_seeders = &mut torrent_data.num_seeders; + let num_leechers = &mut torrent_data.num_leechers; + + torrent_data.peers.retain(|_, peer| { + let keep = peer.valid_until.0 >= now; + + if !keep { + match peer.status { + PeerStatus::Seeding => { + *num_seeders -= 1; + } + PeerStatus::Leeching => { + *num_leechers -= 1; + } + _ => (), + }; + } + + keep + }); + + !torrent_data.peers.is_empty() + }); + + torrent_map.shrink_to_fit(); + } +} + pub fn run_request_worker( config: Config, request_receiver: Receiver, @@ -22,18 +148,47 @@ async fn run_inner( config: Config, mut request_receiver: Receiver, ) -> anyhow::Result<()> { + let torrents = Rc::new(RefCell::new(TorrentMaps::default())); + + LocalSet::new().spawn_local(periodically_clean_torrents( + config.clone(), + torrents.clone(), + )); + loop { let request = request_receiver .recv() .await .ok_or_else(|| anyhow::anyhow!("request channel closed"))?; - println!("{:?}", request); + let response = handle_announce_request( + &config, + &torrents, + request.source_addr, + request.request.into(), + ); - let _ = request - .response_sender - .send(Response::Failure(FailureResponse::new( - "successful actually", - ))); + let _ = request.response_sender.send(response); + } +} + +fn handle_announce_request( + config: &Config, + torrents: &Rc>, + source_addr: CanonicalSocketAddr, + request: AnnounceRequest, +) -> Response { + Response::Failure(FailureResponse::new("actually successful")) +} + +async fn periodically_clean_torrents(config: Config, torrents: Rc>) { + let mut interval = time::interval(time::Duration::from_secs( + config.cleaning.torrent_cleaning_interval, + )); + + loop { + interval.tick().await; + + torrents.borrow_mut().clean(); } }