diff --git a/CHANGELOG.md b/CHANGELOG.md index 40f9b78..b38bbdb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,11 +14,18 @@ * Add support for reporting peer client information +### aquatic_http + +#### Added + +* Reload TLS certificate (and key) on SIGUSR1 + ### aquatic_ws #### Added * Add support for reporting peer client information +* Reload TLS certificate (and key) on SIGUSR1 #### Changed diff --git a/Cargo.lock b/Cargo.lock index 3240b92..7a09a85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,6 +118,7 @@ dependencies = [ "aquatic_common", "aquatic_http_protocol", "aquatic_toml_config", + "arc-swap", "cfg-if", "either", "futures", @@ -311,6 +312,7 @@ dependencies = [ "aquatic_peer_id", "aquatic_toml_config", "aquatic_ws_protocol", + "arc-swap", "async-tungstenite", "cfg-if", "futures", diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index 616f1e6..8f52edd 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -28,6 +28,7 @@ aquatic_http_protocol.workspace = true aquatic_toml_config.workspace = true anyhow = "1" +arc-swap = "1" cfg-if = "1" either = "1" futures = "0.3" diff --git a/crates/http/src/config.rs b/crates/http/src/config.rs index f7cd045..7696bf0 100644 --- a/crates/http/src/config.rs +++ b/crates/http/src/config.rs @@ -32,7 +32,7 @@ pub struct Config { pub cleaning: CleaningConfig, pub privileges: PrivilegeConfig, /// Access list configuration - /// + /// /// The file is read on start and when the program receives `SIGUSR1`. If /// initial parsing fails, the program exits. Later failures result in in /// emitting of an error-level log message, while successful updates of the @@ -77,6 +77,12 @@ pub struct NetworkConfig { /// Maximum number of pending TCP connections pub tcp_backlog: i32, /// Path to TLS certificate (DER-encoded X.509) + /// + /// The TLS files are read on start and when the program receives `SIGUSR1`. + /// If initial parsing fails, the program exits. 
Later failures result in + /// emitting of an error-level log message, while successful updates + /// result in emitting of an info-level log message. Updates only affect + /// new connections. pub tls_certificate_path: PathBuf, /// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format) pub tls_private_key_path: PathBuf, diff --git a/crates/http/src/lib.rs b/crates/http/src/lib.rs index 7f9fc65..d4e1864 100644 --- a/crates/http/src/lib.rs +++ b/crates/http/src/lib.rs @@ -9,6 +9,7 @@ use aquatic_common::{ rustls_config::create_rustls_config, PanicSentinelWatcher, ServerStartInstant, }; +use arc_swap::ArcSwap; use common::State; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; use signal_hook::{ @@ -57,10 +58,10 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); - let tls_config = Arc::new(create_rustls_config( + let tls_config = Arc::new(ArcSwap::from_pointee(create_rustls_config( &config.network.tls_certificate_path, &config.network.tls_private_key_path, - )?); + )?)); let server_start_instant = ServerStartInstant::new(); @@ -144,6 +145,18 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { match signal { SIGUSR1 => { let _ = update_access_list(&config.access_list, &state.access_list); + + match create_rustls_config( + &config.network.tls_certificate_path, + &config.network.tls_private_key_path, + ) { + Ok(config) => { + tls_config.store(Arc::new(config)); + + ::log::info!("successfully updated tls config"); + } + Err(err) => ::log::error!("could not update tls config: {:#}", err), + } } SIGTERM => { if sentinel_watcher.panic_was_triggered() { diff --git a/crates/http/src/workers/socket.rs b/crates/http/src/workers/socket.rs index 3c1ac49..a3751ec 100644 --- a/crates/http/src/workers/socket.rs +++ b/crates/http/src/workers/socket.rs @@ -15,6 +15,7 @@
use aquatic_http_protocol::request::{Request, RequestParseError, ScrapeRequest}; use aquatic_http_protocol::response::{ FailureResponse, Response, ScrapeResponse, ScrapeStatistics, }; +use arc_swap::ArcSwap; use either::Either; use futures::stream::FuturesUnordered; use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; @@ -59,7 +60,7 @@ pub async fn run_socket_worker( _sentinel: PanicSentinel, config: Config, state: State, - tls_config: Arc, + tls_config: Arc>, request_mesh_builder: MeshBuilder, priv_dropper: PrivilegeDropper, server_start_instant: ServerStartInstant, @@ -208,12 +209,12 @@ impl Connection { request_senders: Rc>, server_start_instant: ServerStartInstant, connection_id: ConnectionId, - tls_config: Arc, + tls_config: Arc>, connection_slab: Rc>>, stream: TcpStream, peer_addr: CanonicalSocketAddr, ) -> anyhow::Result<()> { - let tls_acceptor: TlsAcceptor = tls_config.into(); + let tls_acceptor: TlsAcceptor = tls_config.load_full().into(); let stream = tls_acceptor.accept(stream).await?; let mut response_buffer = [0; RESPONSE_BUFFER_SIZE]; diff --git a/crates/udp/src/config.rs b/crates/udp/src/config.rs index e8f68b8..f2438f0 100644 --- a/crates/udp/src/config.rs +++ b/crates/udp/src/config.rs @@ -43,7 +43,7 @@ pub struct Config { pub privileges: PrivilegeConfig, /// Access list configuration - /// + /// /// The file is read on start and when the program receives `SIGUSR1`. If /// initial parsing fails, the program exits. 
Later failures result in in /// emitting of an error-level log message, while successful updates of the diff --git a/crates/ws/Cargo.toml b/crates/ws/Cargo.toml index 7de91e4..a05ee15 100644 --- a/crates/ws/Cargo.toml +++ b/crates/ws/Cargo.toml @@ -30,6 +30,7 @@ aquatic_ws_protocol.workspace = true anyhow = "1" async-tungstenite = "0.23" +arc-swap = "1" cfg-if = "1" futures = "0.3" futures-lite = "1" diff --git a/crates/ws/src/config.rs b/crates/ws/src/config.rs index f033e19..301d3fc 100644 --- a/crates/ws/src/config.rs +++ b/crates/ws/src/config.rs @@ -16,7 +16,7 @@ use aquatic_toml_config::TomlConfig; #[serde(default, deny_unknown_fields)] pub struct Config { /// Number of socket workers. - /// + /// /// On servers with 1-7 physical cores, using a worker per core is /// recommended. With more cores, using two workers less than the /// number of cores is recommended. @@ -26,7 +26,7 @@ pub struct Config { /// swarm workers, encode them and send them back over the socket. pub socket_workers: usize, /// Number of swarm workers. - /// + /// /// A single worker is recommended for servers with 1-7 physical cores. /// With more cores, using two workers is recommended. /// @@ -39,7 +39,7 @@ pub struct Config { pub cleaning: CleaningConfig, pub privileges: PrivilegeConfig, /// Access list configuration - /// + /// /// The file is read on start and when the program receives `SIGUSR1`. If /// initial parsing fails, the program exits. Later failures result in in /// emitting of an error-level log message, while successful updates of the @@ -85,6 +85,12 @@ pub struct NetworkConfig { pub tcp_backlog: i32, /// Enable TLS + /// + /// The TLS files are read on start and when the program receives `SIGUSR1`. + /// If initial parsing fails, the program exits. Later failures result in + /// emitting of an error-level log message, while successful updates + /// result in emitting of an info-level log message. Updates only affect + /// new connections.
pub enable_tls: bool, /// Path to TLS certificate (DER-encoded X.509) pub tls_certificate_path: PathBuf, @@ -152,6 +158,11 @@ pub struct CleaningConfig { pub connection_cleaning_interval: u64, /// Close connections if no responses have been sent to them for this long (seconds) pub max_connection_idle: u32, + /// After updating TLS certificates, close connections running with + /// previous certificates after this long (seconds) + /// + /// Countdown starts at next connection cleaning. + pub close_after_tls_update_grace_period: u32, } impl Default for CleaningConfig { @@ -161,6 +172,7 @@ impl Default for CleaningConfig { max_peer_age: 1800, max_connection_idle: 60 * 5, connection_cleaning_interval: 30, + close_after_tls_update_grace_period: 60 * 60 * 60, } } } diff --git a/crates/ws/src/lib.rs b/crates/ws/src/lib.rs index 423b006..0a536cd 100644 --- a/crates/ws/src/lib.rs +++ b/crates/ws/src/lib.rs @@ -10,6 +10,7 @@ use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_fo use aquatic_common::cpu_pinning::WorkerIndex; use aquatic_common::rustls_config::create_rustls_config; use aquatic_common::{PanicSentinelWatcher, ServerStartInstant}; +use arc_swap::ArcSwap; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; use signal_hook::{ consts::{SIGTERM, SIGUSR1}, @@ -76,13 +77,21 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); let opt_tls_config = if config.network.enable_tls { - Some(Arc::new( + Some(Arc::new(ArcSwap::from_pointee( create_rustls_config( &config.network.tls_certificate_path, &config.network.tls_private_key_path, ) .with_context(|| "create rustls config")?, - )) + ))) + } else { + None + }; + let mut opt_tls_cert_data = if config.network.enable_tls { + Some( + ::std::fs::read(&config.network.tls_certificate_path) + .with_context(|| "open tls certificate file")?, + ) } else { None }; @@ -181,6 +190,29 @@ pub fn run(config: 
Config) -> ::anyhow::Result<()> { match signal { SIGUSR1 => { let _ = update_access_list(&config.access_list, &state.access_list); + + if let Some(tls_config) = opt_tls_config.as_ref() { + match ::std::fs::read(&config.network.tls_certificate_path) { + Ok(data) if &data == opt_tls_cert_data.as_ref().unwrap() => { + ::log::info!("skipping tls config update: certificate identical to currently loaded"); + } + Ok(data) => { + match create_rustls_config( + &config.network.tls_certificate_path, + &config.network.tls_private_key_path, + ) { + Ok(config) => { + tls_config.store(Arc::new(config)); + opt_tls_cert_data = Some(data); + + ::log::info!("successfully updated tls config"); + } + Err(err) => ::log::error!("could not update tls config: {:#}", err), + } + } + Err(err) => ::log::error!("couldn't read tls certificate file: {:#}", err), + } + } } SIGTERM => { if sentinel_watcher.panic_was_triggered() { diff --git a/crates/ws/src/workers/socket.rs b/crates/ws/src/workers/socket.rs index 58bfaa3..ff3797a 100644 --- a/crates/ws/src/workers/socket.rs +++ b/crates/ws/src/workers/socket.rs @@ -13,6 +13,7 @@ use aquatic_common::rustls_config::RustlsConfig; use aquatic_common::{PanicSentinel, ServerStartInstant}; use aquatic_peer_id::PeerClient; use aquatic_ws_protocol::*; +use arc_swap::ArcSwap; use async_tungstenite::WebSocketStream; use futures::stream::{SplitSink, SplitStream}; use futures::{AsyncWriteExt, StreamExt}; @@ -53,13 +54,15 @@ struct ConnectionReference { announced_info_hashes: HashMap, ip_version: IpVersion, opt_peer_client: Option<(PeerClient, String)>, + opt_tls_config: Option>, + valid_until_after_tls_update: Option, } pub async fn run_socket_worker( _sentinel: PanicSentinel, config: Config, state: State, - opt_tls_config: Option>, + opt_tls_config: Option>>, control_message_mesh_builder: MeshBuilder, in_message_mesh_builder: MeshBuilder<(InMessageMeta, InMessage), Partial>, out_message_mesh_builder: MeshBuilder<(OutMessageMeta, OutMessage), Partial>, @@ 
-110,11 +113,12 @@ pub async fn run_socket_worker( // Periodically clean connections TimerActionRepeat::repeat_into( - enclose!((config, connection_slab) move || { + enclose!((config, connection_slab, opt_tls_config) move || { clean_connections( config.clone(), connection_slab.clone(), server_start_instant, + opt_tls_config.clone(), ) }), tq_prioritized, @@ -157,6 +161,8 @@ pub async fn run_socket_worker( announced_info_hashes: Default::default(), ip_version, opt_peer_client: None, + opt_tls_config: opt_tls_config.as_ref().map(|c| c.load_full()), + valid_until_after_tls_update: None, }); ::log::trace!("accepting stream, assigning id {}", key); @@ -261,10 +267,31 @@ async fn clean_connections( config: Rc, connection_slab: Rc>>, server_start_instant: ServerStartInstant, + opt_tls_config: Option>>, ) -> Option { let now = server_start_instant.seconds_elapsed(); + let opt_current_tls_config = opt_tls_config.map(|c| c.load_full()); connection_slab.borrow_mut().retain(|_, reference| { + if let Some(valid_until) = reference.valid_until_after_tls_update { + if !valid_until.valid(now) { + if let Some(handle) = reference.task_handle.as_ref() { + handle.cancel(); + } + + return false; + } + } else if let Some(false) = opt_current_tls_config + .as_ref() + .zip(reference.opt_tls_config.as_ref()) + .map(|(a, b)| Arc::ptr_eq(a, b)) + { + reference.valid_until_after_tls_update = Some(ValidUntil::new( + server_start_instant, + config.cleaning.close_after_tls_update_grace_period, + )); + } + if reference.valid_until.valid(now) { #[cfg(feature = "prometheus")] if let Some((peer_client, prefix)) = &reference.opt_peer_client { @@ -370,12 +397,12 @@ async fn run_connection( server_start_instant: ServerStartInstant, out_message_consumer_id: ConsumerId, connection_id: ConnectionId, - opt_tls_config: Option>, + opt_tls_config: Option>>, ip_version: IpVersion, mut stream: TcpStream, ) -> anyhow::Result<()> { if let Some(tls_config) = opt_tls_config { - let tls_acceptor: TlsAcceptor = 
tls_config.into(); + let tls_acceptor: TlsAcceptor = tls_config.load_full().into(); let stream = tls_acceptor.accept(stream).await?;