Use (unbounded) SegQueue instead of ArrayQueue

This commit is contained in:
Joakim Frostegård 2020-04-11 19:59:52 +02:00
parent 6503e89375
commit f37ba1e52e
7 changed files with 17 additions and 37 deletions

View file

@@ -3,14 +3,12 @@ use std::sync::atomic::AtomicUsize;
 use std::net::{SocketAddr, IpAddr};
 use std::time::Instant;
-use crossbeam_queue::ArrayQueue;
+use crossbeam_queue::SegQueue;
 use dashmap::DashMap;
 use indexmap::IndexMap;
 pub use bittorrent_udp::types::*;
-use crate::config::Config;

 pub const MAX_PACKET_SIZE: usize = 4096;
@@ -140,18 +138,18 @@ pub struct State {
     pub connections: Arc<ConnectionMap>,
     pub torrents: Arc<TorrentMap>,
     pub statistics: Arc<Statistics>,
-    pub request_queue: Arc<ArrayQueue<(Request, SocketAddr)>>,
-    pub response_queue: Arc<ArrayQueue<(Response, SocketAddr)>>,
+    pub request_queue: Arc<SegQueue<(Request, SocketAddr)>>,
+    pub response_queue: Arc<SegQueue<(Response, SocketAddr)>>,
 }

 impl State {
-    pub fn new(config: &Config) -> Self {
+    pub fn new() -> Self {
         Self {
             connections: Arc::new(DashMap::new()),
             torrents: Arc::new(DashMap::new()),
             statistics: Arc::new(Statistics::default()),
-            request_queue: Arc::new(ArrayQueue::new(config.request_queue_len)),
-            response_queue: Arc::new(ArrayQueue::new(config.response_queue_len)),
+            request_queue: Arc::new(SegQueue::new()),
+            response_queue: Arc::new(SegQueue::new()),
         }
     }
 }

View file

@@ -8,8 +8,6 @@ pub struct Config {
     /// Spawn this number of threads for workers
     pub socket_workers: usize,
     pub response_workers: usize,
-    pub request_queue_len: usize,
-    pub response_queue_len: usize,
     pub network: NetworkConfig,
     pub statistics: StatisticsConfig,
     pub cleaning: CleaningConfig,
@@ -55,8 +53,6 @@ impl Default for Config {
         Self {
             socket_workers: 1,
             response_workers: 1,
-            request_queue_len: 4096,
-            response_queue_len: 4096 * 4,
             network: NetworkConfig::default(),
             statistics: StatisticsConfig::default(),
             cleaning: CleaningConfig::default(),

View file

@@ -88,9 +88,7 @@ pub fn handle_connect_requests(
             }
         );

-        if let Err(err) = state.response_queue.push((response, src)){
-            eprintln!("couldn't push to response queue: {}", err);
-        }
+        state.response_queue.push((response, src));
     }
 }
@@ -114,9 +112,7 @@ pub fn handle_announce_requests(
                 message: "Connection invalid or expired".to_string()
             };

-            if let Err(err) = state.response_queue.push((response.into(), src)){
-                eprintln!("couldn't push to response queue: {}", err);
-            }
+            state.response_queue.push((response.into(), src));
         }

         let peer_key = PeerMapKey {
@@ -180,9 +176,7 @@ pub fn handle_announce_requests(
             peers: response_peers
         });

-        if let Err(err) = state.response_queue.push((response, src)){
-            eprintln!("couldn't push to response queue: {}", err);
-        }
+        state.response_queue.push((response, src));
     }
 }
@@ -206,9 +200,7 @@ pub fn handle_scrape_requests(
                 message: "Connection invalid or expired".to_string()
             };

-            if let Err(err) = state.response_queue.push((response.into(), src)){
-                eprintln!("couldn't push to response queue: {}", err);
-            }
+            state.response_queue.push((response.into(), src));
         }

         let mut stats: Vec<TorrentScrapeStatistics> = Vec::with_capacity(
@@ -231,9 +223,7 @@ pub fn handle_scrape_requests(
             torrent_stats: stats,
         });

-        if let Err(err) = state.response_queue.push((response, src)){
-            eprintln!("couldn't push to response queue: {}", err);
-        }
+        state.response_queue.push((response, src));
     };
 }

View file

@@ -11,7 +11,7 @@ use common::State;
 pub fn run(config: Config){
-    let state = State::new(&config);
+    let state = State::new();

     for _ in 0..config.response_workers {
         let state = state.clone();

View file

@@ -127,11 +127,7 @@ fn read_requests(
                 match request {
                     Ok(request) => {
-                        let res = state.request_queue.push((request, src));
-
-                        if let Err(err) = res {
-                            eprintln!("couldn't push request to queue: {}", err);
-                        }
+                        state.request_queue.push((request, src));
                     },
                     Err(err) => {
                         eprintln!("request_from_bytes error: {:?}", err);

View file

@@ -51,7 +51,7 @@ pub fn bench(
         requests,
     );

-    while let Ok((response, src)) = state.response_queue.pop(){
+    while let Ok((response, _)) = state.response_queue.pop(){
         if let Response::Announce(_) = response {
             num_responses += 1;
         }

View file

@@ -97,7 +97,7 @@ fn run(bench_config: BenchConfig){
     let pb = create_progress_bar("Connect handler", bench_config.num_rounds);

     for _ in (0..bench_config.num_rounds).progress_with(pb){
-        let state = State::new(&config);
+        let state = State::new();

         let handles: Vec<_> = (0..bench_config.num_threads).map(|_| {
             let requests = requests.clone();
@@ -157,7 +157,7 @@ fn run(bench_config: BenchConfig){
     let mut last_torrents = None;

     for i in (0..bench_config.num_rounds).progress_with(pb){
-        let mut state = State::new(&config);
+        let mut state = State::new();

         state.connections = connections.clone();
@@ -186,7 +186,7 @@ fn run(bench_config: BenchConfig){
     // Benchmark scrape handler
     {
-        let mut state = State::new(&config);
+        let mut state = State::new();

         state.torrents = last_torrents.unwrap();
         let requests = scrape::create_requests(&mut rng, &info_hashes);