Use crossbeam channel insteaf of SegQueue

Reduces overutilization of CPU greatly
This commit is contained in:
Joakim Frostegård 2020-04-11 21:11:25 +02:00
parent f37ba1e52e
commit 70cc193522
7 changed files with 88 additions and 59 deletions

17
Cargo.lock generated
View file

@ -30,8 +30,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"bittorrent_udp", "bittorrent_udp",
"cli_helpers", "cli_helpers",
"crossbeam-queue", "crossbeam-channel",
"crossbeam-utils",
"dashmap", "dashmap",
"histogram", "histogram",
"indexmap", "indexmap",
@ -225,13 +224,13 @@ dependencies = [
] ]
[[package]] [[package]]
name = "crossbeam-queue" name = "crossbeam-channel"
version = "0.2.1" version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c695eeca1e7173472a32221542ae469b3e9aac3a4fc81f7696bcad82029493db" checksum = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061"
dependencies = [ dependencies = [
"cfg-if",
"crossbeam-utils", "crossbeam-utils",
"maybe-uninit",
] ]
[[package]] [[package]]
@ -381,6 +380,12 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "maybe-uninit"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.3.3" version = "2.3.3"

View file

@ -6,8 +6,6 @@
* Generic bench function since current functions are almost identical * Generic bench function since current functions are almost identical
* Show percentile stats for peers per torrent * Show percentile stats for peers per torrent
* aquatic * aquatic
* Park handler threads when really inactive? Or generally avoid utilizing
CPU needlessly. See https://docs.rs/crossbeam/0.7.3/crossbeam/utils/struct.Backoff.html#method.is_completed
* Lock whole torrent map over many requests in handlers? Could use HashMap * Lock whole torrent map over many requests in handlers? Could use HashMap
in Mutex instead of DashMap maybe (parking lot mutex?) in Mutex instead of DashMap maybe (parking lot mutex?)
* Tests * Tests
@ -16,7 +14,6 @@
ones, have to check. ones, have to check.
* bittorrent_udp * bittorrent_udp
* other test cases * other test cases
* Check if announce response to bytes code changed caused slowdown
## Not important ## Not important

View file

@ -14,8 +14,7 @@ name = "aquatic"
[dependencies] [dependencies]
bittorrent_udp = { path = "../bittorrent_udp" } bittorrent_udp = { path = "../bittorrent_udp" }
cli_helpers = { path = "../cli_helpers" } cli_helpers = { path = "../cli_helpers" }
crossbeam-queue = "0.2" crossbeam-channel = "0.4"
crossbeam-utils = "0.7"
dashmap = "3" dashmap = "3"
histogram = "0.6" histogram = "0.6"
indexmap = "1" indexmap = "1"

View file

@ -3,7 +3,6 @@ use std::sync::atomic::AtomicUsize;
use std::net::{SocketAddr, IpAddr}; use std::net::{SocketAddr, IpAddr};
use std::time::Instant; use std::time::Instant;
use crossbeam_queue::SegQueue;
use dashmap::DashMap; use dashmap::DashMap;
use indexmap::IndexMap; use indexmap::IndexMap;
@ -138,8 +137,6 @@ pub struct State {
pub connections: Arc<ConnectionMap>, pub connections: Arc<ConnectionMap>,
pub torrents: Arc<TorrentMap>, pub torrents: Arc<TorrentMap>,
pub statistics: Arc<Statistics>, pub statistics: Arc<Statistics>,
pub request_queue: Arc<SegQueue<(Request, SocketAddr)>>,
pub response_queue: Arc<SegQueue<(Response, SocketAddr)>>,
} }
impl State { impl State {
@ -148,8 +145,6 @@ impl State {
connections: Arc::new(DashMap::new()), connections: Arc::new(DashMap::new()),
torrents: Arc::new(DashMap::new()), torrents: Arc::new(DashMap::new()),
statistics: Arc::new(Statistics::default()), statistics: Arc::new(Statistics::default()),
request_queue: Arc::new(SegQueue::new()),
response_queue: Arc::new(SegQueue::new()),
} }
} }
} }

View file

@ -1,10 +1,10 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::time::Instant; use std::time::{Duration, Instant};
use std::vec::Drain; use std::vec::Drain;
use crossbeam_channel::{Sender, Receiver};
use rand::{SeedableRng, Rng, rngs::{SmallRng, StdRng}}; use rand::{SeedableRng, Rng, rngs::{SmallRng, StdRng}};
use crossbeam_utils::Backoff;
use bittorrent_udp::types::*; use bittorrent_udp::types::*;
@ -15,6 +15,8 @@ use crate::config::Config;
pub fn handle( pub fn handle(
state: State, state: State,
config: Config, config: Config,
request_receiver: Receiver<(Request, SocketAddr)>,
response_sender: Sender<(Response, SocketAddr)>,
){ ){
let mut connect_requests: Vec<(ConnectRequest, SocketAddr)> = Vec::new(); let mut connect_requests: Vec<(ConnectRequest, SocketAddr)> = Vec::new();
let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new(); let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new();
@ -23,42 +25,53 @@ pub fn handle(
let mut std_rng = StdRng::from_entropy(); let mut std_rng = StdRng::from_entropy();
let mut small_rng = SmallRng::from_rng(&mut std_rng).unwrap(); let mut small_rng = SmallRng::from_rng(&mut std_rng).unwrap();
let backoff = Backoff::new(); let timeout = Duration::from_millis(10);
loop { loop {
if state.request_queue.is_empty(){ for i in 0..1000 {
backoff.snooze(); let (request, src): (Request, SocketAddr) = if i == 0 {
} else { match request_receiver.recv(){
while let Ok((request, src)) = state.request_queue.pop(){ Ok(r) => r,
match request { Err(_) => break,
Request::Connect(r) => {
connect_requests.push((r, src))
},
Request::Announce(r) => {
announce_requests.push((r, src))
},
Request::Scrape(r) => {
scrape_requests.push((r, src))
},
} }
} } else {
match request_receiver.recv_timeout(timeout){
Ok(r) => r,
Err(_) => break,
}
};
handle_connect_requests( match request {
&state, Request::Connect(r) => {
&mut std_rng, connect_requests.push((r, src))
connect_requests.drain(..) },
); Request::Announce(r) => {
handle_announce_requests( announce_requests.push((r, src))
&state, },
&config, Request::Scrape(r) => {
&mut small_rng, scrape_requests.push((r, src))
announce_requests.drain(..), },
); }
handle_scrape_requests(
&state,
scrape_requests.drain(..),
);
} }
handle_connect_requests(
&state,
&mut std_rng,
connect_requests.drain(..),
&response_sender
);
handle_announce_requests(
&state,
&config,
&mut small_rng,
announce_requests.drain(..),
&response_sender
);
handle_scrape_requests(
&state,
scrape_requests.drain(..),
&response_sender
);
} }
} }
@ -68,6 +81,7 @@ pub fn handle_connect_requests(
state: &State, state: &State,
rng: &mut StdRng, rng: &mut StdRng,
requests: Drain<(ConnectRequest, SocketAddr)>, requests: Drain<(ConnectRequest, SocketAddr)>,
response_sender: &Sender<(Response, SocketAddr)>,
){ ){
let now = Time(Instant::now()); let now = Time(Instant::now());
@ -88,7 +102,7 @@ pub fn handle_connect_requests(
} }
); );
state.response_queue.push((response, src)); response_sender.send((response, src));
} }
} }
@ -99,6 +113,7 @@ pub fn handle_announce_requests(
config: &Config, config: &Config,
rng: &mut SmallRng, rng: &mut SmallRng,
requests: Drain<(AnnounceRequest, SocketAddr)>, requests: Drain<(AnnounceRequest, SocketAddr)>,
response_sender: &Sender<(Response, SocketAddr)>,
){ ){
for (request, src) in requests { for (request, src) in requests {
let connection_key = ConnectionKey { let connection_key = ConnectionKey {
@ -112,7 +127,7 @@ pub fn handle_announce_requests(
message: "Connection invalid or expired".to_string() message: "Connection invalid or expired".to_string()
}; };
state.response_queue.push((response.into(), src)); response_sender.send((response.into(), src));
} }
let peer_key = PeerMapKey { let peer_key = PeerMapKey {
@ -176,7 +191,7 @@ pub fn handle_announce_requests(
peers: response_peers peers: response_peers
}); });
state.response_queue.push((response, src)); response_sender.send((response, src));
} }
} }
@ -185,6 +200,7 @@ pub fn handle_announce_requests(
pub fn handle_scrape_requests( pub fn handle_scrape_requests(
state: &State, state: &State,
requests: Drain<(ScrapeRequest, SocketAddr)>, requests: Drain<(ScrapeRequest, SocketAddr)>,
response_sender: &Sender<(Response, SocketAddr)>,
){ ){
let empty_stats = create_torrent_scrape_statistics(0, 0); let empty_stats = create_torrent_scrape_statistics(0, 0);
@ -200,7 +216,7 @@ pub fn handle_scrape_requests(
message: "Connection invalid or expired".to_string() message: "Connection invalid or expired".to_string()
}; };
state.response_queue.push((response.into(), src)); response_sender.send((response.into(), src));
} }
let mut stats: Vec<TorrentScrapeStatistics> = Vec::with_capacity( let mut stats: Vec<TorrentScrapeStatistics> = Vec::with_capacity(
@ -223,7 +239,7 @@ pub fn handle_scrape_requests(
torrent_stats: stats, torrent_stats: stats,
}); });
state.response_queue.push((response, src)); response_sender.send((response, src));
}; };
} }

View file

@ -1,5 +1,7 @@
use std::time::Duration; use std::time::Duration;
use crossbeam_channel::unbounded;
pub mod common; pub mod common;
pub mod config; pub mod config;
pub mod handlers; pub mod handlers;
@ -13,21 +15,28 @@ use common::State;
pub fn run(config: Config){ pub fn run(config: Config){
let state = State::new(); let state = State::new();
let (request_sender, request_receiver) = unbounded();
let (response_sender, response_receiver) = unbounded();
for _ in 0..config.response_workers { for _ in 0..config.response_workers {
let state = state.clone(); let state = state.clone();
let config = config.clone(); let config = config.clone();
let request_receiver = request_receiver.clone();
let response_sender = response_sender.clone();
::std::thread::spawn(move || { ::std::thread::spawn(move || {
handlers::handle(state, config); handlers::handle(state, config, request_receiver, response_sender);
}); });
} }
for i in 0..config.socket_workers { for i in 0..config.socket_workers {
let state = state.clone(); let state = state.clone();
let config = config.clone(); let config = config.clone();
let request_sender = request_sender.clone();
let response_receiver = response_receiver.clone();
::std::thread::spawn(move || { ::std::thread::spawn(move || {
network::run_event_loop(state, config, i); network::run_event_loop(state, config, i, request_sender, response_receiver);
}); });
} }

View file

@ -1,7 +1,9 @@
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::io::{Cursor, ErrorKind}; use std::io::{Cursor, ErrorKind};
use std::net::SocketAddr;
use std::time::Duration; use std::time::Duration;
use crossbeam_channel::{Sender, Receiver};
use mio::{Events, Poll, Interest, Token}; use mio::{Events, Poll, Interest, Token};
use mio::net::UdpSocket; use mio::net::UdpSocket;
use net2::{UdpSocketExt, UdpBuilder}; use net2::{UdpSocketExt, UdpBuilder};
@ -18,6 +20,8 @@ pub fn run_event_loop(
state: State, state: State,
config: Config, config: Config,
token_num: usize, token_num: usize,
request_sender: Sender<(Request, SocketAddr)>,
response_receiver: Receiver<(Response, SocketAddr)>,
){ ){
let mut buffer = [0u8; MAX_PACKET_SIZE]; let mut buffer = [0u8; MAX_PACKET_SIZE];
@ -32,7 +36,7 @@ pub fn run_event_loop(
let mut events = Events::with_capacity(config.network.poll_event_capacity); let mut events = Events::with_capacity(config.network.poll_event_capacity);
let timeout = Duration::from_millis(1); let timeout = Duration::from_millis(50);
loop { loop {
poll.poll(&mut events, Some(timeout)) poll.poll(&mut events, Some(timeout))
@ -48,6 +52,7 @@ pub fn run_event_loop(
&config, &config,
&mut socket, &mut socket,
&mut buffer, &mut buffer,
&request_sender,
); );
state.statistics.readable_events.fetch_add(1, Ordering::SeqCst); state.statistics.readable_events.fetch_add(1, Ordering::SeqCst);
@ -64,6 +69,7 @@ pub fn run_event_loop(
&config, &config,
&mut socket, &mut socket,
&mut buffer, &mut buffer,
&response_receiver,
); );
} }
} }
@ -107,6 +113,7 @@ fn read_requests(
config: &Config, config: &Config,
socket: &mut UdpSocket, socket: &mut UdpSocket,
buffer: &mut [u8], buffer: &mut [u8],
request_sender: &Sender<(Request, SocketAddr)>,
){ ){
let mut requests_received: usize = 0; let mut requests_received: usize = 0;
let mut bytes_received: usize = 0; let mut bytes_received: usize = 0;
@ -127,7 +134,7 @@ fn read_requests(
match request { match request {
Ok(request) => { Ok(request) => {
state.request_queue.push((request, src)); request_sender.try_send((request, src));
}, },
Err(err) => { Err(err) => {
eprintln!("request_from_bytes error: {:?}", err); eprintln!("request_from_bytes error: {:?}", err);
@ -178,13 +185,14 @@ fn send_responses(
config: &Config, config: &Config,
socket: &mut UdpSocket, socket: &mut UdpSocket,
buffer: &mut [u8], buffer: &mut [u8],
response_receiver: &Receiver<(Response, SocketAddr)>,
){ ){
let mut responses_sent: usize = 0; let mut responses_sent: usize = 0;
let mut bytes_sent: usize = 0; let mut bytes_sent: usize = 0;
let mut cursor = Cursor::new(buffer); let mut cursor = Cursor::new(buffer);
while let Ok((response, src)) = state.response_queue.pop(){ for (response, src) in response_receiver.try_iter(){
cursor.set_position(0); cursor.set_position(0);
response_to_bytes(&mut cursor, response, IpVersion::IPv4).unwrap(); response_to_bytes(&mut cursor, response, IpVersion::IPv4).unwrap();