diff --git a/Cargo.lock b/Cargo.lock index 94e4c9c..57bf59f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -51,7 +51,7 @@ dependencies = [ "aquatic", "bittorrent_udp", "cli_helpers", - "dashmap", + "crossbeam-channel", "indicatif", "mimalloc", "num-format", @@ -260,17 +260,6 @@ dependencies = [ "lazy_static", ] -[[package]] -name = "dashmap" -version = "3.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0899c830f359a74834c84ed43b4c0cb6fd714a6fa64e20ab78c78f8cf86d8fc0" -dependencies = [ - "ahash", - "cfg-if", - "num_cpus", -] - [[package]] name = "encode_unicode" version = "0.3.6" @@ -583,16 +572,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "num_cpus" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46203554f085ff89c235cd12f7075f3233af9b11ed7c9e16dfe2560d03313ce6" -dependencies = [ - "hermit-abi", - "libc", -] - [[package]] name = "number_prefix" version = "0.3.0" diff --git a/aquatic_bench/Cargo.toml b/aquatic_bench/Cargo.toml index b20ccce..9049671 100644 --- a/aquatic_bench/Cargo.toml +++ b/aquatic_bench/Cargo.toml @@ -18,7 +18,7 @@ name = "plot_pareto" aquatic = { path = "../aquatic" } bittorrent_udp = { path = "../bittorrent_udp" } cli_helpers = { path = "../cli_helpers" } -dashmap = "3" +crossbeam-channel = "0.4" indicatif = "0.14" mimalloc = { version = "0.1", default-features = false } num-format = "0.4" diff --git a/aquatic_bench/src/bin/bench_handlers/common.rs b/aquatic_bench/src/bin/bench_handlers/common.rs index 9ae756b..391172b 100644 --- a/aquatic_bench/src/bin/bench_handlers/common.rs +++ b/aquatic_bench/src/bin/bench_handlers/common.rs @@ -1,5 +1,36 @@ +use std::time::Duration; + +use indicatif::{ProgressBar, ProgressStyle}; +use num_format::{Locale, ToFormattedString}; + + pub const PARETO_SHAPE: f64 = 0.1; pub const NUM_INFO_HASHES: usize = 10_000; -/// Save memory by not allocating more per request -pub const MAX_REQUEST_BYTES: usize = 256; \ No newline at end of file + +pub fn create_progress_bar(name: &str, iterations: u64) -> ProgressBar { + let t = format!("{:<16} {}", name, "{wide_bar} {pos:>2}/{len:>2}"); + let style = ProgressStyle::default_bar().template(&t); + + ProgressBar::new(iterations).with_style(style) +} + + +pub fn print_results( + request_type: &str, + num_responses: usize, + duration: Duration, +) { + let per_second = ( + (num_responses as f64 / (duration.as_micros() as f64 / 1000000.0) + ) as usize).to_formatted_string(&Locale::se); + + let time_per_request = duration.as_nanos() as f64 / (num_responses as f64); + + println!( + "{} {:>10} requests/second, {:>8.2} ns/request", + request_type, + per_second, + time_per_request, + ); +} diff --git a/aquatic_bench/src/bin/bench_handlers/config.rs b/aquatic_bench/src/bin/bench_handlers/config.rs index 02c3573..f54e698 100644 --- a/aquatic_bench/src/bin/bench_handlers/config.rs +++ b/aquatic_bench/src/bin/bench_handlers/config.rs @@ -3,8 +3,9 @@ use serde::{Serialize, Deserialize}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct BenchConfig { - pub num_rounds: u64, - pub num_threads: u64, + pub num_rounds: usize, + pub num_threads: usize, + pub num_connect_requests: usize, } @@ -12,7 +13,8 @@ impl Default for BenchConfig { fn default() -> Self { Self { num_rounds: 20, - num_threads: 4, + num_threads: 2, + num_connect_requests: 5_000_000, } } } \ No newline at end of file diff --git a/aquatic_bench/src/bin/bench_handlers/connect.rs b/aquatic_bench/src/bin/bench_handlers/connect.rs index 4be414e..dd61225 100644 --- a/aquatic_bench/src/bin/bench_handlers/connect.rs +++ b/aquatic_bench/src/bin/bench_handlers/connect.rs @@ -1,77 +1,101 @@ -use std::io::Cursor; -use std::time::{Duration, Instant}; +use std::time::Instant; + +use crossbeam_channel::unbounded; +use indicatif::ProgressIterator; +use rand::{Rng, SeedableRng, thread_rng, rngs::SmallRng}; use std::net::SocketAddr; -use std::sync::Arc; - -use rand::{Rng, SeedableRng, thread_rng, rngs::{SmallRng, StdRng}}; +use aquatic::handlers; use aquatic::common::*; -use aquatic::handlers::handle_connect_requests; -use bittorrent_udp::converters::*; +use aquatic::config::Config; use crate::common::*; +use crate::config::BenchConfig; -const ITERATIONS: usize = 10_000_000; +pub fn bench_connect_handler(bench_config: BenchConfig){ + // Setup common state, spawn request handlers + let state = State::new(); + let aquatic_config = Config::default(); -pub fn bench( - state: State, - requests: Arc> -) -> (usize, Duration){ - let mut buffer = [0u8; MAX_PACKET_SIZE]; - let mut cursor = Cursor::new(buffer.as_mut()); - let mut num_responses: usize = 0; - let mut dummy = 0u8; + let (request_sender, request_receiver) = unbounded(); + let (response_sender, response_receiver) = unbounded(); - let mut rng = StdRng::from_rng(thread_rng()).unwrap(); + for _ in 0..bench_config.num_threads { + let state = state.clone(); + let config = aquatic_config.clone(); + let request_receiver = request_receiver.clone(); + let response_sender = response_sender.clone(); - let now = Instant::now(); + ::std::thread::spawn(move || { + handlers::run_request_worker( + state, + config, + request_receiver, + response_sender + ) + }); + } - let mut requests: Vec<(ConnectRequest, SocketAddr)> = requests.iter() - .map(|(request_bytes, src)| { - if let Request::Connect(r) = request_from_bytes(request_bytes, 255).unwrap() { - (r, *src) - } else { - unreachable!() + // Setup connect benchmark data + + let requests = create_requests( + bench_config.num_connect_requests + ); + + let p = aquatic_config.handlers.max_requests_per_iter * bench_config.num_threads; + let mut num_responses = 0usize; + + let mut dummy: i64 = thread_rng().gen(); + + let pb = create_progress_bar("Connect handler", bench_config.num_rounds as u64); + + // Start connect benchmark + + let before = Instant::now(); + + for round in (0..bench_config.num_rounds).progress_with(pb){ + for request_chunk in requests.chunks(p){ + for (request, src) in request_chunk { + request_sender.send((request.clone().into(), *src)).unwrap(); } - }) - .collect(); - - let requests = requests.drain(..); - handle_connect_requests(&state, &mut rng, requests); - - while let Ok((response, _)) = state.response_queue.pop(){ - if let Response::Connect(_) = response { - num_responses += 1; + while let Ok((Response::Connect(r), _)) = response_receiver.try_recv() { + num_responses += 1; + dummy ^= r.connection_id.0; + } } - cursor.set_position(0); + let total = bench_config.num_connect_requests * (round + 1); - response_to_bytes(&mut cursor, response, IpVersion::IPv4).unwrap(); - - dummy ^= cursor.get_ref()[0]; + while num_responses < total { + match response_receiver.recv(){ + Ok((Response::Connect(r), _)) => { + num_responses += 1; + dummy ^= r.connection_id.0; + }, + _ => {} + } + } } - let duration = Instant::now() - now; + let elapsed = before.elapsed(); - assert_eq!(num_responses, ITERATIONS); + print_results("Connect handler:", num_responses, elapsed); - if dummy == 123u8 { - println!("dummy info"); + if dummy == 0 { + println!("dummy dummy"); } - - (ITERATIONS, duration) } -pub fn create_requests() -> Vec<(ConnectRequest, SocketAddr)> { +pub fn create_requests(number: usize) -> Vec<(ConnectRequest, SocketAddr)> { let mut rng = SmallRng::from_rng(thread_rng()).unwrap(); let mut requests = Vec::new(); - for _ in 0..ITERATIONS { + for _ in 0..number { let request = ConnectRequest { transaction_id: TransactionId(rng.gen()), }; @@ -82,4 +106,4 @@ pub fn create_requests() -> Vec<(ConnectRequest, SocketAddr)> { } requests -} \ No newline at end of file +} diff --git a/aquatic_bench/src/bin/bench_handlers/main.rs b/aquatic_bench/src/bin/bench_handlers/main.rs index 19dea5f..9540901 100644 --- a/aquatic_bench/src/bin/bench_handlers/main.rs +++ b/aquatic_bench/src/bin/bench_handlers/main.rs @@ -14,7 +14,6 @@ use std::io::Cursor; use std::net::SocketAddr; use std::sync::Arc; -use dashmap::DashMap; use indicatif::{ProgressBar, ProgressStyle, ProgressIterator}; use num_format::{Locale, ToFormattedString}; use rand::{Rng, thread_rng, rngs::SmallRng, SeedableRng}; @@ -25,11 +24,11 @@ use bittorrent_udp::converters::*; use cli_helpers::run_app_with_cli_and_config; -mod announce; +// mod announce; mod common; mod config; mod connect; -mod scrape; +// mod scrape; use common::*; use config::BenchConfig; @@ -39,35 +38,18 @@ use config::BenchConfig; static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; -fn print_results( - request_type: &str, - config: &BenchConfig, - num_items: usize, - duration: Duration, -) { - let per_second = ( - ((config.num_threads as f64 * num_items as f64) / (duration.as_micros() as f64 / 1000000.0) - ) as usize).to_formatted_string(&Locale::se); - - let time_per_request = duration.as_nanos() as f64 / (config.num_threads as f64 * num_items as f64); - - println!( - "{} {:>10} requests/second, {:>8.2} ns/request", - request_type, - per_second, - time_per_request, - ); -} - - fn main(){ run_app_with_cli_and_config::( "aquatic benchmarker", - run + connect::bench_connect_handler ) } + + +/* + fn run(bench_config: BenchConfig){ let mut connect_data = (0usize, Duration::new(0, 0)); let mut announce_data = (0usize, Duration::new(0, 0)); @@ -94,7 +76,7 @@ fn run(bench_config: BenchConfig){ let requests = Arc::new(requests); - let pb = create_progress_bar("Connect handler", bench_config.num_rounds); + let pb = create_progress_bar("Connect handler", bench_config.num_rounds as u64); for _ in (0..bench_config.num_rounds).progress_with(pb){ let state = State::new(); @@ -264,4 +246,6 @@ fn create_progress_bar(name: &str, iterations: u64) -> ProgressBar { let style = ProgressStyle::default_bar().template(&t); ProgressBar::new(iterations).with_style(style) -} \ No newline at end of file +} + +*/ \ No newline at end of file