From f455e5825114a3f72e8d28ce8876e70504f46c2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 11 Feb 2024 00:59:23 +0100 Subject: [PATCH] udp: swarm cleaning: send statistics messages after releasing locks --- crates/udp/src/swarm.rs | 100 ++++++++++++++++++---------------------- 1 file changed, 46 insertions(+), 54 deletions(-) diff --git a/crates/udp/src/swarm.rs b/crates/udp/src/swarm.rs index 6a5e327..c9523a2 100644 --- a/crates/udp/src/swarm.rs +++ b/crates/udp/src/swarm.rs @@ -98,26 +98,37 @@ impl TorrentMaps { let mode = config.access_list.mode; let now = server_start_instant.seconds_elapsed(); - let ipv4 = - self.ipv4 - .clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now); - let ipv6 = - self.ipv6 - .clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now); + let mut statistics_messages = Vec::new(); + + let ipv4 = self.ipv4.clean_and_get_statistics( + config, + &mut statistics_messages, + &mut cache, + mode, + now, + ); + let ipv6 = self.ipv6.clean_and_get_statistics( + config, + &mut statistics_messages, + &mut cache, + mode, + now, + ); if config.statistics.active() { statistics.ipv4.torrents.store(ipv4.0, Ordering::Relaxed); statistics.ipv6.torrents.store(ipv6.0, Ordering::Relaxed); - statistics.ipv4.peers.store(ipv4.1, Ordering::Relaxed); statistics.ipv6.peers.store(ipv6.1, Ordering::Relaxed); - if let Some(message) = ipv4.2.map(StatisticsMessage::Ipv4PeerHistogram) { - if let Err(err) = statistics_sender.try_send(message) { - ::log::error!("couldn't send statistics message: {:#}", err); - } + if let Some(message) = ipv4.2 { + statistics_messages.push(StatisticsMessage::Ipv4PeerHistogram(message)); } - if let Some(message) = ipv6.2.map(StatisticsMessage::Ipv6PeerHistogram) { + if let Some(message) = ipv6.2 { + statistics_messages.push(StatisticsMessage::Ipv6PeerHistogram(message)); + } + + for message in statistics_messages { if let Err(err) = statistics_sender.try_send(message) { ::log::error!("couldn't send statistics message: {:#}", err); } @@ -204,7 +215,7 @@ impl TorrentMapShards { fn clean_and_get_statistics( &self, config: &Config, - statistics_sender: &Sender, + statistics_messages: &mut Vec, access_list_cache: &mut AccessListCache, access_list_mode: AccessListMode, now: SecondsSinceServerStart, @@ -212,19 +223,10 @@ impl TorrentMapShards { let mut total_num_torrents = 0; let mut total_num_peers = 0; - let mut opt_histogram: Option> = if config.statistics.torrent_peer_histograms - { - match Histogram::new(3) { - Ok(histogram) => Some(histogram), - Err(err) => { - ::log::error!("Couldn't create peer histogram: {:#}", err); - - None - } - } - } else { - None - }; + let mut opt_histogram: Option> = config + .statistics + .torrent_peer_histograms + .then(|| Histogram::new(3).expect("create peer histogram")); for torrent_map_shard in self.0.iter() { for torrent_data in torrent_map_shard.read().values() { @@ -232,11 +234,14 @@ impl TorrentMapShards { let num_peers = match peer_map.deref_mut() { PeerMap::Small(small_peer_map) => { - small_peer_map.clean_and_get_num_peers(config, statistics_sender, now) + small_peer_map.clean_and_get_num_peers(config, statistics_messages, now) } PeerMap::Large(large_peer_map) => { - let num_peers = - large_peer_map.clean_and_get_num_peers(config, statistics_sender, now); + let num_peers = large_peer_map.clean_and_get_num_peers( + config, + statistics_messages, + now, + ); if let Some(small_peer_map) = large_peer_map.try_shrink() { *peer_map = PeerMap::Small(small_peer_map); @@ -248,22 +253,20 @@ impl TorrentMapShards { drop(peer_map); - match opt_histogram { - Some(ref mut histogram) if num_peers > 0 => { - let n = num_peers.try_into().expect("Couldn't fit usize into u64"); - - if let Err(err) = histogram.record(n) { - ::log::error!("Couldn't record {} to histogram: {:#}", n, err); + match opt_histogram.as_mut() { + Some(histogram) if num_peers > 0 => { + if let Err(err) = histogram.record(num_peers as u64) { + ::log::error!("Couldn't record {} to histogram: {:#}", num_peers, err); } } _ => (), } + total_num_peers += num_peers; + torrent_data .pending_removal .store(num_peers == 0, Ordering::Release); - - total_num_peers += num_peers; } let mut torrent_map_shard = torrent_map_shard.write(); @@ -509,20 +512,14 @@ impl SmallPeerMap { fn clean_and_get_num_peers( &mut self, config: &Config, - statistics_sender: &Sender, + statistics_messages: &mut Vec, now: SecondsSinceServerStart, ) -> usize { self.0.retain(|(_, peer)| { let keep = peer.valid_until.valid(now); - if !keep - && config.statistics.peer_clients - && statistics_sender - .try_send(StatisticsMessage::PeerRemoved(peer.peer_id)) - .is_err() - { - // Should never happen in practice - ::log::error!("Couldn't send StatisticsMessage::PeerRemoved"); + if !keep && config.statistics.peer_clients { + statistics_messages.push(StatisticsMessage::PeerRemoved(peer.peer_id)); } keep @@ -621,7 +618,7 @@ impl LargePeerMap { fn clean_and_get_num_peers( &mut self, config: &Config, - statistics_sender: &Sender, + statistics_messages: &mut Vec, now: SecondsSinceServerStart, ) -> usize { self.peers.retain(|_, peer| { @@ -631,13 +628,8 @@ impl LargePeerMap { if peer.is_seeder { self.num_seeders -= 1; } - if config.statistics.peer_clients - && statistics_sender - .try_send(StatisticsMessage::PeerRemoved(peer.peer_id)) - .is_err() - { - // Should never happen in practice - ::log::error!("Couldn't send StatisticsMessage::PeerRemoved"); + if config.statistics.peer_clients { + statistics_messages.push(StatisticsMessage::PeerRemoved(peer.peer_id)); } }