diff --git a/TODO.md b/TODO.md index 44a4875..9e932bf 100644 --- a/TODO.md +++ b/TODO.md @@ -2,9 +2,6 @@ ## aquatic * `thread 'main' panicked at 'overflow when subtracting duration from instant', src/libstd/time.rs:374:9` -* Put connections and torrent in a struct behind a commong lock. Add - functionality for checking if mutex is unlocked before quitting to - collect requests from channel (try_recv) up to a limit. * Handle Ipv4 and Ipv6 peers. Probably split state. Ipv4 peers can't make use of Ipv6 ones. Ipv6 ones may or may note be able to make use of Ipv4 ones, have to check. diff --git a/aquatic/src/lib/common.rs b/aquatic/src/lib/common.rs index a119c98..fe54001 100644 --- a/aquatic/src/lib/common.rs +++ b/aquatic/src/lib/common.rs @@ -132,18 +132,33 @@ pub struct Statistics { } +pub struct HandlerData { + pub connections: ConnectionMap, + pub torrents: TorrentMap, +} + + +impl Default for HandlerData { + fn default() -> Self { + Self { + connections: HashMap::new(), + torrents: HashMap::new(), + } + } +} + + #[derive(Clone)] pub struct State { - pub connections: Arc>, - pub torrents: Arc>, + pub handler_data: Arc>, pub statistics: Arc, } + impl State { pub fn new() -> Self { Self { - connections: Arc::new(Mutex::new(HashMap::new())), - torrents: Arc::new(Mutex::new(HashMap::new())), + handler_data: Arc::new(Mutex::new(HandlerData::default())), statistics: Arc::new(Statistics::default()), } } diff --git a/aquatic/src/lib/handlers.rs b/aquatic/src/lib/handlers.rs index 1cb86b2..3a22844 100644 --- a/aquatic/src/lib/handlers.rs +++ b/aquatic/src/lib/handlers.rs @@ -31,16 +31,31 @@ pub fn handle( ); loop { + let mut opt_data = None; + + // Collect requests from channel, divide them by type + // + // Collect a maximum number of request. Stop collecting before that + // number is reached if having waited for too long for a request, but + // only if HandlerData mutex isn't locked. for i in 0..config.handlers.max_requests_per_iter { let (request, src): (Request, SocketAddr) = if i == 0 { match request_receiver.recv(){ Ok(r) => r, - Err(_) => break, + Err(_) => break, // Really shouldn't happen } } else { match request_receiver.recv_timeout(timeout){ Ok(r) => r, - Err(_) => break, + Err(_) => { + if let Some(data) = state.handler_data.try_lock(){ + opt_data = Some(data); + + break + } else { + continue + } + }, } }; @@ -57,28 +72,26 @@ pub fn handle( } } - let mut connections = state.connections.lock(); + let mut data: MutexGuard = opt_data.unwrap_or_else(|| + state.handler_data.lock() + ); handle_connect_requests( - &mut connections, + &mut data, &mut std_rng, connect_requests.drain(..), &response_sender ); - let mut torrents = state.torrents.lock(); - handle_announce_requests( - &connections, - &mut torrents, + &mut data, &config, &mut small_rng, announce_requests.drain(..), &response_sender ); handle_scrape_requests( - &connections, - &mut torrents, + &mut data, scrape_requests.drain(..), &response_sender ); @@ -88,7 +101,7 @@ pub fn handle( #[inline] pub fn handle_connect_requests( - connections: &mut MutexGuard, + data: &mut MutexGuard, rng: &mut StdRng, requests: Drain<(ConnectRequest, SocketAddr)>, response_sender: &Sender<(Response, SocketAddr)>, @@ -103,7 +116,7 @@ pub fn handle_connect_requests( socket_addr: src, }; - connections.insert(key, now); + data.connections.insert(key, now); let response = Response::Connect( ConnectResponse { @@ -119,8 +132,7 @@ pub fn handle_connect_requests( #[inline] pub fn handle_announce_requests( - connections: &MutexGuard, - torrents: &mut MutexGuard, + data: &mut MutexGuard, config: &Config, rng: &mut SmallRng, requests: Drain<(AnnounceRequest, SocketAddr)>, @@ -132,7 +144,7 @@ pub fn handle_announce_requests( socket_addr: src, }; - if !connections.contains_key(&connection_key){ + if !data.connections.contains_key(&connection_key){ let response = ErrorResponse { transaction_id: request.transaction_id, message: "Connection invalid or expired".to_string() @@ -149,7 +161,7 @@ pub fn handle_announce_requests( let peer = Peer::from_announce_and_ip(&request, src.ip()); let peer_status = peer.status; - let mut torrent_data = torrents + let mut torrent_data = data.torrents .entry(request.info_hash) .or_default(); @@ -205,8 +217,7 @@ pub fn handle_announce_requests( #[inline] pub fn handle_scrape_requests( - connections: &MutexGuard, - torrents: &MutexGuard, + data: &mut MutexGuard, requests: Drain<(ScrapeRequest, SocketAddr)>, response_sender: &Sender<(Response, SocketAddr)>, ){ @@ -218,7 +229,7 @@ pub fn handle_scrape_requests( socket_addr: src, }; - if !connections.contains_key(&connection_key){ + if !data.connections.contains_key(&connection_key){ let response = ErrorResponse { transaction_id: request.transaction_id, message: "Connection invalid or expired".to_string() @@ -232,7 +243,7 @@ pub fn handle_scrape_requests( ); for info_hash in request.info_hashes.iter() { - if let Some(torrent_data) = torrents.get(info_hash){ + if let Some(torrent_data) = data.torrents.get(info_hash){ stats.push(create_torrent_scrape_statistics( torrent_data.num_seeders.load(Ordering::SeqCst) as i32, torrent_data.num_leechers.load(Ordering::SeqCst) as i32, diff --git a/aquatic/src/lib/lib.rs b/aquatic/src/lib/lib.rs index 21f6e12..13c81ab 100644 --- a/aquatic/src/lib/lib.rs +++ b/aquatic/src/lib/lib.rs @@ -58,7 +58,6 @@ pub fn run(config: Config){ loop { ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); - tasks::clean_connections(&state, &config); - tasks::clean_torrents(&state, &config); + tasks::clean_connections_and_torrents(&state, &config); } } diff --git a/aquatic/src/lib/tasks.rs b/aquatic/src/lib/tasks.rs index 82725de..fcd6c75 100644 --- a/aquatic/src/lib/tasks.rs +++ b/aquatic/src/lib/tasks.rs @@ -7,31 +7,25 @@ use crate::common::*; use crate::config::Config; -pub fn clean_connections(state: &State, config: &Config){ - let limit = Instant::now() - Duration::from_secs( +pub fn clean_connections_and_torrents(state: &State, config: &Config){ + let connection_limit = Instant::now() - Duration::from_secs( config.cleaning.max_connection_age ); - - let mut connections = state.connections.lock(); - - connections.retain(|_, v| v.0 > limit); - connections.shrink_to_fit(); -} - - -pub fn clean_torrents(state: &State, config: &Config){ - let limit = Instant::now() - Duration::from_secs( + let peer_limit = Instant::now() - Duration::from_secs( config.cleaning.max_peer_age ); - let mut torrents = state.torrents.lock(); + let mut data = state.handler_data.lock(); - torrents.retain(|_, torrent| { + data.connections.retain(|_, v| v.0 > connection_limit); + data.connections.shrink_to_fit(); + + data.torrents.retain(|_, torrent| { let num_seeders = &torrent.num_seeders; let num_leechers = &torrent.num_leechers; torrent.peers.retain(|_, peer| { - let keep = peer.last_announce.0 > limit; + let keep = peer.last_announce.0 > peer_limit; if !keep { match peer.status { @@ -51,7 +45,7 @@ pub fn clean_torrents(state: &State, config: &Config){ !torrent.peers.is_empty() }); - torrents.shrink_to_fit(); + data.torrents.shrink_to_fit(); } @@ -98,7 +92,7 @@ pub fn gather_and_print_statistics( let mut peers_per_torrent = Histogram::new(); - let torrents = state.torrents.lock(); + let torrents = &mut state.handler_data.lock().torrents; for torrent in torrents.values(){ let num_seeders = torrent.num_seeders.load(Ordering::SeqCst);