diff --git a/README.md b/README.md index f21c53b..2f9420a 100644 --- a/README.md +++ b/README.md @@ -207,7 +207,11 @@ Implements: * Doesn't allow full scrapes, i.e. of all registered info hashes `aquatic_http` has not been tested as much as `aquatic_udp` but likely works -fine. +fine in production. + +Running behind a reverse proxy is currently not supported due to the +[difficulties of determining the originating IP address](https://adam-p.ca/blog/2022/03/x-forwarded-for/) +without knowing the exact setup. #### Performance @@ -220,12 +224,14 @@ More details are available [here](./documents/aquatic-http-load-test-2022-04-11. Aims for compatibility with [WebTorrent](https://github.com/webtorrent) clients. Notes: - * Only runs over TLS * Doesn't track the number of torrent downloads (0 is always sent). * Doesn't allow full scrapes, i.e. of all registered info hashes `aquatic_ws` has not been tested as much as `aquatic_udp` but likely works -fine. +fine in production. + +Running behind a reverse proxy is supported, as long as IPv4 requests are +proxied to IPv4 requests, and IPv6 requests to IPv6 requests. #### Performance diff --git a/TODO.md b/TODO.md index 5396993..f9e3c35 100644 --- a/TODO.md +++ b/TODO.md @@ -4,6 +4,9 @@ ## Medium priority +* Consider replacing unmaintained indexmap-amortized with plain indexmap +* Run cargo-fuzz on protocol crates + * quit whole program if any thread panics * But it would be nice not to panic in workers, but to return errors instead. Once JoinHandle::is_finished is available in stable Rust (#90470), an @@ -20,8 +23,6 @@ * stagger cleaning tasks? * aquatic_ws - * Can peer IP address change after connection has been established - due to some kind of renegotition? It would cause issues. * Add cleaning task for ConnectionHandle.announced_info_hashes? * RES memory still high after traffic stops, even if torrent maps and connection slabs go down to 0 len and capacity * replacing indexmap_amortized / simd_json with equivalents doesn't help diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs index 1f08cc5..b23a71b 100644 --- a/aquatic_common/src/lib.rs +++ b/aquatic_common/src/lib.rs @@ -1,7 +1,7 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; use ahash::RandomState; use rand::Rng; @@ -16,23 +16,46 @@ pub mod rustls_config; /// Amortized IndexMap using AHash hasher pub type AmortizedIndexMap = indexmap_amortized::IndexMap; -/// Peer or connection valid until this instant -/// -/// Used instead of "last seen" or similar to hopefully prevent arithmetic -/// overflow when cleaning. +/// Peer, connection or similar valid until this instant #[derive(Debug, Clone, Copy)] -pub struct ValidUntil(pub Instant); +pub struct ValidUntil(SecondsSinceServerStart); impl ValidUntil { #[inline] - pub fn new(offset_seconds: u64) -> Self { - Self(Instant::now() + Duration::from_secs(offset_seconds)) + pub fn new(start_instant: ServerStartInstant, offset_seconds: u32) -> Self { + Self(SecondsSinceServerStart( + start_instant.seconds_elapsed().0 + offset_seconds, + )) } - pub fn new_with_now(now: Instant, offset_seconds: u64) -> Self { - Self(now + Duration::from_secs(offset_seconds)) + pub fn new_with_now(now: SecondsSinceServerStart, offset_seconds: u32) -> Self { + Self(SecondsSinceServerStart(now.0 + offset_seconds)) + } + pub fn valid(&self, now: SecondsSinceServerStart) -> bool { + self.0 .0 > now.0 } } +#[derive(Debug, Clone, Copy)] +pub struct ServerStartInstant(Instant); + +impl ServerStartInstant { + pub fn new() -> Self { + Self(Instant::now()) + } + pub fn seconds_elapsed(&self) -> SecondsSinceServerStart { + SecondsSinceServerStart( + self.0 + .elapsed() + .as_secs() + .try_into() + .expect("server ran for more seconds than what fits in a u32"), + ) + } +} + +#[derive(Debug, Clone, Copy)] +pub struct SecondsSinceServerStart(u32); + pub struct PanicSentinelWatcher(Arc); impl PanicSentinelWatcher { diff --git a/aquatic_http/src/config.rs b/aquatic_http/src/config.rs index 95bc281..995d51c 100644 --- a/aquatic_http/src/config.rs +++ b/aquatic_http/src/config.rs @@ -112,9 +112,9 @@ pub struct CleaningConfig { /// Clean connections this often (seconds) pub connection_cleaning_interval: u64, /// Remove peers that have not announced for this long (seconds) - pub max_peer_age: u64, + pub max_peer_age: u32, /// Remove connections that haven't seen valid requests for this long (seconds) - pub max_connection_idle: u64, + pub max_connection_idle: u32, } impl Default for CleaningConfig { diff --git a/aquatic_http/src/lib.rs b/aquatic_http/src/lib.rs index 328ada9..8b82c4f 100644 --- a/aquatic_http/src/lib.rs +++ b/aquatic_http/src/lib.rs @@ -6,7 +6,7 @@ use aquatic_common::{ }, privileges::PrivilegeDropper, rustls_config::create_rustls_config, - PanicSentinelWatcher, + PanicSentinelWatcher, ServerStartInstant, }; use common::State; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; @@ -46,6 +46,8 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { &config.network.tls_private_key_path, )?); + let server_start_instant = ServerStartInstant::new(); + let mut executors = Vec::new(); for i in 0..(config.socket_workers) { @@ -73,6 +75,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { tls_config, request_mesh_builder, priv_dropper, + server_start_instant, ) .await }) @@ -97,8 +100,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let executor = builder .spawn(move || async move { - workers::swarm::run_swarm_worker(sentinel, config, state, request_mesh_builder) - .await + workers::swarm::run_swarm_worker( + sentinel, + config, + state, + request_mesh_builder, + server_start_instant, + ) + .await }) .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?; diff --git a/aquatic_http/src/workers/socket.rs b/aquatic_http/src/workers/socket.rs index f47bb33..d99062a 100644 --- a/aquatic_http/src/workers/socket.rs +++ b/aquatic_http/src/workers/socket.rs @@ -3,13 +3,13 @@ use std::collections::BTreeMap; use std::os::unix::prelude::{FromRawFd, IntoRawFd}; use std::rc::Rc; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use anyhow::Context; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; -use aquatic_common::{CanonicalSocketAddr, PanicSentinel}; +use aquatic_common::{CanonicalSocketAddr, PanicSentinel, ServerStartInstant}; use aquatic_http_protocol::common::InfoHash; use aquatic_http_protocol::request::{Request, RequestParseError, ScrapeRequest}; use aquatic_http_protocol::response::{ @@ -59,6 +59,7 @@ pub async fn run_socket_worker( tls_config: Arc, request_mesh_builder: MeshBuilder, priv_dropper: PrivilegeDropper, + server_start_instant: ServerStartInstant, ) { let config = Rc::new(config); let access_list = state.access_list; @@ -74,6 +75,7 @@ pub async fn run_socket_worker( clean_connections( config.clone(), connection_slab.clone(), + server_start_instant, ) })); @@ -84,7 +86,10 @@ pub async fn run_socket_worker( Ok(stream) => { let key = connection_slab.borrow_mut().insert(ConnectionReference { task_handle: None, - valid_until: ValidUntil::new(config.cleaning.max_connection_idle), + valid_until: ValidUntil::new( + server_start_instant, + config.cleaning.max_connection_idle, + ), }); let task_handle = spawn_local(enclose!((config, access_list, request_senders, tls_config, connection_slab) async move { @@ -92,6 +97,7 @@ pub async fn run_socket_worker( config, access_list, request_senders, + server_start_instant, ConnectionId(key), tls_config, connection_slab.clone(), @@ -118,11 +124,12 @@ pub async fn run_socket_worker( async fn clean_connections( config: Rc, connection_slab: Rc>>, + server_start_instant: ServerStartInstant, ) -> Option { - let now = Instant::now(); + let now = server_start_instant.seconds_elapsed(); connection_slab.borrow_mut().retain(|_, reference| { - if reference.valid_until.0 > now { + if reference.valid_until.valid(now) { true } else { if let Some(ref handle) = reference.task_handle { @@ -145,6 +152,7 @@ struct Connection { access_list_cache: AccessListCache, request_senders: Rc>, connection_slab: Rc>>, + server_start_instant: ServerStartInstant, stream: TlsStream, peer_addr: CanonicalSocketAddr, connection_id: ConnectionId, @@ -158,6 +166,7 @@ impl Connection { config: Rc, access_list: Arc, request_senders: Rc>, + server_start_instant: ServerStartInstant, connection_id: ConnectionId, tls_config: Arc, connection_slab: Rc>>, @@ -180,6 +189,7 @@ impl Connection { access_list_cache: create_access_list_cache(&access_list), request_senders: request_senders.clone(), connection_slab, + server_start_instant, stream, peer_addr, connection_id, @@ -271,7 +281,10 @@ impl Connection { async fn handle_request(&mut self, request: Request) -> anyhow::Result { if let Ok(mut slab) = self.connection_slab.try_borrow_mut() { if let Some(reference) = slab.get_mut(self.connection_id.0) { - reference.valid_until = ValidUntil::new(self.config.cleaning.max_connection_idle); + reference.valid_until = ValidUntil::new( + self.server_start_instant, + self.config.cleaning.max_connection_idle, + ); } } diff --git a/aquatic_http/src/workers/swarm.rs b/aquatic_http/src/workers/swarm.rs index c952938..cbfef5c 100644 --- a/aquatic_http/src/workers/swarm.rs +++ b/aquatic_http/src/workers/swarm.rs @@ -4,7 +4,6 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::rc::Rc; use std::sync::Arc; use std::time::Duration; -use std::time::Instant; use either::Either; use futures_lite::{Stream, StreamExt}; @@ -17,9 +16,9 @@ use rand::SeedableRng; use smartstring::{LazyCompact, SmartString}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; -use aquatic_common::ValidUntil; use aquatic_common::{extract_response_peers, PanicSentinel}; use aquatic_common::{AmortizedIndexMap, CanonicalSocketAddr}; +use aquatic_common::{SecondsSinceServerStart, ServerStartInstant, ValidUntil}; use aquatic_http_protocol::common::*; use aquatic_http_protocol::request::*; use aquatic_http_protocol::response::ResponsePeer; @@ -107,20 +106,26 @@ pub struct TorrentMaps { } impl TorrentMaps { - pub fn clean(&mut self, config: &Config, access_list: &Arc) { + pub fn clean( + &mut self, + config: &Config, + access_list: &Arc, + server_start_instant: ServerStartInstant, + ) { let mut access_list_cache = create_access_list_cache(access_list); - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4); - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6); + let now = server_start_instant.seconds_elapsed(); + + Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4, now); + Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6, now); } fn clean_torrent_map( config: &Config, access_list_cache: &mut AccessListCache, torrent_map: &mut TorrentMap, + now: SecondsSinceServerStart, ) { - let now = Instant::now(); - torrent_map.retain(|info_hash, torrent_data| { if !access_list_cache .load() @@ -133,7 +138,7 @@ impl TorrentMaps { let num_leechers = &mut torrent_data.num_leechers; torrent_data.peers.retain(|_, peer| { - let keep = peer.valid_until.0 >= now; + let keep = peer.valid_until.valid(now); if !keep { match peer.status { @@ -162,6 +167,7 @@ pub async fn run_swarm_worker( config: Config, state: State, request_mesh_builder: MeshBuilder, + server_start_instant: ServerStartInstant, ) { let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); @@ -171,19 +177,22 @@ pub async fn run_swarm_worker( // Periodically clean torrents TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || { enclose!((config, torrents, access_list) move || async move { - torrents.borrow_mut().clean(&config, &access_list); + torrents.borrow_mut().clean(&config, &access_list, server_start_instant); Some(Duration::from_secs(config.cleaning.torrent_cleaning_interval)) })() })); let max_peer_age = config.cleaning.max_peer_age; - let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_peer_age))); + let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new( + server_start_instant, + max_peer_age, + ))); // Periodically update peer_valid_until TimerActionRepeat::repeat(enclose!((peer_valid_until) move || { enclose!((peer_valid_until) move || async move { - *peer_valid_until.borrow_mut() = ValidUntil::new(max_peer_age); + *peer_valid_until.borrow_mut() = ValidUntil::new(server_start_instant, max_peer_age); Some(Duration::from_secs(1)) })() diff --git a/aquatic_http_private/src/config.rs b/aquatic_http_private/src/config.rs index d8bd496..d678956 100644 --- a/aquatic_http_private/src/config.rs +++ b/aquatic_http_private/src/config.rs @@ -99,7 +99,7 @@ pub struct CleaningConfig { /// Clean peers this often (seconds) pub torrent_cleaning_interval: u64, /// Remove peers that have not announced for this long (seconds) - pub max_peer_age: u64, + pub max_peer_age: u32, } impl Default for CleaningConfig { diff --git a/aquatic_http_private/src/lib.rs b/aquatic_http_private/src/lib.rs index e10ee83..4193025 100644 --- a/aquatic_http_private/src/lib.rs +++ b/aquatic_http_private/src/lib.rs @@ -6,6 +6,7 @@ use std::{collections::VecDeque, sync::Arc}; use aquatic_common::{ privileges::PrivilegeDropper, rustls_config::create_rustls_config, PanicSentinelWatcher, + ServerStartInstant, }; use common::ChannelRequestSender; use dotenv::dotenv; @@ -40,6 +41,8 @@ pub fn run(config: Config) -> anyhow::Result<()> { let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); + let server_start_instant = ServerStartInstant::new(); + let mut handles = Vec::new(); for _ in 0..config.socket_workers { @@ -71,7 +74,14 @@ pub fn run(config: Config) -> anyhow::Result<()> { let handle = ::std::thread::Builder::new() .name("request".into()) - .spawn(move || workers::swarm::run_swarm_worker(sentinel, config, request_receiver))?; + .spawn(move || { + workers::swarm::run_swarm_worker( + sentinel, + config, + request_receiver, + server_start_instant, + ) + })?; handles.push(handle); } diff --git a/aquatic_http_private/src/workers/swarm/common.rs b/aquatic_http_private/src/workers/swarm/common.rs index abbcbc3..4fc6a12 100644 --- a/aquatic_http_private/src/workers/swarm/common.rs +++ b/aquatic_http_private/src/workers/swarm/common.rs @@ -1,7 +1,6 @@ use std::net::{Ipv4Addr, Ipv6Addr}; -use std::time::Instant; -use aquatic_common::{AmortizedIndexMap, ValidUntil}; +use aquatic_common::{AmortizedIndexMap, SecondsSinceServerStart, ServerStartInstant, ValidUntil}; use aquatic_http_protocol::common::{AnnounceEvent, InfoHash, PeerId}; use aquatic_http_protocol::response::ResponsePeer; @@ -84,20 +83,20 @@ pub struct TorrentMaps { } impl TorrentMaps { - pub fn clean(&mut self) { - Self::clean_torrent_map(&mut self.ipv4); - Self::clean_torrent_map(&mut self.ipv6); + pub fn clean(&mut self, server_start_instant: ServerStartInstant) { + let now = server_start_instant.seconds_elapsed(); + + Self::clean_torrent_map(&mut self.ipv4, now); + Self::clean_torrent_map(&mut self.ipv6, now); } - fn clean_torrent_map(torrent_map: &mut TorrentMap) { - let now = Instant::now(); - + fn clean_torrent_map(torrent_map: &mut TorrentMap, now: SecondsSinceServerStart) { torrent_map.retain(|_, torrent_data| { let num_seeders = &mut torrent_data.num_seeders; let num_leechers = &mut torrent_data.num_leechers; torrent_data.peers.retain(|_, peer| { - if peer.valid_until.0 >= now { + if peer.valid_until.valid(now) { true } else { match peer.status { diff --git a/aquatic_http_private/src/workers/swarm/mod.rs b/aquatic_http_private/src/workers/swarm/mod.rs index 121c34d..45fb5fb 100644 --- a/aquatic_http_private/src/workers/swarm/mod.rs +++ b/aquatic_http_private/src/workers/swarm/mod.rs @@ -11,7 +11,9 @@ use tokio::sync::mpsc::Receiver; use tokio::task::LocalSet; use tokio::time; -use aquatic_common::{extract_response_peers, CanonicalSocketAddr, PanicSentinel, ValidUntil}; +use aquatic_common::{ + extract_response_peers, CanonicalSocketAddr, PanicSentinel, ServerStartInstant, ValidUntil, +}; use aquatic_http_protocol::response::{ AnnounceResponse, Response, ResponsePeer, ResponsePeerListV4, ResponsePeerListV6, }; @@ -25,12 +27,13 @@ pub fn run_swarm_worker( _sentinel: PanicSentinel, config: Config, request_receiver: Receiver, + server_start_instant: ServerStartInstant, ) -> anyhow::Result<()> { let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; - runtime.block_on(run_inner(config, request_receiver))?; + runtime.block_on(run_inner(config, request_receiver, server_start_instant))?; Ok(()) } @@ -38,6 +41,7 @@ pub fn run_swarm_worker( async fn run_inner( config: Config, mut request_receiver: Receiver, + server_start_instant: ServerStartInstant, ) -> anyhow::Result<()> { let torrents = Rc::new(RefCell::new(TorrentMaps::default())); let mut rng = SmallRng::from_entropy(); @@ -45,6 +49,7 @@ async fn run_inner( LocalSet::new().spawn_local(periodically_clean_torrents( config.clone(), torrents.clone(), + server_start_instant, )); loop { @@ -53,7 +58,7 @@ async fn run_inner( .await .ok_or_else(|| anyhow::anyhow!("request channel closed"))?; - let valid_until = ValidUntil::new(config.cleaning.max_peer_age); + let valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); let response = handle_announce_request( &config, @@ -68,7 +73,11 @@ async fn run_inner( } } -async fn periodically_clean_torrents(config: Config, torrents: Rc>) { +async fn periodically_clean_torrents( + config: Config, + torrents: Rc>, + server_start_instant: ServerStartInstant, +) { let mut interval = time::interval(time::Duration::from_secs( config.cleaning.torrent_cleaning_interval, )); @@ -76,7 +85,7 @@ async fn periodically_clean_torrents(config: Config, torrents: Rc) -> std::fmt::Result { + match self { + Self::NeedMoreData => write!(f, "Incomplete request, more data needed"), + Self::Invalid(err) => write!(f, "Invalid request: {:#}", err), + } + } +} + +impl ::std::error::Error for RequestParseError {} + #[derive(Debug, Clone, PartialEq, Eq)] pub enum Request { Announce(AnnounceRequest), @@ -262,28 +273,20 @@ impl Request { let mut headers = [httparse::EMPTY_HEADER; 16]; let mut http_request = httparse::Request::new(&mut headers); - let path = match http_request.parse(bytes) { + match http_request.parse(bytes) { Ok(httparse::Status::Complete(_)) => { if let Some(path) = http_request.path { - path + Self::from_http_get_path(path).map_err(RequestParseError::Invalid) } else { - return Err(RequestParseError::Invalid(anyhow::anyhow!("no http path"))); + Err(RequestParseError::Invalid(anyhow::anyhow!("no http path"))) } } - Ok(httparse::Status::Partial) => { - if let Some(path) = http_request.path { - path - } else { - return Err(RequestParseError::NeedMoreData); - } - } - Err(err) => return Err(RequestParseError::Invalid(anyhow::Error::from(err))), - }; - - Self::from_http_get_path(path).map_err(RequestParseError::Invalid) + Ok(httparse::Status::Partial) => Err(RequestParseError::NeedMoreData), + Err(err) => Err(RequestParseError::Invalid(anyhow::Error::from(err))), + } } - /// Parse Request from http path (GET `/announce?info_hash=...`) + /// Parse Request from http GET path (`/announce?info_hash=...`) /// /// Existing serde-url decode crates were insufficient, so the decision was /// made to create a custom parser. serde_urlencoded doesn't support multiple @@ -308,10 +311,12 @@ impl Request { Ok(Request::Announce(AnnounceRequest::from_query_string( query_string, )?)) - } else { + } else if location == "/scrape" { Ok(Request::Scrape(ScrapeRequest::from_query_string( query_string, )?)) + } else { + Err(anyhow::anyhow!("Path must be /announce or /scrape")) } } diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 9a7c33d..06a6822 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -181,10 +181,10 @@ pub struct CleaningConfig { /// Allow clients to use a connection token for this long (seconds) pub max_connection_age: u32, /// Remove peers who have not announced for this long (seconds) - pub max_peer_age: u64, + pub max_peer_age: u32, /// Remove pending scrape responses that have not been returned from swarm /// workers for this long (seconds) - pub max_pending_scrape_age: u64, + pub max_pending_scrape_age: u32, } impl Default for CleaningConfig { diff --git a/aquatic_udp/src/lib.rs b/aquatic_udp/src/lib.rs index 815d69a..1259076 100644 --- a/aquatic_udp/src/lib.rs +++ b/aquatic_udp/src/lib.rs @@ -14,7 +14,7 @@ use aquatic_common::access_list::update_access_list; #[cfg(feature = "cpu-pinning")] use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::privileges::PrivilegeDropper; -use aquatic_common::PanicSentinelWatcher; +use aquatic_common::{PanicSentinelWatcher, ServerStartInstant}; use common::{ ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State, SwarmWorkerIndex, @@ -41,6 +41,8 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let mut response_senders = Vec::new(); let mut response_receivers = BTreeMap::new(); + let server_start_instant = ServerStartInstant::new(); + for i in 0..config.swarm_workers { let (request_sender, request_receiver) = if config.worker_channel_size == 0 { unbounded() @@ -85,6 +87,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { sentinel, config, state, + server_start_instant, request_receiver, response_sender, SwarmWorkerIndex(i), @@ -120,6 +123,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { config, i, connection_validator, + server_start_instant, request_sender, response_receiver, priv_dropper, diff --git a/aquatic_udp/src/workers/socket/mod.rs b/aquatic_udp/src/workers/socket/mod.rs index fd45296..6daeeb2 100644 --- a/aquatic_udp/src/workers/socket/mod.rs +++ b/aquatic_udp/src/workers/socket/mod.rs @@ -6,6 +6,7 @@ pub mod validator; use std::time::{Duration, Instant}; use anyhow::Context; +use aquatic_common::ServerStartInstant; use crossbeam_channel::Receiver; use mio::net::UdpSocket; use mio::{Events, Interest, Poll, Token}; @@ -31,6 +32,7 @@ pub fn run_socket_worker( config: Config, token_num: usize, mut connection_validator: ConnectionValidator, + server_start_instant: ServerStartInstant, request_sender: ConnectedRequestSender, response_receiver: Receiver<(ConnectedResponse, CanonicalSocketAddr)>, priv_dropper: PrivilegeDropper, @@ -59,7 +61,8 @@ pub fn run_socket_worker( let pending_scrape_cleaning_duration = Duration::from_secs(config.cleaning.pending_scrape_cleaning_interval); - let mut pending_scrape_valid_until = ValidUntil::new(config.cleaning.max_pending_scrape_age); + let mut pending_scrape_valid_until = + ValidUntil::new(server_start_instant, config.cleaning.max_pending_scrape_age); let mut last_pending_scrape_cleaning = Instant::now(); let mut iter_counter = 0usize; @@ -100,13 +103,17 @@ pub fn run_socket_worker( // Run periodic ValidUntil updates and state cleaning if iter_counter % 256 == 0 { + let seconds_since_start = server_start_instant.seconds_elapsed(); + + pending_scrape_valid_until = ValidUntil::new_with_now( + seconds_since_start, + config.cleaning.max_pending_scrape_age, + ); + let now = Instant::now(); - pending_scrape_valid_until = - ValidUntil::new_with_now(now, config.cleaning.max_pending_scrape_age); - if now > last_pending_scrape_cleaning + pending_scrape_cleaning_duration { - pending_scrape_responses.clean(); + pending_scrape_responses.clean(seconds_since_start); last_pending_scrape_cleaning = now; } diff --git a/aquatic_udp/src/workers/socket/storage.rs b/aquatic_udp/src/workers/socket/storage.rs index ee5ec54..8aed47c 100644 --- a/aquatic_udp/src/workers/socket/storage.rs +++ b/aquatic_udp/src/workers/socket/storage.rs @@ -1,10 +1,9 @@ use std::collections::BTreeMap; -use std::time::Instant; use hashbrown::HashMap; use slab::Slab; -use aquatic_common::ValidUntil; +use aquatic_common::{SecondsSinceServerStart, ValidUntil}; use aquatic_udp_protocol::*; use crate::common::*; @@ -97,11 +96,9 @@ impl PendingScrapeResponseSlab { } } - pub fn clean(&mut self) { - let now = Instant::now(); - + pub fn clean(&mut self, now: SecondsSinceServerStart) { self.0.retain(|k, v| { - if v.valid_until.0 > now { + if v.valid_until.valid(now) { true } else { ::log::warn!( @@ -120,6 +117,7 @@ impl PendingScrapeResponseSlab { #[cfg(test)] mod tests { + use aquatic_common::ServerStartInstant; use quickcheck::TestResult; use quickcheck_macros::quickcheck; @@ -138,7 +136,7 @@ mod tests { config.swarm_workers = swarm_workers as usize; - let valid_until = ValidUntil::new(1); + let valid_until = ValidUntil::new(ServerStartInstant::new(), 1); let mut map = PendingScrapeResponseSlab::default(); diff --git a/aquatic_udp/src/workers/swarm/mod.rs b/aquatic_udp/src/workers/swarm/mod.rs index b2cc7ed..f80f192 100644 --- a/aquatic_udp/src/workers/swarm/mod.rs +++ b/aquatic_udp/src/workers/swarm/mod.rs @@ -5,6 +5,7 @@ use std::sync::atomic::Ordering; use std::time::Duration; use std::time::Instant; +use aquatic_common::ServerStartInstant; use crossbeam_channel::Receiver; use rand::{rngs::SmallRng, SeedableRng}; @@ -21,6 +22,7 @@ pub fn run_swarm_worker( _sentinel: PanicSentinel, config: Config, state: State, + server_start_instant: ServerStartInstant, request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, response_sender: ConnectedResponseSender, worker_index: SwarmWorkerIndex, @@ -29,7 +31,7 @@ pub fn run_swarm_worker( let mut rng = SmallRng::from_entropy(); let timeout = Duration::from_millis(config.request_channel_recv_timeout_ms); - let mut peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); + let mut peer_valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); let cleaning_interval = Duration::from_secs(config.cleaning.torrent_cleaning_interval); let statistics_update_interval = Duration::from_secs(config.statistics.interval); @@ -81,10 +83,14 @@ pub fn run_swarm_worker( if iter_counter % 128 == 0 { let now = Instant::now(); - peer_valid_until = ValidUntil::new_with_now(now, config.cleaning.max_peer_age); + peer_valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); if now > last_cleaning + cleaning_interval { - let (ipv4, ipv6) = torrents.clean_and_get_num_peers(&config, &state.access_list); + let (ipv4, ipv6) = torrents.clean_and_get_num_peers( + &config, + &state.access_list, + server_start_instant, + ); if config.statistics.active() { state.statistics_ipv4.peers[worker_index.0].store(ipv4, Ordering::Release); diff --git a/aquatic_udp/src/workers/swarm/storage.rs b/aquatic_udp/src/workers/swarm/storage.rs index 75bc311..35d6392 100644 --- a/aquatic_udp/src/workers/swarm/storage.rs +++ b/aquatic_udp/src/workers/swarm/storage.rs @@ -1,8 +1,9 @@ use std::net::Ipv4Addr; use std::net::Ipv6Addr; use std::sync::Arc; -use std::time::Instant; +use aquatic_common::SecondsSinceServerStart; +use aquatic_common::ServerStartInstant; use aquatic_common::{ access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache, AccessListMode}, extract_response_peers, AmortizedIndexMap, ValidUntil, @@ -99,9 +100,9 @@ impl TorrentData { } /// Remove inactive peers and reclaim space - fn clean(&mut self, now: Instant) { + fn clean(&mut self, now: SecondsSinceServerStart) { self.peers.retain(|_, peer| { - if peer.valid_until.0 > now { + if peer.valid_until.valid(now) { true } else { match peer.status { @@ -143,7 +144,7 @@ impl TorrentMap { &mut self, access_list_cache: &mut AccessListCache, access_list_mode: AccessListMode, - now: Instant, + now: SecondsSinceServerStart, ) -> usize { let mut num_peers = 0; @@ -192,10 +193,11 @@ impl TorrentMaps { &mut self, config: &Config, access_list: &Arc, + server_start_instant: ServerStartInstant, ) -> (usize, usize) { let mut cache = create_access_list_cache(access_list); let mode = config.access_list.mode; - let now = Instant::now(); + let now = server_start_instant.seconds_elapsed(); let ipv4 = self.ipv4.clean_and_get_num_peers(&mut cache, mode, now); let ipv6 = self.ipv6.clean_and_get_num_peers(&mut cache, mode, now); @@ -226,7 +228,7 @@ mod tests { ip_address: Ipv4Addr::from(i.to_be_bytes()), port: Port(1), status: PeerStatus::Leeching, - valid_until: ValidUntil::new(0), + valid_until: ValidUntil::new(ServerStartInstant::new(), 0), } } diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index 3a7c256..66f3d9e 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -7,7 +7,7 @@ //! Scrape: 1 873 545 requests/second, 533.75 ns/request //! ``` -use aquatic_common::PanicSentinelWatcher; +use aquatic_common::{PanicSentinelWatcher, ServerStartInstant}; use aquatic_udp::workers::swarm::run_swarm_worker; use crossbeam_channel::unbounded; use num_format::{Locale, ToFormattedString}; @@ -51,6 +51,8 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { let response_sender = ConnectedResponseSender::new(vec![response_sender]); + let server_start_instant = ServerStartInstant::new(); + { let config = aquatic_config.clone(); let state = State::new(config.swarm_workers); @@ -60,6 +62,7 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { sentinel, config, state, + server_start_instant, request_receiver, response_sender, SwarmWorkerIndex(0), diff --git a/aquatic_ws/src/common.rs b/aquatic_ws/src/common.rs index a099e68..d4e6d35 100644 --- a/aquatic_ws/src/common.rs +++ b/aquatic_ws/src/common.rs @@ -29,16 +29,16 @@ pub struct State { } #[derive(Copy, Clone, Debug)] -pub struct PendingScrapeId(pub usize); +pub struct PendingScrapeId(pub u8); #[derive(Copy, Clone, Debug)] -pub struct ConsumerId(pub usize); +pub struct ConsumerId(pub u8); #[derive(Clone, Copy, Debug, PartialEq)] pub struct ConnectionId(pub usize); #[derive(Clone, Copy, Debug)] -pub struct ConnectionMeta { +pub struct InMessageMeta { /// Index of socket worker responsible for this connection. Required for /// sending back response through correct channel to correct worker. pub out_message_consumer_id: ConsumerId, @@ -47,6 +47,25 @@ pub struct ConnectionMeta { pub pending_scrape_id: Option, } +#[derive(Clone, Copy, Debug)] +pub struct OutMessageMeta { + /// Index of socket worker responsible for this connection. Required for + /// sending back response through correct channel to correct worker. + pub out_message_consumer_id: ConsumerId, + pub connection_id: ConnectionId, + pub pending_scrape_id: Option, +} + +impl Into for InMessageMeta { + fn into(self) -> OutMessageMeta { + OutMessageMeta { + out_message_consumer_id: self.out_message_consumer_id, + connection_id: self.connection_id, + pending_scrape_id: self.pending_scrape_id, + } + } +} + #[derive(Clone, Copy, Debug)] pub enum SwarmControlMessage { ConnectionClosed { diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index b295aa3..2964459 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -124,11 +124,11 @@ pub struct CleaningConfig { /// Clean peers this often (seconds) pub torrent_cleaning_interval: u64, /// Remove peers that have not announced for this long (seconds) - pub max_peer_age: u64, + pub max_peer_age: u32, // Clean connections this often (seconds) pub connection_cleaning_interval: u64, /// Close connections if no responses have been sent to them for this long (seconds) - pub max_connection_idle: u64, + pub max_connection_idle: u32, } impl Default for CleaningConfig { diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index d7e961d..d8b6cd0 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -8,7 +8,7 @@ use anyhow::Context; use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker}; use aquatic_common::cpu_pinning::WorkerIndex; use aquatic_common::rustls_config::create_rustls_config; -use aquatic_common::PanicSentinelWatcher; +use aquatic_common::{PanicSentinelWatcher, ServerStartInstant}; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; use signal_hook::{ consts::{SIGTERM, SIGUSR1}, @@ -49,14 +49,19 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); let opt_tls_config = if config.network.enable_tls { - Some(Arc::new(create_rustls_config( - &config.network.tls_certificate_path, - &config.network.tls_private_key_path, - ).with_context(|| "create rustls config")?)) + Some(Arc::new( + create_rustls_config( + &config.network.tls_certificate_path, + &config.network.tls_private_key_path, + ) + .with_context(|| "create rustls config")?, + )) } else { None }; + let server_start_instant = ServerStartInstant::new(); + let mut executors = Vec::new(); for i in 0..(config.socket_workers) { @@ -88,6 +93,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { request_mesh_builder, response_mesh_builder, priv_dropper, + server_start_instant, ) .await }) @@ -123,6 +129,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { control_mesh_builder, request_mesh_builder, response_mesh_builder, + server_start_instant, ) .await }) diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index 61870c7..0d7f09b 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -4,13 +4,13 @@ use std::collections::BTreeMap; use std::os::unix::prelude::{FromRawFd, IntoRawFd}; use std::rc::Rc; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use anyhow::Context; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; -use aquatic_common::PanicSentinel; +use aquatic_common::{PanicSentinel, ServerStartInstant}; use aquatic_ws_protocol::*; use async_tungstenite::WebSocketStream; use futures::stream::{SplitSink, SplitStream}; @@ -42,7 +42,7 @@ struct ConnectionReference { task_handle: Option>, /// Sender part of channel used to pass on outgoing messages from request /// worker - out_message_sender: Rc>, + out_message_sender: Rc>, /// Updated after sending message to peer valid_until: ValidUntil, peer_id: Option, @@ -56,9 +56,10 @@ pub async fn run_socket_worker( state: State, opt_tls_config: Option>, control_message_mesh_builder: MeshBuilder, - in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>, - out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>, + in_message_mesh_builder: MeshBuilder<(InMessageMeta, InMessage), Partial>, + out_message_mesh_builder: MeshBuilder<(OutMessageMeta, OutMessage), Partial>, priv_dropper: PrivilegeDropper, + server_start_instant: ServerStartInstant, ) { let config = Rc::new(config); let access_list = state.access_list; @@ -86,7 +87,13 @@ pub async fn run_socket_worker( let (_, mut out_message_receivers) = out_message_mesh_builder.join(Role::Consumer).await.unwrap(); - let out_message_consumer_id = ConsumerId(out_message_receivers.consumer_id().unwrap()); + let out_message_consumer_id = ConsumerId( + out_message_receivers + .consumer_id() + .unwrap() + .try_into() + .unwrap(), + ); ::log::info!("joined channels"); @@ -98,6 +105,7 @@ pub async fn run_socket_worker( clean_connections( config.clone(), connection_slab.clone(), + server_start_instant, ) }), tq_prioritized, @@ -133,13 +141,16 @@ pub async fn run_socket_worker( let key = RefCell::borrow_mut(&connection_slab).insert(ConnectionReference { task_handle: None, out_message_sender: out_message_sender.clone(), - valid_until: ValidUntil::new(config.cleaning.max_connection_idle), + valid_until: ValidUntil::new( + server_start_instant, + config.cleaning.max_connection_idle, + ), peer_id: None, announced_info_hashes: Default::default(), ip_version, }); - ::log::info!("accepting stream, assigning id {}", key); + ::log::trace!("accepting stream, assigning id {}", key); let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, opt_tls_config) async move { if let Err(err) = run_connection( @@ -151,6 +162,7 @@ pub async fn run_socket_worker( connection_slab.clone(), out_message_sender, out_message_receiver, + server_start_instant, out_message_consumer_id, ConnectionId(key), opt_tls_config, @@ -208,11 +220,12 @@ pub async fn run_socket_worker( async fn clean_connections( config: Rc, connection_slab: Rc>>, + server_start_instant: ServerStartInstant, ) -> Option { - let now = Instant::now(); + let now = server_start_instant.seconds_elapsed(); connection_slab.borrow_mut().retain(|_, reference| { - if reference.valid_until.0 > now { + if reference.valid_until.valid(now) { true } else { if let Some(ref handle) = reference.task_handle { @@ -231,14 +244,14 @@ async fn clean_connections( } async fn receive_out_messages( - mut out_message_receiver: ConnectedReceiver<(ConnectionMeta, OutMessage)>, + mut out_message_receiver: ConnectedReceiver<(OutMessageMeta, OutMessage)>, connection_references: Rc>>, ) { let connection_references = &connection_references; while let Some((meta, out_message)) = out_message_receiver.next().await { if let Some(reference) = connection_references.borrow().get(meta.connection_id.0) { - ::log::info!( + ::log::trace!( "local channel {} len: {}", meta.connection_id.0, reference.out_message_sender.len() @@ -249,7 +262,7 @@ async fn receive_out_messages( Err(GlommioError::Closed(_)) => {} Err(GlommioError::WouldBlock(_)) => {} Err(err) => { - ::log::info!( + ::log::debug!( "Couldn't send out_message from shared channel to local receiver: {:?}", err ); @@ -262,12 +275,13 @@ async fn receive_out_messages( async fn run_connection( config: Rc, access_list: Arc, - in_message_senders: Rc>, + in_message_senders: Rc>, tq_prioritized: TaskQueueHandle, tq_regular: TaskQueueHandle, connection_slab: Rc>>, - out_message_sender: Rc>, - out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>, + out_message_sender: Rc>, + out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>, + server_start_instant: ServerStartInstant, out_message_consumer_id: ConsumerId, connection_id: ConnectionId, opt_tls_config: Option>, @@ -288,6 +302,7 @@ async fn run_connection( connection_slab.clone(), out_message_sender, out_message_receiver, + server_start_instant, out_message_consumer_id, connection_id, stream, @@ -333,6 +348,7 @@ async fn run_connection( connection_slab.clone(), out_message_sender, out_message_receiver, + server_start_instant, out_message_consumer_id, connection_id, stream, @@ -347,12 +363,13 @@ async fn run_stream_agnostic_connection< >( config: Rc, access_list: Arc, - in_message_senders: Rc>, + in_message_senders: Rc>, tq_prioritized: TaskQueueHandle, tq_regular: TaskQueueHandle, connection_slab: Rc>>, - out_message_sender: Rc>, - out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>, + out_message_sender: Rc>, + out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>, + server_start_instant: ServerStartInstant, out_message_consumer_id: ConsumerId, connection_id: ConnectionId, stream: S, @@ -404,6 +421,7 @@ async fn run_stream_agnostic_connection< ws_out, pending_scrape_slab, connection_id, + server_start_instant, }; let result = writer.run_out_message_loop().await; @@ -422,8 +440,8 @@ struct ConnectionReader { config: Rc, access_list_cache: AccessListCache, connection_slab: Rc>>, - in_message_senders: Rc>, - out_message_sender: Rc>, + in_message_senders: Rc>, + out_message_sender: Rc>, pending_scrape_slab: Rc>>, out_message_consumer_id: ConsumerId, ws_in: SplitStream>, @@ -434,8 +452,6 @@ struct ConnectionReader { impl ConnectionReader { async fn run_in_message_loop(&mut self) -> anyhow::Result<()> { loop { - ::log::debug!("read_in_message"); - while self.out_message_sender.is_full() { sleep(Duration::from_millis(100)).await; @@ -446,8 +462,6 @@ impl ConnectionReader { match InMessage::from_ws_message(message) { Ok(in_message) => { - ::log::debug!("parsed in_message"); - self.handle_in_message(in_message).await?; } Err(err) => { @@ -520,7 +534,6 @@ impl ConnectionReader { ) .await .unwrap(); - ::log::info!("sent message to swarm worker"); } else { self.send_error_response( "Info hash not allowed".into(), @@ -563,11 +576,14 @@ impl ConnectionReader { stats: Default::default(), }; - let pending_scrape_id = PendingScrapeId( - RefCell::borrow_mut(&mut self.pending_scrape_slab) - .insert(pending_scrape_response), - ); - let meta = self.make_connection_meta(Some(pending_scrape_id)); + let pending_scrape_id: u8 = self + .pending_scrape_slab + .borrow_mut() + .insert(pending_scrape_response) + .try_into() + .with_context(|| "Reached 256 pending scrape responses")?; + + let meta = self.make_connection_meta(Some(PendingScrapeId(pending_scrape_id))); for (consumer_index, info_hashes) in info_hashes_by_worker { let in_message = InMessage::ScrapeRequest(ScrapeRequest { @@ -580,7 +596,6 @@ impl ConnectionReader { .send_to(consumer_index, (meta, in_message)) .await .unwrap(); - ::log::info!("sent message to swarm worker"); } } } @@ -601,13 +616,13 @@ impl ConnectionReader { }); self.out_message_sender - .send((self.make_connection_meta(None), out_message)) + .send((self.make_connection_meta(None).into(), out_message)) .await .map_err(|err| anyhow::anyhow!("ConnectionReader::send_error_response failed: {}", err)) } - fn make_connection_meta(&self, pending_scrape_id: Option) -> ConnectionMeta { - ConnectionMeta { + fn make_connection_meta(&self, pending_scrape_id: Option) -> InMessageMeta { + InMessageMeta { connection_id: self.connection_id, out_message_consumer_id: self.out_message_consumer_id, ip_version: self.ip_version, @@ -618,10 +633,11 @@ impl ConnectionReader { struct ConnectionWriter { config: Rc, - out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>, + out_message_receiver: LocalReceiver<(OutMessageMeta, OutMessage)>, connection_slab: Rc>>, ws_out: SplitSink, tungstenite::Message>, pending_scrape_slab: Rc>>, + server_start_instant: ServerStartInstant, connection_id: ConnectionId, } @@ -640,7 +656,7 @@ impl ConnectionWriter { let finished = if let Some(pending) = Slab::get_mut( &mut RefCell::borrow_mut(&self.pending_scrape_slab), - pending_scrape_id.0, + pending_scrape_id.0 as usize, ) { pending.stats.extend(out_message.files); pending.pending_worker_out_messages -= 1; @@ -654,7 +670,7 @@ impl ConnectionWriter { let out_message = { let mut slab = RefCell::borrow_mut(&self.pending_scrape_slab); - let pending = slab.remove(pending_scrape_id.0); + let pending = slab.remove(pending_scrape_id.0 as usize); slab.shrink_to_fit(); @@ -694,13 +710,16 @@ impl ConnectionWriter { self.connection_id.0 ) })? - .valid_until = ValidUntil::new(self.config.cleaning.max_connection_idle); + .valid_until = ValidUntil::new( + self.server_start_instant, + self.config.cleaning.max_connection_idle, + ); Ok(()) } Ok(Err(err)) => Err(err.into()), Err(err) => { - ::log::info!("send_out_message: sending to peer took to long: {}", err); + ::log::debug!("send_out_message: sending to peer took to long: {}", err); Ok(()) } diff --git a/aquatic_ws/src/workers/swarm.rs b/aquatic_ws/src/workers/swarm.rs index 22a9f02..b8a06f7 100644 --- a/aquatic_ws/src/workers/swarm.rs +++ b/aquatic_ws/src/workers/swarm.rs @@ -1,7 +1,7 @@ use std::cell::RefCell; use std::rc::Rc; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use futures::StreamExt; @@ -12,7 +12,10 @@ use glommio::timer::TimerActionRepeat; use hashbrown::HashMap; use rand::{rngs::SmallRng, SeedableRng}; -use aquatic_common::{extract_response_peers, AmortizedIndexMap, PanicSentinel}; +use aquatic_common::{ + extract_response_peers, AmortizedIndexMap, PanicSentinel, SecondsSinceServerStart, + ServerStartInstant, +}; use aquatic_ws_protocol::*; use crate::common::*; @@ -44,8 +47,9 @@ impl PeerStatus { #[derive(Clone, Copy)] struct Peer { - pub connection_meta: ConnectionMeta, - pub status: PeerStatus, + pub consumer_id: ConsumerId, + pub connection_id: ConnectionId, + pub seeder: bool, pub valid_until: ValidUntil, } @@ -71,14 +75,10 @@ impl Default for TorrentData { impl TorrentData { pub fn remove_peer(&mut self, peer_id: PeerId) { if let Some(peer) = self.peers.remove(&peer_id) { - match peer.status { - PeerStatus::Leeching => { - self.num_leechers -= 1; - } - PeerStatus::Seeding => { - self.num_seeders -= 1; - } - PeerStatus::Stopped => (), + if peer.seeder { + self.num_seeders -= 1; + } else { + self.num_leechers -= 1; } } } @@ -93,20 +93,25 @@ struct TorrentMaps { } impl TorrentMaps { - fn clean(&mut self, config: &Config, access_list: &Arc) { + fn clean( + &mut self, + config: &Config, + access_list: &Arc, + server_start_instant: ServerStartInstant, + ) { let mut access_list_cache = create_access_list_cache(access_list); + let now = server_start_instant.seconds_elapsed(); - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4); - Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6); + Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4, now); + Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6, now); } fn clean_torrent_map( config: &Config, access_list_cache: &mut AccessListCache, torrent_map: &mut TorrentMap, + now: SecondsSinceServerStart, ) { - let now = Instant::now(); - torrent_map.retain(|info_hash, torrent_data| { if !access_list_cache .load() @@ -119,18 +124,14 @@ impl TorrentMaps { let num_leechers = &mut torrent_data.num_leechers; torrent_data.peers.retain(|_, peer| { - let keep = peer.valid_until.0 >= now; + let keep = peer.valid_until.valid(now); if !keep { - match peer.status { - PeerStatus::Seeding => { - *num_seeders -= 1; - } - PeerStatus::Leeching => { - *num_leechers -= 1; - } - _ => (), - }; + if peer.seeder { + *num_seeders -= 1; + } else { + *num_leechers -= 1; + } } keep @@ -148,8 +149,9 @@ pub async fn run_swarm_worker( config: Config, state: State, control_message_mesh_builder: MeshBuilder, - in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>, - out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>, + in_message_mesh_builder: MeshBuilder<(InMessageMeta, InMessage), Partial>, + out_message_mesh_builder: MeshBuilder<(OutMessageMeta, OutMessage), Partial>, + server_start_instant: ServerStartInstant, ) { let (_, mut control_message_receivers) = control_message_mesh_builder .join(Role::Consumer) @@ -167,7 +169,7 @@ pub async fn run_swarm_worker( // Periodically clean torrents TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || { enclose!((config, torrents, access_list) move || async move { - torrents.borrow_mut().clean(&config, &access_list); + torrents.borrow_mut().clean(&config, &access_list, server_start_instant); Some(Duration::from_secs(config.cleaning.torrent_cleaning_interval)) })() @@ -186,6 +188,7 @@ pub async fn run_swarm_worker( let handle = spawn_local(handle_request_stream( config.clone(), torrents.clone(), + server_start_instant, out_message_senders.clone(), receiver, )) @@ -229,19 +232,23 @@ where async fn handle_request_stream( config: Config, torrents: Rc>, - out_message_senders: Rc>, + server_start_instant: ServerStartInstant, + out_message_senders: Rc>, stream: S, ) where - S: futures_lite::Stream + ::std::marker::Unpin, + S: futures_lite::Stream + ::std::marker::Unpin, { let rng = Rc::new(RefCell::new(SmallRng::from_entropy())); let max_peer_age = config.cleaning.max_peer_age; - let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_peer_age))); + let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new( + server_start_instant, + max_peer_age, + ))); TimerActionRepeat::repeat(enclose!((peer_valid_until) move || { enclose!((peer_valid_until) move || async move { - *peer_valid_until.borrow_mut() = ValidUntil::new(max_peer_age); + *peer_valid_until.borrow_mut() = ValidUntil::new(server_start_instant, max_peer_age); Some(Duration::from_secs(1)) })() @@ -279,14 +286,12 @@ async fn handle_request_stream( }; for (meta, out_message) in out_messages.drain(..) { - ::log::info!("swarm worker trying to send OutMessage to socket worker"); - out_message_senders - .send_to(meta.out_message_consumer_id.0, (meta, out_message)) + .send_to(meta.out_message_consumer_id.0 as usize, (meta, out_message)) .await .expect("failed sending out_message to socket worker"); - ::log::info!("swarm worker sent OutMessage to socket worker"); + ::log::debug!("swarm worker sent OutMessage to socket worker"); } }, ) @@ -297,9 +302,9 @@ fn handle_announce_request( config: &Config, rng: &mut SmallRng, torrent_maps: &mut TorrentMaps, - out_messages: &mut Vec<(ConnectionMeta, OutMessage)>, + out_messages: &mut Vec<(OutMessageMeta, OutMessage)>, valid_until: ValidUntil, - request_sender_meta: ConnectionMeta, + request_sender_meta: InMessageMeta, request: AnnounceRequest, ) { let torrent_data: &mut TorrentData = if let IpVersion::V4 = request_sender_meta.ip_version { @@ -313,7 +318,7 @@ fn handle_announce_request( // peers have access to each others peer_id's, they could send requests // using them, causing all sorts of issues. if let Some(previous_peer) = torrent_data.peers.get(&request.peer_id) { - if request_sender_meta.connection_id != previous_peer.connection_meta.connection_id { + if request_sender_meta.connection_id != previous_peer.connection_id { return; } } @@ -327,31 +332,39 @@ fn handle_announce_request( request.bytes_left, ); - let peer = Peer { - connection_meta: request_sender_meta, - status: peer_status, - valid_until, - }; - let opt_removed_peer = match peer_status { PeerStatus::Leeching => { torrent_data.num_leechers += 1; + let peer = Peer { + connection_id: request_sender_meta.connection_id, + consumer_id: request_sender_meta.out_message_consumer_id, + seeder: false, + valid_until, + }; + torrent_data.peers.insert(request.peer_id, peer) } PeerStatus::Seeding => { torrent_data.num_seeders += 1; + let peer = Peer { + connection_id: request_sender_meta.connection_id, + consumer_id: request_sender_meta.out_message_consumer_id, + seeder: true, + valid_until, + }; + torrent_data.peers.insert(request.peer_id, peer) } PeerStatus::Stopped => torrent_data.peers.remove(&request.peer_id), }; - match opt_removed_peer.map(|peer| peer.status) { - Some(PeerStatus::Leeching) => { + match opt_removed_peer.map(|peer| peer.seeder) { + Some(false) => { torrent_data.num_leechers -= 1; } - Some(PeerStatus::Seeding) => { + Some(true) => { torrent_data.num_seeders -= 1; } _ => {} @@ -385,14 +398,14 @@ fn handle_announce_request( offer_id: offer.offer_id, }; - out_messages.push(( - offer_receiver.connection_meta, - OutMessage::Offer(middleman_offer), - )); - ::log::trace!( - "sending middleman offer to {:?}", - offer_receiver.connection_meta - ); + let meta = OutMessageMeta { + out_message_consumer_id: offer_receiver.consumer_id, + connection_id: offer_receiver.connection_id, + pending_scrape_id: None, + }; + + out_messages.push((meta, OutMessage::Offer(middleman_offer))); + ::log::trace!("sending middleman offer to {:?}", meta); } } @@ -409,14 +422,14 @@ fn handle_announce_request( offer_id, }; - out_messages.push(( - answer_receiver.connection_meta, - OutMessage::Answer(middleman_answer), - )); - ::log::trace!( - "sending middleman answer to {:?}", - answer_receiver.connection_meta - ); + let meta = OutMessageMeta { + out_message_consumer_id: answer_receiver.consumer_id, + connection_id: answer_receiver.connection_id, + pending_scrape_id: None, + }; + + out_messages.push((meta, OutMessage::Answer(middleman_answer))); + ::log::trace!("sending middleman answer to {:?}", meta); } } @@ -428,14 +441,14 @@ fn handle_announce_request( announce_interval: config.protocol.peer_announce_interval, }); - out_messages.push((request_sender_meta, out_message)); + out_messages.push((request_sender_meta.into(), out_message)); } fn handle_scrape_request( config: &Config, torrent_maps: &mut TorrentMaps, - out_messages: &mut Vec<(ConnectionMeta, OutMessage)>, - meta: ConnectionMeta, + out_messages: &mut Vec<(OutMessageMeta, OutMessage)>, + meta: InMessageMeta, request: ScrapeRequest, ) { let info_hashes = if let Some(info_hashes) = request.info_hashes { @@ -469,5 +482,5 @@ fn handle_scrape_request( } } - out_messages.push((meta, OutMessage::ScrapeResponse(out_message))); + out_messages.push((meta.into(), OutMessage::ScrapeResponse(out_message))); }