From 8889ab586c9a660b3d4dcd31c975f525ce2bb384 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 3 Feb 2022 19:29:21 +0100 Subject: [PATCH] Use CanonicalSocketAddr in ws and http; remove old option from common --- aquatic_common/src/lib.rs | 15 +-------------- aquatic_http/src/common.rs | 18 +++++++++--------- aquatic_http/src/workers/request.rs | 8 ++------ aquatic_http/src/workers/socket.rs | 5 +++-- aquatic_ws/src/common/handlers.rs | 10 ++++------ aquatic_ws/src/common/mod.rs | 8 ++------ aquatic_ws/src/glommio/socket.rs | 15 +++++++-------- aquatic_ws/src/mio/socket/connection.rs | 2 +- aquatic_ws/src/mio/socket/mod.rs | 17 +++++++---------- 9 files changed, 36 insertions(+), 62 deletions(-) diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs index 7063b6f..fdd242f 100644 --- a/aquatic_common/src/lib.rs +++ b/aquatic_common/src/lib.rs @@ -1,4 +1,4 @@ -use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; use std::time::{Duration, Instant}; use ahash::RandomState; @@ -92,19 +92,6 @@ where } } -#[inline] -pub fn convert_ipv4_mapped_ipv6(ip_address: IpAddr) -> IpAddr { - if let IpAddr::V6(ip) = ip_address { - if let [0, 0, 0, 0, 0, 0xffff, ..] = ip.segments() { - ip.to_ipv4().expect("convert ipv4-mapped ip").into() - } else { - ip_address - } - } else { - ip_address - } -} - /// SocketAddr that is not an IPv6-mapped IPv4 address #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] pub struct CanonicalSocketAddr(SocketAddr); diff --git a/aquatic_http/src/common.rs b/aquatic_http/src/common.rs index 5aea0dd..c925ad5 100644 --- a/aquatic_http/src/common.rs +++ b/aquatic_http/src/common.rs @@ -1,13 +1,13 @@ -use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::net::{Ipv4Addr, Ipv6Addr}; use std::sync::Arc; use std::time::Instant; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; -use aquatic_common::AHashIndexMap; +use aquatic_common::{AHashIndexMap, CanonicalSocketAddr}; use either::Either; use smartstring::{LazyCompact, SmartString}; -pub use aquatic_common::{convert_ipv4_mapped_ipv6, ValidUntil}; +pub use aquatic_common::ValidUntil; use aquatic_http_protocol::common::*; use aquatic_http_protocol::response::ResponsePeer; @@ -31,13 +31,13 @@ pub struct ConnectionId(pub usize); pub enum ChannelRequest { Announce { request: AnnounceRequest, - peer_addr: SocketAddr, + peer_addr: CanonicalSocketAddr, connection_id: ConnectionId, response_consumer_id: ConsumerId, }, Scrape { request: ScrapeRequest, - peer_addr: SocketAddr, + peer_addr: CanonicalSocketAddr, connection_id: ConnectionId, response_consumer_id: ConsumerId, }, @@ -47,12 +47,12 @@ pub enum ChannelRequest { pub enum ChannelResponse { Announce { response: AnnounceResponse, - peer_addr: SocketAddr, + peer_addr: CanonicalSocketAddr, connection_id: ConnectionId, }, Scrape { response: ScrapeResponse, - peer_addr: SocketAddr, + peer_addr: CanonicalSocketAddr, connection_id: ConnectionId, }, } @@ -64,7 +64,7 @@ impl ChannelResponse { Self::Scrape { connection_id, .. } => *connection_id, } } - pub fn get_peer_addr(&self) -> SocketAddr { + pub fn get_peer_addr(&self) -> CanonicalSocketAddr { match self { Self::Announce { peer_addr, .. } => *peer_addr, Self::Scrape { peer_addr, .. } => *peer_addr, @@ -82,7 +82,7 @@ pub struct ConnectionMeta { /// Index of socket worker responsible for this connection. Required for /// sending back response through correct channel to correct worker. pub response_consumer_id: ConsumerId, - pub peer_addr: SocketAddr, + pub peer_addr: CanonicalSocketAddr, /// Connection id local to socket worker pub connection_id: ConnectionId, } diff --git a/aquatic_http/src/workers/request.rs b/aquatic_http/src/workers/request.rs index bdd9239..2adc12f 100644 --- a/aquatic_http/src/workers/request.rs +++ b/aquatic_http/src/workers/request.rs @@ -159,11 +159,7 @@ pub fn handle_announce_request( meta: ConnectionMeta, request: AnnounceRequest, ) -> AnnounceResponse { - let peer_ip = convert_ipv4_mapped_ipv6(meta.peer_addr.ip()); - - ::log::debug!("peer ip: {:?}", peer_ip); - - match peer_ip { + match meta.peer_addr.get().ip() { IpAddr::V4(peer_ip_address) => { let torrent_data: &mut TorrentData = torrent_maps.ipv4.entry(request.info_hash).or_default(); @@ -323,7 +319,7 @@ pub fn handle_scrape_request( files: BTreeMap::new(), }; - let peer_ip = convert_ipv4_mapped_ipv6(meta.peer_addr.ip()); + let peer_ip = meta.peer_addr.get().ip(); // If request.info_hashes is empty, don't return scrape for all // torrents, even though reference server does it. It is too expensive. diff --git a/aquatic_http/src/workers/socket.rs b/aquatic_http/src/workers/socket.rs index 1f1dea4..493efd9 100644 --- a/aquatic_http/src/workers/socket.rs +++ b/aquatic_http/src/workers/socket.rs @@ -1,12 +1,12 @@ use std::cell::RefCell; use std::collections::BTreeMap; -use std::net::SocketAddr; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; +use aquatic_common::CanonicalSocketAddr; use aquatic_http_protocol::common::InfoHash; use aquatic_http_protocol::request::{Request, RequestParseError, ScrapeRequest}; use aquatic_http_protocol::response::{ @@ -170,7 +170,7 @@ struct Connection { response_receiver: LocalReceiver, response_consumer_id: ConsumerId, stream: TlsStream, - peer_addr: SocketAddr, + peer_addr: CanonicalSocketAddr, connection_id: ConnectionId, request_buffer: [u8; REQUEST_BUFFER_SIZE], request_buffer_position: usize, @@ -191,6 +191,7 @@ impl Connection { let peer_addr = stream .peer_addr() .map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))?; + let peer_addr = CanonicalSocketAddr::new(peer_addr); let tls_acceptor: TlsAcceptor = tls_config.into(); let stream = tls_acceptor.accept(stream).await?; diff --git a/aquatic_ws/src/common/handlers.rs b/aquatic_ws/src/common/handlers.rs index 94dc0e5..1d6ae0c 100644 --- a/aquatic_ws/src/common/handlers.rs +++ b/aquatic_ws/src/common/handlers.rs @@ -16,7 +16,7 @@ pub fn handle_announce_request( request_sender_meta: ConnectionMeta, request: AnnounceRequest, ) { - let torrent_data: &mut TorrentData = if request_sender_meta.converted_peer_ip.is_ipv4() { + let torrent_data: &mut TorrentData = if request_sender_meta.peer_addr.is_ipv4() { torrent_maps.ipv4.entry(request.info_hash).or_default() } else { torrent_maps.ipv6.entry(request.info_hash).or_default() @@ -25,11 +25,9 @@ pub fn handle_announce_request( // If there is already a peer with this peer_id, check that socket // addr is same as that of request sender. Otherwise, ignore request. // Since peers have access to each others peer_id's, they could send - // requests using them, causing all sorts of issues. Checking naive - // (non-converted) socket addresses is enough, since state is split - // on converted peer ip. + // requests using them, causing all sorts of issues. if let Some(previous_peer) = torrent_data.peers.get(&request.peer_id) { - if request_sender_meta.naive_peer_addr != previous_peer.connection_meta.naive_peer_addr { + if request_sender_meta.peer_addr != previous_peer.connection_meta.peer_addr { return; } } @@ -167,7 +165,7 @@ pub fn handle_scrape_request( files: HashMap::with_capacity(num_to_take), }; - let torrent_map: &mut TorrentMap = if meta.converted_peer_ip.is_ipv4() { + let torrent_map: &mut TorrentMap = if meta.peer_addr.is_ipv4() { &mut torrent_maps.ipv4 } else { &mut torrent_maps.ipv6 diff --git a/aquatic_ws/src/common/mod.rs b/aquatic_ws/src/common/mod.rs index 0d58fee..0f43d2b 100644 --- a/aquatic_ws/src/common/mod.rs +++ b/aquatic_ws/src/common/mod.rs @@ -2,12 +2,11 @@ pub mod handlers; use std::fs::File; use std::io::BufReader; -use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use std::time::Instant; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; -use aquatic_common::AHashIndexMap; +use aquatic_common::{AHashIndexMap, CanonicalSocketAddr}; pub use aquatic_common::ValidUntil; @@ -30,10 +29,7 @@ pub struct ConnectionMeta { /// sending back response through correct channel to correct worker. pub out_message_consumer_id: ConsumerId, pub connection_id: ConnectionId, - /// Peer address as received from socket, meaning it wasn't converted to - /// an IPv4 address if it was a IPv4-mapped IPv6 address - pub naive_peer_addr: SocketAddr, - pub converted_peer_ip: IpAddr, + pub peer_addr: CanonicalSocketAddr, pub pending_scrape_id: Option, } diff --git a/aquatic_ws/src/glommio/socket.rs b/aquatic_ws/src/glommio/socket.rs index 5d47091..b9dc6fe 100644 --- a/aquatic_ws/src/glommio/socket.rs +++ b/aquatic_ws/src/glommio/socket.rs @@ -1,14 +1,13 @@ use std::borrow::Cow; use std::cell::RefCell; use std::collections::BTreeMap; -use std::net::SocketAddr; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; -use aquatic_common::convert_ipv4_mapped_ipv6; +use aquatic_common::CanonicalSocketAddr; use aquatic_ws_protocol::*; use async_tungstenite::WebSocketStream; use futures::stream::{SplitSink, SplitStream}; @@ -221,6 +220,7 @@ async fn run_connection( let peer_addr = stream .peer_addr() .map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))?; + let peer_addr = CanonicalSocketAddr::new(peer_addr); let tls_acceptor: TlsAcceptor = tls_config.into(); let stream = tls_acceptor.accept(stream).await?; @@ -293,7 +293,7 @@ struct ConnectionReader { pending_scrape_slab: Rc>>, out_message_consumer_id: ConsumerId, ws_in: SplitStream>>, - peer_addr: SocketAddr, + peer_addr: CanonicalSocketAddr, connection_id: ConnectionId, } @@ -432,8 +432,7 @@ impl ConnectionReader { ConnectionMeta { connection_id: self.connection_id, out_message_consumer_id: self.out_message_consumer_id, - naive_peer_addr: self.peer_addr, - converted_peer_ip: convert_ipv4_mapped_ipv6(self.peer_addr.ip()), + peer_addr: self.peer_addr, pending_scrape_id, } } @@ -445,7 +444,7 @@ struct ConnectionWriter { connection_slab: Rc>>, ws_out: SplitSink>, tungstenite::Message>, pending_scrape_slab: Rc>>, - peer_addr: SocketAddr, + peer_addr: CanonicalSocketAddr, connection_id: ConnectionId, } @@ -456,7 +455,7 @@ impl ConnectionWriter { anyhow::anyhow!("ConnectionWriter couldn't receive message, sender is closed") })?; - if meta.naive_peer_addr != self.peer_addr { + if meta.peer_addr != self.peer_addr { return Err(anyhow::anyhow!("peer addresses didn't match")); } @@ -530,7 +529,7 @@ impl ConnectionWriter { Err(err) => { ::log::info!( "send_out_message: send to {} took to long: {}", - self.peer_addr, + self.peer_addr.get(), err ); diff --git a/aquatic_ws/src/mio/socket/connection.rs b/aquatic_ws/src/mio/socket/connection.rs index f5559d0..50f5ca2 100644 --- a/aquatic_ws/src/mio/socket/connection.rs +++ b/aquatic_ws/src/mio/socket/connection.rs @@ -153,7 +153,7 @@ impl Connection { } pub fn close(self) { - ::log::debug!("will close connection to {}", self.meta.naive_peer_addr); + ::log::debug!("will close connection to {}", self.meta.peer_addr.get()); match self.state { ConnectionState::TlsHandshaking(inner) => inner.close(), diff --git a/aquatic_ws/src/mio/socket/mod.rs b/aquatic_ws/src/mio/socket/mod.rs index 237c226..181c8f6 100644 --- a/aquatic_ws/src/mio/socket/mod.rs +++ b/aquatic_ws/src/mio/socket/mod.rs @@ -4,13 +4,13 @@ use std::time::{Duration, Instant}; use anyhow::Context; use aquatic_common::access_list::AccessListQuery; +use aquatic_common::CanonicalSocketAddr; use hashbrown::HashMap; use mio::net::TcpListener; use mio::{Events, Interest, Poll, Token}; use socket2::{Domain, Protocol, Socket, Type}; use tungstenite::protocol::WebSocketConfig; -use aquatic_common::convert_ipv4_mapped_ipv6; use aquatic_ws_protocol::*; use crate::common::*; @@ -263,20 +263,17 @@ fn accept_new_streams( loop { match listener.accept() { Ok((stream, _)) => { - let naive_peer_addr = if let Ok(peer_addr) = stream.peer_addr() { - peer_addr + let peer_addr = if let Ok(peer_addr) = stream.peer_addr() { + CanonicalSocketAddr::new(peer_addr) } else { continue; }; connections.insert_and_register_new(poll, move |token| { - let converted_peer_ip = convert_ipv4_mapped_ipv6(naive_peer_addr.ip()); - let meta = ConnectionMeta { out_message_consumer_id: ConsumerId(socket_worker_index), connection_id: ConnectionId(token.0), - naive_peer_addr, - converted_peer_ip, + peer_addr, pending_scrape_id: None, // FIXME }; @@ -348,11 +345,11 @@ where let mut remove_connection = false; if let Some(connection) = connections.get_mut(&token) { - if connection.get_meta().naive_peer_addr != meta.naive_peer_addr { + if connection.get_meta().peer_addr != meta.peer_addr { ::log::warn!( "socket worker error: connection socket addr {} didn't match channel {}. Token: {}.", - connection.get_meta().naive_peer_addr, - meta.naive_peer_addr, + connection.get_meta().peer_addr.get(), + meta.peer_addr.get(), token.0 );