aquatic_bench: start work on fixing it: create new connect bencher

This commit is contained in:
Joakim Frostegård 2020-04-13 01:34:05 +02:00
parent 6c4013ca55
commit f4ca9c2795
6 changed files with 121 additions and 101 deletions

23
Cargo.lock generated
View file

@ -51,7 +51,7 @@ dependencies = [
"aquatic", "aquatic",
"bittorrent_udp", "bittorrent_udp",
"cli_helpers", "cli_helpers",
"dashmap", "crossbeam-channel",
"indicatif", "indicatif",
"mimalloc", "mimalloc",
"num-format", "num-format",
@ -260,17 +260,6 @@ dependencies = [
"lazy_static", "lazy_static",
] ]
[[package]]
name = "dashmap"
version = "3.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0899c830f359a74834c84ed43b4c0cb6fd714a6fa64e20ab78c78f8cf86d8fc0"
dependencies = [
"ahash",
"cfg-if",
"num_cpus",
]
[[package]] [[package]]
name = "encode_unicode" name = "encode_unicode"
version = "0.3.6" version = "0.3.6"
@ -583,16 +572,6 @@ dependencies = [
"autocfg", "autocfg",
] ]
[[package]]
name = "num_cpus"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46203554f085ff89c235cd12f7075f3233af9b11ed7c9e16dfe2560d03313ce6"
dependencies = [
"hermit-abi",
"libc",
]
[[package]] [[package]]
name = "number_prefix" name = "number_prefix"
version = "0.3.0" version = "0.3.0"

View file

@ -18,7 +18,7 @@ name = "plot_pareto"
aquatic = { path = "../aquatic" } aquatic = { path = "../aquatic" }
bittorrent_udp = { path = "../bittorrent_udp" } bittorrent_udp = { path = "../bittorrent_udp" }
cli_helpers = { path = "../cli_helpers" } cli_helpers = { path = "../cli_helpers" }
dashmap = "3" crossbeam-channel = "0.4"
indicatif = "0.14" indicatif = "0.14"
mimalloc = { version = "0.1", default-features = false } mimalloc = { version = "0.1", default-features = false }
num-format = "0.4" num-format = "0.4"

View file

@ -1,5 +1,36 @@
use std::time::Duration;
use indicatif::{ProgressBar, ProgressStyle};
use num_format::{Locale, ToFormattedString};
pub const PARETO_SHAPE: f64 = 0.1; pub const PARETO_SHAPE: f64 = 0.1;
pub const NUM_INFO_HASHES: usize = 10_000; pub const NUM_INFO_HASHES: usize = 10_000;
/// Save memory by not allocating more per request
pub const MAX_REQUEST_BYTES: usize = 256; pub fn create_progress_bar(name: &str, iterations: u64) -> ProgressBar {
let t = format!("{:<16} {}", name, "{wide_bar} {pos:>2}/{len:>2}");
let style = ProgressStyle::default_bar().template(&t);
ProgressBar::new(iterations).with_style(style)
}
pub fn print_results(
request_type: &str,
num_responses: usize,
duration: Duration,
) {
let per_second = (
(num_responses as f64 / (duration.as_micros() as f64 / 1000000.0)
) as usize).to_formatted_string(&Locale::se);
let time_per_request = duration.as_nanos() as f64 / (num_responses as f64);
println!(
"{} {:>10} requests/second, {:>8.2} ns/request",
request_type,
per_second,
time_per_request,
);
}

View file

@ -3,8 +3,9 @@ use serde::{Serialize, Deserialize};
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BenchConfig { pub struct BenchConfig {
pub num_rounds: u64, pub num_rounds: usize,
pub num_threads: u64, pub num_threads: usize,
pub num_connect_requests: usize,
} }
@ -12,7 +13,8 @@ impl Default for BenchConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
num_rounds: 20, num_rounds: 20,
num_threads: 4, num_threads: 2,
num_connect_requests: 5_000_000,
} }
} }
} }

View file

@ -1,77 +1,101 @@
use std::io::Cursor; use std::time::Instant;
use std::time::{Duration, Instant};
use crossbeam_channel::unbounded;
use indicatif::ProgressIterator;
use rand::{Rng, SeedableRng, thread_rng, rngs::SmallRng};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc;
use rand::{Rng, SeedableRng, thread_rng, rngs::{SmallRng, StdRng}};
use aquatic::handlers;
use aquatic::common::*; use aquatic::common::*;
use aquatic::handlers::handle_connect_requests; use aquatic::config::Config;
use bittorrent_udp::converters::*;
use crate::common::*; use crate::common::*;
use crate::config::BenchConfig;
const ITERATIONS: usize = 10_000_000; pub fn bench_connect_handler(bench_config: BenchConfig){
// Setup common state, spawn request handlers
let state = State::new();
let aquatic_config = Config::default();
pub fn bench( let (request_sender, request_receiver) = unbounded();
state: State, let (response_sender, response_receiver) = unbounded();
requests: Arc<Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)>>
) -> (usize, Duration){
let mut buffer = [0u8; MAX_PACKET_SIZE];
let mut cursor = Cursor::new(buffer.as_mut());
let mut num_responses: usize = 0;
let mut dummy = 0u8;
let mut rng = StdRng::from_rng(thread_rng()).unwrap(); for _ in 0..bench_config.num_threads {
let state = state.clone();
let config = aquatic_config.clone();
let request_receiver = request_receiver.clone();
let response_sender = response_sender.clone();
let now = Instant::now(); ::std::thread::spawn(move || {
handlers::run_request_worker(
state,
config,
request_receiver,
response_sender
)
});
}
let mut requests: Vec<(ConnectRequest, SocketAddr)> = requests.iter() // Setup connect benchmark data
.map(|(request_bytes, src)| {
if let Request::Connect(r) = request_from_bytes(request_bytes, 255).unwrap() { let requests = create_requests(
(r, *src) bench_config.num_connect_requests
} else { );
unreachable!()
let p = aquatic_config.handlers.max_requests_per_iter * bench_config.num_threads;
let mut num_responses = 0usize;
let mut dummy: i64 = thread_rng().gen();
let pb = create_progress_bar("Connect handler", bench_config.num_rounds as u64);
// Start connect benchmark
let before = Instant::now();
for round in (0..bench_config.num_rounds).progress_with(pb){
for request_chunk in requests.chunks(p){
for (request, src) in request_chunk {
request_sender.send((request.clone().into(), *src)).unwrap();
} }
})
.collect();
let requests = requests.drain(..);
handle_connect_requests(&state, &mut rng, requests); while let Ok((Response::Connect(r), _)) = response_receiver.try_recv() {
num_responses += 1;
while let Ok((response, _)) = state.response_queue.pop(){ dummy ^= r.connection_id.0;
if let Response::Connect(_) = response { }
num_responses += 1;
} }
cursor.set_position(0); let total = bench_config.num_connect_requests * (round + 1);
response_to_bytes(&mut cursor, response, IpVersion::IPv4).unwrap(); while num_responses < total {
match response_receiver.recv(){
dummy ^= cursor.get_ref()[0]; Ok((Response::Connect(r), _)) => {
num_responses += 1;
dummy ^= r.connection_id.0;
},
_ => {}
}
}
} }
let duration = Instant::now() - now; let elapsed = before.elapsed();
assert_eq!(num_responses, ITERATIONS); print_results("Connect handler:", num_responses, elapsed);
if dummy == 123u8 { if dummy == 0 {
println!("dummy info"); println!("dummy dummy");
} }
(ITERATIONS, duration)
} }
pub fn create_requests() -> Vec<(ConnectRequest, SocketAddr)> { pub fn create_requests(number: usize) -> Vec<(ConnectRequest, SocketAddr)> {
let mut rng = SmallRng::from_rng(thread_rng()).unwrap(); let mut rng = SmallRng::from_rng(thread_rng()).unwrap();
let mut requests = Vec::new(); let mut requests = Vec::new();
for _ in 0..ITERATIONS { for _ in 0..number {
let request = ConnectRequest { let request = ConnectRequest {
transaction_id: TransactionId(rng.gen()), transaction_id: TransactionId(rng.gen()),
}; };
@ -82,4 +106,4 @@ pub fn create_requests() -> Vec<(ConnectRequest, SocketAddr)> {
} }
requests requests
} }

View file

@ -14,7 +14,6 @@ use std::io::Cursor;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use dashmap::DashMap;
use indicatif::{ProgressBar, ProgressStyle, ProgressIterator}; use indicatif::{ProgressBar, ProgressStyle, ProgressIterator};
use num_format::{Locale, ToFormattedString}; use num_format::{Locale, ToFormattedString};
use rand::{Rng, thread_rng, rngs::SmallRng, SeedableRng}; use rand::{Rng, thread_rng, rngs::SmallRng, SeedableRng};
@ -25,11 +24,11 @@ use bittorrent_udp::converters::*;
use cli_helpers::run_app_with_cli_and_config; use cli_helpers::run_app_with_cli_and_config;
mod announce; // mod announce;
mod common; mod common;
mod config; mod config;
mod connect; mod connect;
mod scrape; // mod scrape;
use common::*; use common::*;
use config::BenchConfig; use config::BenchConfig;
@ -39,35 +38,18 @@ use config::BenchConfig;
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
fn print_results(
request_type: &str,
config: &BenchConfig,
num_items: usize,
duration: Duration,
) {
let per_second = (
((config.num_threads as f64 * num_items as f64) / (duration.as_micros() as f64 / 1000000.0)
) as usize).to_formatted_string(&Locale::se);
let time_per_request = duration.as_nanos() as f64 / (config.num_threads as f64 * num_items as f64);
println!(
"{} {:>10} requests/second, {:>8.2} ns/request",
request_type,
per_second,
time_per_request,
);
}
fn main(){ fn main(){
run_app_with_cli_and_config::<BenchConfig>( run_app_with_cli_and_config::<BenchConfig>(
"aquatic benchmarker", "aquatic benchmarker",
run connect::bench_connect_handler
) )
} }
/*
fn run(bench_config: BenchConfig){ fn run(bench_config: BenchConfig){
let mut connect_data = (0usize, Duration::new(0, 0)); let mut connect_data = (0usize, Duration::new(0, 0));
let mut announce_data = (0usize, Duration::new(0, 0)); let mut announce_data = (0usize, Duration::new(0, 0));
@ -94,7 +76,7 @@ fn run(bench_config: BenchConfig){
let requests = Arc::new(requests); let requests = Arc::new(requests);
let pb = create_progress_bar("Connect handler", bench_config.num_rounds); let pb = create_progress_bar("Connect handler", bench_config.num_rounds as u64);
for _ in (0..bench_config.num_rounds).progress_with(pb){ for _ in (0..bench_config.num_rounds).progress_with(pb){
let state = State::new(); let state = State::new();
@ -264,4 +246,6 @@ fn create_progress_bar(name: &str, iterations: u64) -> ProgressBar {
let style = ProgressStyle::default_bar().template(&t); let style = ProgressStyle::default_bar().template(&t);
ProgressBar::new(iterations).with_style(style) ProgressBar::new(iterations).with_style(style)
} }
*/