http: use separate sockets for IPv4 and IPv6 (#221)

This commit is contained in:
Joakim Frostegård 2025-01-16 22:53:56 +01:00 committed by GitHub
parent 048c297fc7
commit 94e3af2463
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 235 additions and 121 deletions

View file

@ -47,8 +47,8 @@ Generate the configuration file:
./target/release/aquatic_http -p > "aquatic-http-config.toml"
```
Make necessary adjustments to the file. You will likely want to adjust `address`
(listening address) under the `network` section.
Make necessary adjustments to the file. You will likely want to adjust
listening addresses under the `network` section.
To run over TLS, configure certificate and private key files.

View file

@ -12,6 +12,7 @@ use aquatic_http_protocol::{
use glommio::channels::shared_channel::SharedSender;
use slotmap::new_key_type;
#[allow(dead_code)]
#[derive(Copy, Clone, Debug)]
pub struct ConsumerId(pub usize);

View file

@ -1,4 +1,7 @@
use std::{net::SocketAddr, path::PathBuf};
use std::{
net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
path::PathBuf,
};
use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig};
use aquatic_toml_config::TomlConfig;
@ -70,25 +73,24 @@ impl aquatic_common::cli::Config for Config {
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct NetworkConfig {
/// Bind to this address
/// Use IPv4
pub use_ipv4: bool,
/// Use IPv6
pub use_ipv6: bool,
/// IPv4 address and port
///
/// When providing an IPv4 style address, only IPv4 traffic will be
/// handled. Examples:
/// - "0.0.0.0:3000" binds to port 3000 on all network interfaces
/// - "127.0.0.1:3000" binds to port 3000 on the loopback interface
/// (localhost)
/// Examples:
/// - Use 0.0.0.0:3000 to bind to all interfaces on port 3000
/// - Use 127.0.0.1:3000 to bind to the loopback interface (localhost) on
/// port 3000
pub address_ipv4: SocketAddrV4,
/// IPv6 address and port
///
/// When it comes to IPv6-style addresses, behaviour is more complex and
/// differs between operating systems. On Linux, to accept both IPv4 and
/// IPv6 traffic on any interface, use "[::]:3000". Set the "only_ipv6"
/// flag below to limit traffic to IPv6. To bind to the loopback interface
/// and only accept IPv6 packets, use "[::1]:3000" and set the only_ipv6
/// flag. Receiving both IPv4 and IPv6 traffic on loopback is currently
/// not supported. For other operating systems, please refer to their
/// respective documentation.
pub address: SocketAddr,
/// Only allow access over IPv6
pub only_ipv6: bool,
/// Examples:
/// - Use [::]:3000 to bind to all interfaces on port 3000
/// - Use [::1]:3000 to bind to the loopback interface (localhost) on
/// port 3000
pub address_ipv6: SocketAddrV6,
/// Maximum number of pending TCP connections
pub tcp_backlog: i32,
/// Enable TLS
@ -125,21 +127,30 @@ pub struct NetworkConfig {
/// header. Works with typical multi-IP setups (e.g., "X-Forwarded-For")
/// as well as for single-IP setups (e.g., nginx "X-Real-IP")
pub reverse_proxy_ip_header_format: ReverseProxyPeerIpHeaderFormat,
/// Set flag on IPv6 socket to only accept IPv6 traffic.
///
/// This should typically be set to true unless your OS does not support
/// double-stack sockets (that is, sockets that receive both IPv4 and IPv6
/// packets).
pub set_only_ipv6: bool,
}
impl Default for NetworkConfig {
fn default() -> Self {
Self {
address: SocketAddr::from(([0, 0, 0, 0], 3000)),
use_ipv4: true,
use_ipv6: true,
address_ipv4: SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 3000),
address_ipv6: SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 3000, 0, 0),
enable_tls: false,
tls_certificate_path: "".into(),
tls_private_key_path: "".into(),
only_ipv6: false,
tcp_backlog: 1024,
keep_alive: true,
runs_behind_reverse_proxy: false,
reverse_proxy_ip_header_name: "X-Forwarded-For".into(),
reverse_proxy_ip_header_format: Default::default(),
set_only_ipv6: true,
}
}
}

View file

@ -27,6 +27,12 @@ const SHARED_CHANNEL_SIZE: usize = 1024;
pub fn run(config: Config) -> ::anyhow::Result<()> {
let mut signals = Signals::new([SIGUSR1])?;
if !(config.network.use_ipv4 || config.network.use_ipv6) {
return Result::Err(anyhow::anyhow!(
"Both use_ipv4 and use_ipv6 can not be set to false"
));
}
let state = State::default();
update_access_list(&config.access_list, &state.access_list)?;
@ -35,7 +41,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
config.socket_workers + config.swarm_workers,
SHARED_CHANNEL_SIZE,
);
let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
let num_sockets_per_worker =
if config.network.use_ipv4 { 1 } else { 0 } + if config.network.use_ipv6 { 1 } else { 0 };
let priv_dropper = PrivilegeDropper::new(
config.privileges.clone(),
config.socket_workers * num_sockets_per_worker,
);
let opt_tls_config = if config.network.enable_tls {
Some(Arc::new(ArcSwap::from_pointee(create_rustls_config(
@ -55,7 +68,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let state = state.clone();
let opt_tls_config = opt_tls_config.clone();
let request_mesh_builder = request_mesh_builder.clone();
let priv_dropper = priv_dropper.clone();
let mut priv_droppers = Vec::new();
for _ in 0..num_sockets_per_worker {
priv_droppers.push(priv_dropper.clone());
}
let handle = Builder::new()
.name(format!("socket-{:02}", i + 1))
@ -68,7 +86,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
state,
opt_tls_config,
request_mesh_builder,
priv_dropper,
priv_droppers,
server_start_instant,
i,
))

View file

@ -2,21 +2,23 @@ mod connection;
mod request;
use std::cell::RefCell;
use std::net::SocketAddr;
use std::os::unix::prelude::{FromRawFd, IntoRawFd};
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use aquatic_common::access_list::AccessList;
use aquatic_common::privileges::PrivilegeDropper;
use aquatic_common::rustls_config::RustlsConfig;
use aquatic_common::{CanonicalSocketAddr, ServerStartInstant};
use arc_swap::ArcSwap;
use arc_swap::{ArcSwap, ArcSwapAny};
use futures_lite::future::race;
use futures_lite::StreamExt;
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role};
use glommio::channels::local_channel::{new_bounded, LocalSender};
use glommio::net::TcpListener;
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender};
use glommio::net::{TcpListener, TcpStream};
use glommio::timer::TimerActionRepeat;
use glommio::{enclose, prelude::*};
use slotmap::HopSlotMap;
@ -36,14 +38,43 @@ pub async fn run_socket_worker(
state: State,
opt_tls_config: Option<Arc<ArcSwap<RustlsConfig>>>,
request_mesh_builder: MeshBuilder<ChannelRequest, Partial>,
priv_dropper: PrivilegeDropper,
mut priv_droppers: Vec<PrivilegeDropper>,
server_start_instant: ServerStartInstant,
worker_index: usize,
) -> anyhow::Result<()> {
let config = Rc::new(config);
let access_list = state.access_list;
let listener = create_tcp_listener(&config, priv_dropper).context("create tcp listener")?;
let tcp_listeners = {
let opt_listener_ipv4 = if config.network.use_ipv4 {
let priv_dropper = priv_droppers
.pop()
.ok_or(anyhow::anyhow!("no enough priv droppers"))?;
let socket =
create_tcp_listener(&config, priv_dropper, config.network.address_ipv4.into())
.context("create tcp listener")?;
Some(socket)
} else {
None
};
let opt_listener_ipv6 = if config.network.use_ipv6 {
let priv_dropper = priv_droppers
.pop()
.ok_or(anyhow::anyhow!("no enough priv droppers"))?;
let socket =
create_tcp_listener(&config, priv_dropper, config.network.address_ipv6.into())
.context("create tcp listener")?;
Some(socket)
} else {
None
};
[opt_listener_ipv4, opt_listener_ipv6]
.into_iter()
.flatten()
.collect::<Vec<_>>()
};
let (request_senders, _) = request_mesh_builder
.join(Role::Producer)
@ -61,94 +92,138 @@ pub async fn run_socket_worker(
)
}));
let mut incoming = listener.incoming();
let tasks = tcp_listeners
.into_iter()
.map(|tcp_listener| {
let listener_state = ListenerState {
config: config.clone(),
access_list: state.access_list.clone(),
opt_tls_config: opt_tls_config.clone(),
server_start_instant,
connection_handles: connection_handles.clone(),
request_senders: request_senders.clone(),
worker_index,
};
while let Some(stream) = incoming.next().await {
match stream {
Ok(stream) => {
let (close_conn_sender, close_conn_receiver) = new_bounded(1);
spawn_local(listener_state.accept_connections(tcp_listener))
})
.collect::<Vec<_>>();
let valid_until = Rc::new(RefCell::new(ValidUntil::new(
server_start_instant,
config.cleaning.max_connection_idle,
)));
for task in tasks {
task.await;
}
let connection_id = connection_handles.borrow_mut().insert(ConnectionHandle {
close_conn_sender,
valid_until: valid_until.clone(),
});
Ok(())
}
spawn_local(enclose!(
(
config,
access_list,
request_senders,
opt_tls_config,
connection_handles,
valid_until
)
async move {
#[cfg(feature = "metrics")]
let active_connections_gauge = ::metrics::gauge!(
"aquatic_active_connections",
"worker_index" => worker_index.to_string(),
);
#[derive(Clone)]
struct ListenerState {
config: Rc<Config>,
access_list: Arc<ArcSwapAny<Arc<AccessList>>>,
opt_tls_config: Option<Arc<ArcSwap<RustlsConfig>>>,
server_start_instant: ServerStartInstant,
connection_handles: Rc<RefCell<HopSlotMap<ConnectionId, ConnectionHandle>>>,
request_senders: Rc<Senders<ChannelRequest>>,
worker_index: usize,
}
#[cfg(feature = "metrics")]
active_connections_gauge.increment(1.0);
impl ListenerState {
async fn accept_connections(self, listener: TcpListener) {
let mut incoming = listener.incoming();
let f1 = async { run_connection(
config,
access_list,
request_senders,
server_start_instant,
opt_tls_config,
valid_until.clone(),
stream,
worker_index,
).await
};
let f2 = async {
close_conn_receiver.recv().await;
while let Some(stream) = incoming.next().await {
match stream {
Ok(stream) => {
let (close_conn_sender, close_conn_receiver) = new_bounded(1);
Err(ConnectionError::Inactive)
};
let valid_until = Rc::new(RefCell::new(ValidUntil::new(
self.server_start_instant,
self.config.cleaning.max_connection_idle,
)));
let result = race(f1, f2).await;
let connection_id =
self.connection_handles
.borrow_mut()
.insert(ConnectionHandle {
close_conn_sender,
valid_until: valid_until.clone(),
});
#[cfg(feature = "metrics")]
active_connections_gauge.decrement(1.0);
match result {
Ok(()) => (),
Err(err@(
ConnectionError::ResponseBufferWrite(_) |
ConnectionError::ResponseBufferFull |
ConnectionError::ScrapeChannelError(_) |
ConnectionError::ResponseSenderClosed
)) => {
::log::error!("connection closed: {:#}", err);
}
Err(err@ConnectionError::RequestBufferFull) => {
::log::info!("connection closed: {:#}", err);
}
Err(err) => {
::log::debug!("connection closed: {:#}", err);
}
}
connection_handles.borrow_mut().remove(connection_id);
}
))
.detach();
}
Err(err) => {
::log::error!("accept connection: {:?}", err);
spawn_local(self.clone().handle_connection(
close_conn_receiver,
valid_until,
connection_id,
stream,
))
.detach();
}
Err(err) => {
::log::error!("accept connection: {:?}", err);
}
}
}
}
Ok(())
async fn handle_connection(
self,
close_conn_receiver: LocalReceiver<()>,
valid_until: Rc<RefCell<ValidUntil>>,
connection_id: ConnectionId,
stream: TcpStream,
) {
#[cfg(feature = "metrics")]
let active_connections_gauge = ::metrics::gauge!(
"aquatic_active_connections",
"worker_index" => self.worker_index.to_string(),
);
#[cfg(feature = "metrics")]
active_connections_gauge.increment(1.0);
let f1 = async {
run_connection(
self.config,
self.access_list,
self.request_senders,
self.server_start_instant,
self.opt_tls_config,
valid_until.clone(),
stream,
self.worker_index,
)
.await
};
let f2 = async {
close_conn_receiver.recv().await;
Err(ConnectionError::Inactive)
};
let result = race(f1, f2).await;
#[cfg(feature = "metrics")]
active_connections_gauge.decrement(1.0);
match result {
Ok(()) => (),
Err(
err @ (ConnectionError::ResponseBufferWrite(_)
| ConnectionError::ResponseBufferFull
| ConnectionError::ScrapeChannelError(_)
| ConnectionError::ResponseSenderClosed),
) => {
::log::error!("connection closed: {:#}", err);
}
Err(err @ ConnectionError::RequestBufferFull) => {
::log::info!("connection closed: {:#}", err);
}
Err(err) => {
::log::debug!("connection closed: {:#}", err);
}
}
self.connection_handles.borrow_mut().remove(connection_id);
}
}
async fn clean_connections(
@ -176,32 +251,41 @@ async fn clean_connections(
fn create_tcp_listener(
config: &Config,
priv_dropper: PrivilegeDropper,
address: SocketAddr,
) -> anyhow::Result<TcpListener> {
let domain = if config.network.address.is_ipv4() {
socket2::Domain::IPV4
let socket = if address.is_ipv4() {
socket2::Socket::new(
socket2::Domain::IPV4,
socket2::Type::STREAM,
Some(socket2::Protocol::TCP),
)?
} else {
socket2::Domain::IPV6
};
let socket = socket2::Socket::new(
socket2::Domain::IPV6,
socket2::Type::STREAM,
Some(socket2::Protocol::TCP),
)?;
let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?;
if config.network.set_only_ipv6 {
socket
.set_only_v6(true)
.with_context(|| "socket: set only ipv6")?;
}
if config.network.only_ipv6 {
socket
.set_only_v6(true)
.with_context(|| "socket: set only ipv6")?;
}
};
socket
.set_reuse_port(true)
.with_context(|| "socket: set reuse port")?;
socket
.bind(&config.network.address.into())
.with_context(|| format!("socket: bind to {}", config.network.address))?;
.bind(&address.into())
.with_context(|| format!("socket: bind to {}", address))?;
socket
.listen(config.network.tcp_backlog)
.with_context(|| format!("socket: listen on {}", config.network.address))?;
.with_context(|| format!("socket: listen on {}", address))?;
priv_dropper.after_socket_creation()?;