diff --git a/README.md b/README.md index 20dc68f..e35c7b1 100644 --- a/README.md +++ b/README.md @@ -65,9 +65,9 @@ Generate configuration files. They come with comments and differ between protoco Make adjustments to the files. You will likely want to adjust `address` (listening address) under the `network` section. -Note that both `aquatic_http` and `aquatic_ws` require configuring TLS -certificate and private key files. More details are available in the -respective configuration files. +Note that both `aquatic_http` and `aquatic_ws` require configuring certificate +and private key files to run over TLS (which is optional for `aquatic_ws`). +More details are available in the respective configuration files. #### Workers diff --git a/TODO.md b/TODO.md index 5396993..2a3d1d4 100644 --- a/TODO.md +++ b/TODO.md @@ -2,6 +2,8 @@ ## High priority +ws: does support for non-tls connections affect performance? + ## Medium priority * quit whole program if any thread panics diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index 9a061c7..7e41bd0 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -60,6 +60,8 @@ pub struct NetworkConfig { /// Maximum number of pending TCP connections pub tcp_backlog: i32, + /// Enable TLS + pub enable_tls: bool, /// Path to TLS certificate (DER-encoded X.509) pub tls_certificate_path: PathBuf, /// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format) @@ -68,8 +70,8 @@ pub struct NetworkConfig { pub websocket_max_message_size: usize, pub websocket_max_frame_size: usize, - /// Return a HTTP 200 Ok response when request is GET /health over plain - /// HTTP (no TLS!) + /// Return a HTTP 200 Ok response when receiving GET /health, but only + /// when not running over TLS pub enable_http_health_check: bool, } @@ -80,6 +82,7 @@ impl Default for NetworkConfig { only_ipv6: false, tcp_backlog: 1024, + enable_tls: false, tls_certificate_path: "".into(), tls_private_key_path: "".into(), diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index 66cc57b..4fe33b3 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -26,6 +26,12 @@ pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); pub const SHARED_IN_CHANNEL_SIZE: usize = 1024; pub fn run(config: Config) -> ::anyhow::Result<()> { + if config.network.enable_tls && config.network.enable_http_health_check { + return Err(anyhow::anyhow!( + "configuration: network.enable_tls and network.enable_http_health_check can't both be set to true" + )); + } + let mut signals = Signals::new([SIGUSR1, SIGTERM])?; let state = State::default(); @@ -41,10 +47,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); - let tls_config = Arc::new(create_rustls_config( - &config.network.tls_certificate_path, - &config.network.tls_private_key_path, - )?); + let opt_tls_config = config + .network + .enable_tls + .then_some(Arc::new(create_rustls_config( + &config.network.tls_certificate_path, + &config.network.tls_private_key_path, + )?)); let mut executors = Vec::new(); @@ -52,7 +61,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); - let tls_config = tls_config.clone(); + let opt_tls_config = opt_tls_config.clone(); let control_mesh_builder = control_mesh_builder.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); @@ -72,7 +81,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { sentinel, config, state, - tls_config, + opt_tls_config, control_mesh_builder, request_mesh_builder, response_mesh_builder, diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index f90cada..49ab4fc 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -16,7 +16,6 @@ use async_tungstenite::WebSocketStream; use futures::stream::{SplitSink, SplitStream}; use futures::{AsyncWriteExt, StreamExt}; use futures_lite::future::race; -use futures_rustls::server::TlsStream; use futures_rustls::TlsAcceptor; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; @@ -55,7 +54,7 @@ pub async fn run_socket_worker( _sentinel: PanicSentinel, config: Config, state: State, - tls_config: Arc, + opt_tls_config: Option>, control_message_mesh_builder: MeshBuilder, in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>, out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>, @@ -139,9 +138,13 @@ pub async fn run_socket_worker( peer_addr, }); - ::log::info!("accepting stream: {}", key); + ::log::info!( + "accepting stream from {}, assigning id {}", + peer_addr.get(), + key + ); - let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, tls_config) async move { + let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, opt_tls_config) async move { if let Err(err) = run_connection( config.clone(), access_list, @@ -153,13 +156,15 @@ pub async fn run_socket_worker( out_message_receiver, out_message_consumer_id, ConnectionId(key), - tls_config, + opt_tls_config, stream, - peer_addr, + peer_addr ).await { - ::log::debug!("Connection::run() error: {:#}", err); + ::log::debug!("connection error: {:#}", err); } + // Clean up after closed connection + // Remove reference in separate statement to avoid // multiple RefCell borrows let opt_reference = connection_slab.borrow_mut().try_remove(key); @@ -268,36 +273,90 @@ async fn run_connection( out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>, out_message_consumer_id: ConsumerId, connection_id: ConnectionId, - tls_config: Arc, + opt_tls_config: Option>, mut stream: TcpStream, peer_addr: CanonicalSocketAddr, ) -> anyhow::Result<()> { - if config.network.enable_http_health_check { - let mut peek_buf = [0u8; 11]; + if let Some(tls_config) = opt_tls_config { + let tls_acceptor: TlsAcceptor = tls_config.into(); - stream - .peek(&mut peek_buf) - .await - .map_err(|err| anyhow::anyhow!("error peeking: {:#}", err))?; + let stream = tls_acceptor.accept(stream).await?; + + run_stream_agnostic_connection( + config.clone(), + access_list, + in_message_senders, + tq_prioritized, + tq_regular, + connection_slab.clone(), + out_message_sender, + out_message_receiver, + out_message_consumer_id, + connection_id, + stream, + peer_addr, + ) + .await + } else { + if config.network.enable_http_health_check { + let mut peek_buf = [0u8; 11]; - if &peek_buf == b"GET /health" { stream - .write_all(b"HTTP/1.1 200 Ok\r\nContent-Length: 2\r\n\r\nOk") + .peek(&mut peek_buf) .await - .map_err(|err| anyhow::anyhow!("error sending health check response: {:#}", err))?; - stream.flush().await.map_err(|err| { - anyhow::anyhow!("error flushing health check response: {:#}", err) - })?; + .map_err(|err| anyhow::anyhow!("error peeking: {:#}", err))?; - return Err(anyhow::anyhow!( - "client requested health check, skipping websocket negotiation" - )); + if &peek_buf == b"GET /health" { + stream + .write_all(b"HTTP/1.1 200 Ok\r\nContent-Length: 2\r\n\r\nOk") + .await + .map_err(|err| { + anyhow::anyhow!("error sending health check response: {:#}", err) + })?; + stream.flush().await.map_err(|err| { + anyhow::anyhow!("error flushing health check response: {:#}", err) + })?; + + return Err(anyhow::anyhow!( + "client requested health check, skipping websocket negotiation" + )); + } } + + run_stream_agnostic_connection( + config.clone(), + access_list, + in_message_senders, + tq_prioritized, + tq_regular, + connection_slab.clone(), + out_message_sender, + out_message_receiver, + out_message_consumer_id, + connection_id, + stream, + peer_addr, + ) + .await } +} - let tls_acceptor: TlsAcceptor = tls_config.into(); - let stream = tls_acceptor.accept(stream).await?; - +async fn run_stream_agnostic_connection< + S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static, +>( + config: Rc, + access_list: Arc, + in_message_senders: Rc>, + tq_prioritized: TaskQueueHandle, + tq_regular: TaskQueueHandle, + connection_slab: Rc>>, + out_message_sender: Rc>, + out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>, + out_message_consumer_id: ConsumerId, + connection_id: ConnectionId, + stream: S, + peer_addr: CanonicalSocketAddr, +) -> anyhow::Result<()> { let ws_config = tungstenite::protocol::WebSocketConfig { max_frame_size: Some(config.network.websocket_max_frame_size), max_message_size: Some(config.network.websocket_max_message_size), @@ -359,7 +418,7 @@ async fn run_connection( race(reader_handle, writer_handle).await.unwrap() } -struct ConnectionReader { +struct ConnectionReader { config: Rc, access_list_cache: AccessListCache, connection_slab: Rc>>, @@ -367,12 +426,12 @@ struct ConnectionReader { out_message_sender: Rc>, pending_scrape_slab: Rc>>, out_message_consumer_id: ConsumerId, - ws_in: SplitStream>>, + ws_in: SplitStream>, peer_addr: CanonicalSocketAddr, connection_id: ConnectionId, } -impl ConnectionReader { +impl ConnectionReader { async fn run_in_message_loop(&mut self) -> anyhow::Result<()> { loop { ::log::debug!("read_in_message"); @@ -557,17 +616,17 @@ impl ConnectionReader { } } -struct ConnectionWriter { +struct ConnectionWriter { config: Rc, out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>, connection_slab: Rc>>, - ws_out: SplitSink>, tungstenite::Message>, + ws_out: SplitSink, tungstenite::Message>, pending_scrape_slab: Rc>>, peer_addr: CanonicalSocketAddr, connection_id: ConnectionId, } -impl ConnectionWriter { +impl ConnectionWriter { async fn run_out_message_loop(&mut self) -> anyhow::Result<()> { loop { let (meta, out_message) = self.out_message_receiver.recv().await.ok_or_else(|| {