From a3a1d1606beb350aaa74e9375835e0ee02bda3d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 31 Jul 2020 05:37:58 +0200 Subject: [PATCH] WIP: udp: add ipv6 support Returning IPv6 peers doesn't really work with UDP. It is not supported by https://libtorrent.org/udp_tracker_protocol.html. There is a suggestion in https://web.archive.org/web/20170503181830/http://opentracker.blog.h3q.com/2007/12/28/the-ipv6-situation/ of using action number 4 and returning IPv6 octets just like for IPv4 addresses. Clients seem not to support it very well, but due to a lack of alternative solutions, it is implemented here --- aquatic_common/src/lib.rs | 2 +- aquatic_http/src/lib/common.rs | 2 +- aquatic_http/src/lib/handler.rs | 4 +- aquatic_udp/src/lib/common.rs | 54 +++-- aquatic_udp/src/lib/handlers.rs | 212 +++++++++++------- aquatic_udp/src/lib/network.rs | 20 +- aquatic_udp/src/lib/tasks.rs | 19 +- .../src/converters/responses.rs | 28 ++- 8 files changed, 230 insertions(+), 111 deletions(-) diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs index 889c0b5..8e71191 100644 --- a/aquatic_common/src/lib.rs +++ b/aquatic_common/src/lib.rs @@ -82,7 +82,7 @@ pub fn extract_response_peers( #[inline] -pub fn convert_ipv4_mapped_ipv4(ip_address: IpAddr) -> IpAddr { +pub fn convert_ipv4_mapped_ipv6(ip_address: IpAddr) -> IpAddr { if let IpAddr::V6(ip) = ip_address { if let [0, 0, 0, 0, 0, 0xffff, ..] = ip.segments(){ ip.to_ipv4().expect("convert ipv4-mapped ip").into() diff --git a/aquatic_http/src/lib/common.rs b/aquatic_http/src/lib/common.rs index 629dff7..eb3b768 100644 --- a/aquatic_http/src/lib/common.rs +++ b/aquatic_http/src/lib/common.rs @@ -10,7 +10,7 @@ use mio::Token; use parking_lot::Mutex; use smartstring::{SmartString, LazyCompact}; -pub use aquatic_common::{ValidUntil, convert_ipv4_mapped_ipv4}; +pub use aquatic_common::{ValidUntil, convert_ipv4_mapped_ipv6}; use aquatic_http_protocol::common::*; use aquatic_http_protocol::request::Request; diff --git a/aquatic_http/src/lib/handler.rs b/aquatic_http/src/lib/handler.rs index 7300e80..c594eca 100644 --- a/aquatic_http/src/lib/handler.rs +++ b/aquatic_http/src/lib/handler.rs @@ -96,7 +96,7 @@ pub fn handle_announce_requests( let valid_until = ValidUntil::new(config.cleaning.max_peer_age); responses.extend(requests.map(|(request_sender_meta, request)| { - let peer_ip = convert_ipv4_mapped_ipv4( + let peer_ip = convert_ipv4_mapped_ipv6( request_sender_meta.peer_addr.ip() ); @@ -262,7 +262,7 @@ pub fn handle_scrape_requests( files: BTreeMap::new(), }; - let peer_ip = convert_ipv4_mapped_ipv4( + let peer_ip = convert_ipv4_mapped_ipv6( meta.peer_addr.ip() ); diff --git a/aquatic_udp/src/lib/common.rs b/aquatic_udp/src/lib/common.rs index a36524a..1b0c9c6 100644 --- a/aquatic_udp/src/lib/common.rs +++ b/aquatic_udp/src/lib/common.rs @@ -1,5 +1,6 @@ -use std::net::{SocketAddr, IpAddr}; +use std::net::{SocketAddr, IpAddr, Ipv4Addr, Ipv6Addr}; use std::sync::{Arc, atomic::AtomicUsize}; +use std::hash::Hash; use hashbrown::HashMap; use indexmap::IndexMap; @@ -12,6 +13,24 @@ pub use aquatic_udp_protocol::types::*; pub const MAX_PACKET_SIZE: usize = 4096; +pub trait Ip: Hash + PartialEq + Eq + Clone + Copy { + fn ip_addr(self) -> IpAddr; +} + + +impl Ip for Ipv4Addr { + fn ip_addr(self) -> IpAddr { + IpAddr::V4(self) + } +} + + +impl Ip for Ipv6Addr { + fn ip_addr(self) -> IpAddr { + IpAddr::V6(self) + } +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ConnectionKey { @@ -52,19 +71,19 @@ impl PeerStatus { #[derive(Clone, Debug)] -pub struct Peer { - pub ip_address: IpAddr, +pub struct Peer { + pub ip_address: I, pub port: Port, pub status: PeerStatus, pub valid_until: ValidUntil } -impl Peer { +impl Peer { #[inline(always)] pub fn to_response_peer(&self) -> ResponsePeer { ResponsePeer { - ip_address: self.ip_address, + ip_address: self.ip_address.ip_addr(), port: self.port } } @@ -72,23 +91,23 @@ impl Peer { #[derive(PartialEq, Eq, Hash, Clone)] -pub struct PeerMapKey { - pub ip: IpAddr, +pub struct PeerMapKey { + pub ip: I, pub peer_id: PeerId } -pub type PeerMap = IndexMap; +pub type PeerMap = IndexMap, Peer>; -pub struct TorrentData { - pub peers: PeerMap, +pub struct TorrentData { + pub peers: PeerMap, pub num_seeders: usize, pub num_leechers: usize, } -impl Default for TorrentData { +impl Default for TorrentData { fn default() -> Self { Self { peers: IndexMap::new(), @@ -99,7 +118,14 @@ impl Default for TorrentData { } -pub type TorrentMap = HashMap; +pub type TorrentMap = HashMap>; + + +#[derive(Default)] +pub struct TorrentMaps { + pub ipv4: TorrentMap, + pub ipv6: TorrentMap, +} #[derive(Default)] @@ -115,7 +141,7 @@ pub struct Statistics { #[derive(Clone)] pub struct State { pub connections: Arc>, - pub torrents: Arc>, + pub torrents: Arc>, pub statistics: Arc, } @@ -124,7 +150,7 @@ impl State { pub fn new() -> Self { Self { connections: Arc::new(Mutex::new(HashMap::new())), - torrents: Arc::new(Mutex::new(HashMap::new())), + torrents: Arc::new(Mutex::new(TorrentMaps::default())), statistics: Arc::new(Statistics::default()), } } diff --git a/aquatic_udp/src/lib/handlers.rs b/aquatic_udp/src/lib/handlers.rs index b78b225..a688972 100644 --- a/aquatic_udp/src/lib/handlers.rs +++ b/aquatic_udp/src/lib/handlers.rs @@ -1,4 +1,4 @@ -use std::net::SocketAddr; +use std::net::{SocketAddr, IpAddr}; use std::time::Duration; use std::vec::Drain; @@ -6,7 +6,7 @@ use crossbeam_channel::{Sender, Receiver}; use parking_lot::MutexGuard; use rand::{SeedableRng, Rng, rngs::{SmallRng, StdRng}}; -use aquatic_common::extract_response_peers; +use aquatic_common::{convert_ipv4_mapped_ipv6, extract_response_peers}; use aquatic_udp_protocol::types::*; use crate::common::*; @@ -129,7 +129,7 @@ pub fn run_request_worker( ::std::mem::drop(connections); if !(announce_requests.is_empty() && scrape_requests.is_empty()){ - let mut torrents = state.torrents.lock(); + let mut torrents= state.torrents.lock(); handle_announce_requests( &config, @@ -189,7 +189,7 @@ pub fn handle_connect_requests( #[inline] pub fn handle_announce_requests( config: &Config, - torrents: &mut MutexGuard, + torrents: &mut MutexGuard, rng: &mut SmallRng, requests: Drain<(AnnounceRequest, SocketAddr)>, responses: &mut Vec<(Response, SocketAddr)>, @@ -197,83 +197,116 @@ pub fn handle_announce_requests( let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); responses.extend(requests.map(|(request, src)| { - let peer_ip = src.ip(); + let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); - let peer_key = PeerMapKey { - ip: peer_ip, - peer_id: request.peer_id, - }; - - let peer_status = PeerStatus::from_event_and_bytes_left( - request.event, - request.bytes_left - ); - - let peer = Peer { - ip_address: peer_ip, - port: request.port, - status: peer_status, - valid_until: peer_valid_until, - }; - - let torrent_data = torrents - .entry(request.info_hash) - .or_default(); - - let opt_removed_peer = match peer_status { - PeerStatus::Leeching => { - torrent_data.num_leechers += 1; - - torrent_data.peers.insert(peer_key, peer) + let response = match peer_ip { + IpAddr::V4(ip) => { + handle_announce_request( + config, + rng, + &mut torrents.ipv4, + request, + ip, + peer_valid_until, + ) }, - PeerStatus::Seeding => { - torrent_data.num_seeders += 1; - - torrent_data.peers.insert(peer_key, peer) - }, - PeerStatus::Stopped => { - torrent_data.peers.remove(&peer_key) + IpAddr::V6(ip) => { + handle_announce_request( + config, + rng, + &mut torrents.ipv6, + request, + ip, + peer_valid_until, + ) } }; - match opt_removed_peer.map(|peer| peer.status){ - Some(PeerStatus::Leeching) => { - torrent_data.num_leechers -= 1; - }, - Some(PeerStatus::Seeding) => { - torrent_data.num_seeders -= 1; - }, - _ => {} - } - - let max_num_peers_to_take = calc_max_num_peers_to_take( - config, - request.peers_wanted.0 - ); - - let response_peers = extract_response_peers( - rng, - &torrent_data.peers, - max_num_peers_to_take, - Peer::to_response_peer - ); - - let response = Response::Announce(AnnounceResponse { - transaction_id: request.transaction_id, - announce_interval: AnnounceInterval(config.protocol.peer_announce_interval), - leechers: NumberOfPeers(torrent_data.num_leechers as i32), - seeders: NumberOfPeers(torrent_data.num_seeders as i32), - peers: response_peers - }); - - (response, src) + (response.into(), src) })); } +fn handle_announce_request( + config: &Config, + rng: &mut SmallRng, + torrents: &mut TorrentMap, + request: AnnounceRequest, + peer_ip: I, + peer_valid_until: ValidUntil, +) -> AnnounceResponse { + let peer_key = PeerMapKey { + ip: peer_ip, + peer_id: request.peer_id, + }; + + let peer_status = PeerStatus::from_event_and_bytes_left( + request.event, + request.bytes_left + ); + + let peer = Peer { + ip_address: peer_ip, + port: request.port, + status: peer_status, + valid_until: peer_valid_until, + }; + + let torrent_data = torrents + .entry(request.info_hash) + .or_default(); + + let opt_removed_peer = match peer_status { + PeerStatus::Leeching => { + torrent_data.num_leechers += 1; + + torrent_data.peers.insert(peer_key, peer) + }, + PeerStatus::Seeding => { + torrent_data.num_seeders += 1; + + torrent_data.peers.insert(peer_key, peer) + }, + PeerStatus::Stopped => { + torrent_data.peers.remove(&peer_key) + } + }; + + match opt_removed_peer.map(|peer| peer.status){ + Some(PeerStatus::Leeching) => { + torrent_data.num_leechers -= 1; + }, + Some(PeerStatus::Seeding) => { + torrent_data.num_seeders -= 1; + }, + _ => {} + } + + let max_num_peers_to_take = calc_max_num_peers_to_take( + config, + request.peers_wanted.0 + ); + + let response_peers = extract_response_peers( + rng, + &torrent_data.peers, + max_num_peers_to_take, + Peer::to_response_peer + ); + + AnnounceResponse { + transaction_id: request.transaction_id, + announce_interval: AnnounceInterval(config.protocol.peer_announce_interval), + leechers: NumberOfPeers(torrent_data.num_leechers as i32), + seeders: NumberOfPeers(torrent_data.num_seeders as i32), + peers: response_peers + } +} + + #[inline] pub fn handle_scrape_requests( - torrents: &mut MutexGuard, + torrents: &mut MutexGuard, requests: Drain<(ScrapeRequest, SocketAddr)>, responses: &mut Vec<(Response, SocketAddr)>, ){ @@ -284,14 +317,29 @@ pub fn handle_scrape_requests( request.info_hashes.len() ); - for info_hash in request.info_hashes.iter() { - if let Some(torrent_data) = torrents.get(info_hash){ - stats.push(create_torrent_scrape_statistics( - torrent_data.num_seeders as i32, - torrent_data.num_leechers as i32, - )); - } else { - stats.push(empty_stats); + let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); + + if peer_ip.is_ipv4(){ + for info_hash in request.info_hashes.iter() { + if let Some(torrent_data) = torrents.ipv4.get(info_hash){ + stats.push(create_torrent_scrape_statistics( + torrent_data.num_seeders as i32, + torrent_data.num_leechers as i32, + )); + } else { + stats.push(empty_stats); + } + } + } else { + for info_hash in request.info_hashes.iter() { + if let Some(torrent_data) = torrents.ipv6.get(info_hash){ + stats.push(create_torrent_scrape_statistics( + torrent_data.num_seeders as i32, + torrent_data.num_leechers as i32, + )); + } else { + stats.push(empty_stats); + } } } @@ -336,7 +384,7 @@ pub fn create_torrent_scrape_statistics( #[cfg(test)] mod tests { - use std::net::IpAddr; + use std::net::Ipv4Addr; use std::collections::HashSet; use indexmap::IndexMap; @@ -345,8 +393,8 @@ mod tests { use super::*; - fn gen_peer_map_key_and_value(i: u32) -> (PeerMapKey, Peer) { - let ip_address = IpAddr::from(i.to_be_bytes()); + fn gen_peer_map_key_and_value(i: u32) -> (PeerMapKey, Peer) { + let ip_address = Ipv4Addr::from(i.to_be_bytes()); let peer_id = PeerId([0; 20]); let key = PeerMapKey { @@ -369,7 +417,7 @@ mod tests { let gen_num_peers = data.0; let req_num_peers = data.1 as usize; - let mut peer_map: PeerMap = IndexMap::new(); + let mut peer_map: PeerMap = IndexMap::new(); for i in 0..gen_num_peers { let (key, value) = gen_peer_map_key_and_value(i); diff --git a/aquatic_udp/src/lib/network.rs b/aquatic_udp/src/lib/network.rs index 645510a..f058c52 100644 --- a/aquatic_udp/src/lib/network.rs +++ b/aquatic_udp/src/lib/network.rs @@ -1,6 +1,6 @@ use std::sync::{Arc, atomic::{AtomicUsize, Ordering}}; use std::io::{Cursor, ErrorKind}; -use std::net::SocketAddr; +use std::net::{SocketAddr, IpAddr}; use std::time::Duration; use std::vec::Drain; @@ -215,7 +215,9 @@ fn send_responses( for (response, src) in response_iterator { cursor.set_position(0); - response_to_bytes(&mut cursor, response, IpVersion::IPv4).unwrap(); + let ip_version = ip_version_from_ip(src.ip()); + + response_to_bytes(&mut cursor, response, ip_version).unwrap(); let amt = cursor.position() as usize; @@ -240,4 +242,18 @@ fn send_responses( state.statistics.bytes_sent .fetch_add(bytes_sent, Ordering::SeqCst); } +} + + +fn ip_version_from_ip(ip: IpAddr) -> IpVersion { + match ip { + IpAddr::V4(_) => IpVersion::IPv4, + IpAddr::V6(ip) => { + if let [0, 0, 0, 0, 0, 0xffff, ..] = ip.segments(){ + IpVersion::IPv4 + } else { + IpVersion::IPv6 + } + } + } } \ No newline at end of file diff --git a/aquatic_udp/src/lib/tasks.rs b/aquatic_udp/src/lib/tasks.rs index b3af837..50c4fba 100644 --- a/aquatic_udp/src/lib/tasks.rs +++ b/aquatic_udp/src/lib/tasks.rs @@ -18,7 +18,17 @@ pub fn clean_connections_and_torrents(state: &State){ ::std::mem::drop(connections); let mut torrents = state.torrents.lock(); + + clean_torrent_map(&mut torrents.ipv4, now); + clean_torrent_map(&mut torrents.ipv6, now); +} + +#[inline] +fn clean_torrent_map( + torrents: &mut TorrentMap, + now: Instant, +){ torrents.retain(|_, torrent| { let num_seeders = &mut torrent.num_seeders; let num_leechers = &mut torrent.num_leechers; @@ -93,7 +103,14 @@ pub fn gather_and_print_statistics( let torrents = &mut state.torrents.lock(); - for torrent in torrents.values(){ + for torrent in torrents.ipv4.values(){ + let num_peers = (torrent.num_seeders + torrent.num_leechers) as u64; + + if let Err(err) = peers_per_torrent.increment(num_peers){ + eprintln!("error incrementing peers_per_torrent histogram: {}", err) + } + } + for torrent in torrents.ipv6.values(){ let num_peers = (torrent.num_seeders + torrent.num_leechers) as u64; if let Err(err) = peers_per_torrent.increment(num_peers){ diff --git a/aquatic_udp_protocol/src/converters/responses.rs b/aquatic_udp_protocol/src/converters/responses.rs index bacfc34..322bf3f 100644 --- a/aquatic_udp_protocol/src/converters/responses.rs +++ b/aquatic_udp_protocol/src/converters/responses.rs @@ -7,6 +7,12 @@ use byteorder::{ReadBytesExt, WriteBytesExt, NetworkEndian}; use crate::types::*; +/// Returning IPv6 peers doesn't really work with UDP. It is not supported by +/// https://libtorrent.org/udp_tracker_protocol.html. There is a suggestion in +/// https://web.archive.org/web/20170503181830/http://opentracker.blog.h3q.com/2007/12/28/the-ipv6-situation/ +/// of using action number 4 and returning IPv6 octets just like for IPv4 +/// addresses. Clients seem not to support it very well, but due to a lack of +/// alternative solutions, it is implemented here. #[inline] pub fn response_to_bytes( bytes: &mut impl Write, @@ -20,15 +26,14 @@ pub fn response_to_bytes( bytes.write_i64::(r.connection_id.0)?; }, Response::Announce(r) => { - bytes.write_i32::(1)?; - bytes.write_i32::(r.transaction_id.0)?; - bytes.write_i32::(r.announce_interval.0)?; - bytes.write_i32::(r.leechers.0)?; - bytes.write_i32::(r.seeders.0)?; - - // Write peer IPs and ports. Silently ignore peers with wrong - // IP version if ip_version == IpVersion::IPv4 { + bytes.write_i32::(1)?; + bytes.write_i32::(r.transaction_id.0)?; + bytes.write_i32::(r.announce_interval.0)?; + bytes.write_i32::(r.leechers.0)?; + bytes.write_i32::(r.seeders.0)?; + + // Silently ignore peers with wrong IP version for peer in r.peers { if let IpAddr::V4(ip) = peer.ip_address { bytes.write_all(&ip.octets())?; @@ -36,6 +41,13 @@ pub fn response_to_bytes( } } } else { + bytes.write_i32::(4)?; + bytes.write_i32::(r.transaction_id.0)?; + bytes.write_i32::(r.announce_interval.0)?; + bytes.write_i32::(r.leechers.0)?; + bytes.write_i32::(r.seeders.0)?; + + // Silently ignore peers with wrong IP version for peer in r.peers { if let IpAddr::V6(ip) = peer.ip_address { bytes.write_all(&ip.octets())?;