mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
WIP: ws: parse X-FORWARDED-FOR headers
This commit is contained in:
parent
9f9015d51c
commit
3b94b8e588
5 changed files with 85 additions and 24 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -295,6 +295,7 @@ dependencies = [
|
||||||
"futures-rustls",
|
"futures-rustls",
|
||||||
"glommio",
|
"glommio",
|
||||||
"hashbrown 0.12.3",
|
"hashbrown 0.12.3",
|
||||||
|
"httparse",
|
||||||
"log",
|
"log",
|
||||||
"mimalloc",
|
"mimalloc",
|
||||||
"privdrop",
|
"privdrop",
|
||||||
|
|
|
||||||
6
TODO.md
6
TODO.md
|
|
@ -2,7 +2,11 @@
|
||||||
|
|
||||||
## High priority
|
## High priority
|
||||||
|
|
||||||
ws: reverse proxy support for non-TLS connections (parse x-forwarded-for)
|
* ws
|
||||||
|
* reverse proxy support for non-TLS connections (parse x-forwarded-for)
|
||||||
|
* how does this interact with IPv4/IPv6 differentiation?
|
||||||
|
* is it possible to skip determining peer IP's altogether or is it necessary
|
||||||
|
for IPv4/IPv6 separation / preventing peer mixups or abuse?
|
||||||
|
|
||||||
## Medium priority
|
## Medium priority
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,7 @@ futures-lite = "1"
|
||||||
futures-rustls = "0.22"
|
futures-rustls = "0.22"
|
||||||
glommio = "0.7"
|
glommio = "0.7"
|
||||||
hashbrown = { version = "0.12", features = ["serde"] }
|
hashbrown = { version = "0.12", features = ["serde"] }
|
||||||
|
httparse = "1"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
mimalloc = { version = "0.1", default-features = false }
|
mimalloc = { version = "0.1", default-features = false }
|
||||||
privdrop = "0.5"
|
privdrop = "0.5"
|
||||||
|
|
|
||||||
|
|
@ -70,6 +70,11 @@ pub struct NetworkConfig {
|
||||||
pub websocket_max_message_size: usize,
|
pub websocket_max_message_size: usize,
|
||||||
pub websocket_max_frame_size: usize,
|
pub websocket_max_frame_size: usize,
|
||||||
|
|
||||||
|
/// Trust X-Forwarded-For headers to get peer IP. Only use this if you are
|
||||||
|
/// running aquatic_ws behind a reverse proxy that sets them and your
|
||||||
|
/// instance is not accessible by other means.
|
||||||
|
pub trust_x_forwarded_for: bool,
|
||||||
|
|
||||||
/// Return a HTTP 200 Ok response when receiving GET /health, but only
|
/// Return a HTTP 200 Ok response when receiving GET /health, but only
|
||||||
/// when not running over TLS
|
/// when not running over TLS
|
||||||
pub enable_http_health_checks: bool,
|
pub enable_http_health_checks: bool,
|
||||||
|
|
@ -89,6 +94,8 @@ impl Default for NetworkConfig {
|
||||||
websocket_max_message_size: 64 * 1024,
|
websocket_max_message_size: 64 * 1024,
|
||||||
websocket_max_frame_size: 16 * 1024,
|
websocket_max_frame_size: 16 * 1024,
|
||||||
|
|
||||||
|
trust_x_forwarded_for: false,
|
||||||
|
|
||||||
enable_http_health_checks: false,
|
enable_http_health_checks: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
|
use std::net::{IpAddr, SocketAddr};
|
||||||
use std::os::unix::prelude::{FromRawFd, IntoRawFd};
|
use std::os::unix::prelude::{FromRawFd, IntoRawFd};
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
@ -47,7 +48,8 @@ struct ConnectionReference {
|
||||||
valid_until: ValidUntil,
|
valid_until: ValidUntil,
|
||||||
peer_id: Option<PeerId>,
|
peer_id: Option<PeerId>,
|
||||||
announced_info_hashes: HashSet<InfoHash>,
|
announced_info_hashes: HashSet<InfoHash>,
|
||||||
peer_addr: CanonicalSocketAddr,
|
/// May need to be parsed from X-Forwarded-For headers
|
||||||
|
peer_addr: Option<CanonicalSocketAddr>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run_socket_worker(
|
pub async fn run_socket_worker(
|
||||||
|
|
@ -114,18 +116,6 @@ pub async fn run_socket_worker(
|
||||||
while let Some(stream) = incoming.next().await {
|
while let Some(stream) = incoming.next().await {
|
||||||
match stream {
|
match stream {
|
||||||
Ok(stream) => {
|
Ok(stream) => {
|
||||||
let peer_addr = match stream.peer_addr() {
|
|
||||||
Ok(peer_addr) => CanonicalSocketAddr::new(peer_addr),
|
|
||||||
Err(err) => {
|
|
||||||
::log::info!(
|
|
||||||
"could not extract peer address, closing connection: {:#}",
|
|
||||||
err
|
|
||||||
);
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let (out_message_sender, out_message_receiver) = new_bounded(LOCAL_CHANNEL_SIZE);
|
let (out_message_sender, out_message_receiver) = new_bounded(LOCAL_CHANNEL_SIZE);
|
||||||
let out_message_sender = Rc::new(out_message_sender);
|
let out_message_sender = Rc::new(out_message_sender);
|
||||||
|
|
||||||
|
|
@ -135,14 +125,10 @@ pub async fn run_socket_worker(
|
||||||
valid_until: ValidUntil::new(config.cleaning.max_connection_idle),
|
valid_until: ValidUntil::new(config.cleaning.max_connection_idle),
|
||||||
peer_id: None,
|
peer_id: None,
|
||||||
announced_info_hashes: Default::default(),
|
announced_info_hashes: Default::default(),
|
||||||
peer_addr,
|
peer_addr: None,
|
||||||
});
|
});
|
||||||
|
|
||||||
::log::info!(
|
::log::info!("accepting stream, assigning id {}", key);
|
||||||
"accepting stream from {}, assigning id {}",
|
|
||||||
peer_addr.get(),
|
|
||||||
key
|
|
||||||
);
|
|
||||||
|
|
||||||
let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, opt_tls_config) async move {
|
let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, opt_tls_config) async move {
|
||||||
if let Err(err) = run_connection(
|
if let Err(err) = run_connection(
|
||||||
|
|
@ -158,7 +144,6 @@ pub async fn run_socket_worker(
|
||||||
ConnectionId(key),
|
ConnectionId(key),
|
||||||
opt_tls_config,
|
opt_tls_config,
|
||||||
stream,
|
stream,
|
||||||
peer_addr
|
|
||||||
).await {
|
).await {
|
||||||
::log::debug!("connection error: {:#}", err);
|
::log::debug!("connection error: {:#}", err);
|
||||||
}
|
}
|
||||||
|
|
@ -171,12 +156,12 @@ pub async fn run_socket_worker(
|
||||||
|
|
||||||
// Tell swarm workers to remove peer
|
// Tell swarm workers to remove peer
|
||||||
if let Some(reference) = opt_reference {
|
if let Some(reference) = opt_reference {
|
||||||
if let Some(peer_id) = reference.peer_id {
|
if let (Some(peer_id), Some(peer_addr)) = (reference.peer_id, reference.peer_addr) {
|
||||||
for info_hash in reference.announced_info_hashes {
|
for info_hash in reference.announced_info_hashes {
|
||||||
let message = SwarmControlMessage::ConnectionClosed {
|
let message = SwarmControlMessage::ConnectionClosed {
|
||||||
info_hash,
|
info_hash,
|
||||||
peer_id,
|
peer_id,
|
||||||
peer_addr: reference.peer_addr,
|
peer_addr,
|
||||||
};
|
};
|
||||||
|
|
||||||
let consumer_index =
|
let consumer_index =
|
||||||
|
|
@ -275,8 +260,28 @@ async fn run_connection(
|
||||||
connection_id: ConnectionId,
|
connection_id: ConnectionId,
|
||||||
opt_tls_config: Option<Arc<RustlsConfig>>,
|
opt_tls_config: Option<Arc<RustlsConfig>>,
|
||||||
mut stream: TcpStream,
|
mut stream: TcpStream,
|
||||||
peer_addr: CanonicalSocketAddr,
|
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
|
let remote_addr = stream
|
||||||
|
.peer_addr()
|
||||||
|
.map_err(|err| anyhow::anyhow!("could not extract peer address: {:#}", err))?;
|
||||||
|
|
||||||
|
let peer_addr = if config.network.trust_x_forwarded_for {
|
||||||
|
let ip = parse_x_forwarded_for_ip(&stream).await?;
|
||||||
|
// Using the reverse proxy connection port here should be fine, since
|
||||||
|
// we only use the CanonicalPeerAddr to differentiate connections from
|
||||||
|
// each other as well as to determine if they run on IPv4 or IPv6,
|
||||||
|
// not for sending responses or passing on to peers.
|
||||||
|
let port = remote_addr.port();
|
||||||
|
|
||||||
|
CanonicalSocketAddr::new(SocketAddr::new(ip, port))
|
||||||
|
} else {
|
||||||
|
CanonicalSocketAddr::new(remote_addr)
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(connection_reference) = connection_slab.borrow_mut().get_mut(connection_id.0) {
|
||||||
|
connection_reference.peer_addr = Some(peer_addr);
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(tls_config) = opt_tls_config {
|
if let Some(tls_config) = opt_tls_config {
|
||||||
let tls_acceptor: TlsAcceptor = tls_config.into();
|
let tls_acceptor: TlsAcceptor = tls_config.into();
|
||||||
|
|
||||||
|
|
@ -341,6 +346,49 @@ async fn run_connection(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn parse_x_forwarded_for_ip(stream: &TcpStream) -> anyhow::Result<IpAddr> {
|
||||||
|
let mut peek_buf = [0u8; 1024];
|
||||||
|
|
||||||
|
let mut position = 0usize;
|
||||||
|
|
||||||
|
for _ in 0..16 {
|
||||||
|
let bytes_read = stream
|
||||||
|
.peek(&mut peek_buf[position..])
|
||||||
|
.await
|
||||||
|
.map_err(|err| anyhow::anyhow!("error peeking: {:#}", err))?;
|
||||||
|
|
||||||
|
position += bytes_read;
|
||||||
|
|
||||||
|
if bytes_read == 0 {
|
||||||
|
return Err(anyhow::anyhow!(
|
||||||
|
"zero bytes read while parsing x-forwarded-for"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut headers = [httparse::EMPTY_HEADER; 32];
|
||||||
|
let mut req = httparse::Request::new(&mut headers);
|
||||||
|
|
||||||
|
if req.parse(&peek_buf)?.is_complete() {
|
||||||
|
for header in req.headers.iter() {
|
||||||
|
if header.name == "X-Forwarded-For" {
|
||||||
|
let ip: IpAddr = ::std::str::from_utf8(header.value)?.parse()?;
|
||||||
|
|
||||||
|
// ip.is_global() { // FIXME
|
||||||
|
if true {
|
||||||
|
return Ok(ip);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(anyhow::anyhow!(
|
||||||
|
"Could not determine source IP through X-Forwarded-For headers"
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
async fn run_stream_agnostic_connection<
|
async fn run_stream_agnostic_connection<
|
||||||
S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static,
|
S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static,
|
||||||
>(
|
>(
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue