diff --git a/Cargo.lock b/Cargo.lock index 0fee5ea..de9e853 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -27,12 +27,56 @@ dependencies = [ "indexmap", "mio", "net2", + "plotly", "quickcheck", "quickcheck_macros", "rand", "rand_distr", ] +[[package]] +name = "askama" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a1fb9e41eb366cbcd267da2094be5b7e62fdbca9f82091e7503e80f885050d" +dependencies = [ + "askama_derive", + "askama_escape", + "askama_shared", +] + +[[package]] +name = "askama_derive" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1012c270085fa35ece6a48a569544fde85b6d9ee41074c7b706cc912a03f939" +dependencies = [ + "askama_shared", + "nom", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "askama_escape" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a577aeba5fec1aafb9f195d98cfcc38a78b588e4ebf9b15f62ca1c7aa33795a" + +[[package]] +name = "askama_shared" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ee517f4e33c27b129928e71d8a044d54c513e72e0b72ec5c4f5f1823e9de353" +dependencies = [ + "askama_escape", + "humansize", + "num-traits", + "serde", + "toml", +] + [[package]] name = "autocfg" version = "1.0.0" @@ -119,6 +163,12 @@ dependencies = [ "libc", ] +[[package]] +name = "humansize" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6cab2627acfc432780848602f3f558f7e9dd427352224b0d9324025796d2a5e" + [[package]] name = "indexmap" version = "1.3.2" @@ -128,6 +178,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "itoa" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8b7a7c0c47db5545ed3fef7468ee7bb5b74691498139e4b3f6a20685dc6dd8e" + [[package]] name = "lazy_static" version = "1.4.0" @@ -190,6 +246,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "nom" +version = "5.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b471253da97532da4b61552249c521e01e736071f71c1a4f7ebbfbf0a06aad6" +dependencies = [ + "memchr", + "version_check", +] + [[package]] name = "ntapi" version = "0.3.3" @@ -199,6 +265,83 @@ dependencies = [ "winapi", ] +[[package]] +name = "num" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8536030f9fea7127f841b45bb6243b27255787fb4eb83958aa1ef9d2fdc0c36" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + +[[package]] +name = "num-bigint" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "090c7f9998ee0ff65aa5b723e4009f7b217707f1fb5ea551329cc4d6231fb304" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-complex" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6b19411a9719e753aff12e5187b74d60d3dc449ec3f4dc21e3989c3f554bc95" +dependencies = [ + "autocfg", + "num-traits", +] + +[[package]] +name = "num-integer" +version = "0.1.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6ea62e9d81a77cd3ee9a2a5b9b609447857f3d358704331e4ef39eb247fcba" +dependencies = [ + "autocfg", + "num-traits", +] + +[[package]] +name = "num-iter" +version = "0.1.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfb0800a0291891dd9f4fe7bd9c19384f98f7fbe0cd0f39a2c6b88b9868bbc00" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c000134b5dbf44adc5cb772486d335293351644b801551abe8f75c84cfa4aef" +dependencies = [ + "autocfg", + "num-bigint", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c62be47e61d1842b9170f0fdeec8eba98e60e90e5446449a0545e5152acd7096" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.12.0" @@ -209,6 +352,20 @@ dependencies = [ "libc", ] +[[package]] +name = "plotly" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2734cda9d5551d513e535acacd6c0016c2631a55f1cdbf32bc1a0bd675db1ac" +dependencies = [ + "askama", + "num", + "rand", + "rand_distr", + "serde", + "serde_json", +] + [[package]] name = "ppv-lite86" version = "0.2.6" @@ -346,6 +503,43 @@ version = "0.6.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fe5bd57d1d7414c6b5ed48563a2c855d995ff777729dcd91c369ec7fea395ae" +[[package]] +name = "ryu" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "535622e6be132bccd223f4bb2b8ac8d53cda3c7a6394944d3b2b33fb974f9d76" + +[[package]] +name = "serde" +version = "1.0.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36df6ac6412072f67cf767ebbde4133a5b2e88e76dc6187fa7104cd16f783399" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e549e3abf4fb8621bd1609f11dfc9f5e50320802273b12f3811a67e6716ea6c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da07b57ee2623368351e9a0488bb0b261322a15a6e0ae53e243cbdc0f4208da9" +dependencies = [ + "itoa", + "ryu", + "serde", +] + [[package]] name = "socket2" version = "0.3.12" @@ -378,12 +572,27 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "toml" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffc92d160b1eef40665be3a05630d003936a3bc7da7421277846c2613e92c71a" +dependencies = [ + "serde", +] + [[package]] name = "unicode-xid" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" +[[package]] +name = "version_check" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "078775d0255232fb988e6fccf26ddc9d1ac274299aaedcedce21c6f72cc533ce" + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" diff --git a/aquatic/Cargo.toml b/aquatic/Cargo.toml index d40093c..3700afb 100644 --- a/aquatic/Cargo.toml +++ b/aquatic/Cargo.toml @@ -17,12 +17,16 @@ name = "bench_connect" [[bin]] name = "bench_announce_scrape" +[[bin]] +name = "plot_pareto" + [dependencies] bittorrent_udp = { path = "../bittorrent_udp" } dashmap = "3" indexmap = "1" net2 = "0.2" rand_distr = "0.2" +plotly = "0.4" [dependencies.mio] version = "0.7" diff --git a/aquatic/src/bin/bench_announce_scrape.rs b/aquatic/src/bin/bench_announce_scrape.rs index 8696649..c4cfad2 100644 --- a/aquatic/src/bin/bench_announce_scrape.rs +++ b/aquatic/src/bin/bench_announce_scrape.rs @@ -1,18 +1,23 @@ +//! Benchmark announce and scrape handlers + use std::time::{Duration, Instant}; use std::net::SocketAddr; use rand::{Rng, thread_rng, rngs::SmallRng, SeedableRng}; use rand_distr::Pareto; +use aquatic::bench_utils::*; use aquatic::common::*; use aquatic::handler::*; -const PARETO_SHAPE: f64 = 3.0; -const ANNOUNCE_ITERATIONS: usize = 500_000; -const SCRAPE_ITERATIONS: usize = 500_000; +const PARETO_SHAPE: f64 = 0.1; +const NUM_INFO_HASHES: usize = 10_000; + +const ANNOUNCE_REQUESTS: usize = 100_000; + +const SCRAPE_REQUESTS: usize = 1_000_000; const SCRAPE_NUM_HASHES: usize = 10; -const NUM_INFO_HASHES: usize = 500_000; fn main(){ @@ -20,115 +25,145 @@ fn main(){ let info_hashes = create_info_hashes(&mut rng); let state = State::new(); - { - println!("benchmark: handle_announce_requests\n"); - - println!("generating data.."); - - let mut responses = Vec::with_capacity(ANNOUNCE_ITERATIONS); - - let mut announce_requests = create_announce_requests(&mut rng, &info_hashes); - - let time = Time(Instant::now()); - - for (request, src) in announce_requests.iter() { - let key = ConnectionKey { - connection_id: request.connection_id, - socket_addr: *src, - }; - - state.connections.insert(key, time); - } - - let announce_requests = announce_requests.drain(..); - - ::std::thread::sleep(Duration::from_secs(1)); - - let now = Instant::now(); - - println!("running benchmark.."); - - handle_announce_requests( - &state, - &mut responses, - announce_requests, - ); - - let duration = Instant::now() - now; - - println!("\nrequests/second: {:.2}", ANNOUNCE_ITERATIONS as f64 / (duration.as_millis() as f64 / 1000.0)); - println!("time per request: {:.2}ns", duration.as_nanos() as f64 / ANNOUNCE_ITERATIONS as f64); - - let mut total_num_peers = 0.0f64; - let mut max_num_peers = 0.0f64; - - for (response, _src) in responses.drain(..) { - if let Response::Announce(response) = response { - let n = response.peers.len() as f64; - - total_num_peers += n; - max_num_peers = max_num_peers.max(n); - } - } - - println!("avg num peers returned: {:.2}", total_num_peers / ANNOUNCE_ITERATIONS as f64); - println!("max num peers returned: {:.2}", max_num_peers); - } + bench_announce(&mut rng, &state, &info_hashes); state.connections.clear(); - { - println!("\n\nbenchmark: handle_scrape_requests\n"); + println!("\n"); - println!("generating data.."); + ::std::thread::sleep(Duration::from_secs(1)); - let mut responses = Vec::with_capacity(SCRAPE_ITERATIONS); + bench_scrape(&mut rng, &state, &info_hashes); +} - let mut scrape_requests = create_scrape_requests(&mut rng, &info_hashes); - let time = Time(Instant::now()); +fn bench_announce( + rng: &mut impl Rng, + state: &State, + info_hashes: &Vec +){ + println!("# benchmark: handle_announce_requests\n"); - for (request, src) in scrape_requests.iter() { - let key = ConnectionKey { - connection_id: request.connection_id, - socket_addr: *src, - }; + println!("generating data.."); - state.connections.insert(key, time); - } + let mut responses = Vec::with_capacity(ANNOUNCE_REQUESTS); - let scrape_requests = scrape_requests.drain(..); + let mut announce_requests = create_announce_requests(rng, &info_hashes); - ::std::thread::sleep(Duration::from_secs(1)); + let time = Time(Instant::now()); - let now = Instant::now(); + for (request, src) in announce_requests.iter() { + let key = ConnectionKey { + connection_id: request.connection_id, + socket_addr: *src, + }; - println!("running benchmark.."); - - handle_scrape_requests( - &state, - &mut responses, - scrape_requests, - ); - - let duration = Instant::now() - now; - - println!("\nrequests/second: {:.2}", SCRAPE_ITERATIONS as f64 / (duration.as_millis() as f64 / 1000.0)); - println!("time per request: {:.2}ns", duration.as_nanos() as f64 / SCRAPE_ITERATIONS as f64); - - let mut total_num_peers = 0.0f64; - - for (response, _src) in responses.drain(..){ - if let Response::Scrape(response) = response { - for stats in response.torrent_stats { - total_num_peers += f64::from(stats.seeders.0); - total_num_peers += f64::from(stats.leechers.0); - } - } - } - - println!("avg num peers reported: {:.2}", total_num_peers / (SCRAPE_ITERATIONS as f64 * SCRAPE_NUM_HASHES as f64)); + state.connections.insert(key, time); } + + let announce_requests = announce_requests.drain(..); + + ::std::thread::sleep(Duration::from_secs(1)); + + let now = Instant::now(); + + println!("running benchmark.."); + + handle_announce_requests( + &state, + &mut responses, + announce_requests, + ); + + let duration = Instant::now() - now; + + println!("\nrequests/second: {:.2}", ANNOUNCE_REQUESTS as f64 / (duration.as_millis() as f64 / 1000.0)); + println!("time per request: {:.2}ns", duration.as_nanos() as f64 / ANNOUNCE_REQUESTS as f64); + + let mut total_num_peers = 0.0f64; + let mut max_num_peers = 0.0f64; + let mut num_responses: usize = 0; + + for (response, _src) in responses.drain(..) { + if let Response::Announce(response) = response { + let n = response.peers.len() as f64; + + total_num_peers += n; + max_num_peers = max_num_peers.max(n); + num_responses += 1; + } + } + + if num_responses != ANNOUNCE_REQUESTS { + println!("ERROR: only {} responses received", num_responses); + } + + println!("avg num peers returned: {:.2}", total_num_peers / ANNOUNCE_REQUESTS as f64); + println!("max num peers returned: {:.2}", max_num_peers); +} + + +fn bench_scrape( + rng: &mut impl Rng, + state: &State, + info_hashes: &Vec +){ + println!("# benchmark: handle_scrape_requests\n"); + println!("generating data.."); + + let mut responses = Vec::with_capacity(SCRAPE_REQUESTS); + + let mut scrape_requests = create_scrape_requests(rng, &info_hashes); + + let time = Time(Instant::now()); + + for (request, src) in scrape_requests.iter() { + let key = ConnectionKey { + connection_id: request.connection_id, + socket_addr: *src, + }; + + state.connections.insert(key, time); + } + + let scrape_requests = scrape_requests.drain(..); + + ::std::thread::sleep(Duration::from_secs(1)); + + let now = Instant::now(); + + println!("running benchmark.."); + + handle_scrape_requests( + &state, + &mut responses, + scrape_requests, + ); + + let duration = Instant::now() - now; + + println!("\nrequests/second: {:.2}", SCRAPE_REQUESTS as f64 / (duration.as_millis() as f64 / 1000.0)); + println!("time per request: {:.2}ns", duration.as_nanos() as f64 / SCRAPE_REQUESTS as f64); + + let mut total_num_peers = 0.0f64; + let mut num_responses: usize = 0; + + for (response, _src) in responses.drain(..){ + if let Response::Scrape(response) = response { + for stats in response.torrent_stats { + total_num_peers += f64::from(stats.seeders.0); + total_num_peers += f64::from(stats.leechers.0); + } + + num_responses += 1; + } + } + + if num_responses != SCRAPE_REQUESTS { + println!("ERROR: only {} responses received", num_responses); + } + + println!("avg num peers reported: {:.2}", total_num_peers / (SCRAPE_REQUESTS as f64 * SCRAPE_NUM_HASHES as f64)); } @@ -142,7 +177,7 @@ fn create_announce_requests( let mut requests = Vec::new(); - for _ in 0..ANNOUNCE_ITERATIONS { + for _ in 0..ANNOUNCE_REQUESTS { let info_hash_index = pareto_usize(rng, pareto, max_index); let request = AnnounceRequest { @@ -169,7 +204,6 @@ fn create_announce_requests( } - fn create_scrape_requests( rng: &mut impl Rng, info_hashes: &Vec @@ -180,7 +214,7 @@ fn create_scrape_requests( let mut requests = Vec::new(); - for _ in 0..SCRAPE_ITERATIONS { + for _ in 0..SCRAPE_REQUESTS { let mut request_info_hashes = Vec::new(); for _ in 0..SCRAPE_NUM_HASHES { @@ -211,16 +245,4 @@ fn create_info_hashes(rng: &mut impl Rng) -> Vec { } info_hashes -} - - -fn pareto_usize( - rng: &mut impl Rng, - pareto: Pareto, - max: usize, -) -> usize { - let p: f64 = rng.sample(pareto); - let p = (p.min(101.0f64) - 1.0) / 100.0; - - (p * max as f64) as usize } \ No newline at end of file diff --git a/aquatic/src/bin/plot_pareto.rs b/aquatic/src/bin/plot_pareto.rs new file mode 100644 index 0000000..f598f95 --- /dev/null +++ b/aquatic/src/bin/plot_pareto.rs @@ -0,0 +1,44 @@ +use plotly::{Plot, Scatter, Layout}; +use plotly::common::Title; +use plotly::layout::Axis; +use rand::{thread_rng, rngs::SmallRng, SeedableRng}; +use rand_distr::Pareto; + +use aquatic::bench_utils::pareto_usize; + + +fn main(){ + let mut plot = Plot::new(); + let mut rng = SmallRng::from_rng(thread_rng()).unwrap(); + + const LEN: usize = 1_000; + const MAX_VAL: usize = LEN - 1; + + for pareto_shape in [0.1, 0.2, 0.3, 0.4, 0.5].iter() { + let pareto = Pareto::new(1.0, *pareto_shape).unwrap(); + + let mut y_axis = [0; LEN]; + + for _ in 1..1_000_000 { + let index = pareto_usize(&mut rng, pareto, MAX_VAL); + + y_axis[index] += 1; + } + + let x_axis: Vec = (0..MAX_VAL).into_iter().collect(); + + let trace = Scatter::new(x_axis, y_axis.to_vec()) + .name(&format!("pareto shape = {}", pareto_shape)); + + plot.add_trace(trace); + } + + let layout = Layout::new() + .title(Title::new("Pareto distribution")) + .xaxis(Axis::new().title(Title::new("Info hash index"))) + .yaxis(Axis::new().title(Title::new("Num requests"))); + + plot.set_layout(layout); + + plot.show(); +} diff --git a/aquatic/src/lib/bench_utils.rs b/aquatic/src/lib/bench_utils.rs new file mode 100644 index 0000000..1cc91c8 --- /dev/null +++ b/aquatic/src/lib/bench_utils.rs @@ -0,0 +1,14 @@ +use rand::Rng; +use rand_distr::Pareto; + + +pub fn pareto_usize( + rng: &mut impl Rng, + pareto: Pareto, + max: usize, +) -> usize { + let p: f64 = rng.sample(pareto); + let p = (p.min(101.0f64) - 1.0) / 100.0; + + (p * max as f64) as usize +} \ No newline at end of file diff --git a/aquatic/src/lib/handler.rs b/aquatic/src/lib/handler.rs index 7b6f35e..e57198a 100644 --- a/aquatic/src/lib/handler.rs +++ b/aquatic/src/lib/handler.rs @@ -118,7 +118,15 @@ pub fn handle_scrape_requests( ){ let empty_stats = create_torrent_scrape_statistics(0, 0); - responses.extend(requests.map(|(request, src)| { + responses.extend(requests.filter_map(|(request, src)| { + let connection_key = ConnectionKey { + connection_id: request.connection_id, + socket_addr: src, + }; + + if !state.connections.contains_key(&connection_key){ + return None; + } let mut stats: Vec = Vec::with_capacity(256); for info_hash in request.info_hashes.iter() { @@ -137,7 +145,7 @@ pub fn handle_scrape_requests( torrent_stats: stats, }); - (response, src) + Some((response, src)) })); } diff --git a/aquatic/src/lib/lib.rs b/aquatic/src/lib/lib.rs index a1d7761..b974003 100644 --- a/aquatic/src/lib/lib.rs +++ b/aquatic/src/lib/lib.rs @@ -1,5 +1,6 @@ use std::time::Duration; +pub mod bench_utils; pub mod common; pub mod handler; pub mod network;