Use CanonicalSocketAddr in ws and http; remove old option from common

This commit is contained in:
Joakim Frostegård 2022-02-03 19:29:21 +01:00
parent 380ca222de
commit 8889ab586c
9 changed files with 36 additions and 62 deletions

View file

@ -1,4 +1,4 @@
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use ahash::RandomState; use ahash::RandomState;
@ -92,19 +92,6 @@ where
} }
} }
#[inline]
pub fn convert_ipv4_mapped_ipv6(ip_address: IpAddr) -> IpAddr {
if let IpAddr::V6(ip) = ip_address {
if let [0, 0, 0, 0, 0, 0xffff, ..] = ip.segments() {
ip.to_ipv4().expect("convert ipv4-mapped ip").into()
} else {
ip_address
}
} else {
ip_address
}
}
/// SocketAddr that is not an IPv6-mapped IPv4 address /// SocketAddr that is not an IPv6-mapped IPv4 address
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub struct CanonicalSocketAddr(SocketAddr); pub struct CanonicalSocketAddr(SocketAddr);

View file

@ -1,13 +1,13 @@
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::net::{Ipv4Addr, Ipv6Addr};
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::AHashIndexMap; use aquatic_common::{AHashIndexMap, CanonicalSocketAddr};
use either::Either; use either::Either;
use smartstring::{LazyCompact, SmartString}; use smartstring::{LazyCompact, SmartString};
pub use aquatic_common::{convert_ipv4_mapped_ipv6, ValidUntil}; pub use aquatic_common::ValidUntil;
use aquatic_http_protocol::common::*; use aquatic_http_protocol::common::*;
use aquatic_http_protocol::response::ResponsePeer; use aquatic_http_protocol::response::ResponsePeer;
@ -31,13 +31,13 @@ pub struct ConnectionId(pub usize);
pub enum ChannelRequest { pub enum ChannelRequest {
Announce { Announce {
request: AnnounceRequest, request: AnnounceRequest,
peer_addr: SocketAddr, peer_addr: CanonicalSocketAddr,
connection_id: ConnectionId, connection_id: ConnectionId,
response_consumer_id: ConsumerId, response_consumer_id: ConsumerId,
}, },
Scrape { Scrape {
request: ScrapeRequest, request: ScrapeRequest,
peer_addr: SocketAddr, peer_addr: CanonicalSocketAddr,
connection_id: ConnectionId, connection_id: ConnectionId,
response_consumer_id: ConsumerId, response_consumer_id: ConsumerId,
}, },
@ -47,12 +47,12 @@ pub enum ChannelRequest {
pub enum ChannelResponse { pub enum ChannelResponse {
Announce { Announce {
response: AnnounceResponse, response: AnnounceResponse,
peer_addr: SocketAddr, peer_addr: CanonicalSocketAddr,
connection_id: ConnectionId, connection_id: ConnectionId,
}, },
Scrape { Scrape {
response: ScrapeResponse, response: ScrapeResponse,
peer_addr: SocketAddr, peer_addr: CanonicalSocketAddr,
connection_id: ConnectionId, connection_id: ConnectionId,
}, },
} }
@ -64,7 +64,7 @@ impl ChannelResponse {
Self::Scrape { connection_id, .. } => *connection_id, Self::Scrape { connection_id, .. } => *connection_id,
} }
} }
pub fn get_peer_addr(&self) -> SocketAddr { pub fn get_peer_addr(&self) -> CanonicalSocketAddr {
match self { match self {
Self::Announce { peer_addr, .. } => *peer_addr, Self::Announce { peer_addr, .. } => *peer_addr,
Self::Scrape { peer_addr, .. } => *peer_addr, Self::Scrape { peer_addr, .. } => *peer_addr,
@ -82,7 +82,7 @@ pub struct ConnectionMeta {
/// Index of socket worker responsible for this connection. Required for /// Index of socket worker responsible for this connection. Required for
/// sending back response through correct channel to correct worker. /// sending back response through correct channel to correct worker.
pub response_consumer_id: ConsumerId, pub response_consumer_id: ConsumerId,
pub peer_addr: SocketAddr, pub peer_addr: CanonicalSocketAddr,
/// Connection id local to socket worker /// Connection id local to socket worker
pub connection_id: ConnectionId, pub connection_id: ConnectionId,
} }

View file

@ -159,11 +159,7 @@ pub fn handle_announce_request(
meta: ConnectionMeta, meta: ConnectionMeta,
request: AnnounceRequest, request: AnnounceRequest,
) -> AnnounceResponse { ) -> AnnounceResponse {
let peer_ip = convert_ipv4_mapped_ipv6(meta.peer_addr.ip()); match meta.peer_addr.get().ip() {
::log::debug!("peer ip: {:?}", peer_ip);
match peer_ip {
IpAddr::V4(peer_ip_address) => { IpAddr::V4(peer_ip_address) => {
let torrent_data: &mut TorrentData<Ipv4Addr> = let torrent_data: &mut TorrentData<Ipv4Addr> =
torrent_maps.ipv4.entry(request.info_hash).or_default(); torrent_maps.ipv4.entry(request.info_hash).or_default();
@ -323,7 +319,7 @@ pub fn handle_scrape_request(
files: BTreeMap::new(), files: BTreeMap::new(),
}; };
let peer_ip = convert_ipv4_mapped_ipv6(meta.peer_addr.ip()); let peer_ip = meta.peer_addr.get().ip();
// If request.info_hashes is empty, don't return scrape for all // If request.info_hashes is empty, don't return scrape for all
// torrents, even though reference server does it. It is too expensive. // torrents, even though reference server does it. It is too expensive.

View file

@ -1,12 +1,12 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::rc::Rc; use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::CanonicalSocketAddr;
use aquatic_http_protocol::common::InfoHash; use aquatic_http_protocol::common::InfoHash;
use aquatic_http_protocol::request::{Request, RequestParseError, ScrapeRequest}; use aquatic_http_protocol::request::{Request, RequestParseError, ScrapeRequest};
use aquatic_http_protocol::response::{ use aquatic_http_protocol::response::{
@ -170,7 +170,7 @@ struct Connection {
response_receiver: LocalReceiver<ChannelResponse>, response_receiver: LocalReceiver<ChannelResponse>,
response_consumer_id: ConsumerId, response_consumer_id: ConsumerId,
stream: TlsStream<TcpStream>, stream: TlsStream<TcpStream>,
peer_addr: SocketAddr, peer_addr: CanonicalSocketAddr,
connection_id: ConnectionId, connection_id: ConnectionId,
request_buffer: [u8; REQUEST_BUFFER_SIZE], request_buffer: [u8; REQUEST_BUFFER_SIZE],
request_buffer_position: usize, request_buffer_position: usize,
@ -191,6 +191,7 @@ impl Connection {
let peer_addr = stream let peer_addr = stream
.peer_addr() .peer_addr()
.map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))?; .map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))?;
let peer_addr = CanonicalSocketAddr::new(peer_addr);
let tls_acceptor: TlsAcceptor = tls_config.into(); let tls_acceptor: TlsAcceptor = tls_config.into();
let stream = tls_acceptor.accept(stream).await?; let stream = tls_acceptor.accept(stream).await?;

View file

@ -16,7 +16,7 @@ pub fn handle_announce_request(
request_sender_meta: ConnectionMeta, request_sender_meta: ConnectionMeta,
request: AnnounceRequest, request: AnnounceRequest,
) { ) {
let torrent_data: &mut TorrentData = if request_sender_meta.converted_peer_ip.is_ipv4() { let torrent_data: &mut TorrentData = if request_sender_meta.peer_addr.is_ipv4() {
torrent_maps.ipv4.entry(request.info_hash).or_default() torrent_maps.ipv4.entry(request.info_hash).or_default()
} else { } else {
torrent_maps.ipv6.entry(request.info_hash).or_default() torrent_maps.ipv6.entry(request.info_hash).or_default()
@ -25,11 +25,9 @@ pub fn handle_announce_request(
// If there is already a peer with this peer_id, check that socket // If there is already a peer with this peer_id, check that socket
// addr is same as that of request sender. Otherwise, ignore request. // addr is same as that of request sender. Otherwise, ignore request.
// Since peers have access to each others peer_id's, they could send // Since peers have access to each others peer_id's, they could send
// requests using them, causing all sorts of issues. Checking naive // requests using them, causing all sorts of issues.
// (non-converted) socket addresses is enough, since state is split
// on converted peer ip.
if let Some(previous_peer) = torrent_data.peers.get(&request.peer_id) { if let Some(previous_peer) = torrent_data.peers.get(&request.peer_id) {
if request_sender_meta.naive_peer_addr != previous_peer.connection_meta.naive_peer_addr { if request_sender_meta.peer_addr != previous_peer.connection_meta.peer_addr {
return; return;
} }
} }
@ -167,7 +165,7 @@ pub fn handle_scrape_request(
files: HashMap::with_capacity(num_to_take), files: HashMap::with_capacity(num_to_take),
}; };
let torrent_map: &mut TorrentMap = if meta.converted_peer_ip.is_ipv4() { let torrent_map: &mut TorrentMap = if meta.peer_addr.is_ipv4() {
&mut torrent_maps.ipv4 &mut torrent_maps.ipv4
} else { } else {
&mut torrent_maps.ipv6 &mut torrent_maps.ipv6

View file

@ -2,12 +2,11 @@ pub mod handlers;
use std::fs::File; use std::fs::File;
use std::io::BufReader; use std::io::BufReader;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::AHashIndexMap; use aquatic_common::{AHashIndexMap, CanonicalSocketAddr};
pub use aquatic_common::ValidUntil; pub use aquatic_common::ValidUntil;
@ -30,10 +29,7 @@ pub struct ConnectionMeta {
/// sending back response through correct channel to correct worker. /// sending back response through correct channel to correct worker.
pub out_message_consumer_id: ConsumerId, pub out_message_consumer_id: ConsumerId,
pub connection_id: ConnectionId, pub connection_id: ConnectionId,
/// Peer address as received from socket, meaning it wasn't converted to pub peer_addr: CanonicalSocketAddr,
/// an IPv4 address if it was a IPv4-mapped IPv6 address
pub naive_peer_addr: SocketAddr,
pub converted_peer_ip: IpAddr,
pub pending_scrape_id: Option<PendingScrapeId>, pub pending_scrape_id: Option<PendingScrapeId>,
} }

View file

@ -1,14 +1,13 @@
use std::borrow::Cow; use std::borrow::Cow;
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::rc::Rc; use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::convert_ipv4_mapped_ipv6; use aquatic_common::CanonicalSocketAddr;
use aquatic_ws_protocol::*; use aquatic_ws_protocol::*;
use async_tungstenite::WebSocketStream; use async_tungstenite::WebSocketStream;
use futures::stream::{SplitSink, SplitStream}; use futures::stream::{SplitSink, SplitStream};
@ -221,6 +220,7 @@ async fn run_connection(
let peer_addr = stream let peer_addr = stream
.peer_addr() .peer_addr()
.map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))?; .map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))?;
let peer_addr = CanonicalSocketAddr::new(peer_addr);
let tls_acceptor: TlsAcceptor = tls_config.into(); let tls_acceptor: TlsAcceptor = tls_config.into();
let stream = tls_acceptor.accept(stream).await?; let stream = tls_acceptor.accept(stream).await?;
@ -293,7 +293,7 @@ struct ConnectionReader {
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>, pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
out_message_consumer_id: ConsumerId, out_message_consumer_id: ConsumerId,
ws_in: SplitStream<WebSocketStream<TlsStream<TcpStream>>>, ws_in: SplitStream<WebSocketStream<TlsStream<TcpStream>>>,
peer_addr: SocketAddr, peer_addr: CanonicalSocketAddr,
connection_id: ConnectionId, connection_id: ConnectionId,
} }
@ -432,8 +432,7 @@ impl ConnectionReader {
ConnectionMeta { ConnectionMeta {
connection_id: self.connection_id, connection_id: self.connection_id,
out_message_consumer_id: self.out_message_consumer_id, out_message_consumer_id: self.out_message_consumer_id,
naive_peer_addr: self.peer_addr, peer_addr: self.peer_addr,
converted_peer_ip: convert_ipv4_mapped_ipv6(self.peer_addr.ip()),
pending_scrape_id, pending_scrape_id,
} }
} }
@ -445,7 +444,7 @@ struct ConnectionWriter {
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>, connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
ws_out: SplitSink<WebSocketStream<TlsStream<TcpStream>>, tungstenite::Message>, ws_out: SplitSink<WebSocketStream<TlsStream<TcpStream>>, tungstenite::Message>,
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>, pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
peer_addr: SocketAddr, peer_addr: CanonicalSocketAddr,
connection_id: ConnectionId, connection_id: ConnectionId,
} }
@ -456,7 +455,7 @@ impl ConnectionWriter {
anyhow::anyhow!("ConnectionWriter couldn't receive message, sender is closed") anyhow::anyhow!("ConnectionWriter couldn't receive message, sender is closed")
})?; })?;
if meta.naive_peer_addr != self.peer_addr { if meta.peer_addr != self.peer_addr {
return Err(anyhow::anyhow!("peer addresses didn't match")); return Err(anyhow::anyhow!("peer addresses didn't match"));
} }
@ -530,7 +529,7 @@ impl ConnectionWriter {
Err(err) => { Err(err) => {
::log::info!( ::log::info!(
"send_out_message: send to {} took to long: {}", "send_out_message: send to {} took to long: {}",
self.peer_addr, self.peer_addr.get(),
err err
); );

View file

@ -153,7 +153,7 @@ impl Connection<NotRegistered> {
} }
pub fn close(self) { pub fn close(self) {
::log::debug!("will close connection to {}", self.meta.naive_peer_addr); ::log::debug!("will close connection to {}", self.meta.peer_addr.get());
match self.state { match self.state {
ConnectionState::TlsHandshaking(inner) => inner.close(), ConnectionState::TlsHandshaking(inner) => inner.close(),

View file

@ -4,13 +4,13 @@ use std::time::{Duration, Instant};
use anyhow::Context; use anyhow::Context;
use aquatic_common::access_list::AccessListQuery; use aquatic_common::access_list::AccessListQuery;
use aquatic_common::CanonicalSocketAddr;
use hashbrown::HashMap; use hashbrown::HashMap;
use mio::net::TcpListener; use mio::net::TcpListener;
use mio::{Events, Interest, Poll, Token}; use mio::{Events, Interest, Poll, Token};
use socket2::{Domain, Protocol, Socket, Type}; use socket2::{Domain, Protocol, Socket, Type};
use tungstenite::protocol::WebSocketConfig; use tungstenite::protocol::WebSocketConfig;
use aquatic_common::convert_ipv4_mapped_ipv6;
use aquatic_ws_protocol::*; use aquatic_ws_protocol::*;
use crate::common::*; use crate::common::*;
@ -263,20 +263,17 @@ fn accept_new_streams(
loop { loop {
match listener.accept() { match listener.accept() {
Ok((stream, _)) => { Ok((stream, _)) => {
let naive_peer_addr = if let Ok(peer_addr) = stream.peer_addr() { let peer_addr = if let Ok(peer_addr) = stream.peer_addr() {
peer_addr CanonicalSocketAddr::new(peer_addr)
} else { } else {
continue; continue;
}; };
connections.insert_and_register_new(poll, move |token| { connections.insert_and_register_new(poll, move |token| {
let converted_peer_ip = convert_ipv4_mapped_ipv6(naive_peer_addr.ip());
let meta = ConnectionMeta { let meta = ConnectionMeta {
out_message_consumer_id: ConsumerId(socket_worker_index), out_message_consumer_id: ConsumerId(socket_worker_index),
connection_id: ConnectionId(token.0), connection_id: ConnectionId(token.0),
naive_peer_addr, peer_addr,
converted_peer_ip,
pending_scrape_id: None, // FIXME pending_scrape_id: None, // FIXME
}; };
@ -348,11 +345,11 @@ where
let mut remove_connection = false; let mut remove_connection = false;
if let Some(connection) = connections.get_mut(&token) { if let Some(connection) = connections.get_mut(&token) {
if connection.get_meta().naive_peer_addr != meta.naive_peer_addr { if connection.get_meta().peer_addr != meta.peer_addr {
::log::warn!( ::log::warn!(
"socket worker error: connection socket addr {} didn't match channel {}. Token: {}.", "socket worker error: connection socket addr {} didn't match channel {}. Token: {}.",
connection.get_meta().naive_peer_addr, connection.get_meta().peer_addr.get(),
meta.naive_peer_addr, meta.peer_addr.get(),
token.0 token.0
); );