From 43a33d80c453920333d4795eb1addce091b6b455 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 8 Jul 2020 12:26:41 +0200 Subject: [PATCH] aquatic_http: add ipv6 compact responses; fix ipv4/ipv6 issue --- TODO.md | 7 ++-- aquatic_common/src/lib.rs | 15 +++++++++ aquatic_http/src/lib/common.rs | 18 +++++++++- aquatic_http/src/lib/handler.rs | 41 +++++++++++++++++++++-- aquatic_http/src/lib/protocol/response.rs | 39 +++++++++++++++++---- aquatic_http/src/lib/protocol/utils.rs | 34 +++++++++++-------- 6 files changed, 127 insertions(+), 27 deletions(-) diff --git a/TODO.md b/TODO.md index fbdc81f..4389fae 100644 --- a/TODO.md +++ b/TODO.md @@ -2,6 +2,7 @@ ## General +* use ipv4-mapped address functions * avx-512 should be avoided, maybe this should be mentioned in README and maybe run scripts should be adjusted @@ -10,9 +11,9 @@ * test tls * current serialized byte strings valid * scrape: does it work with multiple hashes? -* ipv6 response peers: use https://www.bittorrent.org/beps/bep_0007.html? - (peers6 compact key) - * compact=0 should result in error response +* store Ipv4Addr / Ipv6 addr in peer map, for correctness and so that strange + conversion in handler doesn't have to occur +* compact=0 should result in error response * tests of request parsing * tests of response serialization (against data known to be good would be nice) * Connection.send_response: handle case when all bytes are not written: can diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs index d1ed185..889c0b5 100644 --- a/aquatic_common/src/lib.rs +++ b/aquatic_common/src/lib.rs @@ -1,4 +1,5 @@ use std::time::{Duration, Instant}; +use std::net::IpAddr; use indexmap::IndexMap; use rand::Rng; @@ -77,4 +78,18 @@ pub fn extract_response_peers( peers } +} + + +#[inline] +pub fn convert_ipv4_mapped_ipv4(ip_address: IpAddr) -> IpAddr { + if let IpAddr::V6(ip) = ip_address { + if let [0, 0, 0, 0, 0, 0xffff, ..] = ip.segments(){ + ip.to_ipv4().expect("convert ipv4-mapped ip").into() + } else { + ip_address + } + } else { + ip_address + } } \ No newline at end of file diff --git a/aquatic_http/src/lib/common.rs b/aquatic_http/src/lib/common.rs index 50ca880..3375480 100644 --- a/aquatic_http/src/lib/common.rs +++ b/aquatic_http/src/lib/common.rs @@ -9,7 +9,7 @@ use log::error; use mio::Token; use parking_lot::Mutex; -pub use aquatic_common::ValidUntil; +pub use aquatic_common::{ValidUntil, convert_ipv4_mapped_ipv4}; use crate::protocol::common::*; use crate::protocol::request::Request; @@ -26,6 +26,22 @@ pub struct ConnectionMeta { } +impl ConnectionMeta { + pub fn map_ipv4_ip(&self) -> Self { + let peer_addr = SocketAddr::new( + convert_ipv4_mapped_ipv4(self.peer_addr.ip()), + self.peer_addr.port() + ); + + Self { + worker_index: self.worker_index, + peer_addr, + poll_token: self.poll_token + } + } +} + + #[derive(PartialEq, Eq, Clone, Copy, Debug)] pub enum PeerStatus { Seeding, diff --git a/aquatic_http/src/lib/handler.rs b/aquatic_http/src/lib/handler.rs index 5c581ec..3e8d2da 100644 --- a/aquatic_http/src/lib/handler.rs +++ b/aquatic_http/src/lib/handler.rs @@ -1,5 +1,6 @@ use std::time::Duration; use std::vec::Drain; +use std::net::IpAddr; use either::Either; use hashbrown::HashMap; @@ -96,7 +97,9 @@ pub fn handle_announce_requests( let valid_until = ValidUntil::new(config.cleaning.max_peer_age); responses.extend(requests.map(|(request_sender_meta, request)| { - let torrent_data: &mut TorrentData = if request_sender_meta.peer_addr.is_ipv4(){ + let converted_request_sender_meta = request_sender_meta.map_ipv4_ip(); + + let torrent_data: &mut TorrentData = if converted_request_sender_meta.peer_addr.is_ipv4(){ torrent_maps.ipv4.entry(request.info_hash).or_default() } else { torrent_maps.ipv6.entry(request.info_hash).or_default() @@ -104,6 +107,8 @@ pub fn handle_announce_requests( // Insert/update/remove peer who sent this request { + let request_sender_meta = converted_request_sender_meta; + let peer_status = PeerStatus::from_event_and_bytes_left( request.event, Some(request.bytes_left) @@ -159,18 +164,48 @@ pub fn handle_announce_requests( Some(numwant) => numwant.min(config.protocol.max_peers), }; + // FIXME: proper protocol peer should be extracted here, not below. + // Ideally, protocol-specific IP should be stored in connection meta + // in peer map. let response_peers: Vec = extract_response_peers( rng, &torrent_data.peers, max_num_peers_to_take, - ResponsePeer::from_peer + ResponsePeer::from_peer ); + let response_peers_v4 = response_peers.iter() + .filter_map(|peer| { + if let IpAddr::V4(ip_address) = peer.ip_address { + Some(ResponsePeerV4 { + ip_address, + port: peer.port + }) + } else { + None + } + }) + .collect(); + + let response_peers_v6 = response_peers.iter() + .filter_map(|peer| { + if let IpAddr::V6(ip_address) = peer.ip_address { + Some(ResponsePeerV6 { + ip_address, + port: peer.port + }) + } else { + None + } + }) + .collect(); + let response = Response::Announce(AnnounceResponse { complete: torrent_data.num_seeders, incomplete: torrent_data.num_leechers, announce_interval: config.protocol.peer_announce_interval, - peers: response_peers, + peers: ResponsePeerListV4(response_peers_v4), + peers6: ResponsePeerListV6(response_peers_v6), }); (request_sender_meta, response) diff --git a/aquatic_http/src/lib/protocol/response.rs b/aquatic_http/src/lib/protocol/response.rs index dc4621d..1fb905e 100644 --- a/aquatic_http/src/lib/protocol/response.rs +++ b/aquatic_http/src/lib/protocol/response.rs @@ -1,4 +1,4 @@ -use std::net::IpAddr; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use hashbrown::HashMap; use serde::Serialize; @@ -9,7 +9,6 @@ use super::common::*; use super::utils::*; -#[derive(Clone, Copy, Debug, Serialize)] pub struct ResponsePeer { pub ip_address: IpAddr, pub port: u16 @@ -28,6 +27,36 @@ impl ResponsePeer { } +#[derive(Clone, Copy, Debug, Serialize)] +pub struct ResponsePeerV4 { + pub ip_address: Ipv4Addr, + pub port: u16 +} + + +#[derive(Clone, Copy, Debug, Serialize)] +pub struct ResponsePeerV6 { + pub ip_address: Ipv6Addr, + pub port: u16 +} + + +#[derive(Debug, Clone, Serialize)] +#[serde(transparent)] +pub struct ResponsePeerListV4( + #[serde(serialize_with = "serialize_response_peers_ipv4")] + pub Vec +); + + +#[derive(Debug, Clone, Serialize)] +#[serde(transparent)] +pub struct ResponsePeerListV6( + #[serde(serialize_with = "serialize_response_peers_ipv6")] + pub Vec +); + + #[derive(Debug, Clone, Serialize)] pub struct ScrapeStatistics { pub complete: usize, @@ -42,10 +71,8 @@ pub struct AnnounceResponse { pub announce_interval: usize, pub complete: usize, pub incomplete: usize, - #[serde( - serialize_with = "serialize_response_peers_compact" - )] - pub peers: Vec, + pub peers: ResponsePeerListV4, + pub peers6: ResponsePeerListV6, } diff --git a/aquatic_http/src/lib/protocol/utils.rs b/aquatic_http/src/lib/protocol/utils.rs index 91e6e0f..ff681d7 100644 --- a/aquatic_http/src/lib/protocol/utils.rs +++ b/aquatic_http/src/lib/protocol/utils.rs @@ -1,8 +1,6 @@ -use std::net::IpAddr; - use serde::Serializer; -use super::response::ResponsePeer; +use super::response::{ResponsePeerV4, ResponsePeerV6}; /// Not for serde @@ -42,22 +40,30 @@ pub fn serialize_20_bytes( } -pub fn serialize_response_peers_compact( - response_peers: &[ResponsePeer], +pub fn serialize_response_peers_ipv4( + response_peers: &[ResponsePeerV4], serializer: S ) -> Result where S: Serializer { let mut bytes = Vec::with_capacity(response_peers.len() * 6); for peer in response_peers { - match peer.ip_address { - IpAddr::V4(ip) => { - bytes.extend_from_slice(&u32::from(ip).to_be_bytes()); - bytes.extend_from_slice(&peer.port.to_be_bytes()) - }, - IpAddr::V6(_) => { - continue; - } - } + bytes.extend_from_slice(&u32::from(peer.ip_address).to_be_bytes()); + bytes.extend_from_slice(&peer.port.to_be_bytes()) + } + + serializer.serialize_bytes(&bytes) +} + + +pub fn serialize_response_peers_ipv6( + response_peers: &[ResponsePeerV6], + serializer: S +) -> Result where S: Serializer { + let mut bytes = Vec::with_capacity(response_peers.len() * 6); + + for peer in response_peers { + bytes.extend_from_slice(&u128::from(peer.ip_address).to_be_bytes()); + bytes.extend_from_slice(&peer.port.to_be_bytes()) } serializer.serialize_bytes(&bytes)