aquatic: lock connections and torrents together; optimize handler

Handler: don't stop collecting requests early unless mutex can
be unlocked.
This commit is contained in:
Joakim Frostegård 2020-04-12 14:44:34 +02:00
parent 62a40317f9
commit 2779b6ec3a
5 changed files with 62 additions and 46 deletions

View file

@ -2,9 +2,6 @@
## aquatic ## aquatic
* `thread 'main' panicked at 'overflow when subtracting duration from instant', src/libstd/time.rs:374:9` * `thread 'main' panicked at 'overflow when subtracting duration from instant', src/libstd/time.rs:374:9`
* Put connections and torrents in a struct behind a common lock. Add
functionality for checking if mutex is unlocked before quitting to
collect requests from channel (try_recv) up to a limit.
* Handle Ipv4 and Ipv6 peers. Probably split state. Ipv4 peers can't make * Handle Ipv4 and Ipv6 peers. Probably split state. Ipv4 peers can't make
use of Ipv6 ones. Ipv6 ones may or may not be able to make use of Ipv4 use of Ipv6 ones. Ipv6 ones may or may not be able to make use of Ipv4
ones, have to check. ones, have to check.

View file

@ -132,18 +132,33 @@ pub struct Statistics {
} }
pub struct HandlerData {
pub connections: ConnectionMap,
pub torrents: TorrentMap,
}
impl Default for HandlerData {
fn default() -> Self {
Self {
connections: HashMap::new(),
torrents: HashMap::new(),
}
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct State { pub struct State {
pub connections: Arc<Mutex<ConnectionMap>>, pub handler_data: Arc<Mutex<HandlerData>>,
pub torrents: Arc<Mutex<TorrentMap>>,
pub statistics: Arc<Statistics>, pub statistics: Arc<Statistics>,
} }
impl State { impl State {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
connections: Arc::new(Mutex::new(HashMap::new())), handler_data: Arc::new(Mutex::new(HandlerData::default())),
torrents: Arc::new(Mutex::new(HashMap::new())),
statistics: Arc::new(Statistics::default()), statistics: Arc::new(Statistics::default()),
} }
} }

View file

@ -31,16 +31,31 @@ pub fn handle(
); );
loop { loop {
let mut opt_data = None;
// Collect requests from channel, divide them by type
//
// Collect a maximum number of requests. Stop collecting before that
// number is reached if we have waited too long for a request, but
// only if HandlerData mutex isn't locked.
for i in 0..config.handlers.max_requests_per_iter { for i in 0..config.handlers.max_requests_per_iter {
let (request, src): (Request, SocketAddr) = if i == 0 { let (request, src): (Request, SocketAddr) = if i == 0 {
match request_receiver.recv(){ match request_receiver.recv(){
Ok(r) => r, Ok(r) => r,
Err(_) => break, Err(_) => break, // Really shouldn't happen
} }
} else { } else {
match request_receiver.recv_timeout(timeout){ match request_receiver.recv_timeout(timeout){
Ok(r) => r, Ok(r) => r,
Err(_) => break, Err(_) => {
if let Some(data) = state.handler_data.try_lock(){
opt_data = Some(data);
break
} else {
continue
}
},
} }
}; };
@ -57,28 +72,26 @@ pub fn handle(
} }
} }
let mut connections = state.connections.lock(); let mut data: MutexGuard<HandlerData> = opt_data.unwrap_or_else(||
state.handler_data.lock()
);
handle_connect_requests( handle_connect_requests(
&mut connections, &mut data,
&mut std_rng, &mut std_rng,
connect_requests.drain(..), connect_requests.drain(..),
&response_sender &response_sender
); );
let mut torrents = state.torrents.lock();
handle_announce_requests( handle_announce_requests(
&connections, &mut data,
&mut torrents,
&config, &config,
&mut small_rng, &mut small_rng,
announce_requests.drain(..), announce_requests.drain(..),
&response_sender &response_sender
); );
handle_scrape_requests( handle_scrape_requests(
&connections, &mut data,
&mut torrents,
scrape_requests.drain(..), scrape_requests.drain(..),
&response_sender &response_sender
); );
@ -88,7 +101,7 @@ pub fn handle(
#[inline] #[inline]
pub fn handle_connect_requests( pub fn handle_connect_requests(
connections: &mut MutexGuard<ConnectionMap>, data: &mut MutexGuard<HandlerData>,
rng: &mut StdRng, rng: &mut StdRng,
requests: Drain<(ConnectRequest, SocketAddr)>, requests: Drain<(ConnectRequest, SocketAddr)>,
response_sender: &Sender<(Response, SocketAddr)>, response_sender: &Sender<(Response, SocketAddr)>,
@ -103,7 +116,7 @@ pub fn handle_connect_requests(
socket_addr: src, socket_addr: src,
}; };
connections.insert(key, now); data.connections.insert(key, now);
let response = Response::Connect( let response = Response::Connect(
ConnectResponse { ConnectResponse {
@ -119,8 +132,7 @@ pub fn handle_connect_requests(
#[inline] #[inline]
pub fn handle_announce_requests( pub fn handle_announce_requests(
connections: &MutexGuard<ConnectionMap>, data: &mut MutexGuard<HandlerData>,
torrents: &mut MutexGuard<TorrentMap>,
config: &Config, config: &Config,
rng: &mut SmallRng, rng: &mut SmallRng,
requests: Drain<(AnnounceRequest, SocketAddr)>, requests: Drain<(AnnounceRequest, SocketAddr)>,
@ -132,7 +144,7 @@ pub fn handle_announce_requests(
socket_addr: src, socket_addr: src,
}; };
if !connections.contains_key(&connection_key){ if !data.connections.contains_key(&connection_key){
let response = ErrorResponse { let response = ErrorResponse {
transaction_id: request.transaction_id, transaction_id: request.transaction_id,
message: "Connection invalid or expired".to_string() message: "Connection invalid or expired".to_string()
@ -149,7 +161,7 @@ pub fn handle_announce_requests(
let peer = Peer::from_announce_and_ip(&request, src.ip()); let peer = Peer::from_announce_and_ip(&request, src.ip());
let peer_status = peer.status; let peer_status = peer.status;
let mut torrent_data = torrents let mut torrent_data = data.torrents
.entry(request.info_hash) .entry(request.info_hash)
.or_default(); .or_default();
@ -205,8 +217,7 @@ pub fn handle_announce_requests(
#[inline] #[inline]
pub fn handle_scrape_requests( pub fn handle_scrape_requests(
connections: &MutexGuard<ConnectionMap>, data: &mut MutexGuard<HandlerData>,
torrents: &MutexGuard<TorrentMap>,
requests: Drain<(ScrapeRequest, SocketAddr)>, requests: Drain<(ScrapeRequest, SocketAddr)>,
response_sender: &Sender<(Response, SocketAddr)>, response_sender: &Sender<(Response, SocketAddr)>,
){ ){
@ -218,7 +229,7 @@ pub fn handle_scrape_requests(
socket_addr: src, socket_addr: src,
}; };
if !connections.contains_key(&connection_key){ if !data.connections.contains_key(&connection_key){
let response = ErrorResponse { let response = ErrorResponse {
transaction_id: request.transaction_id, transaction_id: request.transaction_id,
message: "Connection invalid or expired".to_string() message: "Connection invalid or expired".to_string()
@ -232,7 +243,7 @@ pub fn handle_scrape_requests(
); );
for info_hash in request.info_hashes.iter() { for info_hash in request.info_hashes.iter() {
if let Some(torrent_data) = torrents.get(info_hash){ if let Some(torrent_data) = data.torrents.get(info_hash){
stats.push(create_torrent_scrape_statistics( stats.push(create_torrent_scrape_statistics(
torrent_data.num_seeders.load(Ordering::SeqCst) as i32, torrent_data.num_seeders.load(Ordering::SeqCst) as i32,
torrent_data.num_leechers.load(Ordering::SeqCst) as i32, torrent_data.num_leechers.load(Ordering::SeqCst) as i32,

View file

@ -58,7 +58,6 @@ pub fn run(config: Config){
loop { loop {
::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); ::std::thread::sleep(Duration::from_secs(config.cleaning.interval));
tasks::clean_connections(&state, &config); tasks::clean_connections_and_torrents(&state, &config);
tasks::clean_torrents(&state, &config);
} }
} }

View file

@ -7,31 +7,25 @@ use crate::common::*;
use crate::config::Config; use crate::config::Config;
pub fn clean_connections(state: &State, config: &Config){ pub fn clean_connections_and_torrents(state: &State, config: &Config){
let limit = Instant::now() - Duration::from_secs( let connection_limit = Instant::now() - Duration::from_secs(
config.cleaning.max_connection_age config.cleaning.max_connection_age
); );
let peer_limit = Instant::now() - Duration::from_secs(
let mut connections = state.connections.lock();
connections.retain(|_, v| v.0 > limit);
connections.shrink_to_fit();
}
pub fn clean_torrents(state: &State, config: &Config){
let limit = Instant::now() - Duration::from_secs(
config.cleaning.max_peer_age config.cleaning.max_peer_age
); );
let mut torrents = state.torrents.lock(); let mut data = state.handler_data.lock();
torrents.retain(|_, torrent| { data.connections.retain(|_, v| v.0 > connection_limit);
data.connections.shrink_to_fit();
data.torrents.retain(|_, torrent| {
let num_seeders = &torrent.num_seeders; let num_seeders = &torrent.num_seeders;
let num_leechers = &torrent.num_leechers; let num_leechers = &torrent.num_leechers;
torrent.peers.retain(|_, peer| { torrent.peers.retain(|_, peer| {
let keep = peer.last_announce.0 > limit; let keep = peer.last_announce.0 > peer_limit;
if !keep { if !keep {
match peer.status { match peer.status {
@ -51,7 +45,7 @@ pub fn clean_torrents(state: &State, config: &Config){
!torrent.peers.is_empty() !torrent.peers.is_empty()
}); });
torrents.shrink_to_fit(); data.torrents.shrink_to_fit();
} }
@ -98,7 +92,7 @@ pub fn gather_and_print_statistics(
let mut peers_per_torrent = Histogram::new(); let mut peers_per_torrent = Histogram::new();
let torrents = state.torrents.lock(); let torrents = &mut state.handler_data.lock().torrents;
for torrent in torrents.values(){ for torrent in torrents.values(){
let num_seeders = torrent.num_seeders.load(Ordering::SeqCst); let num_seeders = torrent.num_seeders.load(Ordering::SeqCst);