ws: refactor swarm worker storage code for improved readability

This commit is contained in:
Joakim Frostegård 2024-01-25 19:35:45 +01:00
parent 238cce9b16
commit e6e663761c
2 changed files with 152 additions and 143 deletions

View file

@ -21,9 +21,6 @@ use crate::SHARED_IN_CHANNEL_SIZE;
use self::storage::TorrentMaps;
#[cfg(feature = "metrics")]
thread_local! { static WORKER_INDEX: ::std::cell::Cell<usize> = Default::default() }
#[allow(clippy::too_many_arguments)]
pub async fn run_swarm_worker(
_sentinel: PanicSentinel,
@ -35,9 +32,6 @@ pub async fn run_swarm_worker(
server_start_instant: ServerStartInstant,
worker_index: usize,
) {
#[cfg(feature = "metrics")]
WORKER_INDEX.with(|index| index.set(worker_index));
let (_, mut control_message_receivers) = control_message_mesh_builder
.join(Role::Consumer)
.await
@ -48,7 +42,7 @@ pub async fn run_swarm_worker(
let out_message_senders = Rc::new(out_message_senders);
let torrents = Rc::new(RefCell::new(TorrentMaps::default()));
let torrents = Rc::new(RefCell::new(TorrentMaps::new(worker_index)));
let access_list = state.access_list;
// Periodically clean torrents

View file

@ -7,7 +7,6 @@ use aquatic_ws_protocol::outgoing::{
OutMessage, ScrapeResponse, ScrapeStatistics,
};
use hashbrown::HashMap;
use metrics::Gauge;
use rand::rngs::SmallRng;
use aquatic_common::{IndexMap, SecondsSinceServerStart, ServerStartInstant};
@ -16,50 +15,20 @@ use rand::Rng;
use crate::common::*;
use crate::config::Config;
use crate::workers::swarm::WORKER_INDEX;
type TorrentMap = IndexMap<InfoHash, TorrentData>;
type PeerMap = IndexMap<PeerId, Peer>;
pub struct TorrentMaps {
ipv4: TorrentMap,
ipv6: TorrentMap,
peers_gauge_ipv4: Gauge,
peers_gauge_ipv6: Gauge,
torrents_gauge_ipv4: Gauge,
torrents_gauge_ipv6: Gauge,
}
impl Default for TorrentMaps {
fn default() -> Self {
Self {
ipv4: Default::default(),
ipv6: Default::default(),
peers_gauge_ipv4: ::metrics::gauge!(
"aquatic_peers",
"ip_version" => "4",
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
),
peers_gauge_ipv6: ::metrics::gauge!(
"aquatic_peers",
"ip_version" => "6",
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
),
torrents_gauge_ipv4: ::metrics::gauge!(
"aquatic_torrents",
"ip_version" => "4",
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
),
torrents_gauge_ipv6: ::metrics::gauge!(
"aquatic_torrents",
"ip_version" => "6",
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
),
}
}
}
impl TorrentMaps {
pub fn new(worker_index: usize) -> Self {
Self {
ipv4: TorrentMap::new(worker_index, IpVersion::V4),
ipv6: TorrentMap::new(worker_index, IpVersion::V6),
}
}
pub fn handle_announce_request(
&mut self,
config: &Config,
@ -69,11 +38,121 @@ impl TorrentMaps {
request_sender_meta: InMessageMeta,
request: AnnounceRequest,
) {
let torrent_data = if let IpVersion::V4 = request_sender_meta.ip_version {
self.ipv4.entry(request.info_hash).or_default()
} else {
self.ipv6.entry(request.info_hash).or_default()
self.get_torrent_map_by_ip_version(request_sender_meta.ip_version)
.handle_announce_request(
config,
rng,
out_messages,
server_start_instant,
request_sender_meta,
request,
);
}
pub fn handle_scrape_request(
&mut self,
config: &Config,
out_messages: &mut Vec<(OutMessageMeta, OutMessage)>,
meta: InMessageMeta,
request: ScrapeRequest,
) {
self.get_torrent_map_by_ip_version(meta.ip_version)
.handle_scrape_request(config, out_messages, meta, request);
}
pub fn clean(
&mut self,
config: &Config,
access_list: &Arc<AccessListArcSwap>,
server_start_instant: ServerStartInstant,
) {
let mut access_list_cache = create_access_list_cache(access_list);
let now = server_start_instant.seconds_elapsed();
self.ipv4.clean(config, &mut access_list_cache, now);
self.ipv6.clean(config, &mut access_list_cache, now);
}
#[cfg(feature = "metrics")]
pub fn update_torrent_count_metrics(&self) {
self.ipv4.update_torrent_gauge();
self.ipv6.update_torrent_gauge();
}
pub fn handle_connection_closed(
&mut self,
info_hash: InfoHash,
peer_id: PeerId,
ip_version: IpVersion,
) {
self.get_torrent_map_by_ip_version(ip_version)
.handle_connection_closed(info_hash, peer_id);
}
fn get_torrent_map_by_ip_version(&mut self, ip_version: IpVersion) -> &mut TorrentMap {
match ip_version {
IpVersion::V4 => &mut self.ipv4,
IpVersion::V6 => &mut self.ipv6,
}
}
}
struct TorrentMap {
torrents: IndexMap<InfoHash, TorrentData>,
#[cfg(feature = "metrics")]
torrent_gauge: ::metrics::Gauge,
#[cfg(feature = "metrics")]
peer_gauge: ::metrics::Gauge,
}
impl TorrentMap {
pub fn new(worker_index: usize, ip_version: IpVersion) -> Self {
#[cfg(feature = "metrics")]
let peer_gauge = match ip_version {
IpVersion::V4 => ::metrics::gauge!(
"aquatic_peers",
"ip_version" => "4",
"worker_index" => worker_index.to_string(),
),
IpVersion::V6 => ::metrics::gauge!(
"aquatic_peers",
"ip_version" => "6",
"worker_index" => worker_index.to_string(),
),
};
#[cfg(feature = "metrics")]
let torrent_gauge = match ip_version {
IpVersion::V4 => ::metrics::gauge!(
"aquatic_torrents",
"ip_version" => "4",
"worker_index" => worker_index.to_string(),
),
IpVersion::V6 => ::metrics::gauge!(
"aquatic_torrents",
"ip_version" => "6",
"worker_index" => worker_index.to_string(),
),
};
Self {
torrents: Default::default(),
#[cfg(feature = "metrics")]
peer_gauge,
#[cfg(feature = "metrics")]
torrent_gauge,
}
}
pub fn handle_announce_request(
&mut self,
config: &Config,
rng: &mut SmallRng,
out_messages: &mut Vec<(OutMessageMeta, OutMessage)>,
server_start_instant: ServerStartInstant,
request_sender_meta: InMessageMeta,
request: AnnounceRequest,
) {
let torrent_data = self.torrents.entry(request.info_hash).or_default();
let valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age);
@ -126,10 +205,7 @@ impl TorrentMaps {
}
#[cfg(feature = "metrics")]
match request_sender_meta.ip_version {
IpVersion::V4 => self.peers_gauge_ipv4.decrement(1.0),
IpVersion::V6 => self.peers_gauge_ipv6.decrement(1.0),
}
self.peer_gauge.decrement(1.0);
return;
}
@ -147,10 +223,7 @@ impl TorrentMaps {
entry.insert(peer);
#[cfg(feature = "metrics")]
match request_sender_meta.ip_version {
IpVersion::V4 => self.peers_gauge_ipv4.increment(1.0),
IpVersion::V6 => self.peers_gauge_ipv6.increment(1.0),
}
self.peer_gauge.increment(1.0)
}
PeerStatus::Seeding => {
torrent_data.num_seeders += 1;
@ -166,10 +239,7 @@ impl TorrentMaps {
entry.insert(peer);
#[cfg(feature = "metrics")]
match request_sender_meta.ip_version {
IpVersion::V4 => self.peers_gauge_ipv4.increment(1.0),
IpVersion::V6 => self.peers_gauge_ipv6.increment(1.0),
}
self.peer_gauge.increment(1.0);
}
PeerStatus::Stopped => return,
},
@ -316,14 +386,8 @@ impl TorrentMaps {
files: HashMap::with_capacity(num_to_take),
};
let torrent_map: &mut TorrentMap = if let IpVersion::V4 = meta.ip_version {
&mut self.ipv4
} else {
&mut self.ipv6
};
for info_hash in info_hashes.into_iter().take(num_to_take) {
if let Some(torrent_data) = torrent_map.get(&info_hash) {
if let Some(torrent_data) = self.torrents.get(&info_hash) {
let stats = ScrapeStatistics {
complete: torrent_data.num_seeders,
downloaded: 0, // No implementation planned
@ -337,41 +401,33 @@ impl TorrentMaps {
out_messages.push((meta.into(), OutMessage::ScrapeResponse(out_message)));
}
pub fn clean(
&mut self,
config: &Config,
access_list: &Arc<AccessListArcSwap>,
server_start_instant: ServerStartInstant,
) {
let mut access_list_cache = create_access_list_cache(access_list);
let now = server_start_instant.seconds_elapsed();
Self::clean_torrent_map(
config,
&mut access_list_cache,
&mut self.ipv4,
now,
&self.peers_gauge_ipv4,
);
Self::clean_torrent_map(
config,
&mut access_list_cache,
&mut self.ipv6,
now,
&self.peers_gauge_ipv6,
);
pub fn handle_connection_closed(&mut self, info_hash: InfoHash, peer_id: PeerId) {
if let Some(torrent_data) = self.torrents.get_mut(&info_hash) {
if let Some(peer) = torrent_data.peers.remove(&peer_id) {
if peer.seeder {
torrent_data.num_seeders -= 1;
}
fn clean_torrent_map(
#[cfg(feature = "metrics")]
self.peer_gauge.decrement(1.0);
}
}
}
#[cfg(feature = "metrics")]
pub fn update_torrent_gauge(&self) {
self.torrent_gauge.set(self.torrents.len() as f64);
}
fn clean(
&mut self,
config: &Config,
access_list_cache: &mut AccessListCache,
torrent_map: &mut TorrentMap,
now: SecondsSinceServerStart,
peers_gauge: &Gauge,
) {
let mut total_num_peers = 0u64;
torrent_map.retain(|info_hash, torrent_data| {
self.torrents.retain(|info_hash, torrent_data| {
if !access_list_cache
.load()
.allows(config.access_list.mode, &info_hash.0)
@ -402,66 +458,23 @@ impl TorrentMaps {
!torrent_data.peers.is_empty()
});
torrent_map.shrink_to_fit();
self.torrents.shrink_to_fit();
#[cfg(feature = "metrics")]
peers_gauge.set(total_num_peers as f64)
}
self.peer_gauge.set(total_num_peers as f64);
#[cfg(feature = "metrics")]
pub fn update_torrent_count_metrics(&self) {
self.torrents_gauge_ipv4.set(self.ipv4.len() as f64);
self.torrents_gauge_ipv6.set(self.ipv6.len() as f64);
}
pub fn handle_connection_closed(
&mut self,
info_hash: InfoHash,
peer_id: PeerId,
ip_version: IpVersion,
) {
::log::debug!("Removing peer from torrents because connection was closed");
if let IpVersion::V4 = ip_version {
if let Some(torrent_data) = self.ipv4.get_mut(&info_hash) {
torrent_data.remove_peer(peer_id);
#[cfg(feature = "metrics")]
self.peers_gauge_ipv4.decrement(1.0);
}
} else if let Some(torrent_data) = self.ipv6.get_mut(&info_hash) {
torrent_data.remove_peer(peer_id);
#[cfg(feature = "metrics")]
self.peers_gauge_ipv6.decrement(1.0);
}
self.update_torrent_gauge();
}
}
#[derive(Default)]
struct TorrentData {
peers: PeerMap,
peers: IndexMap<PeerId, Peer>,
num_seeders: usize,
}
impl Default for TorrentData {
#[inline]
fn default() -> Self {
Self {
peers: Default::default(),
num_seeders: 0,
}
}
}
impl TorrentData {
fn remove_peer(&mut self, peer_id: PeerId) {
if let Some(peer) = self.peers.remove(&peer_id) {
if peer.seeder {
self.num_seeders -= 1;
}
}
}
fn num_leechers(&self) -> usize {
self.peers.len() - self.num_seeders
}
@ -510,6 +523,8 @@ impl PeerStatus {
/// If there are more peers in map than `max_num_peers_to_take`, do a random
/// selection of peers from first and second halves of map in order to avoid
/// returning too homogeneous peers.
///
/// Filters out announcing peer.
#[inline]
pub fn extract_response_peers<K, V, R, F>(
rng: &mut impl Rng,