mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
ws: avoid X-Forwarded-For parsing since we only need to know IPv4/IPv6
This commit is contained in:
parent
3b94b8e588
commit
a62b2033a5
6 changed files with 65 additions and 122 deletions
|
|
@ -17,7 +17,7 @@ of sub-implementations for different protocols:
|
||||||
|--------------|--------------------------------------------|------------------------------|
|
|--------------|--------------------------------------------|------------------------------|
|
||||||
| aquatic_udp | [BitTorrent over UDP] | Unix-like (using [mio]) |
|
| aquatic_udp | [BitTorrent over UDP] | Unix-like (using [mio]) |
|
||||||
| aquatic_http | [BitTorrent over HTTP] with TLS ([rustls]) | Linux 5.8+ (using [glommio]) |
|
| aquatic_http | [BitTorrent over HTTP] with TLS ([rustls]) | Linux 5.8+ (using [glommio]) |
|
||||||
| aquatic_ws | [WebTorrent] over TLS ([rustls]) | Linux 5.8+ (using [glommio]) |
|
| aquatic_ws | [WebTorrent] over TLS ([rustls], optional) | Linux 5.8+ (using [glommio]) |
|
||||||
|
|
||||||
Features at a glance:
|
Features at a glance:
|
||||||
|
|
||||||
|
|
|
||||||
6
TODO.md
6
TODO.md
|
|
@ -2,12 +2,6 @@
|
||||||
|
|
||||||
## High priority
|
## High priority
|
||||||
|
|
||||||
* ws
|
|
||||||
* reverse proxy support for non-TLS connections (parse x-forwarded-for)
|
|
||||||
* how does this interact with IPv4/IPv6 differentiation?
|
|
||||||
* is it possible to skip determining peer IP's altogether or is it necessary
|
|
||||||
for IPv4/IPv6 separation / preventing peer mixups or abuse?
|
|
||||||
|
|
||||||
## Medium priority
|
## Medium priority
|
||||||
|
|
||||||
* quit whole program if any thread panics
|
* quit whole program if any thread panics
|
||||||
|
|
|
||||||
|
|
@ -1,11 +1,28 @@
|
||||||
use std::sync::Arc;
|
use std::{net::IpAddr, sync::Arc};
|
||||||
|
|
||||||
use aquatic_common::access_list::AccessListArcSwap;
|
use aquatic_common::access_list::AccessListArcSwap;
|
||||||
use aquatic_common::CanonicalSocketAddr;
|
|
||||||
|
|
||||||
pub use aquatic_common::ValidUntil;
|
pub use aquatic_common::ValidUntil;
|
||||||
use aquatic_ws_protocol::{InfoHash, PeerId};
|
use aquatic_ws_protocol::{InfoHash, PeerId};
|
||||||
|
|
||||||
|
#[derive(Copy, Clone, Debug)]
|
||||||
|
pub enum IpVersion {
|
||||||
|
V4,
|
||||||
|
V6,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IpVersion {
|
||||||
|
pub fn canonical_from_ip(ip: IpAddr) -> IpVersion {
|
||||||
|
match ip {
|
||||||
|
IpAddr::V4(_) => Self::V4,
|
||||||
|
IpAddr::V6(addr) => match addr.octets() {
|
||||||
|
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, _, _, _, _] => Self::V4,
|
||||||
|
_ => Self::V6,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Default, Clone)]
|
#[derive(Default, Clone)]
|
||||||
pub struct State {
|
pub struct State {
|
||||||
pub access_list: Arc<AccessListArcSwap>,
|
pub access_list: Arc<AccessListArcSwap>,
|
||||||
|
|
@ -17,7 +34,7 @@ pub struct PendingScrapeId(pub usize);
|
||||||
#[derive(Copy, Clone, Debug)]
|
#[derive(Copy, Clone, Debug)]
|
||||||
pub struct ConsumerId(pub usize);
|
pub struct ConsumerId(pub usize);
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug)]
|
#[derive(Clone, Copy, Debug, PartialEq)]
|
||||||
pub struct ConnectionId(pub usize);
|
pub struct ConnectionId(pub usize);
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug)]
|
#[derive(Clone, Copy, Debug)]
|
||||||
|
|
@ -26,7 +43,7 @@ pub struct ConnectionMeta {
|
||||||
/// sending back response through correct channel to correct worker.
|
/// sending back response through correct channel to correct worker.
|
||||||
pub out_message_consumer_id: ConsumerId,
|
pub out_message_consumer_id: ConsumerId,
|
||||||
pub connection_id: ConnectionId,
|
pub connection_id: ConnectionId,
|
||||||
pub peer_addr: CanonicalSocketAddr,
|
pub ip_version: IpVersion,
|
||||||
pub pending_scrape_id: Option<PendingScrapeId>,
|
pub pending_scrape_id: Option<PendingScrapeId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -35,6 +52,6 @@ pub enum SwarmControlMessage {
|
||||||
ConnectionClosed {
|
ConnectionClosed {
|
||||||
info_hash: InfoHash,
|
info_hash: InfoHash,
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
peer_addr: CanonicalSocketAddr,
|
ip_version: IpVersion,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,9 @@ use aquatic_common::cli::LogLevel;
|
||||||
use aquatic_toml_config::TomlConfig;
|
use aquatic_toml_config::TomlConfig;
|
||||||
|
|
||||||
/// aquatic_ws configuration
|
/// aquatic_ws configuration
|
||||||
|
///
|
||||||
|
/// Running behind a reverse proxy is supported, but IPv4 peer requests have
|
||||||
|
/// to be proxied to IPv4 requests, and IPv6 requests to IPv6 requests.
|
||||||
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
|
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
|
||||||
#[serde(default, deny_unknown_fields)]
|
#[serde(default, deny_unknown_fields)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
|
|
@ -70,13 +73,8 @@ pub struct NetworkConfig {
|
||||||
pub websocket_max_message_size: usize,
|
pub websocket_max_message_size: usize,
|
||||||
pub websocket_max_frame_size: usize,
|
pub websocket_max_frame_size: usize,
|
||||||
|
|
||||||
/// Trust X-Forwarded-For headers to get peer IP. Only use this if you are
|
/// Return a HTTP 200 Ok response when receiving GET /health. Can not be
|
||||||
/// running aquatic_ws behind a reverse proxy that sets them and your
|
/// combined with enable_tls.
|
||||||
/// instance is not accessible by other means.
|
|
||||||
pub trust_x_forwarded_for: bool,
|
|
||||||
|
|
||||||
/// Return a HTTP 200 Ok response when receiving GET /health, but only
|
|
||||||
/// when not running over TLS
|
|
||||||
pub enable_http_health_checks: bool,
|
pub enable_http_health_checks: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -94,8 +92,6 @@ impl Default for NetworkConfig {
|
||||||
websocket_max_message_size: 64 * 1024,
|
websocket_max_message_size: 64 * 1024,
|
||||||
websocket_max_frame_size: 16 * 1024,
|
websocket_max_frame_size: 16 * 1024,
|
||||||
|
|
||||||
trust_x_forwarded_for: false,
|
|
||||||
|
|
||||||
enable_http_health_checks: false,
|
enable_http_health_checks: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,6 @@
|
||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use std::net::{IpAddr, SocketAddr};
|
|
||||||
use std::os::unix::prelude::{FromRawFd, IntoRawFd};
|
use std::os::unix::prelude::{FromRawFd, IntoRawFd};
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
@ -11,7 +10,7 @@ use anyhow::Context;
|
||||||
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
|
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
|
||||||
use aquatic_common::privileges::PrivilegeDropper;
|
use aquatic_common::privileges::PrivilegeDropper;
|
||||||
use aquatic_common::rustls_config::RustlsConfig;
|
use aquatic_common::rustls_config::RustlsConfig;
|
||||||
use aquatic_common::{CanonicalSocketAddr, PanicSentinel};
|
use aquatic_common::PanicSentinel;
|
||||||
use aquatic_ws_protocol::*;
|
use aquatic_ws_protocol::*;
|
||||||
use async_tungstenite::WebSocketStream;
|
use async_tungstenite::WebSocketStream;
|
||||||
use futures::stream::{SplitSink, SplitStream};
|
use futures::stream::{SplitSink, SplitStream};
|
||||||
|
|
@ -48,8 +47,7 @@ struct ConnectionReference {
|
||||||
valid_until: ValidUntil,
|
valid_until: ValidUntil,
|
||||||
peer_id: Option<PeerId>,
|
peer_id: Option<PeerId>,
|
||||||
announced_info_hashes: HashSet<InfoHash>,
|
announced_info_hashes: HashSet<InfoHash>,
|
||||||
/// May need to be parsed from X-Forwarded-For headers
|
ip_version: IpVersion,
|
||||||
peer_addr: Option<CanonicalSocketAddr>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run_socket_worker(
|
pub async fn run_socket_worker(
|
||||||
|
|
@ -116,6 +114,15 @@ pub async fn run_socket_worker(
|
||||||
while let Some(stream) = incoming.next().await {
|
while let Some(stream) = incoming.next().await {
|
||||||
match stream {
|
match stream {
|
||||||
Ok(stream) => {
|
Ok(stream) => {
|
||||||
|
let ip_version = match stream.peer_addr() {
|
||||||
|
Ok(addr) => IpVersion::canonical_from_ip(addr.ip()),
|
||||||
|
Err(err) => {
|
||||||
|
::log::info!("could not extract ip version (v4 or v6): {:#}", err);
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let (out_message_sender, out_message_receiver) = new_bounded(LOCAL_CHANNEL_SIZE);
|
let (out_message_sender, out_message_receiver) = new_bounded(LOCAL_CHANNEL_SIZE);
|
||||||
let out_message_sender = Rc::new(out_message_sender);
|
let out_message_sender = Rc::new(out_message_sender);
|
||||||
|
|
||||||
|
|
@ -125,7 +132,7 @@ pub async fn run_socket_worker(
|
||||||
valid_until: ValidUntil::new(config.cleaning.max_connection_idle),
|
valid_until: ValidUntil::new(config.cleaning.max_connection_idle),
|
||||||
peer_id: None,
|
peer_id: None,
|
||||||
announced_info_hashes: Default::default(),
|
announced_info_hashes: Default::default(),
|
||||||
peer_addr: None,
|
ip_version,
|
||||||
});
|
});
|
||||||
|
|
||||||
::log::info!("accepting stream, assigning id {}", key);
|
::log::info!("accepting stream, assigning id {}", key);
|
||||||
|
|
@ -143,6 +150,7 @@ pub async fn run_socket_worker(
|
||||||
out_message_consumer_id,
|
out_message_consumer_id,
|
||||||
ConnectionId(key),
|
ConnectionId(key),
|
||||||
opt_tls_config,
|
opt_tls_config,
|
||||||
|
ip_version,
|
||||||
stream,
|
stream,
|
||||||
).await {
|
).await {
|
||||||
::log::debug!("connection error: {:#}", err);
|
::log::debug!("connection error: {:#}", err);
|
||||||
|
|
@ -156,12 +164,12 @@ pub async fn run_socket_worker(
|
||||||
|
|
||||||
// Tell swarm workers to remove peer
|
// Tell swarm workers to remove peer
|
||||||
if let Some(reference) = opt_reference {
|
if let Some(reference) = opt_reference {
|
||||||
if let (Some(peer_id), Some(peer_addr)) = (reference.peer_id, reference.peer_addr) {
|
if let Some(peer_id) = reference.peer_id {
|
||||||
for info_hash in reference.announced_info_hashes {
|
for info_hash in reference.announced_info_hashes {
|
||||||
let message = SwarmControlMessage::ConnectionClosed {
|
let message = SwarmControlMessage::ConnectionClosed {
|
||||||
info_hash,
|
info_hash,
|
||||||
peer_id,
|
peer_id,
|
||||||
peer_addr,
|
ip_version: reference.ip_version,
|
||||||
};
|
};
|
||||||
|
|
||||||
let consumer_index =
|
let consumer_index =
|
||||||
|
|
@ -259,29 +267,9 @@ async fn run_connection(
|
||||||
out_message_consumer_id: ConsumerId,
|
out_message_consumer_id: ConsumerId,
|
||||||
connection_id: ConnectionId,
|
connection_id: ConnectionId,
|
||||||
opt_tls_config: Option<Arc<RustlsConfig>>,
|
opt_tls_config: Option<Arc<RustlsConfig>>,
|
||||||
|
ip_version: IpVersion,
|
||||||
mut stream: TcpStream,
|
mut stream: TcpStream,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let remote_addr = stream
|
|
||||||
.peer_addr()
|
|
||||||
.map_err(|err| anyhow::anyhow!("could not extract peer address: {:#}", err))?;
|
|
||||||
|
|
||||||
let peer_addr = if config.network.trust_x_forwarded_for {
|
|
||||||
let ip = parse_x_forwarded_for_ip(&stream).await?;
|
|
||||||
// Using the reverse proxy connection port here should be fine, since
|
|
||||||
// we only use the CanonicalPeerAddr to differentiate connections from
|
|
||||||
// each other as well as to determine if they run on IPv4 or IPv6,
|
|
||||||
// not for sending responses or passing on to peers.
|
|
||||||
let port = remote_addr.port();
|
|
||||||
|
|
||||||
CanonicalSocketAddr::new(SocketAddr::new(ip, port))
|
|
||||||
} else {
|
|
||||||
CanonicalSocketAddr::new(remote_addr)
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(connection_reference) = connection_slab.borrow_mut().get_mut(connection_id.0) {
|
|
||||||
connection_reference.peer_addr = Some(peer_addr);
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(tls_config) = opt_tls_config {
|
if let Some(tls_config) = opt_tls_config {
|
||||||
let tls_acceptor: TlsAcceptor = tls_config.into();
|
let tls_acceptor: TlsAcceptor = tls_config.into();
|
||||||
|
|
||||||
|
|
@ -299,10 +287,14 @@ async fn run_connection(
|
||||||
out_message_consumer_id,
|
out_message_consumer_id,
|
||||||
connection_id,
|
connection_id,
|
||||||
stream,
|
stream,
|
||||||
peer_addr,
|
ip_version,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
} else {
|
} else {
|
||||||
|
// Implementing this over TLS is too cumbersome, since the crate used
|
||||||
|
// for TLS streams doesn't support peak and tungstenite doesn't
|
||||||
|
// properly support sending a HTTP error response in accept_hdr
|
||||||
|
// callback.
|
||||||
if config.network.enable_http_health_checks {
|
if config.network.enable_http_health_checks {
|
||||||
let mut peek_buf = [0u8; 11];
|
let mut peek_buf = [0u8; 11];
|
||||||
|
|
||||||
|
|
@ -340,55 +332,12 @@ async fn run_connection(
|
||||||
out_message_consumer_id,
|
out_message_consumer_id,
|
||||||
connection_id,
|
connection_id,
|
||||||
stream,
|
stream,
|
||||||
peer_addr,
|
ip_version,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn parse_x_forwarded_for_ip(stream: &TcpStream) -> anyhow::Result<IpAddr> {
|
|
||||||
let mut peek_buf = [0u8; 1024];
|
|
||||||
|
|
||||||
let mut position = 0usize;
|
|
||||||
|
|
||||||
for _ in 0..16 {
|
|
||||||
let bytes_read = stream
|
|
||||||
.peek(&mut peek_buf[position..])
|
|
||||||
.await
|
|
||||||
.map_err(|err| anyhow::anyhow!("error peeking: {:#}", err))?;
|
|
||||||
|
|
||||||
position += bytes_read;
|
|
||||||
|
|
||||||
if bytes_read == 0 {
|
|
||||||
return Err(anyhow::anyhow!(
|
|
||||||
"zero bytes read while parsing x-forwarded-for"
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut headers = [httparse::EMPTY_HEADER; 32];
|
|
||||||
let mut req = httparse::Request::new(&mut headers);
|
|
||||||
|
|
||||||
if req.parse(&peek_buf)?.is_complete() {
|
|
||||||
for header in req.headers.iter() {
|
|
||||||
if header.name == "X-Forwarded-For" {
|
|
||||||
let ip: IpAddr = ::std::str::from_utf8(header.value)?.parse()?;
|
|
||||||
|
|
||||||
// ip.is_global() { // FIXME
|
|
||||||
if true {
|
|
||||||
return Ok(ip);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Err(anyhow::anyhow!(
|
|
||||||
"Could not determine source IP through X-Forwarded-For headers"
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run_stream_agnostic_connection<
|
async fn run_stream_agnostic_connection<
|
||||||
S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static,
|
S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static,
|
||||||
>(
|
>(
|
||||||
|
|
@ -403,7 +352,7 @@ async fn run_stream_agnostic_connection<
|
||||||
out_message_consumer_id: ConsumerId,
|
out_message_consumer_id: ConsumerId,
|
||||||
connection_id: ConnectionId,
|
connection_id: ConnectionId,
|
||||||
stream: S,
|
stream: S,
|
||||||
peer_addr: CanonicalSocketAddr,
|
ip_version: IpVersion,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let ws_config = tungstenite::protocol::WebSocketConfig {
|
let ws_config = tungstenite::protocol::WebSocketConfig {
|
||||||
max_frame_size: Some(config.network.websocket_max_frame_size),
|
max_frame_size: Some(config.network.websocket_max_frame_size),
|
||||||
|
|
@ -429,7 +378,7 @@ async fn run_stream_agnostic_connection<
|
||||||
pending_scrape_slab,
|
pending_scrape_slab,
|
||||||
out_message_consumer_id,
|
out_message_consumer_id,
|
||||||
ws_in,
|
ws_in,
|
||||||
peer_addr,
|
ip_version,
|
||||||
connection_id,
|
connection_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -450,7 +399,6 @@ async fn run_stream_agnostic_connection<
|
||||||
connection_slab,
|
connection_slab,
|
||||||
ws_out,
|
ws_out,
|
||||||
pending_scrape_slab,
|
pending_scrape_slab,
|
||||||
peer_addr,
|
|
||||||
connection_id,
|
connection_id,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -475,7 +423,7 @@ struct ConnectionReader<S> {
|
||||||
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
|
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
|
||||||
out_message_consumer_id: ConsumerId,
|
out_message_consumer_id: ConsumerId,
|
||||||
ws_in: SplitStream<WebSocketStream<S>>,
|
ws_in: SplitStream<WebSocketStream<S>>,
|
||||||
peer_addr: CanonicalSocketAddr,
|
ip_version: IpVersion,
|
||||||
connection_id: ConnectionId,
|
connection_id: ConnectionId,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -658,7 +606,7 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
|
||||||
ConnectionMeta {
|
ConnectionMeta {
|
||||||
connection_id: self.connection_id,
|
connection_id: self.connection_id,
|
||||||
out_message_consumer_id: self.out_message_consumer_id,
|
out_message_consumer_id: self.out_message_consumer_id,
|
||||||
peer_addr: self.peer_addr,
|
ip_version: self.ip_version,
|
||||||
pending_scrape_id,
|
pending_scrape_id,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -670,7 +618,6 @@ struct ConnectionWriter<S> {
|
||||||
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
|
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
|
||||||
ws_out: SplitSink<WebSocketStream<S>, tungstenite::Message>,
|
ws_out: SplitSink<WebSocketStream<S>, tungstenite::Message>,
|
||||||
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
|
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
|
||||||
peer_addr: CanonicalSocketAddr,
|
|
||||||
connection_id: ConnectionId,
|
connection_id: ConnectionId,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -681,10 +628,6 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionWriter<S> {
|
||||||
anyhow::anyhow!("ConnectionWriter couldn't receive message, sender is closed")
|
anyhow::anyhow!("ConnectionWriter couldn't receive message, sender is closed")
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
if meta.peer_addr != self.peer_addr {
|
|
||||||
return Err(anyhow::anyhow!("peer addresses didn't match"));
|
|
||||||
}
|
|
||||||
|
|
||||||
match out_message {
|
match out_message {
|
||||||
OutMessage::ScrapeResponse(out_message) => {
|
OutMessage::ScrapeResponse(out_message) => {
|
||||||
let pending_scrape_id = meta
|
let pending_scrape_id = meta
|
||||||
|
|
@ -753,11 +696,7 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionWriter<S> {
|
||||||
}
|
}
|
||||||
Ok(Err(err)) => Err(err.into()),
|
Ok(Err(err)) => Err(err.into()),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
::log::info!(
|
::log::info!("send_out_message: sending to peer took to long: {}", err);
|
||||||
"send_out_message: send to {} took to long: {}",
|
|
||||||
self.peer_addr.get(),
|
|
||||||
err
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -208,14 +208,11 @@ where
|
||||||
SwarmControlMessage::ConnectionClosed {
|
SwarmControlMessage::ConnectionClosed {
|
||||||
info_hash,
|
info_hash,
|
||||||
peer_id,
|
peer_id,
|
||||||
peer_addr,
|
ip_version,
|
||||||
} => {
|
} => {
|
||||||
::log::debug!(
|
::log::debug!("Removing peer from torrents because connection was closed");
|
||||||
"Removing peer {} from torrents because connection was closed",
|
|
||||||
peer_addr.get()
|
|
||||||
);
|
|
||||||
|
|
||||||
if peer_addr.is_ipv4() {
|
if let IpVersion::V4 = ip_version {
|
||||||
if let Some(torrent_data) = torrents.borrow_mut().ipv4.get_mut(&info_hash) {
|
if let Some(torrent_data) = torrents.borrow_mut().ipv4.get_mut(&info_hash) {
|
||||||
torrent_data.remove_peer(peer_id);
|
torrent_data.remove_peer(peer_id);
|
||||||
}
|
}
|
||||||
|
|
@ -305,18 +302,18 @@ fn handle_announce_request(
|
||||||
request_sender_meta: ConnectionMeta,
|
request_sender_meta: ConnectionMeta,
|
||||||
request: AnnounceRequest,
|
request: AnnounceRequest,
|
||||||
) {
|
) {
|
||||||
let torrent_data: &mut TorrentData = if request_sender_meta.peer_addr.is_ipv4() {
|
let torrent_data: &mut TorrentData = if let IpVersion::V4 = request_sender_meta.ip_version {
|
||||||
torrent_maps.ipv4.entry(request.info_hash).or_default()
|
torrent_maps.ipv4.entry(request.info_hash).or_default()
|
||||||
} else {
|
} else {
|
||||||
torrent_maps.ipv6.entry(request.info_hash).or_default()
|
torrent_maps.ipv6.entry(request.info_hash).or_default()
|
||||||
};
|
};
|
||||||
|
|
||||||
// If there is already a peer with this peer_id, check that socket
|
// If there is already a peer with this peer_id, check that connection id
|
||||||
// addr is same as that of request sender. Otherwise, ignore request.
|
// is same as that of request sender. Otherwise, ignore request. Since
|
||||||
// Since peers have access to each others peer_id's, they could send
|
// peers have access to each others peer_id's, they could send requests
|
||||||
// requests using them, causing all sorts of issues.
|
// using them, causing all sorts of issues.
|
||||||
if let Some(previous_peer) = torrent_data.peers.get(&request.peer_id) {
|
if let Some(previous_peer) = torrent_data.peers.get(&request.peer_id) {
|
||||||
if request_sender_meta.peer_addr != previous_peer.connection_meta.peer_addr {
|
if request_sender_meta.connection_id != previous_peer.connection_meta.connection_id {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -454,7 +451,7 @@ fn handle_scrape_request(
|
||||||
files: HashMap::with_capacity(num_to_take),
|
files: HashMap::with_capacity(num_to_take),
|
||||||
};
|
};
|
||||||
|
|
||||||
let torrent_map: &mut TorrentMap = if meta.peer_addr.is_ipv4() {
|
let torrent_map: &mut TorrentMap = if let IpVersion::V4 = meta.ip_version {
|
||||||
&mut torrent_maps.ipv4
|
&mut torrent_maps.ipv4
|
||||||
} else {
|
} else {
|
||||||
&mut torrent_maps.ipv6
|
&mut torrent_maps.ipv6
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue