From 8d7cbb7926a3cbb6a4b0468b17f45cb9b19e5c13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 11 Apr 2020 17:32:05 +0200 Subject: [PATCH] aquatic_bench: test multiple threads at once, with disappointing results DashMap doesn't scale as well as I had hoped. Only the scrape handler performed somewhat better with more threads, since it doesn't exlusively lock the torrent map. The announce and connect handlers, however, gained barely nothing from more threads. --- Cargo.lock | 1 + TODO.md | 4 +- aquatic_bench/Cargo.toml | 1 + .../src/bin/bench_handlers/announce.rs | 18 +- .../src/bin/bench_handlers/config.rs | 2 + .../src/bin/bench_handlers/connect.rs | 39 +---- aquatic_bench/src/bin/bench_handlers/main.rs | 154 ++++++++++-------- .../src/bin/bench_handlers/scrape.rs | 18 +- 8 files changed, 122 insertions(+), 115 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e00d467..e3b44bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -49,6 +49,7 @@ dependencies = [ "aquatic", "bittorrent_udp", "cli_helpers", + "dashmap", "indicatif", "mimalloc", "num-format", diff --git a/TODO.md b/TODO.md index 198c205..34e40fb 100644 --- a/TODO.md +++ b/TODO.md @@ -1,7 +1,6 @@ # TODO * Benchmarks - * Run multiple threads to test performance when contested? * Iterate over whole returned buffer and run e.g. xor on it (.iter().fold()) * Generic bench function since current functions are almost identical * Show percentile stats for peers per torrent @@ -30,6 +29,9 @@ * mialloc good? * Use less bytes from PeerId for hashing? Would need to implement "faulty" PartialEq too (on PeerMapKey, which would be OK) + * Seperate dashmap for torrent stats, meaning there will be no + contention across announce and scrape requests. Will however + require two lookups in announce requests. Do this? * bittorrent_udp * ParseError enum maybe, with `Option` * Avoid heap allocation in general if it can be avoided? diff --git a/aquatic_bench/Cargo.toml b/aquatic_bench/Cargo.toml index ddf376d..b20ccce 100644 --- a/aquatic_bench/Cargo.toml +++ b/aquatic_bench/Cargo.toml @@ -18,6 +18,7 @@ name = "plot_pareto" aquatic = { path = "../aquatic" } bittorrent_udp = { path = "../bittorrent_udp" } cli_helpers = { path = "../cli_helpers" } +dashmap = "3" indicatif = "0.14" mimalloc = { version = "0.1", default-features = false } num-format = "0.4" diff --git a/aquatic_bench/src/bin/bench_handlers/announce.rs b/aquatic_bench/src/bin/bench_handlers/announce.rs index 6c42a94..f1ec56e 100644 --- a/aquatic_bench/src/bin/bench_handlers/announce.rs +++ b/aquatic_bench/src/bin/bench_handlers/announce.rs @@ -1,6 +1,7 @@ use std::io::Cursor; -use std::time::Instant; +use std::time::{Duration, Instant}; use std::net::SocketAddr; +use std::sync::Arc; use rand::{Rng, SeedableRng, thread_rng, rngs::SmallRng}; use rand_distr::Pareto; @@ -20,8 +21,8 @@ const ANNOUNCE_REQUESTS: usize = 1_000_000; pub fn bench( state: &State, config: &Config, - requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> -) -> (f64, f64) { + requests: Arc> +) -> (usize, Duration){ let mut responses = Vec::with_capacity(ANNOUNCE_REQUESTS); let mut buffer = [0u8; MAX_PACKET_SIZE]; @@ -33,10 +34,10 @@ pub fn bench( let now = Instant::now(); - let mut requests: Vec<(AnnounceRequest, SocketAddr)> = requests.into_iter() + let mut requests: Vec<(AnnounceRequest, SocketAddr)> = requests.iter() .map(|(request_bytes, src)| { - if let Request::Announce(r) = request_from_bytes(&request_bytes, 255).unwrap() { - (r, src) + if let Request::Announce(r) = request_from_bytes(request_bytes, 255).unwrap() { + (r, *src) } else { unreachable!() } @@ -67,16 +68,13 @@ pub fn bench( let duration = Instant::now() - now; - let requests_per_second = ANNOUNCE_REQUESTS as f64 / (duration.as_micros() as f64 / 1000000.0); - let time_per_request = duration.as_nanos() as f64 / ANNOUNCE_REQUESTS as f64; - assert_eq!(num_responses, ANNOUNCE_REQUESTS); if dummy == 123u8 { println!("dummy info"); } - (requests_per_second, time_per_request) + (ANNOUNCE_REQUESTS, duration) } diff --git a/aquatic_bench/src/bin/bench_handlers/config.rs b/aquatic_bench/src/bin/bench_handlers/config.rs index 256a275..02c3573 100644 --- a/aquatic_bench/src/bin/bench_handlers/config.rs +++ b/aquatic_bench/src/bin/bench_handlers/config.rs @@ -4,6 +4,7 @@ use serde::{Serialize, Deserialize}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct BenchConfig { pub num_rounds: u64, + pub num_threads: u64, } @@ -11,6 +12,7 @@ impl Default for BenchConfig { fn default() -> Self { Self { num_rounds: 20, + num_threads: 4, } } } \ No newline at end of file diff --git a/aquatic_bench/src/bin/bench_handlers/connect.rs b/aquatic_bench/src/bin/bench_handlers/connect.rs index b48d8d2..ffd11d4 100644 --- a/aquatic_bench/src/bin/bench_handlers/connect.rs +++ b/aquatic_bench/src/bin/bench_handlers/connect.rs @@ -1,6 +1,7 @@ use std::io::Cursor; -use std::time::Instant; +use std::time::{Duration, Instant}; use std::net::SocketAddr; +use std::sync::Arc; use rand::{Rng, SeedableRng, thread_rng, rngs::{SmallRng, StdRng}}; @@ -15,9 +16,9 @@ const ITERATIONS: usize = 10_000_000; pub fn bench( - requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> -) -> (f64, f64){ - let state = State::new(); + state: State, + requests: Arc> +) -> (usize, Duration){ let mut responses = Vec::with_capacity(ITERATIONS); let mut buffer = [0u8; MAX_PACKET_SIZE]; @@ -29,10 +30,10 @@ pub fn bench( let now = Instant::now(); - let mut requests: Vec<(ConnectRequest, SocketAddr)> = requests.into_iter() + let mut requests: Vec<(ConnectRequest, SocketAddr)> = requests.iter() .map(|(request_bytes, src)| { - if let Request::Connect(r) = request_from_bytes(&request_bytes, 255).unwrap() { - (r, src) + if let Request::Connect(r) = request_from_bytes(request_bytes, 255).unwrap() { + (r, *src) } else { unreachable!() } @@ -57,35 +58,13 @@ pub fn bench( let duration = Instant::now() - now; - let requests_per_second = ITERATIONS as f64 / (duration.as_micros() as f64 / 1000000.0); - let time_per_request = duration.as_nanos() as f64 / ITERATIONS as f64; - assert_eq!(num_responses, ITERATIONS); - // println!("\nrequests/second: {:.2}", requests_per_second); - // println!("time per request: {:.2}ns", time_per_request); - - /* - let mut dummy = 0usize; - - for (response, _src) in responses { - if let Response::Connect(response) = response { - if response.connection_id.0 > 0 { - dummy += 1; - } - } - } - - if dummy == ITERATIONS { - println!("dummy test output: {}", dummy); - } - */ - if dummy == 123u8 { println!("dummy info"); } - (requests_per_second, time_per_request) + (ITERATIONS, duration) } diff --git a/aquatic_bench/src/bin/bench_handlers/main.rs b/aquatic_bench/src/bin/bench_handlers/main.rs index b67d108..9ce33ae 100644 --- a/aquatic_bench/src/bin/bench_handlers/main.rs +++ b/aquatic_bench/src/bin/bench_handlers/main.rs @@ -2,17 +2,19 @@ //! //! Example summary output: //! ``` -//! ## Average results over 20 rounds +//! ## Average results over 20 rounds in 4 threads //! -//! Connect handler: 2 473 860 requests/second, 404.94 ns/request -//! Announce handler: 302 665 requests/second, 3306.17 ns/request -//! Scrape handler: 745 598 requests/second, 1341.30 ns/request +//! Connect handler: 2 543 896 requests/second, 393.10 ns/request +//! Announce handler: 382 055 requests/second, 2617.42 ns/request +//! Scrape handler: 1 168 651 requests/second, 855.69 ns/request //! ``` use std::time::{Duration, Instant}; use std::io::Cursor; use std::net::SocketAddr; +use std::sync::Arc; +use dashmap::DashMap; use indicatif::{ProgressBar, ProgressStyle, ProgressIterator}; use num_format::{Locale, ToFormattedString}; use rand::{Rng, thread_rng, rngs::SmallRng, SeedableRng}; @@ -39,18 +41,21 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; fn print_results( request_type: &str, - num_rounds: u64, - data: (f64, f64) + config: &BenchConfig, + num_items: usize, + duration: Duration, ) { let per_second = ( - (data.0 / (num_rounds as f64) + ((config.num_threads as f64 * num_items as f64) / (duration.as_micros() as f64 / 1000000.0) ) as usize).to_formatted_string(&Locale::se); + let time_per_request = duration.as_nanos() as f64 / (config.num_threads as f64 * num_items as f64); + println!( "{} {:>10} requests/second, {:>8.2} ns/request", request_type, per_second, - data.1 / (num_rounds as f64) + time_per_request, ); } @@ -64,18 +69,9 @@ fn main(){ fn run(bench_config: BenchConfig){ - let num_rounds = bench_config.num_rounds; - - let mut connect_data = (0.0, 0.0); - let mut announce_data = (0.0, 0.0); - let mut scrape_data = (0.0, 0.0); - - fn create_progress_bar(name: &str, iterations: u64) -> ProgressBar { - let t = format!("{:<16} {}", name, "{wide_bar} {pos:>2}/{len:>2}"); - let style = ProgressStyle::default_bar().template(&t); - - ProgressBar::new(iterations).with_style(style) - } + let mut connect_data = (0usize, Duration::new(0, 0)); + let mut announce_data = (0usize, Duration::new(0, 0)); + let mut scrape_data = (0usize, Duration::new(0, 0)); println!("# Benchmarking request handlers\n"); @@ -94,21 +90,26 @@ fn run(bench_config: BenchConfig){ }) .collect(); - ::std::thread::sleep(Duration::from_secs(1)); + let requests = Arc::new(requests); - let pb = create_progress_bar("Connect handler", num_rounds); + let pb = create_progress_bar("Connect handler", bench_config.num_rounds); - for _ in (0..num_rounds).progress_with(pb){ - let requests = requests.clone(); + for _ in (0..bench_config.num_rounds).progress_with(pb){ + let state = State::new(); - ::std::thread::sleep(Duration::from_millis(200)); + let handles: Vec<_> = (0..bench_config.num_threads).map(|_| { + let requests = requests.clone(); + let state = state.clone(); - let d = connect::bench(requests); + ::std::thread::spawn(|| connect::bench(state, requests)) + }).collect(); - ::std::thread::sleep(Duration::from_millis(200)); + for handle in handles { + let (iterations, duration) = handle.join().unwrap(); - connect_data.0 += d.0; - connect_data.1 += d.1; + connect_data.0 += iterations; + connect_data.1 += duration; + } } } @@ -117,16 +118,15 @@ fn run(bench_config: BenchConfig){ let config = Config::default(); // Benchmark announce handler - let last_state: State = { - let state = State::new(); - + let last_torrents: Option> = { let requests = announce::create_requests( &mut rng, &info_hashes ); - // Create connections in state + // Create connections + let connections = Arc::new(DashMap::new()); let time = Time(Instant::now()); for (request, src) in requests.iter() { @@ -135,7 +135,7 @@ fn run(bench_config: BenchConfig){ socket_addr: *src, }; - state.connections.insert(key, time); + connections.insert(key, time); } let requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> = requests.into_iter() @@ -148,34 +148,45 @@ fn run(bench_config: BenchConfig){ (buffer, src) }) .collect(); + + let requests = Arc::new(requests); - ::std::thread::sleep(Duration::from_secs(1)); + let pb = create_progress_bar("Announce handler", bench_config.num_rounds); - let pb = create_progress_bar("Announce handler", num_rounds); + let mut last_torrents = None; - for _ in (0..num_rounds).progress_with(pb) { - let requests = requests.clone(); - state.torrents.clear(); - state.torrents.shrink_to_fit(); + for i in (0..bench_config.num_rounds).progress_with(pb){ + let mut state = State::new(); - ::std::thread::sleep(Duration::from_millis(200)); + state.connections = connections.clone(); - let d = announce::bench(&state, &config, requests); + let handles: Vec<_> = (0..bench_config.num_threads).map(|_| { + let requests = requests.clone(); + let state = state.clone(); + let config = config.clone(); - ::std::thread::sleep(Duration::from_millis(200)); + ::std::thread::spawn(move || announce::bench(&state, &config, requests)) + }).collect(); - announce_data.0 += d.0; - announce_data.1 += d.1; + for handle in handles { + let (iterations, duration) = handle.join().unwrap(); + + announce_data.0 += iterations; + announce_data.1 += duration; + } + + if i + 1 == bench_config.num_rounds { + last_torrents = Some(state.torrents); + } } - state + last_torrents }; // Benchmark scrape handler { - let state = last_state; - - state.connections.clear(); + let mut state = State::new(); + state.torrents = last_torrents.unwrap(); let requests = scrape::create_requests(&mut rng, &info_hashes); @@ -202,30 +213,37 @@ fn run(bench_config: BenchConfig){ (buffer, src) }) .collect(); + + let requests = Arc::new(requests); - ::std::thread::sleep(Duration::from_secs(1)); + let pb = create_progress_bar("Scrape handler", bench_config.num_rounds); - let pb = create_progress_bar("Scrape handler", num_rounds); + for _ in (0..bench_config.num_rounds).progress_with(pb){ + let handles: Vec<_> = (0..bench_config.num_threads).map(|_| { + let requests = requests.clone(); + let state = state.clone(); - for _ in (0..num_rounds).progress_with(pb) { - let requests = requests.clone(); + ::std::thread::spawn(move || scrape::bench(&state, requests)) + }).collect(); - ::std::thread::sleep(Duration::from_millis(200)); + for handle in handles { + let (iterations, duration) = handle.join().unwrap(); - let d = scrape::bench(&state, requests); - - ::std::thread::sleep(Duration::from_millis(200)); - - scrape_data.0 += d.0; - scrape_data.1 += d.1; + scrape_data.0 += iterations; + scrape_data.1 += duration; + } } } - println!("\n## Average results over {} rounds\n", num_rounds); + println!( + "\n## Average results over {} rounds in {} threads\n", + bench_config.num_rounds, + bench_config.num_threads + ); - print_results("Connect handler: ", num_rounds, connect_data); - print_results("Announce handler:", num_rounds, announce_data); - print_results("Scrape handler: ", num_rounds, scrape_data); + print_results("Connect handler: ", &bench_config, connect_data.0, connect_data.1); + print_results("Announce handler:", &bench_config, announce_data.0, announce_data.1); + print_results("Scrape handler: ", &bench_config, scrape_data.0, scrape_data.1); } @@ -238,3 +256,11 @@ fn create_info_hashes(rng: &mut impl Rng) -> Vec { info_hashes } + + +fn create_progress_bar(name: &str, iterations: u64) -> ProgressBar { + let t = format!("{:<16} {}", name, "{wide_bar} {pos:>2}/{len:>2}"); + let style = ProgressStyle::default_bar().template(&t); + + ProgressBar::new(iterations).with_style(style) +} \ No newline at end of file diff --git a/aquatic_bench/src/bin/bench_handlers/scrape.rs b/aquatic_bench/src/bin/bench_handlers/scrape.rs index 4e056d9..c485cd4 100644 --- a/aquatic_bench/src/bin/bench_handlers/scrape.rs +++ b/aquatic_bench/src/bin/bench_handlers/scrape.rs @@ -1,6 +1,7 @@ use std::io::Cursor; use std::net::SocketAddr; -use std::time::Instant; +use std::time::{Duration, Instant}; +use std::sync::Arc; use rand::Rng; use rand_distr::Pareto; @@ -19,8 +20,8 @@ const SCRAPE_NUM_HASHES: usize = 10; pub fn bench( state: &State, - requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> -) -> (f64, f64) { + requests: Arc> +) -> (usize, Duration){ let mut responses = Vec::with_capacity(SCRAPE_REQUESTS); let mut buffer = [0u8; MAX_PACKET_SIZE]; @@ -30,10 +31,10 @@ pub fn bench( let now = Instant::now(); - let mut requests: Vec<(ScrapeRequest, SocketAddr)> = requests.into_iter() + let mut requests: Vec<(ScrapeRequest, SocketAddr)> = requests.iter() .map(|(request_bytes, src)| { - if let Request::Scrape(r) = request_from_bytes(&request_bytes, 255).unwrap() { - (r, src) + if let Request::Scrape(r) = request_from_bytes(request_bytes, 255).unwrap() { + (r, *src) } else { unreachable!() } @@ -62,16 +63,13 @@ pub fn bench( let duration = Instant::now() - now; - let requests_per_second = SCRAPE_REQUESTS as f64 / (duration.as_micros() as f64 / 1000000.0); - let time_per_request = duration.as_nanos() as f64 / SCRAPE_REQUESTS as f64; - assert_eq!(num_responses, SCRAPE_REQUESTS); if dummy == 123u8 { println!("dummy info"); } - (requests_per_second, time_per_request) + (SCRAPE_REQUESTS, duration) }