diff --git a/TODO.md b/TODO.md index 25bb6d7..9024c23 100644 --- a/TODO.md +++ b/TODO.md @@ -3,8 +3,6 @@ ## High priority * udp - * fix cleaning - * fix statistics * fix config * consider ways of avoiding response peer allocations * make ConnectionValidator faster by avoiding calling time functions so often diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index ea5b928..fbd99b9 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -163,6 +163,7 @@ pub enum WorkerType { Socket(usize), Statistics, Signals, + Cleaning, #[cfg(feature = "prometheus")] Prometheus, } @@ -174,6 +175,7 @@ impl Display for WorkerType { Self::Socket(index) => f.write_fmt(format_args!("Socket worker {}", index + 1)), Self::Statistics => f.write_str("Statistics worker"), Self::Signals => f.write_str("Signals worker"), + Self::Cleaning => f.write_str("Cleaning worker"), #[cfg(feature = "prometheus")] Self::Prometheus => f.write_str("Prometheus worker"), } diff --git a/crates/udp/src/common.rs b/crates/udp/src/common.rs index 7e3f7a3..c42aa35 100644 --- a/crates/udp/src/common.rs +++ b/crates/udp/src/common.rs @@ -32,7 +32,7 @@ impl IpVersion { #[derive(Clone)] pub struct Statistics { pub socket: Vec>>, - pub swarm: Vec>>, + pub swarm: CachePaddedArc>, } impl Statistics { @@ -41,9 +41,7 @@ impl Statistics { socket: repeat_with(Default::default) .take(config.socket_workers) .collect(), - swarm: repeat_with(Default::default) - .take(config.swarm_workers) - .collect(), + swarm: Default::default(), } } } diff --git a/crates/udp/src/lib.rs b/crates/udp/src/lib.rs index 6e27fc4..65a490e 100644 --- a/crates/udp/src/lib.rs +++ b/crates/udp/src/lib.rs @@ -71,6 +71,29 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { join_handles.push((WorkerType::Socket(i), handle)); } + { + let state = state.clone(); + let config = config.clone(); + let statistics = statistics.swarm.clone(); + let statistics_sender = statistics_sender.clone(); + + let handle = Builder::new().name("cleaning".into()).spawn(move || loop { + sleep(Duration::from_secs( + config.cleaning.torrent_cleaning_interval, + )); + + state.torrent_maps.clean_and_update_statistics( + &config, + &statistics, + &statistics_sender, + &state.access_list, + state.server_start_instant, + ); + })?; + + join_handles.push((WorkerType::Cleaning, handle)); + } + if config.statistics.active() { let state = state.clone(); let config = config.clone(); @@ -142,14 +165,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { join_handles.push((WorkerType::Signals, handle)); } - #[cfg(feature = "cpu-pinning")] - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - config.swarm_workers, - WorkerIndex::Util, - ); - loop { for (i, (_, handle)) in join_handles.iter().enumerate() { if handle.is_finished() { diff --git a/crates/udp/src/swarm.rs b/crates/udp/src/swarm.rs index 159e145..f6cb336 100644 --- a/crates/udp/src/swarm.rs +++ b/crates/udp/src/swarm.rs @@ -6,6 +6,7 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use aquatic_common::SecondsSinceServerStart; +use aquatic_common::ServerStartInstant; use aquatic_common::{ access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache, AccessListMode}, ValidUntil, @@ -84,16 +85,16 @@ impl TorrentMaps { } /// Remove forbidden or inactive torrents, reclaim space and update statistics pub fn clean_and_update_statistics( - &mut self, + &self, config: &Config, - state: &State, statistics: &CachePaddedArc>, statistics_sender: &Sender, access_list: &Arc, + server_start_instant: ServerStartInstant, ) { let mut cache = create_access_list_cache(access_list); let mode = config.access_list.mode; - let now = state.server_start_instant.seconds_elapsed(); + let now = server_start_instant.seconds_elapsed(); let ipv4 = self.ipv4 @@ -196,7 +197,7 @@ impl TorrentMapShards { } fn clean_and_get_statistics( - &mut self, + &self, config: &Config, statistics_sender: &Sender, access_list_cache: &mut AccessListCache, diff --git a/crates/udp/src/workers/statistics/collector.rs b/crates/udp/src/workers/statistics/collector.rs index 5297853..1680695 100644 --- a/crates/udp/src/workers/statistics/collector.rs +++ b/crates/udp/src/workers/statistics/collector.rs @@ -60,8 +60,6 @@ impl StatisticsCollector { let mut responses_error: usize = 0; let mut bytes_received: usize = 0; let mut bytes_sent: usize = 0; - let mut num_torrents: usize = 0; - let mut num_peers: usize = 0; #[cfg(feature = "prometheus")] let ip_version_prometheus_str = self.ip_version.prometheus_str(); @@ -186,44 +184,37 @@ impl StatisticsCollector { } } - for (i, statistics) in self - .statistics - .swarm - .iter() - .map(|s| s.by_ip_version(self.ip_version)) - .enumerate() - { - { - let n = statistics.torrents.load(Ordering::Relaxed); + let swarm_statistics = &self.statistics.swarm.by_ip_version(self.ip_version); - num_torrents += n; + let num_torrents = { + let num_torrents = swarm_statistics.torrents.load(Ordering::Relaxed); - #[cfg(feature = "prometheus")] - if config.statistics.run_prometheus_endpoint { - ::metrics::gauge!( - "aquatic_torrents", - "ip_version" => ip_version_prometheus_str, - "worker_index" => i.to_string(), - ) - .set(n as f64); - } + #[cfg(feature = "prometheus")] + if config.statistics.run_prometheus_endpoint { + ::metrics::gauge!( + "aquatic_torrents", + "ip_version" => ip_version_prometheus_str, + ) + .set(num_torrents as f64); } - { - let n = statistics.peers.load(Ordering::Relaxed); - num_peers += n; + num_torrents + }; - #[cfg(feature = "prometheus")] - if config.statistics.run_prometheus_endpoint { - ::metrics::gauge!( - "aquatic_peers", - "ip_version" => ip_version_prometheus_str, - "worker_index" => i.to_string(), - ) - .set(n as f64); - } + let num_peers = { + let num_peers = swarm_statistics.peers.load(Ordering::Relaxed); + + #[cfg(feature = "prometheus")] + if config.statistics.run_prometheus_endpoint { + ::metrics::gauge!( + "aquatic_peers", + "ip_version" => ip_version_prometheus_str, + ) + .set(num_peers as f64); } - } + + num_peers + }; let elapsed = { let now = Instant::now();