diff --git a/Cargo.lock b/Cargo.lock index e00d467..e3b44bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -49,6 +49,7 @@ dependencies = [ "aquatic", "bittorrent_udp", "cli_helpers", + "dashmap", "indicatif", "mimalloc", "num-format", diff --git a/TODO.md b/TODO.md index 198c205..34e40fb 100644 --- a/TODO.md +++ b/TODO.md @@ -1,7 +1,6 @@ # TODO * Benchmarks - * Run multiple threads to test performance when contested? * Iterate over whole returned buffer and run e.g. xor on it (.iter().fold()) * Generic bench function since current functions are almost identical * Show percentile stats for peers per torrent @@ -30,6 +29,9 @@ * mialloc good? * Use less bytes from PeerId for hashing? Would need to implement "faulty" PartialEq too (on PeerMapKey, which would be OK) + * Separate dashmap for torrent stats, meaning there will be no + contention across announce and scrape requests. Will however + require two lookups in announce requests. Do this? * bittorrent_udp * ParseError enum maybe, with `Option` * Avoid heap allocation in general if it can be avoided? 
diff --git a/aquatic_bench/Cargo.toml b/aquatic_bench/Cargo.toml index ddf376d..b20ccce 100644 --- a/aquatic_bench/Cargo.toml +++ b/aquatic_bench/Cargo.toml @@ -18,6 +18,7 @@ name = "plot_pareto" aquatic = { path = "../aquatic" } bittorrent_udp = { path = "../bittorrent_udp" } cli_helpers = { path = "../cli_helpers" } +dashmap = "3" indicatif = "0.14" mimalloc = { version = "0.1", default-features = false } num-format = "0.4" diff --git a/aquatic_bench/src/bin/bench_handlers/announce.rs b/aquatic_bench/src/bin/bench_handlers/announce.rs index 6c42a94..f1ec56e 100644 --- a/aquatic_bench/src/bin/bench_handlers/announce.rs +++ b/aquatic_bench/src/bin/bench_handlers/announce.rs @@ -1,6 +1,7 @@ use std::io::Cursor; -use std::time::Instant; +use std::time::{Duration, Instant}; use std::net::SocketAddr; +use std::sync::Arc; use rand::{Rng, SeedableRng, thread_rng, rngs::SmallRng}; use rand_distr::Pareto; @@ -20,8 +21,8 @@ const ANNOUNCE_REQUESTS: usize = 1_000_000; pub fn bench( state: &State, config: &Config, - requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> -) -> (f64, f64) { + requests: Arc> +) -> (usize, Duration){ let mut responses = Vec::with_capacity(ANNOUNCE_REQUESTS); let mut buffer = [0u8; MAX_PACKET_SIZE]; @@ -33,10 +34,10 @@ pub fn bench( let now = Instant::now(); - let mut requests: Vec<(AnnounceRequest, SocketAddr)> = requests.into_iter() + let mut requests: Vec<(AnnounceRequest, SocketAddr)> = requests.iter() .map(|(request_bytes, src)| { - if let Request::Announce(r) = request_from_bytes(&request_bytes, 255).unwrap() { - (r, src) + if let Request::Announce(r) = request_from_bytes(request_bytes, 255).unwrap() { + (r, *src) } else { unreachable!() } @@ -67,16 +68,13 @@ pub fn bench( let duration = Instant::now() - now; - let requests_per_second = ANNOUNCE_REQUESTS as f64 / (duration.as_micros() as f64 / 1000000.0); - let time_per_request = duration.as_nanos() as f64 / ANNOUNCE_REQUESTS as f64; - assert_eq!(num_responses, ANNOUNCE_REQUESTS); if dummy == 
123u8 { println!("dummy info"); } - (requests_per_second, time_per_request) + (ANNOUNCE_REQUESTS, duration) } diff --git a/aquatic_bench/src/bin/bench_handlers/config.rs b/aquatic_bench/src/bin/bench_handlers/config.rs index 256a275..02c3573 100644 --- a/aquatic_bench/src/bin/bench_handlers/config.rs +++ b/aquatic_bench/src/bin/bench_handlers/config.rs @@ -4,6 +4,7 @@ use serde::{Serialize, Deserialize}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct BenchConfig { pub num_rounds: u64, + pub num_threads: u64, } @@ -11,6 +12,7 @@ impl Default for BenchConfig { fn default() -> Self { Self { num_rounds: 20, + num_threads: 4, } } } \ No newline at end of file diff --git a/aquatic_bench/src/bin/bench_handlers/connect.rs b/aquatic_bench/src/bin/bench_handlers/connect.rs index b48d8d2..ffd11d4 100644 --- a/aquatic_bench/src/bin/bench_handlers/connect.rs +++ b/aquatic_bench/src/bin/bench_handlers/connect.rs @@ -1,6 +1,7 @@ use std::io::Cursor; -use std::time::Instant; +use std::time::{Duration, Instant}; use std::net::SocketAddr; +use std::sync::Arc; use rand::{Rng, SeedableRng, thread_rng, rngs::{SmallRng, StdRng}}; @@ -15,9 +16,9 @@ const ITERATIONS: usize = 10_000_000; pub fn bench( - requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> -) -> (f64, f64){ - let state = State::new(); + state: State, + requests: Arc> +) -> (usize, Duration){ let mut responses = Vec::with_capacity(ITERATIONS); let mut buffer = [0u8; MAX_PACKET_SIZE]; @@ -29,10 +30,10 @@ pub fn bench( let now = Instant::now(); - let mut requests: Vec<(ConnectRequest, SocketAddr)> = requests.into_iter() + let mut requests: Vec<(ConnectRequest, SocketAddr)> = requests.iter() .map(|(request_bytes, src)| { - if let Request::Connect(r) = request_from_bytes(&request_bytes, 255).unwrap() { - (r, src) + if let Request::Connect(r) = request_from_bytes(request_bytes, 255).unwrap() { + (r, *src) } else { unreachable!() } @@ -57,35 +58,13 @@ pub fn bench( let duration = Instant::now() - now; - let 
requests_per_second = ITERATIONS as f64 / (duration.as_micros() as f64 / 1000000.0); - let time_per_request = duration.as_nanos() as f64 / ITERATIONS as f64; - assert_eq!(num_responses, ITERATIONS); - // println!("\nrequests/second: {:.2}", requests_per_second); - // println!("time per request: {:.2}ns", time_per_request); - - /* - let mut dummy = 0usize; - - for (response, _src) in responses { - if let Response::Connect(response) = response { - if response.connection_id.0 > 0 { - dummy += 1; - } - } - } - - if dummy == ITERATIONS { - println!("dummy test output: {}", dummy); - } - */ - if dummy == 123u8 { println!("dummy info"); } - (requests_per_second, time_per_request) + (ITERATIONS, duration) } diff --git a/aquatic_bench/src/bin/bench_handlers/main.rs b/aquatic_bench/src/bin/bench_handlers/main.rs index b67d108..9ce33ae 100644 --- a/aquatic_bench/src/bin/bench_handlers/main.rs +++ b/aquatic_bench/src/bin/bench_handlers/main.rs @@ -2,17 +2,19 @@ //! //! Example summary output: //! ``` -//! ## Average results over 20 rounds +//! ## Average results over 20 rounds in 4 threads //! -//! Connect handler: 2 473 860 requests/second, 404.94 ns/request -//! Announce handler: 302 665 requests/second, 3306.17 ns/request -//! Scrape handler: 745 598 requests/second, 1341.30 ns/request +//! Connect handler: 2 543 896 requests/second, 393.10 ns/request +//! Announce handler: 382 055 requests/second, 2617.42 ns/request +//! Scrape handler: 1 168 651 requests/second, 855.69 ns/request //! 
``` use std::time::{Duration, Instant}; use std::io::Cursor; use std::net::SocketAddr; +use std::sync::Arc; +use dashmap::DashMap; use indicatif::{ProgressBar, ProgressStyle, ProgressIterator}; use num_format::{Locale, ToFormattedString}; use rand::{Rng, thread_rng, rngs::SmallRng, SeedableRng}; @@ -39,18 +41,21 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; fn print_results( request_type: &str, - num_rounds: u64, - data: (f64, f64) + config: &BenchConfig, + num_items: usize, + duration: Duration, ) { let per_second = ( - (data.0 / (num_rounds as f64) + ((config.num_threads as f64 * num_items as f64) / (duration.as_micros() as f64 / 1000000.0) ) as usize).to_formatted_string(&Locale::se); + let time_per_request = duration.as_nanos() as f64 / (config.num_threads as f64 * num_items as f64); + println!( "{} {:>10} requests/second, {:>8.2} ns/request", request_type, per_second, - data.1 / (num_rounds as f64) + time_per_request, ); } @@ -64,18 +69,9 @@ fn main(){ fn run(bench_config: BenchConfig){ - let num_rounds = bench_config.num_rounds; - - let mut connect_data = (0.0, 0.0); - let mut announce_data = (0.0, 0.0); - let mut scrape_data = (0.0, 0.0); - - fn create_progress_bar(name: &str, iterations: u64) -> ProgressBar { - let t = format!("{:<16} {}", name, "{wide_bar} {pos:>2}/{len:>2}"); - let style = ProgressStyle::default_bar().template(&t); - - ProgressBar::new(iterations).with_style(style) - } + let mut connect_data = (0usize, Duration::new(0, 0)); + let mut announce_data = (0usize, Duration::new(0, 0)); + let mut scrape_data = (0usize, Duration::new(0, 0)); println!("# Benchmarking request handlers\n"); @@ -94,21 +90,26 @@ fn run(bench_config: BenchConfig){ }) .collect(); - ::std::thread::sleep(Duration::from_secs(1)); + let requests = Arc::new(requests); - let pb = create_progress_bar("Connect handler", num_rounds); + let pb = create_progress_bar("Connect handler", bench_config.num_rounds); - for _ in (0..num_rounds).progress_with(pb){ - let 
requests = requests.clone(); + for _ in (0..bench_config.num_rounds).progress_with(pb){ + let state = State::new(); - ::std::thread::sleep(Duration::from_millis(200)); + let handles: Vec<_> = (0..bench_config.num_threads).map(|_| { + let requests = requests.clone(); + let state = state.clone(); - let d = connect::bench(requests); + ::std::thread::spawn(|| connect::bench(state, requests)) + }).collect(); - ::std::thread::sleep(Duration::from_millis(200)); + for handle in handles { + let (iterations, duration) = handle.join().unwrap(); - connect_data.0 += d.0; - connect_data.1 += d.1; + connect_data.0 += iterations; + connect_data.1 += duration; + } } } @@ -117,16 +118,15 @@ fn run(bench_config: BenchConfig){ let config = Config::default(); // Benchmark announce handler - let last_state: State = { - let state = State::new(); - + let last_torrents: Option> = { let requests = announce::create_requests( &mut rng, &info_hashes ); - // Create connections in state + // Create connections + let connections = Arc::new(DashMap::new()); let time = Time(Instant::now()); for (request, src) in requests.iter() { @@ -135,7 +135,7 @@ fn run(bench_config: BenchConfig){ socket_addr: *src, }; - state.connections.insert(key, time); + connections.insert(key, time); } let requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> = requests.into_iter() @@ -148,34 +148,45 @@ fn run(bench_config: BenchConfig){ (buffer, src) }) .collect(); + + let requests = Arc::new(requests); - ::std::thread::sleep(Duration::from_secs(1)); + let pb = create_progress_bar("Announce handler", bench_config.num_rounds); - let pb = create_progress_bar("Announce handler", num_rounds); + let mut last_torrents = None; - for _ in (0..num_rounds).progress_with(pb) { - let requests = requests.clone(); - state.torrents.clear(); - state.torrents.shrink_to_fit(); + for i in (0..bench_config.num_rounds).progress_with(pb){ + let mut state = State::new(); - ::std::thread::sleep(Duration::from_millis(200)); + state.connections = 
connections.clone(); - let d = announce::bench(&state, &config, requests); + let handles: Vec<_> = (0..bench_config.num_threads).map(|_| { + let requests = requests.clone(); + let state = state.clone(); + let config = config.clone(); - ::std::thread::sleep(Duration::from_millis(200)); + ::std::thread::spawn(move || announce::bench(&state, &config, requests)) + }).collect(); - announce_data.0 += d.0; - announce_data.1 += d.1; + for handle in handles { + let (iterations, duration) = handle.join().unwrap(); + + announce_data.0 += iterations; + announce_data.1 += duration; + } + + if i + 1 == bench_config.num_rounds { + last_torrents = Some(state.torrents); + } } - state + last_torrents }; // Benchmark scrape handler { - let state = last_state; - - state.connections.clear(); + let mut state = State::new(); + state.torrents = last_torrents.unwrap(); let requests = scrape::create_requests(&mut rng, &info_hashes); @@ -202,30 +213,37 @@ fn run(bench_config: BenchConfig){ (buffer, src) }) .collect(); + + let requests = Arc::new(requests); - ::std::thread::sleep(Duration::from_secs(1)); + let pb = create_progress_bar("Scrape handler", bench_config.num_rounds); - let pb = create_progress_bar("Scrape handler", num_rounds); + for _ in (0..bench_config.num_rounds).progress_with(pb){ + let handles: Vec<_> = (0..bench_config.num_threads).map(|_| { + let requests = requests.clone(); + let state = state.clone(); - for _ in (0..num_rounds).progress_with(pb) { - let requests = requests.clone(); + ::std::thread::spawn(move || scrape::bench(&state, requests)) + }).collect(); - ::std::thread::sleep(Duration::from_millis(200)); + for handle in handles { + let (iterations, duration) = handle.join().unwrap(); - let d = scrape::bench(&state, requests); - - ::std::thread::sleep(Duration::from_millis(200)); - - scrape_data.0 += d.0; - scrape_data.1 += d.1; + scrape_data.0 += iterations; + scrape_data.1 += duration; + } } } - println!("\n## Average results over {} rounds\n", num_rounds); + 
println!( + "\n## Average results over {} rounds in {} threads\n", + bench_config.num_rounds, + bench_config.num_threads + ); - print_results("Connect handler: ", num_rounds, connect_data); - print_results("Announce handler:", num_rounds, announce_data); - print_results("Scrape handler: ", num_rounds, scrape_data); + print_results("Connect handler: ", &bench_config, connect_data.0, connect_data.1); + print_results("Announce handler:", &bench_config, announce_data.0, announce_data.1); + print_results("Scrape handler: ", &bench_config, scrape_data.0, scrape_data.1); } @@ -238,3 +256,11 @@ fn create_info_hashes(rng: &mut impl Rng) -> Vec { info_hashes } + + +fn create_progress_bar(name: &str, iterations: u64) -> ProgressBar { + let t = format!("{:<16} {}", name, "{wide_bar} {pos:>2}/{len:>2}"); + let style = ProgressStyle::default_bar().template(&t); + + ProgressBar::new(iterations).with_style(style) +} \ No newline at end of file diff --git a/aquatic_bench/src/bin/bench_handlers/scrape.rs b/aquatic_bench/src/bin/bench_handlers/scrape.rs index 4e056d9..c485cd4 100644 --- a/aquatic_bench/src/bin/bench_handlers/scrape.rs +++ b/aquatic_bench/src/bin/bench_handlers/scrape.rs @@ -1,6 +1,7 @@ use std::io::Cursor; use std::net::SocketAddr; -use std::time::Instant; +use std::time::{Duration, Instant}; +use std::sync::Arc; use rand::Rng; use rand_distr::Pareto; @@ -19,8 +20,8 @@ const SCRAPE_NUM_HASHES: usize = 10; pub fn bench( state: &State, - requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> -) -> (f64, f64) { + requests: Arc> +) -> (usize, Duration){ let mut responses = Vec::with_capacity(SCRAPE_REQUESTS); let mut buffer = [0u8; MAX_PACKET_SIZE]; @@ -30,10 +31,10 @@ pub fn bench( let now = Instant::now(); - let mut requests: Vec<(ScrapeRequest, SocketAddr)> = requests.into_iter() + let mut requests: Vec<(ScrapeRequest, SocketAddr)> = requests.iter() .map(|(request_bytes, src)| { - if let Request::Scrape(r) = request_from_bytes(&request_bytes, 255).unwrap() { - (r, src) 
+ if let Request::Scrape(r) = request_from_bytes(request_bytes, 255).unwrap() { + (r, *src) } else { unreachable!() } @@ -62,16 +63,13 @@ pub fn bench( let duration = Instant::now() - now; - let requests_per_second = SCRAPE_REQUESTS as f64 / (duration.as_micros() as f64 / 1000000.0); - let time_per_request = duration.as_nanos() as f64 / SCRAPE_REQUESTS as f64; - assert_eq!(num_responses, SCRAPE_REQUESTS); if dummy == 123u8 { println!("dummy info"); } - (requests_per_second, time_per_request) + (SCRAPE_REQUESTS, duration) }