From fcf18c845fe653ef077ee4c2346c3b5d6268cfd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 1 Aug 2022 14:15:06 +0200 Subject: [PATCH] Reduce ValidUntil size; reduce size of various ws structs --- aquatic_common/src/lib.rs | 43 +++-- aquatic_http/src/config.rs | 4 +- aquatic_http/src/lib.rs | 15 +- aquatic_http/src/workers/socket.rs | 25 ++- aquatic_http/src/workers/swarm.rs | 31 ++-- aquatic_http_private/src/config.rs | 2 +- aquatic_http_private/src/lib.rs | 12 +- .../src/workers/swarm/common.rs | 17 +- aquatic_http_private/src/workers/swarm/mod.rs | 19 ++- aquatic_udp/src/config.rs | 4 +- aquatic_udp/src/lib.rs | 6 +- aquatic_udp/src/workers/socket/mod.rs | 17 +- aquatic_udp/src/workers/socket/storage.rs | 12 +- aquatic_udp/src/workers/swarm/mod.rs | 12 +- aquatic_udp/src/workers/swarm/storage.rs | 14 +- aquatic_udp_bench/src/main.rs | 5 +- aquatic_ws/src/common.rs | 25 ++- aquatic_ws/src/config.rs | 4 +- aquatic_ws/src/lib.rs | 17 +- aquatic_ws/src/workers/socket.rs | 99 +++++++----- aquatic_ws/src/workers/swarm.rs | 153 ++++++++++-------- 21 files changed, 343 insertions(+), 193 deletions(-) diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs index 1f08cc5..b23a71b 100644 --- a/aquatic_common/src/lib.rs +++ b/aquatic_common/src/lib.rs @@ -1,7 +1,7 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; use ahash::RandomState; use rand::Rng; @@ -16,23 +16,46 @@ pub mod rustls_config; /// Amortized IndexMap using AHash hasher pub type AmortizedIndexMap = indexmap_amortized::IndexMap; -/// Peer or connection valid until this instant -/// -/// Used instead of "last seen" or similar to hopefully prevent arithmetic -/// overflow when cleaning. +/// Peer, connection or similar valid until this instant #[derive(Debug, Clone, Copy)] -pub struct ValidUntil(pub Instant); +pub struct ValidUntil(SecondsSinceServerStart); impl ValidUntil { #[inline] - pub fn new(offset_seconds: u64) -> Self { - Self(Instant::now() + Duration::from_secs(offset_seconds)) + pub fn new(start_instant: ServerStartInstant, offset_seconds: u32) -> Self { + Self(SecondsSinceServerStart( + start_instant.seconds_elapsed().0 + offset_seconds, + )) } - pub fn new_with_now(now: Instant, offset_seconds: u64) -> Self { - Self(now + Duration::from_secs(offset_seconds)) + pub fn new_with_now(now: SecondsSinceServerStart, offset_seconds: u32) -> Self { + Self(SecondsSinceServerStart(now.0 + offset_seconds)) + } + pub fn valid(&self, now: SecondsSinceServerStart) -> bool { + self.0 .0 > now.0 } } +#[derive(Debug, Clone, Copy)] +pub struct ServerStartInstant(Instant); + +impl ServerStartInstant { + pub fn new() -> Self { + Self(Instant::now()) + } + pub fn seconds_elapsed(&self) -> SecondsSinceServerStart { + SecondsSinceServerStart( + self.0 + .elapsed() + .as_secs() + .try_into() + .expect("server ran for more seconds than what fits in a u32"), + ) + } +} + +#[derive(Debug, Clone, Copy)] +pub struct SecondsSinceServerStart(u32); + pub struct PanicSentinelWatcher(Arc); impl PanicSentinelWatcher { diff --git a/aquatic_http/src/config.rs b/aquatic_http/src/config.rs index 95bc281..995d51c 100644 --- a/aquatic_http/src/config.rs +++ b/aquatic_http/src/config.rs @@ -112,9 +112,9 @@ pub struct CleaningConfig { /// Clean connections this often (seconds) pub connection_cleaning_interval: u64, /// Remove peers that have not announced for this long (seconds) - pub max_peer_age: u64, + pub max_peer_age: u32, /// Remove connections that haven't seen valid requests for this long (seconds) - pub max_connection_idle: u64, + pub max_connection_idle: u32, } impl Default for CleaningConfig { diff --git a/aquatic_http/src/lib.rs b/aquatic_http/src/lib.rs index 328ada9..8b82c4f 100644 --- a/aquatic_http/src/lib.rs +++ b/aquatic_http/src/lib.rs @@ -6,7 +6,7 @@ use aquatic_common::{ }, privileges::PrivilegeDropper, rustls_config::create_rustls_config, - PanicSentinelWatcher, + PanicSentinelWatcher, ServerStartInstant, }; use common::State; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; @@ -46,6 +46,8 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { &config.network.tls_private_key_path, )?); + let server_start_instant = ServerStartInstant::new(); + let mut executors = Vec::new(); for i in 0..(config.socket_workers) { @@ -73,6 +75,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { tls_config, request_mesh_builder, priv_dropper, + server_start_instant, ) .await }) @@ -97,8 +100,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let executor = builder .spawn(move || async move { - workers::swarm::run_swarm_worker(sentinel, config, state, request_mesh_builder) - .await + workers::swarm::run_swarm_worker( + sentinel, + config, + state, + request_mesh_builder, + server_start_instant, + ) + .await }) .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?; diff --git a/aquatic_http/src/workers/socket.rs b/aquatic_http/src/workers/socket.rs index f47bb33..d99062a 100644 --- a/aquatic_http/src/workers/socket.rs +++ b/aquatic_http/src/workers/socket.rs @@ -3,13 +3,13 @@ use std::collections::BTreeMap; use std::os::unix::prelude::{FromRawFd, IntoRawFd}; use std::rc::Rc; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use anyhow::Context; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; -use aquatic_common::{CanonicalSocketAddr, PanicSentinel}; +use aquatic_common::{CanonicalSocketAddr, PanicSentinel, ServerStartInstant}; use aquatic_http_protocol::common::InfoHash; use aquatic_http_protocol::request::{Request, RequestParseError, ScrapeRequest}; use aquatic_http_protocol::response::{ @@ -59,6 +59,7 @@ pub async fn run_socket_worker( tls_config: Arc, request_mesh_builder: MeshBuilder, priv_dropper: PrivilegeDropper, + server_start_instant: ServerStartInstant, ) { let config = Rc::new(config); let access_list = state.access_list; @@ -74,6 +75,7 @@ pub async fn run_socket_worker( clean_connections( config.clone(), connection_slab.clone(), + server_start_instant, ) })); @@ -84,7 +86,10 @@ pub async fn run_socket_worker( Ok(stream) => { let key = connection_slab.borrow_mut().insert(ConnectionReference { task_handle: None, - valid_until: ValidUntil::new(config.cleaning.max_connection_idle), + valid_until: ValidUntil::new( + server_start_instant, + config.cleaning.max_connection_idle, + ), }); let task_handle = spawn_local(enclose!((config, access_list, request_senders, tls_config, connection_slab) async move { @@ -92,6 +97,7 @@ pub async fn run_socket_worker( config, access_list, request_senders, + server_start_instant, ConnectionId(key), tls_config, connection_slab.clone(), @@ -118,11 +124,12 @@ pub async fn run_socket_worker( async fn clean_connections( config: Rc, connection_slab: Rc>>, + server_start_instant: ServerStartInstant, ) -> Option { - let now = Instant::now(); + let now = server_start_instant.seconds_elapsed(); connection_slab.borrow_mut().retain(|_, reference| { - if reference.valid_until.0 > now { + if reference.valid_until.valid(now) { true } else { if let Some(ref handle) = reference.task_handle { @@ -145,6 +152,7 @@ struct Connection { access_list_cache: AccessListCache, request_senders: Rc>, connection_slab: Rc>>, + server_start_instant: ServerStartInstant, stream: TlsStream, peer_addr: CanonicalSocketAddr, connection_id: ConnectionId, @@ -158,6 +166,7 @@ impl Connection { config: Rc, access_list: Arc, request_senders: Rc>, + server_start_instant: ServerStartInstant, connection_id: ConnectionId, tls_config: Arc, connection_slab: Rc>>, @@ -180,6 +189,7 @@ impl Connection { access_list_cache: create_access_list_cache(&access_list), request_senders: request_senders.clone(), connection_slab, + server_start_instant, stream, peer_addr, connection_id, @@ -271,7 +281,10 @@ impl Connection { async fn handle_request(&mut self, request: Request) -> anyhow::Result { if let Ok(mut slab) = self.connection_slab.try_borrow_mut() { if let Some(reference) = slab.get_mut(self.connection_id.0) { - reference.valid_until = ValidUntil::new(self.config.cleaning.max_connection_idle); + reference.valid_until = ValidUntil::new( + self.server_start_instant, + self.config.cleaning.max_connection_idle, + ); } } diff --git a/aquatic_http/src/workers/swarm.rs b/aquatic_http/src/workers/swarm.rs index c952938..cbfef5c 100644 --- a/aquatic_http/src/workers/swarm.rs +++ b/aquatic_http/src/workers/swarm.rs @@ -4,7 +4,6 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::rc::Rc; use std::sync::Arc; use std::time::Duration; -use std::time::Instant; use either::Either; use futures_lite::{Stream, StreamExt}; @@ -17,9 +16,9 @@ use rand::SeedableRng; use smartstring::{LazyCompact, SmartString}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; -use aquatic_common::ValidUntil; use aquatic_common::{extract_response_peers, PanicSentinel}; use aquatic_common::{AmortizedIndexMap, CanonicalSocketAddr}; +use aquatic_common::{SecondsSinceServerStart, ServerStartInstant, ValidUntil}; use aquatic_http_protocol::common::*; use aquatic_http_protocol::request::*; use aquatic_http_protocol::response::ResponsePeer; @@ -107,20 +106,26 @@ pub struct TorrentMaps { } impl TorrentMaps { - pub fn clean(&mut self, config: &Config, access_list: &Arc) { + pub fn clean( + &mut self, + config: &Config, + access_list: &Arc, + server_start_instant: ServerStartInstant, + ) { let mut access_list_cache = create_access_list_cache(access_list); - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4); - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6); + let now = server_start_instant.seconds_elapsed(); + + Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4, now); + Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6, now); } fn clean_torrent_map( config: &Config, access_list_cache: &mut AccessListCache, torrent_map: &mut TorrentMap, + now: SecondsSinceServerStart, ) { - let now = Instant::now(); - torrent_map.retain(|info_hash, torrent_data| { if !access_list_cache .load() @@ -133,7 +138,7 @@ impl TorrentMaps { let num_leechers = &mut torrent_data.num_leechers; torrent_data.peers.retain(|_, peer| { - let keep = peer.valid_until.0 >= now; + let keep = peer.valid_until.valid(now); if !keep { match peer.status { @@ -162,6 +167,7 @@ pub async fn run_swarm_worker( config: Config, state: State, request_mesh_builder: MeshBuilder, + server_start_instant: ServerStartInstant, ) { let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); @@ -171,19 +177,22 @@ pub async fn run_swarm_worker( // Periodically clean torrents TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || { enclose!((config, torrents, access_list) move || async move { - torrents.borrow_mut().clean(&config, &access_list); + torrents.borrow_mut().clean(&config, &access_list, server_start_instant); Some(Duration::from_secs(config.cleaning.torrent_cleaning_interval)) })() })); let max_peer_age = config.cleaning.max_peer_age; - let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_peer_age))); + let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new( + server_start_instant, + max_peer_age, + ))); // Periodically update peer_valid_until TimerActionRepeat::repeat(enclose!((peer_valid_until) move || { enclose!((peer_valid_until) move || async move { - *peer_valid_until.borrow_mut() = ValidUntil::new(max_peer_age); + *peer_valid_until.borrow_mut() = ValidUntil::new(server_start_instant, max_peer_age); Some(Duration::from_secs(1)) })() diff --git a/aquatic_http_private/src/config.rs b/aquatic_http_private/src/config.rs index d8bd496..d678956 100644 --- a/aquatic_http_private/src/config.rs +++ b/aquatic_http_private/src/config.rs @@ -99,7 +99,7 @@ pub struct CleaningConfig { /// Clean peers this often (seconds) pub torrent_cleaning_interval: u64, /// Remove peers that have not announced for this long (seconds) - pub max_peer_age: u64, + pub max_peer_age: u32, } impl Default for CleaningConfig { diff --git a/aquatic_http_private/src/lib.rs b/aquatic_http_private/src/lib.rs index e10ee83..4193025 100644 --- a/aquatic_http_private/src/lib.rs +++ b/aquatic_http_private/src/lib.rs @@ -6,6 +6,7 @@ use std::{collections::VecDeque, sync::Arc}; use aquatic_common::{ privileges::PrivilegeDropper, rustls_config::create_rustls_config, PanicSentinelWatcher, + ServerStartInstant, }; use common::ChannelRequestSender; use dotenv::dotenv; @@ -40,6 +41,8 @@ pub fn run(config: Config) -> anyhow::Result<()> { let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); + let server_start_instant = ServerStartInstant::new(); + let mut handles = Vec::new(); for _ in 0..config.socket_workers { @@ -71,7 +74,14 @@ pub fn run(config: Config) -> anyhow::Result<()> { let handle = ::std::thread::Builder::new() .name("request".into()) - .spawn(move || workers::swarm::run_swarm_worker(sentinel, config, request_receiver))?; + .spawn(move || { + workers::swarm::run_swarm_worker( + sentinel, + config, + request_receiver, + server_start_instant, + ) + })?; handles.push(handle); } diff --git a/aquatic_http_private/src/workers/swarm/common.rs b/aquatic_http_private/src/workers/swarm/common.rs index abbcbc3..4fc6a12 100644 --- a/aquatic_http_private/src/workers/swarm/common.rs +++ b/aquatic_http_private/src/workers/swarm/common.rs @@ -1,7 +1,6 @@ use std::net::{Ipv4Addr, Ipv6Addr}; -use std::time::Instant; -use aquatic_common::{AmortizedIndexMap, ValidUntil}; +use aquatic_common::{AmortizedIndexMap, SecondsSinceServerStart, ServerStartInstant, ValidUntil}; use aquatic_http_protocol::common::{AnnounceEvent, InfoHash, PeerId}; use aquatic_http_protocol::response::ResponsePeer; @@ -84,20 +83,20 @@ pub struct TorrentMaps { } impl TorrentMaps { - pub fn clean(&mut self) { - Self::clean_torrent_map(&mut self.ipv4); - Self::clean_torrent_map(&mut self.ipv6); + pub fn clean(&mut self, server_start_instant: ServerStartInstant) { + let now = server_start_instant.seconds_elapsed(); + + Self::clean_torrent_map(&mut self.ipv4, now); + Self::clean_torrent_map(&mut self.ipv6, now); } - fn clean_torrent_map(torrent_map: &mut TorrentMap) { - let now = Instant::now(); - + fn clean_torrent_map(torrent_map: &mut TorrentMap, now: SecondsSinceServerStart) { torrent_map.retain(|_, torrent_data| { let num_seeders = &mut torrent_data.num_seeders; let num_leechers = &mut torrent_data.num_leechers; torrent_data.peers.retain(|_, peer| { - if peer.valid_until.0 >= now { + if peer.valid_until.valid(now) { true } else { match peer.status { diff --git a/aquatic_http_private/src/workers/swarm/mod.rs b/aquatic_http_private/src/workers/swarm/mod.rs index 121c34d..45fb5fb 100644 --- a/aquatic_http_private/src/workers/swarm/mod.rs +++ b/aquatic_http_private/src/workers/swarm/mod.rs @@ -11,7 +11,9 @@ use tokio::sync::mpsc::Receiver; use tokio::task::LocalSet; use tokio::time; -use aquatic_common::{extract_response_peers, CanonicalSocketAddr, PanicSentinel, ValidUntil}; +use aquatic_common::{ + extract_response_peers, CanonicalSocketAddr, PanicSentinel, ServerStartInstant, ValidUntil, +}; use aquatic_http_protocol::response::{ AnnounceResponse, Response, ResponsePeer, ResponsePeerListV4, ResponsePeerListV6, }; @@ -25,12 +27,13 @@ pub fn run_swarm_worker( _sentinel: PanicSentinel, config: Config, request_receiver: Receiver, + server_start_instant: ServerStartInstant, ) -> anyhow::Result<()> { let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; - runtime.block_on(run_inner(config, request_receiver))?; + runtime.block_on(run_inner(config, request_receiver, server_start_instant))?; Ok(()) } @@ -38,6 +41,7 @@ pub fn run_swarm_worker( async fn run_inner( config: Config, mut request_receiver: Receiver, + server_start_instant: ServerStartInstant, ) -> anyhow::Result<()> { let torrents = Rc::new(RefCell::new(TorrentMaps::default())); let mut rng = SmallRng::from_entropy(); @@ -45,6 +49,7 @@ async fn run_inner( LocalSet::new().spawn_local(periodically_clean_torrents( config.clone(), torrents.clone(), + server_start_instant, )); loop { @@ -53,7 +58,7 @@ async fn run_inner( .await .ok_or_else(|| anyhow::anyhow!("request channel closed"))?; - let valid_until = ValidUntil::new(config.cleaning.max_peer_age); + let valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); let response = handle_announce_request( &config, @@ -68,7 +73,11 @@ async fn run_inner( } } -async fn periodically_clean_torrents(config: Config, torrents: Rc>) { +async fn periodically_clean_torrents( + config: Config, + torrents: Rc>, + server_start_instant: ServerStartInstant, +) { let mut interval = time::interval(time::Duration::from_secs( config.cleaning.torrent_cleaning_interval, )); @@ -76,7 +85,7 @@ async fn periodically_clean_torrents(config: Config, torrents: Rc ::anyhow::Result<()> { let mut response_senders = Vec::new(); let mut response_receivers = BTreeMap::new(); + let server_start_instant = ServerStartInstant::new(); + for i in 0..config.swarm_workers { let (request_sender, request_receiver) = if config.worker_channel_size == 0 { unbounded() @@ -85,6 +87,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { sentinel, config, state, + server_start_instant, request_receiver, response_sender, SwarmWorkerIndex(i), @@ -120,6 +123,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { config, i, connection_validator, + server_start_instant, request_sender, response_receiver, priv_dropper, diff --git a/aquatic_udp/src/workers/socket/mod.rs b/aquatic_udp/src/workers/socket/mod.rs index fd45296..6daeeb2 100644 --- a/aquatic_udp/src/workers/socket/mod.rs +++ b/aquatic_udp/src/workers/socket/mod.rs @@ -6,6 +6,7 @@ pub mod validator; use std::time::{Duration, Instant}; use anyhow::Context; +use aquatic_common::ServerStartInstant; use crossbeam_channel::Receiver; use mio::net::UdpSocket; use mio::{Events, Interest, Poll, Token}; @@ -31,6 +32,7 @@ pub fn run_socket_worker( config: Config, token_num: usize, mut connection_validator: ConnectionValidator, + server_start_instant: ServerStartInstant, request_sender: ConnectedRequestSender, response_receiver: Receiver<(ConnectedResponse, CanonicalSocketAddr)>, priv_dropper: PrivilegeDropper, @@ -59,7 +61,8 @@ pub fn run_socket_worker( let pending_scrape_cleaning_duration = Duration::from_secs(config.cleaning.pending_scrape_cleaning_interval); - let mut pending_scrape_valid_until = ValidUntil::new(config.cleaning.max_pending_scrape_age); + let mut pending_scrape_valid_until = + ValidUntil::new(server_start_instant, config.cleaning.max_pending_scrape_age); let mut last_pending_scrape_cleaning = Instant::now(); let mut iter_counter = 0usize; @@ -100,13 +103,17 @@ pub fn run_socket_worker( // Run periodic ValidUntil updates and state cleaning if iter_counter % 256 == 0 { + let seconds_since_start = server_start_instant.seconds_elapsed(); + + pending_scrape_valid_until = ValidUntil::new_with_now( + seconds_since_start, + config.cleaning.max_pending_scrape_age, + ); + let now = Instant::now(); - pending_scrape_valid_until = - ValidUntil::new_with_now(now, config.cleaning.max_pending_scrape_age); - if now > last_pending_scrape_cleaning + pending_scrape_cleaning_duration { - pending_scrape_responses.clean(); + pending_scrape_responses.clean(seconds_since_start); last_pending_scrape_cleaning = now; } diff --git a/aquatic_udp/src/workers/socket/storage.rs b/aquatic_udp/src/workers/socket/storage.rs index ee5ec54..8aed47c 100644 --- a/aquatic_udp/src/workers/socket/storage.rs +++ b/aquatic_udp/src/workers/socket/storage.rs @@ -1,10 +1,9 @@ use std::collections::BTreeMap; -use std::time::Instant; use hashbrown::HashMap; use slab::Slab; -use aquatic_common::ValidUntil; +use aquatic_common::{SecondsSinceServerStart, ValidUntil}; use aquatic_udp_protocol::*; use crate::common::*; @@ -97,11 +96,9 @@ impl PendingScrapeResponseSlab { } } - pub fn clean(&mut self) { - let now = Instant::now(); - + pub fn clean(&mut self, now: SecondsSinceServerStart) { self.0.retain(|k, v| { - if v.valid_until.0 > now { + if v.valid_until.valid(now) { true } else { ::log::warn!( @@ -120,6 +117,7 @@ impl PendingScrapeResponseSlab { #[cfg(test)] mod tests { + use aquatic_common::ServerStartInstant; use quickcheck::TestResult; use quickcheck_macros::quickcheck; @@ -138,7 +136,7 @@ mod tests { config.swarm_workers = swarm_workers as usize; - let valid_until = ValidUntil::new(1); + let valid_until = ValidUntil::new(ServerStartInstant::new(), 1); let mut map = PendingScrapeResponseSlab::default(); diff --git a/aquatic_udp/src/workers/swarm/mod.rs b/aquatic_udp/src/workers/swarm/mod.rs index b2cc7ed..f80f192 100644 --- a/aquatic_udp/src/workers/swarm/mod.rs +++ b/aquatic_udp/src/workers/swarm/mod.rs @@ -5,6 +5,7 @@ use std::sync::atomic::Ordering; use std::time::Duration; use std::time::Instant; +use aquatic_common::ServerStartInstant; use crossbeam_channel::Receiver; use rand::{rngs::SmallRng, SeedableRng}; @@ -21,6 +22,7 @@ pub fn run_swarm_worker( _sentinel: PanicSentinel, config: Config, state: State, + server_start_instant: ServerStartInstant, request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, response_sender: ConnectedResponseSender, worker_index: SwarmWorkerIndex, @@ -29,7 +31,7 @@ pub fn run_swarm_worker( let mut rng = SmallRng::from_entropy(); let timeout = Duration::from_millis(config.request_channel_recv_timeout_ms); - let mut peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); + let mut peer_valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); let cleaning_interval = Duration::from_secs(config.cleaning.torrent_cleaning_interval); let statistics_update_interval = Duration::from_secs(config.statistics.interval); @@ -81,10 +83,14 @@ pub fn run_swarm_worker( if iter_counter % 128 == 0 { let now = Instant::now(); - peer_valid_until = ValidUntil::new_with_now(now, config.cleaning.max_peer_age); + peer_valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); if now > last_cleaning + cleaning_interval { - let (ipv4, ipv6) = torrents.clean_and_get_num_peers(&config, &state.access_list); + let (ipv4, ipv6) = torrents.clean_and_get_num_peers( + &config, + &state.access_list, + server_start_instant, + ); if config.statistics.active() { state.statistics_ipv4.peers[worker_index.0].store(ipv4, Ordering::Release); diff --git a/aquatic_udp/src/workers/swarm/storage.rs b/aquatic_udp/src/workers/swarm/storage.rs index 75bc311..35d6392 100644 --- a/aquatic_udp/src/workers/swarm/storage.rs +++ b/aquatic_udp/src/workers/swarm/storage.rs @@ -1,8 +1,9 @@ use std::net::Ipv4Addr; use std::net::Ipv6Addr; use std::sync::Arc; -use std::time::Instant; +use aquatic_common::SecondsSinceServerStart; +use aquatic_common::ServerStartInstant; use aquatic_common::{ access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache, AccessListMode}, extract_response_peers, AmortizedIndexMap, ValidUntil, @@ -99,9 +100,9 @@ impl TorrentData { } /// Remove inactive peers and reclaim space - fn clean(&mut self, now: Instant) { + fn clean(&mut self, now: SecondsSinceServerStart) { self.peers.retain(|_, peer| { - if peer.valid_until.0 > now { + if peer.valid_until.valid(now) { true } else { match peer.status { @@ -143,7 +144,7 @@ impl TorrentMap { &mut self, access_list_cache: &mut AccessListCache, access_list_mode: AccessListMode, - now: Instant, + now: SecondsSinceServerStart, ) -> usize { let mut num_peers = 0; @@ -192,10 +193,11 @@ impl TorrentMaps { &mut self, config: &Config, access_list: &Arc, + server_start_instant: ServerStartInstant, ) -> (usize, usize) { let mut cache = create_access_list_cache(access_list); let mode = config.access_list.mode; - let now = Instant::now(); + let now = server_start_instant.seconds_elapsed(); let ipv4 = self.ipv4.clean_and_get_num_peers(&mut cache, mode, now); let ipv6 = self.ipv6.clean_and_get_num_peers(&mut cache, mode, now); @@ -226,7 +228,7 @@ mod tests { ip_address: Ipv4Addr::from(i.to_be_bytes()), port: Port(1), status: PeerStatus::Leeching, - valid_until: ValidUntil::new(0), + valid_until: ValidUntil::new(ServerStartInstant::new(), 0), } } diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index 3a7c256..66f3d9e 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -7,7 +7,7 @@ //! Scrape: 1 873 545 requests/second, 533.75 ns/request //! ``` -use aquatic_common::PanicSentinelWatcher; +use aquatic_common::{PanicSentinelWatcher, ServerStartInstant}; use aquatic_udp::workers::swarm::run_swarm_worker; use crossbeam_channel::unbounded; use num_format::{Locale, ToFormattedString}; @@ -51,6 +51,8 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { let response_sender = ConnectedResponseSender::new(vec![response_sender]); + let server_start_instant = ServerStartInstant::new(); + { let config = aquatic_config.clone(); let state = State::new(config.swarm_workers); @@ -60,6 +62,7 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { sentinel, config, state, + server_start_instant, request_receiver, response_sender, SwarmWorkerIndex(0), diff --git a/aquatic_ws/src/common.rs b/aquatic_ws/src/common.rs index a099e68..d4e6d35 100644 --- a/aquatic_ws/src/common.rs +++ b/aquatic_ws/src/common.rs @@ -29,16 +29,16 @@ pub struct State { } #[derive(Copy, Clone, Debug)] -pub struct PendingScrapeId(pub usize); +pub struct PendingScrapeId(pub u8); #[derive(Copy, Clone, Debug)] -pub struct ConsumerId(pub usize); +pub struct ConsumerId(pub u8); #[derive(Clone, Copy, Debug, PartialEq)] pub struct ConnectionId(pub usize); #[derive(Clone, Copy, Debug)] -pub struct ConnectionMeta { +pub struct InMessageMeta { /// Index of socket worker responsible for this connection. Required for /// sending back response through correct channel to correct worker. pub out_message_consumer_id: ConsumerId, @@ -47,6 +47,25 @@ pub struct ConnectionMeta { pub pending_scrape_id: Option, } +#[derive(Clone, Copy, Debug)] +pub struct OutMessageMeta { + /// Index of socket worker responsible for this connection. Required for + /// sending back response through correct channel to correct worker. + pub out_message_consumer_id: ConsumerId, + pub connection_id: ConnectionId, + pub pending_scrape_id: Option, +} + +impl Into for InMessageMeta { + fn into(self) -> OutMessageMeta { + OutMessageMeta { + out_message_consumer_id: self.out_message_consumer_id, + connection_id: self.connection_id, + pending_scrape_id: self.pending_scrape_id, + } + } +} + #[derive(Clone, Copy, Debug)] pub enum SwarmControlMessage { ConnectionClosed { diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index b295aa3..2964459 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -124,11 +124,11 @@ pub struct CleaningConfig { /// Clean peers this often (seconds) pub torrent_cleaning_interval: u64, /// Remove peers that have not announced for this long (seconds) - pub max_peer_age: u64, + pub max_peer_age: u32, // Clean connections this often (seconds) pub connection_cleaning_interval: u64, /// Close connections if no responses have been sent to them for this long (seconds) - pub max_connection_idle: u64, + pub max_connection_idle: u32, } impl Default for CleaningConfig { diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index 7d48a08..8847eed 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -8,7 +8,7 @@ use anyhow::Context; use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker}; use aquatic_common::cpu_pinning::WorkerIndex; use aquatic_common::rustls_config::create_rustls_config; -use aquatic_common::PanicSentinelWatcher; +use aquatic_common::{PanicSentinelWatcher, ServerStartInstant}; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; use signal_hook::{ consts::{SIGTERM, SIGUSR1}, @@ -49,14 +49,19 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); let opt_tls_config = if config.network.enable_tls { - Some(Arc::new(create_rustls_config( - &config.network.tls_certificate_path, - &config.network.tls_private_key_path, - ).with_context(|| "create rustls config")?)) + Some(Arc::new( + create_rustls_config( + &config.network.tls_certificate_path, + &config.network.tls_private_key_path, + ) + .with_context(|| "create rustls config")?, + )) } else { None }; + let server_start_instant = ServerStartInstant::new(); + let mut executors = Vec::new(); for i in 0..(config.socket_workers) { @@ -88,6 +93,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { request_mesh_builder, response_mesh_builder, priv_dropper, + server_start_instant, ) .await }) @@ -121,6 +127,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { control_mesh_builder, request_mesh_builder, response_mesh_builder, + server_start_instant, ) .await }) diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index fdfd5a9..547987c 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -4,13 +4,13 @@ use std::collections::BTreeMap; use std::os::unix::prelude::{FromRawFd, IntoRawFd}; use std::rc::Rc; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use anyhow::Context; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; -use aquatic_common::PanicSentinel; +use aquatic_common::{PanicSentinel, ServerStartInstant}; use aquatic_ws_protocol::*; use async_tungstenite::WebSocketStream; use futures::stream::{SplitSink, SplitStream}; @@ -42,7 +42,7 @@ struct ConnectionReference { task_handle: Option>, /// Sender part of channel used to pass on outgoing messages from request /// worker - out_message_sender: Rc>, + out_message_sender: Rc>, /// Updated after sending message to peer valid_until: ValidUntil, peer_id: Option, @@ -56,9 +56,10 @@ pub async fn run_socket_worker( state: State, opt_tls_config: Option>, control_message_mesh_builder: MeshBuilder, - in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>, - out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>, + in_message_mesh_builder: MeshBuilder<(InMessageMeta, InMessage), Partial>, + out_message_mesh_builder: MeshBuilder<(OutMessageMeta, OutMessage), Partial>, priv_dropper: PrivilegeDropper, + server_start_instant: ServerStartInstant, ) { let config = Rc::new(config); let access_list = state.access_list; @@ -84,7 +85,13 @@ pub async fn run_socket_worker( let (_, mut out_message_receivers) = out_message_mesh_builder.join(Role::Consumer).await.unwrap(); - let out_message_consumer_id = ConsumerId(out_message_receivers.consumer_id().unwrap()); + let out_message_consumer_id = ConsumerId( + out_message_receivers + .consumer_id() + .unwrap() + .try_into() + .unwrap(), + ); let connection_slab = Rc::new(RefCell::new(Slab::new())); @@ -94,6 +101,7 @@ pub async fn run_socket_worker( clean_connections( config.clone(), connection_slab.clone(), + server_start_instant, ) }), tq_prioritized, @@ -129,13 +137,16 @@ pub async fn run_socket_worker( let key = RefCell::borrow_mut(&connection_slab).insert(ConnectionReference { task_handle: None, out_message_sender: out_message_sender.clone(), - valid_until: ValidUntil::new(config.cleaning.max_connection_idle), + valid_until: ValidUntil::new( + server_start_instant, + config.cleaning.max_connection_idle, + ), peer_id: None, announced_info_hashes: Default::default(), ip_version, }); - ::log::info!("accepting stream, assigning id {}", key); + ::log::trace!("accepting stream, assigning id {}", key); let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, opt_tls_config) async move { if let Err(err) = run_connection( @@ -147,6 +158,7 @@ pub async fn run_socket_worker( connection_slab.clone(), out_message_sender, out_message_receiver, + server_start_instant, out_message_consumer_id, ConnectionId(key), opt_tls_config, @@ -204,11 +216,12 @@ pub async fn run_socket_worker( async fn clean_connections( config: Rc, connection_slab: Rc>>, + server_start_instant: ServerStartInstant, ) -> Option { - let now = Instant::now(); + let now = server_start_instant.seconds_elapsed(); connection_slab.borrow_mut().retain(|_, reference| { - if reference.valid_until.0 > now { + if reference.valid_until.valid(now) { true } else { if let Some(ref handle) = reference.task_handle { @@ -227,14 +240,14 @@ async fn clean_connections( } async fn receive_out_messages( - mut out_message_receiver: ConnectedReceiver<(ConnectionMeta, OutMessage)>, + mut out_message_receiver: ConnectedReceiver<(OutMessageMeta, OutMessage)>, connection_references: Rc>>, ) { let connection_references = &connection_references; while let Some((meta, out_message)) = out_message_receiver.next().await { if let Some(reference) = connection_references.borrow().get(meta.connection_id.0) { - ::log::info!( + ::log::trace!( "local channel {} len: {}", meta.connection_id.0, reference.out_message_sender.len() @@ -245,7 +258,7 @@ async fn receive_out_messages( Err(GlommioError::Closed(_)) => {} Err(GlommioError::WouldBlock(_)) => {} Err(err) => { - ::log::info!( + ::log::debug!( "Couldn't send out_message from shared channel to local receiver: {:?}", err ); @@ -258,12 +271,13 @@ async fn receive_out_messages( async fn run_connection( config: Rc, access_list: Arc, - in_message_senders: Rc>, + in_message_senders: Rc>, tq_prioritized: TaskQueueHandle, tq_regular: TaskQueueHandle, connection_slab: Rc>>, - out_message_sender: Rc>, - out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>, + out_message_sender: Rc>, + out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>, + server_start_instant: ServerStartInstant, out_message_consumer_id: ConsumerId, connection_id: ConnectionId, opt_tls_config: Option>, @@ -284,6 +298,7 @@ async fn run_connection( connection_slab.clone(), out_message_sender, out_message_receiver, + server_start_instant, out_message_consumer_id, connection_id, stream, @@ -329,6 +344,7 @@ async fn run_connection( connection_slab.clone(), out_message_sender, out_message_receiver, + server_start_instant, out_message_consumer_id, connection_id, stream, @@ -343,12 +359,13 @@ async fn run_stream_agnostic_connection< >( config: Rc, access_list: Arc, - in_message_senders: Rc>, + in_message_senders: Rc>, tq_prioritized: TaskQueueHandle, tq_regular: TaskQueueHandle, connection_slab: Rc>>, - out_message_sender: Rc>, - out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>, + out_message_sender: Rc>, + out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>, + server_start_instant: ServerStartInstant, out_message_consumer_id: ConsumerId, connection_id: ConnectionId, stream: S, @@ -400,6 +417,7 @@ async fn run_stream_agnostic_connection< ws_out, pending_scrape_slab, connection_id, + server_start_instant, }; let result = writer.run_out_message_loop().await; @@ -418,8 +436,8 @@ struct ConnectionReader { config: Rc, access_list_cache: AccessListCache, connection_slab: Rc>>, - in_message_senders: Rc>, - out_message_sender: Rc>, + in_message_senders: Rc>, + out_message_sender: Rc>, pending_scrape_slab: Rc>>, out_message_consumer_id: ConsumerId, ws_in: SplitStream>, @@ -430,8 +448,6 @@ struct ConnectionReader { impl ConnectionReader { async fn run_in_message_loop(&mut self) -> anyhow::Result<()> { loop { - ::log::debug!("read_in_message"); - while self.out_message_sender.is_full() { sleep(Duration::from_millis(100)).await; @@ -442,8 +458,6 @@ impl ConnectionReader { match InMessage::from_ws_message(message) { Ok(in_message) => { - ::log::debug!("parsed in_message"); - self.handle_in_message(in_message).await?; } Err(err) => { @@ -516,7 +530,6 @@ impl ConnectionReader { ) .await .unwrap(); - ::log::info!("sent message to swarm worker"); } else { self.send_error_response( "Info hash not allowed".into(), @@ -559,11 +572,14 @@ impl ConnectionReader { stats: Default::default(), }; - let pending_scrape_id = PendingScrapeId( - RefCell::borrow_mut(&mut self.pending_scrape_slab) - .insert(pending_scrape_response), - ); - let meta = self.make_connection_meta(Some(pending_scrape_id)); + let pending_scrape_id: u8 = self + .pending_scrape_slab + .borrow_mut() + .insert(pending_scrape_response) + .try_into() + .with_context(|| "Reached 256 pending scrape responses")?; + + let meta = self.make_connection_meta(Some(PendingScrapeId(pending_scrape_id))); for (consumer_index, info_hashes) in info_hashes_by_worker { let in_message = InMessage::ScrapeRequest(ScrapeRequest { @@ -576,7 +592,6 @@ impl ConnectionReader { .send_to(consumer_index, (meta, in_message)) .await .unwrap(); - ::log::info!("sent message to swarm worker"); } } } @@ -597,13 +612,13 @@ impl ConnectionReader { }); self.out_message_sender - .send((self.make_connection_meta(None), out_message)) + .send((self.make_connection_meta(None).into(), out_message)) .await .map_err(|err| anyhow::anyhow!("ConnectionReader::send_error_response failed: {}", err)) } - fn make_connection_meta(&self, pending_scrape_id: Option) -> ConnectionMeta { - ConnectionMeta { + fn make_connection_meta(&self, pending_scrape_id: Option) -> InMessageMeta { + InMessageMeta { connection_id: self.connection_id, out_message_consumer_id: self.out_message_consumer_id, ip_version: self.ip_version, @@ -614,10 +629,11 @@ impl ConnectionReader { struct ConnectionWriter { config: Rc, - out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>, + out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>, connection_slab: Rc>>, ws_out: SplitSink, tungstenite::Message>, pending_scrape_slab: Rc>>, + server_start_instant: ServerStartInstant, connection_id: ConnectionId, } @@ -636,7 +652,7 @@ impl ConnectionWriter { let finished = if let Some(pending) = Slab::get_mut( &mut RefCell::borrow_mut(&self.pending_scrape_slab), - pending_scrape_id.0, + pending_scrape_id.0 as usize, ) { pending.stats.extend(out_message.files); pending.pending_worker_out_messages -= 1; @@ -650,7 +666,7 @@ impl ConnectionWriter { let out_message = { let mut slab = RefCell::borrow_mut(&self.pending_scrape_slab); - let pending = slab.remove(pending_scrape_id.0); + let pending = slab.remove(pending_scrape_id.0 as usize); slab.shrink_to_fit(); @@ -690,13 +706,16 @@ impl ConnectionWriter { self.connection_id.0 ) })? - .valid_until = ValidUntil::new(self.config.cleaning.max_connection_idle); + .valid_until = ValidUntil::new( + self.server_start_instant, + self.config.cleaning.max_connection_idle, + ); Ok(()) } Ok(Err(err)) => Err(err.into()), Err(err) => { - ::log::info!("send_out_message: sending to peer took to long: {}", err); + ::log::debug!("send_out_message: sending to peer took to long: {}", err); Ok(()) } diff --git a/aquatic_ws/src/workers/swarm.rs b/aquatic_ws/src/workers/swarm.rs index 22a9f02..b8a06f7 100644 --- a/aquatic_ws/src/workers/swarm.rs +++ b/aquatic_ws/src/workers/swarm.rs @@ -1,7 +1,7 @@ use std::cell::RefCell; use std::rc::Rc; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use futures::StreamExt; @@ -12,7 +12,10 @@ use glommio::timer::TimerActionRepeat; use hashbrown::HashMap; use rand::{rngs::SmallRng, SeedableRng}; -use aquatic_common::{extract_response_peers, AmortizedIndexMap, PanicSentinel}; +use aquatic_common::{ + extract_response_peers, AmortizedIndexMap, PanicSentinel, SecondsSinceServerStart, + ServerStartInstant, +}; use aquatic_ws_protocol::*; use crate::common::*; @@ -44,8 +47,9 @@ impl PeerStatus { #[derive(Clone, Copy)] struct Peer { - pub connection_meta: ConnectionMeta, - pub status: PeerStatus, + pub consumer_id: ConsumerId, + pub connection_id: ConnectionId, + pub seeder: bool, pub valid_until: ValidUntil, } @@ -71,14 +75,10 @@ impl Default for TorrentData { impl TorrentData { pub fn remove_peer(&mut self, peer_id: PeerId) { if let Some(peer) = self.peers.remove(&peer_id) { - match peer.status { - PeerStatus::Leeching => { - self.num_leechers -= 1; - } - PeerStatus::Seeding => { - self.num_seeders -= 1; - } - PeerStatus::Stopped => (), + if peer.seeder { + self.num_seeders -= 1; + } else { + self.num_leechers -= 1; } } } @@ -93,20 +93,25 @@ struct TorrentMaps { } impl TorrentMaps { - fn clean(&mut self, config: &Config, access_list: &Arc) { + fn clean( + &mut self, + config: &Config, + access_list: &Arc, + server_start_instant: ServerStartInstant, + ) { let mut access_list_cache = create_access_list_cache(access_list); + let now = server_start_instant.seconds_elapsed(); - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4); - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6); + Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4, now); + Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6, now); } fn clean_torrent_map( config: &Config, access_list_cache: &mut AccessListCache, torrent_map: &mut TorrentMap, + now: SecondsSinceServerStart, ) { - let now = Instant::now(); - torrent_map.retain(|info_hash, torrent_data| { if !access_list_cache .load() @@ -119,18 +124,14 @@ impl TorrentMaps { let num_leechers = &mut torrent_data.num_leechers; torrent_data.peers.retain(|_, peer| { - let keep = peer.valid_until.0 >= now; + let keep = peer.valid_until.valid(now); if !keep { - match peer.status { - PeerStatus::Seeding => { - *num_seeders -= 1; - } - PeerStatus::Leeching => { - *num_leechers -= 1; - } - _ => (), - }; + if peer.seeder { + *num_seeders -= 1; + } else { + *num_leechers -= 1; + } } keep @@ -148,8 +149,9 @@ pub async fn run_swarm_worker( config: Config, state: State, control_message_mesh_builder: MeshBuilder, - in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>, - out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>, + in_message_mesh_builder: MeshBuilder<(InMessageMeta, InMessage), Partial>, + out_message_mesh_builder: MeshBuilder<(OutMessageMeta, OutMessage), Partial>, + server_start_instant: ServerStartInstant, ) { let (_, mut control_message_receivers) = control_message_mesh_builder .join(Role::Consumer) @@ -167,7 +169,7 @@ pub async fn run_swarm_worker( // Periodically clean torrents TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || { enclose!((config, torrents, access_list) move || async move { - torrents.borrow_mut().clean(&config, &access_list); + torrents.borrow_mut().clean(&config, &access_list, server_start_instant); Some(Duration::from_secs(config.cleaning.torrent_cleaning_interval)) })() @@ -186,6 +188,7 @@ pub async fn run_swarm_worker( let handle = spawn_local(handle_request_stream( config.clone(), torrents.clone(), + server_start_instant, out_message_senders.clone(), receiver, )) @@ -229,19 +232,23 @@ where async fn handle_request_stream( config: Config, torrents: Rc>, - out_message_senders: Rc>, + server_start_instant: ServerStartInstant, + out_message_senders: Rc>, stream: S, ) where - S: futures_lite::Stream + ::std::marker::Unpin, + S: futures_lite::Stream + ::std::marker::Unpin, { let rng = Rc::new(RefCell::new(SmallRng::from_entropy())); let max_peer_age = config.cleaning.max_peer_age; - let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_peer_age))); + let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new( + server_start_instant, + max_peer_age, + ))); TimerActionRepeat::repeat(enclose!((peer_valid_until) move || { enclose!((peer_valid_until) move || async move { - *peer_valid_until.borrow_mut() = ValidUntil::new(max_peer_age); + *peer_valid_until.borrow_mut() = ValidUntil::new(server_start_instant, max_peer_age); Some(Duration::from_secs(1)) })() @@ -279,14 +286,12 @@ async fn handle_request_stream( }; for (meta, out_message) in out_messages.drain(..) { - ::log::info!("swarm worker trying to send OutMessage to socket worker"); - out_message_senders - .send_to(meta.out_message_consumer_id.0, (meta, out_message)) + .send_to(meta.out_message_consumer_id.0 as usize, (meta, out_message)) .await .expect("failed sending out_message to socket worker"); - ::log::info!("swarm worker sent OutMessage to socket worker"); + ::log::debug!("swarm worker sent OutMessage to socket worker"); } }, ) @@ -297,9 +302,9 @@ fn handle_announce_request( config: &Config, rng: &mut SmallRng, torrent_maps: &mut TorrentMaps, - out_messages: &mut Vec<(ConnectionMeta, OutMessage)>, + out_messages: &mut Vec<(OutMessageMeta, OutMessage)>, valid_until: ValidUntil, - request_sender_meta: ConnectionMeta, + request_sender_meta: InMessageMeta, request: AnnounceRequest, ) { let torrent_data: &mut TorrentData = if let IpVersion::V4 = request_sender_meta.ip_version { @@ -313,7 +318,7 @@ fn handle_announce_request( // peers have access to each others peer_id's, they could send requests // using them, causing all sorts of issues. if let Some(previous_peer) = torrent_data.peers.get(&request.peer_id) { - if request_sender_meta.connection_id != previous_peer.connection_meta.connection_id { + if request_sender_meta.connection_id != previous_peer.connection_id { return; } } @@ -327,31 +332,39 @@ fn handle_announce_request( request.bytes_left, ); - let peer = Peer { - connection_meta: request_sender_meta, - status: peer_status, - valid_until, - }; - let opt_removed_peer = match peer_status { PeerStatus::Leeching => { torrent_data.num_leechers += 1; + let peer = Peer { + connection_id: request_sender_meta.connection_id, + consumer_id: request_sender_meta.out_message_consumer_id, + seeder: false, + valid_until, + }; + torrent_data.peers.insert(request.peer_id, peer) } PeerStatus::Seeding => { torrent_data.num_seeders += 1; + let peer = Peer { + connection_id: request_sender_meta.connection_id, + consumer_id: request_sender_meta.out_message_consumer_id, + seeder: true, + valid_until, + }; + torrent_data.peers.insert(request.peer_id, peer) } PeerStatus::Stopped => torrent_data.peers.remove(&request.peer_id), }; - match opt_removed_peer.map(|peer| peer.status) { - Some(PeerStatus::Leeching) => { + match opt_removed_peer.map(|peer| peer.seeder) { + Some(false) => { torrent_data.num_leechers -= 1; } - Some(PeerStatus::Seeding) => { + Some(true) => { torrent_data.num_seeders -= 1; } _ => {} @@ -385,14 +398,14 @@ fn handle_announce_request( offer_id: offer.offer_id, }; - out_messages.push(( - offer_receiver.connection_meta, - OutMessage::Offer(middleman_offer), - )); - ::log::trace!( - "sending middleman offer to {:?}", - offer_receiver.connection_meta - ); + let meta = OutMessageMeta { + out_message_consumer_id: offer_receiver.consumer_id, + connection_id: offer_receiver.connection_id, + pending_scrape_id: None, + }; + + out_messages.push((meta, OutMessage::Offer(middleman_offer))); + ::log::trace!("sending middleman offer to {:?}", meta); } } @@ -409,14 +422,14 @@ fn handle_announce_request( offer_id, }; - out_messages.push(( - answer_receiver.connection_meta, - OutMessage::Answer(middleman_answer), - )); - ::log::trace!( - "sending middleman answer to {:?}", - answer_receiver.connection_meta - ); + let meta = OutMessageMeta { + out_message_consumer_id: answer_receiver.consumer_id, + connection_id: answer_receiver.connection_id, + pending_scrape_id: None, + }; + + out_messages.push((meta, OutMessage::Answer(middleman_answer))); + ::log::trace!("sending middleman answer to {:?}", meta); } } @@ -428,14 +441,14 @@ fn handle_announce_request( announce_interval: config.protocol.peer_announce_interval, }); - out_messages.push((request_sender_meta, out_message)); + out_messages.push((request_sender_meta.into(), out_message)); } fn handle_scrape_request( config: &Config, torrent_maps: &mut TorrentMaps, - out_messages: &mut Vec<(ConnectionMeta, OutMessage)>, - meta: ConnectionMeta, + out_messages: &mut Vec<(OutMessageMeta, OutMessage)>, + meta: InMessageMeta, request: ScrapeRequest, ) { let info_hashes = if let Some(info_hashes) = request.info_hashes { @@ -469,5 +482,5 @@ fn handle_scrape_request( } } - out_messages.push((meta, OutMessage::ScrapeResponse(out_message))); + out_messages.push((meta.into(), OutMessage::ScrapeResponse(out_message))); }