diff --git a/README.md b/README.md index a9734b8..346fa84 100644 --- a/README.md +++ b/README.md @@ -120,7 +120,6 @@ Implements: * Doesn't care about IP addresses sent in announce requests. The packet source IP is always used. * Doesn't track of the number of torrent downloads (0 is always sent). - * [IPv6 support](https://web.archive.org/web/20170503181830/http://opentracker.blog.h3q.com/2007/12/28/the-ipv6-situation/) IPv4 and IPv6 peers are tracked separately. diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index 693f31d..36affe7 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -1,6 +1,6 @@ use std::collections::BTreeMap; use std::hash::Hash; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::net::{Ipv4Addr, Ipv6Addr}; use std::sync::atomic::AtomicUsize; use std::sync::Arc; @@ -14,22 +14,6 @@ use crate::config::Config; pub const MAX_PACKET_SIZE: usize = 8192; -pub trait Ip: Hash + PartialEq + Eq + Clone + Copy { - fn ip_addr(self) -> IpAddr; -} - -impl Ip for Ipv4Addr { - fn ip_addr(self) -> IpAddr { - IpAddr::V4(self) - } -} - -impl Ip for Ipv6Addr { - fn ip_addr(self) -> IpAddr { - IpAddr::V6(self) - } -} - #[derive(Debug)] pub struct PendingScrapeRequest { pub slab_key: usize, @@ -50,8 +34,8 @@ pub enum ConnectedRequest { #[derive(Debug)] pub enum ConnectedResponse { - AnnounceIpv4(AnnounceResponseIpv4), - AnnounceIpv6(AnnounceResponseIpv6), + AnnounceIpv4(AnnounceResponse), + AnnounceIpv6(AnnounceResponse), Scrape(PendingScrapeResponse), } @@ -234,14 +218,14 @@ mod tests { let config = Config::default(); - let peers = ::std::iter::repeat(ResponsePeerIpv6 { + let peers = ::std::iter::repeat(ResponsePeer { ip_address: Ipv6Addr::new(1, 1, 1, 1, 1, 1, 1, 1), port: Port(1), }) .take(config.protocol.max_response_peers) .collect(); - let response = Response::AnnounceIpv6(AnnounceResponseIpv6 { + let response = Response::AnnounceIpv6(AnnounceResponse { transaction_id: TransactionId(1), announce_interval: AnnounceInterval(1), seeders: NumberOfPeers(1), diff --git a/aquatic_udp/src/workers/request.rs b/aquatic_udp/src/workers/request.rs index 590f3e3..4c013eb 100644 --- a/aquatic_udp/src/workers/request.rs +++ b/aquatic_udp/src/workers/request.rs @@ -30,6 +30,15 @@ struct Peer { pub valid_until: ValidUntil, } +impl Peer { + fn to_response_peer(&self) -> ResponsePeer { + ResponsePeer { + ip_address: self.ip_address, + port: self.port, + } + } +} + type PeerMap = AHashIndexMap>; struct TorrentData { @@ -111,68 +120,6 @@ impl TorrentMaps { } } -#[derive(Clone, PartialEq, Debug)] -pub struct ProtocolResponsePeer { - pub ip_address: I, - pub port: Port, -} - -impl ProtocolResponsePeer { - #[inline(always)] - fn from_peer(peer: &Peer) -> Self { - Self { - ip_address: peer.ip_address, - port: peer.port, - } - } -} - -pub struct ProtocolAnnounceResponse { - pub transaction_id: TransactionId, - pub announce_interval: AnnounceInterval, - pub leechers: NumberOfPeers, - pub seeders: NumberOfPeers, - pub peers: Vec>, -} - -impl Into for ProtocolAnnounceResponse { - fn into(self) -> ConnectedResponse { - ConnectedResponse::AnnounceIpv4(AnnounceResponseIpv4 { - transaction_id: self.transaction_id, - announce_interval: self.announce_interval, - leechers: self.leechers, - seeders: self.seeders, - peers: self - .peers - .into_iter() - .map(|peer| ResponsePeerIpv4 { - ip_address: peer.ip_address, - port: peer.port, - }) - .collect(), - }) - } -} - -impl Into for ProtocolAnnounceResponse { - fn into(self) -> ConnectedResponse { - ConnectedResponse::AnnounceIpv6(AnnounceResponseIpv6 { - transaction_id: self.transaction_id, - announce_interval: self.announce_interval, - leechers: self.leechers, - seeders: self.seeders, - peers: self - .peers - .into_iter() - .map(|peer| ResponsePeerIpv6 { - ip_address: peer.ip_address, - port: peer.port, - }) - .collect(), - }) - } -} - pub fn run_request_worker( config: Config, state: State, @@ -258,24 +205,22 @@ fn handle_announce_request( peer_valid_until: ValidUntil, ) -> ConnectedResponse { match src.get().ip() { - IpAddr::V4(ip) => handle_announce_request_inner( + IpAddr::V4(ip) => ConnectedResponse::AnnounceIpv4(handle_announce_request_inner( config, rng, &mut torrents.ipv4, request, ip, peer_valid_until, - ) - .into(), - IpAddr::V6(ip) => handle_announce_request_inner( + )), + IpAddr::V6(ip) => ConnectedResponse::AnnounceIpv6(handle_announce_request_inner( config, rng, &mut torrents.ipv6, request, ip, peer_valid_until, - ) - .into(), + )), } } @@ -286,7 +231,7 @@ fn handle_announce_request_inner( request: AnnounceRequest, peer_ip: I, peer_valid_until: ValidUntil, -) -> ProtocolAnnounceResponse { +) -> AnnounceResponse { let peer_status = PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left); let peer = Peer { @@ -329,10 +274,10 @@ fn handle_announce_request_inner( &torrent_data.peers, max_num_peers_to_take, request.peer_id, - ProtocolResponsePeer::from_peer, + Peer::to_response_peer, ); - ProtocolAnnounceResponse { + AnnounceResponse { transaction_id: request.transaction_id, announce_interval: AnnounceInterval(config.protocol.peer_announce_interval), leechers: NumberOfPeers(torrent_data.num_leechers as i32), @@ -448,7 +393,7 @@ mod tests { if i == 0 { opt_sender_key = Some(key); - opt_sender_peer = Some(ProtocolResponsePeer::from_peer(&peer)); + opt_sender_peer = Some(peer.to_response_peer()); } peer_map.insert(key, peer); @@ -461,7 +406,7 @@ mod tests { &peer_map, req_num_peers, opt_sender_key.unwrap_or_else(|| gen_peer_id(1)), - ProtocolResponsePeer::from_peer, + Peer::to_response_peer, ); // Check that number of returned peers is correct diff --git a/aquatic_udp_load_test/src/worker/mod.rs b/aquatic_udp_load_test/src/worker/mod.rs index 816dafc..40b0a62 100644 --- a/aquatic_udp_load_test/src/worker/mod.rs +++ b/aquatic_udp_load_test/src/worker/mod.rs @@ -56,7 +56,7 @@ pub fn run_worker_thread( for event in events.iter() { if (event.token() == token) & event.is_readable() { while let Ok(amt) = socket.recv(&mut buffer) { - match Response::from_bytes(&buffer[0..amt]) { + match Response::from_bytes(&buffer[0..amt], addr.is_ipv4()) { Ok(response) => { match response { Response::AnnounceIpv4(ref r) => { diff --git a/aquatic_udp_protocol/src/common.rs b/aquatic_udp_protocol/src/common.rs index 77192c6..8aab39f 100644 --- a/aquatic_udp_protocol/src/common.rs +++ b/aquatic_udp_protocol/src/common.rs @@ -1,5 +1,11 @@ +use std::fmt::Debug; use std::net::{Ipv4Addr, Ipv6Addr}; +pub trait Ip: Clone + Copy + Debug + PartialEq + Eq {} + +impl Ip for Ipv4Addr {} +impl Ip for Ipv6Addr {} + #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] pub struct AnnounceInterval(pub i32); @@ -30,15 +36,9 @@ pub struct PeerId(pub [u8; 20]); #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] pub struct PeerKey(pub u32); -#[derive(Hash, PartialEq, Eq, Clone, Debug)] -pub struct ResponsePeerIpv4 { - pub ip_address: Ipv4Addr, - pub port: Port, -} - -#[derive(Hash, PartialEq, Eq, Clone, Debug)] -pub struct ResponsePeerIpv6 { - pub ip_address: Ipv6Addr, +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct ResponsePeer { + pub ip_address: I, pub port: Port, } @@ -69,17 +69,7 @@ impl quickcheck::Arbitrary for PeerId { } #[cfg(test)] -impl quickcheck::Arbitrary for ResponsePeerIpv4 { - fn arbitrary(g: &mut quickcheck::Gen) -> Self { - Self { - ip_address: quickcheck::Arbitrary::arbitrary(g), - port: Port(u16::arbitrary(g).into()), - } - } -} - -#[cfg(test)] -impl quickcheck::Arbitrary for ResponsePeerIpv6 { +impl quickcheck::Arbitrary for ResponsePeer { fn arbitrary(g: &mut quickcheck::Gen) -> Self { Self { ip_address: quickcheck::Arbitrary::arbitrary(g), diff --git a/aquatic_udp_protocol/src/response.rs b/aquatic_udp_protocol/src/response.rs index 8e9a280..634724f 100644 --- a/aquatic_udp_protocol/src/response.rs +++ b/aquatic_udp_protocol/src/response.rs @@ -21,21 +21,12 @@ pub struct ConnectResponse { } #[derive(PartialEq, Eq, Clone, Debug)] -pub struct AnnounceResponseIpv4 { +pub struct AnnounceResponse { pub transaction_id: TransactionId, pub announce_interval: AnnounceInterval, pub leechers: NumberOfPeers, pub seeders: NumberOfPeers, - pub peers: Vec, -} - -#[derive(PartialEq, Eq, Clone, Debug)] -pub struct AnnounceResponseIpv6 { - pub transaction_id: TransactionId, - pub announce_interval: AnnounceInterval, - pub leechers: NumberOfPeers, - pub seeders: NumberOfPeers, - pub peers: Vec, + pub peers: Vec>, } #[derive(PartialEq, Eq, Clone, Debug)] @@ -53,8 +44,8 @@ pub struct ErrorResponse { #[derive(PartialEq, Eq, Clone, Debug)] pub enum Response { Connect(ConnectResponse), - AnnounceIpv4(AnnounceResponseIpv4), - AnnounceIpv6(AnnounceResponseIpv6), + AnnounceIpv4(AnnounceResponse), + AnnounceIpv6(AnnounceResponse), Scrape(ScrapeResponse), Error(ErrorResponse), } @@ -65,14 +56,14 @@ impl From for Response { } } -impl From for Response { - fn from(r: AnnounceResponseIpv4) -> Self { +impl From> for Response { + fn from(r: AnnounceResponse) -> Self { Self::AnnounceIpv4(r) } } -impl From for Response { - fn from(r: AnnounceResponseIpv6) -> Self { +impl From> for Response { + fn from(r: AnnounceResponse) -> Self { Self::AnnounceIpv6(r) } } @@ -90,12 +81,6 @@ impl From for Response { } impl Response { - /// Returning IPv6 peers doesn't really work with UDP. It is not supported - /// by https://libtorrent.org/udp_tracker_protocol.html. There is a - /// suggestion in https://web.archive.org/web/20170503181830/http://opentracker.blog.h3q.com/2007/12/28/the-ipv6-situation/ - /// of using action number 4 and returning IPv6 octets just like for IPv4 - /// addresses. Clients seem not to support it very well, but due to a lack - /// of alternative solutions, it is implemented here. #[inline] pub fn write(&self, bytes: &mut impl Write) -> Result<(), io::Error> { match self { @@ -116,6 +101,18 @@ impl Response { bytes.write_u16::(peer.port.0)?; } } + Response::AnnounceIpv6(r) => { + bytes.write_i32::(1)?; + bytes.write_i32::(r.transaction_id.0)?; + bytes.write_i32::(r.announce_interval.0)?; + bytes.write_i32::(r.leechers.0)?; + bytes.write_i32::(r.seeders.0)?; + + for peer in r.peers.iter() { + bytes.write_all(&peer.ip_address.octets())?; + bytes.write_u16::(peer.port.0)?; + } + } Response::Scrape(r) => { bytes.write_i32::(2)?; bytes.write_i32::(r.transaction_id.0)?; @@ -132,25 +129,13 @@ impl Response { bytes.write_all(r.message.as_bytes())?; } - Response::AnnounceIpv6(r) => { - bytes.write_i32::(4)?; - bytes.write_i32::(r.transaction_id.0)?; - bytes.write_i32::(r.announce_interval.0)?; - bytes.write_i32::(r.leechers.0)?; - bytes.write_i32::(r.seeders.0)?; - - for peer in r.peers.iter() { - bytes.write_all(&peer.ip_address.octets())?; - bytes.write_u16::(peer.port.0)?; - } - } } Ok(()) } #[inline] - pub fn from_bytes(bytes: &[u8]) -> Result { + pub fn from_bytes(bytes: &[u8], ipv4: bool) -> Result { let mut cursor = Cursor::new(bytes); let action = cursor.read_i32::()?; @@ -168,7 +153,7 @@ impl Response { .into()) } // Announce - 1 => { + 1 if ipv4 => { let announce_interval = cursor.read_i32::()?; let leechers = cursor.read_i32::()?; let seeders = cursor.read_i32::()?; @@ -183,14 +168,45 @@ impl Response { let ip_address = Ipv4Addr::from(ip_bytes); let port = (&chunk[4..]).read_u16::().unwrap(); - ResponsePeerIpv4 { + ResponsePeer { ip_address, port: Port(port), } }) .collect(); - Ok((AnnounceResponseIpv4 { + Ok((AnnounceResponse { + transaction_id: TransactionId(transaction_id), + announce_interval: AnnounceInterval(announce_interval), + leechers: NumberOfPeers(leechers), + seeders: NumberOfPeers(seeders), + peers, + }) + .into()) + } + 1 if !ipv4 => { + let announce_interval = cursor.read_i32::()?; + let leechers = cursor.read_i32::()?; + let seeders = cursor.read_i32::()?; + + let position = cursor.position() as usize; + let inner = cursor.into_inner(); + + let peers = inner[position..] + .chunks_exact(18) + .map(|chunk| { + let ip_bytes: [u8; 16] = (&chunk[..16]).try_into().unwrap(); + let ip_address = Ipv6Addr::from(ip_bytes); + let port = (&chunk[16..]).read_u16::().unwrap(); + + ResponsePeer { + ip_address, + port: Port(port), + } + }) + .collect(); + + Ok((AnnounceResponse { transaction_id: TransactionId(transaction_id), announce_interval: AnnounceInterval(announce_interval), leechers: NumberOfPeers(leechers), @@ -240,38 +256,6 @@ impl Response { }) .into()) } - // IPv6 announce - 4 => { - let announce_interval = cursor.read_i32::()?; - let leechers = cursor.read_i32::()?; - let seeders = cursor.read_i32::()?; - - let position = cursor.position() as usize; - let inner = cursor.into_inner(); - - let peers = inner[position..] - .chunks_exact(18) - .map(|chunk| { - let ip_bytes: [u8; 16] = (&chunk[..16]).try_into().unwrap(); - let ip_address = Ipv6Addr::from(ip_bytes); - let port = (&chunk[16..]).read_u16::().unwrap(); - - ResponsePeerIpv6 { - ip_address, - port: Port(port), - } - }) - .collect(); - - Ok((AnnounceResponseIpv6 { - transaction_id: TransactionId(transaction_id), - announce_interval: AnnounceInterval(announce_interval), - leechers: NumberOfPeers(leechers), - seeders: NumberOfPeers(seeders), - peers, - }) - .into()) - } _ => Ok((ErrorResponse { transaction_id: TransactionId(transaction_id), message: "Invalid action".into(), @@ -306,26 +290,10 @@ mod tests { } } - impl quickcheck::Arbitrary for AnnounceResponseIpv4 { + impl quickcheck::Arbitrary for AnnounceResponse { fn arbitrary(g: &mut quickcheck::Gen) -> Self { let peers = (0..u8::arbitrary(g)) - .map(|_| ResponsePeerIpv4::arbitrary(g)) - .collect(); - - Self { - transaction_id: TransactionId(i32::arbitrary(g)), - announce_interval: AnnounceInterval(i32::arbitrary(g)), - leechers: NumberOfPeers(i32::arbitrary(g)), - seeders: NumberOfPeers(i32::arbitrary(g)), - peers, - } - } - } - - impl quickcheck::Arbitrary for AnnounceResponseIpv6 { - fn arbitrary(g: &mut quickcheck::Gen) -> Self { - let peers = (0..u8::arbitrary(g)) - .map(|_| ResponsePeerIpv6::arbitrary(g)) + .map(|_| ResponsePeer::arbitrary(g)) .collect(); Self { @@ -351,11 +319,11 @@ mod tests { } } - fn same_after_conversion(response: Response) -> bool { + fn same_after_conversion(response: Response, ipv4: bool) -> bool { let mut buf = Vec::new(); response.clone().write(&mut buf).unwrap(); - let r2 = Response::from_bytes(&buf[..]).unwrap(); + let r2 = Response::from_bytes(&buf[..], ipv4).unwrap(); let success = response == r2; @@ -368,21 +336,21 @@ mod tests { #[quickcheck] fn test_connect_response_convert_identity(response: ConnectResponse) -> bool { - same_after_conversion(response.into()) + same_after_conversion(response.into(), true) } #[quickcheck] - fn test_announce_response_ipv4_convert_identity(response: AnnounceResponseIpv4) -> bool { - same_after_conversion(response.into()) + fn test_announce_response_ipv4_convert_identity(response: AnnounceResponse) -> bool { + same_after_conversion(response.into(), true) } #[quickcheck] - fn test_announce_response_ipv6_convert_identity(response: AnnounceResponseIpv6) -> bool { - same_after_conversion(response.into()) + fn test_announce_response_ipv6_convert_identity(response: AnnounceResponse) -> bool { + same_after_conversion(response.into(), false) } #[quickcheck] fn test_scrape_response_convert_identity(response: ScrapeResponse) -> bool { - same_after_conversion(response.into()) + same_after_conversion(response.into(), true) } }