From f37ba1e52e57f908857bd1ce9af7e38e9003d026 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 11 Apr 2020 19:59:52 +0200 Subject: [PATCH] Use (unbounded) SegQueue instead of ArrayQueue --- aquatic/src/lib/common.rs | 14 ++++++------- aquatic/src/lib/config.rs | 4 ---- aquatic/src/lib/handlers.rs | 20 +++++-------------- aquatic/src/lib/lib.rs | 2 +- aquatic/src/lib/network.rs | 6 +----- .../src/bin/bench_handlers/announce.rs | 2 +- aquatic_bench/src/bin/bench_handlers/main.rs | 6 +++--- 7 files changed, 17 insertions(+), 37 deletions(-) diff --git a/aquatic/src/lib/common.rs b/aquatic/src/lib/common.rs index 917a128..52d18df 100644 --- a/aquatic/src/lib/common.rs +++ b/aquatic/src/lib/common.rs @@ -3,14 +3,12 @@ use std::sync::atomic::AtomicUsize; use std::net::{SocketAddr, IpAddr}; use std::time::Instant; -use crossbeam_queue::ArrayQueue; +use crossbeam_queue::SegQueue; use dashmap::DashMap; use indexmap::IndexMap; pub use bittorrent_udp::types::*; -use crate::config::Config; - pub const MAX_PACKET_SIZE: usize = 4096; @@ -140,18 +138,18 @@ pub struct State { pub connections: Arc, pub torrents: Arc, pub statistics: Arc, - pub request_queue: Arc>, - pub response_queue: Arc>, + pub request_queue: Arc>, + pub response_queue: Arc>, } impl State { - pub fn new(config: &Config) -> Self { + pub fn new() -> Self { Self { connections: Arc::new(DashMap::new()), torrents: Arc::new(DashMap::new()), statistics: Arc::new(Statistics::default()), - request_queue: Arc::new(ArrayQueue::new(config.request_queue_len)), - response_queue: Arc::new(ArrayQueue::new(config.response_queue_len)), + request_queue: Arc::new(SegQueue::new()), + response_queue: Arc::new(SegQueue::new()), } } } diff --git a/aquatic/src/lib/config.rs b/aquatic/src/lib/config.rs index 10de5c0..e4f7b97 100644 --- a/aquatic/src/lib/config.rs +++ b/aquatic/src/lib/config.rs @@ -8,8 +8,6 @@ pub struct Config { /// Spawn this number of threads for workers pub socket_workers: usize, pub response_workers: usize, - pub request_queue_len: usize, - pub response_queue_len: usize, pub network: NetworkConfig, pub statistics: StatisticsConfig, pub cleaning: CleaningConfig, @@ -55,8 +53,6 @@ impl Default for Config { Self { socket_workers: 1, response_workers: 1, - request_queue_len: 4096, - response_queue_len: 4096 * 4, network: NetworkConfig::default(), statistics: StatisticsConfig::default(), cleaning: CleaningConfig::default(), diff --git a/aquatic/src/lib/handlers.rs b/aquatic/src/lib/handlers.rs index 0580d9c..6c147d9 100644 --- a/aquatic/src/lib/handlers.rs +++ b/aquatic/src/lib/handlers.rs @@ -88,9 +88,7 @@ pub fn handle_connect_requests( } ); - if let Err(err) = state.response_queue.push((response, src)){ - eprintln!("couldn't push to response queue: {}", err); - } + state.response_queue.push((response, src)); } } @@ -114,9 +112,7 @@ pub fn handle_announce_requests( message: "Connection invalid or expired".to_string() }; - if let Err(err) = state.response_queue.push((response.into(), src)){ - eprintln!("couldn't push to response queue: {}", err); - } + state.response_queue.push((response.into(), src)); } let peer_key = PeerMapKey { @@ -180,9 +176,7 @@ pub fn handle_announce_requests( peers: response_peers }); - if let Err(err) = state.response_queue.push((response, src)){ - eprintln!("couldn't push to response queue: {}", err); - } + state.response_queue.push((response, src)); } } @@ -206,9 +200,7 @@ pub fn handle_scrape_requests( message: "Connection invalid or expired".to_string() }; - if let Err(err) = state.response_queue.push((response.into(), src)){ - eprintln!("couldn't push to response queue: {}", err); - } + state.response_queue.push((response.into(), src)); } let mut stats: Vec = Vec::with_capacity( @@ -231,9 +223,7 @@ pub fn handle_scrape_requests( torrent_stats: stats, }); - if let Err(err) = state.response_queue.push((response, src)){ - eprintln!("couldn't push to response queue: {}", err); - } + state.response_queue.push((response, src)); }; } diff --git a/aquatic/src/lib/lib.rs b/aquatic/src/lib/lib.rs index dce2b70..d9b0aef 100644 --- a/aquatic/src/lib/lib.rs +++ b/aquatic/src/lib/lib.rs @@ -11,7 +11,7 @@ use common::State; pub fn run(config: Config){ - let state = State::new(&config); + let state = State::new(); for _ in 0..config.response_workers { let state = state.clone(); diff --git a/aquatic/src/lib/network.rs b/aquatic/src/lib/network.rs index 9bd76f3..82cad9e 100644 --- a/aquatic/src/lib/network.rs +++ b/aquatic/src/lib/network.rs @@ -127,11 +127,7 @@ fn read_requests( match request { Ok(request) => { - let res = state.request_queue.push((request, src)); - - if let Err(err) = res { - eprintln!("couldn't push request to queue: {}", err); - } + state.request_queue.push((request, src)); }, Err(err) => { eprintln!("request_from_bytes error: {:?}", err); diff --git a/aquatic_bench/src/bin/bench_handlers/announce.rs b/aquatic_bench/src/bin/bench_handlers/announce.rs index 6a739d5..b88420e 100644 --- a/aquatic_bench/src/bin/bench_handlers/announce.rs +++ b/aquatic_bench/src/bin/bench_handlers/announce.rs @@ -51,7 +51,7 @@ pub fn bench( requests, ); - while let Ok((response, src)) = state.response_queue.pop(){ + while let Ok((response, _)) = state.response_queue.pop(){ if let Response::Announce(_) = response { num_responses += 1; } diff --git a/aquatic_bench/src/bin/bench_handlers/main.rs b/aquatic_bench/src/bin/bench_handlers/main.rs index a596e53..19dea5f 100644 --- a/aquatic_bench/src/bin/bench_handlers/main.rs +++ b/aquatic_bench/src/bin/bench_handlers/main.rs @@ -97,7 +97,7 @@ fn run(bench_config: BenchConfig){ let pb = create_progress_bar("Connect handler", bench_config.num_rounds); for _ in (0..bench_config.num_rounds).progress_with(pb){ - let state = State::new(&config); + let state = State::new(); let handles: Vec<_> = (0..bench_config.num_threads).map(|_| { let requests = requests.clone(); @@ -157,7 +157,7 @@ fn run(bench_config: BenchConfig){ let mut last_torrents = None; for i in (0..bench_config.num_rounds).progress_with(pb){ - let mut state = State::new(&config); + let mut state = State::new(); state.connections = connections.clone(); @@ -186,7 +186,7 @@ fn run(bench_config: BenchConfig){ // Benchmark scrape handler { - let mut state = State::new(&config); + let mut state = State::new(); state.torrents = last_torrents.unwrap(); let requests = scrape::create_requests(&mut rng, &info_hashes);