diff --git a/Cargo.lock b/Cargo.lock index 555cbb5..0ff3b29 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -193,6 +193,7 @@ version = "0.1.0" dependencies = [ "anyhow", "aquatic_cli_helpers", + "aquatic_common", "aquatic_udp", "aquatic_udp_protocol", "crossbeam-channel", diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs index 25488f0..7063b6f 100644 --- a/aquatic_common/src/lib.rs +++ b/aquatic_common/src/lib.rs @@ -1,4 +1,4 @@ -use std::net::IpAddr; +use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; use std::time::{Duration, Instant}; use ahash::RandomState; @@ -104,3 +104,50 @@ pub fn convert_ipv4_mapped_ipv6(ip_address: IpAddr) -> IpAddr { ip_address } } + +/// SocketAddr that is not an IPv6-mapped IPv4 address +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] +pub struct CanonicalSocketAddr(SocketAddr); + +impl CanonicalSocketAddr { + pub fn new(addr: SocketAddr) -> Self { + match addr { + addr @ SocketAddr::V4(_) => Self(addr), + SocketAddr::V6(addr) => { + match addr.ip().octets() { + // Convert IPv4-mapped address (available in std but nightly-only) + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => Self(SocketAddr::V4( + SocketAddrV4::new(Ipv4Addr::new(a, b, c, d), addr.port()), + )), + _ => Self(addr.into()), + } + } + } + } + + pub fn get_ipv6_mapped(self) -> SocketAddr { + match self.0 { + SocketAddr::V4(addr) => { + let ip = addr.ip().to_ipv6_mapped(); + + SocketAddr::V6(SocketAddrV6::new(ip, addr.port(), 0, 0)) + } + addr => addr, + } + } + + pub fn get(self) -> SocketAddr { + self.0 + } + + pub fn get_ipv4(self) -> Option { + match self.0 { + addr @ SocketAddr::V4(_) => Some(addr), + _ => None, + } + } + + pub fn is_ipv4(&self) -> bool { + self.0.is_ipv4() + } +} diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index a32981a..693f31d 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -1,9 +1,10 @@ use std::collections::BTreeMap; use std::hash::Hash; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::sync::atomic::AtomicUsize; use std::sync::Arc; +use aquatic_common::CanonicalSocketAddr; use crossbeam_channel::{Sender, TrySendError}; use aquatic_common::access_list::AccessListArcSwap; @@ -68,13 +69,13 @@ impl RequestWorkerIndex { pub struct ConnectedRequestSender { index: SocketWorkerIndex, - senders: Vec>, + senders: Vec>, } impl ConnectedRequestSender { pub fn new( index: SocketWorkerIndex, - senders: Vec>, + senders: Vec>, ) -> Self { Self { index, senders } } @@ -83,7 +84,7 @@ impl ConnectedRequestSender { &self, index: RequestWorkerIndex, request: ConnectedRequest, - addr: SocketAddr, + addr: CanonicalSocketAddr, ) { match self.senders[index.0].try_send((self.index, request, addr)) { Ok(()) => {} @@ -98,11 +99,11 @@ impl ConnectedRequestSender { } pub struct ConnectedResponseSender { - senders: Vec>, + senders: Vec>, } impl ConnectedResponseSender { - pub fn new(senders: Vec>) -> Self { + pub fn new(senders: Vec>) -> Self { Self { senders } } @@ -110,7 +111,7 @@ impl ConnectedResponseSender { &self, index: SocketWorkerIndex, response: ConnectedResponse, - addr: SocketAddr, + addr: CanonicalSocketAddr, ) { match self.senders[index.0].try_send((response, addr)) { Ok(()) => {} diff --git a/aquatic_udp/src/workers/request.rs b/aquatic_udp/src/workers/request.rs index 92f4319..6a3f40e 100644 --- a/aquatic_udp/src/workers/request.rs +++ b/aquatic_udp/src/workers/request.rs @@ -2,7 +2,6 @@ use std::collections::BTreeMap; use std::net::IpAddr; use std::net::Ipv4Addr; use std::net::Ipv6Addr; -use std::net::SocketAddr; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; @@ -11,6 +10,7 @@ use std::time::Instant; use aquatic_common::access_list::create_access_list_cache; use aquatic_common::access_list::AccessListArcSwap; use aquatic_common::AHashIndexMap; +use aquatic_common::CanonicalSocketAddr; use aquatic_common::ValidUntil; use crossbeam_channel::Receiver; use rand::{rngs::SmallRng, SeedableRng}; @@ -176,7 +176,7 @@ impl Into for ProtocolAnnounceResponse { pub fn run_request_worker( config: Config, state: State, - request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, SocketAddr)>, + request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, response_sender: ConnectedResponseSender, worker_index: RequestWorkerIndex, ) { @@ -254,10 +254,10 @@ fn handle_announce_request( rng: &mut SmallRng, torrents: &mut TorrentMaps, request: AnnounceRequest, - src: SocketAddr, + src: CanonicalSocketAddr, peer_valid_until: ValidUntil, ) -> ConnectedResponse { - match src.ip() { + match src.get().ip() { IpAddr::V4(ip) => handle_announce_request_inner( config, rng, @@ -355,14 +355,14 @@ fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize { fn handle_scrape_request( torrents: &mut TorrentMaps, - src: SocketAddr, + src: CanonicalSocketAddr, request: PendingScrapeRequest, ) -> PendingScrapeResponse { const EMPTY_STATS: TorrentScrapeStatistics = create_torrent_scrape_statistics(0, 0); let mut torrent_stats: BTreeMap = BTreeMap::new(); - if src.ip().is_ipv4() { + if src.is_ipv4() { torrent_stats.extend(request.info_hashes.into_iter().map(|(i, info_hash)| { let s = if let Some(torrent_data) = torrents.ipv4.get(&info_hash) { create_torrent_scrape_statistics( diff --git a/aquatic_udp/src/workers/socket.rs b/aquatic_udp/src/workers/socket.rs index 130e11b..f06a32c 100644 --- a/aquatic_udp/src/workers/socket.rs +++ b/aquatic_udp/src/workers/socket.rs @@ -1,6 +1,5 @@ use std::collections::BTreeMap; use std::io::{Cursor, ErrorKind}; -use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -16,8 +15,8 @@ use slab::Slab; use aquatic_common::access_list::create_access_list_cache; use aquatic_common::access_list::AccessListCache; -use aquatic_common::AHashIndexMap; use aquatic_common::ValidUntil; +use aquatic_common::{AHashIndexMap, CanonicalSocketAddr}; use aquatic_udp_protocol::*; use socket2::{Domain, Protocol, Socket, Type}; @@ -25,19 +24,19 @@ use crate::common::*; use crate::config::Config; #[derive(Default)] -pub struct ConnectionMap(AHashIndexMap<(ConnectionId, SocketAddr), ValidUntil>); +pub struct ConnectionMap(AHashIndexMap<(ConnectionId, CanonicalSocketAddr), ValidUntil>); impl ConnectionMap { pub fn insert( &mut self, connection_id: ConnectionId, - socket_addr: SocketAddr, + socket_addr: CanonicalSocketAddr, valid_until: ValidUntil, ) { self.0.insert((connection_id, socket_addr), valid_until); } - pub fn contains(&self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool { + pub fn contains(&self, connection_id: ConnectionId, socket_addr: CanonicalSocketAddr) -> bool { self.0.contains_key(&(connection_id, socket_addr)) } @@ -157,7 +156,7 @@ pub fn run_socket_worker( config: Config, token_num: usize, request_sender: ConnectedRequestSender, - response_receiver: Receiver<(ConnectedResponse, SocketAddr)>, + response_receiver: Receiver<(ConnectedResponse, CanonicalSocketAddr)>, num_bound_sockets: Arc, ) { let mut rng = StdRng::from_entropy(); @@ -179,7 +178,7 @@ pub fn run_socket_worker( let mut pending_scrape_responses = PendingScrapeResponseSlab::default(); let mut access_list_cache = create_access_list_cache(&state.access_list); - let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new(); + let mut local_responses: Vec<(Response, CanonicalSocketAddr)> = Vec::new(); let poll_timeout = Duration::from_millis(config.network.poll_timeout_ms); @@ -267,7 +266,7 @@ fn read_requests( socket: &mut UdpSocket, buffer: &mut [u8], request_sender: &ConnectedRequestSender, - local_responses: &mut Vec<(Response, SocketAddr)>, + local_responses: &mut Vec<(Response, CanonicalSocketAddr)>, connection_valid_until: ValidUntil, pending_scrape_valid_until: ValidUntil, ) { @@ -282,21 +281,7 @@ fn read_requests( let res_request = Request::from_bytes(&buffer[..amt], config.protocol.max_scrape_torrents); - let src = match src { - src @ SocketAddr::V4(_) => src, - SocketAddr::V6(src) => { - match src.ip().octets() { - // Convert IPv4-mapped address (available in std but nightly-only) - [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => { - SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::new(a, b, c, d), - src.port(), - )) - } - _ => src.into(), - } - } - }; + let src = CanonicalSocketAddr::new(src); // Update statistics for converted address if src.is_ipv4() { @@ -362,11 +347,11 @@ pub fn handle_request( access_list_cache: &mut AccessListCache, rng: &mut StdRng, request_sender: &ConnectedRequestSender, - local_responses: &mut Vec<(Response, SocketAddr)>, + local_responses: &mut Vec<(Response, CanonicalSocketAddr)>, connection_valid_until: ValidUntil, pending_scrape_valid_until: ValidUntil, res_request: Result, - src: SocketAddr, + src: CanonicalSocketAddr, ) { let access_list_mode = config.access_list.mode; @@ -452,9 +437,9 @@ fn send_responses( config: &Config, socket: &mut UdpSocket, buffer: &mut [u8], - response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>, + response_receiver: &Receiver<(ConnectedResponse, CanonicalSocketAddr)>, pending_scrape_responses: &mut PendingScrapeResponseSlab, - local_responses: Drain<(Response, SocketAddr)>, + local_responses: Drain<(Response, CanonicalSocketAddr)>, ) { for (response, addr) in local_responses { send_response(state, config, socket, buffer, response, addr); @@ -479,27 +464,17 @@ fn send_response( socket: &mut UdpSocket, buffer: &mut [u8], response: Response, - addr: SocketAddr, + addr: CanonicalSocketAddr, ) { let mut cursor = Cursor::new(buffer); - let addr_is_ipv4 = addr.is_ipv4(); + let canonical_addr_is_ipv4 = addr.is_ipv4(); let addr = if config.network.address.is_ipv4() { - if let SocketAddr::V4(addr) = addr { - SocketAddr::V4(addr) - } else { - unreachable!() - } + addr.get_ipv4() + .expect("found peer ipv6 address while running bound to ipv4 address") } else { - match addr { - SocketAddr::V4(addr) => { - let ip = addr.ip().to_ipv6_mapped(); - - SocketAddr::V6(SocketAddrV6::new(ip, addr.port(), 0, 0)) - } - addr => addr, - } + addr.get_ipv6_mapped() }; match response.write(&mut cursor) { @@ -508,7 +483,7 @@ fn send_response( match socket.send_to(&cursor.get_ref()[..amt], addr) { Ok(amt) if config.statistics.active() => { - let stats = if addr_is_ipv4 { + let stats = if canonical_addr_is_ipv4 { &state.statistics_ipv4 } else { &state.statistics_ipv6 diff --git a/aquatic_udp_bench/Cargo.toml b/aquatic_udp_bench/Cargo.toml index 8b6d831..09208c3 100644 --- a/aquatic_udp_bench/Cargo.toml +++ b/aquatic_udp_bench/Cargo.toml @@ -12,6 +12,7 @@ name = "aquatic_udp_bench" [dependencies] anyhow = "1" aquatic_cli_helpers = "0.1.0" +aquatic_common = "0.1.0" aquatic_udp = "0.1.0" aquatic_udp_protocol = "0.1.0" crossbeam-channel = "0.5" diff --git a/aquatic_udp_bench/src/announce.rs b/aquatic_udp_bench/src/announce.rs index 1277c4e..e048bf4 100644 --- a/aquatic_udp_bench/src/announce.rs +++ b/aquatic_udp_bench/src/announce.rs @@ -1,6 +1,7 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::time::{Duration, Instant}; +use aquatic_common::CanonicalSocketAddr; use crossbeam_channel::{Receiver, Sender}; use indicatif::ProgressIterator; use rand::Rng; @@ -14,8 +15,8 @@ use crate::config::BenchConfig; pub fn bench_announce_handler( bench_config: &BenchConfig, - request_sender: &Sender<(SocketWorkerIndex, ConnectedRequest, SocketAddr)>, - response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>, + request_sender: &Sender<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, + response_receiver: &Receiver<(ConnectedResponse, CanonicalSocketAddr)>, rng: &mut impl Rng, info_hashes: &[InfoHash], ) -> (usize, Duration) { @@ -79,7 +80,7 @@ pub fn create_requests( rng: &mut impl Rng, info_hashes: &[InfoHash], number: usize, -) -> Vec<(AnnounceRequest, SocketAddr)> { +) -> Vec<(AnnounceRequest, CanonicalSocketAddr)> { let pareto = Pareto::new(1., PARETO_SHAPE).unwrap(); let max_index = info_hashes.len() - 1; @@ -106,7 +107,7 @@ pub fn create_requests( requests.push(( request, - SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1)), + CanonicalSocketAddr::new(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1))), )); } diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index 1062ade..e233779 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -1,6 +1,7 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::time::{Duration, Instant}; +use aquatic_common::CanonicalSocketAddr; use crossbeam_channel::{Receiver, Sender}; use indicatif::ProgressIterator; use rand::Rng; @@ -14,8 +15,8 @@ use crate::config::BenchConfig; pub fn bench_scrape_handler( bench_config: &BenchConfig, - request_sender: &Sender<(SocketWorkerIndex, ConnectedRequest, SocketAddr)>, - response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>, + request_sender: &Sender<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, + response_receiver: &Receiver<(ConnectedResponse, CanonicalSocketAddr)>, rng: &mut impl Rng, info_hashes: &[InfoHash], ) -> (usize, Duration) { @@ -91,7 +92,7 @@ pub fn create_requests( info_hashes: &[InfoHash], number: usize, hashes_per_request: usize, -) -> Vec<(ScrapeRequest, SocketAddr)> { +) -> Vec<(ScrapeRequest, CanonicalSocketAddr)> { let pareto = Pareto::new(1., PARETO_SHAPE).unwrap(); let max_index = info_hashes.len() - 1; @@ -114,7 +115,7 @@ pub fn create_requests( requests.push(( request, - SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1)), + CanonicalSocketAddr::new(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1))), )); }