diff --git a/aquatic_udp/src/lib/common/mod.rs b/aquatic_udp/src/lib/common/mod.rs index 8a29d21..41c52e8 100644 --- a/aquatic_udp/src/lib/common/mod.rs +++ b/aquatic_udp/src/lib/common/mod.rs @@ -18,35 +18,6 @@ pub mod network; pub const MAX_PACKET_SIZE: usize = 8192; -#[derive(Debug)] -pub enum ConnectedRequest { - Announce(AnnounceRequest), - Scrape { - request: ScrapeRequest, - /// Currently only used by glommio implementation - original_indices: Vec, - }, -} - -#[derive(Debug)] -pub enum ConnectedResponse { - Announce(AnnounceResponse), - Scrape { - response: ScrapeResponse, - /// Currently only used by glommio implementation - original_indices: Vec, - }, -} - -impl Into for ConnectedResponse { - fn into(self) -> Response { - match self { - Self::Announce(response) => Response::Announce(response), - Self::Scrape { response, .. } => Response::Scrape(response), - } - } -} - pub trait Ip: Hash + PartialEq + Eq + Clone + Copy { fn ip_addr(self) -> IpAddr; } @@ -63,6 +34,89 @@ impl Ip for Ipv6Addr { } } +#[derive(Debug)] +pub enum ConnectedRequest { + Announce(AnnounceRequest), + Scrape { + request: ScrapeRequest, + /// Currently only used by glommio implementation + original_indices: Vec, + }, +} + +#[derive(Debug)] +pub enum ConnectedResponse { + AnnounceIpv4(AnnounceResponseIpv4), + AnnounceIpv6(AnnounceResponseIpv6), + Scrape { + response: ScrapeResponse, + /// Currently only used by glommio implementation + original_indices: Vec, + }, +} + +impl Into for ConnectedResponse { + fn into(self) -> Response { + match self { + Self::AnnounceIpv4(response) => Response::AnnounceIpv4(response), + Self::AnnounceIpv6(response) => Response::AnnounceIpv6(response), + Self::Scrape { response, .. } => Response::Scrape(response), + } + } +} + +#[derive(Clone, PartialEq, Debug)] +pub struct ProtocolResponsePeer { + pub ip_address: I, + pub port: Port, +} + +pub struct ProtocolAnnounceResponse { + pub transaction_id: TransactionId, + pub announce_interval: AnnounceInterval, + pub leechers: NumberOfPeers, + pub seeders: NumberOfPeers, + pub peers: Vec>, +} + +impl Into for ProtocolAnnounceResponse { + fn into(self) -> ConnectedResponse { + ConnectedResponse::AnnounceIpv4(AnnounceResponseIpv4 { + transaction_id: self.transaction_id, + announce_interval: self.announce_interval, + leechers: self.leechers, + seeders: self.seeders, + peers: self + .peers + .into_iter() + .map(|peer| ResponsePeerIpv4 { + ip_address: peer.ip_address, + port: peer.port, + }) + .collect(), + }) + } +} + +impl Into for ProtocolAnnounceResponse { + fn into(self) -> ConnectedResponse { + ConnectedResponse::AnnounceIpv6(AnnounceResponseIpv6 { + transaction_id: self.transaction_id, + announce_interval: self.announce_interval, + leechers: self.leechers, + seeders: self.seeders, + peers: self + .peers + .into_iter() + .map(|peer| ResponsePeerIpv6 { + ip_address: peer.ip_address, + port: peer.port, + }) + .collect(), + }) + } +} + #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] pub enum PeerStatus { Seeding, @@ -96,9 +150,9 @@ pub struct Peer { impl Peer { #[inline(always)] - pub fn to_response_peer(&self) -> ResponsePeer { - ResponsePeer { - ip_address: self.ip_address.ip_addr(), + pub fn to_response_peer(&self) -> ProtocolResponsePeer { + ProtocolResponsePeer { + ip_address: self.ip_address, port: self.port, } } @@ -230,7 +284,7 @@ pub fn ip_version_from_ip(ip: IpAddr) -> IpVersion { #[cfg(test)] mod tests { - use std::net::{IpAddr, Ipv6Addr}; + use std::net::Ipv6Addr; use crate::{common::MAX_PACKET_SIZE, config::Config}; @@ -263,14 +317,14 @@ mod tests { let config = Config::default(); - let peers = ::std::iter::repeat(ResponsePeer { - ip_address: IpAddr::V6(Ipv6Addr::new(1, 1, 1, 1, 1, 1, 1, 1)), + let peers = ::std::iter::repeat(ResponsePeerIpv6 { + ip_address: Ipv6Addr::new(1, 1, 1, 1, 1, 1, 1, 1), port: Port(1), }) .take(config.protocol.max_response_peers) .collect(); - let response = Response::Announce(AnnounceResponse { + let response = Response::AnnounceIpv6(AnnounceResponseIpv6 { transaction_id: TransactionId(1), announce_interval: AnnounceInterval(1), seeders: NumberOfPeers(1), @@ -280,7 +334,7 @@ mod tests { let mut buf = Vec::new(); - response.write(&mut buf, IpVersion::IPv6).unwrap(); + response.write(&mut buf).unwrap(); println!("Buffer len: {}", buf.len()); diff --git a/aquatic_udp/src/lib/handlers.rs b/aquatic_udp/src/lib/handlers.rs index c77ce89..1650a98 100644 --- a/aquatic_udp/src/lib/handlers.rs +++ b/aquatic_udp/src/lib/handlers.rs @@ -79,7 +79,7 @@ pub fn run_request_worker( peer_valid_until, ); - (ConnectedResponse::Announce(response), src) + (response, src) })); responses.extend(scrape_requests.drain(..).map(|(request, src)| { @@ -107,8 +107,8 @@ pub fn handle_announce_request( request: AnnounceRequest, src: SocketAddr, peer_valid_until: ValidUntil, -) -> AnnounceResponse { - match convert_ipv4_mapped_ipv6(src.ip()) { +) -> ConnectedResponse { + match src.ip() { IpAddr::V4(ip) => handle_announce_request_inner( config, rng, @@ -116,7 +116,8 @@ pub fn handle_announce_request( request, ip, peer_valid_until, - ), + ) + .into(), IpAddr::V6(ip) => handle_announce_request_inner( config, rng, @@ -124,7 +125,8 @@ pub fn handle_announce_request( request, ip, peer_valid_until, - ), + ) + .into(), } } @@ -135,7 +137,7 @@ fn handle_announce_request_inner( request: AnnounceRequest, peer_ip: I, peer_valid_until: ValidUntil, -) -> AnnounceResponse { +) -> ProtocolAnnounceResponse { let peer_key = PeerMapKey { ip: peer_ip, peer_id: request.peer_id, @@ -186,7 +188,7 @@ fn handle_announce_request_inner( Peer::to_response_peer, ); - AnnounceResponse { + ProtocolAnnounceResponse { transaction_id: request.transaction_id, announce_interval: AnnounceInterval(config.protocol.peer_announce_interval), leechers: NumberOfPeers(torrent_data.num_leechers as i32), diff --git a/aquatic_udp/src/lib/network_mio.rs b/aquatic_udp/src/lib/network_mio.rs index 164aeb1..35769cf 100644 --- a/aquatic_udp/src/lib/network_mio.rs +++ b/aquatic_udp/src/lib/network_mio.rs @@ -1,5 +1,5 @@ use std::io::{Cursor, ErrorKind}; -use std::net::SocketAddr; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -128,6 +128,22 @@ fn read_requests( requests_received += 1; } + let src = match src { + SocketAddr::V6(src) => { + match src.ip().octets() { + // Convert IPv4-mapped address (available in std but nightly-only) + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => { + SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(a, b, c, d), + src.port(), + )) + } + _ => src.into(), + } + } + src => src, + }; + handle_request( config, connections, @@ -182,16 +198,31 @@ fn send_responses( .map(|(response, addr)| (response.into(), addr)), ); - for (response, src) in response_iterator { + for (response, addr) in response_iterator { cursor.set_position(0); - let ip_version = ip_version_from_ip(src.ip()); + let addr = if config.network.address.is_ipv4() { + if let SocketAddr::V4(addr) = addr { + SocketAddr::V4(addr) + } else { + unreachable!() + } + } else { + match addr { + SocketAddr::V4(addr) => { + let ip = addr.ip().to_ipv6_mapped(); - match response.write(&mut cursor, ip_version) { + SocketAddr::V6(SocketAddrV6::new(ip, addr.port(), 0, 0)) + } + addr => addr, + } + }; + + match response.write(&mut cursor) { Ok(()) => { let amt = cursor.position() as usize; - match socket.send_to(&cursor.get_ref()[..amt], src) { + match socket.send_to(&cursor.get_ref()[..amt], addr) { Ok(amt) => { responses_sent += 1; bytes_sent += amt; diff --git a/aquatic_udp/src/lib/network_uring.rs b/aquatic_udp/src/lib/network_uring.rs index dc2722e..7d56705 100644 --- a/aquatic_udp/src/lib/network_uring.rs +++ b/aquatic_udp/src/lib/network_uring.rs @@ -215,7 +215,7 @@ pub fn run_socket_worker( let buffer_index = user_data.get_buffer_index(); let buffer_len = result as usize; - let addr = if config.network.address.is_ipv4() { + let src = if config.network.address.is_ipv4() { SocketAddr::V4(SocketAddrV4::new( Ipv4Addr::from(u32::from_be( sockaddrs_ipv4[buffer_index].sin_addr.s_addr, @@ -258,7 +258,7 @@ pub fn run_socket_worker( &mut local_responses, valid_until, res_request, - addr, + src, ); } } @@ -397,7 +397,7 @@ fn queue_response( let mut cursor = Cursor::new(&mut buffers[buffer_index][..]); - match response.write(&mut cursor, ip_version_from_ip(addr.ip())) { + match response.write(&mut cursor) { Ok(()) => { iovs[buffer_index].iov_len = cursor.position() as usize; diff --git a/aquatic_udp_bench/src/announce.rs b/aquatic_udp_bench/src/announce.rs index bcc84e8..1354bdd 100644 --- a/aquatic_udp_bench/src/announce.rs +++ b/aquatic_udp_bench/src/announce.rs @@ -42,7 +42,7 @@ pub fn bench_announce_handler( .unwrap(); } - while let Ok((ConnectedResponse::Announce(r), _)) = response_receiver.try_recv() { + while let Ok((ConnectedResponse::AnnounceIpv4(r), _)) = response_receiver.try_recv() { num_responses += 1; if let Some(last_peer) = r.peers.last() { @@ -54,7 +54,7 @@ pub fn bench_announce_handler( let total = bench_config.num_announce_requests * (round + 1); while num_responses < total { - if let Ok((ConnectedResponse::Announce(r), _)) = response_receiver.recv() { + if let Ok((ConnectedResponse::AnnounceIpv4(r), _)) = response_receiver.recv() { num_responses += 1; if let Some(last_peer) = r.peers.last() { diff --git a/aquatic_udp_load_test/src/handler.rs b/aquatic_udp_load_test/src/handler.rs index 77180f7..e690ca4 100644 --- a/aquatic_udp_load_test/src/handler.rs +++ b/aquatic_udp_load_test/src/handler.rs @@ -165,7 +165,14 @@ fn process_response( Some(request) } - Response::Announce(r) => if_torrent_peer_move_and_create_random_request( + Response::AnnounceIpv4(r) => if_torrent_peer_move_and_create_random_request( + config, + rng, + info_hashes, + torrent_peers, + r.transaction_id, + ), + Response::AnnounceIpv6(r) => if_torrent_peer_move_and_create_random_request( config, rng, info_hashes, diff --git a/aquatic_udp_load_test/src/network.rs b/aquatic_udp_load_test/src/network.rs index 0f0269f..358a27b 100644 --- a/aquatic_udp_load_test/src/network.rs +++ b/aquatic_udp_load_test/src/network.rs @@ -129,7 +129,11 @@ fn read_responses( match Response::from_bytes(&buffer[0..amt]) { Ok(response) => { match response { - Response::Announce(ref r) => { + Response::AnnounceIpv4(ref r) => { + ls.responses_announce += 1; + ls.response_peers += r.peers.len(); + } + Response::AnnounceIpv6(ref r) => { ls.responses_announce += 1; ls.response_peers += r.peers.len(); } diff --git a/aquatic_udp_protocol/src/common.rs b/aquatic_udp_protocol/src/common.rs index ac56e18..6070e29 100644 --- a/aquatic_udp_protocol/src/common.rs +++ b/aquatic_udp_protocol/src/common.rs @@ -1,4 +1,4 @@ -use std::net::IpAddr; +use std::net::{Ipv4Addr, Ipv6Addr}; #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] pub enum IpVersion { @@ -37,8 +37,14 @@ pub struct PeerId(pub [u8; 20]); pub struct PeerKey(pub u32); #[derive(Hash, PartialEq, Eq, Clone, Debug)] -pub struct ResponsePeer { - pub ip_address: IpAddr, +pub struct ResponsePeerIpv4 { + pub ip_address: Ipv4Addr, + pub port: Port, +} + +#[derive(Hash, PartialEq, Eq, Clone, Debug)] +pub struct ResponsePeerIpv6 { + pub ip_address: Ipv6Addr, pub port: Port, } @@ -80,11 +86,21 @@ impl quickcheck::Arbitrary for PeerId { } #[cfg(test)] -impl quickcheck::Arbitrary for ResponsePeer { +impl quickcheck::Arbitrary for ResponsePeerIpv4 { fn arbitrary(g: &mut quickcheck::Gen) -> Self { Self { - ip_address: ::std::net::IpAddr::arbitrary(g), - port: Port(u16::arbitrary(g)), + ip_address: quickcheck::Arbitrary::arbitrary(g), + port: Port(u16::arbitrary(g).into()), + } + } +} + +#[cfg(test)] +impl quickcheck::Arbitrary for ResponsePeerIpv6 { + fn arbitrary(g: &mut quickcheck::Gen) -> Self { + Self { + ip_address: quickcheck::Arbitrary::arbitrary(g), + port: Port(u16::arbitrary(g).into()), } } } diff --git a/aquatic_udp_protocol/src/response.rs b/aquatic_udp_protocol/src/response.rs index b8a514a..99f3afa 100644 --- a/aquatic_udp_protocol/src/response.rs +++ b/aquatic_udp_protocol/src/response.rs @@ -1,7 +1,7 @@ use std::borrow::Cow; use std::convert::TryInto; use std::io::{self, Cursor, Write}; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::net::{Ipv4Addr, Ipv6Addr}; use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt}; @@ -21,12 +21,21 @@ pub struct ConnectResponse { } #[derive(PartialEq, Eq, Clone, Debug)] -pub struct AnnounceResponse { +pub struct AnnounceResponseIpv4 { pub transaction_id: TransactionId, pub announce_interval: AnnounceInterval, pub leechers: NumberOfPeers, pub seeders: NumberOfPeers, - pub peers: Vec, + pub peers: Vec, +} + +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct AnnounceResponseIpv6 { + pub transaction_id: TransactionId, + pub announce_interval: AnnounceInterval, + pub leechers: NumberOfPeers, + pub seeders: NumberOfPeers, + pub peers: Vec, } #[derive(PartialEq, Eq, Clone, Debug)] @@ -44,7 +53,8 @@ pub struct ErrorResponse { #[derive(PartialEq, Eq, Clone, Debug)] pub enum Response { Connect(ConnectResponse), - Announce(AnnounceResponse), + AnnounceIpv4(AnnounceResponseIpv4), + AnnounceIpv6(AnnounceResponseIpv6), Scrape(ScrapeResponse), Error(ErrorResponse), } @@ -55,9 +65,15 @@ impl From for Response { } } -impl From for Response { - fn from(r: AnnounceResponse) -> Self { - Self::Announce(r) +impl From for Response { + fn from(r: AnnounceResponseIpv4) -> Self { + Self::AnnounceIpv4(r) + } +} + +impl From for Response { + fn from(r: AnnounceResponseIpv6) -> Self { + Self::AnnounceIpv6(r) } } @@ -81,42 +97,23 @@ impl Response { /// addresses. Clients seem not to support it very well, but due to a lack /// of alternative solutions, it is implemented here. #[inline] - pub fn write(self, bytes: &mut impl Write, ip_version: IpVersion) -> Result<(), io::Error> { + pub fn write(self, bytes: &mut impl Write) -> Result<(), io::Error> { match self { Response::Connect(r) => { bytes.write_i32::(0)?; bytes.write_i32::(r.transaction_id.0)?; bytes.write_i64::(r.connection_id.0)?; } - Response::Announce(r) => { - if ip_version == IpVersion::IPv4 { - bytes.write_i32::(1)?; - bytes.write_i32::(r.transaction_id.0)?; - bytes.write_i32::(r.announce_interval.0)?; - bytes.write_i32::(r.leechers.0)?; - bytes.write_i32::(r.seeders.0)?; + Response::AnnounceIpv4(r) => { + bytes.write_i32::(1)?; + bytes.write_i32::(r.transaction_id.0)?; + bytes.write_i32::(r.announce_interval.0)?; + bytes.write_i32::(r.leechers.0)?; + bytes.write_i32::(r.seeders.0)?; - // Silently ignore peers with wrong IP version - for peer in r.peers { - if let IpAddr::V4(ip) = peer.ip_address { - bytes.write_all(&ip.octets())?; - bytes.write_u16::(peer.port.0)?; - } - } - } else { - bytes.write_i32::(4)?; - bytes.write_i32::(r.transaction_id.0)?; - bytes.write_i32::(r.announce_interval.0)?; - bytes.write_i32::(r.leechers.0)?; - bytes.write_i32::(r.seeders.0)?; - - // Silently ignore peers with wrong IP version - for peer in r.peers { - if let IpAddr::V6(ip) = peer.ip_address { - bytes.write_all(&ip.octets())?; - bytes.write_u16::(peer.port.0)?; - } - } + for peer in r.peers { + bytes.write_all(&peer.ip_address.octets())?; + bytes.write_u16::(peer.port.0)?; } } Response::Scrape(r) => { @@ -135,6 +132,18 @@ impl Response { bytes.write_all(r.message.as_bytes())?; } + Response::AnnounceIpv6(r) => { + bytes.write_i32::(4)?; + bytes.write_i32::(r.transaction_id.0)?; + bytes.write_i32::(r.announce_interval.0)?; + bytes.write_i32::(r.leechers.0)?; + bytes.write_i32::(r.seeders.0)?; + + for peer in r.peers { + bytes.write_all(&peer.ip_address.octets())?; + bytes.write_u16::(peer.port.0)?; + } + } } Ok(()) @@ -171,17 +180,17 @@ impl Response { .chunks_exact(6) .map(|chunk| { let ip_bytes: [u8; 4] = (&chunk[..4]).try_into().unwrap(); - let ip_address = IpAddr::V4(Ipv4Addr::from(ip_bytes)); + let ip_address = Ipv4Addr::from(ip_bytes); let port = (&chunk[4..]).read_u16::().unwrap(); - ResponsePeer { + ResponsePeerIpv4 { ip_address, port: Port(port), } }) .collect(); - Ok((AnnounceResponse { + Ok((AnnounceResponseIpv4 { transaction_id: TransactionId(transaction_id), announce_interval: AnnounceInterval(announce_interval), leechers: NumberOfPeers(leechers), @@ -244,17 +253,17 @@ impl Response { .chunks_exact(18) .map(|chunk| { let ip_bytes: [u8; 16] = (&chunk[..16]).try_into().unwrap(); - let ip_address = IpAddr::V6(Ipv6Addr::from(ip_bytes)); + let ip_address = Ipv6Addr::from(ip_bytes); let port = (&chunk[16..]).read_u16::().unwrap(); - ResponsePeer { + ResponsePeerIpv6 { ip_address, port: Port(port), } }) .collect(); - Ok((AnnounceResponse { + Ok((AnnounceResponseIpv6 { transaction_id: TransactionId(transaction_id), announce_interval: AnnounceInterval(announce_interval), leechers: NumberOfPeers(leechers), @@ -297,10 +306,26 @@ mod tests { } } - impl quickcheck::Arbitrary for AnnounceResponse { + impl quickcheck::Arbitrary for AnnounceResponseIpv4 { fn arbitrary(g: &mut quickcheck::Gen) -> Self { let peers = (0..u8::arbitrary(g)) - .map(|_| ResponsePeer::arbitrary(g)) + .map(|_| ResponsePeerIpv4::arbitrary(g)) + .collect(); + + Self { + transaction_id: TransactionId(i32::arbitrary(g)), + announce_interval: AnnounceInterval(i32::arbitrary(g)), + leechers: NumberOfPeers(i32::arbitrary(g)), + seeders: NumberOfPeers(i32::arbitrary(g)), + peers, + } + } + } + + impl quickcheck::Arbitrary for AnnounceResponseIpv6 { + fn arbitrary(g: &mut quickcheck::Gen) -> Self { + let peers = (0..u8::arbitrary(g)) + .map(|_| ResponsePeerIpv6::arbitrary(g)) .collect(); Self { @@ -326,10 +351,10 @@ mod tests { } } - fn same_after_conversion(response: Response, ip_version: IpVersion) -> bool { + fn same_after_conversion(response: Response) -> bool { let mut buf = Vec::new(); - response.clone().write(&mut buf, ip_version).unwrap(); + response.clone().write(&mut buf).unwrap(); let r2 = Response::from_bytes(&buf[..]).unwrap(); let success = response == r2; @@ -343,24 +368,21 @@ mod tests { #[quickcheck] fn test_connect_response_convert_identity(response: ConnectResponse) -> bool { - same_after_conversion(response.into(), IpVersion::IPv4) + same_after_conversion(response.into()) } #[quickcheck] - fn test_announce_response_convert_identity(data: (AnnounceResponse, IpVersion)) -> bool { - let mut r = data.0; + fn test_announce_response_ipv4_convert_identity(response: AnnounceResponseIpv4) -> bool { + same_after_conversion(response.into()) + } - if data.1 == IpVersion::IPv4 { - r.peers.retain(|peer| peer.ip_address.is_ipv4()); - } else { - r.peers.retain(|peer| peer.ip_address.is_ipv6()); - } - - same_after_conversion(r.into(), data.1) + #[quickcheck] + fn test_announce_response_ipv6_convert_identity(response: AnnounceResponseIpv6) -> bool { + same_after_conversion(response.into()) } #[quickcheck] fn test_scrape_response_convert_identity(response: ScrapeResponse) -> bool { - same_after_conversion(response.into(), IpVersion::IPv4) + same_after_conversion(response.into()) } }