mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
ws: make TLS optional, allow HTTP health checks without TLS only
This commit is contained in:
parent
018f32e9e9
commit
a16ce91d46
5 changed files with 116 additions and 43 deletions
|
|
@ -65,9 +65,9 @@ Generate configuration files. They come with comments and differ between protoco
|
||||||
Make adjustments to the files. You will likely want to adjust `address`
|
Make adjustments to the files. You will likely want to adjust `address`
|
||||||
(listening address) under the `network` section.
|
(listening address) under the `network` section.
|
||||||
|
|
||||||
Note that both `aquatic_http` and `aquatic_ws` require configuring TLS
|
Note that both `aquatic_http` and `aquatic_ws` require configuring certificate
|
||||||
certificate and private key files. More details are available in the
|
and private key files to run over TLS (which is optional for `aquatic_ws`).
|
||||||
respective configuration files.
|
More details are available in the respective configuration files.
|
||||||
|
|
||||||
#### Workers
|
#### Workers
|
||||||
|
|
||||||
|
|
|
||||||
2
TODO.md
2
TODO.md
|
|
@ -2,6 +2,8 @@
|
||||||
|
|
||||||
## High priority
|
## High priority
|
||||||
|
|
||||||
|
ws: does support for non-tls connections affect performance?
|
||||||
|
|
||||||
## Medium priority
|
## Medium priority
|
||||||
|
|
||||||
* quit whole program if any thread panics
|
* quit whole program if any thread panics
|
||||||
|
|
|
||||||
|
|
@ -60,6 +60,8 @@ pub struct NetworkConfig {
|
||||||
/// Maximum number of pending TCP connections
|
/// Maximum number of pending TCP connections
|
||||||
pub tcp_backlog: i32,
|
pub tcp_backlog: i32,
|
||||||
|
|
||||||
|
/// Enable TLS
|
||||||
|
pub enable_tls: bool,
|
||||||
/// Path to TLS certificate (DER-encoded X.509)
|
/// Path to TLS certificate (DER-encoded X.509)
|
||||||
pub tls_certificate_path: PathBuf,
|
pub tls_certificate_path: PathBuf,
|
||||||
/// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format)
|
/// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format)
|
||||||
|
|
@ -68,8 +70,8 @@ pub struct NetworkConfig {
|
||||||
pub websocket_max_message_size: usize,
|
pub websocket_max_message_size: usize,
|
||||||
pub websocket_max_frame_size: usize,
|
pub websocket_max_frame_size: usize,
|
||||||
|
|
||||||
/// Return a HTTP 200 Ok response when request is GET /health over plain
|
/// Return a HTTP 200 Ok response when receiving GET /health, but only
|
||||||
/// HTTP (no TLS!)
|
/// when not running over TLS
|
||||||
pub enable_http_health_check: bool,
|
pub enable_http_health_check: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -80,6 +82,7 @@ impl Default for NetworkConfig {
|
||||||
only_ipv6: false,
|
only_ipv6: false,
|
||||||
tcp_backlog: 1024,
|
tcp_backlog: 1024,
|
||||||
|
|
||||||
|
enable_tls: false,
|
||||||
tls_certificate_path: "".into(),
|
tls_certificate_path: "".into(),
|
||||||
tls_private_key_path: "".into(),
|
tls_private_key_path: "".into(),
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,12 @@ pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||||
pub const SHARED_IN_CHANNEL_SIZE: usize = 1024;
|
pub const SHARED_IN_CHANNEL_SIZE: usize = 1024;
|
||||||
|
|
||||||
pub fn run(config: Config) -> ::anyhow::Result<()> {
|
pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
|
if config.network.enable_tls && config.network.enable_http_health_check {
|
||||||
|
return Err(anyhow::anyhow!(
|
||||||
|
"configuration: network.enable_tls and network.enable_http_health_check can't both be set to true"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
let mut signals = Signals::new([SIGUSR1, SIGTERM])?;
|
let mut signals = Signals::new([SIGUSR1, SIGTERM])?;
|
||||||
|
|
||||||
let state = State::default();
|
let state = State::default();
|
||||||
|
|
@ -41,10 +47,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel();
|
let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel();
|
||||||
let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
|
let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
|
||||||
|
|
||||||
let tls_config = Arc::new(create_rustls_config(
|
let opt_tls_config = config
|
||||||
|
.network
|
||||||
|
.enable_tls
|
||||||
|
.then_some(Arc::new(create_rustls_config(
|
||||||
&config.network.tls_certificate_path,
|
&config.network.tls_certificate_path,
|
||||||
&config.network.tls_private_key_path,
|
&config.network.tls_private_key_path,
|
||||||
)?);
|
)?));
|
||||||
|
|
||||||
let mut executors = Vec::new();
|
let mut executors = Vec::new();
|
||||||
|
|
||||||
|
|
@ -52,7 +61,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
let sentinel = sentinel.clone();
|
let sentinel = sentinel.clone();
|
||||||
let config = config.clone();
|
let config = config.clone();
|
||||||
let state = state.clone();
|
let state = state.clone();
|
||||||
let tls_config = tls_config.clone();
|
let opt_tls_config = opt_tls_config.clone();
|
||||||
let control_mesh_builder = control_mesh_builder.clone();
|
let control_mesh_builder = control_mesh_builder.clone();
|
||||||
let request_mesh_builder = request_mesh_builder.clone();
|
let request_mesh_builder = request_mesh_builder.clone();
|
||||||
let response_mesh_builder = response_mesh_builder.clone();
|
let response_mesh_builder = response_mesh_builder.clone();
|
||||||
|
|
@ -72,7 +81,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
sentinel,
|
sentinel,
|
||||||
config,
|
config,
|
||||||
state,
|
state,
|
||||||
tls_config,
|
opt_tls_config,
|
||||||
control_mesh_builder,
|
control_mesh_builder,
|
||||||
request_mesh_builder,
|
request_mesh_builder,
|
||||||
response_mesh_builder,
|
response_mesh_builder,
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,6 @@ use async_tungstenite::WebSocketStream;
|
||||||
use futures::stream::{SplitSink, SplitStream};
|
use futures::stream::{SplitSink, SplitStream};
|
||||||
use futures::{AsyncWriteExt, StreamExt};
|
use futures::{AsyncWriteExt, StreamExt};
|
||||||
use futures_lite::future::race;
|
use futures_lite::future::race;
|
||||||
use futures_rustls::server::TlsStream;
|
|
||||||
use futures_rustls::TlsAcceptor;
|
use futures_rustls::TlsAcceptor;
|
||||||
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
|
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
|
||||||
use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender};
|
use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender};
|
||||||
|
|
@ -55,7 +54,7 @@ pub async fn run_socket_worker(
|
||||||
_sentinel: PanicSentinel,
|
_sentinel: PanicSentinel,
|
||||||
config: Config,
|
config: Config,
|
||||||
state: State,
|
state: State,
|
||||||
tls_config: Arc<RustlsConfig>,
|
opt_tls_config: Option<Arc<RustlsConfig>>,
|
||||||
control_message_mesh_builder: MeshBuilder<SwarmControlMessage, Partial>,
|
control_message_mesh_builder: MeshBuilder<SwarmControlMessage, Partial>,
|
||||||
in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>,
|
in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>,
|
||||||
out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>,
|
out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>,
|
||||||
|
|
@ -139,9 +138,13 @@ pub async fn run_socket_worker(
|
||||||
peer_addr,
|
peer_addr,
|
||||||
});
|
});
|
||||||
|
|
||||||
::log::info!("accepting stream: {}", key);
|
::log::info!(
|
||||||
|
"accepting stream from {}, assigning id {}",
|
||||||
|
peer_addr.get(),
|
||||||
|
key
|
||||||
|
);
|
||||||
|
|
||||||
let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, tls_config) async move {
|
let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, opt_tls_config) async move {
|
||||||
if let Err(err) = run_connection(
|
if let Err(err) = run_connection(
|
||||||
config.clone(),
|
config.clone(),
|
||||||
access_list,
|
access_list,
|
||||||
|
|
@ -153,13 +156,15 @@ pub async fn run_socket_worker(
|
||||||
out_message_receiver,
|
out_message_receiver,
|
||||||
out_message_consumer_id,
|
out_message_consumer_id,
|
||||||
ConnectionId(key),
|
ConnectionId(key),
|
||||||
tls_config,
|
opt_tls_config,
|
||||||
stream,
|
stream,
|
||||||
peer_addr,
|
peer_addr
|
||||||
).await {
|
).await {
|
||||||
::log::debug!("Connection::run() error: {:#}", err);
|
::log::debug!("connection error: {:#}", err);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Clean up after closed connection
|
||||||
|
|
||||||
// Remove reference in separate statement to avoid
|
// Remove reference in separate statement to avoid
|
||||||
// multiple RefCell borrows
|
// multiple RefCell borrows
|
||||||
let opt_reference = connection_slab.borrow_mut().try_remove(key);
|
let opt_reference = connection_slab.borrow_mut().try_remove(key);
|
||||||
|
|
@ -268,10 +273,31 @@ async fn run_connection(
|
||||||
out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>,
|
out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>,
|
||||||
out_message_consumer_id: ConsumerId,
|
out_message_consumer_id: ConsumerId,
|
||||||
connection_id: ConnectionId,
|
connection_id: ConnectionId,
|
||||||
tls_config: Arc<RustlsConfig>,
|
opt_tls_config: Option<Arc<RustlsConfig>>,
|
||||||
mut stream: TcpStream,
|
mut stream: TcpStream,
|
||||||
peer_addr: CanonicalSocketAddr,
|
peer_addr: CanonicalSocketAddr,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
|
if let Some(tls_config) = opt_tls_config {
|
||||||
|
let tls_acceptor: TlsAcceptor = tls_config.into();
|
||||||
|
|
||||||
|
let stream = tls_acceptor.accept(stream).await?;
|
||||||
|
|
||||||
|
run_stream_agnostic_connection(
|
||||||
|
config.clone(),
|
||||||
|
access_list,
|
||||||
|
in_message_senders,
|
||||||
|
tq_prioritized,
|
||||||
|
tq_regular,
|
||||||
|
connection_slab.clone(),
|
||||||
|
out_message_sender,
|
||||||
|
out_message_receiver,
|
||||||
|
out_message_consumer_id,
|
||||||
|
connection_id,
|
||||||
|
stream,
|
||||||
|
peer_addr,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
} else {
|
||||||
if config.network.enable_http_health_check {
|
if config.network.enable_http_health_check {
|
||||||
let mut peek_buf = [0u8; 11];
|
let mut peek_buf = [0u8; 11];
|
||||||
|
|
||||||
|
|
@ -284,7 +310,9 @@ async fn run_connection(
|
||||||
stream
|
stream
|
||||||
.write_all(b"HTTP/1.1 200 Ok\r\nContent-Length: 2\r\n\r\nOk")
|
.write_all(b"HTTP/1.1 200 Ok\r\nContent-Length: 2\r\n\r\nOk")
|
||||||
.await
|
.await
|
||||||
.map_err(|err| anyhow::anyhow!("error sending health check response: {:#}", err))?;
|
.map_err(|err| {
|
||||||
|
anyhow::anyhow!("error sending health check response: {:#}", err)
|
||||||
|
})?;
|
||||||
stream.flush().await.map_err(|err| {
|
stream.flush().await.map_err(|err| {
|
||||||
anyhow::anyhow!("error flushing health check response: {:#}", err)
|
anyhow::anyhow!("error flushing health check response: {:#}", err)
|
||||||
})?;
|
})?;
|
||||||
|
|
@ -295,9 +323,40 @@ async fn run_connection(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let tls_acceptor: TlsAcceptor = tls_config.into();
|
run_stream_agnostic_connection(
|
||||||
let stream = tls_acceptor.accept(stream).await?;
|
config.clone(),
|
||||||
|
access_list,
|
||||||
|
in_message_senders,
|
||||||
|
tq_prioritized,
|
||||||
|
tq_regular,
|
||||||
|
connection_slab.clone(),
|
||||||
|
out_message_sender,
|
||||||
|
out_message_receiver,
|
||||||
|
out_message_consumer_id,
|
||||||
|
connection_id,
|
||||||
|
stream,
|
||||||
|
peer_addr,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_stream_agnostic_connection<
|
||||||
|
S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static,
|
||||||
|
>(
|
||||||
|
config: Rc<Config>,
|
||||||
|
access_list: Arc<AccessListArcSwap>,
|
||||||
|
in_message_senders: Rc<Senders<(ConnectionMeta, InMessage)>>,
|
||||||
|
tq_prioritized: TaskQueueHandle,
|
||||||
|
tq_regular: TaskQueueHandle,
|
||||||
|
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
|
||||||
|
out_message_sender: Rc<LocalSender<(ConnectionMeta, OutMessage)>>,
|
||||||
|
out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>,
|
||||||
|
out_message_consumer_id: ConsumerId,
|
||||||
|
connection_id: ConnectionId,
|
||||||
|
stream: S,
|
||||||
|
peer_addr: CanonicalSocketAddr,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
let ws_config = tungstenite::protocol::WebSocketConfig {
|
let ws_config = tungstenite::protocol::WebSocketConfig {
|
||||||
max_frame_size: Some(config.network.websocket_max_frame_size),
|
max_frame_size: Some(config.network.websocket_max_frame_size),
|
||||||
max_message_size: Some(config.network.websocket_max_message_size),
|
max_message_size: Some(config.network.websocket_max_message_size),
|
||||||
|
|
@ -359,7 +418,7 @@ async fn run_connection(
|
||||||
race(reader_handle, writer_handle).await.unwrap()
|
race(reader_handle, writer_handle).await.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ConnectionReader {
|
struct ConnectionReader<S> {
|
||||||
config: Rc<Config>,
|
config: Rc<Config>,
|
||||||
access_list_cache: AccessListCache,
|
access_list_cache: AccessListCache,
|
||||||
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
|
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
|
||||||
|
|
@ -367,12 +426,12 @@ struct ConnectionReader {
|
||||||
out_message_sender: Rc<LocalSender<(ConnectionMeta, OutMessage)>>,
|
out_message_sender: Rc<LocalSender<(ConnectionMeta, OutMessage)>>,
|
||||||
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
|
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
|
||||||
out_message_consumer_id: ConsumerId,
|
out_message_consumer_id: ConsumerId,
|
||||||
ws_in: SplitStream<WebSocketStream<TlsStream<TcpStream>>>,
|
ws_in: SplitStream<WebSocketStream<S>>,
|
||||||
peer_addr: CanonicalSocketAddr,
|
peer_addr: CanonicalSocketAddr,
|
||||||
connection_id: ConnectionId,
|
connection_id: ConnectionId,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ConnectionReader {
|
impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
|
||||||
async fn run_in_message_loop(&mut self) -> anyhow::Result<()> {
|
async fn run_in_message_loop(&mut self) -> anyhow::Result<()> {
|
||||||
loop {
|
loop {
|
||||||
::log::debug!("read_in_message");
|
::log::debug!("read_in_message");
|
||||||
|
|
@ -557,17 +616,17 @@ impl ConnectionReader {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ConnectionWriter {
|
struct ConnectionWriter<S> {
|
||||||
config: Rc<Config>,
|
config: Rc<Config>,
|
||||||
out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>,
|
out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>,
|
||||||
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
|
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
|
||||||
ws_out: SplitSink<WebSocketStream<TlsStream<TcpStream>>, tungstenite::Message>,
|
ws_out: SplitSink<WebSocketStream<S>, tungstenite::Message>,
|
||||||
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
|
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
|
||||||
peer_addr: CanonicalSocketAddr,
|
peer_addr: CanonicalSocketAddr,
|
||||||
connection_id: ConnectionId,
|
connection_id: ConnectionId,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ConnectionWriter {
|
impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionWriter<S> {
|
||||||
async fn run_out_message_loop(&mut self) -> anyhow::Result<()> {
|
async fn run_out_message_loop(&mut self) -> anyhow::Result<()> {
|
||||||
loop {
|
loop {
|
||||||
let (meta, out_message) = self.out_message_receiver.recv().await.ok_or_else(|| {
|
let (meta, out_message) = self.out_message_receiver.recv().await.ok_or_else(|| {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue