udp: remove socket_worker config, adjust other code, fix statistics

This commit is contained in:
Joakim Frostegård 2024-02-10 15:54:07 +01:00
parent c4fd3c9e83
commit 71a3cb9a5a
4 changed files with 12 additions and 70 deletions

View file

@@ -11,36 +11,14 @@ use aquatic_toml_config::TomlConfig;
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)] #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)] #[serde(default, deny_unknown_fields)]
pub struct Config { pub struct Config {
/// Number of socket workers. One per physical core is recommended. /// Number of socket workers. One per virtual CPU is recommended
///
/// Socket workers receive requests from clients and parse them.
/// Responses to connect requests are sent back immediately. Announce and
/// scrape requests are passed on to swarm workers, which generate
/// responses and send them back to the socket worker, which sends them
/// to the client.
pub socket_workers: usize, pub socket_workers: usize,
/// Number of swarm workers. One is enough in almost all cases
///
/// Swarm workers receive parsed announce and scrape requests from socket
/// workers, generate responses and send them back to the socket workers.
pub swarm_workers: usize,
pub log_level: LogLevel, pub log_level: LogLevel,
/// Maximum number of items in each channel passing requests/responses
/// between workers. A value of zero is no longer allowed.
pub worker_channel_size: usize,
/// How long to block waiting for requests in swarm workers.
///
/// Higher values means that with zero traffic, the worker will not
/// unnecessarily cause the CPU to wake up as often. However, high values
/// (something like larger than 1000) combined with very low traffic can
/// cause delays in torrent cleaning.
pub request_channel_recv_timeout_ms: u64,
pub network: NetworkConfig, pub network: NetworkConfig,
pub protocol: ProtocolConfig, pub protocol: ProtocolConfig,
pub statistics: StatisticsConfig, pub statistics: StatisticsConfig,
pub cleaning: CleaningConfig, pub cleaning: CleaningConfig,
pub privileges: PrivilegeConfig, pub privileges: PrivilegeConfig,
/// Access list configuration /// Access list configuration
/// ///
/// The file is read on start and when the program receives `SIGUSR1`. If /// The file is read on start and when the program receives `SIGUSR1`. If
@@ -48,26 +26,19 @@ pub struct Config {
/// emitting of an error-level log message, while successful updates of the /// emitting of an error-level log message, while successful updates of the
/// access list result in emitting of an info-level log message. /// access list result in emitting of an info-level log message.
pub access_list: AccessListConfig, pub access_list: AccessListConfig,
#[cfg(feature = "cpu-pinning")]
pub cpu_pinning: aquatic_common::cpu_pinning::asc::CpuPinningConfigAsc,
} }
impl Default for Config { impl Default for Config {
fn default() -> Self { fn default() -> Self {
Self { Self {
socket_workers: 1, socket_workers: 1,
swarm_workers: 1,
log_level: LogLevel::Error, log_level: LogLevel::Error,
worker_channel_size: 1_024,
request_channel_recv_timeout_ms: 100,
network: NetworkConfig::default(), network: NetworkConfig::default(),
protocol: ProtocolConfig::default(), protocol: ProtocolConfig::default(),
statistics: StatisticsConfig::default(), statistics: StatisticsConfig::default(),
cleaning: CleaningConfig::default(), cleaning: CleaningConfig::default(),
privileges: PrivilegeConfig::default(), privileges: PrivilegeConfig::default(),
access_list: AccessListConfig::default(), access_list: AccessListConfig::default(),
#[cfg(feature = "cpu-pinning")]
cpu_pinning: Default::default(),
} }
} }
} }

View file

@@ -13,8 +13,6 @@ use signal_hook::consts::SIGUSR1;
use signal_hook::iterator::Signals; use signal_hook::iterator::Signals;
use aquatic_common::access_list::update_access_list; use aquatic_common::access_list::update_access_list;
#[cfg(feature = "cpu-pinning")]
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::privileges::PrivilegeDropper;
use common::{State, Statistics}; use common::{State, Statistics};
@@ -31,13 +29,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let statistics = Statistics::new(&config); let statistics = Statistics::new(&config);
let connection_validator = ConnectionValidator::new(&config)?; let connection_validator = ConnectionValidator::new(&config)?;
let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
let (statistics_sender, statistics_receiver) = unbounded();
let mut join_handles = Vec::new();
update_access_list(&config.access_list, &state.access_list)?; update_access_list(&config.access_list, &state.access_list)?;
let (statistics_sender, statistics_receiver) = unbounded(); let mut join_handles = Vec::new();
// Spawn socket worker threads
for i in 0..config.socket_workers { for i in 0..config.socket_workers {
let state = state.clone(); let state = state.clone();
let config = config.clone(); let config = config.clone();
@@ -49,14 +47,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let handle = Builder::new() let handle = Builder::new()
.name(format!("socket-{:02}", i + 1)) .name(format!("socket-{:02}", i + 1))
.spawn(move || { .spawn(move || {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
config.swarm_workers,
WorkerIndex::SocketWorker(i),
);
workers::socket::run_socket_worker( workers::socket::run_socket_worker(
config, config,
state, state,
@@ -71,6 +61,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
join_handles.push((WorkerType::Socket(i), handle)); join_handles.push((WorkerType::Socket(i), handle));
} }
// Spawn cleaning thread
{ {
let state = state.clone(); let state = state.clone();
let config = config.clone(); let config = config.clone();
@@ -94,6 +85,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
join_handles.push((WorkerType::Cleaning, handle)); join_handles.push((WorkerType::Cleaning, handle));
} }
// Spawn statistics thread
if config.statistics.active() { if config.statistics.active() {
let state = state.clone(); let state = state.clone();
let config = config.clone(); let config = config.clone();
@@ -101,14 +93,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let handle = Builder::new() let handle = Builder::new()
.name("statistics".into()) .name("statistics".into())
.spawn(move || { .spawn(move || {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
config.swarm_workers,
WorkerIndex::Util,
);
workers::statistics::run_statistics_worker( workers::statistics::run_statistics_worker(
config, config,
state, state,
@@ -121,6 +105,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
join_handles.push((WorkerType::Statistics, handle)); join_handles.push((WorkerType::Statistics, handle));
} }
// Spawn prometheus endpoint thread
#[cfg(feature = "prometheus")] #[cfg(feature = "prometheus")]
if config.statistics.active() && config.statistics.run_prometheus_endpoint { if config.statistics.active() && config.statistics.run_prometheus_endpoint {
let handle = aquatic_common::spawn_prometheus_endpoint( let handle = aquatic_common::spawn_prometheus_endpoint(
@@ -141,14 +126,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let handle: JoinHandle<anyhow::Result<()>> = Builder::new() let handle: JoinHandle<anyhow::Result<()>> = Builder::new()
.name("signals".into()) .name("signals".into())
.spawn(move || { .spawn(move || {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
config.swarm_workers,
WorkerIndex::Util,
);
for signal in &mut signals { for signal in &mut signals {
match signal { match signal {
SIGUSR1 => { SIGUSR1 => {
@@ -165,6 +142,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
join_handles.push((WorkerType::Signals, handle)); join_handles.push((WorkerType::Signals, handle));
} }
// Quit application if any worker returns or panics
loop { loop {
for (i, (_, handle)) in join_handles.iter().enumerate() { for (i, (_, handle)) in join_handles.iter().enumerate() {
if handle.is_finished() { if handle.is_finished() {

View file

@@ -25,7 +25,6 @@ pub struct StatisticsCollector {
statistics: Statistics, statistics: Statistics,
ip_version: IpVersion, ip_version: IpVersion,
last_update: Instant, last_update: Instant,
pending_histograms: Vec<Histogram<u64>>,
last_complete_histogram: PeerHistogramStatistics, last_complete_histogram: PeerHistogramStatistics,
} }
@@ -34,19 +33,13 @@ impl StatisticsCollector {
Self { Self {
statistics, statistics,
last_update: Instant::now(), last_update: Instant::now(),
pending_histograms: Vec::new(),
last_complete_histogram: Default::default(), last_complete_histogram: Default::default(),
ip_version, ip_version,
} }
} }
pub fn add_histogram(&mut self, config: &Config, histogram: Histogram<u64>) { pub fn add_histogram(&mut self, histogram: Histogram<u64>) {
self.pending_histograms.push(histogram); self.last_complete_histogram = PeerHistogramStatistics::new(histogram);
if self.pending_histograms.len() == config.swarm_workers {
self.last_complete_histogram =
PeerHistogramStatistics::new(self.pending_histograms.drain(..).sum());
}
} }
pub fn collect_from_shared( pub fn collect_from_shared(

View file

@@ -81,8 +81,8 @@ pub fn run_statistics_worker(
for message in statistics_receiver.try_iter() { for message in statistics_receiver.try_iter() {
match message { match message {
StatisticsMessage::Ipv4PeerHistogram(h) => ipv4_collector.add_histogram(&config, h), StatisticsMessage::Ipv4PeerHistogram(h) => ipv4_collector.add_histogram(h),
StatisticsMessage::Ipv6PeerHistogram(h) => ipv6_collector.add_histogram(&config, h), StatisticsMessage::Ipv6PeerHistogram(h) => ipv6_collector.add_histogram(h),
StatisticsMessage::PeerAdded(peer_id) => { StatisticsMessage::PeerAdded(peer_id) => {
if process_peer_client_data { if process_peer_client_data {
peers peers