diff --git a/Cargo.lock b/Cargo.lock index 89550ad..9f0cdd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,8 +30,7 @@ version = "0.1.0" dependencies = [ "bittorrent_udp", "cli_helpers", - "crossbeam-queue", - "crossbeam-utils", + "crossbeam-channel", "dashmap", "histogram", "indexmap", @@ -225,13 +224,13 @@ dependencies = [ ] [[package]] -name = "crossbeam-queue" -version = "0.2.1" +name = "crossbeam-channel" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c695eeca1e7173472a32221542ae469b3e9aac3a4fc81f7696bcad82029493db" +checksum = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061" dependencies = [ - "cfg-if", "crossbeam-utils", + "maybe-uninit", ] [[package]] @@ -381,6 +380,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "maybe-uninit" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" + [[package]] name = "memchr" version = "2.3.3" diff --git a/TODO.md b/TODO.md index 1500fbb..534cdf8 100644 --- a/TODO.md +++ b/TODO.md @@ -6,8 +6,6 @@ * Generic bench function since current functions are almost identical * Show percentile stats for peers per torrent * aquatic - * Park handler threads when really inactive? Or generally avoid utilizing - CPU needlessly. See https://docs.rs/crossbeam/0.7.3/crossbeam/utils/struct.Backoff.html#method.is_completed * Lock whole torrent map over many requests in handlers? Could use HashMap in Mutex instead of DashMap maybe (parking lot mutex?) * Tests @@ -16,7 +14,6 @@ ones, have to check. * bittorrent_udp * other test cases - * Check if announce response to bytes code changed caused slowdown ## Not important diff --git a/aquatic/Cargo.toml b/aquatic/Cargo.toml index c2d6b37..cedaca3 100644 --- a/aquatic/Cargo.toml +++ b/aquatic/Cargo.toml @@ -14,8 +14,7 @@ name = "aquatic" [dependencies] bittorrent_udp = { path = "../bittorrent_udp" } cli_helpers = { path = "../cli_helpers" } -crossbeam-queue = "0.2" -crossbeam-utils = "0.7" +crossbeam-channel = "0.4" dashmap = "3" histogram = "0.6" indexmap = "1" diff --git a/aquatic/src/lib/common.rs b/aquatic/src/lib/common.rs index 52d18df..4ff7273 100644 --- a/aquatic/src/lib/common.rs +++ b/aquatic/src/lib/common.rs @@ -3,7 +3,6 @@ use std::sync::atomic::AtomicUsize; use std::net::{SocketAddr, IpAddr}; use std::time::Instant; -use crossbeam_queue::SegQueue; use dashmap::DashMap; use indexmap::IndexMap; @@ -138,8 +137,6 @@ pub struct State { pub connections: Arc, pub torrents: Arc, pub statistics: Arc, - pub request_queue: Arc>, - pub response_queue: Arc>, } impl State { @@ -148,8 +145,6 @@ impl State { connections: Arc::new(DashMap::new()), torrents: Arc::new(DashMap::new()), statistics: Arc::new(Statistics::default()), - request_queue: Arc::new(SegQueue::new()), - response_queue: Arc::new(SegQueue::new()), } } } diff --git a/aquatic/src/lib/handlers.rs b/aquatic/src/lib/handlers.rs index 6c147d9..37f9887 100644 --- a/aquatic/src/lib/handlers.rs +++ b/aquatic/src/lib/handlers.rs @@ -1,10 +1,10 @@ use std::net::SocketAddr; use std::sync::atomic::Ordering; -use std::time::Instant; +use std::time::{Duration, Instant}; use std::vec::Drain; +use crossbeam_channel::{Sender, Receiver}; use rand::{SeedableRng, Rng, rngs::{SmallRng, StdRng}}; -use crossbeam_utils::Backoff; use bittorrent_udp::types::*; @@ -15,6 +15,8 @@ use crate::config::Config; pub fn handle( state: State, config: Config, + request_receiver: Receiver<(Request, SocketAddr)>, + response_sender: Sender<(Response, SocketAddr)>, ){ let mut connect_requests: Vec<(ConnectRequest, SocketAddr)> = Vec::new(); let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new(); @@ -23,42 +25,53 @@ pub fn handle( let mut std_rng = StdRng::from_entropy(); let mut small_rng = SmallRng::from_rng(&mut std_rng).unwrap(); - let backoff = Backoff::new(); + let timeout = Duration::from_millis(10); loop { - if state.request_queue.is_empty(){ - backoff.snooze(); - } else { - while let Ok((request, src)) = state.request_queue.pop(){ - match request { - Request::Connect(r) => { - connect_requests.push((r, src)) - }, - Request::Announce(r) => { - announce_requests.push((r, src)) - }, - Request::Scrape(r) => { - scrape_requests.push((r, src)) - }, + for i in 0..1000 { + let (request, src): (Request, SocketAddr) = if i == 0 { + match request_receiver.recv(){ + Ok(r) => r, + Err(_) => break, } - } + } else { + match request_receiver.recv_timeout(timeout){ + Ok(r) => r, + Err(_) => break, + } + }; - handle_connect_requests( - &state, - &mut std_rng, - connect_requests.drain(..) - ); - handle_announce_requests( - &state, - &config, - &mut small_rng, - announce_requests.drain(..), - ); - handle_scrape_requests( - &state, - scrape_requests.drain(..), - ); + match request { + Request::Connect(r) => { + connect_requests.push((r, src)) + }, + Request::Announce(r) => { + announce_requests.push((r, src)) + }, + Request::Scrape(r) => { + scrape_requests.push((r, src)) + }, + } } + + handle_connect_requests( + &state, + &mut std_rng, + connect_requests.drain(..), + &response_sender + ); + handle_announce_requests( + &state, + &config, + &mut small_rng, + announce_requests.drain(..), + &response_sender + ); + handle_scrape_requests( + &state, + scrape_requests.drain(..), + &response_sender + ); } } @@ -68,6 +81,7 @@ pub fn handle_connect_requests( state: &State, rng: &mut StdRng, requests: Drain<(ConnectRequest, SocketAddr)>, + response_sender: &Sender<(Response, SocketAddr)>, ){ let now = Time(Instant::now()); @@ -88,7 +102,7 @@ pub fn handle_connect_requests( } ); - state.response_queue.push((response, src)); + response_sender.send((response, src)); } } @@ -99,6 +113,7 @@ pub fn handle_announce_requests( config: &Config, rng: &mut SmallRng, requests: Drain<(AnnounceRequest, SocketAddr)>, + response_sender: &Sender<(Response, SocketAddr)>, ){ for (request, src) in requests { let connection_key = ConnectionKey { @@ -112,7 +127,7 @@ pub fn handle_announce_requests( message: "Connection invalid or expired".to_string() }; - state.response_queue.push((response.into(), src)); + response_sender.send((response.into(), src)); } let peer_key = PeerMapKey { @@ -176,7 +191,7 @@ pub fn handle_announce_requests( peers: response_peers }); - state.response_queue.push((response, src)); + response_sender.send((response, src)); } } @@ -185,6 +200,7 @@ pub fn handle_announce_requests( pub fn handle_scrape_requests( state: &State, requests: Drain<(ScrapeRequest, SocketAddr)>, + response_sender: &Sender<(Response, SocketAddr)>, ){ let empty_stats = create_torrent_scrape_statistics(0, 0); @@ -200,7 +216,7 @@ pub fn handle_scrape_requests( message: "Connection invalid or expired".to_string() }; - state.response_queue.push((response.into(), src)); + response_sender.send((response.into(), src)); } let mut stats: Vec = Vec::with_capacity( @@ -223,7 +239,7 @@ pub fn handle_scrape_requests( torrent_stats: stats, }); - state.response_queue.push((response, src)); + response_sender.send((response, src)); }; } diff --git a/aquatic/src/lib/lib.rs b/aquatic/src/lib/lib.rs index d9b0aef..21f6e12 100644 --- a/aquatic/src/lib/lib.rs +++ b/aquatic/src/lib/lib.rs @@ -1,5 +1,7 @@ use std::time::Duration; +use crossbeam_channel::unbounded; + pub mod common; pub mod config; pub mod handlers; @@ -13,21 +15,28 @@ use common::State; pub fn run(config: Config){ let state = State::new(); + let (request_sender, request_receiver) = unbounded(); + let (response_sender, response_receiver) = unbounded(); + for _ in 0..config.response_workers { let state = state.clone(); let config = config.clone(); + let request_receiver = request_receiver.clone(); + let response_sender = response_sender.clone(); ::std::thread::spawn(move || { - handlers::handle(state, config); + handlers::handle(state, config, request_receiver, response_sender); }); } for i in 0..config.socket_workers { let state = state.clone(); let config = config.clone(); + let request_sender = request_sender.clone(); + let response_receiver = response_receiver.clone(); ::std::thread::spawn(move || { - network::run_event_loop(state, config, i); + network::run_event_loop(state, config, i, request_sender, response_receiver); }); } diff --git a/aquatic/src/lib/network.rs b/aquatic/src/lib/network.rs index 82cad9e..24c428a 100644 --- a/aquatic/src/lib/network.rs +++ b/aquatic/src/lib/network.rs @@ -1,7 +1,9 @@ use std::sync::atomic::Ordering; use std::io::{Cursor, ErrorKind}; +use std::net::SocketAddr; use std::time::Duration; +use crossbeam_channel::{Sender, Receiver}; use mio::{Events, Poll, Interest, Token}; use mio::net::UdpSocket; use net2::{UdpSocketExt, UdpBuilder}; @@ -18,6 +20,8 @@ pub fn run_event_loop( state: State, config: Config, token_num: usize, + request_sender: Sender<(Request, SocketAddr)>, + response_receiver: Receiver<(Response, SocketAddr)>, ){ let mut buffer = [0u8; MAX_PACKET_SIZE]; @@ -32,7 +36,7 @@ pub fn run_event_loop( let mut events = Events::with_capacity(config.network.poll_event_capacity); - let timeout = Duration::from_millis(1); + let timeout = Duration::from_millis(50); loop { poll.poll(&mut events, Some(timeout)) @@ -48,6 +52,7 @@ pub fn run_event_loop( &config, &mut socket, &mut buffer, + &request_sender, ); state.statistics.readable_events.fetch_add(1, Ordering::SeqCst); @@ -64,6 +69,7 @@ pub fn run_event_loop( &config, &mut socket, &mut buffer, + &response_receiver, ); } } @@ -107,6 +113,7 @@ fn read_requests( config: &Config, socket: &mut UdpSocket, buffer: &mut [u8], + request_sender: &Sender<(Request, SocketAddr)>, ){ let mut requests_received: usize = 0; let mut bytes_received: usize = 0; @@ -127,7 +134,7 @@ fn read_requests( match request { Ok(request) => { - state.request_queue.push((request, src)); + request_sender.try_send((request, src)); }, Err(err) => { eprintln!("request_from_bytes error: {:?}", err); @@ -178,13 +185,14 @@ fn send_responses( config: &Config, socket: &mut UdpSocket, buffer: &mut [u8], + response_receiver: &Receiver<(Response, SocketAddr)>, ){ let mut responses_sent: usize = 0; let mut bytes_sent: usize = 0; let mut cursor = Cursor::new(buffer); - while let Ok((response, src)) = state.response_queue.pop(){ + for (response, src) in response_receiver.try_iter(){ cursor.set_position(0); response_to_bytes(&mut cursor, response, IpVersion::IPv4).unwrap();