aquatic_bench: test multiple threads at once, with disappointing results

DashMap doesn't scale as well as I had hoped. Only the scrape
handler performed somewhat better with more threads, since it
doesn't exclusively lock the torrent map. The announce and connect
handlers, however, gained almost nothing from more threads.
This commit is contained in:
Joakim Frostegård 2020-04-11 17:32:05 +02:00
parent eded822c31
commit 8d7cbb7926
8 changed files with 122 additions and 115 deletions

1
Cargo.lock generated
View file

@ -49,6 +49,7 @@ dependencies = [
"aquatic", "aquatic",
"bittorrent_udp", "bittorrent_udp",
"cli_helpers", "cli_helpers",
"dashmap",
"indicatif", "indicatif",
"mimalloc", "mimalloc",
"num-format", "num-format",

View file

@ -1,7 +1,6 @@
# TODO # TODO
* Benchmarks * Benchmarks
* Run multiple threads to test performance when contested?
* Iterate over whole returned buffer and run e.g. xor on it (.iter().fold()) * Iterate over whole returned buffer and run e.g. xor on it (.iter().fold())
* Generic bench function since current functions are almost identical * Generic bench function since current functions are almost identical
* Show percentile stats for peers per torrent * Show percentile stats for peers per torrent
@ -30,6 +29,9 @@
* mialloc good? * mialloc good?
* Use less bytes from PeerId for hashing? Would need to implement * Use less bytes from PeerId for hashing? Would need to implement
"faulty" PartialEq too (on PeerMapKey, which would be OK) "faulty" PartialEq too (on PeerMapKey, which would be OK)
* Seperate dashmap for torrent stats, meaning there will be no
contention across announce and scrape requests. Will however
require two lookups in announce requests. Do this?
* bittorrent_udp * bittorrent_udp
* ParseError enum maybe, with `Option<TransactionId>` * ParseError enum maybe, with `Option<TransactionId>`
* Avoid heap allocation in general if it can be avoided? * Avoid heap allocation in general if it can be avoided?

View file

@ -18,6 +18,7 @@ name = "plot_pareto"
aquatic = { path = "../aquatic" } aquatic = { path = "../aquatic" }
bittorrent_udp = { path = "../bittorrent_udp" } bittorrent_udp = { path = "../bittorrent_udp" }
cli_helpers = { path = "../cli_helpers" } cli_helpers = { path = "../cli_helpers" }
dashmap = "3"
indicatif = "0.14" indicatif = "0.14"
mimalloc = { version = "0.1", default-features = false } mimalloc = { version = "0.1", default-features = false }
num-format = "0.4" num-format = "0.4"

View file

@ -1,6 +1,7 @@
use std::io::Cursor; use std::io::Cursor;
use std::time::Instant; use std::time::{Duration, Instant};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc;
use rand::{Rng, SeedableRng, thread_rng, rngs::SmallRng}; use rand::{Rng, SeedableRng, thread_rng, rngs::SmallRng};
use rand_distr::Pareto; use rand_distr::Pareto;
@ -20,8 +21,8 @@ const ANNOUNCE_REQUESTS: usize = 1_000_000;
pub fn bench( pub fn bench(
state: &State, state: &State,
config: &Config, config: &Config,
requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> requests: Arc<Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)>>
) -> (f64, f64) { ) -> (usize, Duration){
let mut responses = Vec::with_capacity(ANNOUNCE_REQUESTS); let mut responses = Vec::with_capacity(ANNOUNCE_REQUESTS);
let mut buffer = [0u8; MAX_PACKET_SIZE]; let mut buffer = [0u8; MAX_PACKET_SIZE];
@ -33,10 +34,10 @@ pub fn bench(
let now = Instant::now(); let now = Instant::now();
let mut requests: Vec<(AnnounceRequest, SocketAddr)> = requests.into_iter() let mut requests: Vec<(AnnounceRequest, SocketAddr)> = requests.iter()
.map(|(request_bytes, src)| { .map(|(request_bytes, src)| {
if let Request::Announce(r) = request_from_bytes(&request_bytes, 255).unwrap() { if let Request::Announce(r) = request_from_bytes(request_bytes, 255).unwrap() {
(r, src) (r, *src)
} else { } else {
unreachable!() unreachable!()
} }
@ -67,16 +68,13 @@ pub fn bench(
let duration = Instant::now() - now; let duration = Instant::now() - now;
let requests_per_second = ANNOUNCE_REQUESTS as f64 / (duration.as_micros() as f64 / 1000000.0);
let time_per_request = duration.as_nanos() as f64 / ANNOUNCE_REQUESTS as f64;
assert_eq!(num_responses, ANNOUNCE_REQUESTS); assert_eq!(num_responses, ANNOUNCE_REQUESTS);
if dummy == 123u8 { if dummy == 123u8 {
println!("dummy info"); println!("dummy info");
} }
(requests_per_second, time_per_request) (ANNOUNCE_REQUESTS, duration)
} }

View file

@ -4,6 +4,7 @@ use serde::{Serialize, Deserialize};
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BenchConfig { pub struct BenchConfig {
pub num_rounds: u64, pub num_rounds: u64,
pub num_threads: u64,
} }
@ -11,6 +12,7 @@ impl Default for BenchConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
num_rounds: 20, num_rounds: 20,
num_threads: 4,
} }
} }
} }

View file

@ -1,6 +1,7 @@
use std::io::Cursor; use std::io::Cursor;
use std::time::Instant; use std::time::{Duration, Instant};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc;
use rand::{Rng, SeedableRng, thread_rng, rngs::{SmallRng, StdRng}}; use rand::{Rng, SeedableRng, thread_rng, rngs::{SmallRng, StdRng}};
@ -15,9 +16,9 @@ const ITERATIONS: usize = 10_000_000;
pub fn bench( pub fn bench(
requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> state: State,
) -> (f64, f64){ requests: Arc<Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)>>
let state = State::new(); ) -> (usize, Duration){
let mut responses = Vec::with_capacity(ITERATIONS); let mut responses = Vec::with_capacity(ITERATIONS);
let mut buffer = [0u8; MAX_PACKET_SIZE]; let mut buffer = [0u8; MAX_PACKET_SIZE];
@ -29,10 +30,10 @@ pub fn bench(
let now = Instant::now(); let now = Instant::now();
let mut requests: Vec<(ConnectRequest, SocketAddr)> = requests.into_iter() let mut requests: Vec<(ConnectRequest, SocketAddr)> = requests.iter()
.map(|(request_bytes, src)| { .map(|(request_bytes, src)| {
if let Request::Connect(r) = request_from_bytes(&request_bytes, 255).unwrap() { if let Request::Connect(r) = request_from_bytes(request_bytes, 255).unwrap() {
(r, src) (r, *src)
} else { } else {
unreachable!() unreachable!()
} }
@ -57,35 +58,13 @@ pub fn bench(
let duration = Instant::now() - now; let duration = Instant::now() - now;
let requests_per_second = ITERATIONS as f64 / (duration.as_micros() as f64 / 1000000.0);
let time_per_request = duration.as_nanos() as f64 / ITERATIONS as f64;
assert_eq!(num_responses, ITERATIONS); assert_eq!(num_responses, ITERATIONS);
// println!("\nrequests/second: {:.2}", requests_per_second);
// println!("time per request: {:.2}ns", time_per_request);
/*
let mut dummy = 0usize;
for (response, _src) in responses {
if let Response::Connect(response) = response {
if response.connection_id.0 > 0 {
dummy += 1;
}
}
}
if dummy == ITERATIONS {
println!("dummy test output: {}", dummy);
}
*/
if dummy == 123u8 { if dummy == 123u8 {
println!("dummy info"); println!("dummy info");
} }
(requests_per_second, time_per_request) (ITERATIONS, duration)
} }

View file

@ -2,17 +2,19 @@
//! //!
//! Example summary output: //! Example summary output:
//! ``` //! ```
//! ## Average results over 20 rounds //! ## Average results over 20 rounds in 4 threads
//! //!
//! Connect handler: 2 473 860 requests/second, 404.94 ns/request //! Connect handler: 2 543 896 requests/second, 393.10 ns/request
//! Announce handler: 302 665 requests/second, 3306.17 ns/request //! Announce handler: 382 055 requests/second, 2617.42 ns/request
//! Scrape handler: 745 598 requests/second, 1341.30 ns/request //! Scrape handler: 1 168 651 requests/second, 855.69 ns/request
//! ``` //! ```
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::io::Cursor; use std::io::Cursor;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc;
use dashmap::DashMap;
use indicatif::{ProgressBar, ProgressStyle, ProgressIterator}; use indicatif::{ProgressBar, ProgressStyle, ProgressIterator};
use num_format::{Locale, ToFormattedString}; use num_format::{Locale, ToFormattedString};
use rand::{Rng, thread_rng, rngs::SmallRng, SeedableRng}; use rand::{Rng, thread_rng, rngs::SmallRng, SeedableRng};
@ -39,18 +41,21 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
fn print_results( fn print_results(
request_type: &str, request_type: &str,
num_rounds: u64, config: &BenchConfig,
data: (f64, f64) num_items: usize,
duration: Duration,
) { ) {
let per_second = ( let per_second = (
(data.0 / (num_rounds as f64) ((config.num_threads as f64 * num_items as f64) / (duration.as_micros() as f64 / 1000000.0)
) as usize).to_formatted_string(&Locale::se); ) as usize).to_formatted_string(&Locale::se);
let time_per_request = duration.as_nanos() as f64 / (config.num_threads as f64 * num_items as f64);
println!( println!(
"{} {:>10} requests/second, {:>8.2} ns/request", "{} {:>10} requests/second, {:>8.2} ns/request",
request_type, request_type,
per_second, per_second,
data.1 / (num_rounds as f64) time_per_request,
); );
} }
@ -64,18 +69,9 @@ fn main(){
fn run(bench_config: BenchConfig){ fn run(bench_config: BenchConfig){
let num_rounds = bench_config.num_rounds; let mut connect_data = (0usize, Duration::new(0, 0));
let mut announce_data = (0usize, Duration::new(0, 0));
let mut connect_data = (0.0, 0.0); let mut scrape_data = (0usize, Duration::new(0, 0));
let mut announce_data = (0.0, 0.0);
let mut scrape_data = (0.0, 0.0);
fn create_progress_bar(name: &str, iterations: u64) -> ProgressBar {
let t = format!("{:<16} {}", name, "{wide_bar} {pos:>2}/{len:>2}");
let style = ProgressStyle::default_bar().template(&t);
ProgressBar::new(iterations).with_style(style)
}
println!("# Benchmarking request handlers\n"); println!("# Benchmarking request handlers\n");
@ -94,21 +90,26 @@ fn run(bench_config: BenchConfig){
}) })
.collect(); .collect();
::std::thread::sleep(Duration::from_secs(1)); let requests = Arc::new(requests);
let pb = create_progress_bar("Connect handler", num_rounds); let pb = create_progress_bar("Connect handler", bench_config.num_rounds);
for _ in (0..num_rounds).progress_with(pb){ for _ in (0..bench_config.num_rounds).progress_with(pb){
let requests = requests.clone(); let state = State::new();
::std::thread::sleep(Duration::from_millis(200)); let handles: Vec<_> = (0..bench_config.num_threads).map(|_| {
let requests = requests.clone();
let state = state.clone();
let d = connect::bench(requests); ::std::thread::spawn(|| connect::bench(state, requests))
}).collect();
::std::thread::sleep(Duration::from_millis(200)); for handle in handles {
let (iterations, duration) = handle.join().unwrap();
connect_data.0 += d.0; connect_data.0 += iterations;
connect_data.1 += d.1; connect_data.1 += duration;
}
} }
} }
@ -117,16 +118,15 @@ fn run(bench_config: BenchConfig){
let config = Config::default(); let config = Config::default();
// Benchmark announce handler // Benchmark announce handler
let last_state: State = { let last_torrents: Option<Arc<TorrentMap>> = {
let state = State::new();
let requests = announce::create_requests( let requests = announce::create_requests(
&mut rng, &mut rng,
&info_hashes &info_hashes
); );
// Create connections in state // Create connections
let connections = Arc::new(DashMap::new());
let time = Time(Instant::now()); let time = Time(Instant::now());
for (request, src) in requests.iter() { for (request, src) in requests.iter() {
@ -135,7 +135,7 @@ fn run(bench_config: BenchConfig){
socket_addr: *src, socket_addr: *src,
}; };
state.connections.insert(key, time); connections.insert(key, time);
} }
let requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> = requests.into_iter() let requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> = requests.into_iter()
@ -148,34 +148,45 @@ fn run(bench_config: BenchConfig){
(buffer, src) (buffer, src)
}) })
.collect(); .collect();
let requests = Arc::new(requests);
::std::thread::sleep(Duration::from_secs(1)); let pb = create_progress_bar("Announce handler", bench_config.num_rounds);
let pb = create_progress_bar("Announce handler", num_rounds); let mut last_torrents = None;
for _ in (0..num_rounds).progress_with(pb) { for i in (0..bench_config.num_rounds).progress_with(pb){
let requests = requests.clone(); let mut state = State::new();
state.torrents.clear();
state.torrents.shrink_to_fit();
::std::thread::sleep(Duration::from_millis(200)); state.connections = connections.clone();
let d = announce::bench(&state, &config, requests); let handles: Vec<_> = (0..bench_config.num_threads).map(|_| {
let requests = requests.clone();
let state = state.clone();
let config = config.clone();
::std::thread::sleep(Duration::from_millis(200)); ::std::thread::spawn(move || announce::bench(&state, &config, requests))
}).collect();
announce_data.0 += d.0; for handle in handles {
announce_data.1 += d.1; let (iterations, duration) = handle.join().unwrap();
announce_data.0 += iterations;
announce_data.1 += duration;
}
if i + 1 == bench_config.num_rounds {
last_torrents = Some(state.torrents);
}
} }
state last_torrents
}; };
// Benchmark scrape handler // Benchmark scrape handler
{ {
let state = last_state; let mut state = State::new();
state.torrents = last_torrents.unwrap();
state.connections.clear();
let requests = scrape::create_requests(&mut rng, &info_hashes); let requests = scrape::create_requests(&mut rng, &info_hashes);
@ -202,30 +213,37 @@ fn run(bench_config: BenchConfig){
(buffer, src) (buffer, src)
}) })
.collect(); .collect();
let requests = Arc::new(requests);
::std::thread::sleep(Duration::from_secs(1)); let pb = create_progress_bar("Scrape handler", bench_config.num_rounds);
let pb = create_progress_bar("Scrape handler", num_rounds); for _ in (0..bench_config.num_rounds).progress_with(pb){
let handles: Vec<_> = (0..bench_config.num_threads).map(|_| {
let requests = requests.clone();
let state = state.clone();
for _ in (0..num_rounds).progress_with(pb) { ::std::thread::spawn(move || scrape::bench(&state, requests))
let requests = requests.clone(); }).collect();
::std::thread::sleep(Duration::from_millis(200)); for handle in handles {
let (iterations, duration) = handle.join().unwrap();
let d = scrape::bench(&state, requests); scrape_data.0 += iterations;
scrape_data.1 += duration;
::std::thread::sleep(Duration::from_millis(200)); }
scrape_data.0 += d.0;
scrape_data.1 += d.1;
} }
} }
println!("\n## Average results over {} rounds\n", num_rounds); println!(
"\n## Average results over {} rounds in {} threads\n",
bench_config.num_rounds,
bench_config.num_threads
);
print_results("Connect handler: ", num_rounds, connect_data); print_results("Connect handler: ", &bench_config, connect_data.0, connect_data.1);
print_results("Announce handler:", num_rounds, announce_data); print_results("Announce handler:", &bench_config, announce_data.0, announce_data.1);
print_results("Scrape handler: ", num_rounds, scrape_data); print_results("Scrape handler: ", &bench_config, scrape_data.0, scrape_data.1);
} }
@ -238,3 +256,11 @@ fn create_info_hashes(rng: &mut impl Rng) -> Vec<InfoHash> {
info_hashes info_hashes
} }
fn create_progress_bar(name: &str, iterations: u64) -> ProgressBar {
let t = format!("{:<16} {}", name, "{wide_bar} {pos:>2}/{len:>2}");
let style = ProgressStyle::default_bar().template(&t);
ProgressBar::new(iterations).with_style(style)
}

View file

@ -1,6 +1,7 @@
use std::io::Cursor; use std::io::Cursor;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::time::Instant; use std::time::{Duration, Instant};
use std::sync::Arc;
use rand::Rng; use rand::Rng;
use rand_distr::Pareto; use rand_distr::Pareto;
@ -19,8 +20,8 @@ const SCRAPE_NUM_HASHES: usize = 10;
pub fn bench( pub fn bench(
state: &State, state: &State,
requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> requests: Arc<Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)>>
) -> (f64, f64) { ) -> (usize, Duration){
let mut responses = Vec::with_capacity(SCRAPE_REQUESTS); let mut responses = Vec::with_capacity(SCRAPE_REQUESTS);
let mut buffer = [0u8; MAX_PACKET_SIZE]; let mut buffer = [0u8; MAX_PACKET_SIZE];
@ -30,10 +31,10 @@ pub fn bench(
let now = Instant::now(); let now = Instant::now();
let mut requests: Vec<(ScrapeRequest, SocketAddr)> = requests.into_iter() let mut requests: Vec<(ScrapeRequest, SocketAddr)> = requests.iter()
.map(|(request_bytes, src)| { .map(|(request_bytes, src)| {
if let Request::Scrape(r) = request_from_bytes(&request_bytes, 255).unwrap() { if let Request::Scrape(r) = request_from_bytes(request_bytes, 255).unwrap() {
(r, src) (r, *src)
} else { } else {
unreachable!() unreachable!()
} }
@ -62,16 +63,13 @@ pub fn bench(
let duration = Instant::now() - now; let duration = Instant::now() - now;
let requests_per_second = SCRAPE_REQUESTS as f64 / (duration.as_micros() as f64 / 1000000.0);
let time_per_request = duration.as_nanos() as f64 / SCRAPE_REQUESTS as f64;
assert_eq!(num_responses, SCRAPE_REQUESTS); assert_eq!(num_responses, SCRAPE_REQUESTS);
if dummy == 123u8 { if dummy == 123u8 {
println!("dummy info"); println!("dummy info");
} }
(requests_per_second, time_per_request) (SCRAPE_REQUESTS, duration)
} }