aquatic: save valid_until Instant in connections and requests

Hopefully prevents strange subtraction overflow error, but
we'll see.
This commit is contained in:
Joakim Frostegård 2020-04-12 22:02:54 +02:00
parent 587096f76f
commit 06ff4ad9d0
5 changed files with 47 additions and 44 deletions

View file

@ -1,7 +1,6 @@
# TODO # TODO
## aquatic ## aquatic
* `thread 'main' panicked at 'overflow when subtracting duration from instant', src/libstd/time.rs:374:9`
* Use bounded request channel? * Use bounded request channel?
* Handle Ipv4 and Ipv6 peers. Probably split state. Ipv4 peers can't make * Handle Ipv4 and Ipv6 peers. Probably split state. Ipv4 peers can't make
use of Ipv6 ones. Ipv6 ones may or may note be able to make use of Ipv4 use of Ipv6 ones. Ipv6 ones may or may note be able to make use of Ipv4
@ -19,6 +18,7 @@
# Not important # Not important
* No overflow on instant + duration arithmetic now, hopefully?
* extract_response_peers * extract_response_peers
* Cleaner code * Cleaner code
* Stack-allocated vector? * Stack-allocated vector?

View file

@ -1,6 +1,6 @@
use std::net::{SocketAddr, IpAddr}; use std::net::{SocketAddr, IpAddr};
use std::sync::{Arc, atomic::AtomicUsize}; use std::sync::{Arc, atomic::AtomicUsize};
use std::time::Instant; use std::time::{Duration, Instant};
use hashbrown::HashMap; use hashbrown::HashMap;
use indexmap::IndexMap; use indexmap::IndexMap;
@ -12,8 +12,19 @@ pub use bittorrent_udp::types::*;
pub const MAX_PACKET_SIZE: usize = 4096; pub const MAX_PACKET_SIZE: usize = 4096;
/// Peer or connection valid until this instant
///
/// Used instead of "last seen" or similar to hopefully prevent arithmetic
/// overflow when cleaning.
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
pub struct Time(pub Instant); pub struct ValidUntil(pub Instant);
impl ValidUntil {
pub fn new(offset_seconds: u64) -> Self {
Self(Instant::now() + Duration::from_secs(offset_seconds))
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
@ -23,7 +34,7 @@ pub struct ConnectionKey {
} }
pub type ConnectionMap = HashMap<ConnectionKey, Time>; pub type ConnectionMap = HashMap<ConnectionKey, ValidUntil>;
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
@ -59,7 +70,7 @@ pub struct Peer {
pub ip_address: IpAddr, pub ip_address: IpAddr,
pub port: Port, pub port: Port,
pub status: PeerStatus, pub status: PeerStatus,
pub last_announce: Time pub valid_until: ValidUntil
} }
@ -71,23 +82,6 @@ impl Peer {
port: self.port port: self.port
} }
} }
#[inline]
pub fn from_announce_and_ip(
announce_request: &AnnounceRequest,
ip_address: IpAddr
) -> Self {
let status = PeerStatus::from_event_and_bytes_left(
announce_request.event,
announce_request.bytes_left
);
Self {
ip_address,
port: announce_request.port,
status,
last_announce: Time(Instant::now())
}
}
} }

View file

@ -1,10 +1,10 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::time::{Duration, Instant}; use std::time::Duration;
use std::vec::Drain; use std::vec::Drain;
use parking_lot::MutexGuard;
use crossbeam_channel::{Sender, Receiver}; use crossbeam_channel::{Sender, Receiver};
use parking_lot::MutexGuard;
use rand::{SeedableRng, Rng, rngs::{SmallRng, StdRng}}; use rand::{SeedableRng, Rng, rngs::{SmallRng, StdRng}};
use bittorrent_udp::types::*; use bittorrent_udp::types::*;
@ -79,6 +79,7 @@ pub fn run_request_worker(
); );
handle_connect_requests( handle_connect_requests(
&config,
&mut data, &mut data,
&mut std_rng, &mut std_rng,
connect_requests.drain(..), connect_requests.drain(..),
@ -86,8 +87,8 @@ pub fn run_request_worker(
); );
handle_announce_requests( handle_announce_requests(
&mut data,
&config, &config,
&mut data,
&mut small_rng, &mut small_rng,
announce_requests.drain(..), announce_requests.drain(..),
&mut responses &mut responses
@ -111,12 +112,13 @@ pub fn run_request_worker(
#[inline] #[inline]
pub fn handle_connect_requests( pub fn handle_connect_requests(
config: &Config,
data: &mut MutexGuard<HandlerData>, data: &mut MutexGuard<HandlerData>,
rng: &mut StdRng, rng: &mut StdRng,
requests: Drain<(ConnectRequest, SocketAddr)>, requests: Drain<(ConnectRequest, SocketAddr)>,
responses: &mut Vec<(Response, SocketAddr)>, responses: &mut Vec<(Response, SocketAddr)>,
){ ){
let now = Time(Instant::now()); let valid_until = ValidUntil::new(config.cleaning.max_connection_age);
responses.extend(requests.map(|(request, src)| { responses.extend(requests.map(|(request, src)| {
let connection_id = ConnectionId(rng.gen()); let connection_id = ConnectionId(rng.gen());
@ -126,7 +128,7 @@ pub fn handle_connect_requests(
socket_addr: src, socket_addr: src,
}; };
data.connections.insert(key, now); data.connections.insert(key, valid_until);
let response = Response::Connect( let response = Response::Connect(
ConnectResponse { ConnectResponse {
@ -142,12 +144,14 @@ pub fn handle_connect_requests(
#[inline] #[inline]
pub fn handle_announce_requests( pub fn handle_announce_requests(
data: &mut MutexGuard<HandlerData>,
config: &Config, config: &Config,
data: &mut MutexGuard<HandlerData>,
rng: &mut SmallRng, rng: &mut SmallRng,
requests: Drain<(AnnounceRequest, SocketAddr)>, requests: Drain<(AnnounceRequest, SocketAddr)>,
responses: &mut Vec<(Response, SocketAddr)>, responses: &mut Vec<(Response, SocketAddr)>,
){ ){
let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age);
responses.extend(requests.map(|(request, src)| { responses.extend(requests.map(|(request, src)| {
let connection_key = ConnectionKey { let connection_key = ConnectionKey {
connection_id: request.connection_id, connection_id: request.connection_id,
@ -163,13 +167,24 @@ pub fn handle_announce_requests(
return (response.into(), src); return (response.into(), src);
} }
let peer_ip = src.ip();
let peer_key = PeerMapKey { let peer_key = PeerMapKey {
ip: src.ip(), ip: peer_ip,
peer_id: request.peer_id, peer_id: request.peer_id,
}; };
let peer = Peer::from_announce_and_ip(&request, src.ip()); let peer_status = PeerStatus::from_event_and_bytes_left(
let peer_status = peer.status; request.event,
request.bytes_left
);
let peer = Peer {
ip_address: peer_ip,
port: request.port,
status: peer_status,
valid_until: peer_valid_until,
};
let torrent_data = data.torrents let torrent_data = data.torrents
.entry(request.info_hash) .entry(request.info_hash)
@ -343,7 +358,6 @@ pub fn create_torrent_scrape_statistics(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::time::Instant;
use std::net::IpAddr; use std::net::IpAddr;
use std::collections::HashSet; use std::collections::HashSet;
@ -365,7 +379,7 @@ mod tests {
ip_address, ip_address,
port: Port(1), port: Port(1),
status: PeerStatus::Leeching, status: PeerStatus::Leeching,
last_announce: Time(Instant::now()), valid_until: ValidUntil::new(0),
}; };
(key, value) (key, value)

View file

@ -70,6 +70,6 @@ pub fn run(config: Config){
loop { loop {
::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); ::std::thread::sleep(Duration::from_secs(config.cleaning.interval));
tasks::clean_connections_and_torrents(&state, &config); tasks::clean_connections_and_torrents(&state);
} }
} }

View file

@ -1,5 +1,5 @@
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::time::{Duration, Instant}; use std::time::Instant;
use histogram::Histogram; use histogram::Histogram;
@ -7,17 +7,12 @@ use crate::common::*;
use crate::config::Config; use crate::config::Config;
pub fn clean_connections_and_torrents(state: &State, config: &Config){ pub fn clean_connections_and_torrents(state: &State){
let connection_limit = Instant::now() - Duration::from_secs( let now = Instant::now();
config.cleaning.max_connection_age
);
let peer_limit = Instant::now() - Duration::from_secs(
config.cleaning.max_peer_age
);
let mut data = state.handler_data.lock(); let mut data = state.handler_data.lock();
data.connections.retain(|_, v| v.0 > connection_limit); data.connections.retain(|_, v| v.0 > now);
data.connections.shrink_to_fit(); data.connections.shrink_to_fit();
data.torrents.retain(|_, torrent| { data.torrents.retain(|_, torrent| {
@ -25,7 +20,7 @@ pub fn clean_connections_and_torrents(state: &State, config: &Config){
let num_leechers = &torrent.num_leechers; let num_leechers = &torrent.num_leechers;
torrent.peers.retain(|_, peer| { torrent.peers.retain(|_, peer| {
let keep = peer.last_announce.0 > peer_limit; let keep = peer.valid_until.0 > now;
if !keep { if !keep {
match peer.status { match peer.status {