From 06ff4ad9d070b6ffbc57d422f2cfb82c8040c0ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 12 Apr 2020 22:02:54 +0200 Subject: [PATCH] aquatic: save valid_until Instant in connections and requests Hopefully prevents strange subtraction overflow error, but we'll see. --- TODO.md | 2 +- aquatic/src/lib/common.rs | 36 +++++++++++++++--------------------- aquatic/src/lib/handlers.rs | 36 +++++++++++++++++++++++++----------- aquatic/src/lib/lib.rs | 2 +- aquatic/src/lib/tasks.rs | 15 +++++---------- 5 files changed, 47 insertions(+), 44 deletions(-) diff --git a/TODO.md b/TODO.md index e308428..04cb0f1 100644 --- a/TODO.md +++ b/TODO.md @@ -1,7 +1,6 @@ # TODO ## aquatic -* `thread 'main' panicked at 'overflow when subtracting duration from instant', src/libstd/time.rs:374:9` * Use bounded request channel? * Handle Ipv4 and Ipv6 peers. Probably split state. Ipv4 peers can't make use of Ipv6 ones. Ipv6 ones may or may note be able to make use of Ipv4 @@ -19,6 +18,7 @@ # Not important +* No overflow on instant + duration arithmetic now, hopefully? * extract_response_peers * Cleaner code * Stack-allocated vector? diff --git a/aquatic/src/lib/common.rs b/aquatic/src/lib/common.rs index fe54001..f3e1e73 100644 --- a/aquatic/src/lib/common.rs +++ b/aquatic/src/lib/common.rs @@ -1,6 +1,6 @@ use std::net::{SocketAddr, IpAddr}; use std::sync::{Arc, atomic::AtomicUsize}; -use std::time::Instant; +use std::time::{Duration, Instant}; use hashbrown::HashMap; use indexmap::IndexMap; @@ -12,8 +12,19 @@ pub use bittorrent_udp::types::*; pub const MAX_PACKET_SIZE: usize = 4096; +/// Peer or connection valid until this instant +/// +/// Used instead of "last seen" or similar to hopefully prevent arithmetic +/// overflow when cleaning. #[derive(Debug, Clone, Copy)] -pub struct Time(pub Instant); +pub struct ValidUntil(pub Instant); + + +impl ValidUntil { + pub fn new(offset_seconds: u64) -> Self { + Self(Instant::now() + Duration::from_secs(offset_seconds)) + } +} #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -23,7 +34,7 @@ pub struct ConnectionKey { } -pub type ConnectionMap = HashMap; +pub type ConnectionMap = HashMap; #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] @@ -59,7 +70,7 @@ pub struct Peer { pub ip_address: IpAddr, pub port: Port, pub status: PeerStatus, - pub last_announce: Time + pub valid_until: ValidUntil } @@ -71,23 +82,6 @@ impl Peer { port: self.port } } - #[inline] - pub fn from_announce_and_ip( - announce_request: &AnnounceRequest, - ip_address: IpAddr - ) -> Self { - let status = PeerStatus::from_event_and_bytes_left( - announce_request.event, - announce_request.bytes_left - ); - - Self { - ip_address, - port: announce_request.port, - status, - last_announce: Time(Instant::now()) - } - } } diff --git a/aquatic/src/lib/handlers.rs b/aquatic/src/lib/handlers.rs index 59a70dd..6882da9 100644 --- a/aquatic/src/lib/handlers.rs +++ b/aquatic/src/lib/handlers.rs @@ -1,10 +1,10 @@ use std::net::SocketAddr; use std::sync::atomic::Ordering; -use std::time::{Duration, Instant}; +use std::time::Duration; use std::vec::Drain; -use parking_lot::MutexGuard; use crossbeam_channel::{Sender, Receiver}; +use parking_lot::MutexGuard; use rand::{SeedableRng, Rng, rngs::{SmallRng, StdRng}}; use bittorrent_udp::types::*; @@ -79,6 +79,7 @@ pub fn run_request_worker( ); handle_connect_requests( + &config, &mut data, &mut std_rng, connect_requests.drain(..), @@ -86,8 +87,8 @@ pub fn run_request_worker( ); handle_announce_requests( - &mut data, &config, + &mut data, &mut small_rng, announce_requests.drain(..), &mut responses @@ -111,12 +112,13 @@ pub fn run_request_worker( #[inline] pub fn handle_connect_requests( + config: &Config, data: &mut MutexGuard, rng: &mut StdRng, requests: Drain<(ConnectRequest, SocketAddr)>, responses: &mut Vec<(Response, SocketAddr)>, ){ - let now = Time(Instant::now()); + let valid_until = ValidUntil::new(config.cleaning.max_connection_age); responses.extend(requests.map(|(request, src)| { let connection_id = ConnectionId(rng.gen()); @@ -126,7 +128,7 @@ pub fn handle_connect_requests( socket_addr: src, }; - data.connections.insert(key, now); + data.connections.insert(key, valid_until); let response = Response::Connect( ConnectResponse { @@ -142,12 +144,14 @@ pub fn handle_connect_requests( #[inline] pub fn handle_announce_requests( - data: &mut MutexGuard, config: &Config, + data: &mut MutexGuard, rng: &mut SmallRng, requests: Drain<(AnnounceRequest, SocketAddr)>, responses: &mut Vec<(Response, SocketAddr)>, ){ + let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); + responses.extend(requests.map(|(request, src)| { let connection_key = ConnectionKey { connection_id: request.connection_id, @@ -163,13 +167,24 @@ pub fn handle_announce_requests( return (response.into(), src); } + let peer_ip = src.ip(); + let peer_key = PeerMapKey { - ip: src.ip(), + ip: peer_ip, peer_id: request.peer_id, }; - let peer = Peer::from_announce_and_ip(&request, src.ip()); - let peer_status = peer.status; + let peer_status = PeerStatus::from_event_and_bytes_left( + request.event, + request.bytes_left + ); + + let peer = Peer { + ip_address: peer_ip, + port: request.port, + status: peer_status, + valid_until: peer_valid_until, + }; let torrent_data = data.torrents .entry(request.info_hash) @@ -343,7 +358,6 @@ pub fn create_torrent_scrape_statistics( #[cfg(test)] mod tests { - use std::time::Instant; use std::net::IpAddr; use std::collections::HashSet; @@ -365,7 +379,7 @@ mod tests { ip_address, port: Port(1), status: PeerStatus::Leeching, - last_announce: Time(Instant::now()), + valid_until: ValidUntil::new(0), }; (key, value) diff --git a/aquatic/src/lib/lib.rs b/aquatic/src/lib/lib.rs index a1d13b8..2cc4694 100644 --- a/aquatic/src/lib/lib.rs +++ b/aquatic/src/lib/lib.rs @@ -70,6 +70,6 @@ pub fn run(config: Config){ loop { ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); - tasks::clean_connections_and_torrents(&state, &config); + tasks::clean_connections_and_torrents(&state); } } diff --git a/aquatic/src/lib/tasks.rs b/aquatic/src/lib/tasks.rs index fcd6c75..1d758bb 100644 --- a/aquatic/src/lib/tasks.rs +++ b/aquatic/src/lib/tasks.rs @@ -1,5 +1,5 @@ use std::sync::atomic::Ordering; -use std::time::{Duration, Instant}; +use std::time::Instant; use histogram::Histogram; @@ -7,17 +7,12 @@ use crate::common::*; use crate::config::Config; -pub fn clean_connections_and_torrents(state: &State, config: &Config){ - let connection_limit = Instant::now() - Duration::from_secs( - config.cleaning.max_connection_age - ); - let peer_limit = Instant::now() - Duration::from_secs( - config.cleaning.max_peer_age - ); +pub fn clean_connections_and_torrents(state: &State){ + let now = Instant::now(); let mut data = state.handler_data.lock(); - data.connections.retain(|_, v| v.0 > connection_limit); + data.connections.retain(|_, v| v.0 > now); data.connections.shrink_to_fit(); data.torrents.retain(|_, torrent| { @@ -25,7 +20,7 @@ pub fn clean_connections_and_torrents(state: &State, config: &Config){ let num_leechers = &torrent.num_leechers; torrent.peers.retain(|_, peer| { - let keep = peer.last_announce.0 > peer_limit; + let keep = peer.valid_until.0 > now; if !keep { match peer.status {