bench: add pareto plot util; use other shape in bench; refactor

This commit is contained in:
Joakim Frostegård 2020-04-05 16:00:48 +02:00
parent 5d462f808d
commit ab457aaf82
7 changed files with 418 additions and 116 deletions

View file

@ -17,12 +17,16 @@ name = "bench_connect"
[[bin]]
name = "bench_announce_scrape"
[[bin]]
name = "plot_pareto"
[dependencies]
bittorrent_udp = { path = "../bittorrent_udp" }
dashmap = "3"
indexmap = "1"
net2 = "0.2"
rand_distr = "0.2"
plotly = "0.4"
[dependencies.mio]
version = "0.7"

View file

@ -1,18 +1,23 @@
//! Benchmark announce and scrape handlers
use std::time::{Duration, Instant};
use std::net::SocketAddr;
use rand::{Rng, thread_rng, rngs::SmallRng, SeedableRng};
use rand_distr::Pareto;
use aquatic::bench_utils::*;
use aquatic::common::*;
use aquatic::handler::*;
const PARETO_SHAPE: f64 = 3.0;
const ANNOUNCE_ITERATIONS: usize = 500_000;
const SCRAPE_ITERATIONS: usize = 500_000;
const PARETO_SHAPE: f64 = 0.1;
const NUM_INFO_HASHES: usize = 10_000;
const ANNOUNCE_REQUESTS: usize = 100_000;
const SCRAPE_REQUESTS: usize = 1_000_000;
const SCRAPE_NUM_HASHES: usize = 10;
const NUM_INFO_HASHES: usize = 500_000;
fn main(){
@ -20,115 +25,145 @@ fn main(){
let info_hashes = create_info_hashes(&mut rng);
let state = State::new();
{
println!("benchmark: handle_announce_requests\n");
println!("generating data..");
let mut responses = Vec::with_capacity(ANNOUNCE_ITERATIONS);
let mut announce_requests = create_announce_requests(&mut rng, &info_hashes);
let time = Time(Instant::now());
for (request, src) in announce_requests.iter() {
let key = ConnectionKey {
connection_id: request.connection_id,
socket_addr: *src,
};
state.connections.insert(key, time);
}
let announce_requests = announce_requests.drain(..);
::std::thread::sleep(Duration::from_secs(1));
let now = Instant::now();
println!("running benchmark..");
handle_announce_requests(
&state,
&mut responses,
announce_requests,
);
let duration = Instant::now() - now;
println!("\nrequests/second: {:.2}", ANNOUNCE_ITERATIONS as f64 / (duration.as_millis() as f64 / 1000.0));
println!("time per request: {:.2}ns", duration.as_nanos() as f64 / ANNOUNCE_ITERATIONS as f64);
let mut total_num_peers = 0.0f64;
let mut max_num_peers = 0.0f64;
for (response, _src) in responses.drain(..) {
if let Response::Announce(response) = response {
let n = response.peers.len() as f64;
total_num_peers += n;
max_num_peers = max_num_peers.max(n);
}
}
println!("avg num peers returned: {:.2}", total_num_peers / ANNOUNCE_ITERATIONS as f64);
println!("max num peers returned: {:.2}", max_num_peers);
}
bench_announce(&mut rng, &state, &info_hashes);
state.connections.clear();
{
println!("\n\nbenchmark: handle_scrape_requests\n");
println!("\n");
println!("generating data..");
::std::thread::sleep(Duration::from_secs(1));
let mut responses = Vec::with_capacity(SCRAPE_ITERATIONS);
bench_scrape(&mut rng, &state, &info_hashes);
}
let mut scrape_requests = create_scrape_requests(&mut rng, &info_hashes);
let time = Time(Instant::now());
/// Benchmark `handle_announce_requests`.
///
/// Generates `ANNOUNCE_REQUESTS` announce requests, registers a valid
/// connection entry for each (so the handler does not reject them),
/// runs the handler once over the whole batch and prints throughput
/// plus peer-count statistics gathered from the responses.
///
/// NOTE(review): reconstructed from a diff view where removed lines of
/// the old inline scrape benchmark were interleaved into this span.
fn bench_announce(
    rng: &mut impl Rng,
    state: &State,
    info_hashes: &Vec<InfoHash>
){
    println!("# benchmark: handle_announce_requests\n");

    println!("generating data..");

    let mut responses = Vec::with_capacity(ANNOUNCE_REQUESTS);
    let mut announce_requests = create_announce_requests(rng, &info_hashes);

    let time = Time(Instant::now());

    // Pre-register each request's connection so the handler treats it
    // as valid instead of dropping it.
    for (request, src) in announce_requests.iter() {
        let key = ConnectionKey {
            connection_id: request.connection_id,
            socket_addr: *src,
        };

        state.connections.insert(key, time);
    }

    let announce_requests = announce_requests.drain(..);

    // Let the system settle before the timed section starts.
    ::std::thread::sleep(Duration::from_secs(1));

    let now = Instant::now();

    println!("running benchmark..");

    handle_announce_requests(
        &state,
        &mut responses,
        announce_requests,
    );

    let duration = Instant::now() - now;

    println!("\nrequests/second: {:.2}", ANNOUNCE_REQUESTS as f64 / (duration.as_millis() as f64 / 1000.0));
    println!("time per request: {:.2}ns", duration.as_nanos() as f64 / ANNOUNCE_REQUESTS as f64);

    let mut total_num_peers = 0.0f64;
    let mut max_num_peers = 0.0f64;
    let mut num_responses: usize = 0;

    for (response, _src) in responses.drain(..) {
        if let Response::Announce(response) = response {
            let n = response.peers.len() as f64;

            total_num_peers += n;
            max_num_peers = max_num_peers.max(n);
            num_responses += 1;
        }
    }

    // Every request had a valid connection, so anything less than a
    // full set of announce responses indicates a handler problem.
    if num_responses != ANNOUNCE_REQUESTS {
        println!("ERROR: only {} responses received", num_responses);
    }

    println!("avg num peers returned: {:.2}", total_num_peers / ANNOUNCE_REQUESTS as f64);
    println!("max num peers returned: {:.2}", max_num_peers);
}
/// Benchmark `handle_scrape_requests`.
///
/// Builds `SCRAPE_REQUESTS` scrape requests, registers a valid
/// connection entry for each, times one pass of the handler over the
/// batch, and prints throughput plus the average peer count reported
/// per scraped info hash.
fn bench_scrape(
    rng: &mut impl Rng,
    state: &State,
    info_hashes: &Vec<InfoHash>
){
    println!("# benchmark: handle_scrape_requests\n");
    println!("generating data..");

    let mut response_buf = Vec::with_capacity(SCRAPE_REQUESTS);
    let mut requests = create_scrape_requests(rng, &info_hashes);

    // Register every request's connection up front so the handler
    // accepts the whole batch.
    let valid_since = Time(Instant::now());

    for (request, src) in requests.iter() {
        let connection_key = ConnectionKey {
            connection_id: request.connection_id,
            socket_addr: *src,
        };
        state.connections.insert(connection_key, valid_since);
    }

    let request_iter = requests.drain(..);

    // Brief pause so the timed section starts from a quiet system.
    ::std::thread::sleep(Duration::from_secs(1));

    let started_at = Instant::now();
    println!("running benchmark..");

    handle_scrape_requests(&state, &mut response_buf, request_iter);

    let duration = Instant::now() - started_at;

    println!("\nrequests/second: {:.2}", SCRAPE_REQUESTS as f64 / (duration.as_millis() as f64 / 1000.0));
    println!("time per request: {:.2}ns", duration.as_nanos() as f64 / SCRAPE_REQUESTS as f64);

    let mut peer_count_sum = 0.0f64;
    let mut response_count: usize = 0;

    for (response, _src) in response_buf.drain(..){
        if let Response::Scrape(response) = response {
            for stats in response.torrent_stats {
                peer_count_sum += f64::from(stats.seeders.0);
                peer_count_sum += f64::from(stats.leechers.0);
            }
            response_count += 1;
        }
    }

    // A shortfall here means the handler dropped requests it should
    // have answered.
    if response_count != SCRAPE_REQUESTS {
        println!("ERROR: only {} responses received", response_count);
    }

    println!("avg num peers reported: {:.2}", peer_count_sum / (SCRAPE_REQUESTS as f64 * SCRAPE_NUM_HASHES as f64));
}
@ -142,7 +177,7 @@ fn create_announce_requests(
let mut requests = Vec::new();
for _ in 0..ANNOUNCE_ITERATIONS {
for _ in 0..ANNOUNCE_REQUESTS {
let info_hash_index = pareto_usize(rng, pareto, max_index);
let request = AnnounceRequest {
@ -169,7 +204,6 @@ fn create_announce_requests(
}
fn create_scrape_requests(
rng: &mut impl Rng,
info_hashes: &Vec<InfoHash>
@ -180,7 +214,7 @@ fn create_scrape_requests(
let mut requests = Vec::new();
for _ in 0..SCRAPE_ITERATIONS {
for _ in 0..SCRAPE_REQUESTS {
let mut request_info_hashes = Vec::new();
for _ in 0..SCRAPE_NUM_HASHES {
@ -211,16 +245,4 @@ fn create_info_hashes(rng: &mut impl Rng) -> Vec<InfoHash> {
}
info_hashes
}
/// Turn one draw from `pareto` into an index in `0..=max`.
fn pareto_usize(
    rng: &mut impl Rng,
    pareto: Pareto<f64>,
    max: usize,
) -> usize {
    // Clamp the raw draw to at most 101, then normalize it from the
    // assumed [1, 101] range into a fraction in [0, 1] before scaling
    // by `max` and truncating.
    let sample = rng.sample::<f64, _>(pareto).min(101.0f64);
    let fraction = (sample - 1.0) / 100.0;

    (fraction * max as f64) as usize
}

View file

@ -0,0 +1,44 @@
use plotly::{Plot, Scatter, Layout};
use plotly::common::Title;
use plotly::layout::Axis;
use rand::{thread_rng, rngs::SmallRng, SeedableRng};
use rand_distr::Pareto;
use aquatic::bench_utils::pareto_usize;
/// Plot how `pareto_usize` distributes indices for a range of Pareto
/// shape parameters, as an aid for choosing `PARETO_SHAPE` in the
/// benchmarks.
fn main(){
    let mut plot = Plot::new();
    let mut rng = SmallRng::from_rng(thread_rng()).unwrap();

    const LEN: usize = 1_000;
    const MAX_VAL: usize = LEN - 1;
    const NUM_SAMPLES: usize = 1_000_000;

    for pareto_shape in [0.1, 0.2, 0.3, 0.4, 0.5].iter() {
        let pareto = Pareto::new(1.0, *pareto_shape).unwrap();

        let mut y_axis = [0; LEN];

        // FIX: was `1..1_000_000`, which drew one sample too few.
        for _ in 0..NUM_SAMPLES {
            let index = pareto_usize(&mut rng, pareto, MAX_VAL);

            y_axis[index] += 1;
        }

        // FIX: was `(0..MAX_VAL)` — 999 x values against 1000 y values,
        // leaving the trace axes of unequal length. `0..LEN` matches
        // y_axis exactly. (Range is already an iterator; no
        // `.into_iter()` needed.)
        let x_axis: Vec<usize> = (0..LEN).collect();

        let trace = Scatter::new(x_axis, y_axis.to_vec())
            .name(&format!("pareto shape = {}", pareto_shape));

        plot.add_trace(trace);
    }

    let layout = Layout::new()
        .title(Title::new("Pareto distribution"))
        .xaxis(Axis::new().title(Title::new("Info hash index")))
        .yaxis(Axis::new().title(Title::new("Num requests")));

    plot.set_layout(layout);

    plot.show();
}

View file

@ -0,0 +1,14 @@
use rand::Rng;
use rand_distr::Pareto;
/// Map one sample drawn from `pareto` onto an index in `0..=max`.
///
/// Assumes samples are >= 1.0 (i.e. a Pareto distribution with scale
/// 1.0 — confirm at call sites): the sample is capped at 101.0, shifted
/// and divided so it becomes a fraction in [0.0, 1.0], then scaled by
/// `max` and truncated. Note the result can equal `max` itself, so a
/// container indexed with it must have length `max + 1`.
pub fn pareto_usize(
    rng: &mut impl Rng,
    pareto: Pareto<f64>,
    max: usize,
) -> usize {
    let p: f64 = rng.sample(pareto);
    // Cap at 101 so (p - 1) / 100 cannot exceed 1.0.
    let p = (p.min(101.0f64) - 1.0) / 100.0;

    (p * max as f64) as usize
}

View file

@ -118,7 +118,15 @@ pub fn handle_scrape_requests(
){
let empty_stats = create_torrent_scrape_statistics(0, 0);
responses.extend(requests.map(|(request, src)| {
responses.extend(requests.filter_map(|(request, src)| {
let connection_key = ConnectionKey {
connection_id: request.connection_id,
socket_addr: src,
};
if !state.connections.contains_key(&connection_key){
return None;
}
let mut stats: Vec<TorrentScrapeStatistics> = Vec::with_capacity(256);
for info_hash in request.info_hashes.iter() {
@ -137,7 +145,7 @@ pub fn handle_scrape_requests(
torrent_stats: stats,
});
(response, src)
Some((response, src))
}));
}

View file

@ -1,5 +1,6 @@
use std::time::Duration;
pub mod bench_utils;
pub mod common;
pub mod handler;
pub mod network;