udp: add cleaning worker

This commit is contained in:
Joakim Frostegård 2024-02-10 15:48:09 +01:00
parent 6d7ffd40ae
commit c4fd3c9e83
6 changed files with 57 additions and 52 deletions

View file

@ -32,7 +32,7 @@ impl IpVersion {
#[derive(Clone)]
pub struct Statistics {
pub socket: Vec<CachePaddedArc<IpVersionStatistics<SocketWorkerStatistics>>>,
pub swarm: Vec<CachePaddedArc<IpVersionStatistics<SwarmWorkerStatistics>>>,
pub swarm: CachePaddedArc<IpVersionStatistics<SwarmWorkerStatistics>>,
}
impl Statistics {
@ -41,9 +41,7 @@ impl Statistics {
socket: repeat_with(Default::default)
.take(config.socket_workers)
.collect(),
swarm: repeat_with(Default::default)
.take(config.swarm_workers)
.collect(),
swarm: Default::default(),
}
}
}

View file

@ -71,6 +71,29 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
join_handles.push((WorkerType::Socket(i), handle));
}
{
let state = state.clone();
let config = config.clone();
let statistics = statistics.swarm.clone();
let statistics_sender = statistics_sender.clone();
let handle = Builder::new().name("cleaning".into()).spawn(move || loop {
sleep(Duration::from_secs(
config.cleaning.torrent_cleaning_interval,
));
state.torrent_maps.clean_and_update_statistics(
&config,
&statistics,
&statistics_sender,
&state.access_list,
state.server_start_instant,
);
})?;
join_handles.push((WorkerType::Cleaning, handle));
}
if config.statistics.active() {
let state = state.clone();
let config = config.clone();
@ -142,14 +165,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
join_handles.push((WorkerType::Signals, handle));
}
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
config.swarm_workers,
WorkerIndex::Util,
);
loop {
for (i, (_, handle)) in join_handles.iter().enumerate() {
if handle.is_finished() {

View file

@ -6,6 +6,7 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use aquatic_common::SecondsSinceServerStart;
use aquatic_common::ServerStartInstant;
use aquatic_common::{
access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache, AccessListMode},
ValidUntil,
@ -84,16 +85,16 @@ impl TorrentMaps {
}
/// Remove forbidden or inactive torrents, reclaim space and update statistics
pub fn clean_and_update_statistics(
&mut self,
&self,
config: &Config,
state: &State,
statistics: &CachePaddedArc<IpVersionStatistics<SwarmWorkerStatistics>>,
statistics_sender: &Sender<StatisticsMessage>,
access_list: &Arc<AccessListArcSwap>,
server_start_instant: ServerStartInstant,
) {
let mut cache = create_access_list_cache(access_list);
let mode = config.access_list.mode;
let now = state.server_start_instant.seconds_elapsed();
let now = server_start_instant.seconds_elapsed();
let ipv4 =
self.ipv4
@ -196,7 +197,7 @@ impl<I: Ip> TorrentMapShards<I> {
}
fn clean_and_get_statistics(
&mut self,
&self,
config: &Config,
statistics_sender: &Sender<StatisticsMessage>,
access_list_cache: &mut AccessListCache,

View file

@ -60,8 +60,6 @@ impl StatisticsCollector {
let mut responses_error: usize = 0;
let mut bytes_received: usize = 0;
let mut bytes_sent: usize = 0;
let mut num_torrents: usize = 0;
let mut num_peers: usize = 0;
#[cfg(feature = "prometheus")]
let ip_version_prometheus_str = self.ip_version.prometheus_str();
@ -186,44 +184,37 @@ impl StatisticsCollector {
}
}
for (i, statistics) in self
.statistics
.swarm
.iter()
.map(|s| s.by_ip_version(self.ip_version))
.enumerate()
{
{
let n = statistics.torrents.load(Ordering::Relaxed);
let swarm_statistics = &self.statistics.swarm.by_ip_version(self.ip_version);
num_torrents += n;
let num_torrents = {
let num_torrents = swarm_statistics.torrents.load(Ordering::Relaxed);
#[cfg(feature = "prometheus")]
if config.statistics.run_prometheus_endpoint {
::metrics::gauge!(
"aquatic_torrents",
"ip_version" => ip_version_prometheus_str,
"worker_index" => i.to_string(),
)
.set(n as f64);
}
#[cfg(feature = "prometheus")]
if config.statistics.run_prometheus_endpoint {
::metrics::gauge!(
"aquatic_torrents",
"ip_version" => ip_version_prometheus_str,
)
.set(num_torrents as f64);
}
{
let n = statistics.peers.load(Ordering::Relaxed);
num_peers += n;
num_torrents
};
#[cfg(feature = "prometheus")]
if config.statistics.run_prometheus_endpoint {
::metrics::gauge!(
"aquatic_peers",
"ip_version" => ip_version_prometheus_str,
"worker_index" => i.to_string(),
)
.set(n as f64);
}
let num_peers = {
let num_peers = swarm_statistics.peers.load(Ordering::Relaxed);
#[cfg(feature = "prometheus")]
if config.statistics.run_prometheus_endpoint {
::metrics::gauge!(
"aquatic_peers",
"ip_version" => ip_version_prometheus_str,
)
.set(num_peers as f64);
}
}
num_peers
};
let elapsed = {
let now = Instant::now();