benchmark byte conversion as well; bittorrent_udp: use buffers

This commit is contained in:
Joakim Frostegård 2020-04-07 19:49:29 +02:00
parent 51590a3d6c
commit a99688a5a2
11 changed files with 231 additions and 106 deletions

View file

@ -16,6 +16,7 @@ name = "plot_pareto"
[dependencies]
aquatic = { path = "../aquatic" }
bittorrent_udp = { path = "../bittorrent_udp" }
indicatif = "0.14"
mimalloc = { version = "0.1", default-features = false }
num-format = "0.4"

View file

@ -1,3 +1,4 @@
use std::io::Cursor;
use std::time::Instant;
use std::net::SocketAddr;
@ -8,6 +9,7 @@ use aquatic::handlers::*;
use aquatic::common::*;
use aquatic::config::Config;
use aquatic_bench::*;
use bittorrent_udp::converters::*;
use crate::common::*;
@ -18,47 +20,59 @@ const ANNOUNCE_REQUESTS: usize = 1_000_000;
pub fn bench(
state: &State,
config: &Config,
requests: Vec<(AnnounceRequest, SocketAddr)>,
requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)>
) -> (f64, f64) {
let mut responses = Vec::with_capacity(ANNOUNCE_REQUESTS);
let mut requests = requests;
let requests = requests.drain(..);
let mut buffer = [0u8; MAX_PACKET_SIZE];
let mut cursor = Cursor::new(buffer.as_mut());
let mut num_responses: usize = 0;
let mut dummy = 0u8;
let now = Instant::now();
let mut requests: Vec<(AnnounceRequest, SocketAddr)> = requests.into_iter()
.map(|(request_bytes, src)| {
if let Request::Announce(r) = request_from_bytes(&request_bytes, 255).unwrap() {
(r, src)
} else {
unreachable!()
}
})
.collect();
let requests = requests.drain(..);
handle_announce_requests(
&state,
config,
&mut responses,
requests,
);
for (response, _) in responses.drain(..) {
if let Response::Announce(_) = response {
num_responses += 1;
}
cursor.set_position(0);
response_to_bytes(&mut cursor, response, IpVersion::IPv4);
dummy ^= cursor.get_ref()[0];
}
let duration = Instant::now() - now;
let requests_per_second = ANNOUNCE_REQUESTS as f64 / (duration.as_millis() as f64 / 1000.0);
let requests_per_second = ANNOUNCE_REQUESTS as f64 / (duration.as_micros() as f64 / 1000000.0);
let time_per_request = duration.as_nanos() as f64 / ANNOUNCE_REQUESTS as f64;
// println!("\nrequests/second: {:.2}", requests_per_second);
// println!("time per request: {:.2}ns", time_per_request);
assert_eq!(num_responses, ANNOUNCE_REQUESTS);
// let mut total_num_peers = 0.0f64;
let mut num_responses: usize = 0;
for (response, _src) in responses.drain(..) {
if let Response::Announce(_response) = response {
// let n = response.peers.len() as f64;
// total_num_peers += n;
num_responses += 1;
}
if dummy == 123u8 {
println!("dummy info");
}
if num_responses != ANNOUNCE_REQUESTS {
println!("ERROR: only {} responses received", num_responses);
}
// println!("avg num peers returned: {:.2}", total_num_peers / ANNOUNCE_REQUESTS as f64);
(requests_per_second, time_per_request)
}

View file

@ -1,2 +1,5 @@
pub const PARETO_SHAPE: f64 = 0.1;
pub const NUM_INFO_HASHES: usize = 10_000;
pub const NUM_INFO_HASHES: usize = 10_000;
/// Save memory by not allocating more per request
pub const MAX_REQUEST_BYTES: usize = 256;

View file

@ -1,3 +1,4 @@
use std::io::Cursor;
use std::time::Instant;
use std::net::SocketAddr;
@ -5,51 +6,82 @@ use rand::{Rng, thread_rng, rngs::SmallRng, SeedableRng};
use aquatic::common::*;
use aquatic::handlers::handle_connect_requests;
use bittorrent_udp::converters::*;
use crate::common::*;
const ITERATIONS: usize = 10_000_000;
pub fn bench(
requests: Vec<(ConnectRequest, SocketAddr)>
requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)>
) -> (f64, f64){
let state = State::new();
let mut responses = Vec::with_capacity(ITERATIONS);
let mut requests = requests;
let requests = requests.drain(..);
let mut buffer = [0u8; MAX_PACKET_SIZE];
let mut cursor = Cursor::new(buffer.as_mut());
let mut num_responses: usize = 0;
let mut dummy = 0u8;
let now = Instant::now();
let mut requests: Vec<(ConnectRequest, SocketAddr)> = requests.into_iter()
.map(|(request_bytes, src)| {
if let Request::Connect(r) = request_from_bytes(&request_bytes, 255).unwrap() {
(r, src)
} else {
unreachable!()
}
})
.collect();
let requests = requests.drain(..);
handle_connect_requests(&state, &mut responses, requests);
for (response, _) in responses.drain(..){
if let Response::Connect(_) = response {
num_responses += 1;
}
cursor.set_position(0);
response_to_bytes(&mut cursor, response, IpVersion::IPv4);
dummy ^= cursor.get_ref()[0];
}
let duration = Instant::now() - now;
let requests_per_second = ITERATIONS as f64 / (duration.as_millis() as f64 / 1000.0);
let requests_per_second = ITERATIONS as f64 / (duration.as_micros() as f64 / 1000000.0);
let time_per_request = duration.as_nanos() as f64 / ITERATIONS as f64;
assert_eq!(num_responses, ITERATIONS);
// println!("\nrequests/second: {:.2}", requests_per_second);
// println!("time per request: {:.2}ns", time_per_request);
/*
let mut dummy = 0usize;
let mut num_responses: usize = 0;
for (response, _src) in responses {
if let Response::Connect(response) = response {
if response.connection_id.0 > 0 {
dummy += 1;
}
num_responses += 1;
}
}
if num_responses != ITERATIONS {
println!("ERROR: only {} responses received", num_responses);
}
if dummy == ITERATIONS {
println!("dummy test output: {}", dummy);
}
*/
if dummy == 123u8 {
println!("dummy info");
}
(requests_per_second, time_per_request)
}

View file

@ -2,14 +2,16 @@
//!
//! Example summary output:
//! ```
//! ## Average results over 100 rounds
//! ## Average results over 50 rounds
//!
//! Connect handler: 3 459 722 requests/second, 289.22 ns/request
//! Announce handler: 390 674 requests/second, 2568.55 ns/request
//! Scrape handler: 1 039 374 requests/second, 963.02 ns/request
//! Connect handler: 2 514 978 requests/second, 397.87 ns/request
//! Announce handler: 246 744 requests/second, 4054.58 ns/request
//! Scrape handler: 499 385 requests/second, 2007.23 ns/request
//! ```
use std::time::{Duration, Instant};
use std::io::Cursor;
use std::net::SocketAddr;
use indicatif::{ProgressBar, ProgressStyle, ProgressIterator};
use num_format::{Locale, ToFormattedString};
@ -17,6 +19,7 @@ use rand::{Rng, thread_rng, rngs::SmallRng, SeedableRng};
use aquatic::common::*;
use aquatic::config::Config;
use bittorrent_udp::converters::*;
mod announce;
@ -24,6 +27,8 @@ mod common;
mod connect;
mod scrape;
use common::*;
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
@ -46,7 +51,7 @@ macro_rules! print_results {
fn main(){
let num_rounds = 100;
let num_rounds = 50;
let mut connect_data = (0.0, 0.0);
let mut announce_data = (0.0, 0.0);
@ -64,12 +69,30 @@ fn main(){
{
let requests = connect::create_requests();
let requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> = requests.into_iter()
.map(|(request, src)| {
let mut buffer = [0u8; MAX_REQUEST_BYTES];
let mut cursor = Cursor::new(buffer.as_mut());
request_to_bytes(&mut cursor, Request::Connect(request));
(buffer, src)
})
.collect();
::std::thread::sleep(Duration::from_secs(1));
let pb = create_progress_bar("Connect handler", num_rounds);
for _ in (0..num_rounds).progress_with(pb){
let d = connect::bench(requests.clone());
let requests = requests.clone();
::std::thread::sleep(Duration::from_millis(200));
let d = connect::bench(requests);
::std::thread::sleep(Duration::from_millis(200));
connect_data.0 += d.0;
connect_data.1 += d.1;
}
@ -85,6 +108,30 @@ fn main(){
&info_hashes
);
let state = State::new();
let time = Time(Instant::now());
for (request, src) in requests.iter() {
let key = ConnectionKey {
connection_id: request.connection_id,
socket_addr: *src,
};
state.connections.insert(key, time);
}
let requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> = requests.into_iter()
.map(|(request, src)| {
let mut buffer = [0u8; MAX_REQUEST_BYTES];
let mut cursor = Cursor::new(buffer.as_mut());
request_to_bytes(&mut cursor, Request::Announce(request));
(buffer, src)
})
.collect();
let mut state_for_scrape = State::new();
::std::thread::sleep(Duration::from_secs(1));
@ -92,20 +139,14 @@ fn main(){
let pb = create_progress_bar("Announce handler", num_rounds);
for round in (0..num_rounds).progress_with(pb) {
let state = State::new();
let requests = requests.clone();
let time = Time(Instant::now());
::std::thread::sleep(Duration::from_millis(200));
for (request, src) in requests.iter() {
let key = ConnectionKey {
connection_id: request.connection_id,
socket_addr: *src,
};
let d = announce::bench(&state, &config, requests);
state.connections.insert(key, time);
}
::std::thread::sleep(Duration::from_millis(200));
let d = announce::bench(&state, &config, requests.clone());
announce_data.0 += d.0;
announce_data.1 += d.1;
@ -135,12 +176,30 @@ fn main(){
state.connections.insert(key, time);
}
let requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> = requests.into_iter()
.map(|(request, src)| {
let mut buffer = [0u8; MAX_REQUEST_BYTES];
let mut cursor = Cursor::new(buffer.as_mut());
request_to_bytes(&mut cursor, Request::Scrape(request));
(buffer, src)
})
.collect();
::std::thread::sleep(Duration::from_secs(1));
let pb = create_progress_bar("Scrape handler", num_rounds);
for _ in (0..num_rounds).progress_with(pb) {
let d = scrape::bench(&state, requests.clone());
let requests = requests.clone();
::std::thread::sleep(Duration::from_millis(200));
let d = scrape::bench(&state, requests);
::std::thread::sleep(Duration::from_millis(200));
scrape_data.0 += d.0;
scrape_data.1 += d.1;
}

View file

@ -1,5 +1,6 @@
use std::time::Instant;
use std::io::Cursor;
use std::net::SocketAddr;
use std::time::Instant;
use rand::Rng;
use rand_distr::Pareto;
@ -7,6 +8,7 @@ use rand_distr::Pareto;
use aquatic::handlers::*;
use aquatic::common::*;
use aquatic_bench::*;
use bittorrent_udp::converters::*;
use crate::common::*;
@ -17,48 +19,58 @@ const SCRAPE_NUM_HASHES: usize = 10;
pub fn bench(
state: &State,
requests: Vec<(ScrapeRequest, SocketAddr)>,
requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)>
) -> (f64, f64) {
let mut responses = Vec::with_capacity(SCRAPE_REQUESTS);
let mut scrape_requests = requests;
let scrape_requests = scrape_requests.drain(..);
let mut buffer = [0u8; MAX_PACKET_SIZE];
let mut cursor = Cursor::new(buffer.as_mut());
let mut num_responses: usize = 0;
let mut dummy = 0u8;
let now = Instant::now();
let mut requests: Vec<(ScrapeRequest, SocketAddr)> = requests.into_iter()
.map(|(request_bytes, src)| {
if let Request::Scrape(r) = request_from_bytes(&request_bytes, 255).unwrap() {
(r, src)
} else {
unreachable!()
}
})
.collect();
let requests = requests.drain(..);
handle_scrape_requests(
&state,
&mut responses,
scrape_requests,
requests,
);
for (response, _src) in responses.drain(..){
if let Response::Scrape(_) = response {
num_responses += 1;
}
cursor.set_position(0);
response_to_bytes(&mut cursor, response, IpVersion::IPv4);
dummy ^= cursor.get_ref()[0];
}
let duration = Instant::now() - now;
let requests_per_second = SCRAPE_REQUESTS as f64 / (duration.as_millis() as f64 / 1000.0);
let requests_per_second = SCRAPE_REQUESTS as f64 / (duration.as_micros() as f64 / 1000000.0);
let time_per_request = duration.as_nanos() as f64 / SCRAPE_REQUESTS as f64;
// println!("\nrequests/second: {:.2}", requests_per_second);
// println!("time per request: {:.2}ns", time_per_request);
assert_eq!(num_responses, SCRAPE_REQUESTS);
// let mut total_num_peers = 0.0f64;
let mut num_responses: usize = 0;
for (response, _src) in responses.drain(..){
if let Response::Scrape(_response) = response {
// for stats in response.torrent_stats {
// total_num_peers += f64::from(stats.seeders.0);
// total_num_peers += f64::from(stats.leechers.0);
// }
num_responses += 1;
}
if dummy == 123u8 {
println!("dummy info");
}
if num_responses != SCRAPE_REQUESTS {
println!("ERROR: only {} responses received", num_responses);
}
// println!("avg num peers reported: {:.2}", total_num_peers / (SCRAPE_REQUESTS as f64 * SCRAPE_NUM_HASHES as f64));
(requests_per_second, time_per_request)
}