diff --git a/Cargo.lock b/Cargo.lock index 9f0cdd3..bbe6059 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -31,12 +31,12 @@ dependencies = [ "bittorrent_udp", "cli_helpers", "crossbeam-channel", - "dashmap", "histogram", "indexmap", "mimalloc", "mio", "net2", + "parking_lot", "quickcheck", "quickcheck_macros", "rand", @@ -129,6 +129,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" +[[package]] +name = "bitflags" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" + [[package]] name = "bittorrent_udp" version = "0.1.0" @@ -178,6 +184,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "cloudabi" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" +dependencies = [ + "bitflags", +] + [[package]] name = "cmake" version = "0.1.42" @@ -371,6 +386,15 @@ dependencies = [ "cmake", ] +[[package]] +name = "lock_api" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4da24a77a3d8a6d4862d95f72e6fdb9c09a643ecdb402d754004a557f2bec75" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.4.8" @@ -564,6 +588,30 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17b02fc0ff9a9e4b35b3342880f48e896ebf69f2967921fe8646bf5b7125956a" +[[package]] +name = "parking_lot" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e136c1904604defe99ce5fd71a28d473fa60a12255d511aa78a9ddf11237aeb" +dependencies = [ + "cfg-if", + "cloudabi", + "libc", + "redox_syscall", + "smallvec", + "winapi", +] + [[package]] name = "plotly" version = "0.4.1" @@ -721,6 +769,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "535622e6be132bccd223f4bb2b8ac8d53cda3c7a6394944d3b2b33fb974f9d76" +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + [[package]] name = "serde" version = "1.0.106" @@ -752,6 +806,12 @@ dependencies = [ "serde", ] +[[package]] +name = "smallvec" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05720e22615919e4734f6a99ceae50d00226c3c5aca406e102ebc33298214e0a" + [[package]] name = "socket2" version = "0.3.12" diff --git a/aquatic/Cargo.toml b/aquatic/Cargo.toml index cedaca3..94e59ce 100644 --- a/aquatic/Cargo.toml +++ b/aquatic/Cargo.toml @@ -15,12 +15,12 @@ name = "aquatic" bittorrent_udp = { path = "../bittorrent_udp" } cli_helpers = { path = "../cli_helpers" } crossbeam-channel = "0.4" -dashmap = "3" histogram = "0.6" indexmap = "1" mimalloc = { version = "0.1", default-features = false } mio = { version = "0.7", features = ["udp", "os-poll", "os-util"] } net2 = "0.2" +parking_lot = "0.10" rand = { version = "0.7", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } diff --git a/aquatic/src/lib/common.rs b/aquatic/src/lib/common.rs index 4ff7273..834579a 100644 --- a/aquatic/src/lib/common.rs +++ b/aquatic/src/lib/common.rs @@ -1,9 +1,9 @@ -use std::sync::Arc; -use std::sync::atomic::AtomicUsize; +use std::collections::HashMap; use std::net::{SocketAddr, IpAddr}; +use std::sync::{Arc, atomic::AtomicUsize}; use std::time::Instant; -use dashmap::DashMap; +use parking_lot::Mutex; use indexmap::IndexMap; pub use bittorrent_udp::types::*; @@ -23,7 +23,7 @@ pub struct ConnectionKey { } -pub type ConnectionMap = DashMap; +pub type ConnectionMap = HashMap; #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] @@ -119,7 +119,7 @@ impl Default for TorrentData { } -pub type TorrentMap = DashMap; +pub type TorrentMap = HashMap; #[derive(Default)] @@ -134,16 +134,16 @@ pub struct Statistics { #[derive(Clone)] pub struct State { - pub connections: Arc, - pub torrents: Arc, + pub connections: Arc>, + pub torrents: Arc>, pub statistics: Arc, } impl State { pub fn new() -> Self { Self { - connections: Arc::new(DashMap::new()), - torrents: Arc::new(DashMap::new()), + connections: Arc::new(Mutex::new(HashMap::new())), + torrents: Arc::new(Mutex::new(HashMap::new())), statistics: Arc::new(Statistics::default()), } } diff --git a/aquatic/src/lib/handlers.rs b/aquatic/src/lib/handlers.rs index 37f9887..41fa3ed 100644 --- a/aquatic/src/lib/handlers.rs +++ b/aquatic/src/lib/handlers.rs @@ -3,6 +3,7 @@ use std::sync::atomic::Ordering; use std::time::{Duration, Instant}; use std::vec::Drain; +use parking_lot::{Mutex, MutexGuard}; use crossbeam_channel::{Sender, Receiver}; use rand::{SeedableRng, Rng, rngs::{SmallRng, StdRng}}; @@ -54,21 +55,28 @@ pub fn handle( } } + let mut connections = state.connections.lock(); + handle_connect_requests( - &state, + &mut connections, &mut std_rng, connect_requests.drain(..), &response_sender ); + + let mut torrents = state.torrents.lock(); + handle_announce_requests( - &state, + &connections, + &mut torrents, &config, &mut small_rng, announce_requests.drain(..), &response_sender ); handle_scrape_requests( - &state, + &connections, + &mut torrents, scrape_requests.drain(..), &response_sender ); @@ -78,7 +86,7 @@ pub fn handle( #[inline] pub fn handle_connect_requests( - state: &State, + connections: &mut MutexGuard, rng: &mut StdRng, requests: Drain<(ConnectRequest, SocketAddr)>, response_sender: &Sender<(Response, SocketAddr)>, @@ -93,7 +101,7 @@ pub fn handle_connect_requests( socket_addr: src, }; - state.connections.insert(key, now); + connections.insert(key, now); let response = Response::Connect( ConnectResponse { @@ -109,7 +117,8 @@ pub fn handle_connect_requests( #[inline] pub fn handle_announce_requests( - state: &State, + connections: &MutexGuard, + torrents: &mut MutexGuard, config: &Config, rng: &mut SmallRng, requests: Drain<(AnnounceRequest, SocketAddr)>, @@ -121,7 +130,7 @@ pub fn handle_announce_requests( socket_addr: src, }; - if !state.connections.contains_key(&connection_key){ + if !connections.contains_key(&connection_key){ let response = ErrorResponse { transaction_id: request.transaction_id, message: "Connection invalid or expired".to_string() @@ -138,7 +147,7 @@ pub fn handle_announce_requests( let peer = Peer::from_announce_and_ip(&request, src.ip()); let peer_status = peer.status; - let mut torrent_data = state.torrents + let mut torrent_data = torrents .entry(request.info_hash) .or_default(); @@ -150,10 +159,6 @@ pub fn handle_announce_requests( .map(|peer| peer.status) }; - // Downgrade ref since we don't need write access anymore, so that - // other threads can access torrent - let torrent_data = torrent_data.downgrade(); - let max_num_peers_to_take = (request.peers_wanted.0.max(0) as usize) .min(config.network.max_response_peers); @@ -198,7 +203,8 @@ pub fn handle_announce_requests( #[inline] pub fn handle_scrape_requests( - state: &State, + connections: &MutexGuard, + torrents: &MutexGuard, requests: Drain<(ScrapeRequest, SocketAddr)>, response_sender: &Sender<(Response, SocketAddr)>, ){ @@ -210,7 +216,7 @@ pub fn handle_scrape_requests( socket_addr: src, }; - if !state.connections.contains_key(&connection_key){ + if !connections.contains_key(&connection_key){ let response = ErrorResponse { transaction_id: request.transaction_id, message: "Connection invalid or expired".to_string() @@ -224,7 +230,7 @@ pub fn handle_scrape_requests( ); for info_hash in request.info_hashes.iter() { - if let Some(torrent_data) = state.torrents.get(info_hash){ + if let Some(torrent_data) = torrents.get(info_hash){ stats.push(create_torrent_scrape_statistics( torrent_data.num_seeders.load(Ordering::SeqCst) as i32, torrent_data.num_leechers.load(Ordering::SeqCst) as i32, diff --git a/aquatic/src/lib/tasks.rs b/aquatic/src/lib/tasks.rs index d97cef1..82725de 100644 --- a/aquatic/src/lib/tasks.rs +++ b/aquatic/src/lib/tasks.rs @@ -12,8 +12,10 @@ pub fn clean_connections(state: &State, config: &Config){ config.cleaning.max_connection_age ); - state.connections.retain(|_, v| v.0 > limit); - state.connections.shrink_to_fit(); + let mut connections = state.connections.lock(); + + connections.retain(|_, v| v.0 > limit); + connections.shrink_to_fit(); } @@ -22,7 +24,9 @@ pub fn clean_torrents(state: &State, config: &Config){ config.cleaning.max_peer_age ); - state.torrents.retain(|_, torrent| { + let mut torrents = state.torrents.lock(); + + torrents.retain(|_, torrent| { let num_seeders = &torrent.num_seeders; let num_leechers = &torrent.num_leechers; @@ -47,7 +51,7 @@ pub fn clean_torrents(state: &State, config: &Config){ !torrent.peers.is_empty() }); - state.torrents.shrink_to_fit(); + torrents.shrink_to_fit(); } @@ -94,7 +98,9 @@ pub fn gather_and_print_statistics( let mut peers_per_torrent = Histogram::new(); - for torrent in state.torrents.iter(){ + let torrents = state.torrents.lock(); + + for torrent in torrents.values(){ let num_seeders = torrent.num_seeders.load(Ordering::SeqCst); let num_leechers = torrent.num_leechers.load(Ordering::SeqCst);