diff --git a/crates/udp/src/config.rs b/crates/udp/src/config.rs
index df83279..8dfcfce 100644
--- a/crates/udp/src/config.rs
+++ b/crates/udp/src/config.rs
@@ -11,36 +11,14 @@ use aquatic_toml_config::TomlConfig;
 #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)]
 #[serde(default, deny_unknown_fields)]
 pub struct Config {
-    /// Number of socket worker. One per physical core is recommended.
-    ///
-    /// Socket workers receive requests from clients and parse them.
-    /// Responses to connect requests are sent back immediately. Announce and
-    /// scrape requests are passed on to swarm workers, which generate
-    /// responses and send them back to the socket worker, which sends them
-    /// to the client.
+    /// Number of socket worker. One per virtual CPU is recommended
     pub socket_workers: usize,
-    /// Number of swarm workers. One is enough in almost all cases
-    ///
-    /// Swarm workers receive parsed announce and scrape requests from socket
-    /// workers, generate responses and send them back to the socket workers.
-    pub swarm_workers: usize,
     pub log_level: LogLevel,
-    /// Maximum number of items in each channel passing requests/responses
-    /// between workers. A value of zero is no longer allowed.
-    pub worker_channel_size: usize,
-    /// How long to block waiting for requests in swarm workers.
-    ///
-    /// Higher values means that with zero traffic, the worker will not
-    /// unnecessarily cause the CPU to wake up as often. However, high values
-    /// (something like larger than 1000) combined with very low traffic can
-    /// cause delays in torrent cleaning.
-    pub request_channel_recv_timeout_ms: u64,
     pub network: NetworkConfig,
     pub protocol: ProtocolConfig,
     pub statistics: StatisticsConfig,
     pub cleaning: CleaningConfig,
     pub privileges: PrivilegeConfig,
-
     /// Access list configuration
     ///
     /// The file is read on start and when the program receives `SIGUSR1`. If
@@ -48,26 +26,19 @@ pub struct Config {
     /// emitting of an error-level log message, while successful updates of the
     /// access list result in emitting of an info-level log message.
     pub access_list: AccessListConfig,
-    #[cfg(feature = "cpu-pinning")]
-    pub cpu_pinning: aquatic_common::cpu_pinning::asc::CpuPinningConfigAsc,
 }
 
 impl Default for Config {
     fn default() -> Self {
         Self {
             socket_workers: 1,
-            swarm_workers: 1,
             log_level: LogLevel::Error,
-            worker_channel_size: 1_024,
-            request_channel_recv_timeout_ms: 100,
             network: NetworkConfig::default(),
             protocol: ProtocolConfig::default(),
             statistics: StatisticsConfig::default(),
             cleaning: CleaningConfig::default(),
             privileges: PrivilegeConfig::default(),
             access_list: AccessListConfig::default(),
-            #[cfg(feature = "cpu-pinning")]
-            cpu_pinning: Default::default(),
         }
     }
 }
diff --git a/crates/udp/src/lib.rs b/crates/udp/src/lib.rs
index 65a490e..c0a7d71 100644
--- a/crates/udp/src/lib.rs
+++ b/crates/udp/src/lib.rs
@@ -13,8 +13,6 @@ use signal_hook::consts::SIGUSR1;
 use signal_hook::iterator::Signals;
 
 use aquatic_common::access_list::update_access_list;
-#[cfg(feature = "cpu-pinning")]
-use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
 use aquatic_common::privileges::PrivilegeDropper;
 
 use common::{State, Statistics};
@@ -31,13 +29,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
     let statistics = Statistics::new(&config);
     let connection_validator = ConnectionValidator::new(&config)?;
     let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
-
-    let mut join_handles = Vec::new();
+    let (statistics_sender, statistics_receiver) = unbounded();
 
     update_access_list(&config.access_list, &state.access_list)?;
 
-    let (statistics_sender, statistics_receiver) = unbounded();
+    let mut join_handles = Vec::new();
 
+    // Spawn socket worker threads
     for i in 0..config.socket_workers {
         let state = state.clone();
         let config = config.clone();
@@ -49,14 +47,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
         let handle = Builder::new()
             .name(format!("socket-{:02}", i + 1))
             .spawn(move || {
-                #[cfg(feature = "cpu-pinning")]
-                pin_current_if_configured_to(
-                    &config.cpu_pinning,
-                    config.socket_workers,
-                    config.swarm_workers,
-                    WorkerIndex::SocketWorker(i),
-                );
-
                 workers::socket::run_socket_worker(
                     config,
                     state,
@@ -71,6 +61,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
         join_handles.push((WorkerType::Socket(i), handle));
     }
 
+    // Spawn cleaning thread
     {
         let state = state.clone();
         let config = config.clone();
@@ -94,6 +85,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
         join_handles.push((WorkerType::Cleaning, handle));
     }
 
+    // Spawn statistics thread
     if config.statistics.active() {
         let state = state.clone();
         let config = config.clone();
@@ -101,14 +93,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
         let handle = Builder::new()
             .name("statistics".into())
             .spawn(move || {
-                #[cfg(feature = "cpu-pinning")]
-                pin_current_if_configured_to(
-                    &config.cpu_pinning,
-                    config.socket_workers,
-                    config.swarm_workers,
-                    WorkerIndex::Util,
-                );
-
                 workers::statistics::run_statistics_worker(
                     config,
                     state,
@@ -121,6 +105,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
         join_handles.push((WorkerType::Statistics, handle));
     }
 
+    // Spawn prometheus endpoint thread
     #[cfg(feature = "prometheus")]
     if config.statistics.active() && config.statistics.run_prometheus_endpoint {
         let handle = aquatic_common::spawn_prometheus_endpoint(
@@ -141,14 +126,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
         let handle: JoinHandle<anyhow::Result<()>> = Builder::new()
             .name("signals".into())
             .spawn(move || {
-                #[cfg(feature = "cpu-pinning")]
-                pin_current_if_configured_to(
-                    &config.cpu_pinning,
-                    config.socket_workers,
-                    config.swarm_workers,
-                    WorkerIndex::Util,
-                );
-
                 for signal in &mut signals {
                     match signal {
                         SIGUSR1 => {
@@ -165,6 +142,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
         join_handles.push((WorkerType::Signals, handle));
     }
 
+    // Quit application if any worker returns or panics
    loop {
        for (i, (_, handle)) in join_handles.iter().enumerate() {
            if handle.is_finished() {
diff --git a/crates/udp/src/workers/statistics/collector.rs b/crates/udp/src/workers/statistics/collector.rs
index 1680695..93fe11d 100644
--- a/crates/udp/src/workers/statistics/collector.rs
+++ b/crates/udp/src/workers/statistics/collector.rs
@@ -25,7 +25,6 @@ pub struct StatisticsCollector {
     statistics: Statistics,
     ip_version: IpVersion,
     last_update: Instant,
-    pending_histograms: Vec<Histogram<u64>>,
     last_complete_histogram: PeerHistogramStatistics,
 }
 
@@ -34,19 +33,13 @@ impl StatisticsCollector {
         Self {
             statistics,
             last_update: Instant::now(),
-            pending_histograms: Vec::new(),
             last_complete_histogram: Default::default(),
             ip_version,
         }
     }
 
-    pub fn add_histogram(&mut self, config: &Config, histogram: Histogram<u64>) {
-        self.pending_histograms.push(histogram);
-
-        if self.pending_histograms.len() == config.swarm_workers {
-            self.last_complete_histogram =
-                PeerHistogramStatistics::new(self.pending_histograms.drain(..).sum());
-        }
+    pub fn add_histogram(&mut self, histogram: Histogram<u64>) {
+        self.last_complete_histogram = PeerHistogramStatistics::new(histogram);
     }
 
     pub fn collect_from_shared(
diff --git a/crates/udp/src/workers/statistics/mod.rs b/crates/udp/src/workers/statistics/mod.rs
index beafa2d..4814bb7 100644
--- a/crates/udp/src/workers/statistics/mod.rs
+++ b/crates/udp/src/workers/statistics/mod.rs
@@ -81,8 +81,8 @@ pub fn run_statistics_worker(
         for message in statistics_receiver.try_iter() {
             match message {
-                StatisticsMessage::Ipv4PeerHistogram(h) => ipv4_collector.add_histogram(&config, h),
-                StatisticsMessage::Ipv6PeerHistogram(h) => ipv6_collector.add_histogram(&config, h),
+                StatisticsMessage::Ipv4PeerHistogram(h) => ipv4_collector.add_histogram(h),
+                StatisticsMessage::Ipv6PeerHistogram(h) => ipv6_collector.add_histogram(h),
                 StatisticsMessage::PeerAdded(peer_id) => {
                     if process_peer_client_data {
                         peers