diff --git a/Cargo.lock b/Cargo.lock index e3b44bb..89550ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,6 +30,8 @@ version = "0.1.0" dependencies = [ "bittorrent_udp", "cli_helpers", + "crossbeam-queue", + "crossbeam-utils", "dashmap", "histogram", "indexmap", @@ -222,6 +224,27 @@ dependencies = [ "proc-macro-hack", ] +[[package]] +name = "crossbeam-queue" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c695eeca1e7173472a32221542ae469b3e9aac3a4fc81f7696bcad82029493db" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +dependencies = [ + "autocfg", + "cfg-if", + "lazy_static", +] + [[package]] name = "dashmap" version = "3.10.0" diff --git a/TODO.md b/TODO.md index 34e40fb..a6f8573 100644 --- a/TODO.md +++ b/TODO.md @@ -5,6 +5,7 @@ * Generic bench function since current functions are almost identical * Show percentile stats for peers per torrent * aquatic + * https://docs.rs/crossbeam/0.7.3/crossbeam/utils/struct.Backoff.html#method.is_completed * Tests * Handle Ipv4 and Ipv6 peers. Probably split state. Ipv4 peers can't make use of Ipv6 ones. Ipv6 ones may or may note be able to make use of Ipv4 diff --git a/aquatic/Cargo.toml b/aquatic/Cargo.toml index 4a1ec85..c2d6b37 100644 --- a/aquatic/Cargo.toml +++ b/aquatic/Cargo.toml @@ -14,6 +14,8 @@ name = "aquatic" [dependencies] bittorrent_udp = { path = "../bittorrent_udp" } cli_helpers = { path = "../cli_helpers" } +crossbeam-queue = "0.2" +crossbeam-utils = "0.7" dashmap = "3" histogram = "0.6" indexmap = "1" diff --git a/aquatic/src/lib/common.rs b/aquatic/src/lib/common.rs index 4ff7273..917a128 100644 --- a/aquatic/src/lib/common.rs +++ b/aquatic/src/lib/common.rs @@ -3,11 +3,14 @@ use std::sync::atomic::AtomicUsize; use std::net::{SocketAddr, IpAddr}; use std::time::Instant; +use crossbeam_queue::ArrayQueue; use dashmap::DashMap; use indexmap::IndexMap; pub use bittorrent_udp::types::*; +use crate::config::Config; + pub const MAX_PACKET_SIZE: usize = 4096; @@ -137,14 +140,18 @@ pub struct State { pub connections: Arc, pub torrents: Arc, pub statistics: Arc, + pub request_queue: Arc>, + pub response_queue: Arc>, } impl State { - pub fn new() -> Self { + pub fn new(config: &Config) -> Self { Self { connections: Arc::new(DashMap::new()), torrents: Arc::new(DashMap::new()), statistics: Arc::new(Statistics::default()), + request_queue: Arc::new(ArrayQueue::new(config.request_queue_len)), + response_queue: Arc::new(ArrayQueue::new(config.response_queue_len)), } } } diff --git a/aquatic/src/lib/config.rs b/aquatic/src/lib/config.rs index c6fe9f4..10de5c0 100644 --- a/aquatic/src/lib/config.rs +++ b/aquatic/src/lib/config.rs @@ -6,7 +6,10 @@ use serde::{Serialize, Deserialize}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Config { /// Spawn this number of threads for workers - pub num_threads: usize, + pub socket_workers: usize, + pub response_workers: usize, + pub request_queue_len: usize, + pub response_queue_len: usize, pub network: NetworkConfig, pub statistics: StatisticsConfig, pub cleaning: CleaningConfig, @@ -50,7 +53,10 @@ pub struct CleaningConfig { impl Default for Config { fn default() -> Self { Self { - num_threads: 4, + socket_workers: 1, + response_workers: 1, + request_queue_len: 4096, + response_queue_len: 4096 * 4, network: NetworkConfig::default(), statistics: StatisticsConfig::default(), cleaning: CleaningConfig::default(), diff --git a/aquatic/src/lib/handlers.rs b/aquatic/src/lib/handlers.rs index 6b38ad0..0580d9c 100644 --- a/aquatic/src/lib/handlers.rs +++ b/aquatic/src/lib/handlers.rs @@ -3,7 +3,8 @@ use std::sync::atomic::Ordering; use std::time::Instant; use std::vec::Drain; -use rand::{Rng, rngs::{SmallRng, StdRng}}; +use rand::{SeedableRng, Rng, rngs::{SmallRng, StdRng}}; +use crossbeam_utils::Backoff; use bittorrent_udp::types::*; @@ -11,16 +12,66 @@ use crate::common::*; use crate::config::Config; +pub fn handle( + state: State, + config: Config, +){ + let mut connect_requests: Vec<(ConnectRequest, SocketAddr)> = Vec::new(); + let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new(); + let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::new(); + + let mut std_rng = StdRng::from_entropy(); + let mut small_rng = SmallRng::from_rng(&mut std_rng).unwrap(); + + let backoff = Backoff::new(); + + loop { + if state.request_queue.is_empty(){ + backoff.snooze(); + } else { + while let Ok((request, src)) = state.request_queue.pop(){ + match request { + Request::Connect(r) => { + connect_requests.push((r, src)) + }, + Request::Announce(r) => { + announce_requests.push((r, src)) + }, + Request::Scrape(r) => { + scrape_requests.push((r, src)) + }, + } + } + + handle_connect_requests( + &state, + &mut std_rng, + connect_requests.drain(..) + ); + handle_announce_requests( + &state, + &config, + &mut small_rng, + announce_requests.drain(..), + ); + handle_scrape_requests( + &state, + scrape_requests.drain(..), + ); + } + } +} + + #[inline] pub fn handle_connect_requests( state: &State, rng: &mut StdRng, - responses: &mut Vec<(Response, SocketAddr)>, requests: Drain<(ConnectRequest, SocketAddr)>, ){ let now = Time(Instant::now()); - responses.extend(requests.map(|(request, src)| { + for (request, src) in requests { let connection_id = ConnectionId(rng.gen()); let key = ConnectionKey { @@ -37,8 +88,10 @@ pub fn handle_connect_requests( } ); - (response, src) - })); + if let Err(err) = state.response_queue.push((response, src)){ + eprintln!("couldn't push to response queue: {}", err); + } + } } @@ -47,20 +100,23 @@ pub fn handle_announce_requests( state: &State, config: &Config, rng: &mut SmallRng, - responses: &mut Vec<(Response, SocketAddr)>, requests: Drain<(AnnounceRequest, SocketAddr)>, ){ - responses.extend(requests.map(|(request, src)| { + for (request, src) in requests { let connection_key = ConnectionKey { connection_id: request.connection_id, socket_addr: src, }; if !state.connections.contains_key(&connection_key){ - return ((ErrorResponse { + let response = ErrorResponse { transaction_id: request.transaction_id, message: "Connection invalid or expired".to_string() - }).into(), src); + }; + + if let Err(err) = state.response_queue.push((response.into(), src)){ + eprintln!("couldn't push to response queue: {}", err); + } } let peer_key = PeerMapKey { @@ -124,30 +180,35 @@ pub fn handle_announce_requests( peers: response_peers }); - (response, src) - })); + if let Err(err) = state.response_queue.push((response, src)){ + eprintln!("couldn't push to response queue: {}", err); + } + } } #[inline] pub fn handle_scrape_requests( state: &State, - responses: &mut Vec<(Response, SocketAddr)>, requests: Drain<(ScrapeRequest, SocketAddr)>, ){ let empty_stats = create_torrent_scrape_statistics(0, 0); - responses.extend(requests.map(|(request, src)| { + for (request, src) in requests { let connection_key = ConnectionKey { connection_id: request.connection_id, socket_addr: src, }; if !state.connections.contains_key(&connection_key){ - return ((ErrorResponse { + let response = ErrorResponse { transaction_id: request.transaction_id, message: "Connection invalid or expired".to_string() - }).into(), src); + }; + + if let Err(err) = state.response_queue.push((response.into(), src)){ + eprintln!("couldn't push to response queue: {}", err); + } } let mut stats: Vec = Vec::with_capacity( @@ -170,8 +231,10 @@ pub fn handle_scrape_requests( torrent_stats: stats, }); - (response, src) - })); + if let Err(err) = state.response_queue.push((response, src)){ + eprintln!("couldn't push to response queue: {}", err); + } + }; } diff --git a/aquatic/src/lib/lib.rs b/aquatic/src/lib/lib.rs index 3175388..dce2b70 100644 --- a/aquatic/src/lib/lib.rs +++ b/aquatic/src/lib/lib.rs @@ -11,9 +11,18 @@ use common::State; pub fn run(config: Config){ - let state = State::new(); + let state = State::new(&config); - for i in 0..config.num_threads { + for _ in 0..config.response_workers { + let state = state.clone(); + let config = config.clone(); + + ::std::thread::spawn(move || { + handlers::handle(state, config); + }); + } + + for i in 0..config.socket_workers { let state = state.clone(); let config = config.clone(); diff --git a/aquatic/src/lib/network.rs b/aquatic/src/lib/network.rs index 7942f34..9bd76f3 100644 --- a/aquatic/src/lib/network.rs +++ b/aquatic/src/lib/network.rs @@ -1,19 +1,17 @@ use std::sync::atomic::Ordering; -use std::net::SocketAddr; use std::io::{Cursor, ErrorKind}; +use std::time::Duration; use mio::{Events, Poll, Interest, Token}; use mio::net::UdpSocket; use net2::{UdpSocketExt, UdpBuilder}; use net2::unix::UnixUdpBuilderExt; -use rand::{SeedableRng, rngs::{SmallRng, StdRng}}; use bittorrent_udp::types::IpVersion; use bittorrent_udp::converters::{response_to_bytes, request_from_bytes}; use crate::common::*; use crate::config::Config; -use crate::handlers::*; pub fn run_event_loop( @@ -34,16 +32,10 @@ pub fn run_event_loop( let mut events = Events::with_capacity(config.network.poll_event_capacity); - let mut connect_requests: Vec<(ConnectRequest, SocketAddr)> = Vec::new(); - let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new(); - let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::new(); - let mut responses: Vec<(Response, SocketAddr)> = Vec::new(); - - let mut std_rng = StdRng::from_entropy(); - let mut small_rng = SmallRng::from_rng(&mut std_rng).unwrap(); + let timeout = Duration::from_millis(1); loop { - poll.poll(&mut events, None) + poll.poll(&mut events, Some(timeout)) .expect("failed polling"); for event in events.iter(){ @@ -51,17 +43,11 @@ pub fn run_event_loop( if token.0 == token_num { if event.is_readable(){ - handle_readable_socket( + read_requests( &state, &config, &mut socket, - &mut std_rng, - &mut small_rng, &mut buffer, - &mut responses, - &mut connect_requests, - &mut announce_requests, - &mut scrape_requests ); state.statistics.readable_events.fetch_add(1, Ordering::SeqCst); @@ -72,6 +58,13 @@ pub fn run_event_loop( } } } + + send_responses( + &state, + &config, + &mut socket, + &mut buffer, + ); } } @@ -108,24 +101,15 @@ fn create_socket(config: &Config) -> ::std::net::UdpSocket { } -/// Read requests, generate and send back responses #[inline] -fn handle_readable_socket( +fn read_requests( state: &State, config: &Config, socket: &mut UdpSocket, - std_rng: &mut StdRng, - small_rng: &mut SmallRng, buffer: &mut [u8], - responses: &mut Vec<(Response, SocketAddr)>, - connect_requests: &mut Vec<(ConnectRequest, SocketAddr)>, - announce_requests: &mut Vec<(AnnounceRequest, SocketAddr)>, - scrape_requests: &mut Vec<(ScrapeRequest, SocketAddr)>, ){ let mut requests_received: usize = 0; - let mut responses_sent: usize = 0; let mut bytes_received: usize = 0; - let mut bytes_sent: usize = 0; loop { match socket.recv_from(&mut buffer[..]) { @@ -142,14 +126,12 @@ fn handle_readable_socket( } match request { - Ok(Request::Connect(r)) => { - connect_requests.push((r, src)); - }, - Ok(Request::Announce(r)) => { - announce_requests.push((r, src)); - }, - Ok(Request::Scrape(r)) => { - scrape_requests.push((r, src)); + Ok(request) => { + let res = state.request_queue.push((request, src)); + + if let Err(err) = res { + eprintln!("couldn't push request to queue: {}", err); + } }, Err(err) => { eprintln!("request_from_bytes error: {:?}", err); @@ -169,7 +151,7 @@ fn handle_readable_socket( message, }; - responses.push((response.into(), src)); + // responses.push((response.into(), src)); // FIXME } } }, @@ -185,28 +167,28 @@ fn handle_readable_socket( } } - handle_connect_requests( - state, - std_rng, - responses, - connect_requests.drain(..) - ); - handle_announce_requests( - state, - config, - small_rng, - responses, - announce_requests.drain(..), - ); - handle_scrape_requests( - state, - responses, - scrape_requests.drain(..), - ); + if config.statistics.interval != 0 { + state.statistics.requests_received + .fetch_add(requests_received, Ordering::SeqCst); + state.statistics.bytes_received + .fetch_add(bytes_received, Ordering::SeqCst); + } +} + + +#[inline] +fn send_responses( + state: &State, + config: &Config, + socket: &mut UdpSocket, + buffer: &mut [u8], +){ + let mut responses_sent: usize = 0; + let mut bytes_sent: usize = 0; let mut cursor = Cursor::new(buffer); - for (response, src) in responses.drain(..) { + while let Ok((response, src)) = state.response_queue.pop(){ cursor.set_position(0); response_to_bytes(&mut cursor, response, IpVersion::IPv4).unwrap(); @@ -229,12 +211,8 @@ fn handle_readable_socket( } if config.statistics.interval != 0 { - state.statistics.requests_received - .fetch_add(requests_received, Ordering::SeqCst); state.statistics.responses_sent .fetch_add(responses_sent, Ordering::SeqCst); - state.statistics.bytes_received - .fetch_add(bytes_received, Ordering::SeqCst); state.statistics.bytes_sent .fetch_add(bytes_sent, Ordering::SeqCst); } diff --git a/aquatic_bench/src/bin/bench_handlers/announce.rs b/aquatic_bench/src/bin/bench_handlers/announce.rs index f1ec56e..6a739d5 100644 --- a/aquatic_bench/src/bin/bench_handlers/announce.rs +++ b/aquatic_bench/src/bin/bench_handlers/announce.rs @@ -23,8 +23,6 @@ pub fn bench( config: &Config, requests: Arc> ) -> (usize, Duration){ - let mut responses = Vec::with_capacity(ANNOUNCE_REQUESTS); - let mut buffer = [0u8; MAX_PACKET_SIZE]; let mut cursor = Cursor::new(buffer.as_mut()); let mut num_responses: usize = 0; @@ -50,11 +48,10 @@ pub fn bench( &state, config, &mut small_rng, - &mut responses, requests, ); - for (response, _) in responses.drain(..) { + while let Ok((response, src)) = state.response_queue.pop(){ if let Response::Announce(_) = response { num_responses += 1; } diff --git a/aquatic_bench/src/bin/bench_handlers/connect.rs b/aquatic_bench/src/bin/bench_handlers/connect.rs index ffd11d4..4be414e 100644 --- a/aquatic_bench/src/bin/bench_handlers/connect.rs +++ b/aquatic_bench/src/bin/bench_handlers/connect.rs @@ -19,8 +19,6 @@ pub fn bench( state: State, requests: Arc> ) -> (usize, Duration){ - let mut responses = Vec::with_capacity(ITERATIONS); - let mut buffer = [0u8; MAX_PACKET_SIZE]; let mut cursor = Cursor::new(buffer.as_mut()); let mut num_responses: usize = 0; @@ -42,9 +40,9 @@ pub fn bench( let requests = requests.drain(..); - handle_connect_requests(&state, &mut rng, &mut responses, requests); + handle_connect_requests(&state, &mut rng, requests); - for (response, _) in responses.drain(..){ + while let Ok((response, _)) = state.response_queue.pop(){ if let Response::Connect(_) = response { num_responses += 1; } diff --git a/aquatic_bench/src/bin/bench_handlers/main.rs b/aquatic_bench/src/bin/bench_handlers/main.rs index 9ce33ae..a596e53 100644 --- a/aquatic_bench/src/bin/bench_handlers/main.rs +++ b/aquatic_bench/src/bin/bench_handlers/main.rs @@ -73,6 +73,8 @@ fn run(bench_config: BenchConfig){ let mut announce_data = (0usize, Duration::new(0, 0)); let mut scrape_data = (0usize, Duration::new(0, 0)); + let config = Config::default(); + println!("# Benchmarking request handlers\n"); // Benchmark connect handler @@ -95,7 +97,7 @@ fn run(bench_config: BenchConfig){ let pb = create_progress_bar("Connect handler", bench_config.num_rounds); for _ in (0..bench_config.num_rounds).progress_with(pb){ - let state = State::new(); + let state = State::new(&config); let handles: Vec<_> = (0..bench_config.num_threads).map(|_| { let requests = requests.clone(); @@ -115,7 +117,6 @@ fn run(bench_config: BenchConfig){ let mut rng = SmallRng::from_rng(thread_rng()).unwrap(); let info_hashes = create_info_hashes(&mut rng); - let config = Config::default(); // Benchmark announce handler let last_torrents: Option> = { @@ -156,7 +157,7 @@ fn run(bench_config: BenchConfig){ let mut last_torrents = None; for i in (0..bench_config.num_rounds).progress_with(pb){ - let mut state = State::new(); + let mut state = State::new(&config); state.connections = connections.clone(); @@ -185,7 +186,7 @@ fn run(bench_config: BenchConfig){ // Benchmark scrape handler { - let mut state = State::new(); + let mut state = State::new(&config); state.torrents = last_torrents.unwrap(); let requests = scrape::create_requests(&mut rng, &info_hashes); diff --git a/aquatic_bench/src/bin/bench_handlers/scrape.rs b/aquatic_bench/src/bin/bench_handlers/scrape.rs index c485cd4..8a85b9b 100644 --- a/aquatic_bench/src/bin/bench_handlers/scrape.rs +++ b/aquatic_bench/src/bin/bench_handlers/scrape.rs @@ -22,8 +22,6 @@ pub fn bench( state: &State, requests: Arc> ) -> (usize, Duration){ - let mut responses = Vec::with_capacity(SCRAPE_REQUESTS); - let mut buffer = [0u8; MAX_PACKET_SIZE]; let mut cursor = Cursor::new(buffer.as_mut()); let mut num_responses: usize = 0; @@ -45,11 +43,10 @@ pub fn bench( handle_scrape_requests( &state, - &mut responses, requests, ); - for (response, _src) in responses.drain(..){ + while let Ok((response, _)) = state.response_queue.pop(){ if let Response::Scrape(_) = response { num_responses += 1; }