From 2386dd0e8b17b96f9fa135547618202989205ca6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 8 Jul 2020 14:12:01 +0200 Subject: [PATCH] aquatic_http: parameterise many data structures over peer IP protocol --- TODO.md | 6 +- aquatic_http/src/lib/common.rs | 58 ++--- aquatic_http/src/lib/handler.rs | 288 +++++++++++++--------- aquatic_http/src/lib/protocol/common.rs | 4 +- aquatic_http/src/lib/protocol/response.rs | 39 +-- aquatic_http/src/lib/protocol/utils.rs | 8 +- aquatic_http/src/lib/tasks.rs | 32 +-- 7 files changed, 230 insertions(+), 205 deletions(-) diff --git a/TODO.md b/TODO.md index 4389fae..ed05daa 100644 --- a/TODO.md +++ b/TODO.md @@ -2,7 +2,9 @@ ## General -* use ipv4-mapped address functions +* use ipv4-mapped address functions, but I should check that they really + work as they really work as they should. All announces over ipv4 should + go to ipv4 map, all over ipv6 to ipv6 map * avx-512 should be avoided, maybe this should be mentioned in README and maybe run scripts should be adjusted @@ -11,8 +13,6 @@ * test tls * current serialized byte strings valid * scrape: does it work with multiple hashes? -* store Ipv4Addr / Ipv6 addr in peer map, for correctness and so that strange - conversion in handler doesn't have to occur * compact=0 should result in error response * tests of request parsing * tests of response serialization (against data known to be good would be nice) diff --git a/aquatic_http/src/lib/common.rs b/aquatic_http/src/lib/common.rs index 3375480..c0f2549 100644 --- a/aquatic_http/src/lib/common.rs +++ b/aquatic_http/src/lib/common.rs @@ -1,4 +1,4 @@ -use std::net::{IpAddr, SocketAddr}; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::Arc; use either::Either; @@ -13,7 +13,7 @@ pub use aquatic_common::{ValidUntil, convert_ipv4_mapped_ipv4}; use crate::protocol::common::*; use crate::protocol::request::Request; -use crate::protocol::response::Response; +use crate::protocol::response::{Response, ResponsePeer}; #[derive(Clone, Copy, Debug)] @@ -26,19 +26,11 @@ pub struct ConnectionMeta { } -impl ConnectionMeta { - pub fn map_ipv4_ip(&self) -> Self { - let peer_addr = SocketAddr::new( - convert_ipv4_mapped_ipv4(self.peer_addr.ip()), - self.peer_addr.port() - ); - - Self { - worker_index: self.worker_index, - peer_addr, - poll_token: self.poll_token - } - } +#[derive(Clone, Copy, Debug)] +pub struct PeerConnectionMeta

{ + pub worker_index: usize, + pub poll_token: Token, + pub peer_ip_address: P, } @@ -71,32 +63,42 @@ impl PeerStatus { #[derive(Clone, Copy)] -pub struct Peer { - pub connection_meta: ConnectionMeta, +pub struct Peer

{ + pub connection_meta: PeerConnectionMeta

, pub port: u16, pub status: PeerStatus, pub valid_until: ValidUntil, } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct PeerMapKey { - pub peer_id: PeerId, - pub ip_or_key: Either +impl Peer { + pub fn to_response_peer(&self) -> ResponsePeer { + ResponsePeer { + ip_address: self.connection_meta.peer_ip_address, + port: self.port + } + } } -pub type PeerMap = IndexMap; +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct PeerMapKey { + pub peer_id: PeerId, + pub ip_or_key: Either +} -pub struct TorrentData { - pub peers: PeerMap, +pub type PeerMap

= IndexMap, Peer

>; + + +pub struct TorrentData { + pub peers: PeerMap

, pub num_seeders: usize, pub num_leechers: usize, } -impl Default for TorrentData { +impl Default for TorrentData

{ #[inline] fn default() -> Self { Self { @@ -108,13 +110,13 @@ impl Default for TorrentData { } -pub type TorrentMap = HashMap; +pub type TorrentMap

= HashMap>; #[derive(Default)] pub struct TorrentMaps { - pub ipv4: TorrentMap, - pub ipv6: TorrentMap, + pub ipv4: TorrentMap, + pub ipv6: TorrentMap, } diff --git a/aquatic_http/src/lib/handler.rs b/aquatic_http/src/lib/handler.rs index 3e8d2da..caff517 100644 --- a/aquatic_http/src/lib/handler.rs +++ b/aquatic_http/src/lib/handler.rs @@ -1,6 +1,6 @@ use std::time::Duration; use std::vec::Drain; -use std::net::IpAddr; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use either::Either; use hashbrown::HashMap; @@ -97,122 +97,155 @@ pub fn handle_announce_requests( let valid_until = ValidUntil::new(config.cleaning.max_peer_age); responses.extend(requests.map(|(request_sender_meta, request)| { - let converted_request_sender_meta = request_sender_meta.map_ipv4_ip(); - - let torrent_data: &mut TorrentData = if converted_request_sender_meta.peer_addr.is_ipv4(){ - torrent_maps.ipv4.entry(request.info_hash).or_default() - } else { - torrent_maps.ipv6.entry(request.info_hash).or_default() - }; - - // Insert/update/remove peer who sent this request - { - let request_sender_meta = converted_request_sender_meta; - - let peer_status = PeerStatus::from_event_and_bytes_left( - request.event, - Some(request.bytes_left) - ); - - let peer = Peer { - connection_meta: request_sender_meta, - port: request.port, - status: peer_status, - valid_until, - }; - - let ip_or_key = request.key - .map(Either::Right) - .unwrap_or_else(|| - Either::Left(request_sender_meta.peer_addr.ip()) - ); - - let peer_map_key = PeerMapKey { - peer_id: request.peer_id, - ip_or_key, - }; - - let opt_removed_peer = match peer_status { - PeerStatus::Leeching => { - torrent_data.num_leechers += 1; - - torrent_data.peers.insert(peer_map_key, peer) - }, - PeerStatus::Seeding => { - torrent_data.num_seeders += 1; - - torrent_data.peers.insert(peer_map_key, peer) - }, - PeerStatus::Stopped => { - torrent_data.peers.remove(&peer_map_key) - } - }; - - match opt_removed_peer.map(|peer| peer.status){ - Some(PeerStatus::Leeching) => { - torrent_data.num_leechers -= 1; - }, - Some(PeerStatus::Seeding) => { - torrent_data.num_seeders -= 1; - }, - _ => {} - } - } - - let max_num_peers_to_take = match request.numwant { - Some(0) | None => config.protocol.max_peers, - Some(numwant) => numwant.min(config.protocol.max_peers), - }; - - // FIXME: proper protocol peer should be extracted here, not below. - // Ideally, protocol-specific IP should be stored in connection meta - // in peer map. - let response_peers: Vec = extract_response_peers( - rng, - &torrent_data.peers, - max_num_peers_to_take, - ResponsePeer::from_peer + let peer_ip = convert_ipv4_mapped_ipv4( + request_sender_meta.peer_addr.ip() ); - let response_peers_v4 = response_peers.iter() - .filter_map(|peer| { - if let IpAddr::V4(ip_address) = peer.ip_address { - Some(ResponsePeerV4 { - ip_address, - port: peer.port - }) - } else { - None - } - }) - .collect(); + let response = match peer_ip { + IpAddr::V4(peer_ip_address) => { + let torrent_data: &mut TorrentData = torrent_maps.ipv4 + .entry(request.info_hash) + .or_default(); + + let peer_connection_meta = PeerConnectionMeta { + worker_index: request_sender_meta.worker_index, + poll_token: request_sender_meta.poll_token, + peer_ip_address, + }; - let response_peers_v6 = response_peers.iter() - .filter_map(|peer| { - if let IpAddr::V6(ip_address) = peer.ip_address { - Some(ResponsePeerV6 { - ip_address, - port: peer.port - }) - } else { - None - } - }) - .collect(); + let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers( + config, + rng, + peer_connection_meta, + torrent_data, + request, + valid_until + ); - let response = Response::Announce(AnnounceResponse { - complete: torrent_data.num_seeders, - incomplete: torrent_data.num_leechers, - announce_interval: config.protocol.peer_announce_interval, - peers: ResponsePeerListV4(response_peers_v4), - peers6: ResponsePeerListV6(response_peers_v6), - }); + let response = AnnounceResponse { + complete: seeders, + incomplete: leechers, + announce_interval: config.protocol.peer_announce_interval, + peers: ResponsePeerListV4(response_peers), + peers6: ResponsePeerListV6(vec![]), + }; + + Response::Announce(response) + }, + IpAddr::V6(peer_ip_address) => { + let torrent_data: &mut TorrentData = torrent_maps.ipv6 + .entry(request.info_hash) + .or_default(); + + let peer_connection_meta = PeerConnectionMeta { + worker_index: request_sender_meta.worker_index, + poll_token: request_sender_meta.poll_token, + peer_ip_address + }; + + let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers( + config, + rng, + peer_connection_meta, + torrent_data, + request, + valid_until + ); + + let response = AnnounceResponse { + complete: seeders, + incomplete: leechers, + announce_interval: config.protocol.peer_announce_interval, + peers: ResponsePeerListV4(vec![]), + peers6: ResponsePeerListV6(response_peers), + }; + + Response::Announce(response) + }, + }; (request_sender_meta, response) })); } +/// Insert/update peer. Return num_seeders, num_leechers and response peers +fn upsert_peer_and_get_response_peers( + config: &Config, + rng: &mut impl Rng, + request_sender_meta: PeerConnectionMeta

, + torrent_data: &mut TorrentData

, + request: AnnounceRequest, + valid_until: ValidUntil, +) -> (usize, usize, Vec>) { + // Insert/update/remove peer who sent this request + { + let peer_status = PeerStatus::from_event_and_bytes_left( + request.event, + Some(request.bytes_left) + ); + + let peer = Peer { + connection_meta: request_sender_meta, + port: request.port, + status: peer_status, + valid_until, + }; + + let ip_or_key = request.key + .map(Either::Right) + .unwrap_or_else(|| + Either::Left(request_sender_meta.peer_ip_address) + ); + + let peer_map_key = PeerMapKey { + peer_id: request.peer_id, + ip_or_key, + }; + + let opt_removed_peer = match peer_status { + PeerStatus::Leeching => { + torrent_data.num_leechers += 1; + + torrent_data.peers.insert(peer_map_key, peer) + }, + PeerStatus::Seeding => { + torrent_data.num_seeders += 1; + + torrent_data.peers.insert(peer_map_key, peer) + }, + PeerStatus::Stopped => { + torrent_data.peers.remove(&peer_map_key) + } + }; + + match opt_removed_peer.map(|peer| peer.status){ + Some(PeerStatus::Leeching) => { + torrent_data.num_leechers -= 1; + }, + Some(PeerStatus::Seeding) => { + torrent_data.num_seeders -= 1; + }, + _ => {} + } + } + + let max_num_peers_to_take = match request.numwant { + Some(0) | None => config.protocol.max_peers, + Some(numwant) => numwant.min(config.protocol.max_peers), + }; + + let response_peers: Vec> = extract_response_peers( + rng, + &torrent_data.peers, + max_num_peers_to_take, + Peer::to_response_peer + ); + + (torrent_data.num_seeders, torrent_data.num_leechers, response_peers) +} + + pub fn handle_scrape_requests( config: &Config, torrent_maps: &mut TorrentMaps, @@ -228,25 +261,38 @@ pub fn handle_scrape_requests( files: HashMap::with_capacity(num_to_take), }; - let torrent_map: &mut TorrentMap = if meta.peer_addr.is_ipv4(){ - &mut torrent_maps.ipv4 - } else { - &mut torrent_maps.ipv6 - }; + let peer_ip = convert_ipv4_mapped_ipv4( + meta.peer_addr.ip() + ); // If request.info_hashes is empty, don't return scrape for all // torrents, even though reference server does it. It is too expensive. - for info_hash in request.info_hashes.into_iter().take(num_to_take){ - if let Some(torrent_data) = torrent_map.get(&info_hash){ - let stats = ScrapeStatistics { - complete: torrent_data.num_seeders, - downloaded: 0, // No implementation planned - incomplete: torrent_data.num_leechers, - }; + if peer_ip.is_ipv4(){ + for info_hash in request.info_hashes.into_iter().take(num_to_take){ + if let Some(torrent_data) = torrent_maps.ipv4.get(&info_hash){ + let stats = ScrapeStatistics { + complete: torrent_data.num_seeders, + downloaded: 0, // No implementation planned + incomplete: torrent_data.num_leechers, + }; - response.files.insert(info_hash, stats); + response.files.insert(info_hash, stats); + } } - } + } else { + for info_hash in request.info_hashes.into_iter().take(num_to_take){ + if let Some(torrent_data) = torrent_maps.ipv6.get(&info_hash){ + let stats = ScrapeStatistics { + complete: torrent_data.num_seeders, + downloaded: 0, // No implementation planned + incomplete: torrent_data.num_leechers, + }; + + response.files.insert(info_hash, stats); + } + } + }; + (meta, Response::Scrape(response)) })); diff --git a/aquatic_http/src/lib/protocol/common.rs b/aquatic_http/src/lib/protocol/common.rs index 25949db..782bc22 100644 --- a/aquatic_http/src/lib/protocol/common.rs +++ b/aquatic_http/src/lib/protocol/common.rs @@ -5,7 +5,7 @@ use serde::Serialize; use super::utils::*; -#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize)] #[serde(transparent)] pub struct PeerId( #[serde( @@ -15,7 +15,7 @@ pub struct PeerId( ); -#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize)] #[serde(transparent)] pub struct InfoHash( #[serde( diff --git a/aquatic_http/src/lib/protocol/response.rs b/aquatic_http/src/lib/protocol/response.rs index 1fb905e..fd9b217 100644 --- a/aquatic_http/src/lib/protocol/response.rs +++ b/aquatic_http/src/lib/protocol/response.rs @@ -1,42 +1,15 @@ -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::net::{Ipv4Addr, Ipv6Addr}; use hashbrown::HashMap; use serde::Serialize; -use crate::common::Peer; - use super::common::*; use super::utils::*; -pub struct ResponsePeer { - pub ip_address: IpAddr, - pub port: u16 -} - - -impl ResponsePeer { - pub fn from_peer(peer: &Peer) -> Self { - let ip_address = peer.connection_meta.peer_addr.ip(); - - Self { - ip_address, - port: peer.port - } - } -} - - -#[derive(Clone, Copy, Debug, Serialize)] -pub struct ResponsePeerV4 { - pub ip_address: Ipv4Addr, - pub port: u16 -} - - -#[derive(Clone, Copy, Debug, Serialize)] -pub struct ResponsePeerV6 { - pub ip_address: Ipv6Addr, +#[derive(Debug, Clone, Serialize)] +pub struct ResponsePeer{ + pub ip_address: I, pub port: u16 } @@ -45,7 +18,7 @@ pub struct ResponsePeerV6 { #[serde(transparent)] pub struct ResponsePeerListV4( #[serde(serialize_with = "serialize_response_peers_ipv4")] - pub Vec + pub Vec> ); @@ -53,7 +26,7 @@ pub struct ResponsePeerListV4( #[serde(transparent)] pub struct ResponsePeerListV6( #[serde(serialize_with = "serialize_response_peers_ipv6")] - pub Vec + pub Vec> ); diff --git a/aquatic_http/src/lib/protocol/utils.rs b/aquatic_http/src/lib/protocol/utils.rs index ff681d7..eea0d86 100644 --- a/aquatic_http/src/lib/protocol/utils.rs +++ b/aquatic_http/src/lib/protocol/utils.rs @@ -1,6 +1,8 @@ +use std::net::{Ipv4Addr, Ipv6Addr}; + use serde::Serializer; -use super::response::{ResponsePeerV4, ResponsePeerV6}; +use super::response::ResponsePeer; /// Not for serde @@ -41,7 +43,7 @@ pub fn serialize_20_bytes( pub fn serialize_response_peers_ipv4( - response_peers: &[ResponsePeerV4], + response_peers: &[ResponsePeer], serializer: S ) -> Result where S: Serializer { let mut bytes = Vec::with_capacity(response_peers.len() * 6); @@ -56,7 +58,7 @@ pub fn serialize_response_peers_ipv4( pub fn serialize_response_peers_ipv6( - response_peers: &[ResponsePeerV6], + response_peers: &[ResponsePeer], serializer: S ) -> Result where S: Serializer { let mut bytes = Vec::with_capacity(response_peers.len() * 6); diff --git a/aquatic_http/src/lib/tasks.rs b/aquatic_http/src/lib/tasks.rs index e91e5e4..ad901dc 100644 --- a/aquatic_http/src/lib/tasks.rs +++ b/aquatic_http/src/lib/tasks.rs @@ -4,24 +4,26 @@ use crate::common::*; pub fn clean_torrents(state: &State){ - fn clean_torrent_map( - torrent_map: &mut TorrentMap, - ){ - let now = Instant::now(); - - torrent_map.retain(|_, torrent_data| { - torrent_data.peers.retain(|_, peer| { - peer.valid_until.0 >= now - }); - - !torrent_data.peers.is_empty() - }); - - torrent_map.shrink_to_fit(); - } let mut torrent_maps = state.torrent_maps.lock(); clean_torrent_map(&mut torrent_maps.ipv4); clean_torrent_map(&mut torrent_maps.ipv6); +} + + +fn clean_torrent_map( + torrent_map: &mut TorrentMap, +){ + let now = Instant::now(); + + torrent_map.retain(|_, torrent_data| { + torrent_data.peers.retain(|_, peer| { + peer.valid_until.0 >= now + }); + + !torrent_data.peers.is_empty() + }); + + torrent_map.shrink_to_fit(); } \ No newline at end of file