diff --git a/crates/ws/src/workers/swarm/mod.rs b/crates/ws/src/workers/swarm/mod.rs index f26c34d..f885b97 100644 --- a/crates/ws/src/workers/swarm/mod.rs +++ b/crates/ws/src/workers/swarm/mod.rs @@ -21,9 +21,6 @@ use crate::SHARED_IN_CHANNEL_SIZE; use self::storage::TorrentMaps; -#[cfg(feature = "metrics")] -thread_local! { static WORKER_INDEX: ::std::cell::Cell = Default::default() } - #[allow(clippy::too_many_arguments)] pub async fn run_swarm_worker( _sentinel: PanicSentinel, @@ -35,9 +32,6 @@ pub async fn run_swarm_worker( server_start_instant: ServerStartInstant, worker_index: usize, ) { - #[cfg(feature = "metrics")] - WORKER_INDEX.with(|index| index.set(worker_index)); - let (_, mut control_message_receivers) = control_message_mesh_builder .join(Role::Consumer) .await @@ -48,7 +42,7 @@ pub async fn run_swarm_worker( let out_message_senders = Rc::new(out_message_senders); - let torrents = Rc::new(RefCell::new(TorrentMaps::default())); + let torrents = Rc::new(RefCell::new(TorrentMaps::new(worker_index))); let access_list = state.access_list; // Periodically clean torrents diff --git a/crates/ws/src/workers/swarm/storage.rs b/crates/ws/src/workers/swarm/storage.rs index 9eff053..5ef51b5 100644 --- a/crates/ws/src/workers/swarm/storage.rs +++ b/crates/ws/src/workers/swarm/storage.rs @@ -7,7 +7,6 @@ use aquatic_ws_protocol::outgoing::{ OutMessage, ScrapeResponse, ScrapeStatistics, }; use hashbrown::HashMap; -use metrics::Gauge; use rand::rngs::SmallRng; use aquatic_common::{IndexMap, SecondsSinceServerStart, ServerStartInstant}; @@ -16,50 +15,20 @@ use rand::Rng; use crate::common::*; use crate::config::Config; -use crate::workers::swarm::WORKER_INDEX; - -type TorrentMap = IndexMap; -type PeerMap = IndexMap; pub struct TorrentMaps { ipv4: TorrentMap, ipv6: TorrentMap, - peers_gauge_ipv4: Gauge, - peers_gauge_ipv6: Gauge, - torrents_gauge_ipv4: Gauge, - torrents_gauge_ipv6: Gauge, -} - -impl Default for TorrentMaps { - fn default() -> Self { - Self { - ipv4: Default::default(), - ipv6: Default::default(), - peers_gauge_ipv4: ::metrics::gauge!( - "aquatic_peers", - "ip_version" => "4", - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ), - peers_gauge_ipv6: ::metrics::gauge!( - "aquatic_peers", - "ip_version" => "6", - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ), - torrents_gauge_ipv4: ::metrics::gauge!( - "aquatic_torrents", - "ip_version" => "4", - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ), - torrents_gauge_ipv6: ::metrics::gauge!( - "aquatic_torrents", - "ip_version" => "6", - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ), - } - } } impl TorrentMaps { + pub fn new(worker_index: usize) -> Self { + Self { + ipv4: TorrentMap::new(worker_index, IpVersion::V4), + ipv6: TorrentMap::new(worker_index, IpVersion::V6), + } + } + pub fn handle_announce_request( &mut self, config: &Config, @@ -69,11 +38,121 @@ impl TorrentMaps { request_sender_meta: InMessageMeta, request: AnnounceRequest, ) { - let torrent_data = if let IpVersion::V4 = request_sender_meta.ip_version { - self.ipv4.entry(request.info_hash).or_default() - } else { - self.ipv6.entry(request.info_hash).or_default() + self.get_torrent_map_by_ip_version(request_sender_meta.ip_version) + .handle_announce_request( + config, + rng, + out_messages, + server_start_instant, + request_sender_meta, + request, + ); + } + + pub fn handle_scrape_request( + &mut self, + config: &Config, + out_messages: &mut Vec<(OutMessageMeta, OutMessage)>, + meta: InMessageMeta, + request: ScrapeRequest, + ) { + self.get_torrent_map_by_ip_version(meta.ip_version) + .handle_scrape_request(config, out_messages, meta, request); + } + + pub fn clean( + &mut self, + config: &Config, + access_list: &Arc, + server_start_instant: ServerStartInstant, + ) { + let mut access_list_cache = create_access_list_cache(access_list); + let now = server_start_instant.seconds_elapsed(); + + self.ipv4.clean(config, &mut access_list_cache, now); + self.ipv6.clean(config, &mut access_list_cache, now); + } + + #[cfg(feature = "metrics")] + pub fn update_torrent_count_metrics(&self) { + self.ipv4.update_torrent_gauge(); + self.ipv6.update_torrent_gauge(); + } + + pub fn handle_connection_closed( + &mut self, + info_hash: InfoHash, + peer_id: PeerId, + ip_version: IpVersion, + ) { + self.get_torrent_map_by_ip_version(ip_version) + .handle_connection_closed(info_hash, peer_id); + } + + fn get_torrent_map_by_ip_version(&mut self, ip_version: IpVersion) -> &mut TorrentMap { + match ip_version { + IpVersion::V4 => &mut self.ipv4, + IpVersion::V6 => &mut self.ipv6, + } + } +} + +struct TorrentMap { + torrents: IndexMap, + #[cfg(feature = "metrics")] + torrent_gauge: ::metrics::Gauge, + #[cfg(feature = "metrics")] + peer_gauge: ::metrics::Gauge, +} + +impl TorrentMap { + pub fn new(worker_index: usize, ip_version: IpVersion) -> Self { + #[cfg(feature = "metrics")] + let peer_gauge = match ip_version { + IpVersion::V4 => ::metrics::gauge!( + "aquatic_peers", + "ip_version" => "4", + "worker_index" => worker_index.to_string(), + ), + IpVersion::V6 => ::metrics::gauge!( + "aquatic_peers", + "ip_version" => "6", + "worker_index" => worker_index.to_string(), + ), }; + #[cfg(feature = "metrics")] + let torrent_gauge = match ip_version { + IpVersion::V4 => ::metrics::gauge!( + "aquatic_torrents", + "ip_version" => "4", + "worker_index" => worker_index.to_string(), + ), + IpVersion::V6 => ::metrics::gauge!( + "aquatic_torrents", + "ip_version" => "6", + "worker_index" => worker_index.to_string(), + ), + }; + + Self { + torrents: Default::default(), + #[cfg(feature = "metrics")] + peer_gauge, + #[cfg(feature = "metrics")] + torrent_gauge, + } + } + + pub fn handle_announce_request( + &mut self, + config: &Config, + rng: &mut SmallRng, + out_messages: &mut Vec<(OutMessageMeta, OutMessage)>, + server_start_instant: ServerStartInstant, + request_sender_meta: InMessageMeta, + request: AnnounceRequest, + ) { + let torrent_data = self.torrents.entry(request.info_hash).or_default(); let valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); @@ -126,10 +205,7 @@ impl TorrentMaps { } #[cfg(feature = "metrics")] - match request_sender_meta.ip_version { - IpVersion::V4 => self.peers_gauge_ipv4.decrement(1.0), - IpVersion::V6 => self.peers_gauge_ipv6.decrement(1.0), - } + self.peer_gauge.decrement(1.0); return; } @@ -147,10 +223,7 @@ impl TorrentMaps { entry.insert(peer); #[cfg(feature = "metrics")] - match request_sender_meta.ip_version { - IpVersion::V4 => self.peers_gauge_ipv4.increment(1.0), - IpVersion::V6 => self.peers_gauge_ipv6.increment(1.0), - } + self.peer_gauge.increment(1.0) } PeerStatus::Seeding => { torrent_data.num_seeders += 1; @@ -166,10 +239,7 @@ impl TorrentMaps { entry.insert(peer); #[cfg(feature = "metrics")] - match request_sender_meta.ip_version { - IpVersion::V4 => self.peers_gauge_ipv4.increment(1.0), - IpVersion::V6 => self.peers_gauge_ipv6.increment(1.0), - } + self.peer_gauge.increment(1.0); } PeerStatus::Stopped => return, }, @@ -316,14 +386,8 @@ impl TorrentMaps { files: HashMap::with_capacity(num_to_take), }; - let torrent_map: &mut TorrentMap = if let IpVersion::V4 = meta.ip_version { - &mut self.ipv4 - } else { - &mut self.ipv6 - }; - for info_hash in info_hashes.into_iter().take(num_to_take) { - if let Some(torrent_data) = torrent_map.get(&info_hash) { + if let Some(torrent_data) = self.torrents.get(&info_hash) { let stats = ScrapeStatistics { complete: torrent_data.num_seeders, downloaded: 0, // No implementation planned @@ -337,41 +401,33 @@ impl TorrentMaps { out_messages.push((meta.into(), OutMessage::ScrapeResponse(out_message))); } - pub fn clean( - &mut self, - config: &Config, - access_list: &Arc, - server_start_instant: ServerStartInstant, - ) { - let mut access_list_cache = create_access_list_cache(access_list); - let now = server_start_instant.seconds_elapsed(); + pub fn handle_connection_closed(&mut self, info_hash: InfoHash, peer_id: PeerId) { + if let Some(torrent_data) = self.torrents.get_mut(&info_hash) { + if let Some(peer) = torrent_data.peers.remove(&peer_id) { + if peer.seeder { + torrent_data.num_seeders -= 1; + } - Self::clean_torrent_map( - config, - &mut access_list_cache, - &mut self.ipv4, - now, - &self.peers_gauge_ipv4, - ); - Self::clean_torrent_map( - config, - &mut access_list_cache, - &mut self.ipv6, - now, - &self.peers_gauge_ipv6, - ); + #[cfg(feature = "metrics")] + self.peer_gauge.decrement(1.0); + } + } } - fn clean_torrent_map( + #[cfg(feature = "metrics")] + pub fn update_torrent_gauge(&self) { + self.torrent_gauge.set(self.torrents.len() as f64); + } + + fn clean( + &mut self, config: &Config, access_list_cache: &mut AccessListCache, - torrent_map: &mut TorrentMap, now: SecondsSinceServerStart, - peers_gauge: &Gauge, ) { let mut total_num_peers = 0u64; - torrent_map.retain(|info_hash, torrent_data| { + self.torrents.retain(|info_hash, torrent_data| { if !access_list_cache .load() .allows(config.access_list.mode, &info_hash.0) @@ -402,66 +458,23 @@ impl TorrentMaps { !torrent_data.peers.is_empty() }); - torrent_map.shrink_to_fit(); + self.torrents.shrink_to_fit(); #[cfg(feature = "metrics")] - peers_gauge.set(total_num_peers as f64) - } + self.peer_gauge.set(total_num_peers as f64); - #[cfg(feature = "metrics")] - pub fn update_torrent_count_metrics(&self) { - self.torrents_gauge_ipv4.set(self.ipv4.len() as f64); - self.torrents_gauge_ipv6.set(self.ipv6.len() as f64); - } - - pub fn handle_connection_closed( - &mut self, - info_hash: InfoHash, - peer_id: PeerId, - ip_version: IpVersion, - ) { - ::log::debug!("Removing peer from torrents because connection was closed"); - - if let IpVersion::V4 = ip_version { - if let Some(torrent_data) = self.ipv4.get_mut(&info_hash) { - torrent_data.remove_peer(peer_id); - - #[cfg(feature = "metrics")] - self.peers_gauge_ipv4.decrement(1.0); - } - } else if let Some(torrent_data) = self.ipv6.get_mut(&info_hash) { - torrent_data.remove_peer(peer_id); - - #[cfg(feature = "metrics")] - self.peers_gauge_ipv6.decrement(1.0); - } + #[cfg(feature = "metrics")] + self.update_torrent_gauge(); } } +#[derive(Default)] struct TorrentData { - peers: PeerMap, + peers: IndexMap, num_seeders: usize, } -impl Default for TorrentData { - #[inline] - fn default() -> Self { - Self { - peers: Default::default(), - num_seeders: 0, - } - } -} - impl TorrentData { - fn remove_peer(&mut self, peer_id: PeerId) { - if let Some(peer) = self.peers.remove(&peer_id) { - if peer.seeder { - self.num_seeders -= 1; - } - } - } - fn num_leechers(&self) -> usize { self.peers.len() - self.num_seeders } @@ -510,6 +523,8 @@ impl PeerStatus { /// If there are more peers in map than `max_num_peers_to_take`, do a random /// selection of peers from first and second halves of map in order to avoid /// returning too homogeneous peers. +/// +/// Filters out announcing peer. #[inline] pub fn extract_response_peers( rng: &mut impl Rng,