diff --git a/aquatic_http/src/lib/common/handlers.rs b/aquatic_http/src/lib/common/handlers.rs new file mode 100644 index 0000000..7f10290 --- /dev/null +++ b/aquatic_http/src/lib/common/handlers.rs @@ -0,0 +1,217 @@ +use std::collections::BTreeMap; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; + +use either::Either; +use rand::{Rng}; + +use aquatic_common::extract_response_peers; +use aquatic_http_protocol::request::*; +use aquatic_http_protocol::response::*; + +use crate::config::Config; +use super::*; + +pub fn handle_announce_request( + config: &Config, + rng: &mut impl Rng, + torrent_maps: &mut TorrentMaps, + valid_until: ValidUntil, + meta: ConnectionMeta, + request: AnnounceRequest, +) -> AnnounceResponse { + let peer_ip = convert_ipv4_mapped_ipv6(meta.peer_addr.ip()); + + ::log::debug!("peer ip: {:?}", peer_ip); + + match peer_ip { + IpAddr::V4(peer_ip_address) => { + let torrent_data: &mut TorrentData = + torrent_maps.ipv4.entry(request.info_hash).or_default(); + + let peer_connection_meta = PeerConnectionMeta { + worker_index: meta.worker_index, + poll_token: meta.poll_token, + peer_ip_address, + }; + + let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers( + config, + rng, + peer_connection_meta, + torrent_data, + request, + valid_until, + ); + + let response = AnnounceResponse { + complete: seeders, + incomplete: leechers, + announce_interval: config.protocol.peer_announce_interval, + peers: ResponsePeerListV4(response_peers), + peers6: ResponsePeerListV6(vec![]), + }; + + response + } + IpAddr::V6(peer_ip_address) => { + let torrent_data: &mut TorrentData = + torrent_maps.ipv6.entry(request.info_hash).or_default(); + + let peer_connection_meta = PeerConnectionMeta { + worker_index: meta.worker_index, + poll_token: meta.poll_token, + peer_ip_address, + }; + + let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers( + config, + rng, + peer_connection_meta, + torrent_data, + request, + valid_until, + ); + + let response = AnnounceResponse { + complete: seeders, + incomplete: leechers, + announce_interval: config.protocol.peer_announce_interval, + peers: ResponsePeerListV4(vec![]), + peers6: ResponsePeerListV6(response_peers), + }; + + response + } + } +} + +/// Insert/update peer. Return num_seeders, num_leechers and response peers +pub fn upsert_peer_and_get_response_peers( + config: &Config, + rng: &mut impl Rng, + request_sender_meta: PeerConnectionMeta, + torrent_data: &mut TorrentData, + request: AnnounceRequest, + valid_until: ValidUntil, +) -> (usize, usize, Vec>) { + // Insert/update/remove peer who sent this request + + let peer_status = + PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left)); + + let peer = Peer { + connection_meta: request_sender_meta, + port: request.port, + status: peer_status, + valid_until, + }; + + ::log::debug!("peer: {:?}", peer); + + let ip_or_key = request + .key + .map(Either::Right) + .unwrap_or_else(|| Either::Left(request_sender_meta.peer_ip_address)); + + let peer_map_key = PeerMapKey { + peer_id: request.peer_id, + ip_or_key, + }; + + ::log::debug!("peer map key: {:?}", peer_map_key); + + let opt_removed_peer = match peer_status { + PeerStatus::Leeching => { + torrent_data.num_leechers += 1; + + torrent_data.peers.insert(peer_map_key.clone(), peer) + } + PeerStatus::Seeding => { + torrent_data.num_seeders += 1; + + torrent_data.peers.insert(peer_map_key.clone(), peer) + } + PeerStatus::Stopped => torrent_data.peers.remove(&peer_map_key), + }; + + ::log::debug!("opt_removed_peer: {:?}", opt_removed_peer); + + match opt_removed_peer.map(|peer| peer.status) { + Some(PeerStatus::Leeching) => { + torrent_data.num_leechers -= 1; + } + Some(PeerStatus::Seeding) => { + torrent_data.num_seeders -= 1; + } + _ => {} + } + + ::log::debug!("peer request numwant: {:?}", request.numwant); + + let max_num_peers_to_take = match request.numwant { + Some(0) | None => config.protocol.max_peers, + Some(numwant) => numwant.min(config.protocol.max_peers), + }; + + let response_peers: Vec> = extract_response_peers( + rng, + &torrent_data.peers, + max_num_peers_to_take, + peer_map_key, + Peer::to_response_peer, + ); + + ( + torrent_data.num_seeders, + torrent_data.num_leechers, + response_peers, + ) +} + +pub fn handle_scrape_request( + config: &Config, + torrent_maps: &mut TorrentMaps, + (meta, request): (ConnectionMeta, ScrapeRequest), +) -> ScrapeResponse { + let num_to_take = request + .info_hashes + .len() + .min(config.protocol.max_scrape_torrents); + + let mut response = ScrapeResponse { + files: BTreeMap::new(), + }; + + let peer_ip = convert_ipv4_mapped_ipv6(meta.peer_addr.ip()); + + // If request.info_hashes is empty, don't return scrape for all + // torrents, even though reference server does it. It is too expensive. + if peer_ip.is_ipv4() { + for info_hash in request.info_hashes.into_iter().take(num_to_take) { + if let Some(torrent_data) = torrent_maps.ipv4.get(&info_hash) { + let stats = ScrapeStatistics { + complete: torrent_data.num_seeders, + downloaded: 0, // No implementation planned + incomplete: torrent_data.num_leechers, + }; + + response.files.insert(info_hash, stats); + } + } + } else { + for info_hash in request.info_hashes.into_iter().take(num_to_take) { + if let Some(torrent_data) = torrent_maps.ipv6.get(&info_hash) { + let stats = ScrapeStatistics { + complete: torrent_data.num_seeders, + downloaded: 0, // No implementation planned + incomplete: torrent_data.num_leechers, + }; + + response.files.insert(info_hash, stats); + } + } + }; + + response +} + diff --git a/aquatic_http/src/lib/common/mod.rs b/aquatic_http/src/lib/common/mod.rs index 8002a13..845db1c 100644 --- a/aquatic_http/src/lib/common/mod.rs +++ b/aquatic_http/src/lib/common/mod.rs @@ -1,3 +1,163 @@ +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::sync::Arc; +use std::time::Instant; + +use aquatic_common::access_list::{AccessList}; +use either::Either; +use hashbrown::HashMap; +use indexmap::IndexMap; +use mio::Token; +use smartstring::{LazyCompact, SmartString}; + +pub use aquatic_common::{convert_ipv4_mapped_ipv6, ValidUntil}; + +use aquatic_http_protocol::common::*; +use aquatic_http_protocol::response::{ResponsePeer}; + +use crate::config::Config; + +pub mod handlers; + +pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {} + +impl Ip for Ipv4Addr {} +impl Ip for Ipv6Addr {} + +#[derive(Clone, Copy, Debug)] +pub struct ConnectionMeta { + /// Index of socket worker responsible for this connection. Required for + /// sending back response through correct channel to correct worker. + pub worker_index: usize, + pub peer_addr: SocketAddr, + pub poll_token: Token, +} + +#[derive(Clone, Copy, Debug)] +pub struct PeerConnectionMeta { + pub worker_index: usize, + pub poll_token: Token, + pub peer_ip_address: I, +} + +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub enum PeerStatus { + Seeding, + Leeching, + Stopped, +} + +impl PeerStatus { + /// Determine peer status from announce event and number of bytes left. + /// + /// Likely, the last branch will be taken most of the time. + #[inline] + pub fn from_event_and_bytes_left(event: AnnounceEvent, opt_bytes_left: Option) -> Self { + if let AnnounceEvent::Stopped = event { + Self::Stopped + } else if let Some(0) = opt_bytes_left { + Self::Seeding + } else { + Self::Leeching + } + } +} + +#[derive(Debug, Clone, Copy)] +pub struct Peer { + pub connection_meta: PeerConnectionMeta, + pub port: u16, + pub status: PeerStatus, + pub valid_until: ValidUntil, +} + +impl Peer { + pub fn to_response_peer(&self) -> ResponsePeer { + ResponsePeer { + ip_address: self.connection_meta.peer_ip_address, + port: self.port, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct PeerMapKey { + pub peer_id: PeerId, + pub ip_or_key: Either>, +} + +pub type PeerMap = IndexMap, Peer>; + +pub struct TorrentData { + pub peers: PeerMap, + pub num_seeders: usize, + pub num_leechers: usize, +} + +impl Default for TorrentData { + #[inline] + fn default() -> Self { + Self { + peers: IndexMap::new(), + num_seeders: 0, + num_leechers: 0, + } + } +} + +pub type TorrentMap = HashMap>; + +#[derive(Default)] +pub struct TorrentMaps { + pub ipv4: TorrentMap, + pub ipv6: TorrentMap, +} + +impl TorrentMaps { + pub fn clean(&mut self, config: &Config, access_list: &Arc) { + Self::clean_torrent_map(config, access_list, &mut self.ipv4); + Self::clean_torrent_map(config, access_list, &mut self.ipv6); + } + + fn clean_torrent_map( + config: &Config, + access_list: &Arc, + torrent_map: &mut TorrentMap, + ) { + let now = Instant::now(); + + torrent_map.retain(|info_hash, torrent_data| { + if !access_list.allows(config.access_list.mode, &info_hash.0) { + return false; + } + + let num_seeders = &mut torrent_data.num_seeders; + let num_leechers = &mut torrent_data.num_leechers; + + torrent_data.peers.retain(|_, peer| { + let keep = peer.valid_until.0 >= now; + + if !keep { + match peer.status { + PeerStatus::Seeding => { + *num_seeders -= 1; + } + PeerStatus::Leeching => { + *num_leechers -= 1; + } + _ => (), + }; + } + + keep + }); + + !torrent_data.peers.is_empty() + }); + + torrent_map.shrink_to_fit(); + } +} + pub fn num_digits_in_usize(mut number: usize) -> usize { let mut num_digits = 1usize; diff --git a/aquatic_http/src/lib/glommio/handlers.rs b/aquatic_http/src/lib/glommio/handlers.rs new file mode 100644 index 0000000..e69de29 diff --git a/aquatic_http/src/lib/glommio/mod.rs b/aquatic_http/src/lib/glommio/mod.rs index e5b577e..a88a8d4 100644 --- a/aquatic_http/src/lib/glommio/mod.rs +++ b/aquatic_http/src/lib/glommio/mod.rs @@ -5,6 +5,7 @@ use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; use crate::config::Config; +mod handlers; mod network; const SHARED_CHANNEL_SIZE: usize = 1024; diff --git a/aquatic_http/src/lib/mio/common.rs b/aquatic_http/src/lib/mio/common.rs index e4a9832..5c68ca6 100644 --- a/aquatic_http/src/lib/mio/common.rs +++ b/aquatic_http/src/lib/mio/common.rs @@ -1,168 +1,21 @@ -use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::Arc; -use std::time::Instant; -use aquatic_common::access_list::{AccessList, AccessListArcSwap}; +use aquatic_common::access_list::{AccessListArcSwap}; use crossbeam_channel::{Receiver, Sender}; -use either::Either; -use hashbrown::HashMap; -use indexmap::IndexMap; use log::error; use mio::Token; use parking_lot::Mutex; -use smartstring::{LazyCompact, SmartString}; pub use aquatic_common::{convert_ipv4_mapped_ipv6, ValidUntil}; -use aquatic_http_protocol::common::*; use aquatic_http_protocol::request::Request; -use aquatic_http_protocol::response::{Response, ResponsePeer}; +use aquatic_http_protocol::response::{Response}; -use crate::config::Config; +use crate::common::*; pub const LISTENER_TOKEN: Token = Token(0); pub const CHANNEL_TOKEN: Token = Token(1); -pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {} - -impl Ip for Ipv4Addr {} -impl Ip for Ipv6Addr {} - -#[derive(Clone, Copy, Debug)] -pub struct ConnectionMeta { - /// Index of socket worker responsible for this connection. Required for - /// sending back response through correct channel to correct worker. - pub worker_index: usize, - pub peer_addr: SocketAddr, - pub poll_token: Token, -} - -#[derive(Clone, Copy, Debug)] -pub struct PeerConnectionMeta { - pub worker_index: usize, - pub poll_token: Token, - pub peer_ip_address: I, -} - -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -pub enum PeerStatus { - Seeding, - Leeching, - Stopped, -} - -impl PeerStatus { - /// Determine peer status from announce event and number of bytes left. - /// - /// Likely, the last branch will be taken most of the time. - #[inline] - pub fn from_event_and_bytes_left(event: AnnounceEvent, opt_bytes_left: Option) -> Self { - if let AnnounceEvent::Stopped = event { - Self::Stopped - } else if let Some(0) = opt_bytes_left { - Self::Seeding - } else { - Self::Leeching - } - } -} - -#[derive(Debug, Clone, Copy)] -pub struct Peer { - pub connection_meta: PeerConnectionMeta, - pub port: u16, - pub status: PeerStatus, - pub valid_until: ValidUntil, -} - -impl Peer { - pub fn to_response_peer(&self) -> ResponsePeer { - ResponsePeer { - ip_address: self.connection_meta.peer_ip_address, - port: self.port, - } - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct PeerMapKey { - pub peer_id: PeerId, - pub ip_or_key: Either>, -} - -pub type PeerMap = IndexMap, Peer>; - -pub struct TorrentData { - pub peers: PeerMap, - pub num_seeders: usize, - pub num_leechers: usize, -} - -impl Default for TorrentData { - #[inline] - fn default() -> Self { - Self { - peers: IndexMap::new(), - num_seeders: 0, - num_leechers: 0, - } - } -} - -pub type TorrentMap = HashMap>; - -#[derive(Default)] -pub struct TorrentMaps { - pub ipv4: TorrentMap, - pub ipv6: TorrentMap, -} - -impl TorrentMaps { - pub fn clean(&mut self, config: &Config, access_list: &Arc) { - Self::clean_torrent_map(config, access_list, &mut self.ipv4); - Self::clean_torrent_map(config, access_list, &mut self.ipv6); - } - - fn clean_torrent_map( - config: &Config, - access_list: &Arc, - torrent_map: &mut TorrentMap, - ) { - let now = Instant::now(); - - torrent_map.retain(|info_hash, torrent_data| { - if !access_list.allows(config.access_list.mode, &info_hash.0) { - return false; - } - - let num_seeders = &mut torrent_data.num_seeders; - let num_leechers = &mut torrent_data.num_leechers; - - torrent_data.peers.retain(|_, peer| { - let keep = peer.valid_until.0 >= now; - - if !keep { - match peer.status { - PeerStatus::Seeding => { - *num_seeders -= 1; - } - PeerStatus::Leeching => { - *num_leechers -= 1; - } - _ => (), - }; - } - - keep - }); - - !torrent_data.peers.is_empty() - }); - - torrent_map.shrink_to_fit(); - } -} - #[derive(Clone)] pub struct State { pub access_list: Arc, diff --git a/aquatic_http/src/lib/mio/handler.rs b/aquatic_http/src/lib/mio/handler.rs index c54df07..510bac7 100644 --- a/aquatic_http/src/lib/mio/handler.rs +++ b/aquatic_http/src/lib/mio/handler.rs @@ -1,18 +1,16 @@ -use std::collections::BTreeMap; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::sync::Arc; use std::time::Duration; use std::vec::Drain; -use either::Either; use mio::Waker; use parking_lot::MutexGuard; use rand::{rngs::SmallRng, Rng, SeedableRng}; -use aquatic_common::extract_response_peers; use aquatic_http_protocol::request::*; use aquatic_http_protocol::response::*; +use crate::common::handlers::{handle_announce_request, handle_scrape_request}; +use crate::common::*; use crate::config::Config; use super::common::*; @@ -105,159 +103,13 @@ pub fn handle_announce_requests( let valid_until = ValidUntil::new(config.cleaning.max_peer_age); for (meta, request) in requests { - let peer_ip = convert_ipv4_mapped_ipv6(meta.peer_addr.ip()); + let response = handle_announce_request(config, rng, torrent_maps, valid_until, meta, request); - ::log::debug!("peer ip: {:?}", peer_ip); - - let response = match peer_ip { - IpAddr::V4(peer_ip_address) => { - let torrent_data: &mut TorrentData = - torrent_maps.ipv4.entry(request.info_hash).or_default(); - - let peer_connection_meta = PeerConnectionMeta { - worker_index: meta.worker_index, - poll_token: meta.poll_token, - peer_ip_address, - }; - - let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers( - config, - rng, - peer_connection_meta, - torrent_data, - request, - valid_until, - ); - - let response = AnnounceResponse { - complete: seeders, - incomplete: leechers, - announce_interval: config.protocol.peer_announce_interval, - peers: ResponsePeerListV4(response_peers), - peers6: ResponsePeerListV6(vec![]), - }; - - Response::Announce(response) - } - IpAddr::V6(peer_ip_address) => { - let torrent_data: &mut TorrentData = - torrent_maps.ipv6.entry(request.info_hash).or_default(); - - let peer_connection_meta = PeerConnectionMeta { - worker_index: meta.worker_index, - poll_token: meta.poll_token, - peer_ip_address, - }; - - let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers( - config, - rng, - peer_connection_meta, - torrent_data, - request, - valid_until, - ); - - let response = AnnounceResponse { - complete: seeders, - incomplete: leechers, - announce_interval: config.protocol.peer_announce_interval, - peers: ResponsePeerListV4(vec![]), - peers6: ResponsePeerListV6(response_peers), - }; - - Response::Announce(response) - } - }; - - response_channel_sender.send(meta, response); + response_channel_sender.send(meta, Response::Announce(response)); wake_socket_workers[meta.worker_index] = true; } } -/// Insert/update peer. Return num_seeders, num_leechers and response peers -fn upsert_peer_and_get_response_peers( - config: &Config, - rng: &mut impl Rng, - request_sender_meta: PeerConnectionMeta, - torrent_data: &mut TorrentData, - request: AnnounceRequest, - valid_until: ValidUntil, -) -> (usize, usize, Vec>) { - // Insert/update/remove peer who sent this request - - let peer_status = - PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left)); - - let peer = Peer { - connection_meta: request_sender_meta, - port: request.port, - status: peer_status, - valid_until, - }; - - ::log::debug!("peer: {:?}", peer); - - let ip_or_key = request - .key - .map(Either::Right) - .unwrap_or_else(|| Either::Left(request_sender_meta.peer_ip_address)); - - let peer_map_key = PeerMapKey { - peer_id: request.peer_id, - ip_or_key, - }; - - ::log::debug!("peer map key: {:?}", peer_map_key); - - let opt_removed_peer = match peer_status { - PeerStatus::Leeching => { - torrent_data.num_leechers += 1; - - torrent_data.peers.insert(peer_map_key.clone(), peer) - } - PeerStatus::Seeding => { - torrent_data.num_seeders += 1; - - torrent_data.peers.insert(peer_map_key.clone(), peer) - } - PeerStatus::Stopped => torrent_data.peers.remove(&peer_map_key), - }; - - ::log::debug!("opt_removed_peer: {:?}", opt_removed_peer); - - match opt_removed_peer.map(|peer| peer.status) { - Some(PeerStatus::Leeching) => { - torrent_data.num_leechers -= 1; - } - Some(PeerStatus::Seeding) => { - torrent_data.num_seeders -= 1; - } - _ => {} - } - - ::log::debug!("peer request numwant: {:?}", request.numwant); - - let max_num_peers_to_take = match request.numwant { - Some(0) | None => config.protocol.max_peers, - Some(numwant) => numwant.min(config.protocol.max_peers), - }; - - let response_peers: Vec> = extract_response_peers( - rng, - &torrent_data.peers, - max_num_peers_to_take, - peer_map_key, - Peer::to_response_peer, - ); - - ( - torrent_data.num_seeders, - torrent_data.num_leechers, - response_peers, - ) -} - pub fn handle_scrape_requests( config: &Config, torrent_maps: &mut TorrentMaps, @@ -266,44 +118,7 @@ pub fn handle_scrape_requests( requests: Drain<(ConnectionMeta, ScrapeRequest)>, ) { for (meta, request) in requests { - let num_to_take = request - .info_hashes - .len() - .min(config.protocol.max_scrape_torrents); - - let mut response = ScrapeResponse { - files: BTreeMap::new(), - }; - - let peer_ip = convert_ipv4_mapped_ipv6(meta.peer_addr.ip()); - - // If request.info_hashes is empty, don't return scrape for all - // torrents, even though reference server does it. It is too expensive. - if peer_ip.is_ipv4() { - for info_hash in request.info_hashes.into_iter().take(num_to_take) { - if let Some(torrent_data) = torrent_maps.ipv4.get(&info_hash) { - let stats = ScrapeStatistics { - complete: torrent_data.num_seeders, - downloaded: 0, // No implementation planned - incomplete: torrent_data.num_leechers, - }; - - response.files.insert(info_hash, stats); - } - } - } else { - for info_hash in request.info_hashes.into_iter().take(num_to_take) { - if let Some(torrent_data) = torrent_maps.ipv6.get(&info_hash) { - let stats = ScrapeStatistics { - complete: torrent_data.num_seeders, - downloaded: 0, // No implementation planned - incomplete: torrent_data.num_leechers, - }; - - response.files.insert(info_hash, stats); - } - } - }; + let response = handle_scrape_request(config, torrent_maps, (meta, request)); response_channel_sender.send(meta, Response::Scrape(response)); wake_socket_workers[meta.worker_index] = true; diff --git a/aquatic_http/src/lib/mio/network/mod.rs b/aquatic_http/src/lib/mio/network/mod.rs index ed0a552..52e8b0d 100644 --- a/aquatic_http/src/lib/mio/network/mod.rs +++ b/aquatic_http/src/lib/mio/network/mod.rs @@ -15,6 +15,7 @@ use aquatic_http_protocol::response::*; use crate::mio::common::*; use crate::config::Config; +use crate::common::*; pub mod connection; pub mod stream;