udp: swarm cleaning: send statistics messages after releasing locks

This commit is contained in:
Joakim Frostegård 2024-02-11 00:59:23 +01:00
parent 14c973f72f
commit f455e58251

View file

@ -98,26 +98,37 @@ impl TorrentMaps {
let mode = config.access_list.mode;
let now = server_start_instant.seconds_elapsed();
let ipv4 =
self.ipv4
.clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now);
let ipv6 =
self.ipv6
.clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now);
let mut statistics_messages = Vec::new();
let ipv4 = self.ipv4.clean_and_get_statistics(
config,
&mut statistics_messages,
&mut cache,
mode,
now,
);
let ipv6 = self.ipv6.clean_and_get_statistics(
config,
&mut statistics_messages,
&mut cache,
mode,
now,
);
if config.statistics.active() {
statistics.ipv4.torrents.store(ipv4.0, Ordering::Relaxed);
statistics.ipv6.torrents.store(ipv6.0, Ordering::Relaxed);
statistics.ipv4.peers.store(ipv4.1, Ordering::Relaxed);
statistics.ipv6.peers.store(ipv6.1, Ordering::Relaxed);
if let Some(message) = ipv4.2.map(StatisticsMessage::Ipv4PeerHistogram) {
if let Err(err) = statistics_sender.try_send(message) {
::log::error!("couldn't send statistics message: {:#}", err);
if let Some(message) = ipv4.2 {
statistics_messages.push(StatisticsMessage::Ipv4PeerHistogram(message));
}
if let Some(message) = ipv6.2 {
statistics_messages.push(StatisticsMessage::Ipv6PeerHistogram(message));
}
if let Some(message) = ipv6.2.map(StatisticsMessage::Ipv6PeerHistogram) {
for message in statistics_messages {
if let Err(err) = statistics_sender.try_send(message) {
::log::error!("couldn't send statistics message: {:#}", err);
}
@ -204,7 +215,7 @@ impl<I: Ip> TorrentMapShards<I> {
fn clean_and_get_statistics(
&self,
config: &Config,
statistics_sender: &Sender<StatisticsMessage>,
statistics_messages: &mut Vec<StatisticsMessage>,
access_list_cache: &mut AccessListCache,
access_list_mode: AccessListMode,
now: SecondsSinceServerStart,
@ -212,19 +223,10 @@ impl<I: Ip> TorrentMapShards<I> {
let mut total_num_torrents = 0;
let mut total_num_peers = 0;
let mut opt_histogram: Option<Histogram<u64>> = if config.statistics.torrent_peer_histograms
{
match Histogram::new(3) {
Ok(histogram) => Some(histogram),
Err(err) => {
::log::error!("Couldn't create peer histogram: {:#}", err);
None
}
}
} else {
None
};
let mut opt_histogram: Option<Histogram<u64>> = config
.statistics
.torrent_peer_histograms
.then(|| Histogram::new(3).expect("create peer histogram"));
for torrent_map_shard in self.0.iter() {
for torrent_data in torrent_map_shard.read().values() {
@ -232,11 +234,14 @@ impl<I: Ip> TorrentMapShards<I> {
let num_peers = match peer_map.deref_mut() {
PeerMap::Small(small_peer_map) => {
small_peer_map.clean_and_get_num_peers(config, statistics_sender, now)
small_peer_map.clean_and_get_num_peers(config, statistics_messages, now)
}
PeerMap::Large(large_peer_map) => {
let num_peers =
large_peer_map.clean_and_get_num_peers(config, statistics_sender, now);
let num_peers = large_peer_map.clean_and_get_num_peers(
config,
statistics_messages,
now,
);
if let Some(small_peer_map) = large_peer_map.try_shrink() {
*peer_map = PeerMap::Small(small_peer_map);
@ -248,22 +253,20 @@ impl<I: Ip> TorrentMapShards<I> {
drop(peer_map);
match opt_histogram {
Some(ref mut histogram) if num_peers > 0 => {
let n = num_peers.try_into().expect("Couldn't fit usize into u64");
if let Err(err) = histogram.record(n) {
::log::error!("Couldn't record {} to histogram: {:#}", n, err);
match opt_histogram.as_mut() {
Some(histogram) if num_peers > 0 => {
if let Err(err) = histogram.record(num_peers as u64) {
::log::error!("Couldn't record {} to histogram: {:#}", num_peers, err);
}
}
_ => (),
}
total_num_peers += num_peers;
torrent_data
.pending_removal
.store(num_peers == 0, Ordering::Release);
total_num_peers += num_peers;
}
let mut torrent_map_shard = torrent_map_shard.write();
@ -509,20 +512,14 @@ impl<I: Ip> SmallPeerMap<I> {
fn clean_and_get_num_peers(
&mut self,
config: &Config,
statistics_sender: &Sender<StatisticsMessage>,
statistics_messages: &mut Vec<StatisticsMessage>,
now: SecondsSinceServerStart,
) -> usize {
self.0.retain(|(_, peer)| {
let keep = peer.valid_until.valid(now);
if !keep
&& config.statistics.peer_clients
&& statistics_sender
.try_send(StatisticsMessage::PeerRemoved(peer.peer_id))
.is_err()
{
// Should never happen in practice
::log::error!("Couldn't send StatisticsMessage::PeerRemoved");
if !keep && config.statistics.peer_clients {
statistics_messages.push(StatisticsMessage::PeerRemoved(peer.peer_id));
}
keep
@ -621,7 +618,7 @@ impl<I: Ip> LargePeerMap<I> {
fn clean_and_get_num_peers(
&mut self,
config: &Config,
statistics_sender: &Sender<StatisticsMessage>,
statistics_messages: &mut Vec<StatisticsMessage>,
now: SecondsSinceServerStart,
) -> usize {
self.peers.retain(|_, peer| {
@ -631,13 +628,8 @@ impl<I: Ip> LargePeerMap<I> {
if peer.is_seeder {
self.num_seeders -= 1;
}
if config.statistics.peer_clients
&& statistics_sender
.try_send(StatisticsMessage::PeerRemoved(peer.peer_id))
.is_err()
{
// Should never happen in practice
::log::error!("Couldn't send StatisticsMessage::PeerRemoved");
if config.statistics.peer_clients {
statistics_messages.push(StatisticsMessage::PeerRemoved(peer.peer_id));
}
}