Merge pull request #66 from greatest-ape/privdrop-2022-04-04

Improve privilege dropping; quit whole program if any thread panics; other improvements
This commit is contained in:
Joakim Frostegård 2022-04-06 19:55:29 +02:00 committed by GitHub
commit b5dc677d97
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
48 changed files with 464 additions and 415 deletions

27
Cargo.lock generated
View file

@ -47,26 +47,13 @@ checksum = "4361135be9122e0870de935d7c439aef945b9f9ddd4199a553b5270b49c82a27"
name = "aquatic"
version = "0.2.0"
dependencies = [
"aquatic_cli_helpers",
"aquatic_common",
"aquatic_http",
"aquatic_udp",
"aquatic_ws",
"mimalloc",
]
[[package]]
name = "aquatic_cli_helpers"
version = "0.2.0"
dependencies = [
"anyhow",
"aquatic_toml_config",
"git-testament",
"log",
"serde",
"simple_logger",
"toml",
]
[[package]]
name = "aquatic_common"
version = "0.2.0"
@ -76,6 +63,7 @@ dependencies = [
"aquatic_toml_config",
"arc-swap",
"duplicate",
"git-testament",
"glommio",
"hashbrown 0.12.0",
"hex",
@ -88,6 +76,8 @@ dependencies = [
"rustls 0.20.4",
"rustls-pemfile",
"serde",
"simple_logger",
"toml",
]
[[package]]
@ -95,7 +85,6 @@ name = "aquatic_http"
version = "0.2.0"
dependencies = [
"anyhow",
"aquatic_cli_helpers",
"aquatic_common",
"aquatic_http_protocol",
"aquatic_toml_config",
@ -127,7 +116,6 @@ name = "aquatic_http_load_test"
version = "0.2.0"
dependencies = [
"anyhow",
"aquatic_cli_helpers",
"aquatic_common",
"aquatic_http_protocol",
"aquatic_toml_config",
@ -149,7 +137,6 @@ name = "aquatic_http_private"
version = "0.2.0"
dependencies = [
"anyhow",
"aquatic_cli_helpers",
"aquatic_common",
"aquatic_http_protocol",
"aquatic_toml_config",
@ -163,6 +150,7 @@ dependencies = [
"rand",
"rustls 0.20.4",
"serde",
"signal-hook",
"socket2 0.4.4",
"sqlx",
"tokio",
@ -216,7 +204,6 @@ name = "aquatic_udp"
version = "0.2.0"
dependencies = [
"anyhow",
"aquatic_cli_helpers",
"aquatic_common",
"aquatic_toml_config",
"aquatic_udp_protocol",
@ -243,7 +230,6 @@ name = "aquatic_udp_bench"
version = "0.2.0"
dependencies = [
"anyhow",
"aquatic_cli_helpers",
"aquatic_common",
"aquatic_toml_config",
"aquatic_udp",
@ -262,7 +248,6 @@ name = "aquatic_udp_load_test"
version = "0.2.0"
dependencies = [
"anyhow",
"aquatic_cli_helpers",
"aquatic_common",
"aquatic_toml_config",
"aquatic_udp_protocol",
@ -292,7 +277,6 @@ name = "aquatic_ws"
version = "0.2.0"
dependencies = [
"anyhow",
"aquatic_cli_helpers",
"aquatic_common",
"aquatic_toml_config",
"aquatic_ws_protocol",
@ -324,7 +308,6 @@ name = "aquatic_ws_load_test"
version = "0.2.0"
dependencies = [
"anyhow",
"aquatic_cli_helpers",
"aquatic_common",
"aquatic_toml_config",
"aquatic_ws_protocol",

View file

@ -2,7 +2,6 @@
members = [
"aquatic",
"aquatic_cli_helpers",
"aquatic_common",
"aquatic_http",
"aquatic_http_load_test",

19
TODO.md
View file

@ -6,12 +6,19 @@
* Consider not setting Content-type: text/plain for responses and send vec as default octet stream instead
* stored procedure
* test ip format
* check user token length
* site will likely want num_seeders and num_leechers for all torrents..
## Medium priority
* rename request workers to swarm workers
* quit whole program if any thread panics
* But it would be nice not to panic in workers, but to return errors instead.
Once JoinHandle::is_finished is available in stable Rust (#90470), an
option would be to
* Save JoinHandles
* When preparing to quit because of PanicSentinel sending SIGTERM, loop
through them, extract error and log it
* config: fail on unrecognized keys?
* Run cargo-deny in CI
@ -47,9 +54,6 @@
# Not important
* aquatic_http:
* optimize?
* get_peer_addr only once (takes 1.2% of runtime)
* queue response: allocating takes 2.8% of runtime
* consider better error type for request parsing, so that better error
messages can be sent back (e.g., "full scrapes are not supported")
* test torrent transfer with real clients
@ -58,15 +62,6 @@
positive number.
* aquatic_ws
* mio
* shard torrent state. this could decrease dropped messages too, since
request handlers won't send large batches of them
* connection cleaning interval
* use access list cache
* use write event interest for handshakes too
* deregistering before closing is required by mio, but it hurts performance
* blocked on https://github.com/snapview/tungstenite-rs/issues/51
* connection closing: send tls close message etc?
* write new version of extract_response_peers which checks for equality with
peer sending request???

View file

@ -13,7 +13,7 @@ readme = "../README.md"
name = "aquatic"
[dependencies]
aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" }
aquatic_common = { version = "0.2.0", path = "../aquatic_common" }
aquatic_http = { version = "0.2.0", path = "../aquatic_http" }
aquatic_udp = { version = "0.2.0", path = "../aquatic_udp" }
aquatic_ws = { version = "0.2.0", path = "../aquatic_ws" }

View file

@ -1,4 +1,4 @@
use aquatic_cli_helpers::{print_help, run_app_with_cli_and_config, Options};
use aquatic_common::cli::{print_help, run_app_with_cli_and_config, Options};
use aquatic_http::config::Config as HttpConfig;
use aquatic_udp::config::Config as UdpConfig;
use aquatic_ws::config::Config as WsConfig;

View file

@ -1,19 +0,0 @@
[package]
name = "aquatic_cli_helpers"
version = "0.2.0"
authors = ["Joakim Frostegård <joakim.frostegard@gmail.com>"]
edition = "2021"
license = "Apache-2.0"
description = "aquatic BitTorrent tracker CLI helpers"
repository = "https://github.com/greatest-ape/aquatic"
readme = "../README.md"
[dependencies]
aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" }
anyhow = "1"
git-testament = "0.2"
log = "0.4"
serde = { version = "1", features = ["derive"] }
simple_logger = { version = "2", features = ["stderr"] }
toml = "0.5"

View file

@ -23,6 +23,7 @@ ahash = "0.7"
anyhow = "1"
arc-swap = "1"
duplicate = "0.4"
git-testament = "0.2"
hashbrown = "0.12"
hex = "0.4"
indexmap-amortized = "1"
@ -31,6 +32,8 @@ log = "0.4"
privdrop = "0.5"
rand = { version = "0.8", features = ["small_rng"] }
serde = { version = "1", features = ["derive"] }
simple_logger = { version = "2", features = ["stderr"] }
toml = "0.5"
# Optional
glommio = { version = "0.7", optional = true }

View file

@ -22,7 +22,7 @@ pub enum LogLevel {
impl Default for LogLevel {
fn default() -> Self {
Self::Error
Self::Warn
}
}

View file

@ -194,7 +194,9 @@ pub mod glommio {
// 15 -> 14 and 15
// 14 -> 12 and 13
// 13 -> 10 and 11
CpuPinningDirection::Descending => num_cpu_cores - 2 * (num_cpu_cores - core_index),
CpuPinningDirection::Descending => {
num_cpu_cores - 2 * (num_cpu_cores - core_index)
}
};
get_cpu_set()?

View file

@ -1,10 +1,13 @@
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use ahash::RandomState;
use rand::Rng;
pub mod access_list;
pub mod cli;
pub mod cpu_pinning;
pub mod privileges;
#[cfg(feature = "rustls-config")]
@ -30,6 +33,44 @@ impl ValidUntil {
}
}
pub struct PanicSentinelWatcher(Arc<AtomicBool>);
impl PanicSentinelWatcher {
pub fn create_with_sentinel() -> (Self, PanicSentinel) {
let triggered = Arc::new(AtomicBool::new(false));
let sentinel = PanicSentinel(triggered.clone());
(Self(triggered), sentinel)
}
pub fn panic_was_triggered(&self) -> bool {
self.0.load(Ordering::SeqCst)
}
}
/// Raises SIGTERM when dropped
///
/// Pass to threads to have panics in them cause whole program to exit.
#[derive(Clone)]
pub struct PanicSentinel(Arc<AtomicBool>);
impl Drop for PanicSentinel {
fn drop(&mut self) {
if ::std::thread::panicking() {
let already_triggered = self.0.fetch_or(true, Ordering::SeqCst);
if !already_triggered {
if unsafe { libc::raise(15) } == -1 {
panic!(
"Could not raise SIGTERM: {:#}",
::std::io::Error::last_os_error()
)
}
}
}
}
}
/// Extract response peers
///
/// If there are more peers in map than `max_num_peers_to_take`, do a

View file

@ -1,22 +1,23 @@
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
path::PathBuf,
sync::{Arc, Barrier},
};
use aquatic_toml_config::TomlConfig;
use anyhow::Context;
use privdrop::PrivDrop;
use serde::Deserialize;
use aquatic_toml_config::TomlConfig;
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
pub struct PrivilegeConfig {
/// Chroot and switch user after binding to sockets
/// Chroot and switch group and user after binding to sockets
pub drop_privileges: bool,
/// Chroot to this path
pub chroot_path: String,
pub chroot_path: PathBuf,
/// Group to switch to after chrooting
pub group: String,
/// User to switch to after chrooting
pub user: String,
}
@ -25,41 +26,39 @@ impl Default for PrivilegeConfig {
fn default() -> Self {
Self {
drop_privileges: false,
chroot_path: ".".to_string(),
chroot_path: ".".into(),
user: "nobody".to_string(),
group: "nogroup".to_string(),
}
}
}
pub fn drop_privileges_after_socket_binding(
config: &PrivilegeConfig,
num_bound_sockets: Arc<AtomicUsize>,
target_num: usize,
) -> anyhow::Result<()> {
if config.drop_privileges {
let mut counter = 0usize;
#[derive(Clone)]
pub struct PrivilegeDropper {
barrier: Arc<Barrier>,
config: Arc<PrivilegeConfig>,
}
loop {
let num_bound = num_bound_sockets.load(Ordering::SeqCst);
impl PrivilegeDropper {
pub fn new(config: PrivilegeConfig, num_sockets: usize) -> Self {
Self {
barrier: Arc::new(Barrier::new(num_sockets)),
config: Arc::new(config),
}
}
if num_bound == target_num {
pub fn after_socket_creation(self) -> anyhow::Result<()> {
if self.config.drop_privileges {
if self.barrier.wait().is_leader() {
PrivDrop::default()
.chroot(config.chroot_path.clone())
.user(config.user.clone())
.apply()?;
break;
}
::std::thread::sleep(Duration::from_millis(10));
counter += 1;
if counter == 500 {
panic!("Sockets didn't bind in time for privilege drop.");
.chroot(self.config.chroot_path.clone())
.group(self.config.group.clone())
.user(self.config.user.clone())
.apply()
.with_context(|| "couldn't drop privileges after socket creation")?;
}
}
}
Ok(())
Ok(())
}
}

View file

@ -16,7 +16,6 @@ name = "aquatic_http"
name = "aquatic_http"
[dependencies]
aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" }
aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["rustls-config", "with-glommio"] }
aquatic_http_protocol = { version = "0.2.0", path = "../aquatic_http_protocol" }
aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" }

View file

@ -1,10 +1,13 @@
use std::{net::SocketAddr, path::PathBuf};
use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig, cpu_pinning::asc::CpuPinningConfigAsc};
use aquatic_common::{
access_list::AccessListConfig, cpu_pinning::asc::CpuPinningConfigAsc,
privileges::PrivilegeConfig,
};
use aquatic_toml_config::TomlConfig;
use serde::Deserialize;
use aquatic_cli_helpers::LogLevel;
use aquatic_common::cli::LogLevel;
/// aquatic_http configuration
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
@ -42,7 +45,7 @@ impl Default for Config {
}
}
impl aquatic_cli_helpers::Config for Config {
impl aquatic_common::cli::Config for Config {
fn get_log_level(&self) -> Option<LogLevel> {
Some(self.log_level)
}

View file

@ -4,13 +4,17 @@ use aquatic_common::{
glommio::{get_worker_placement, set_affinity_for_util_worker},
WorkerIndex,
},
privileges::drop_privileges_after_socket_binding,
privileges::PrivilegeDropper,
rustls_config::create_rustls_config,
PanicSentinelWatcher,
};
use common::State;
use glommio::{channels::channel_mesh::MeshBuilder, prelude::*};
use signal_hook::{consts::SIGUSR1, iterator::Signals};
use std::sync::{atomic::AtomicUsize, Arc};
use signal_hook::{
consts::{SIGTERM, SIGUSR1},
iterator::Signals,
};
use std::sync::Arc;
use crate::config::Config;
@ -24,17 +28,91 @@ pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
const SHARED_CHANNEL_SIZE: usize = 1024;
pub fn run(config: Config) -> ::anyhow::Result<()> {
let mut signals = Signals::new([SIGUSR1, SIGTERM])?;
let state = State::default();
update_access_list(&config.access_list, &state.access_list)?;
let mut signals = Signals::new(::std::iter::once(SIGUSR1))?;
let num_peers = config.socket_workers + config.request_workers;
{
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel();
let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
let tls_config = Arc::new(create_rustls_config(
&config.network.tls_certificate_path,
&config.network.tls_private_key_path,
)?);
let mut executors = Vec::new();
for i in 0..(config.socket_workers) {
let sentinel = sentinel.clone();
let config = config.clone();
let state = state.clone();
let tls_config = tls_config.clone();
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
let priv_dropper = priv_dropper.clone();
::std::thread::spawn(move || run_inner(config, state));
let placement = get_worker_placement(
&config.cpu_pinning,
config.socket_workers,
config.request_workers,
WorkerIndex::SocketWorker(i),
)?;
let builder = LocalExecutorBuilder::new(placement).name("socket");
let executor = builder
.spawn(move || async move {
workers::socket::run_socket_worker(
sentinel,
config,
state,
tls_config,
request_mesh_builder,
response_mesh_builder,
priv_dropper,
)
.await
})
.map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?;
executors.push(executor);
}
for i in 0..(config.request_workers) {
let sentinel = sentinel.clone();
let config = config.clone();
let state = state.clone();
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
let placement = get_worker_placement(
&config.cpu_pinning,
config.socket_workers,
config.request_workers,
WorkerIndex::RequestWorker(i),
)?;
let builder = LocalExecutorBuilder::new(placement).name("request");
let executor = builder
.spawn(move || async move {
workers::request::run_request_worker(
sentinel,
config,
state,
request_mesh_builder,
response_mesh_builder,
)
.await
})
.map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?;
executors.push(executor);
}
if config.cpu_pinning.active {
@ -50,107 +128,16 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
SIGUSR1 => {
let _ = update_access_list(&config.access_list, &state.access_list);
}
SIGTERM => {
if sentinel_watcher.panic_was_triggered() {
return Err(anyhow::anyhow!("worker thread panicked"));
} else {
return Ok(());
}
}
_ => unreachable!(),
}
}
Ok(())
}
pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
let num_peers = config.socket_workers + config.request_workers;
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
let num_bound_sockets = Arc::new(AtomicUsize::new(0));
let tls_config = Arc::new(create_rustls_config(
&config.network.tls_certificate_path,
&config.network.tls_private_key_path,
)?);
let mut executors = Vec::new();
for i in 0..(config.socket_workers) {
let config = config.clone();
let state = state.clone();
let tls_config = tls_config.clone();
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
let num_bound_sockets = num_bound_sockets.clone();
let placement = get_worker_placement(
&config.cpu_pinning,
config.socket_workers,
config.request_workers,
WorkerIndex::SocketWorker(i),
)?;
let builder = LocalExecutorBuilder::new(placement).name("socket");
let executor = builder.spawn(move || async move {
workers::socket::run_socket_worker(
config,
state,
tls_config,
request_mesh_builder,
response_mesh_builder,
num_bound_sockets,
)
.await
});
executors.push(executor);
}
for i in 0..(config.request_workers) {
let config = config.clone();
let state = state.clone();
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
let placement = get_worker_placement(
&config.cpu_pinning,
config.socket_workers,
config.request_workers,
WorkerIndex::RequestWorker(i),
)?;
let builder = LocalExecutorBuilder::new(placement).name("request");
let executor = builder.spawn(move || async move {
workers::request::run_request_worker(
config,
state,
request_mesh_builder,
response_mesh_builder,
)
.await
});
executors.push(executor);
}
drop_privileges_after_socket_binding(
&config.privileges,
num_bound_sockets,
config.socket_workers,
)
.unwrap();
if config.cpu_pinning.active {
set_affinity_for_util_worker(
&config.cpu_pinning,
config.socket_workers,
config.request_workers,
)?;
}
for executor in executors {
executor
.expect("failed to spawn local executor")
.join()
.unwrap();
}
Ok(())
}

View file

@ -1,4 +1,4 @@
use aquatic_cli_helpers::run_app_with_cli_and_config;
use aquatic_common::cli::run_app_with_cli_and_config;
use aquatic_http::config::Config;
#[global_allocator]

View file

@ -17,8 +17,8 @@ use rand::SeedableRng;
use smartstring::{LazyCompact, SmartString};
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::extract_response_peers;
use aquatic_common::ValidUntil;
use aquatic_common::{extract_response_peers, PanicSentinel};
use aquatic_common::{AmortizedIndexMap, CanonicalSocketAddr};
use aquatic_http_protocol::common::*;
use aquatic_http_protocol::request::*;
@ -175,6 +175,7 @@ impl TorrentMaps {
}
pub async fn run_request_worker(
_sentinel: PanicSentinel,
config: Config,
state: State,
request_mesh_builder: MeshBuilder<ChannelRequest, Partial>,

View file

@ -2,13 +2,14 @@ use std::cell::RefCell;
use std::collections::BTreeMap;
use std::os::unix::prelude::{FromRawFd, IntoRawFd};
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::Context;
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::privileges::PrivilegeDropper;
use aquatic_common::rustls_config::RustlsConfig;
use aquatic_common::CanonicalSocketAddr;
use aquatic_common::{CanonicalSocketAddr, PanicSentinel};
use aquatic_http_protocol::common::InfoHash;
use aquatic_http_protocol::request::{Request, RequestParseError, ScrapeRequest};
use aquatic_http_protocol::response::{
@ -53,18 +54,18 @@ struct ConnectionReference {
}
pub async fn run_socket_worker(
_sentinel: PanicSentinel,
config: Config,
state: State,
tls_config: Arc<RustlsConfig>,
request_mesh_builder: MeshBuilder<ChannelRequest, Partial>,
response_mesh_builder: MeshBuilder<ChannelResponse, Partial>,
num_bound_sockets: Arc<AtomicUsize>,
priv_dropper: PrivilegeDropper,
) {
let config = Rc::new(config);
let access_list = state.access_list;
let listener = create_tcp_listener(&config);
num_bound_sockets.fetch_add(1, Ordering::SeqCst);
let listener = create_tcp_listener(&config, priv_dropper).expect("create tcp listener");
let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap();
let request_senders = Rc::new(request_senders);
@ -485,29 +486,37 @@ fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usi
(info_hash.0[0] as usize) % config.request_workers
}
fn create_tcp_listener(config: &Config) -> TcpListener {
fn create_tcp_listener(
config: &Config,
priv_dropper: PrivilegeDropper,
) -> anyhow::Result<TcpListener> {
let domain = if config.network.address.is_ipv4() {
socket2::Domain::IPV4
} else {
socket2::Domain::IPV6
};
let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))
.expect("create socket");
let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?;
if config.network.only_ipv6 {
socket.set_only_v6(true).expect("socket: set only ipv6");
socket
.set_only_v6(true)
.with_context(|| "socket: set only ipv6")?;
}
socket.set_reuse_port(true).expect("socket: set reuse port");
socket
.set_reuse_port(true)
.with_context(|| "socket: set reuse port")?;
socket
.bind(&config.network.address.into())
.unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err));
.with_context(|| format!("socket: bind to {}", config.network.address))?;
socket
.listen(config.network.tcp_backlog)
.unwrap_or_else(|err| panic!("socket: listen {}: {:?}", config.network.address, err));
.with_context(|| format!("socket: listen on {}", config.network.address))?;
unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) }
priv_dropper.after_socket_creation()?;
Ok(unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) })
}

View file

@ -13,7 +13,6 @@ readme = "../README.md"
name = "aquatic_http_load_test"
[dependencies]
aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" }
aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["with-glommio"] }
aquatic_http_protocol = { version = "0.2.0", path = "../aquatic_http_protocol" }
aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" }

View file

@ -1,6 +1,6 @@
use std::net::SocketAddr;
use aquatic_cli_helpers::LogLevel;
use aquatic_common::cli::LogLevel;
use aquatic_common::cpu_pinning::desc::CpuPinningConfigDesc;
use aquatic_toml_config::TomlConfig;
use serde::Deserialize;
@ -26,7 +26,7 @@ pub struct Config {
pub cpu_pinning: CpuPinningConfigDesc,
}
impl aquatic_cli_helpers::Config for Config {
impl aquatic_common::cli::Config for Config {
fn get_log_level(&self) -> Option<LogLevel> {
Some(self.log_level)
}

View file

@ -24,7 +24,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
const MBITS_FACTOR: f64 = 1.0 / ((1024.0 * 1024.0) / 8.0);
pub fn main() {
aquatic_cli_helpers::run_app_with_cli_and_config::<Config>(
aquatic_common::cli::run_app_with_cli_and_config::<Config>(
"aquatic_http_load_test: BitTorrent load tester",
env!("CARGO_PKG_VERSION"),
run,
@ -70,6 +70,7 @@ fn run(config: Config) -> ::anyhow::Result<()> {
)?;
LocalExecutorBuilder::new(placement)
.name("load-test")
.spawn(move || async move {
run_socket_thread(config, tls_config, state).await.unwrap();
})

View file

@ -14,7 +14,6 @@ name = "aquatic_http_private"
name = "aquatic_http_private"
[dependencies]
aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" }
aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["rustls-config"] }
aquatic_http_protocol = { version = "0.2.0", path = "../aquatic_http_protocol", features = ["with-axum"] }
aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" }
@ -30,6 +29,7 @@ mimalloc = { version = "0.1", default-features = false }
rand = { version = "0.8", features = ["small_rng"] }
rustls = "0.20"
serde = { version = "1", features = ["derive"] }
signal-hook = { version = "0.3" }
socket2 = { version = "0.4", features = ["all"] }
sqlx = { version = "0.5", features = [ "runtime-tokio-rustls" , "mysql" ] }
tokio = { version = "1", features = ["full"] }

View file

@ -4,7 +4,7 @@ use aquatic_common::privileges::PrivilegeConfig;
use aquatic_toml_config::TomlConfig;
use serde::Deserialize;
use aquatic_cli_helpers::LogLevel;
use aquatic_common::cli::LogLevel;
/// aquatic_http_private configuration
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
@ -18,6 +18,7 @@ pub struct Config {
/// generate responses and send them back to the socket workers.
pub request_workers: usize,
pub worker_channel_size: usize,
/// Number of database connections to establish in each socket worker
pub db_connections_per_worker: u32,
pub log_level: LogLevel,
pub network: NetworkConfig,
@ -32,7 +33,7 @@ impl Default for Config {
socket_workers: 1,
request_workers: 1,
worker_channel_size: 128,
db_connections_per_worker: 1,
db_connections_per_worker: 4,
log_level: LogLevel::default(),
network: NetworkConfig::default(),
protocol: ProtocolConfig::default(),
@ -42,7 +43,7 @@ impl Default for Config {
}
}
impl aquatic_cli_helpers::Config for Config {
impl aquatic_common::cli::Config for Config {
fn get_log_level(&self) -> Option<LogLevel> {
Some(self.log_level)
}

View file

@ -4,9 +4,12 @@ mod workers;
use std::{collections::VecDeque, sync::Arc};
use aquatic_common::rustls_config::create_rustls_config;
use aquatic_common::{
privileges::PrivilegeDropper, rustls_config::create_rustls_config, PanicSentinelWatcher,
};
use common::ChannelRequestSender;
use dotenv::dotenv;
use signal_hook::{consts::SIGTERM, iterator::Signals};
use tokio::sync::mpsc::channel;
use config::Config;
@ -15,6 +18,8 @@ pub const APP_NAME: &str = "aquatic_http_private: private HTTP/TLS BitTorrent tr
pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
pub fn run(config: Config) -> anyhow::Result<()> {
let mut signals = Signals::new([SIGTERM])?;
dotenv().ok();
let tls_config = Arc::new(create_rustls_config(
@ -32,37 +37,58 @@ pub fn run(config: Config) -> anyhow::Result<()> {
request_receivers.push_back(request_receiver);
}
let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel();
let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
let mut handles = Vec::new();
for _ in 0..config.socket_workers {
let sentinel = sentinel.clone();
let config = config.clone();
let tls_config = tls_config.clone();
let request_sender = ChannelRequestSender::new(request_senders.clone());
let priv_dropper = priv_dropper.clone();
let handle = ::std::thread::Builder::new()
.name("socket".into())
.spawn(move || {
workers::socket::run_socket_worker(config, tls_config, request_sender)
workers::socket::run_socket_worker(
sentinel,
config,
tls_config,
request_sender,
priv_dropper,
)
})?;
handles.push(handle);
}
for _ in 0..config.request_workers {
let sentinel = sentinel.clone();
let config = config.clone();
let request_receiver = request_receivers.pop_front().unwrap();
let handle = ::std::thread::Builder::new()
.name("request".into())
.spawn(move || workers::request::run_request_worker(config, request_receiver))?;
.spawn(move || {
workers::request::run_request_worker(sentinel, config, request_receiver)
})?;
handles.push(handle);
}
for handle in handles {
handle
.join()
.map_err(|err| anyhow::anyhow!("thread join error: {:?}", err))??;
for signal in &mut signals {
match signal {
SIGTERM => {
if sentinel_watcher.panic_was_triggered() {
return Err(anyhow::anyhow!("worker thread panicked"));
} else {
return Ok(());
}
}
_ => unreachable!(),
}
}
Ok(())

View file

@ -1,4 +1,4 @@
use aquatic_cli_helpers::run_app_with_cli_and_config;
use aquatic_common::cli::run_app_with_cli_and_config;
use aquatic_http_private::config::Config;
#[global_allocator]

View file

@ -11,7 +11,7 @@ use tokio::sync::mpsc::Receiver;
use tokio::task::LocalSet;
use tokio::time;
use aquatic_common::{extract_response_peers, CanonicalSocketAddr, ValidUntil};
use aquatic_common::{extract_response_peers, CanonicalSocketAddr, PanicSentinel, ValidUntil};
use aquatic_http_protocol::response::{
AnnounceResponse, Response, ResponsePeer, ResponsePeerListV4, ResponsePeerListV6,
};
@ -22,6 +22,7 @@ use crate::config::Config;
use common::*;
pub fn run_request_worker(
_sentinel: PanicSentinel,
config: Config,
request_receiver: Receiver<ChannelAnnounceRequest>,
) -> anyhow::Result<()> {

View file

@ -8,7 +8,7 @@ use std::{
};
use anyhow::Context;
use aquatic_common::rustls_config::RustlsConfig;
use aquatic_common::{privileges::PrivilegeDropper, rustls_config::RustlsConfig, PanicSentinel};
use axum::{extract::connect_info::Connected, routing::get, Extension, Router};
use hyper::server::conn::AddrIncoming;
use sqlx::mysql::MySqlPoolOptions;
@ -23,11 +23,13 @@ impl<'a> Connected<&'a tls::TlsStream> for SocketAddr {
}
pub fn run_socket_worker(
_sentinel: PanicSentinel,
config: Config,
tls_config: Arc<RustlsConfig>,
request_sender: ChannelRequestSender,
priv_dropper: PrivilegeDropper,
) -> anyhow::Result<()> {
let tcp_listener = create_tcp_listener(config.network.address)?;
let tcp_listener = create_tcp_listener(config.network.address, priv_dropper)?;
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
@ -71,7 +73,10 @@ async fn run_app(
Ok(())
}
fn create_tcp_listener(addr: SocketAddr) -> anyhow::Result<TcpListener> {
fn create_tcp_listener(
addr: SocketAddr,
priv_dropper: PrivilegeDropper,
) -> anyhow::Result<TcpListener> {
let domain = if addr.is_ipv4() {
socket2::Domain::IPV4
} else {
@ -93,5 +98,7 @@ fn create_tcp_listener(addr: SocketAddr) -> anyhow::Result<TcpListener> {
.listen(1024)
.with_context(|| format!("listen on {}", addr))?;
priv_dropper.after_socket_creation()?;
Ok(socket.into())
}

View file

@ -19,7 +19,6 @@ name = "aquatic_udp"
cpu-pinning = ["aquatic_common/with-hwloc"]
[dependencies]
aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" }
aquatic_common = { version = "0.2.0", path = "../aquatic_common" }
aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" }
aquatic_udp_protocol = { version = "0.2.0", path = "../aquatic_udp_protocol" }

View file

@ -3,7 +3,7 @@ use std::{net::SocketAddr, path::PathBuf};
use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig};
use serde::Deserialize;
use aquatic_cli_helpers::LogLevel;
use aquatic_common::cli::LogLevel;
use aquatic_toml_config::TomlConfig;
/// aquatic_udp configuration
@ -58,7 +58,7 @@ impl Default for Config {
}
}
impl aquatic_cli_helpers::Config for Config {
impl aquatic_common::cli::Config for Config {
fn get_log_level(&self) -> Option<LogLevel> {
Some(self.log_level)
}

View file

@ -2,20 +2,20 @@ pub mod common;
pub mod config;
pub mod workers;
use aquatic_common::PanicSentinelWatcher;
use config::Config;
use std::collections::BTreeMap;
use std::sync::{atomic::AtomicUsize, Arc};
use std::thread::Builder;
use anyhow::Context;
#[cfg(feature = "cpu-pinning")]
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
use aquatic_common::privileges::drop_privileges_after_socket_binding;
use aquatic_common::privileges::PrivilegeDropper;
use crossbeam_channel::{bounded, unbounded};
use aquatic_common::access_list::update_access_list;
use signal_hook::consts::SIGUSR1;
use signal_hook::consts::{SIGTERM, SIGUSR1};
use signal_hook::iterator::Signals;
use common::{ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State};
@ -30,9 +30,10 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
update_access_list(&config.access_list, &state.access_list)?;
let mut signals = Signals::new(::std::iter::once(SIGUSR1))?;
let mut signals = Signals::new([SIGUSR1, SIGTERM])?;
let num_bound_sockets = Arc::new(AtomicUsize::new(0));
let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel();
let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
let mut request_senders = Vec::new();
let mut request_receivers = BTreeMap::new();
@ -63,6 +64,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
}
for i in 0..config.request_workers {
let sentinel = sentinel.clone();
let config = config.clone();
let state = state.clone();
let request_receiver = request_receivers.remove(&i).unwrap().clone();
@ -80,6 +82,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
);
workers::request::run_request_worker(
sentinel,
config,
state,
request_receiver,
@ -91,12 +94,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
}
for i in 0..config.socket_workers {
let sentinel = sentinel.clone();
let state = state.clone();
let config = config.clone();
let request_sender =
ConnectedRequestSender::new(SocketWorkerIndex(i), request_senders.clone());
let response_receiver = response_receivers.remove(&i).unwrap();
let num_bound_sockets = num_bound_sockets.clone();
let priv_dropper = priv_dropper.clone();
Builder::new()
.name(format!("socket-{:02}", i + 1))
@ -110,23 +114,25 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
);
workers::socket::run_socket_worker(
sentinel,
state,
config,
i,
request_sender,
response_receiver,
num_bound_sockets,
priv_dropper,
);
})
.with_context(|| "spawn socket worker")?;
}
if config.statistics.active() {
let sentinel = sentinel.clone();
let state = state.clone();
let config = config.clone();
Builder::new()
.name("statistics-collector".to_string())
.name("statistics".into())
.spawn(move || {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
@ -136,18 +142,11 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
WorkerIndex::Util,
);
workers::statistics::run_statistics_worker(config, state);
workers::statistics::run_statistics_worker(sentinel, config, state);
})
.with_context(|| "spawn statistics worker")?;
}
drop_privileges_after_socket_binding(
&config.privileges,
num_bound_sockets,
config.socket_workers,
)
.unwrap();
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
@ -161,6 +160,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
SIGUSR1 => {
let _ = update_access_list(&config.access_list, &state.access_list);
}
SIGTERM => {
if sentinel_watcher.panic_was_triggered() {
return Err(anyhow::anyhow!("worker thread panicked"));
}
break;
}
_ => unreachable!(),
}
}

View file

@ -2,7 +2,7 @@
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
fn main() {
aquatic_cli_helpers::run_app_with_cli_and_config::<aquatic_udp::config::Config>(
aquatic_common::cli::run_app_with_cli_and_config::<aquatic_udp::config::Config>(
aquatic_udp::APP_NAME,
aquatic_udp::APP_VERSION,
aquatic_udp::run,

View file

@ -11,6 +11,7 @@ use aquatic_common::access_list::create_access_list_cache;
use aquatic_common::access_list::AccessListArcSwap;
use aquatic_common::AmortizedIndexMap;
use aquatic_common::CanonicalSocketAddr;
use aquatic_common::PanicSentinel;
use aquatic_common::ValidUntil;
use crossbeam_channel::Receiver;
use rand::{rngs::SmallRng, SeedableRng};
@ -121,6 +122,7 @@ impl TorrentMaps {
}
pub fn run_request_worker(
_sentinel: PanicSentinel,
config: Config,
state: State,
request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>,

View file

@ -1,12 +1,11 @@
use std::collections::BTreeMap;
use std::io::{Cursor, ErrorKind};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};
use std::vec::Drain;
use anyhow::Context;
use aquatic_common::privileges::PrivilegeDropper;
use crossbeam_channel::Receiver;
use mio::net::UdpSocket;
use mio::{Events, Interest, Poll, Token};
@ -15,8 +14,8 @@ use slab::Slab;
use aquatic_common::access_list::create_access_list_cache;
use aquatic_common::access_list::AccessListCache;
use aquatic_common::ValidUntil;
use aquatic_common::{AmortizedIndexMap, CanonicalSocketAddr};
use aquatic_common::{PanicSentinel, ValidUntil};
use aquatic_udp_protocol::*;
use socket2::{Domain, Protocol, Socket, Type};
@ -152,17 +151,19 @@ impl PendingScrapeResponseSlab {
}
pub fn run_socket_worker(
_sentinel: PanicSentinel,
state: State,
config: Config,
token_num: usize,
request_sender: ConnectedRequestSender,
response_receiver: Receiver<(ConnectedResponse, CanonicalSocketAddr)>,
num_bound_sockets: Arc<AtomicUsize>,
priv_dropper: PrivilegeDropper,
) {
let mut rng = StdRng::from_entropy();
let mut buffer = [0u8; MAX_PACKET_SIZE];
let mut socket = UdpSocket::from_std(create_socket(&config));
let mut socket =
UdpSocket::from_std(create_socket(&config, priv_dropper).expect("create socket"));
let mut poll = Poll::new().expect("create poll");
let interests = Interest::READABLE;
@ -171,8 +172,6 @@ pub fn run_socket_worker(
.register(&mut socket, Token(token_num), interests)
.unwrap();
num_bound_sockets.fetch_add(1, Ordering::SeqCst);
let mut events = Events::with_capacity(config.network.poll_event_capacity);
let mut connections = ConnectionMap::default();
let mut pending_scrape_responses = PendingScrapeResponseSlab::default();
@ -520,27 +519,29 @@ fn send_response(
}
}
pub fn create_socket(config: &Config) -> ::std::net::UdpSocket {
pub fn create_socket(
config: &Config,
priv_dropper: PrivilegeDropper,
) -> anyhow::Result<::std::net::UdpSocket> {
let socket = if config.network.address.is_ipv4() {
Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))
Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?
} else {
Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))
}
.expect("create socket");
Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?
};
if config.network.only_ipv6 {
socket.set_only_v6(true).expect("socket: set only ipv6");
socket
.set_only_v6(true)
.with_context(|| "socket: set only ipv6")?;
}
socket.set_reuse_port(true).expect("socket: set reuse port");
socket
.set_reuse_port(true)
.with_context(|| "socket: set reuse port")?;
socket
.set_nonblocking(true)
.expect("socket: set nonblocking");
socket
.bind(&config.network.address.into())
.unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err));
.with_context(|| "socket: set nonblocking")?;
let recv_buffer_size = config.network.socket_recv_buffer_size;
@ -554,7 +555,13 @@ pub fn create_socket(config: &Config) -> ::std::net::UdpSocket {
}
}
socket.into()
socket
.bind(&config.network.address.into())
.with_context(|| format!("socket: bind to {}", config.network.address))?;
priv_dropper.after_socket_creation()?;
Ok(socket.into())
}
#[cfg(test)]

View file

@ -5,6 +5,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::Context;
use aquatic_common::PanicSentinel;
use num_format::{Locale, ToFormattedString};
use serde::Serialize;
use time::format_description::well_known::Rfc2822;
@ -135,7 +136,7 @@ struct TemplateData {
peer_update_interval: String,
}
pub fn run_statistics_worker(config: Config, state: State) {
pub fn run_statistics_worker(_sentinel: PanicSentinel, config: Config, state: State) {
let tt = if config.statistics.write_html_to_file {
let mut tt = TinyTemplate::new();

View file

@ -11,7 +11,6 @@ readme = "../README.md"
name = "aquatic_udp_bench"
[dependencies]
aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" }
aquatic_common = { version = "0.2.0", path = "../aquatic_common" }
aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" }
aquatic_udp = { version = "0.2.0", path = "../aquatic_udp" }

View file

@ -24,7 +24,7 @@ impl Default for BenchConfig {
}
}
impl aquatic_cli_helpers::Config for BenchConfig {}
impl aquatic_common::cli::Config for BenchConfig {}
#[cfg(test)]
mod tests {

View file

@ -7,13 +7,14 @@
//! Scrape: 1 873 545 requests/second, 533.75 ns/request
//! ```
use aquatic_common::PanicSentinelWatcher;
use aquatic_udp::workers::request::run_request_worker;
use crossbeam_channel::unbounded;
use num_format::{Locale, ToFormattedString};
use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng};
use std::time::Duration;
use aquatic_cli_helpers::run_app_with_cli_and_config;
use aquatic_common::cli::run_app_with_cli_and_config;
use aquatic_udp::common::*;
use aquatic_udp::config::Config;
use aquatic_udp_protocol::*;
@ -41,6 +42,7 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> {
// Setup common state, spawn request handlers
let mut aquatic_config = Config::default();
let (_, sentinel) = PanicSentinelWatcher::create_with_sentinel();
aquatic_config.cleaning.torrent_cleaning_interval = 60 * 60 * 24;
@ -55,6 +57,7 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> {
::std::thread::spawn(move || {
run_request_worker(
sentinel,
config,
state,
request_receiver,

View file

@ -16,7 +16,6 @@ cpu-pinning = ["aquatic_common/with-hwloc"]
name = "aquatic_udp_load_test"
[dependencies]
aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" }
aquatic_common = { version = "0.2.0", path = "../aquatic_common" }
aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" }
aquatic_udp_protocol = { version = "0.2.0", path = "../aquatic_udp_protocol" }

View file

@ -2,7 +2,7 @@ use std::net::SocketAddr;
use serde::Deserialize;
use aquatic_cli_helpers::LogLevel;
use aquatic_common::cli::LogLevel;
#[cfg(feature = "cpu-pinning")]
use aquatic_common::cpu_pinning::desc::CpuPinningConfigDesc;
use aquatic_toml_config::TomlConfig;

View file

@ -1,7 +1,7 @@
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::atomic::AtomicUsize;
use std::sync::{atomic::Ordering, Arc};
use std::thread;
use std::thread::{self, Builder};
use std::time::{Duration, Instant};
#[cfg(feature = "cpu-pinning")]
@ -22,7 +22,7 @@ use worker::*;
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
pub fn main() {
aquatic_cli_helpers::run_app_with_cli_and_config::<Config>(
aquatic_common::cli::run_app_with_cli_and_config::<Config>(
"aquatic_udp_load_test: BitTorrent load tester",
env!("CARGO_PKG_VERSION"),
run,
@ -30,8 +30,8 @@ pub fn main() {
)
}
impl aquatic_cli_helpers::Config for Config {
fn get_log_level(&self) -> Option<aquatic_cli_helpers::LogLevel> {
impl aquatic_common::cli::Config for Config {
fn get_log_level(&self) -> Option<aquatic_common::cli::LogLevel> {
Some(self.log_level)
}
}
@ -79,7 +79,7 @@ fn run(config: Config) -> ::anyhow::Result<()> {
let config = config.clone();
let state = state.clone();
thread::spawn(move || {
Builder::new().name("load-test".into()).spawn(move || {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
@ -89,7 +89,7 @@ fn run(config: Config) -> ::anyhow::Result<()> {
);
run_worker_thread(state, pareto, &config, addr)
});
})?;
}
#[cfg(feature = "cpu-pinning")]

View file

@ -16,7 +16,6 @@ name = "aquatic_ws"
name = "aquatic_ws"
[dependencies]
aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" }
aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["rustls-config", "with-glommio"] }
aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" }
aquatic_ws_protocol = { version = "0.2.0", path = "../aquatic_ws_protocol" }

View file

@ -5,7 +5,7 @@ use aquatic_common::cpu_pinning::asc::CpuPinningConfigAsc;
use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig};
use serde::Deserialize;
use aquatic_cli_helpers::LogLevel;
use aquatic_common::cli::LogLevel;
use aquatic_toml_config::TomlConfig;
/// aquatic_ws configuration
@ -44,7 +44,7 @@ impl Default for Config {
}
}
impl aquatic_cli_helpers::Config for Config {
impl aquatic_common::cli::Config for Config {
fn get_log_level(&self) -> Option<LogLevel> {
Some(self.log_level)
}

View file

@ -2,16 +2,20 @@ pub mod common;
pub mod config;
pub mod workers;
use std::sync::{atomic::AtomicUsize, Arc};
use std::sync::Arc;
use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker};
use aquatic_common::cpu_pinning::WorkerIndex;
use aquatic_common::rustls_config::create_rustls_config;
use aquatic_common::PanicSentinelWatcher;
use glommio::{channels::channel_mesh::MeshBuilder, prelude::*};
use signal_hook::{consts::SIGUSR1, iterator::Signals};
use signal_hook::{
consts::{SIGTERM, SIGUSR1},
iterator::Signals,
};
use aquatic_common::access_list::update_access_list;
use aquatic_common::privileges::drop_privileges_after_socket_binding;
use aquatic_common::privileges::PrivilegeDropper;
use common::*;
use config::Config;
@ -22,17 +26,91 @@ pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
pub const SHARED_IN_CHANNEL_SIZE: usize = 1024;
pub fn run(config: Config) -> ::anyhow::Result<()> {
let mut signals = Signals::new([SIGUSR1, SIGTERM])?;
let state = State::default();
update_access_list(&config.access_list, &state.access_list)?;
let mut signals = Signals::new(::std::iter::once(SIGUSR1))?;
let num_peers = config.socket_workers + config.request_workers;
{
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE);
let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE * 16);
let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel();
let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
let tls_config = Arc::new(create_rustls_config(
&config.network.tls_certificate_path,
&config.network.tls_private_key_path,
)?);
let mut executors = Vec::new();
for i in 0..(config.socket_workers) {
let sentinel = sentinel.clone();
let config = config.clone();
let state = state.clone();
let tls_config = tls_config.clone();
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
let priv_dropper = priv_dropper.clone();
::std::thread::spawn(move || run_workers(config, state));
let placement = get_worker_placement(
&config.cpu_pinning,
config.socket_workers,
config.request_workers,
WorkerIndex::SocketWorker(i),
)?;
let builder = LocalExecutorBuilder::new(placement).name("socket");
let executor = builder
.spawn(move || async move {
workers::socket::run_socket_worker(
sentinel,
config,
state,
tls_config,
request_mesh_builder,
response_mesh_builder,
priv_dropper,
)
.await
})
.map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?;
executors.push(executor);
}
for i in 0..(config.request_workers) {
let sentinel = sentinel.clone();
let config = config.clone();
let state = state.clone();
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
let placement = get_worker_placement(
&config.cpu_pinning,
config.socket_workers,
config.request_workers,
WorkerIndex::RequestWorker(i),
)?;
let builder = LocalExecutorBuilder::new(placement).name("request");
let executor = builder
.spawn(move || async move {
workers::request::run_request_worker(
sentinel,
config,
state,
request_mesh_builder,
response_mesh_builder,
)
.await
})
.map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?;
executors.push(executor);
}
if config.cpu_pinning.active {
@ -48,107 +126,16 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
SIGUSR1 => {
let _ = update_access_list(&config.access_list, &state.access_list);
}
SIGTERM => {
if sentinel_watcher.panic_was_triggered() {
return Err(anyhow::anyhow!("worker thread panicked"));
} else {
return Ok(());
}
}
_ => unreachable!(),
}
}
Ok(())
}
fn run_workers(config: Config, state: State) -> anyhow::Result<()> {
let num_peers = config.socket_workers + config.request_workers;
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE);
let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE * 16);
let num_bound_sockets = Arc::new(AtomicUsize::new(0));
let tls_config = Arc::new(create_rustls_config(
&config.network.tls_certificate_path,
&config.network.tls_private_key_path,
)?);
let mut executors = Vec::new();
for i in 0..(config.socket_workers) {
let config = config.clone();
let state = state.clone();
let tls_config = tls_config.clone();
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
let num_bound_sockets = num_bound_sockets.clone();
let placement = get_worker_placement(
&config.cpu_pinning,
config.socket_workers,
config.request_workers,
WorkerIndex::SocketWorker(i),
)?;
let builder = LocalExecutorBuilder::new(placement).name("socket");
let executor = builder.spawn(move || async move {
workers::socket::run_socket_worker(
config,
state,
tls_config,
request_mesh_builder,
response_mesh_builder,
num_bound_sockets,
)
.await
});
executors.push(executor);
}
for i in 0..(config.request_workers) {
let config = config.clone();
let state = state.clone();
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
let placement = get_worker_placement(
&config.cpu_pinning,
config.socket_workers,
config.request_workers,
WorkerIndex::RequestWorker(i),
)?;
let builder = LocalExecutorBuilder::new(placement).name("request");
let executor = builder.spawn(move || async move {
workers::request::run_request_worker(
config,
state,
request_mesh_builder,
response_mesh_builder,
)
.await
});
executors.push(executor);
}
drop_privileges_after_socket_binding(
&config.privileges,
num_bound_sockets,
config.socket_workers,
)
.unwrap();
if config.cpu_pinning.active {
set_affinity_for_util_worker(
&config.cpu_pinning,
config.socket_workers,
config.request_workers,
)?;
}
for executor in executors {
executor
.expect("failed to spawn local executor")
.join()
.unwrap();
}
Ok(())
}

View file

@ -1,4 +1,4 @@
use aquatic_cli_helpers::run_app_with_cli_and_config;
use aquatic_common::cli::run_app_with_cli_and_config;
use aquatic_ws::config::Config;
#[global_allocator]

View file

@ -12,7 +12,7 @@ use glommio::timer::TimerActionRepeat;
use hashbrown::HashMap;
use rand::{rngs::SmallRng, SeedableRng};
use aquatic_common::{extract_response_peers, AmortizedIndexMap};
use aquatic_common::{extract_response_peers, AmortizedIndexMap, PanicSentinel};
use aquatic_ws_protocol::*;
use crate::common::*;
@ -128,6 +128,7 @@ impl TorrentMaps {
}
pub async fn run_request_worker(
_sentinel: PanicSentinel,
config: Config,
state: State,
in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>,

View file

@ -3,13 +3,14 @@ use std::cell::RefCell;
use std::collections::BTreeMap;
use std::os::unix::prelude::{FromRawFd, IntoRawFd};
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::Context;
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::privileges::PrivilegeDropper;
use aquatic_common::rustls_config::RustlsConfig;
use aquatic_common::CanonicalSocketAddr;
use aquatic_common::{CanonicalSocketAddr, PanicSentinel};
use aquatic_ws_protocol::*;
use async_tungstenite::WebSocketStream;
use futures::stream::{SplitSink, SplitStream};
@ -48,19 +49,18 @@ struct ConnectionReference {
}
pub async fn run_socket_worker(
_sentinel: PanicSentinel,
config: Config,
state: State,
tls_config: Arc<RustlsConfig>,
in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>,
out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>,
num_bound_sockets: Arc<AtomicUsize>,
priv_dropper: PrivilegeDropper,
) {
let config = Rc::new(config);
let access_list = state.access_list;
let listener = create_tcp_listener(&config);
num_bound_sockets.fetch_add(1, Ordering::SeqCst);
let listener = create_tcp_listener(&config, priv_dropper).expect("create tcp listener");
let (in_message_senders, _) = in_message_mesh_builder.join(Role::Producer).await.unwrap();
let in_message_senders = Rc::new(in_message_senders);
@ -544,7 +544,10 @@ fn calculate_in_message_consumer_index(config: &Config, info_hash: InfoHash) ->
(info_hash.0[0] as usize) % config.request_workers
}
fn create_tcp_listener(config: &Config) -> TcpListener {
fn create_tcp_listener(
config: &Config,
priv_dropper: PrivilegeDropper,
) -> anyhow::Result<TcpListener> {
let domain = if config.network.address.is_ipv4() {
socket2::Domain::IPV4
} else {
@ -552,21 +555,27 @@ fn create_tcp_listener(config: &Config) -> TcpListener {
};
let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))
.expect("create socket");
.with_context(|| "create socket")?;
if config.network.only_ipv6 {
socket.set_only_v6(true).expect("socket: set only ipv6");
socket
.set_only_v6(true)
.with_context(|| "socket: set only ipv6")?;
}
socket.set_reuse_port(true).expect("socket: set reuse port");
socket
.set_reuse_port(true)
.with_context(|| "socket: set reuse port")?;
socket
.bind(&config.network.address.into())
.unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err));
.with_context(|| format!("socket: bind to {}", config.network.address))?;
socket
.listen(config.network.tcp_backlog)
.unwrap_or_else(|err| panic!("socket: listen {}: {:?}", config.network.address, err));
.with_context(|| format!("socket: listen {}", config.network.address))?;
unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) }
priv_dropper.after_socket_creation()?;
Ok(unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) })
}

View file

@ -13,7 +13,6 @@ readme = "../README.md"
name = "aquatic_ws_load_test"
[dependencies]
aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" }
aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["with-glommio"] }
aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" }
aquatic_ws_protocol = { version = "0.2.0", path = "../aquatic_ws_protocol" }

View file

@ -1,6 +1,6 @@
use std::net::SocketAddr;
use aquatic_cli_helpers::LogLevel;
use aquatic_common::cli::LogLevel;
use aquatic_common::cpu_pinning::desc::CpuPinningConfigDesc;
use aquatic_toml_config::TomlConfig;
use serde::Deserialize;
@ -19,7 +19,7 @@ pub struct Config {
pub cpu_pinning: CpuPinningConfigDesc,
}
impl aquatic_cli_helpers::Config for Config {
impl aquatic_common::cli::Config for Config {
fn get_log_level(&self) -> Option<LogLevel> {
Some(self.log_level)
}

View file

@ -21,7 +21,7 @@ use network::*;
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
pub fn main() {
aquatic_cli_helpers::run_app_with_cli_and_config::<Config>(
aquatic_common::cli::run_app_with_cli_and_config::<Config>(
"aquatic_ws_load_test: WebTorrent load tester",
env!("CARGO_PKG_VERSION"),
run,
@ -67,6 +67,7 @@ fn run(config: Config) -> ::anyhow::Result<()> {
)?;
LocalExecutorBuilder::new(placement)
.name("load-test")
.spawn(move || async move {
run_socket_thread(config, tls_config, state).await.unwrap();
})