Merge pull request #152 from greatest-ape/work-2023-10-24

ws & http: reload tls config on SIGUSR1
This commit is contained in:
Joakim Frostegård 2023-10-24 21:40:54 +02:00 committed by GitHub
commit b473bb6fba
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 118 additions and 16 deletions

View file

@ -14,11 +14,18 @@
* Add support for reporting peer client information * Add support for reporting peer client information
### aquatic_http
#### Added
* Reload TLS certificate (and key) on SIGUSR1
### aquatic_ws ### aquatic_ws
#### Added #### Added
* Add support for reporting peer client information * Add support for reporting peer client information
* Reload TLS certificate (and key) on SIGUSR1
#### Changed #### Changed

2
Cargo.lock generated
View file

@ -118,6 +118,7 @@ dependencies = [
"aquatic_common", "aquatic_common",
"aquatic_http_protocol", "aquatic_http_protocol",
"aquatic_toml_config", "aquatic_toml_config",
"arc-swap",
"cfg-if", "cfg-if",
"either", "either",
"futures", "futures",
@ -311,6 +312,7 @@ dependencies = [
"aquatic_peer_id", "aquatic_peer_id",
"aquatic_toml_config", "aquatic_toml_config",
"aquatic_ws_protocol", "aquatic_ws_protocol",
"arc-swap",
"async-tungstenite", "async-tungstenite",
"cfg-if", "cfg-if",
"futures", "futures",

View file

@ -28,6 +28,7 @@ aquatic_http_protocol.workspace = true
aquatic_toml_config.workspace = true aquatic_toml_config.workspace = true
anyhow = "1" anyhow = "1"
arc-swap = "1"
cfg-if = "1" cfg-if = "1"
either = "1" either = "1"
futures = "0.3" futures = "0.3"

View file

@ -32,7 +32,7 @@ pub struct Config {
pub cleaning: CleaningConfig, pub cleaning: CleaningConfig,
pub privileges: PrivilegeConfig, pub privileges: PrivilegeConfig,
/// Access list configuration /// Access list configuration
/// ///
/// The file is read on start and when the program receives `SIGUSR1`. If /// The file is read on start and when the program receives `SIGUSR1`. If
/// initial parsing fails, the program exits. Later failures result in in /// initial parsing fails, the program exits. Later failures result in in
/// emitting of an error-level log message, while successful updates of the /// emitting of an error-level log message, while successful updates of the
@ -77,6 +77,12 @@ pub struct NetworkConfig {
/// Maximum number of pending TCP connections /// Maximum number of pending TCP connections
pub tcp_backlog: i32, pub tcp_backlog: i32,
/// Path to TLS certificate (DER-encoded X.509) /// Path to TLS certificate (DER-encoded X.509)
///
/// The TLS files are read on start and when the program receives `SIGUSR1`.
/// If initial parsing fails, the program exits. Later failures result in
/// in emitting of an error-level log message, while successful updates
/// result in emitting of an info-level log message. Updates only affect
/// new connections.
pub tls_certificate_path: PathBuf, pub tls_certificate_path: PathBuf,
/// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format) /// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format)
pub tls_private_key_path: PathBuf, pub tls_private_key_path: PathBuf,

View file

@ -9,6 +9,7 @@ use aquatic_common::{
rustls_config::create_rustls_config, rustls_config::create_rustls_config,
PanicSentinelWatcher, ServerStartInstant, PanicSentinelWatcher, ServerStartInstant,
}; };
use arc_swap::ArcSwap;
use common::State; use common::State;
use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*};
use signal_hook::{ use signal_hook::{
@ -57,10 +58,10 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel();
let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
let tls_config = Arc::new(create_rustls_config( let tls_config = Arc::new(ArcSwap::from_pointee(create_rustls_config(
&config.network.tls_certificate_path, &config.network.tls_certificate_path,
&config.network.tls_private_key_path, &config.network.tls_private_key_path,
)?); )?));
let server_start_instant = ServerStartInstant::new(); let server_start_instant = ServerStartInstant::new();
@ -144,6 +145,18 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
match signal { match signal {
SIGUSR1 => { SIGUSR1 => {
let _ = update_access_list(&config.access_list, &state.access_list); let _ = update_access_list(&config.access_list, &state.access_list);
match create_rustls_config(
&config.network.tls_certificate_path,
&config.network.tls_private_key_path,
) {
Ok(config) => {
tls_config.store(Arc::new(config));
::log::info!("successfully updated tls config");
}
Err(err) => ::log::error!("could not update tls config: {:#}", err),
}
} }
SIGTERM => { SIGTERM => {
if sentinel_watcher.panic_was_triggered() { if sentinel_watcher.panic_was_triggered() {

View file

@ -15,6 +15,7 @@ use aquatic_http_protocol::request::{Request, RequestParseError, ScrapeRequest};
use aquatic_http_protocol::response::{ use aquatic_http_protocol::response::{
FailureResponse, Response, ScrapeResponse, ScrapeStatistics, FailureResponse, Response, ScrapeResponse, ScrapeStatistics,
}; };
use arc_swap::ArcSwap;
use either::Either; use either::Either;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
@ -59,7 +60,7 @@ pub async fn run_socket_worker(
_sentinel: PanicSentinel, _sentinel: PanicSentinel,
config: Config, config: Config,
state: State, state: State,
tls_config: Arc<RustlsConfig>, tls_config: Arc<ArcSwap<RustlsConfig>>,
request_mesh_builder: MeshBuilder<ChannelRequest, Partial>, request_mesh_builder: MeshBuilder<ChannelRequest, Partial>,
priv_dropper: PrivilegeDropper, priv_dropper: PrivilegeDropper,
server_start_instant: ServerStartInstant, server_start_instant: ServerStartInstant,
@ -208,12 +209,12 @@ impl Connection {
request_senders: Rc<Senders<ChannelRequest>>, request_senders: Rc<Senders<ChannelRequest>>,
server_start_instant: ServerStartInstant, server_start_instant: ServerStartInstant,
connection_id: ConnectionId, connection_id: ConnectionId,
tls_config: Arc<RustlsConfig>, tls_config: Arc<ArcSwap<RustlsConfig>>,
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>, connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
stream: TcpStream, stream: TcpStream,
peer_addr: CanonicalSocketAddr, peer_addr: CanonicalSocketAddr,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let tls_acceptor: TlsAcceptor = tls_config.into(); let tls_acceptor: TlsAcceptor = tls_config.load_full().into();
let stream = tls_acceptor.accept(stream).await?; let stream = tls_acceptor.accept(stream).await?;
let mut response_buffer = [0; RESPONSE_BUFFER_SIZE]; let mut response_buffer = [0; RESPONSE_BUFFER_SIZE];

View file

@ -43,7 +43,7 @@ pub struct Config {
pub privileges: PrivilegeConfig, pub privileges: PrivilegeConfig,
/// Access list configuration /// Access list configuration
/// ///
/// The file is read on start and when the program receives `SIGUSR1`. If /// The file is read on start and when the program receives `SIGUSR1`. If
/// initial parsing fails, the program exits. Later failures result in in /// initial parsing fails, the program exits. Later failures result in in
/// emitting of an error-level log message, while successful updates of the /// emitting of an error-level log message, while successful updates of the

View file

@ -30,6 +30,7 @@ aquatic_ws_protocol.workspace = true
anyhow = "1" anyhow = "1"
async-tungstenite = "0.23" async-tungstenite = "0.23"
arc-swap = "1"
cfg-if = "1" cfg-if = "1"
futures = "0.3" futures = "0.3"
futures-lite = "1" futures-lite = "1"

View file

@ -16,7 +16,7 @@ use aquatic_toml_config::TomlConfig;
#[serde(default, deny_unknown_fields)] #[serde(default, deny_unknown_fields)]
pub struct Config { pub struct Config {
/// Number of socket workers. /// Number of socket workers.
/// ///
/// On servers with 1-7 physical cores, using a worker per core is /// On servers with 1-7 physical cores, using a worker per core is
/// recommended. With more cores, using two workers less than the /// recommended. With more cores, using two workers less than the
/// number of cores is recommended. /// number of cores is recommended.
@ -26,7 +26,7 @@ pub struct Config {
/// swarm workers, encode them and send them back over the socket. /// swarm workers, encode them and send them back over the socket.
pub socket_workers: usize, pub socket_workers: usize,
/// Number of swarm workers. /// Number of swarm workers.
/// ///
/// A single worker is recommended for servers with 1-7 physical cores. /// A single worker is recommended for servers with 1-7 physical cores.
/// With more cores, using two workers is recommended. /// With more cores, using two workers is recommended.
/// ///
@ -39,7 +39,7 @@ pub struct Config {
pub cleaning: CleaningConfig, pub cleaning: CleaningConfig,
pub privileges: PrivilegeConfig, pub privileges: PrivilegeConfig,
/// Access list configuration /// Access list configuration
/// ///
/// The file is read on start and when the program receives `SIGUSR1`. If /// The file is read on start and when the program receives `SIGUSR1`. If
/// initial parsing fails, the program exits. Later failures result in in /// initial parsing fails, the program exits. Later failures result in in
/// emitting of an error-level log message, while successful updates of the /// emitting of an error-level log message, while successful updates of the
@ -85,6 +85,12 @@ pub struct NetworkConfig {
pub tcp_backlog: i32, pub tcp_backlog: i32,
/// Enable TLS /// Enable TLS
///
/// The TLS files are read on start and when the program receives `SIGUSR1`.
/// If initial parsing fails, the program exits. Later failures result in
/// in emitting of an error-level log message, while successful updates
/// result in emitting of an info-level log message. Updates only affect
/// new connections.
pub enable_tls: bool, pub enable_tls: bool,
/// Path to TLS certificate (DER-encoded X.509) /// Path to TLS certificate (DER-encoded X.509)
pub tls_certificate_path: PathBuf, pub tls_certificate_path: PathBuf,
@ -152,6 +158,11 @@ pub struct CleaningConfig {
pub connection_cleaning_interval: u64, pub connection_cleaning_interval: u64,
/// Close connections if no responses have been sent to them for this long (seconds) /// Close connections if no responses have been sent to them for this long (seconds)
pub max_connection_idle: u32, pub max_connection_idle: u32,
/// After updating TLS certificates, close connections running with
/// previous certificates after this long (seconds)
///
/// Countdown starts at next connection cleaning.
pub close_after_tls_update_grace_period: u32,
} }
impl Default for CleaningConfig { impl Default for CleaningConfig {
@ -161,6 +172,7 @@ impl Default for CleaningConfig {
max_peer_age: 1800, max_peer_age: 1800,
max_connection_idle: 60 * 5, max_connection_idle: 60 * 5,
connection_cleaning_interval: 30, connection_cleaning_interval: 30,
close_after_tls_update_grace_period: 60 * 60 * 60,
} }
} }
} }

View file

@ -10,6 +10,7 @@ use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_fo
use aquatic_common::cpu_pinning::WorkerIndex; use aquatic_common::cpu_pinning::WorkerIndex;
use aquatic_common::rustls_config::create_rustls_config; use aquatic_common::rustls_config::create_rustls_config;
use aquatic_common::{PanicSentinelWatcher, ServerStartInstant}; use aquatic_common::{PanicSentinelWatcher, ServerStartInstant};
use arc_swap::ArcSwap;
use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*};
use signal_hook::{ use signal_hook::{
consts::{SIGTERM, SIGUSR1}, consts::{SIGTERM, SIGUSR1},
@ -76,13 +77,21 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
let opt_tls_config = if config.network.enable_tls { let opt_tls_config = if config.network.enable_tls {
Some(Arc::new( Some(Arc::new(ArcSwap::from_pointee(
create_rustls_config( create_rustls_config(
&config.network.tls_certificate_path, &config.network.tls_certificate_path,
&config.network.tls_private_key_path, &config.network.tls_private_key_path,
) )
.with_context(|| "create rustls config")?, .with_context(|| "create rustls config")?,
)) )))
} else {
None
};
let mut opt_tls_cert_data = if config.network.enable_tls {
Some(
::std::fs::read(&config.network.tls_certificate_path)
.with_context(|| "open tls certificate file")?,
)
} else { } else {
None None
}; };
@ -181,6 +190,29 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
match signal { match signal {
SIGUSR1 => { SIGUSR1 => {
let _ = update_access_list(&config.access_list, &state.access_list); let _ = update_access_list(&config.access_list, &state.access_list);
if let Some(tls_config) = opt_tls_config.as_ref() {
match ::std::fs::read(&config.network.tls_certificate_path) {
Ok(data) if &data == opt_tls_cert_data.as_ref().unwrap() => {
::log::info!("skipping tls config update: certificate identical to currently loaded");
}
Ok(data) => {
match create_rustls_config(
&config.network.tls_certificate_path,
&config.network.tls_private_key_path,
) {
Ok(config) => {
tls_config.store(Arc::new(config));
opt_tls_cert_data = Some(data);
::log::info!("successfully updated tls config");
}
Err(err) => ::log::error!("could not update tls config: {:#}", err),
}
}
Err(err) => ::log::error!("couldn't read tls certificate file: {:#}", err),
}
}
} }
SIGTERM => { SIGTERM => {
if sentinel_watcher.panic_was_triggered() { if sentinel_watcher.panic_was_triggered() {

View file

@ -13,6 +13,7 @@ use aquatic_common::rustls_config::RustlsConfig;
use aquatic_common::{PanicSentinel, ServerStartInstant}; use aquatic_common::{PanicSentinel, ServerStartInstant};
use aquatic_peer_id::PeerClient; use aquatic_peer_id::PeerClient;
use aquatic_ws_protocol::*; use aquatic_ws_protocol::*;
use arc_swap::ArcSwap;
use async_tungstenite::WebSocketStream; use async_tungstenite::WebSocketStream;
use futures::stream::{SplitSink, SplitStream}; use futures::stream::{SplitSink, SplitStream};
use futures::{AsyncWriteExt, StreamExt}; use futures::{AsyncWriteExt, StreamExt};
@ -53,13 +54,15 @@ struct ConnectionReference {
announced_info_hashes: HashMap<InfoHash, PeerId>, announced_info_hashes: HashMap<InfoHash, PeerId>,
ip_version: IpVersion, ip_version: IpVersion,
opt_peer_client: Option<(PeerClient, String)>, opt_peer_client: Option<(PeerClient, String)>,
opt_tls_config: Option<Arc<RustlsConfig>>,
valid_until_after_tls_update: Option<ValidUntil>,
} }
pub async fn run_socket_worker( pub async fn run_socket_worker(
_sentinel: PanicSentinel, _sentinel: PanicSentinel,
config: Config, config: Config,
state: State, state: State,
opt_tls_config: Option<Arc<RustlsConfig>>, opt_tls_config: Option<Arc<ArcSwap<RustlsConfig>>>,
control_message_mesh_builder: MeshBuilder<SwarmControlMessage, Partial>, control_message_mesh_builder: MeshBuilder<SwarmControlMessage, Partial>,
in_message_mesh_builder: MeshBuilder<(InMessageMeta, InMessage), Partial>, in_message_mesh_builder: MeshBuilder<(InMessageMeta, InMessage), Partial>,
out_message_mesh_builder: MeshBuilder<(OutMessageMeta, OutMessage), Partial>, out_message_mesh_builder: MeshBuilder<(OutMessageMeta, OutMessage), Partial>,
@ -110,11 +113,12 @@ pub async fn run_socket_worker(
// Periodically clean connections // Periodically clean connections
TimerActionRepeat::repeat_into( TimerActionRepeat::repeat_into(
enclose!((config, connection_slab) move || { enclose!((config, connection_slab, opt_tls_config) move || {
clean_connections( clean_connections(
config.clone(), config.clone(),
connection_slab.clone(), connection_slab.clone(),
server_start_instant, server_start_instant,
opt_tls_config.clone(),
) )
}), }),
tq_prioritized, tq_prioritized,
@ -157,6 +161,8 @@ pub async fn run_socket_worker(
announced_info_hashes: Default::default(), announced_info_hashes: Default::default(),
ip_version, ip_version,
opt_peer_client: None, opt_peer_client: None,
opt_tls_config: opt_tls_config.as_ref().map(|c| c.load_full()),
valid_until_after_tls_update: None,
}); });
::log::trace!("accepting stream, assigning id {}", key); ::log::trace!("accepting stream, assigning id {}", key);
@ -261,10 +267,31 @@ async fn clean_connections(
config: Rc<Config>, config: Rc<Config>,
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>, connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
server_start_instant: ServerStartInstant, server_start_instant: ServerStartInstant,
opt_tls_config: Option<Arc<ArcSwap<RustlsConfig>>>,
) -> Option<Duration> { ) -> Option<Duration> {
let now = server_start_instant.seconds_elapsed(); let now = server_start_instant.seconds_elapsed();
let opt_current_tls_config = opt_tls_config.map(|c| c.load_full());
connection_slab.borrow_mut().retain(|_, reference| { connection_slab.borrow_mut().retain(|_, reference| {
if let Some(valid_until) = reference.valid_until_after_tls_update {
if !valid_until.valid(now) {
if let Some(handle) = reference.task_handle.as_ref() {
handle.cancel();
}
return false;
}
} else if let Some(false) = opt_current_tls_config
.as_ref()
.zip(reference.opt_tls_config.as_ref())
.map(|(a, b)| Arc::ptr_eq(a, b))
{
reference.valid_until_after_tls_update = Some(ValidUntil::new(
server_start_instant,
config.cleaning.close_after_tls_update_grace_period,
));
}
if reference.valid_until.valid(now) { if reference.valid_until.valid(now) {
#[cfg(feature = "prometheus")] #[cfg(feature = "prometheus")]
if let Some((peer_client, prefix)) = &reference.opt_peer_client { if let Some((peer_client, prefix)) = &reference.opt_peer_client {
@ -370,12 +397,12 @@ async fn run_connection(
server_start_instant: ServerStartInstant, server_start_instant: ServerStartInstant,
out_message_consumer_id: ConsumerId, out_message_consumer_id: ConsumerId,
connection_id: ConnectionId, connection_id: ConnectionId,
opt_tls_config: Option<Arc<RustlsConfig>>, opt_tls_config: Option<Arc<ArcSwap<RustlsConfig>>>,
ip_version: IpVersion, ip_version: IpVersion,
mut stream: TcpStream, mut stream: TcpStream,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
if let Some(tls_config) = opt_tls_config { if let Some(tls_config) = opt_tls_config {
let tls_acceptor: TlsAcceptor = tls_config.into(); let tls_acceptor: TlsAcceptor = tls_config.load_full().into();
let stream = tls_acceptor.accept(stream).await?; let stream = tls_acceptor.accept(stream).await?;