Remove DashMap, use parking_lot::Mutex<HashMap> instead

This commit is contained in:
Joakim Frostegård 2020-04-11 22:07:09 +02:00
parent 70cc193522
commit 558ddadf28
5 changed files with 103 additions and 31 deletions

62
Cargo.lock generated
View file

@ -31,12 +31,12 @@ dependencies = [
"bittorrent_udp", "bittorrent_udp",
"cli_helpers", "cli_helpers",
"crossbeam-channel", "crossbeam-channel",
"dashmap",
"histogram", "histogram",
"indexmap", "indexmap",
"mimalloc", "mimalloc",
"mio", "mio",
"net2", "net2",
"parking_lot",
"quickcheck", "quickcheck",
"quickcheck_macros", "quickcheck_macros",
"rand", "rand",
@ -129,6 +129,12 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d"
[[package]]
name = "bitflags"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]] [[package]]
name = "bittorrent_udp" name = "bittorrent_udp"
version = "0.1.0" version = "0.1.0"
@ -178,6 +184,15 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "cloudabi"
version = "0.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
dependencies = [
"bitflags",
]
[[package]] [[package]]
name = "cmake" name = "cmake"
version = "0.1.42" version = "0.1.42"
@ -371,6 +386,15 @@ dependencies = [
"cmake", "cmake",
] ]
[[package]]
name = "lock_api"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4da24a77a3d8a6d4862d95f72e6fdb9c09a643ecdb402d754004a557f2bec75"
dependencies = [
"scopeguard",
]
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.8" version = "0.4.8"
@ -564,6 +588,30 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17b02fc0ff9a9e4b35b3342880f48e896ebf69f2967921fe8646bf5b7125956a" checksum = "17b02fc0ff9a9e4b35b3342880f48e896ebf69f2967921fe8646bf5b7125956a"
[[package]]
name = "parking_lot"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e136c1904604defe99ce5fd71a28d473fa60a12255d511aa78a9ddf11237aeb"
dependencies = [
"cfg-if",
"cloudabi",
"libc",
"redox_syscall",
"smallvec",
"winapi",
]
[[package]] [[package]]
name = "plotly" name = "plotly"
version = "0.4.1" version = "0.4.1"
@ -721,6 +769,12 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "535622e6be132bccd223f4bb2b8ac8d53cda3c7a6394944d3b2b33fb974f9d76" checksum = "535622e6be132bccd223f4bb2b8ac8d53cda3c7a6394944d3b2b33fb974f9d76"
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.106" version = "1.0.106"
@ -752,6 +806,12 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "smallvec"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05720e22615919e4734f6a99ceae50d00226c3c5aca406e102ebc33298214e0a"
[[package]] [[package]]
name = "socket2" name = "socket2"
version = "0.3.12" version = "0.3.12"

View file

@ -15,12 +15,12 @@ name = "aquatic"
bittorrent_udp = { path = "../bittorrent_udp" } bittorrent_udp = { path = "../bittorrent_udp" }
cli_helpers = { path = "../cli_helpers" } cli_helpers = { path = "../cli_helpers" }
crossbeam-channel = "0.4" crossbeam-channel = "0.4"
dashmap = "3"
histogram = "0.6" histogram = "0.6"
indexmap = "1" indexmap = "1"
mimalloc = { version = "0.1", default-features = false } mimalloc = { version = "0.1", default-features = false }
mio = { version = "0.7", features = ["udp", "os-poll", "os-util"] } mio = { version = "0.7", features = ["udp", "os-poll", "os-util"] }
net2 = "0.2" net2 = "0.2"
parking_lot = "0.10"
rand = { version = "0.7", features = ["small_rng"] } rand = { version = "0.7", features = ["small_rng"] }
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }

View file

@ -1,9 +1,9 @@
use std::sync::Arc; use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;
use std::net::{SocketAddr, IpAddr}; use std::net::{SocketAddr, IpAddr};
use std::sync::{Arc, atomic::AtomicUsize};
use std::time::Instant; use std::time::Instant;
use dashmap::DashMap; use parking_lot::Mutex;
use indexmap::IndexMap; use indexmap::IndexMap;
pub use bittorrent_udp::types::*; pub use bittorrent_udp::types::*;
@ -23,7 +23,7 @@ pub struct ConnectionKey {
} }
pub type ConnectionMap = DashMap<ConnectionKey, Time>; pub type ConnectionMap = HashMap<ConnectionKey, Time>;
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
@ -119,7 +119,7 @@ impl Default for TorrentData {
} }
pub type TorrentMap = DashMap<InfoHash, TorrentData>; pub type TorrentMap = HashMap<InfoHash, TorrentData>;
#[derive(Default)] #[derive(Default)]
@ -134,16 +134,16 @@ pub struct Statistics {
#[derive(Clone)] #[derive(Clone)]
pub struct State { pub struct State {
pub connections: Arc<ConnectionMap>, pub connections: Arc<Mutex<ConnectionMap>>,
pub torrents: Arc<TorrentMap>, pub torrents: Arc<Mutex<TorrentMap>>,
pub statistics: Arc<Statistics>, pub statistics: Arc<Statistics>,
} }
impl State { impl State {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
connections: Arc::new(DashMap::new()), connections: Arc::new(Mutex::new(HashMap::new())),
torrents: Arc::new(DashMap::new()), torrents: Arc::new(Mutex::new(HashMap::new())),
statistics: Arc::new(Statistics::default()), statistics: Arc::new(Statistics::default()),
} }
} }

View file

@ -3,6 +3,7 @@ use std::sync::atomic::Ordering;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::vec::Drain; use std::vec::Drain;
use parking_lot::{Mutex, MutexGuard};
use crossbeam_channel::{Sender, Receiver}; use crossbeam_channel::{Sender, Receiver};
use rand::{SeedableRng, Rng, rngs::{SmallRng, StdRng}}; use rand::{SeedableRng, Rng, rngs::{SmallRng, StdRng}};
@ -54,21 +55,28 @@ pub fn handle(
} }
} }
let mut connections = state.connections.lock();
handle_connect_requests( handle_connect_requests(
&state, &mut connections,
&mut std_rng, &mut std_rng,
connect_requests.drain(..), connect_requests.drain(..),
&response_sender &response_sender
); );
let mut torrents = state.torrents.lock();
handle_announce_requests( handle_announce_requests(
&state, &connections,
&mut torrents,
&config, &config,
&mut small_rng, &mut small_rng,
announce_requests.drain(..), announce_requests.drain(..),
&response_sender &response_sender
); );
handle_scrape_requests( handle_scrape_requests(
&state, &connections,
&mut torrents,
scrape_requests.drain(..), scrape_requests.drain(..),
&response_sender &response_sender
); );
@ -78,7 +86,7 @@ pub fn handle(
#[inline] #[inline]
pub fn handle_connect_requests( pub fn handle_connect_requests(
state: &State, connections: &mut MutexGuard<ConnectionMap>,
rng: &mut StdRng, rng: &mut StdRng,
requests: Drain<(ConnectRequest, SocketAddr)>, requests: Drain<(ConnectRequest, SocketAddr)>,
response_sender: &Sender<(Response, SocketAddr)>, response_sender: &Sender<(Response, SocketAddr)>,
@ -93,7 +101,7 @@ pub fn handle_connect_requests(
socket_addr: src, socket_addr: src,
}; };
state.connections.insert(key, now); connections.insert(key, now);
let response = Response::Connect( let response = Response::Connect(
ConnectResponse { ConnectResponse {
@ -109,7 +117,8 @@ pub fn handle_connect_requests(
#[inline] #[inline]
pub fn handle_announce_requests( pub fn handle_announce_requests(
state: &State, connections: &MutexGuard<ConnectionMap>,
torrents: &mut MutexGuard<TorrentMap>,
config: &Config, config: &Config,
rng: &mut SmallRng, rng: &mut SmallRng,
requests: Drain<(AnnounceRequest, SocketAddr)>, requests: Drain<(AnnounceRequest, SocketAddr)>,
@ -121,7 +130,7 @@ pub fn handle_announce_requests(
socket_addr: src, socket_addr: src,
}; };
if !state.connections.contains_key(&connection_key){ if !connections.contains_key(&connection_key){
let response = ErrorResponse { let response = ErrorResponse {
transaction_id: request.transaction_id, transaction_id: request.transaction_id,
message: "Connection invalid or expired".to_string() message: "Connection invalid or expired".to_string()
@ -138,7 +147,7 @@ pub fn handle_announce_requests(
let peer = Peer::from_announce_and_ip(&request, src.ip()); let peer = Peer::from_announce_and_ip(&request, src.ip());
let peer_status = peer.status; let peer_status = peer.status;
let mut torrent_data = state.torrents let mut torrent_data = torrents
.entry(request.info_hash) .entry(request.info_hash)
.or_default(); .or_default();
@ -150,10 +159,6 @@ pub fn handle_announce_requests(
.map(|peer| peer.status) .map(|peer| peer.status)
}; };
// Downgrade ref since we don't need write access anymore, so that
// other threads can access torrent
let torrent_data = torrent_data.downgrade();
let max_num_peers_to_take = (request.peers_wanted.0.max(0) as usize) let max_num_peers_to_take = (request.peers_wanted.0.max(0) as usize)
.min(config.network.max_response_peers); .min(config.network.max_response_peers);
@ -198,7 +203,8 @@ pub fn handle_announce_requests(
#[inline] #[inline]
pub fn handle_scrape_requests( pub fn handle_scrape_requests(
state: &State, connections: &MutexGuard<ConnectionMap>,
torrents: &MutexGuard<TorrentMap>,
requests: Drain<(ScrapeRequest, SocketAddr)>, requests: Drain<(ScrapeRequest, SocketAddr)>,
response_sender: &Sender<(Response, SocketAddr)>, response_sender: &Sender<(Response, SocketAddr)>,
){ ){
@ -210,7 +216,7 @@ pub fn handle_scrape_requests(
socket_addr: src, socket_addr: src,
}; };
if !state.connections.contains_key(&connection_key){ if !connections.contains_key(&connection_key){
let response = ErrorResponse { let response = ErrorResponse {
transaction_id: request.transaction_id, transaction_id: request.transaction_id,
message: "Connection invalid or expired".to_string() message: "Connection invalid or expired".to_string()
@ -224,7 +230,7 @@ pub fn handle_scrape_requests(
); );
for info_hash in request.info_hashes.iter() { for info_hash in request.info_hashes.iter() {
if let Some(torrent_data) = state.torrents.get(info_hash){ if let Some(torrent_data) = torrents.get(info_hash){
stats.push(create_torrent_scrape_statistics( stats.push(create_torrent_scrape_statistics(
torrent_data.num_seeders.load(Ordering::SeqCst) as i32, torrent_data.num_seeders.load(Ordering::SeqCst) as i32,
torrent_data.num_leechers.load(Ordering::SeqCst) as i32, torrent_data.num_leechers.load(Ordering::SeqCst) as i32,

View file

@ -12,8 +12,10 @@ pub fn clean_connections(state: &State, config: &Config){
config.cleaning.max_connection_age config.cleaning.max_connection_age
); );
state.connections.retain(|_, v| v.0 > limit); let mut connections = state.connections.lock();
state.connections.shrink_to_fit();
connections.retain(|_, v| v.0 > limit);
connections.shrink_to_fit();
} }
@ -22,7 +24,9 @@ pub fn clean_torrents(state: &State, config: &Config){
config.cleaning.max_peer_age config.cleaning.max_peer_age
); );
state.torrents.retain(|_, torrent| { let mut torrents = state.torrents.lock();
torrents.retain(|_, torrent| {
let num_seeders = &torrent.num_seeders; let num_seeders = &torrent.num_seeders;
let num_leechers = &torrent.num_leechers; let num_leechers = &torrent.num_leechers;
@ -47,7 +51,7 @@ pub fn clean_torrents(state: &State, config: &Config){
!torrent.peers.is_empty() !torrent.peers.is_empty()
}); });
state.torrents.shrink_to_fit(); torrents.shrink_to_fit();
} }
@ -94,7 +98,9 @@ pub fn gather_and_print_statistics(
let mut peers_per_torrent = Histogram::new(); let mut peers_per_torrent = Histogram::new();
for torrent in state.torrents.iter(){ let torrents = state.torrents.lock();
for torrent in torrents.values(){
let num_seeders = torrent.num_seeders.load(Ordering::SeqCst); let num_seeders = torrent.num_seeders.load(Ordering::SeqCst);
let num_leechers = torrent.num_leechers.load(Ordering::SeqCst); let num_leechers = torrent.num_leechers.load(Ordering::SeqCst);