From c888017072804389cf9ce2cf30118453079091c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 5 Apr 2022 01:26:40 +0200 Subject: [PATCH 01/23] Improve privilege dropping; run cargo fmt --- aquatic_common/src/cpu_pinning.rs | 4 +- aquatic_common/src/privileges.rs | 64 ++++++++++++++---------------- aquatic_http/src/config.rs | 5 ++- aquatic_http/src/lib.rs | 17 +++----- aquatic_http/src/workers/socket.rs | 11 ++--- aquatic_udp/src/lib.rs | 16 ++------ aquatic_udp/src/workers/socket.rs | 16 ++++---- aquatic_ws/src/lib.rs | 17 +++----- aquatic_ws/src/workers/socket.rs | 12 +++--- 9 files changed, 70 insertions(+), 92 deletions(-) diff --git a/aquatic_common/src/cpu_pinning.rs b/aquatic_common/src/cpu_pinning.rs index b82d5bb..0e03e52 100644 --- a/aquatic_common/src/cpu_pinning.rs +++ b/aquatic_common/src/cpu_pinning.rs @@ -194,7 +194,9 @@ pub mod glommio { // 15 -> 14 and 15 // 14 -> 12 and 13 // 13 -> 10 and 11 - CpuPinningDirection::Descending => num_cpu_cores - 2 * (num_cpu_cores - core_index), + CpuPinningDirection::Descending => { + num_cpu_cores - 2 * (num_cpu_cores - core_index) + } }; get_cpu_set()? diff --git a/aquatic_common/src/privileges.rs b/aquatic_common/src/privileges.rs index 0e4a627..4475830 100644 --- a/aquatic_common/src/privileges.rs +++ b/aquatic_common/src/privileges.rs @@ -1,22 +1,22 @@ use std::{ - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - time::Duration, + path::PathBuf, + sync::{Arc, Barrier}, }; -use aquatic_toml_config::TomlConfig; use privdrop::PrivDrop; use serde::Deserialize; +use aquatic_toml_config::TomlConfig; + #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] #[serde(default)] pub struct PrivilegeConfig { - /// Chroot and switch user after binding to sockets + /// Chroot and switch group and user after binding to sockets pub drop_privileges: bool, /// Chroot to this path - pub chroot_path: String, + pub chroot_path: PathBuf, + /// Group to switch to after chrooting + pub group: String, /// User to switch to after chrooting pub user: String, } @@ -25,41 +25,37 @@ impl Default for PrivilegeConfig { fn default() -> Self { Self { drop_privileges: false, - chroot_path: ".".to_string(), + chroot_path: ".".into(), user: "nobody".to_string(), + group: "nobody".to_string(), } } } -pub fn drop_privileges_after_socket_binding( - config: &PrivilegeConfig, - num_bound_sockets: Arc, - target_num: usize, -) -> anyhow::Result<()> { - if config.drop_privileges { - let mut counter = 0usize; +#[derive(Clone)] +pub struct PrivilegeDropper { + barrier: Arc, + config: Arc, +} - loop { - let num_bound = num_bound_sockets.load(Ordering::SeqCst); +impl PrivilegeDropper { + pub fn new(config: PrivilegeConfig, num_sockets: usize) -> Self { + Self { + barrier: Arc::new(Barrier::new(num_sockets)), + config: Arc::new(config), + } + } - if num_bound == target_num { + pub fn after_socket_creation(&self) { + if self.config.drop_privileges { + if self.barrier.wait().is_leader() { PrivDrop::default() - .chroot(config.chroot_path.clone()) - .user(config.user.clone()) - .apply()?; - - break; - } - - ::std::thread::sleep(Duration::from_millis(10)); - - counter += 1; - - if counter == 500 { - panic!("Sockets didn't bind in time for privilege drop."); + .chroot(self.config.chroot_path.clone()) + .user(self.config.user.clone()) + .user(self.config.user.clone()) + .apply() + .expect("drop privileges"); } } } - - Ok(()) } diff --git a/aquatic_http/src/config.rs b/aquatic_http/src/config.rs index c94cf1e..0cbcea9 100644 --- a/aquatic_http/src/config.rs +++ b/aquatic_http/src/config.rs @@ -1,6 +1,9 @@ use std::{net::SocketAddr, path::PathBuf}; -use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig, cpu_pinning::asc::CpuPinningConfigAsc}; +use aquatic_common::{ + access_list::AccessListConfig, cpu_pinning::asc::CpuPinningConfigAsc, + privileges::PrivilegeConfig, +}; use aquatic_toml_config::TomlConfig; use serde::Deserialize; diff --git a/aquatic_http/src/lib.rs b/aquatic_http/src/lib.rs index 28ca996..7ce2065 100644 --- a/aquatic_http/src/lib.rs +++ b/aquatic_http/src/lib.rs @@ -4,13 +4,13 @@ use aquatic_common::{ glommio::{get_worker_placement, set_affinity_for_util_worker}, WorkerIndex, }, - privileges::drop_privileges_after_socket_binding, + privileges::PrivilegeDropper, rustls_config::create_rustls_config, }; use common::State; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; use signal_hook::{consts::SIGUSR1, iterator::Signals}; -use std::sync::{atomic::AtomicUsize, Arc}; +use std::sync::Arc; use crate::config::Config; @@ -63,7 +63,7 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); - let num_bound_sockets = Arc::new(AtomicUsize::new(0)); + let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); let tls_config = Arc::new(create_rustls_config( &config.network.tls_certificate_path, @@ -78,7 +78,7 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let tls_config = tls_config.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); - let num_bound_sockets = num_bound_sockets.clone(); + let priv_dropper = priv_dropper.clone(); let placement = get_worker_placement( &config.cpu_pinning, @@ -95,7 +95,7 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { tls_config, request_mesh_builder, response_mesh_builder, - num_bound_sockets, + priv_dropper, ) .await }); @@ -130,13 +130,6 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { executors.push(executor); } - drop_privileges_after_socket_binding( - &config.privileges, - num_bound_sockets, - config.socket_workers, - ) - .unwrap(); - if config.cpu_pinning.active { set_affinity_for_util_worker( &config.cpu_pinning, diff --git a/aquatic_http/src/workers/socket.rs b/aquatic_http/src/workers/socket.rs index 3992551..50e6e98 100644 --- a/aquatic_http/src/workers/socket.rs +++ b/aquatic_http/src/workers/socket.rs @@ -2,11 +2,11 @@ use std::cell::RefCell; use std::collections::BTreeMap; use std::os::unix::prelude::{FromRawFd, IntoRawFd}; use std::rc::Rc; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; +use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; use aquatic_common::CanonicalSocketAddr; use aquatic_http_protocol::common::InfoHash; @@ -58,13 +58,12 @@ pub async fn run_socket_worker( tls_config: Arc, request_mesh_builder: MeshBuilder, response_mesh_builder: MeshBuilder, - num_bound_sockets: Arc, + priv_dropper: PrivilegeDropper, ) { let config = Rc::new(config); let access_list = state.access_list; - let listener = create_tcp_listener(&config); - num_bound_sockets.fetch_add(1, Ordering::SeqCst); + let listener = create_tcp_listener(&config, priv_dropper); let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); let request_senders = Rc::new(request_senders); @@ -485,7 +484,7 @@ fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usi (info_hash.0[0] as usize) % config.request_workers } -fn create_tcp_listener(config: &Config) -> TcpListener { +fn create_tcp_listener(config: &Config, priv_dropper: PrivilegeDropper) -> TcpListener { let domain = if config.network.address.is_ipv4() { socket2::Domain::IPV4 } else { @@ -509,5 +508,7 @@ fn create_tcp_listener(config: &Config) -> TcpListener { .listen(config.network.tcp_backlog) .unwrap_or_else(|err| panic!("socket: listen {}: {:?}", config.network.address, err)); + priv_dropper.after_socket_creation(); + unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) } } diff --git a/aquatic_udp/src/lib.rs b/aquatic_udp/src/lib.rs index d56d743..5a805d8 100644 --- a/aquatic_udp/src/lib.rs +++ b/aquatic_udp/src/lib.rs @@ -5,13 +5,12 @@ pub mod workers; use config::Config; use std::collections::BTreeMap; -use std::sync::{atomic::AtomicUsize, Arc}; use std::thread::Builder; use anyhow::Context; #[cfg(feature = "cpu-pinning")] use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; -use aquatic_common::privileges::drop_privileges_after_socket_binding; +use aquatic_common::privileges::PrivilegeDropper; use crossbeam_channel::{bounded, unbounded}; use aquatic_common::access_list::update_access_list; @@ -32,7 +31,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let mut signals = Signals::new(::std::iter::once(SIGUSR1))?; - let num_bound_sockets = Arc::new(AtomicUsize::new(0)); + let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); let mut request_senders = Vec::new(); let mut request_receivers = BTreeMap::new(); @@ -96,7 +95,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let request_sender = ConnectedRequestSender::new(SocketWorkerIndex(i), request_senders.clone()); let response_receiver = response_receivers.remove(&i).unwrap(); - let num_bound_sockets = num_bound_sockets.clone(); + let priv_dropper = priv_dropper.clone(); Builder::new() .name(format!("socket-{:02}", i + 1)) @@ -115,7 +114,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { i, request_sender, response_receiver, - num_bound_sockets, + priv_dropper, ); }) .with_context(|| "spawn socket worker")?; @@ -141,13 +140,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { .with_context(|| "spawn statistics worker")?; } - drop_privileges_after_socket_binding( - &config.privileges, - num_bound_sockets, - config.socket_workers, - ) - .unwrap(); - #[cfg(feature = "cpu-pinning")] pin_current_if_configured_to( &config.cpu_pinning, diff --git a/aquatic_udp/src/workers/socket.rs b/aquatic_udp/src/workers/socket.rs index 6bf3487..0354b58 100644 --- a/aquatic_udp/src/workers/socket.rs +++ b/aquatic_udp/src/workers/socket.rs @@ -1,12 +1,10 @@ use std::collections::BTreeMap; use std::io::{Cursor, ErrorKind}; -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; +use std::sync::atomic::Ordering; use std::time::{Duration, Instant}; use std::vec::Drain; +use aquatic_common::privileges::PrivilegeDropper; use crossbeam_channel::Receiver; use mio::net::UdpSocket; use mio::{Events, Interest, Poll, Token}; @@ -157,12 +155,12 @@ pub fn run_socket_worker( token_num: usize, request_sender: ConnectedRequestSender, response_receiver: Receiver<(ConnectedResponse, CanonicalSocketAddr)>, - num_bound_sockets: Arc, + priv_dropper: PrivilegeDropper, ) { let mut rng = StdRng::from_entropy(); let mut buffer = [0u8; MAX_PACKET_SIZE]; - let mut socket = UdpSocket::from_std(create_socket(&config)); + let mut socket = UdpSocket::from_std(create_socket(&config, priv_dropper)); let mut poll = Poll::new().expect("create poll"); let interests = Interest::READABLE; @@ -171,8 +169,6 @@ pub fn run_socket_worker( .register(&mut socket, Token(token_num), interests) .unwrap(); - num_bound_sockets.fetch_add(1, Ordering::SeqCst); - let mut events = Events::with_capacity(config.network.poll_event_capacity); let mut connections = ConnectionMap::default(); let mut pending_scrape_responses = PendingScrapeResponseSlab::default(); @@ -520,7 +516,7 @@ fn send_response( } } -pub fn create_socket(config: &Config) -> ::std::net::UdpSocket { +pub fn create_socket(config: &Config, priv_dropper: PrivilegeDropper) -> ::std::net::UdpSocket { let socket = if config.network.address.is_ipv4() { Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) } else { @@ -542,6 +538,8 @@ pub fn create_socket(config: &Config) -> ::std::net::UdpSocket { .bind(&config.network.address.into()) .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err)); + priv_dropper.after_socket_creation(); + let recv_buffer_size = config.network.socket_recv_buffer_size; if recv_buffer_size != 0 { diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index 0ba6bbc..6671b00 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -2,7 +2,7 @@ pub mod common; pub mod config; pub mod workers; -use std::sync::{atomic::AtomicUsize, Arc}; +use std::sync::Arc; use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker}; use aquatic_common::cpu_pinning::WorkerIndex; @@ -11,7 +11,7 @@ use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; use signal_hook::{consts::SIGUSR1, iterator::Signals}; use aquatic_common::access_list::update_access_list; -use aquatic_common::privileges::drop_privileges_after_socket_binding; +use aquatic_common::privileges::PrivilegeDropper; use common::*; use config::Config; @@ -61,7 +61,7 @@ fn run_workers(config: Config, state: State) -> anyhow::Result<()> { let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE); let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE * 16); - let num_bound_sockets = Arc::new(AtomicUsize::new(0)); + let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); let tls_config = Arc::new(create_rustls_config( &config.network.tls_certificate_path, @@ -76,7 +76,7 @@ fn run_workers(config: Config, state: State) -> anyhow::Result<()> { let tls_config = tls_config.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); - let num_bound_sockets = num_bound_sockets.clone(); + let priv_dropper = priv_dropper.clone(); let placement = get_worker_placement( &config.cpu_pinning, @@ -93,7 +93,7 @@ fn run_workers(config: Config, state: State) -> anyhow::Result<()> { tls_config, request_mesh_builder, response_mesh_builder, - num_bound_sockets, + priv_dropper, ) .await }); @@ -128,13 +128,6 @@ fn run_workers(config: Config, state: State) -> anyhow::Result<()> { executors.push(executor); } - drop_privileges_after_socket_binding( - &config.privileges, - num_bound_sockets, - config.socket_workers, - ) - .unwrap(); - if config.cpu_pinning.active { set_affinity_for_util_worker( &config.cpu_pinning, diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index 7c121d4..0acdd56 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -3,11 +3,11 @@ use std::cell::RefCell; use std::collections::BTreeMap; use std::os::unix::prelude::{FromRawFd, IntoRawFd}; use std::rc::Rc; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; +use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; use aquatic_common::CanonicalSocketAddr; use aquatic_ws_protocol::*; @@ -53,14 +53,12 @@ pub async fn run_socket_worker( tls_config: Arc, in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>, out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>, - num_bound_sockets: Arc, + priv_dropper: PrivilegeDropper, ) { let config = Rc::new(config); let access_list = state.access_list; - let listener = create_tcp_listener(&config); - - num_bound_sockets.fetch_add(1, Ordering::SeqCst); + let listener = create_tcp_listener(&config, priv_dropper); let (in_message_senders, _) = in_message_mesh_builder.join(Role::Producer).await.unwrap(); let in_message_senders = Rc::new(in_message_senders); @@ -544,7 +542,7 @@ fn calculate_in_message_consumer_index(config: &Config, info_hash: InfoHash) -> (info_hash.0[0] as usize) % config.request_workers } -fn create_tcp_listener(config: &Config) -> TcpListener { +fn create_tcp_listener(config: &Config, priv_dropper: PrivilegeDropper) -> TcpListener { let domain = if config.network.address.is_ipv4() { socket2::Domain::IPV4 } else { @@ -568,5 +566,7 @@ fn create_tcp_listener(config: &Config) -> TcpListener { .listen(config.network.tcp_backlog) .unwrap_or_else(|err| panic!("socket: listen {}: {:?}", config.network.address, err)); + priv_dropper.after_socket_creation(); + unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) } } From 6f30d0545398987fdee9d542b006c3d09a51633c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 5 Apr 2022 01:33:02 +0200 Subject: [PATCH 02/23] Update TODO --- TODO.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/TODO.md b/TODO.md index aeb510a..5289208 100644 --- a/TODO.md +++ b/TODO.md @@ -2,10 +2,13 @@ ## High priority +* test priv dropping + * aquatic_http_private * Consider not setting Content-type: text/plain for responses and send vec as default octet stream instead * stored procedure * test ip format + * check user token length * site will likely want num_seeders and num_leechers for all torrents.. ## Medium priority From 02ba4ec922698f623b44d7fef9347f5e56e481c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 5 Apr 2022 23:03:54 +0200 Subject: [PATCH 03/23] privilege dropping: actually set group, default to "nogroup" --- aquatic_common/src/privileges.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aquatic_common/src/privileges.rs b/aquatic_common/src/privileges.rs index 4475830..d9db45f 100644 --- a/aquatic_common/src/privileges.rs +++ b/aquatic_common/src/privileges.rs @@ -27,7 +27,7 @@ impl Default for PrivilegeConfig { drop_privileges: false, chroot_path: ".".into(), user: "nobody".to_string(), - group: "nobody".to_string(), + group: "nogroup".to_string(), } } } @@ -51,7 +51,7 @@ impl PrivilegeDropper { if self.barrier.wait().is_leader() { PrivDrop::default() .chroot(self.config.chroot_path.clone()) - .user(self.config.user.clone()) + .group(self.config.group.clone()) .user(self.config.user.clone()) .apply() .expect("drop privileges"); From cb2f7483d3d846fed5fbd80994b976217522cbc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 5 Apr 2022 23:15:29 +0200 Subject: [PATCH 04/23] Return anyhow::Result on PrivDropper waiting and socket creation --- aquatic_common/src/privileges.rs | 7 +++++-- aquatic_http/src/workers/socket.rs | 20 +++++++++---------- aquatic_udp/src/workers/socket.rs | 32 +++++++++++++++--------------- aquatic_ws/src/workers/socket.rs | 19 +++++++++--------- 4 files changed, 41 insertions(+), 37 deletions(-) diff --git a/aquatic_common/src/privileges.rs b/aquatic_common/src/privileges.rs index d9db45f..d252996 100644 --- a/aquatic_common/src/privileges.rs +++ b/aquatic_common/src/privileges.rs @@ -3,6 +3,7 @@ use std::{ sync::{Arc, Barrier}, }; +use anyhow::Context; use privdrop::PrivDrop; use serde::Deserialize; @@ -46,7 +47,7 @@ impl PrivilegeDropper { } } - pub fn after_socket_creation(&self) { + pub fn after_socket_creation(&self) -> anyhow::Result<()> { if self.config.drop_privileges { if self.barrier.wait().is_leader() { PrivDrop::default() @@ -54,8 +55,10 @@ impl PrivilegeDropper { .group(self.config.group.clone()) .user(self.config.user.clone()) .apply() - .expect("drop privileges"); + .with_context(|| "drop privileges")?; } } + + Ok(()) } } diff --git a/aquatic_http/src/workers/socket.rs b/aquatic_http/src/workers/socket.rs index 50e6e98..aee1f97 100644 --- a/aquatic_http/src/workers/socket.rs +++ b/aquatic_http/src/workers/socket.rs @@ -5,6 +5,7 @@ use std::rc::Rc; use std::sync::Arc; use std::time::{Duration, Instant}; +use anyhow::Context; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; @@ -63,7 +64,7 @@ pub async fn run_socket_worker( let config = Rc::new(config); let access_list = state.access_list; - let listener = create_tcp_listener(&config, priv_dropper); + let listener = create_tcp_listener(&config, priv_dropper).expect("create tcp listener"); let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); let request_senders = Rc::new(request_senders); @@ -484,31 +485,30 @@ fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usi (info_hash.0[0] as usize) % config.request_workers } -fn create_tcp_listener(config: &Config, priv_dropper: PrivilegeDropper) -> TcpListener { +fn create_tcp_listener(config: &Config, priv_dropper: PrivilegeDropper) -> anyhow::Result { let domain = if config.network.address.is_ipv4() { socket2::Domain::IPV4 } else { socket2::Domain::IPV6 }; - let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP)) - .expect("create socket"); + let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?; if config.network.only_ipv6 { - socket.set_only_v6(true).expect("socket: set only ipv6"); + socket.set_only_v6(true).with_context(|| "socket: set only ipv6")?; } - socket.set_reuse_port(true).expect("socket: set reuse port"); + socket.set_reuse_port(true).with_context(|| "socket: set reuse port")?; socket .bind(&config.network.address.into()) - .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err)); + .with_context(|| format!("socket: bind to {}", config.network.address))?; socket .listen(config.network.tcp_backlog) - .unwrap_or_else(|err| panic!("socket: listen {}: {:?}", config.network.address, err)); + .with_context(|| format!("socket: listen on {}", config.network.address))?; - priv_dropper.after_socket_creation(); + priv_dropper.after_socket_creation()?; - unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) } + Ok(unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) }) } diff --git a/aquatic_udp/src/workers/socket.rs b/aquatic_udp/src/workers/socket.rs index 0354b58..ac9c556 100644 --- a/aquatic_udp/src/workers/socket.rs +++ b/aquatic_udp/src/workers/socket.rs @@ -4,6 +4,7 @@ use std::sync::atomic::Ordering; use std::time::{Duration, Instant}; use std::vec::Drain; +use anyhow::Context; use aquatic_common::privileges::PrivilegeDropper; use crossbeam_channel::Receiver; use mio::net::UdpSocket; @@ -160,7 +161,7 @@ pub fn run_socket_worker( let mut rng = StdRng::from_entropy(); let mut buffer = [0u8; MAX_PACKET_SIZE]; - let mut socket = UdpSocket::from_std(create_socket(&config, priv_dropper)); + let mut socket = UdpSocket::from_std(create_socket(&config, priv_dropper).expect("create socket")); let mut poll = Poll::new().expect("create poll"); let interests = Interest::READABLE; @@ -516,29 +517,22 @@ fn send_response( } } -pub fn create_socket(config: &Config, priv_dropper: PrivilegeDropper) -> ::std::net::UdpSocket { +pub fn create_socket(config: &Config, priv_dropper: PrivilegeDropper) -> anyhow::Result<::std::net::UdpSocket> { let socket = if config.network.address.is_ipv4() { - Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) + Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))? } else { - Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) - } - .expect("create socket"); + Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))? + }; if config.network.only_ipv6 { - socket.set_only_v6(true).expect("socket: set only ipv6"); + socket.set_only_v6(true).with_context(|| "socket: set only ipv6")?; } - socket.set_reuse_port(true).expect("socket: set reuse port"); + socket.set_reuse_port(true).with_context(|| "socket: set reuse port")?; socket .set_nonblocking(true) - .expect("socket: set nonblocking"); - - socket - .bind(&config.network.address.into()) - .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err)); - - priv_dropper.after_socket_creation(); + .with_context(|| "socket: set nonblocking")?; let recv_buffer_size = config.network.socket_recv_buffer_size; @@ -552,7 +546,13 @@ pub fn create_socket(config: &Config, priv_dropper: PrivilegeDropper) -> ::std:: } } - socket.into() + socket + .bind(&config.network.address.into()) + .with_context(|| format!("socket: bind to {}", config.network.address))?; + + priv_dropper.after_socket_creation()?; + + Ok(socket.into()) } #[cfg(test)] diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index 0acdd56..02974cd 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -6,6 +6,7 @@ use std::rc::Rc; use std::sync::Arc; use std::time::{Duration, Instant}; +use anyhow::Context; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; @@ -58,7 +59,7 @@ pub async fn run_socket_worker( let config = Rc::new(config); let access_list = state.access_list; - let listener = create_tcp_listener(&config, priv_dropper); + let listener = create_tcp_listener(&config, priv_dropper).expect("create tcp listener"); let (in_message_senders, _) = in_message_mesh_builder.join(Role::Producer).await.unwrap(); let in_message_senders = Rc::new(in_message_senders); @@ -542,7 +543,7 @@ fn calculate_in_message_consumer_index(config: &Config, info_hash: InfoHash) -> (info_hash.0[0] as usize) % config.request_workers } -fn create_tcp_listener(config: &Config, priv_dropper: PrivilegeDropper) -> TcpListener { +fn create_tcp_listener(config: &Config, priv_dropper: PrivilegeDropper) -> anyhow::Result { let domain = if config.network.address.is_ipv4() { socket2::Domain::IPV4 } else { @@ -550,23 +551,23 @@ fn create_tcp_listener(config: &Config, priv_dropper: PrivilegeDropper) -> TcpLi }; let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP)) - .expect("create socket"); + .with_context(|| "create socket")?; if config.network.only_ipv6 { - socket.set_only_v6(true).expect("socket: set only ipv6"); + socket.set_only_v6(true).with_context(|| "socket: set only ipv6")?; } - socket.set_reuse_port(true).expect("socket: set reuse port"); + socket.set_reuse_port(true).with_context(|| "socket: set reuse port")?; socket .bind(&config.network.address.into()) - .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err)); + .with_context(|| format!("socket: bind to {}", config.network.address))?; socket .listen(config.network.tcp_backlog) - .unwrap_or_else(|err| panic!("socket: listen {}: {:?}", config.network.address, err)); + .with_context(|| format!("socket: listen {}", config.network.address))?; - priv_dropper.after_socket_creation(); + priv_dropper.after_socket_creation()?; - unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) } + Ok(unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) }) } From 49523779d905e3d1d87a75de1fb18bca9d376094 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 6 Apr 2022 00:41:15 +0200 Subject: [PATCH 05/23] common: add PanicSentinel, improve PrivilegeDropper anyhow context --- aquatic_common/src/lib.rs | 17 +++++++++++++++++ aquatic_common/src/privileges.rs | 2 +- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs index b243cfd..0a66014 100644 --- a/aquatic_common/src/lib.rs +++ b/aquatic_common/src/lib.rs @@ -30,6 +30,23 @@ impl ValidUntil { } } +/// Raises SIGTERM when dropped +/// +/// Pass to threads to have panics in them cause whole program to exit. +#[derive(Clone)] +pub struct PanicSentinel; + +impl Drop for PanicSentinel { + fn drop(&mut self) { + if unsafe { libc::raise(15) } == -1 { + panic!( + "Could not raise SIGTERM: {:#}", + ::std::io::Error::last_os_error() + ) + } + } +} + /// Extract response peers /// /// If there are more peers in map than `max_num_peers_to_take`, do a diff --git a/aquatic_common/src/privileges.rs b/aquatic_common/src/privileges.rs index d252996..eb41282 100644 --- a/aquatic_common/src/privileges.rs +++ b/aquatic_common/src/privileges.rs @@ -55,7 +55,7 @@ impl PrivilegeDropper { .group(self.config.group.clone()) .user(self.config.user.clone()) .apply() - .with_context(|| "drop privileges")?; + .with_context(|| "couldn't drop privileges after socket creation")?; } } From d0eec05d4ce0bd15a7649473d7ed4b2ccc19d530 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 6 Apr 2022 00:42:11 +0200 Subject: [PATCH 06/23] udp: use PanicSentinel --- aquatic_udp/src/lib.rs | 12 +++++++++--- aquatic_udp/src/workers/request.rs | 2 ++ aquatic_udp/src/workers/socket.rs | 19 ++++++++++++++----- aquatic_udp/src/workers/statistics.rs | 3 ++- aquatic_udp_bench/src/main.rs | 2 ++ 5 files changed, 29 insertions(+), 9 deletions(-) diff --git a/aquatic_udp/src/lib.rs b/aquatic_udp/src/lib.rs index 5a805d8..663eba0 100644 --- a/aquatic_udp/src/lib.rs +++ b/aquatic_udp/src/lib.rs @@ -2,6 +2,7 @@ pub mod common; pub mod config; pub mod workers; +use aquatic_common::PanicSentinel; use config::Config; use std::collections::BTreeMap; @@ -14,7 +15,7 @@ use aquatic_common::privileges::PrivilegeDropper; use crossbeam_channel::{bounded, unbounded}; use aquatic_common::access_list::update_access_list; -use signal_hook::consts::SIGUSR1; +use signal_hook::consts::{SIGTERM, SIGUSR1}; use signal_hook::iterator::Signals; use common::{ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State}; @@ -29,7 +30,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { update_access_list(&config.access_list, &state.access_list)?; - let mut signals = Signals::new(::std::iter::once(SIGUSR1))?; + let mut signals = Signals::new([SIGUSR1, SIGTERM])?; let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); @@ -79,6 +80,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ); workers::request::run_request_worker( + PanicSentinel, config, state, request_receiver, @@ -109,6 +111,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ); workers::socket::run_socket_worker( + PanicSentinel, state, config, i, @@ -135,7 +138,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::Util, ); - workers::statistics::run_statistics_worker(config, state); + workers::statistics::run_statistics_worker(PanicSentinel, config, state); }) .with_context(|| "spawn statistics worker")?; } @@ -153,6 +156,9 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { SIGUSR1 => { let _ = update_access_list(&config.access_list, &state.access_list); } + SIGTERM => { + break; + } _ => unreachable!(), } } diff --git a/aquatic_udp/src/workers/request.rs b/aquatic_udp/src/workers/request.rs index 447fab4..12eb6b9 100644 --- a/aquatic_udp/src/workers/request.rs +++ b/aquatic_udp/src/workers/request.rs @@ -11,6 +11,7 @@ use aquatic_common::access_list::create_access_list_cache; use aquatic_common::access_list::AccessListArcSwap; use aquatic_common::AmortizedIndexMap; use aquatic_common::CanonicalSocketAddr; +use aquatic_common::PanicSentinel; use aquatic_common::ValidUntil; use crossbeam_channel::Receiver; use rand::{rngs::SmallRng, SeedableRng}; @@ -121,6 +122,7 @@ impl TorrentMaps { } pub fn run_request_worker( + _sentinel: PanicSentinel, config: Config, state: State, request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, diff --git a/aquatic_udp/src/workers/socket.rs b/aquatic_udp/src/workers/socket.rs index ac9c556..c8b5e05 100644 --- a/aquatic_udp/src/workers/socket.rs +++ b/aquatic_udp/src/workers/socket.rs @@ -14,8 +14,8 @@ use slab::Slab; use aquatic_common::access_list::create_access_list_cache; use aquatic_common::access_list::AccessListCache; -use aquatic_common::ValidUntil; use aquatic_common::{AmortizedIndexMap, CanonicalSocketAddr}; +use aquatic_common::{PanicSentinel, ValidUntil}; use aquatic_udp_protocol::*; use socket2::{Domain, Protocol, Socket, Type}; @@ -151,6 +151,7 @@ impl PendingScrapeResponseSlab { } pub fn run_socket_worker( + _sentinel: PanicSentinel, state: State, config: Config, token_num: usize, @@ -161,7 +162,8 @@ pub fn run_socket_worker( let mut rng = StdRng::from_entropy(); let mut buffer = [0u8; MAX_PACKET_SIZE]; - let mut socket = UdpSocket::from_std(create_socket(&config, priv_dropper).expect("create socket")); + let mut socket = + UdpSocket::from_std(create_socket(&config, priv_dropper).expect("create socket")); let mut poll = Poll::new().expect("create poll"); let interests = Interest::READABLE; @@ -517,7 +519,10 @@ fn send_response( } } -pub fn create_socket(config: &Config, priv_dropper: PrivilegeDropper) -> anyhow::Result<::std::net::UdpSocket> { +pub fn create_socket( + config: &Config, + priv_dropper: PrivilegeDropper, +) -> anyhow::Result<::std::net::UdpSocket> { let socket = if config.network.address.is_ipv4() { Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))? } else { @@ -525,10 +530,14 @@ pub fn create_socket(config: &Config, priv_dropper: PrivilegeDropper) -> anyhow: }; if config.network.only_ipv6 { - socket.set_only_v6(true).with_context(|| "socket: set only ipv6")?; + socket + .set_only_v6(true) + .with_context(|| "socket: set only ipv6")?; } - socket.set_reuse_port(true).with_context(|| "socket: set reuse port")?; + socket + .set_reuse_port(true) + .with_context(|| "socket: set reuse port")?; socket .set_nonblocking(true) diff --git a/aquatic_udp/src/workers/statistics.rs b/aquatic_udp/src/workers/statistics.rs index 20ec0e7..b54cbe8 100644 --- a/aquatic_udp/src/workers/statistics.rs +++ b/aquatic_udp/src/workers/statistics.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use anyhow::Context; +use aquatic_common::PanicSentinel; use num_format::{Locale, ToFormattedString}; use serde::Serialize; use time::format_description::well_known::Rfc2822; @@ -135,7 +136,7 @@ struct TemplateData { peer_update_interval: String, } -pub fn run_statistics_worker(config: Config, state: State) { +pub fn run_statistics_worker(_sentinel: PanicSentinel, config: Config, state: State) { let tt = if config.statistics.write_html_to_file { let mut tt = TinyTemplate::new(); diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index 6f65bc9..046ab6d 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -7,6 +7,7 @@ //! Scrape: 1 873 545 requests/second, 533.75 ns/request //! ``` +use aquatic_common::PanicSentinel; use aquatic_udp::workers::request::run_request_worker; use crossbeam_channel::unbounded; use num_format::{Locale, ToFormattedString}; @@ -55,6 +56,7 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { ::std::thread::spawn(move || { run_request_worker( + PanicSentinel, config, state, request_receiver, From 3f46db01ff3a2eb80f335c63a700556e690196e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 6 Apr 2022 01:14:47 +0200 Subject: [PATCH 07/23] Run cargo fmt --- aquatic_http/src/workers/socket.rs | 13 ++++++++++--- aquatic_ws/src/workers/socket.rs | 13 ++++++++++--- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/aquatic_http/src/workers/socket.rs b/aquatic_http/src/workers/socket.rs index aee1f97..5271ec0 100644 --- a/aquatic_http/src/workers/socket.rs +++ b/aquatic_http/src/workers/socket.rs @@ -485,7 +485,10 @@ fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usi (info_hash.0[0] as usize) % config.request_workers } -fn create_tcp_listener(config: &Config, priv_dropper: PrivilegeDropper) -> anyhow::Result { +fn create_tcp_listener( + config: &Config, + priv_dropper: PrivilegeDropper, +) -> anyhow::Result { let domain = if config.network.address.is_ipv4() { socket2::Domain::IPV4 } else { @@ -495,10 +498,14 @@ fn create_tcp_listener(config: &Config, priv_dropper: PrivilegeDropper) -> anyho let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?; if config.network.only_ipv6 { - socket.set_only_v6(true).with_context(|| "socket: set only ipv6")?; + socket + .set_only_v6(true) + .with_context(|| "socket: set only ipv6")?; } - socket.set_reuse_port(true).with_context(|| "socket: set reuse port")?; + socket + .set_reuse_port(true) + .with_context(|| "socket: set reuse port")?; socket .bind(&config.network.address.into()) diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index 02974cd..2da8d05 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -543,7 +543,10 @@ fn calculate_in_message_consumer_index(config: &Config, info_hash: InfoHash) -> (info_hash.0[0] as usize) % config.request_workers } -fn create_tcp_listener(config: &Config, priv_dropper: PrivilegeDropper) -> anyhow::Result { +fn create_tcp_listener( + config: &Config, + priv_dropper: PrivilegeDropper, +) -> anyhow::Result { let domain = if config.network.address.is_ipv4() { socket2::Domain::IPV4 } else { @@ -554,10 +557,14 @@ fn create_tcp_listener(config: &Config, priv_dropper: PrivilegeDropper) -> anyho .with_context(|| "create socket")?; if config.network.only_ipv6 { - socket.set_only_v6(true).with_context(|| "socket: set only ipv6")?; + socket + .set_only_v6(true) + .with_context(|| "socket: set only ipv6")?; } - socket.set_reuse_port(true).with_context(|| "socket: set reuse port")?; + socket + .set_reuse_port(true) + .with_context(|| "socket: set reuse port")?; socket .bind(&config.network.address.into()) From b61b136b0cc098766b89849441c4edc13ea47266 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 6 Apr 2022 01:31:45 +0200 Subject: [PATCH 08/23] Add PanicSentinelWatcher Use to determine if SIGTERM was caused by panic in other thread --- aquatic_common/src/lib.rs | 21 ++++++++++++++++++++- aquatic_udp/src/lib.rs | 16 ++++++++++++---- aquatic_udp_bench/src/main.rs | 5 +++-- 3 files changed, 35 insertions(+), 7 deletions(-) diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs index 0a66014..79b91e3 100644 --- a/aquatic_common/src/lib.rs +++ b/aquatic_common/src/lib.rs @@ -1,4 +1,6 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::time::{Duration, Instant}; use ahash::RandomState; @@ -30,14 +32,31 @@ impl ValidUntil { } } +pub struct PanicSentinelWatcher(Arc); + +impl PanicSentinelWatcher { + pub fn create_with_sentinel() -> (Self, PanicSentinel) { + let triggered = Arc::new(AtomicBool::new(false)); + let sentinel = PanicSentinel(triggered.clone()); + + (Self(triggered), sentinel) + } + + pub fn panic_was_triggered(&self) -> bool { + self.0.load(Ordering::SeqCst) + } +} + /// Raises SIGTERM when dropped /// /// Pass to threads to have panics in them cause whole program to exit. #[derive(Clone)] -pub struct PanicSentinel; +pub struct PanicSentinel(Arc); impl Drop for PanicSentinel { fn drop(&mut self) { + self.0.store(true, Ordering::SeqCst); + if unsafe { libc::raise(15) } == -1 { panic!( "Could not raise SIGTERM: {:#}", diff --git a/aquatic_udp/src/lib.rs b/aquatic_udp/src/lib.rs index 663eba0..4b02340 100644 --- a/aquatic_udp/src/lib.rs +++ b/aquatic_udp/src/lib.rs @@ -2,7 +2,7 @@ pub mod common; pub mod config; pub mod workers; -use aquatic_common::PanicSentinel; +use aquatic_common::PanicSentinelWatcher; use config::Config; use std::collections::BTreeMap; @@ -32,6 +32,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let mut signals = Signals::new([SIGUSR1, SIGTERM])?; + let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); let mut request_senders = Vec::new(); @@ -63,6 +64,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } for i in 0..config.request_workers { + let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); let request_receiver = request_receivers.remove(&i).unwrap().clone(); @@ -80,7 +82,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ); workers::request::run_request_worker( - PanicSentinel, + sentinel, config, state, request_receiver, @@ -92,6 +94,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } for i in 0..config.socket_workers { + let sentinel = sentinel.clone(); let state = state.clone(); let config = config.clone(); let request_sender = @@ -111,7 +114,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { ); workers::socket::run_socket_worker( - PanicSentinel, + sentinel, state, config, i, @@ -124,6 +127,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } if config.statistics.active() { + let sentinel = sentinel.clone(); let state = state.clone(); let config = config.clone(); @@ -138,7 +142,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::Util, ); - workers::statistics::run_statistics_worker(PanicSentinel, config, state); + workers::statistics::run_statistics_worker(sentinel, config, state); }) .with_context(|| "spawn statistics worker")?; } @@ -157,6 +161,10 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let _ = update_access_list(&config.access_list, &state.access_list); } SIGTERM => { + if sentinel_watcher.panic_was_triggered() { + return Err(anyhow::anyhow!("worker thread panicked")); + } + break; } _ => unreachable!(), diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index 046ab6d..7a83e5b 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -7,7 +7,7 @@ //! Scrape: 1 873 545 requests/second, 533.75 ns/request //! ``` -use aquatic_common::PanicSentinel; +use aquatic_common::PanicSentinelWatcher; use aquatic_udp::workers::request::run_request_worker; use crossbeam_channel::unbounded; use num_format::{Locale, ToFormattedString}; @@ -42,6 +42,7 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { // Setup common state, spawn request handlers let mut aquatic_config = Config::default(); + let (_, sentinel) = PanicSentinelWatcher::create_with_sentinel(); aquatic_config.cleaning.torrent_cleaning_interval = 60 * 60 * 24; @@ -56,7 +57,7 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { ::std::thread::spawn(move || { run_request_worker( - PanicSentinel, + sentinel, config, state, request_receiver, From 10997596fac76cee59090ce58276f28af107a780 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 6 Apr 2022 01:52:06 +0200 Subject: [PATCH 09/23] PanicSentinel: send SIGTERM only once --- aquatic_common/src/lib.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs index 79b91e3..a5f25aa 100644 --- a/aquatic_common/src/lib.rs +++ b/aquatic_common/src/lib.rs @@ -55,13 +55,15 @@ pub struct PanicSentinel(Arc); impl Drop for PanicSentinel { fn drop(&mut self) { - self.0.store(true, Ordering::SeqCst); + let already_triggered = self.0.fetch_or(true, Ordering::SeqCst); - if unsafe { libc::raise(15) } == -1 { - panic!( - "Could not raise SIGTERM: {:#}", - ::std::io::Error::last_os_error() - ) + if !already_triggered { + if unsafe { libc::raise(15) } == -1 { + panic!( + "Could not raise SIGTERM: {:#}", + ::std::io::Error::last_os_error() + ) + } } } } From 94ee4027e84bdd10aed3570b11930a148cab42fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 6 Apr 2022 01:53:00 +0200 Subject: [PATCH 10/23] Update TODO --- TODO.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/TODO.md b/TODO.md index 5289208..115af5a 100644 --- a/TODO.md +++ b/TODO.md @@ -15,6 +15,12 @@ * rename request workers to swarm workers * quit whole program if any thread panics + * Once JoinHandle::is_finished is available in stable Rust (#90470) + * Save JoinHandles + * When preparing to quit because of PanicSentinel sending SIGTERM, loop + through them, extract error and log it + * Dont use std::process::exit unless worker thread destructors have already + been called * config: fail on unrecognized keys? * Run cargo-deny in CI From ffa7c7532fa0845864df520f67934283128d9cf2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 6 Apr 2022 01:56:45 +0200 Subject: [PATCH 11/23] PanicSentinel: only set flag if dropped while panicking --- aquatic_common/src/lib.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs index a5f25aa..c3cf62e 100644 --- a/aquatic_common/src/lib.rs +++ b/aquatic_common/src/lib.rs @@ -55,14 +55,16 @@ pub struct PanicSentinel(Arc); impl Drop for PanicSentinel { fn drop(&mut self) { - let already_triggered = self.0.fetch_or(true, Ordering::SeqCst); + if ::std::thread::panicking() { + let already_triggered = self.0.fetch_or(true, Ordering::SeqCst); - if !already_triggered { - if unsafe { libc::raise(15) } == -1 { - panic!( - "Could not raise SIGTERM: {:#}", - ::std::io::Error::last_os_error() - ) + if !already_triggered { + if unsafe { libc::raise(15) } == -1 { + panic!( + "Could not raise SIGTERM: {:#}", + ::std::io::Error::last_os_error() + ) + } } } } From b4d1c465958e5c58fa615d64e4fb8ed072399599 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 6 Apr 2022 02:23:06 +0200 Subject: [PATCH 12/23] Update TODO --- TODO.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/TODO.md b/TODO.md index 115af5a..5e9e648 100644 --- a/TODO.md +++ b/TODO.md @@ -15,12 +15,12 @@ * rename request workers to swarm workers * quit whole program if any thread panics - * Once JoinHandle::is_finished is available in stable Rust (#90470) + * But it would be nice not to panic in workers, but to return errors instead. + Once JoinHandle::is_finished is available in stable Rust (#90470), an + option would be to * Save JoinHandles * When preparing to quit because of PanicSentinel sending SIGTERM, loop through them, extract error and log it - * Dont use std::process::exit unless worker thread destructors have already - been called * config: fail on unrecognized keys? * Run cargo-deny in CI From 117244f1c797227e08aa5f2a8ea2f96710b11c12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 6 Apr 2022 18:38:50 +0200 Subject: [PATCH 13/23] aquatic_http: use PanicSentinel, simplify lib.rs --- aquatic_http/src/lib.rs | 106 +++++++++++++--------------- aquatic_http/src/workers/request.rs | 3 +- aquatic_http/src/workers/socket.rs | 3 +- 3 files changed, 54 insertions(+), 58 deletions(-) diff --git a/aquatic_http/src/lib.rs b/aquatic_http/src/lib.rs index 7ce2065..f1b8ca7 100644 --- a/aquatic_http/src/lib.rs +++ b/aquatic_http/src/lib.rs @@ -6,10 +6,14 @@ use aquatic_common::{ }, privileges::PrivilegeDropper, rustls_config::create_rustls_config, + PanicSentinelWatcher, }; use common::State; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; -use signal_hook::{consts::SIGUSR1, iterator::Signals}; +use signal_hook::{ + consts::{SIGTERM, SIGUSR1}, + iterator::Signals, +}; use std::sync::Arc; use crate::config::Config; @@ -24,45 +28,18 @@ pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); const SHARED_CHANNEL_SIZE: usize = 1024; pub fn run(config: Config) -> ::anyhow::Result<()> { + let mut signals = Signals::new([SIGUSR1, SIGTERM])?; + let state = State::default(); update_access_list(&config.access_list, &state.access_list)?; - let mut signals = Signals::new(::std::iter::once(SIGUSR1))?; - - { - let config = config.clone(); - let state = state.clone(); - - ::std::thread::spawn(move || run_inner(config, state)); - } - - if config.cpu_pinning.active { - set_affinity_for_util_worker( - &config.cpu_pinning, - config.socket_workers, - config.request_workers, - )?; - } - - for signal in &mut signals { - match signal { - SIGUSR1 => { - let _ = update_access_list(&config.access_list, &state.access_list); - } - _ => unreachable!(), - } - } - - Ok(()) -} - -pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let num_peers = config.socket_workers + config.request_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); + let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); let tls_config = Arc::new(create_rustls_config( @@ -73,6 +50,7 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let mut executors = Vec::new(); for i in 0..(config.socket_workers) { + let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); let tls_config = tls_config.clone(); @@ -88,22 +66,26 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { )?; let builder = LocalExecutorBuilder::new(placement).name("socket"); - let executor = builder.spawn(move || async move { - workers::socket::run_socket_worker( - config, - state, - tls_config, - request_mesh_builder, - response_mesh_builder, - priv_dropper, - ) - .await - }); + let executor = builder + .spawn(move || async move { + workers::socket::run_socket_worker( + sentinel, + config, + state, + tls_config, + request_mesh_builder, + response_mesh_builder, + priv_dropper, + ) + .await + }) + .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?; executors.push(executor); } for i in 0..(config.request_workers) { + let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); let request_mesh_builder = request_mesh_builder.clone(); @@ -117,15 +99,18 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { )?; let builder = LocalExecutorBuilder::new(placement).name("request"); - let executor = builder.spawn(move || async move { - workers::request::run_request_worker( - config, - state, - request_mesh_builder, - response_mesh_builder, - ) - .await - }); + let executor = builder + .spawn(move || async move { + workers::request::run_request_worker( + sentinel, + config, + state, + request_mesh_builder, + response_mesh_builder, + ) + .await + }) + .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?; executors.push(executor); } @@ -138,11 +123,20 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { )?; } - for executor in executors { - executor - .expect("failed to spawn local executor") - .join() - .unwrap(); + for signal in &mut signals { + match signal { + SIGUSR1 => { + let _ = update_access_list(&config.access_list, &state.access_list); + } + SIGTERM => { + if sentinel_watcher.panic_was_triggered() { + return Err(anyhow::anyhow!("worker thread panicked")); + } else { + return Ok(()); + } + } + _ => unreachable!(), + } } Ok(()) diff --git a/aquatic_http/src/workers/request.rs b/aquatic_http/src/workers/request.rs index 8ac677e..e664c4e 100644 --- a/aquatic_http/src/workers/request.rs +++ b/aquatic_http/src/workers/request.rs @@ -17,8 +17,8 @@ use rand::SeedableRng; use smartstring::{LazyCompact, SmartString}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; -use aquatic_common::extract_response_peers; use aquatic_common::ValidUntil; +use aquatic_common::{extract_response_peers, PanicSentinel}; use aquatic_common::{AmortizedIndexMap, CanonicalSocketAddr}; use aquatic_http_protocol::common::*; use aquatic_http_protocol::request::*; @@ -175,6 +175,7 @@ impl TorrentMaps { } pub async fn run_request_worker( + _sentinel: PanicSentinel, config: Config, state: State, request_mesh_builder: MeshBuilder, diff --git a/aquatic_http/src/workers/socket.rs b/aquatic_http/src/workers/socket.rs index 5271ec0..4b7a7f6 100644 --- a/aquatic_http/src/workers/socket.rs +++ b/aquatic_http/src/workers/socket.rs @@ -9,7 +9,7 @@ use anyhow::Context; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; -use aquatic_common::CanonicalSocketAddr; +use aquatic_common::{CanonicalSocketAddr, PanicSentinel}; use aquatic_http_protocol::common::InfoHash; use aquatic_http_protocol::request::{Request, RequestParseError, ScrapeRequest}; use aquatic_http_protocol::response::{ @@ -54,6 +54,7 @@ struct ConnectionReference { } pub async fn run_socket_worker( + _sentinel: PanicSentinel, config: Config, state: State, tls_config: Arc, From f50c8970b5e8e2df3ea2e50a346d2090628ea0a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 6 Apr 2022 18:44:09 +0200 Subject: [PATCH 14/23] aquatic_ws: use PanicSentinel, simplify lib.rs --- aquatic_ws/src/lib.rs | 106 ++++++++++++++---------------- aquatic_ws/src/workers/request.rs | 3 +- aquatic_ws/src/workers/socket.rs | 3 +- 3 files changed, 54 insertions(+), 58 deletions(-) diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index 6671b00..b6ffc22 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -7,8 +7,12 @@ use std::sync::Arc; use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker}; use aquatic_common::cpu_pinning::WorkerIndex; use aquatic_common::rustls_config::create_rustls_config; +use aquatic_common::PanicSentinelWatcher; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; -use signal_hook::{consts::SIGUSR1, iterator::Signals}; +use signal_hook::{ + consts::{SIGTERM, SIGUSR1}, + iterator::Signals, +}; use aquatic_common::access_list::update_access_list; use aquatic_common::privileges::PrivilegeDropper; @@ -22,45 +26,18 @@ pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); pub const SHARED_IN_CHANNEL_SIZE: usize = 1024; pub fn run(config: Config) -> ::anyhow::Result<()> { + let mut signals = Signals::new([SIGUSR1, SIGTERM])?; + let state = State::default(); update_access_list(&config.access_list, &state.access_list)?; - let mut signals = Signals::new(::std::iter::once(SIGUSR1))?; - - { - let config = config.clone(); - let state = state.clone(); - - ::std::thread::spawn(move || run_workers(config, state)); - } - - if config.cpu_pinning.active { - set_affinity_for_util_worker( - &config.cpu_pinning, - config.socket_workers, - config.request_workers, - )?; - } - - for signal in &mut signals { - match signal { - SIGUSR1 => { - let _ = update_access_list(&config.access_list, &state.access_list); - } - _ => unreachable!(), - } - } - - Ok(()) -} - -fn run_workers(config: Config, state: State) -> anyhow::Result<()> { let num_peers = config.socket_workers + config.request_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE); let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE * 16); + let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); let tls_config = Arc::new(create_rustls_config( @@ -71,6 +48,7 @@ fn run_workers(config: Config, state: State) -> anyhow::Result<()> { let mut executors = Vec::new(); for i in 0..(config.socket_workers) { + let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); let tls_config = tls_config.clone(); @@ -86,22 +64,26 @@ fn run_workers(config: Config, state: State) -> anyhow::Result<()> { )?; let builder = LocalExecutorBuilder::new(placement).name("socket"); - let executor = builder.spawn(move || async move { - workers::socket::run_socket_worker( - config, - state, - tls_config, - request_mesh_builder, - response_mesh_builder, - priv_dropper, - ) - .await - }); + let executor = builder + .spawn(move || async move { + workers::socket::run_socket_worker( + sentinel, + config, + state, + tls_config, + request_mesh_builder, + response_mesh_builder, + priv_dropper, + ) + .await + }) + .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?; executors.push(executor); } for i in 0..(config.request_workers) { + let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); let request_mesh_builder = request_mesh_builder.clone(); @@ -115,15 +97,18 @@ fn run_workers(config: Config, state: State) -> anyhow::Result<()> { )?; let builder = LocalExecutorBuilder::new(placement).name("request"); - let executor = builder.spawn(move || async move { - workers::request::run_request_worker( - config, - state, - request_mesh_builder, - response_mesh_builder, - ) - .await - }); + let executor = builder + .spawn(move || async move { + workers::request::run_request_worker( + sentinel, + config, + state, + request_mesh_builder, + response_mesh_builder, + ) + .await + }) + .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?; executors.push(executor); } @@ -136,11 +121,20 @@ fn run_workers(config: Config, state: State) -> anyhow::Result<()> { )?; } - for executor in executors { - executor - .expect("failed to spawn local executor") - .join() - .unwrap(); + for signal in &mut signals { + match signal { + SIGUSR1 => { + let _ = update_access_list(&config.access_list, &state.access_list); + } + SIGTERM => { + if sentinel_watcher.panic_was_triggered() { + return Err(anyhow::anyhow!("worker thread panicked")); + } else { + return Ok(()); + } + } + _ => unreachable!(), + } } Ok(()) diff --git a/aquatic_ws/src/workers/request.rs b/aquatic_ws/src/workers/request.rs index 693ac7d..92f07ad 100644 --- a/aquatic_ws/src/workers/request.rs +++ b/aquatic_ws/src/workers/request.rs @@ -12,7 +12,7 @@ use glommio::timer::TimerActionRepeat; use hashbrown::HashMap; use rand::{rngs::SmallRng, SeedableRng}; -use aquatic_common::{extract_response_peers, AmortizedIndexMap}; +use aquatic_common::{extract_response_peers, AmortizedIndexMap, PanicSentinel}; use aquatic_ws_protocol::*; use crate::common::*; @@ -128,6 +128,7 @@ impl TorrentMaps { } pub async fn run_request_worker( + _sentinel: PanicSentinel, config: Config, state: State, in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>, diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index 2da8d05..cfda49a 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -10,7 +10,7 @@ use anyhow::Context; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; -use aquatic_common::CanonicalSocketAddr; +use aquatic_common::{CanonicalSocketAddr, PanicSentinel}; use aquatic_ws_protocol::*; use async_tungstenite::WebSocketStream; use futures::stream::{SplitSink, SplitStream}; @@ -49,6 +49,7 @@ struct ConnectionReference { } pub async fn run_socket_worker( + _sentinel: PanicSentinel, config: Config, state: State, tls_config: Arc, From a4c7e79dc9e718e0f84888b9c240be2042865a1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 6 Apr 2022 18:48:23 +0200 Subject: [PATCH 15/23] http_private: use PanicSentinel --- Cargo.lock | 1 + aquatic_http_private/Cargo.toml | 1 + aquatic_http_private/src/lib.rs | 29 ++++++++++++++----- .../src/workers/request/mod.rs | 3 +- .../src/workers/socket/mod.rs | 3 +- 5 files changed, 28 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4c33496..e9141fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,6 +163,7 @@ dependencies = [ "rand", "rustls 0.20.4", "serde", + "signal-hook", "socket2 0.4.4", "sqlx", "tokio", diff --git a/aquatic_http_private/Cargo.toml b/aquatic_http_private/Cargo.toml index 0d59f6a..7ec86c8 100644 --- a/aquatic_http_private/Cargo.toml +++ b/aquatic_http_private/Cargo.toml @@ -30,6 +30,7 @@ mimalloc = { version = "0.1", default-features = false } rand = { version = "0.8", features = ["small_rng"] } rustls = "0.20" serde = { version = "1", features = ["derive"] } +signal-hook = { version = "0.3" } socket2 = { version = "0.4", features = ["all"] } sqlx = { version = "0.5", features = [ "runtime-tokio-rustls" , "mysql" ] } tokio = { version = "1", features = ["full"] } diff --git a/aquatic_http_private/src/lib.rs b/aquatic_http_private/src/lib.rs index 8e9d7a7..45d3170 100644 --- a/aquatic_http_private/src/lib.rs +++ b/aquatic_http_private/src/lib.rs @@ -4,9 +4,10 @@ mod workers; use std::{collections::VecDeque, sync::Arc}; -use aquatic_common::rustls_config::create_rustls_config; +use aquatic_common::{rustls_config::create_rustls_config, PanicSentinelWatcher}; use common::ChannelRequestSender; use dotenv::dotenv; +use signal_hook::{consts::SIGTERM, iterator::Signals}; use tokio::sync::mpsc::channel; use config::Config; @@ -15,6 +16,8 @@ pub const APP_NAME: &str = "aquatic_http_private: private HTTP/TLS BitTorrent tr pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); pub fn run(config: Config) -> anyhow::Result<()> { + let mut signals = Signals::new([SIGTERM])?; + dotenv().ok(); let tls_config = Arc::new(create_rustls_config( @@ -32,9 +35,11 @@ pub fn run(config: Config) -> anyhow::Result<()> { request_receivers.push_back(request_receiver); } + let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let mut handles = Vec::new(); for _ in 0..config.socket_workers { + let sentinel = sentinel.clone(); let config = config.clone(); let tls_config = tls_config.clone(); let request_sender = ChannelRequestSender::new(request_senders.clone()); @@ -42,27 +47,37 @@ pub fn run(config: Config) -> anyhow::Result<()> { let handle = ::std::thread::Builder::new() .name("socket".into()) .spawn(move || { - workers::socket::run_socket_worker(config, tls_config, request_sender) + workers::socket::run_socket_worker(sentinel, config, tls_config, request_sender) })?; handles.push(handle); } for _ in 0..config.request_workers { + let sentinel = sentinel.clone(); let config = config.clone(); let request_receiver = request_receivers.pop_front().unwrap(); let handle = ::std::thread::Builder::new() .name("request".into()) - .spawn(move || workers::request::run_request_worker(config, request_receiver))?; + .spawn(move || { + workers::request::run_request_worker(sentinel, config, request_receiver) + })?; handles.push(handle); } - for handle in handles { - handle - .join() - .map_err(|err| anyhow::anyhow!("thread join error: {:?}", err))??; + for signal in &mut signals { + match signal { + SIGTERM => { + if sentinel_watcher.panic_was_triggered() { + return Err(anyhow::anyhow!("worker thread panicked")); + } else { + return Ok(()); + } + } + _ => unreachable!(), + } } Ok(()) diff --git a/aquatic_http_private/src/workers/request/mod.rs b/aquatic_http_private/src/workers/request/mod.rs index 358ead6..c684256 100644 --- a/aquatic_http_private/src/workers/request/mod.rs +++ b/aquatic_http_private/src/workers/request/mod.rs @@ -11,7 +11,7 @@ use tokio::sync::mpsc::Receiver; use tokio::task::LocalSet; use tokio::time; -use aquatic_common::{extract_response_peers, CanonicalSocketAddr, ValidUntil}; +use aquatic_common::{extract_response_peers, CanonicalSocketAddr, PanicSentinel, ValidUntil}; use aquatic_http_protocol::response::{ AnnounceResponse, Response, ResponsePeer, ResponsePeerListV4, ResponsePeerListV6, }; @@ -22,6 +22,7 @@ use crate::config::Config; use common::*; pub fn run_request_worker( + _sentinel: PanicSentinel, config: Config, request_receiver: Receiver, ) -> anyhow::Result<()> { diff --git a/aquatic_http_private/src/workers/socket/mod.rs b/aquatic_http_private/src/workers/socket/mod.rs index 2b142c7..13304aa 100644 --- a/aquatic_http_private/src/workers/socket/mod.rs +++ b/aquatic_http_private/src/workers/socket/mod.rs @@ -8,7 +8,7 @@ use std::{ }; use anyhow::Context; -use aquatic_common::rustls_config::RustlsConfig; +use aquatic_common::{rustls_config::RustlsConfig, PanicSentinel}; use axum::{extract::connect_info::Connected, routing::get, Extension, Router}; use hyper::server::conn::AddrIncoming; use sqlx::mysql::MySqlPoolOptions; @@ -23,6 +23,7 @@ impl<'a> Connected<&'a tls::TlsStream> for SocketAddr { } pub fn run_socket_worker( + _sentinel: PanicSentinel, config: Config, tls_config: Arc, request_sender: ChannelRequestSender, From 87bfec5e55487aea976691ff40552242d1bfa94b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 6 Apr 2022 18:51:56 +0200 Subject: [PATCH 16/23] http_private: use PrivilegeDropper --- aquatic_http_private/src/lib.rs | 15 +++++++++++++-- aquatic_http_private/src/workers/socket/mod.rs | 12 +++++++++--- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/aquatic_http_private/src/lib.rs b/aquatic_http_private/src/lib.rs index 45d3170..88609e1 100644 --- a/aquatic_http_private/src/lib.rs +++ b/aquatic_http_private/src/lib.rs @@ -4,7 +4,9 @@ mod workers; use std::{collections::VecDeque, sync::Arc}; -use aquatic_common::{rustls_config::create_rustls_config, PanicSentinelWatcher}; +use aquatic_common::{ + privileges::PrivilegeDropper, rustls_config::create_rustls_config, PanicSentinelWatcher, +}; use common::ChannelRequestSender; use dotenv::dotenv; use signal_hook::{consts::SIGTERM, iterator::Signals}; @@ -36,6 +38,8 @@ pub fn run(config: Config) -> anyhow::Result<()> { } let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); + let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); + let mut handles = Vec::new(); for _ in 0..config.socket_workers { @@ -43,11 +47,18 @@ pub fn run(config: Config) -> anyhow::Result<()> { let config = config.clone(); let tls_config = tls_config.clone(); let request_sender = ChannelRequestSender::new(request_senders.clone()); + let priv_dropper = priv_dropper.clone(); let handle = ::std::thread::Builder::new() .name("socket".into()) .spawn(move || { - workers::socket::run_socket_worker(sentinel, config, tls_config, request_sender) + workers::socket::run_socket_worker( + sentinel, + config, + tls_config, + request_sender, + priv_dropper, + ) })?; handles.push(handle); diff --git a/aquatic_http_private/src/workers/socket/mod.rs b/aquatic_http_private/src/workers/socket/mod.rs index 13304aa..24e561b 100644 --- a/aquatic_http_private/src/workers/socket/mod.rs +++ b/aquatic_http_private/src/workers/socket/mod.rs @@ -8,7 +8,7 @@ use std::{ }; use anyhow::Context; -use aquatic_common::{rustls_config::RustlsConfig, PanicSentinel}; +use aquatic_common::{privileges::PrivilegeDropper, rustls_config::RustlsConfig, PanicSentinel}; use axum::{extract::connect_info::Connected, routing::get, Extension, Router}; use hyper::server::conn::AddrIncoming; use sqlx::mysql::MySqlPoolOptions; @@ -27,8 +27,9 @@ pub fn run_socket_worker( config: Config, tls_config: Arc, request_sender: ChannelRequestSender, + priv_dropper: PrivilegeDropper, ) -> anyhow::Result<()> { - let tcp_listener = create_tcp_listener(config.network.address)?; + let tcp_listener = create_tcp_listener(config.network.address, priv_dropper)?; let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -72,7 +73,10 @@ async fn run_app( Ok(()) } -fn create_tcp_listener(addr: SocketAddr) -> anyhow::Result { +fn create_tcp_listener( + addr: SocketAddr, + priv_dropper: PrivilegeDropper, +) -> anyhow::Result { let domain = if addr.is_ipv4() { socket2::Domain::IPV4 } else { @@ -94,5 +98,7 @@ fn create_tcp_listener(addr: SocketAddr) -> anyhow::Result { .listen(1024) .with_context(|| format!("listen on {}", addr))?; + priv_dropper.after_socket_creation()?; + Ok(socket.into()) } From 745a85f71da6ecb8ccce8e6043a954797932abf7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 6 Apr 2022 18:53:11 +0200 Subject: [PATCH 17/23] PrivilegeDropper::after_socket_creation: take ownership of self --- aquatic_common/src/privileges.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquatic_common/src/privileges.rs b/aquatic_common/src/privileges.rs index eb41282..1e18b9f 100644 --- a/aquatic_common/src/privileges.rs +++ b/aquatic_common/src/privileges.rs @@ -47,7 +47,7 @@ impl PrivilegeDropper { } } - pub fn after_socket_creation(&self) -> anyhow::Result<()> { + pub fn after_socket_creation(self) -> anyhow::Result<()> { if self.config.drop_privileges { if self.barrier.wait().is_leader() { PrivDrop::default() From b677a104d3d9ecd49e67d19d2f124cf272fb4a60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 6 Apr 2022 18:55:13 +0200 Subject: [PATCH 18/23] cli helpers: change default log level to warn --- aquatic_cli_helpers/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquatic_cli_helpers/src/lib.rs b/aquatic_cli_helpers/src/lib.rs index 8455856..4763092 100644 --- a/aquatic_cli_helpers/src/lib.rs +++ b/aquatic_cli_helpers/src/lib.rs @@ -22,7 +22,7 @@ pub enum LogLevel { impl Default for LogLevel { fn default() -> Self { - Self::Error + Self::Warn } } From bd6764afaf913f76992df14e6cc1b0561f836b04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 6 Apr 2022 19:02:39 +0200 Subject: [PATCH 19/23] Move aquatic_cli_helpers code into aquatic_common and remove crate --- Cargo.lock | 26 +++---------------- Cargo.toml | 1 - aquatic/Cargo.toml | 2 +- aquatic/src/main.rs | 2 +- aquatic_cli_helpers/Cargo.toml | 19 -------------- aquatic_common/Cargo.toml | 3 +++ .../src/lib.rs => aquatic_common/src/cli.rs | 0 aquatic_common/src/lib.rs | 1 + aquatic_http/Cargo.toml | 1 - aquatic_http/src/config.rs | 4 +-- aquatic_http/src/main.rs | 2 +- aquatic_http_load_test/Cargo.toml | 1 - aquatic_http_load_test/src/config.rs | 4 +-- aquatic_http_load_test/src/main.rs | 2 +- aquatic_http_private/Cargo.toml | 1 - aquatic_http_private/src/config.rs | 4 +-- aquatic_http_private/src/main.rs | 2 +- aquatic_udp/Cargo.toml | 1 - aquatic_udp/src/config.rs | 4 +-- aquatic_udp/src/main.rs | 2 +- aquatic_udp_bench/Cargo.toml | 1 - aquatic_udp_bench/src/config.rs | 2 +- aquatic_udp_bench/src/main.rs | 2 +- aquatic_udp_load_test/Cargo.toml | 1 - aquatic_udp_load_test/src/config.rs | 2 +- aquatic_udp_load_test/src/main.rs | 6 ++--- aquatic_ws/Cargo.toml | 1 - aquatic_ws/src/config.rs | 4 +-- aquatic_ws/src/main.rs | 2 +- aquatic_ws_load_test/Cargo.toml | 1 - aquatic_ws_load_test/src/config.rs | 4 +-- aquatic_ws_load_test/src/main.rs | 2 +- 32 files changed, 34 insertions(+), 76 deletions(-) delete mode 100644 aquatic_cli_helpers/Cargo.toml rename aquatic_cli_helpers/src/lib.rs => aquatic_common/src/cli.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index e9141fb..ef35171 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -47,26 +47,13 @@ checksum = "4361135be9122e0870de935d7c439aef945b9f9ddd4199a553b5270b49c82a27" name = "aquatic" version = "0.2.0" dependencies = [ - "aquatic_cli_helpers", + "aquatic_common", "aquatic_http", "aquatic_udp", "aquatic_ws", "mimalloc", ] -[[package]] -name = "aquatic_cli_helpers" -version = "0.2.0" -dependencies = [ - "anyhow", - "aquatic_toml_config", - "git-testament", - "log", - "serde", - "simple_logger", - "toml", -] - [[package]] name = "aquatic_common" version = "0.2.0" @@ -76,6 +63,7 @@ dependencies = [ "aquatic_toml_config", "arc-swap", "duplicate", + "git-testament", "glommio", "hashbrown 0.12.0", "hex", @@ -88,6 +76,8 @@ dependencies = [ "rustls 0.20.4", "rustls-pemfile", "serde", + "simple_logger", + "toml", ] [[package]] @@ -95,7 +85,6 @@ name = "aquatic_http" version = "0.2.0" dependencies = [ "anyhow", - "aquatic_cli_helpers", "aquatic_common", "aquatic_http_protocol", "aquatic_toml_config", @@ -127,7 +116,6 @@ name = "aquatic_http_load_test" version = "0.2.0" dependencies = [ "anyhow", - "aquatic_cli_helpers", "aquatic_common", "aquatic_http_protocol", "aquatic_toml_config", @@ -149,7 +137,6 @@ name = "aquatic_http_private" version = "0.2.0" dependencies = [ "anyhow", - "aquatic_cli_helpers", "aquatic_common", "aquatic_http_protocol", "aquatic_toml_config", @@ -217,7 +204,6 @@ name = "aquatic_udp" version = "0.2.0" dependencies = [ "anyhow", - "aquatic_cli_helpers", "aquatic_common", "aquatic_toml_config", "aquatic_udp_protocol", @@ -244,7 +230,6 @@ name = "aquatic_udp_bench" version = "0.2.0" dependencies = [ "anyhow", - "aquatic_cli_helpers", "aquatic_common", "aquatic_toml_config", "aquatic_udp", @@ -263,7 +248,6 @@ name = "aquatic_udp_load_test" version = "0.2.0" dependencies = [ "anyhow", - "aquatic_cli_helpers", "aquatic_common", "aquatic_toml_config", "aquatic_udp_protocol", @@ -293,7 +277,6 @@ name = "aquatic_ws" version = "0.2.0" dependencies = [ "anyhow", - "aquatic_cli_helpers", "aquatic_common", "aquatic_toml_config", "aquatic_ws_protocol", @@ -325,7 +308,6 @@ name = "aquatic_ws_load_test" version = "0.2.0" dependencies = [ "anyhow", - "aquatic_cli_helpers", "aquatic_common", "aquatic_toml_config", "aquatic_ws_protocol", diff --git a/Cargo.toml b/Cargo.toml index 0ee9fc5..2092976 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,6 @@ members = [ "aquatic", - "aquatic_cli_helpers", "aquatic_common", "aquatic_http", "aquatic_http_load_test", diff --git a/aquatic/Cargo.toml b/aquatic/Cargo.toml index 2fbe263..c876fad 100644 --- a/aquatic/Cargo.toml +++ b/aquatic/Cargo.toml @@ -13,7 +13,7 @@ readme = "../README.md" name = "aquatic" [dependencies] -aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } +aquatic_common = { version = "0.2.0", path = "../aquatic_common" } aquatic_http = { version = "0.2.0", path = "../aquatic_http" } aquatic_udp = { version = "0.2.0", path = "../aquatic_udp" } aquatic_ws = { version = "0.2.0", path = "../aquatic_ws" } diff --git a/aquatic/src/main.rs b/aquatic/src/main.rs index c320e41..97fba0f 100644 --- a/aquatic/src/main.rs +++ b/aquatic/src/main.rs @@ -1,4 +1,4 @@ -use aquatic_cli_helpers::{print_help, run_app_with_cli_and_config, Options}; +use aquatic_common::cli::{print_help, run_app_with_cli_and_config, Options}; use aquatic_http::config::Config as HttpConfig; use aquatic_udp::config::Config as UdpConfig; use aquatic_ws::config::Config as WsConfig; diff --git a/aquatic_cli_helpers/Cargo.toml b/aquatic_cli_helpers/Cargo.toml deleted file mode 100644 index 7e93b6c..0000000 --- a/aquatic_cli_helpers/Cargo.toml +++ /dev/null @@ -1,19 +0,0 @@ -[package] -name = "aquatic_cli_helpers" -version = "0.2.0" -authors = ["Joakim Frostegård "] -edition = "2021" -license = "Apache-2.0" -description = "aquatic BitTorrent tracker CLI helpers" -repository = "https://github.com/greatest-ape/aquatic" -readme = "../README.md" - -[dependencies] -aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } - -anyhow = "1" -git-testament = "0.2" -log = "0.4" -serde = { version = "1", features = ["derive"] } -simple_logger = { version = "2", features = ["stderr"] } -toml = "0.5" diff --git a/aquatic_common/Cargo.toml b/aquatic_common/Cargo.toml index 1f6eb26..a487088 100644 --- a/aquatic_common/Cargo.toml +++ b/aquatic_common/Cargo.toml @@ -23,6 +23,7 @@ ahash = "0.7" anyhow = "1" arc-swap = "1" duplicate = "0.4" +git-testament = "0.2" hashbrown = "0.12" hex = "0.4" indexmap-amortized = "1" @@ -31,6 +32,8 @@ log = "0.4" privdrop = "0.5" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } +simple_logger = { version = "2", features = ["stderr"] } +toml = "0.5" # Optional glommio = { version = "0.7", optional = true } diff --git a/aquatic_cli_helpers/src/lib.rs b/aquatic_common/src/cli.rs similarity index 100% rename from aquatic_cli_helpers/src/lib.rs rename to aquatic_common/src/cli.rs diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs index c3cf62e..6888649 100644 --- a/aquatic_common/src/lib.rs +++ b/aquatic_common/src/lib.rs @@ -7,6 +7,7 @@ use ahash::RandomState; use rand::Rng; pub mod access_list; +pub mod cli; pub mod cpu_pinning; pub mod privileges; #[cfg(feature = "rustls-config")] diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index b9b1152..5809cd7 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -16,7 +16,6 @@ name = "aquatic_http" name = "aquatic_http" [dependencies] -aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["rustls-config", "with-glommio"] } aquatic_http_protocol = { version = "0.2.0", path = "../aquatic_http_protocol" } aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } diff --git a/aquatic_http/src/config.rs b/aquatic_http/src/config.rs index 0cbcea9..d3cceaa 100644 --- a/aquatic_http/src/config.rs +++ b/aquatic_http/src/config.rs @@ -7,7 +7,7 @@ use aquatic_common::{ use aquatic_toml_config::TomlConfig; use serde::Deserialize; -use aquatic_cli_helpers::LogLevel; +use aquatic_common::cli::LogLevel; /// aquatic_http configuration #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] @@ -45,7 +45,7 @@ impl Default for Config { } } -impl aquatic_cli_helpers::Config for Config { +impl aquatic_common::cli::Config for Config { fn get_log_level(&self) -> Option { Some(self.log_level) } diff --git a/aquatic_http/src/main.rs b/aquatic_http/src/main.rs index 0d4b626..7f40c2c 100644 --- a/aquatic_http/src/main.rs +++ b/aquatic_http/src/main.rs @@ -1,4 +1,4 @@ -use aquatic_cli_helpers::run_app_with_cli_and_config; +use aquatic_common::cli::run_app_with_cli_and_config; use aquatic_http::config::Config; #[global_allocator] diff --git a/aquatic_http_load_test/Cargo.toml b/aquatic_http_load_test/Cargo.toml index 625a157..8403959 100644 --- a/aquatic_http_load_test/Cargo.toml +++ b/aquatic_http_load_test/Cargo.toml @@ -13,7 +13,6 @@ readme = "../README.md" name = "aquatic_http_load_test" [dependencies] -aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["with-glommio"] } aquatic_http_protocol = { version = "0.2.0", path = "../aquatic_http_protocol" } aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } diff --git a/aquatic_http_load_test/src/config.rs b/aquatic_http_load_test/src/config.rs index d8906df..d104c41 100644 --- a/aquatic_http_load_test/src/config.rs +++ b/aquatic_http_load_test/src/config.rs @@ -1,6 +1,6 @@ use std::net::SocketAddr; -use aquatic_cli_helpers::LogLevel; +use aquatic_common::cli::LogLevel; use aquatic_common::cpu_pinning::desc::CpuPinningConfigDesc; use aquatic_toml_config::TomlConfig; use serde::Deserialize; @@ -26,7 +26,7 @@ pub struct Config { pub cpu_pinning: CpuPinningConfigDesc, } -impl aquatic_cli_helpers::Config for Config { +impl aquatic_common::cli::Config for Config { fn get_log_level(&self) -> Option { Some(self.log_level) } diff --git a/aquatic_http_load_test/src/main.rs b/aquatic_http_load_test/src/main.rs index b7e7978..4b060e0 100644 --- a/aquatic_http_load_test/src/main.rs +++ b/aquatic_http_load_test/src/main.rs @@ -24,7 +24,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; const MBITS_FACTOR: f64 = 1.0 / ((1024.0 * 1024.0) / 8.0); pub fn main() { - aquatic_cli_helpers::run_app_with_cli_and_config::( + aquatic_common::cli::run_app_with_cli_and_config::( "aquatic_http_load_test: BitTorrent load tester", env!("CARGO_PKG_VERSION"), run, diff --git a/aquatic_http_private/Cargo.toml b/aquatic_http_private/Cargo.toml index 7ec86c8..7d400f2 100644 --- a/aquatic_http_private/Cargo.toml +++ b/aquatic_http_private/Cargo.toml @@ -14,7 +14,6 @@ name = "aquatic_http_private" name = "aquatic_http_private" [dependencies] -aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["rustls-config"] } aquatic_http_protocol = { version = "0.2.0", path = "../aquatic_http_protocol", features = ["with-axum"] } aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } diff --git a/aquatic_http_private/src/config.rs b/aquatic_http_private/src/config.rs index 094c15e..ab1b0b2 100644 --- a/aquatic_http_private/src/config.rs +++ b/aquatic_http_private/src/config.rs @@ -4,7 +4,7 @@ use aquatic_common::privileges::PrivilegeConfig; use aquatic_toml_config::TomlConfig; use serde::Deserialize; -use aquatic_cli_helpers::LogLevel; +use aquatic_common::cli::LogLevel; /// aquatic_http_private configuration #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] @@ -42,7 +42,7 @@ impl Default for Config { } } -impl aquatic_cli_helpers::Config for Config { +impl aquatic_common::cli::Config for Config { fn get_log_level(&self) -> Option { Some(self.log_level) } diff --git a/aquatic_http_private/src/main.rs b/aquatic_http_private/src/main.rs index c26aaeb..caf3cbc 100644 --- a/aquatic_http_private/src/main.rs +++ b/aquatic_http_private/src/main.rs @@ -1,4 +1,4 @@ -use aquatic_cli_helpers::run_app_with_cli_and_config; +use aquatic_common::cli::run_app_with_cli_and_config; use aquatic_http_private::config::Config; #[global_allocator] diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 970b717..ed6686b 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -19,7 +19,6 @@ name = "aquatic_udp" cpu-pinning = ["aquatic_common/with-hwloc"] [dependencies] -aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } aquatic_common = { version = "0.2.0", path = "../aquatic_common" } aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } aquatic_udp_protocol = { version = "0.2.0", path = "../aquatic_udp_protocol" } diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 01a0917..eb3f3d1 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -3,7 +3,7 @@ use std::{net::SocketAddr, path::PathBuf}; use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig}; use serde::Deserialize; -use aquatic_cli_helpers::LogLevel; +use aquatic_common::cli::LogLevel; use aquatic_toml_config::TomlConfig; /// aquatic_udp configuration @@ -58,7 +58,7 @@ impl Default for Config { } } -impl aquatic_cli_helpers::Config for Config { +impl aquatic_common::cli::Config for Config { fn get_log_level(&self) -> Option { Some(self.log_level) } diff --git a/aquatic_udp/src/main.rs b/aquatic_udp/src/main.rs index b6df27c..9ec75cc 100644 --- a/aquatic_udp/src/main.rs +++ b/aquatic_udp/src/main.rs @@ -2,7 +2,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; fn main() { - aquatic_cli_helpers::run_app_with_cli_and_config::( + aquatic_common::cli::run_app_with_cli_and_config::( aquatic_udp::APP_NAME, aquatic_udp::APP_VERSION, aquatic_udp::run, diff --git a/aquatic_udp_bench/Cargo.toml b/aquatic_udp_bench/Cargo.toml index 6a6e7e3..d844f28 100644 --- a/aquatic_udp_bench/Cargo.toml +++ b/aquatic_udp_bench/Cargo.toml @@ -11,7 +11,6 @@ readme = "../README.md" name = "aquatic_udp_bench" [dependencies] -aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } aquatic_common = { version = "0.2.0", path = "../aquatic_common" } aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } aquatic_udp = { version = "0.2.0", path = "../aquatic_udp" } diff --git a/aquatic_udp_bench/src/config.rs b/aquatic_udp_bench/src/config.rs index 2b4b2f8..a1425d8 100644 --- a/aquatic_udp_bench/src/config.rs +++ b/aquatic_udp_bench/src/config.rs @@ -24,7 +24,7 @@ impl Default for BenchConfig { } } -impl aquatic_cli_helpers::Config for BenchConfig {} +impl aquatic_common::cli::Config for BenchConfig {} #[cfg(test)] mod tests { diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index 7a83e5b..89ca6da 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -14,7 +14,7 @@ use num_format::{Locale, ToFormattedString}; use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng}; use std::time::Duration; -use aquatic_cli_helpers::run_app_with_cli_and_config; +use aquatic_common::cli::run_app_with_cli_and_config; use aquatic_udp::common::*; use aquatic_udp::config::Config; use aquatic_udp_protocol::*; diff --git a/aquatic_udp_load_test/Cargo.toml b/aquatic_udp_load_test/Cargo.toml index 31029b7..662676b 100644 --- a/aquatic_udp_load_test/Cargo.toml +++ b/aquatic_udp_load_test/Cargo.toml @@ -16,7 +16,6 @@ cpu-pinning = ["aquatic_common/with-hwloc"] name = "aquatic_udp_load_test" [dependencies] -aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } aquatic_common = { version = "0.2.0", path = "../aquatic_common" } aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } aquatic_udp_protocol = { version = "0.2.0", path = "../aquatic_udp_protocol" } diff --git a/aquatic_udp_load_test/src/config.rs b/aquatic_udp_load_test/src/config.rs index e803eb8..a189307 100644 --- a/aquatic_udp_load_test/src/config.rs +++ b/aquatic_udp_load_test/src/config.rs @@ -2,7 +2,7 @@ use std::net::SocketAddr; use serde::Deserialize; -use aquatic_cli_helpers::LogLevel; +use aquatic_common::cli::LogLevel; #[cfg(feature = "cpu-pinning")] use aquatic_common::cpu_pinning::desc::CpuPinningConfigDesc; use aquatic_toml_config::TomlConfig; diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index dd6ad57..27ddd6a 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -22,7 +22,7 @@ use worker::*; static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; pub fn main() { - aquatic_cli_helpers::run_app_with_cli_and_config::( + aquatic_common::cli::run_app_with_cli_and_config::( "aquatic_udp_load_test: BitTorrent load tester", env!("CARGO_PKG_VERSION"), run, @@ -30,8 +30,8 @@ pub fn main() { ) } -impl aquatic_cli_helpers::Config for Config { - fn get_log_level(&self) -> Option { +impl aquatic_common::cli::Config for Config { + fn get_log_level(&self) -> Option { Some(self.log_level) } } diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index 0add3fa..65491c7 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -16,7 +16,6 @@ name = "aquatic_ws" name = "aquatic_ws" [dependencies] -aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["rustls-config", "with-glommio"] } aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } aquatic_ws_protocol = { version = "0.2.0", path = "../aquatic_ws_protocol" } diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index b1ea961..fbfe8bf 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -5,7 +5,7 @@ use aquatic_common::cpu_pinning::asc::CpuPinningConfigAsc; use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig}; use serde::Deserialize; -use aquatic_cli_helpers::LogLevel; +use aquatic_common::cli::LogLevel; use aquatic_toml_config::TomlConfig; /// aquatic_ws configuration @@ -44,7 +44,7 @@ impl Default for Config { } } -impl aquatic_cli_helpers::Config for Config { +impl aquatic_common::cli::Config for Config { fn get_log_level(&self) -> Option { Some(self.log_level) } diff --git a/aquatic_ws/src/main.rs b/aquatic_ws/src/main.rs index bd241e3..cb8b58f 100644 --- a/aquatic_ws/src/main.rs +++ b/aquatic_ws/src/main.rs @@ -1,4 +1,4 @@ -use aquatic_cli_helpers::run_app_with_cli_and_config; +use aquatic_common::cli::run_app_with_cli_and_config; use aquatic_ws::config::Config; #[global_allocator] diff --git a/aquatic_ws_load_test/Cargo.toml b/aquatic_ws_load_test/Cargo.toml index 82496b7..457a4d2 100644 --- a/aquatic_ws_load_test/Cargo.toml +++ b/aquatic_ws_load_test/Cargo.toml @@ -13,7 +13,6 @@ readme = "../README.md" name = "aquatic_ws_load_test" [dependencies] -aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["with-glommio"] } aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } aquatic_ws_protocol = { version = "0.2.0", path = "../aquatic_ws_protocol" } diff --git a/aquatic_ws_load_test/src/config.rs b/aquatic_ws_load_test/src/config.rs index 9949c65..9af7baf 100644 --- a/aquatic_ws_load_test/src/config.rs +++ b/aquatic_ws_load_test/src/config.rs @@ -1,6 +1,6 @@ use std::net::SocketAddr; -use aquatic_cli_helpers::LogLevel; +use aquatic_common::cli::LogLevel; use aquatic_common::cpu_pinning::desc::CpuPinningConfigDesc; use aquatic_toml_config::TomlConfig; use serde::Deserialize; @@ -19,7 +19,7 @@ pub struct Config { pub cpu_pinning: CpuPinningConfigDesc, } -impl aquatic_cli_helpers::Config for Config { +impl aquatic_common::cli::Config for Config { fn get_log_level(&self) -> Option { Some(self.log_level) } diff --git a/aquatic_ws_load_test/src/main.rs b/aquatic_ws_load_test/src/main.rs index f4f16eb..1603c93 100644 --- a/aquatic_ws_load_test/src/main.rs +++ b/aquatic_ws_load_test/src/main.rs @@ -21,7 +21,7 @@ use network::*; static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; pub fn main() { - aquatic_cli_helpers::run_app_with_cli_and_config::( + aquatic_common::cli::run_app_with_cli_and_config::( "aquatic_ws_load_test: WebTorrent load tester", env!("CARGO_PKG_VERSION"), run, From 5fb91ccc739cef3177242f375aa25db5b8181335 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 6 Apr 2022 19:09:46 +0200 Subject: [PATCH 20/23] http_private: db_connections_per_worker: add comment, default to 4 --- aquatic_http_private/src/config.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/aquatic_http_private/src/config.rs b/aquatic_http_private/src/config.rs index ab1b0b2..b800cc8 100644 --- a/aquatic_http_private/src/config.rs +++ b/aquatic_http_private/src/config.rs @@ -18,6 +18,7 @@ pub struct Config { /// generate responses and send them back to the socket workers. pub request_workers: usize, pub worker_channel_size: usize, + /// Number of database connections to establish in each socket worker pub db_connections_per_worker: u32, pub log_level: LogLevel, pub network: NetworkConfig, @@ -32,7 +33,7 @@ impl Default for Config { socket_workers: 1, request_workers: 1, worker_channel_size: 128, - db_connections_per_worker: 1, + db_connections_per_worker: 4, log_level: LogLevel::default(), network: NetworkConfig::default(), protocol: ProtocolConfig::default(), From 0a63ee3ce739567f2472a1a414a9aeeff9ddceec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 6 Apr 2022 19:15:01 +0200 Subject: [PATCH 21/23] load testers: name worker threads "load-test" --- aquatic_http_load_test/src/main.rs | 1 + aquatic_udp_load_test/src/main.rs | 6 +++--- aquatic_ws_load_test/src/main.rs | 1 + 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/aquatic_http_load_test/src/main.rs b/aquatic_http_load_test/src/main.rs index 4b060e0..1955885 100644 --- a/aquatic_http_load_test/src/main.rs +++ b/aquatic_http_load_test/src/main.rs @@ -70,6 +70,7 @@ fn run(config: Config) -> ::anyhow::Result<()> { )?; LocalExecutorBuilder::new(placement) + .name("load-test") .spawn(move || async move { run_socket_thread(config, tls_config, state).await.unwrap(); }) diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index 27ddd6a..7066b5f 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -1,7 +1,7 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::atomic::AtomicUsize; use std::sync::{atomic::Ordering, Arc}; -use std::thread; +use std::thread::{self, Builder}; use std::time::{Duration, Instant}; #[cfg(feature = "cpu-pinning")] @@ -79,7 +79,7 @@ fn run(config: Config) -> ::anyhow::Result<()> { let config = config.clone(); let state = state.clone(); - thread::spawn(move || { + Builder::new().name("load-test".into()).spawn(move || { #[cfg(feature = "cpu-pinning")] pin_current_if_configured_to( &config.cpu_pinning, @@ -89,7 +89,7 @@ fn run(config: Config) -> ::anyhow::Result<()> { ); run_worker_thread(state, pareto, &config, addr) - }); + })?; } #[cfg(feature = "cpu-pinning")] diff --git a/aquatic_ws_load_test/src/main.rs b/aquatic_ws_load_test/src/main.rs index 1603c93..c660267 100644 --- a/aquatic_ws_load_test/src/main.rs +++ b/aquatic_ws_load_test/src/main.rs @@ -67,6 +67,7 @@ fn run(config: Config) -> ::anyhow::Result<()> { )?; LocalExecutorBuilder::new(placement) + .name("load-test") .spawn(move || async move { run_socket_thread(config, tls_config, state).await.unwrap(); }) From 3746fa76ec27b640f83e95753aa2188ec067a915 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 6 Apr 2022 19:16:50 +0200 Subject: [PATCH 22/23] udp: name statistics worker thread "statistics" --- aquatic_udp/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquatic_udp/src/lib.rs b/aquatic_udp/src/lib.rs index 4b02340..db644b9 100644 --- a/aquatic_udp/src/lib.rs +++ b/aquatic_udp/src/lib.rs @@ -132,7 +132,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let config = config.clone(); Builder::new() - .name("statistics-collector".to_string()) + .name("statistics".into()) .spawn(move || { #[cfg(feature = "cpu-pinning")] pin_current_if_configured_to( From 113fe8f5fde7f8b1f01dac6f2a443700703ebfc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 6 Apr 2022 19:55:12 +0200 Subject: [PATCH 23/23] Update TODO --- TODO.md | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/TODO.md b/TODO.md index 5e9e648..ef68315 100644 --- a/TODO.md +++ b/TODO.md @@ -2,8 +2,6 @@ ## High priority -* test priv dropping - * aquatic_http_private * Consider not setting Content-type: text/plain for responses and send vec as default octet stream instead * stored procedure @@ -56,9 +54,6 @@ # Not important * aquatic_http: - * optimize? - * get_peer_addr only once (takes 1.2% of runtime) - * queue response: allocating takes 2.8% of runtime * consider better error type for request parsing, so that better error messages can be sent back (e.g., "full scrapes are not supported") * test torrent transfer with real clients @@ -67,15 +62,6 @@ positive number. * aquatic_ws - * mio - * shard torrent state. this could decrease dropped messages too, since - request handlers won't send large batches of them - * connection cleaning interval - * use access list cache - * use write event interest for handshakes too - * deregistering before closing is required by mio, but it hurts performance - * blocked on https://github.com/snapview/tungstenite-rs/issues/51 - * connection closing: send tls close message etc? * write new version of extract_response_peers which checks for equality with peer sending request???