diff --git a/Cargo.lock b/Cargo.lock index b33b4ac..d67a7ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -39,6 +39,7 @@ name = "aquatic_bench" version = "0.1.0" dependencies = [ "aquatic", + "bittorrent_udp", "indicatif", "mimalloc", "num-format", diff --git a/TODO.md b/TODO.md index 5ec79c6..376a988 100644 --- a/TODO.md +++ b/TODO.md @@ -7,7 +7,6 @@ * Stack-allocated vector? * Benchmarks * num_rounds command line argument - * Better black_box (or make sure to consume data) * Send in connect reponse ids to other functions as integration test * Save last results, check if difference is significant? * ProgressBar: `[{elapsed_precise}]` and eta_precise? @@ -15,6 +14,7 @@ * Performance * cpu-target=native good? * mialloc good? + * https://docs.rs/zerocopy/0.3.0/zerocopy/index.html for requests and responses? * bittorrent_udp * ParseError enum maybe, with `Option` * Avoid allocating in conversion to bytes, send in a mutable buffer diff --git a/aquatic/src/lib/network.rs b/aquatic/src/lib/network.rs index 7d4b139..a6b670f 100644 --- a/aquatic/src/lib/network.rs +++ b/aquatic/src/lib/network.rs @@ -1,6 +1,6 @@ use std::sync::atomic::Ordering; use std::net::SocketAddr; -use std::io::ErrorKind; +use std::io::{Cursor, ErrorKind}; use mio::{Events, Poll, Interest, Token}; use mio::net::UdpSocket; @@ -115,7 +115,7 @@ fn handle_readable_socket( let mut responses_sent: usize = 0; loop { - match socket.recv_from(buffer) { + match socket.recv_from(&mut buffer[..]) { Ok((amt, src)) => { let request = request_from_bytes( &buffer[..amt], @@ -179,10 +179,16 @@ fn handle_readable_socket( scrape_requests.drain(..), ); - for (response, src) in responses.drain(..) { - let bytes = response_to_bytes(&response, IpVersion::IPv4); + let mut cursor = Cursor::new(buffer); - match socket.send_to(&bytes[..], src){ + for (response, src) in responses.drain(..) { + cursor.set_position(0); + + response_to_bytes(&mut cursor, response, IpVersion::IPv4); + + let amt = cursor.position() as usize; + + match socket.send_to(&cursor.get_ref()[..amt], src){ Ok(_bytes_sent) => { responses_sent += 1; }, diff --git a/aquatic_bench/Cargo.toml b/aquatic_bench/Cargo.toml index 4b40267..5d704d8 100644 --- a/aquatic_bench/Cargo.toml +++ b/aquatic_bench/Cargo.toml @@ -16,6 +16,7 @@ name = "plot_pareto" [dependencies] aquatic = { path = "../aquatic" } +bittorrent_udp = { path = "../bittorrent_udp" } indicatif = "0.14" mimalloc = { version = "0.1", default-features = false } num-format = "0.4" diff --git a/aquatic_bench/src/bin/bench_handlers/announce.rs b/aquatic_bench/src/bin/bench_handlers/announce.rs index df473ca..89c1535 100644 --- a/aquatic_bench/src/bin/bench_handlers/announce.rs +++ b/aquatic_bench/src/bin/bench_handlers/announce.rs @@ -1,3 +1,4 @@ +use std::io::Cursor; use std::time::Instant; use std::net::SocketAddr; @@ -8,6 +9,7 @@ use aquatic::handlers::*; use aquatic::common::*; use aquatic::config::Config; use aquatic_bench::*; +use bittorrent_udp::converters::*; use crate::common::*; @@ -18,47 +20,59 @@ const ANNOUNCE_REQUESTS: usize = 1_000_000; pub fn bench( state: &State, config: &Config, - requests: Vec<(AnnounceRequest, SocketAddr)>, + requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> ) -> (f64, f64) { let mut responses = Vec::with_capacity(ANNOUNCE_REQUESTS); - let mut requests = requests; - let requests = requests.drain(..); + + let mut buffer = [0u8; MAX_PACKET_SIZE]; + let mut cursor = Cursor::new(buffer.as_mut()); + let mut num_responses: usize = 0; + let mut dummy = 0u8; let now = Instant::now(); + let mut requests: Vec<(AnnounceRequest, SocketAddr)> = requests.into_iter() + .map(|(request_bytes, src)| { + if let Request::Announce(r) = request_from_bytes(&request_bytes, 255).unwrap() { + (r, src) + } else { + unreachable!() + } + }) + .collect(); + + let requests = requests.drain(..); + handle_announce_requests( &state, config, &mut responses, requests, ); + + for (response, _) in responses.drain(..) { + if let Response::Announce(_) = response { + num_responses += 1; + } + + cursor.set_position(0); + + response_to_bytes(&mut cursor, response, IpVersion::IPv4); + + dummy ^= cursor.get_ref()[0]; + } let duration = Instant::now() - now; - let requests_per_second = ANNOUNCE_REQUESTS as f64 / (duration.as_millis() as f64 / 1000.0); + let requests_per_second = ANNOUNCE_REQUESTS as f64 / (duration.as_micros() as f64 / 1000000.0); let time_per_request = duration.as_nanos() as f64 / ANNOUNCE_REQUESTS as f64; - // println!("\nrequests/second: {:.2}", requests_per_second); - // println!("time per request: {:.2}ns", time_per_request); + assert_eq!(num_responses, ANNOUNCE_REQUESTS); - // let mut total_num_peers = 0.0f64; - let mut num_responses: usize = 0; - - for (response, _src) in responses.drain(..) { - if let Response::Announce(_response) = response { - // let n = response.peers.len() as f64; - - // total_num_peers += n; - num_responses += 1; - } + if dummy == 123u8 { + println!("dummy info"); } - if num_responses != ANNOUNCE_REQUESTS { - println!("ERROR: only {} responses received", num_responses); - } - - // println!("avg num peers returned: {:.2}", total_num_peers / ANNOUNCE_REQUESTS as f64); - (requests_per_second, time_per_request) } diff --git a/aquatic_bench/src/bin/bench_handlers/common.rs b/aquatic_bench/src/bin/bench_handlers/common.rs index 3f3c413..9ae756b 100644 --- a/aquatic_bench/src/bin/bench_handlers/common.rs +++ b/aquatic_bench/src/bin/bench_handlers/common.rs @@ -1,2 +1,5 @@ pub const PARETO_SHAPE: f64 = 0.1; -pub const NUM_INFO_HASHES: usize = 10_000; \ No newline at end of file +pub const NUM_INFO_HASHES: usize = 10_000; + +/// Save memory by not allocating more per request +pub const MAX_REQUEST_BYTES: usize = 256; \ No newline at end of file diff --git a/aquatic_bench/src/bin/bench_handlers/connect.rs b/aquatic_bench/src/bin/bench_handlers/connect.rs index 205c076..fdf8aa2 100644 --- a/aquatic_bench/src/bin/bench_handlers/connect.rs +++ b/aquatic_bench/src/bin/bench_handlers/connect.rs @@ -1,3 +1,4 @@ +use std::io::Cursor; use std::time::Instant; use std::net::SocketAddr; @@ -5,51 +6,82 @@ use rand::{Rng, thread_rng, rngs::SmallRng, SeedableRng}; use aquatic::common::*; use aquatic::handlers::handle_connect_requests; +use bittorrent_udp::converters::*; + +use crate::common::*; const ITERATIONS: usize = 10_000_000; pub fn bench( - requests: Vec<(ConnectRequest, SocketAddr)> + requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> ) -> (f64, f64){ let state = State::new(); let mut responses = Vec::with_capacity(ITERATIONS); - let mut requests = requests; - let requests = requests.drain(..); + + let mut buffer = [0u8; MAX_PACKET_SIZE]; + let mut cursor = Cursor::new(buffer.as_mut()); + let mut num_responses: usize = 0; + let mut dummy = 0u8; let now = Instant::now(); + let mut requests: Vec<(ConnectRequest, SocketAddr)> = requests.into_iter() + .map(|(request_bytes, src)| { + if let Request::Connect(r) = request_from_bytes(&request_bytes, 255).unwrap() { + (r, src) + } else { + unreachable!() + } + }) + .collect(); + + let requests = requests.drain(..); + handle_connect_requests(&state, &mut responses, requests); + for (response, _) in responses.drain(..){ + if let Response::Connect(_) = response { + num_responses += 1; + } + + cursor.set_position(0); + + response_to_bytes(&mut cursor, response, IpVersion::IPv4); + + dummy ^= cursor.get_ref()[0]; + } + let duration = Instant::now() - now; - let requests_per_second = ITERATIONS as f64 / (duration.as_millis() as f64 / 1000.0); + let requests_per_second = ITERATIONS as f64 / (duration.as_micros() as f64 / 1000000.0); let time_per_request = duration.as_nanos() as f64 / ITERATIONS as f64; + assert_eq!(num_responses, ITERATIONS); + // println!("\nrequests/second: {:.2}", requests_per_second); // println!("time per request: {:.2}ns", time_per_request); + /* let mut dummy = 0usize; - let mut num_responses: usize = 0; for (response, _src) in responses { if let Response::Connect(response) = response { if response.connection_id.0 > 0 { dummy += 1; } - - num_responses += 1; } } - if num_responses != ITERATIONS { - println!("ERROR: only {} responses received", num_responses); - } - if dummy == ITERATIONS { println!("dummy test output: {}", dummy); } + */ + + if dummy == 123u8 { + println!("dummy info"); + } (requests_per_second, time_per_request) } diff --git a/aquatic_bench/src/bin/bench_handlers/main.rs b/aquatic_bench/src/bin/bench_handlers/main.rs index 7b1c788..4a6e6f0 100644 --- a/aquatic_bench/src/bin/bench_handlers/main.rs +++ b/aquatic_bench/src/bin/bench_handlers/main.rs @@ -2,14 +2,16 @@ //! //! Example summary output: //! ``` -//! ## Average results over 100 rounds +//! ## Average results over 50 rounds //! -//! Connect handler: 3 459 722 requests/second, 289.22 ns/request -//! Announce handler: 390 674 requests/second, 2568.55 ns/request -//! Scrape handler: 1 039 374 requests/second, 963.02 ns/request +//! Connect handler: 2 514 978 requests/second, 397.87 ns/request +//! Announce handler: 246 744 requests/second, 4054.58 ns/request +//! Scrape handler: 499 385 requests/second, 2007.23 ns/request //! ``` use std::time::{Duration, Instant}; +use std::io::Cursor; +use std::net::SocketAddr; use indicatif::{ProgressBar, ProgressStyle, ProgressIterator}; use num_format::{Locale, ToFormattedString}; @@ -17,6 +19,7 @@ use rand::{Rng, thread_rng, rngs::SmallRng, SeedableRng}; use aquatic::common::*; use aquatic::config::Config; +use bittorrent_udp::converters::*; mod announce; @@ -24,6 +27,8 @@ mod common; mod connect; mod scrape; +use common::*; + #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; @@ -46,7 +51,7 @@ macro_rules! print_results { fn main(){ - let num_rounds = 100; + let num_rounds = 50; let mut connect_data = (0.0, 0.0); let mut announce_data = (0.0, 0.0); @@ -64,12 +69,30 @@ fn main(){ { let requests = connect::create_requests(); + let requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> = requests.into_iter() + .map(|(request, src)| { + let mut buffer = [0u8; MAX_REQUEST_BYTES]; + let mut cursor = Cursor::new(buffer.as_mut()); + + request_to_bytes(&mut cursor, Request::Connect(request)); + + (buffer, src) + }) + .collect(); + ::std::thread::sleep(Duration::from_secs(1)); let pb = create_progress_bar("Connect handler", num_rounds); for _ in (0..num_rounds).progress_with(pb){ - let d = connect::bench(requests.clone()); + let requests = requests.clone(); + + ::std::thread::sleep(Duration::from_millis(200)); + + let d = connect::bench(requests); + + ::std::thread::sleep(Duration::from_millis(200)); + connect_data.0 += d.0; connect_data.1 += d.1; } @@ -85,6 +108,30 @@ fn main(){ &info_hashes ); + let state = State::new(); + + let time = Time(Instant::now()); + + for (request, src) in requests.iter() { + let key = ConnectionKey { + connection_id: request.connection_id, + socket_addr: *src, + }; + + state.connections.insert(key, time); + } + + let requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> = requests.into_iter() + .map(|(request, src)| { + let mut buffer = [0u8; MAX_REQUEST_BYTES]; + let mut cursor = Cursor::new(buffer.as_mut()); + + request_to_bytes(&mut cursor, Request::Announce(request)); + + (buffer, src) + }) + .collect(); + let mut state_for_scrape = State::new(); ::std::thread::sleep(Duration::from_secs(1)); @@ -92,20 +139,14 @@ fn main(){ let pb = create_progress_bar("Announce handler", num_rounds); for round in (0..num_rounds).progress_with(pb) { - let state = State::new(); + let requests = requests.clone(); - let time = Time(Instant::now()); + ::std::thread::sleep(Duration::from_millis(200)); - for (request, src) in requests.iter() { - let key = ConnectionKey { - connection_id: request.connection_id, - socket_addr: *src, - }; + let d = announce::bench(&state, &config, requests); - state.connections.insert(key, time); - } + ::std::thread::sleep(Duration::from_millis(200)); - let d = announce::bench(&state, &config, requests.clone()); announce_data.0 += d.0; announce_data.1 += d.1; @@ -135,12 +176,30 @@ fn main(){ state.connections.insert(key, time); } + let requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> = requests.into_iter() + .map(|(request, src)| { + let mut buffer = [0u8; MAX_REQUEST_BYTES]; + let mut cursor = Cursor::new(buffer.as_mut()); + + request_to_bytes(&mut cursor, Request::Scrape(request)); + + (buffer, src) + }) + .collect(); + ::std::thread::sleep(Duration::from_secs(1)); let pb = create_progress_bar("Scrape handler", num_rounds); for _ in (0..num_rounds).progress_with(pb) { - let d = scrape::bench(&state, requests.clone()); + let requests = requests.clone(); + + ::std::thread::sleep(Duration::from_millis(200)); + + let d = scrape::bench(&state, requests); + + ::std::thread::sleep(Duration::from_millis(200)); + scrape_data.0 += d.0; scrape_data.1 += d.1; } diff --git a/aquatic_bench/src/bin/bench_handlers/scrape.rs b/aquatic_bench/src/bin/bench_handlers/scrape.rs index 3342153..aa70a85 100644 --- a/aquatic_bench/src/bin/bench_handlers/scrape.rs +++ b/aquatic_bench/src/bin/bench_handlers/scrape.rs @@ -1,5 +1,6 @@ -use std::time::Instant; +use std::io::Cursor; use std::net::SocketAddr; +use std::time::Instant; use rand::Rng; use rand_distr::Pareto; @@ -7,6 +8,7 @@ use rand_distr::Pareto; use aquatic::handlers::*; use aquatic::common::*; use aquatic_bench::*; +use bittorrent_udp::converters::*; use crate::common::*; @@ -17,48 +19,58 @@ const SCRAPE_NUM_HASHES: usize = 10; pub fn bench( state: &State, - requests: Vec<(ScrapeRequest, SocketAddr)>, + requests: Vec<([u8; MAX_REQUEST_BYTES], SocketAddr)> ) -> (f64, f64) { let mut responses = Vec::with_capacity(SCRAPE_REQUESTS); - let mut scrape_requests = requests; - let scrape_requests = scrape_requests.drain(..); + + let mut buffer = [0u8; MAX_PACKET_SIZE]; + let mut cursor = Cursor::new(buffer.as_mut()); + let mut num_responses: usize = 0; + let mut dummy = 0u8; let now = Instant::now(); + let mut requests: Vec<(ScrapeRequest, SocketAddr)> = requests.into_iter() + .map(|(request_bytes, src)| { + if let Request::Scrape(r) = request_from_bytes(&request_bytes, 255).unwrap() { + (r, src) + } else { + unreachable!() + } + }) + .collect(); + + let requests = requests.drain(..); + handle_scrape_requests( &state, &mut responses, - scrape_requests, + requests, ); + for (response, _src) in responses.drain(..){ + if let Response::Scrape(_) = response { + num_responses += 1; + } + + cursor.set_position(0); + + response_to_bytes(&mut cursor, response, IpVersion::IPv4); + + dummy ^= cursor.get_ref()[0]; + } + let duration = Instant::now() - now; - let requests_per_second = SCRAPE_REQUESTS as f64 / (duration.as_millis() as f64 / 1000.0); + let requests_per_second = SCRAPE_REQUESTS as f64 / (duration.as_micros() as f64 / 1000000.0); let time_per_request = duration.as_nanos() as f64 / SCRAPE_REQUESTS as f64; - // println!("\nrequests/second: {:.2}", requests_per_second); - // println!("time per request: {:.2}ns", time_per_request); + assert_eq!(num_responses, SCRAPE_REQUESTS); - // let mut total_num_peers = 0.0f64; - let mut num_responses: usize = 0; - - for (response, _src) in responses.drain(..){ - if let Response::Scrape(_response) = response { - // for stats in response.torrent_stats { - // total_num_peers += f64::from(stats.seeders.0); - // total_num_peers += f64::from(stats.leechers.0); - // } - - num_responses += 1; - } + if dummy == 123u8 { + println!("dummy info"); } - if num_responses != SCRAPE_REQUESTS { - println!("ERROR: only {} responses received", num_responses); - } - - // println!("avg num peers reported: {:.2}", total_num_peers / (SCRAPE_REQUESTS as f64 * SCRAPE_NUM_HASHES as f64)); - (requests_per_second, time_per_request) } diff --git a/bittorrent_udp/src/converters/requests.rs b/bittorrent_udp/src/converters/requests.rs index e9e616e..1d80bd6 100644 --- a/bittorrent_udp/src/converters/requests.rs +++ b/bittorrent_udp/src/converters/requests.rs @@ -1,7 +1,7 @@ use byteorder::{ReadBytesExt, WriteBytesExt, NetworkEndian}; use std::io; -use std::io::Read; +use std::io::{Read, Write}; use std::net::Ipv4Addr; use crate::types; @@ -12,9 +12,10 @@ use super::common::*; const MAGIC_NUMBER: i64 = 4_497_486_125_440; -pub fn request_to_bytes(request: &types::Request) -> Vec { - let mut bytes = Vec::new(); - +pub fn request_to_bytes( + bytes: &mut impl Write, + request: types::Request +){ match request { types::Request::Connect(r) => { bytes.write_i64::(MAGIC_NUMBER).unwrap(); @@ -27,8 +28,8 @@ pub fn request_to_bytes(request: &types::Request) -> Vec { bytes.write_i32::(1).unwrap(); bytes.write_i32::(r.transaction_id.0).unwrap(); - bytes.extend(r.info_hash.0.iter()); - bytes.extend(r.peer_id.0.iter()); + bytes.write_all(&r.info_hash.0).unwrap(); + bytes.write_all(&r.peer_id.0).unwrap(); bytes.write_i64::(r.bytes_downloaded.0).unwrap(); bytes.write_i64::(r.bytes_left.0).unwrap(); @@ -36,7 +37,7 @@ pub fn request_to_bytes(request: &types::Request) -> Vec { bytes.write_i32::(event_to_i32(r.event)).unwrap(); - bytes.extend(&r.ip_address.map_or([0; 4], |ip| ip.octets())); + bytes.write_all(&r.ip_address.map_or([0; 4], |ip| ip.octets())).unwrap(); bytes.write_u32::(0).unwrap(); // IP bytes.write_u32::(r.key.0).unwrap(); @@ -50,14 +51,12 @@ pub fn request_to_bytes(request: &types::Request) -> Vec { bytes.write_i32::(r.transaction_id.0).unwrap(); for info_hash in &r.info_hashes { - bytes.extend(info_hash.0.iter()); + bytes.write_all(&info_hash.0).unwrap(); } } _ => () // Invalid requests should never happen } - - bytes } @@ -65,7 +64,6 @@ pub fn request_from_bytes( bytes: &[u8], max_scrape_torrents: u8, ) -> Result { - let mut bytes = io::Cursor::new(bytes); let connection_id = bytes.read_i64::()?; diff --git a/bittorrent_udp/src/converters/responses.rs b/bittorrent_udp/src/converters/responses.rs index 12727b6..65f22ec 100644 --- a/bittorrent_udp/src/converters/responses.rs +++ b/bittorrent_udp/src/converters/responses.rs @@ -1,18 +1,17 @@ use byteorder::{ReadBytesExt, WriteBytesExt, NetworkEndian}; use std::io; -use std::io::Read; +use std::io::{Read, Write}; use std::net::{IpAddr, Ipv6Addr, Ipv4Addr}; use crate::types; pub fn response_to_bytes( - response: &types::Response, + bytes: &mut impl Write, + response: types::Response, ip_version: types::IpVersion -) -> Vec { - let mut bytes = Vec::new(); - +){ match response { types::Response::Connect(r) => { bytes.write_i32::(0).unwrap(); @@ -29,19 +28,21 @@ pub fn response_to_bytes( // Write peer IPs and ports. Silently ignore peers with wrong // IP version - for peer in r.peers.iter(){ + for peer in r.peers { let mut correct = false; match peer.ip_address { IpAddr::V4(ip) => { if let types::IpVersion::IPv4 = ip_version { - bytes.extend(&ip.octets()); + bytes.write_all(&ip.octets()).unwrap(); + correct = true; } }, IpAddr::V6(ip) => { if let types::IpVersion::IPv6 = ip_version { - bytes.extend(&ip.octets()); + bytes.write_all(&ip.octets()).unwrap(); + correct = true; } } @@ -57,7 +58,7 @@ pub fn response_to_bytes( bytes.write_i32::(2).unwrap(); bytes.write_i32::(r.transaction_id.0).unwrap(); - for torrent_stat in r.torrent_stats.iter(){ + for torrent_stat in r.torrent_stats { bytes.write_i32::(torrent_stat.seeders.0) .unwrap(); bytes.write_i32::(torrent_stat.completed.0) @@ -71,11 +72,9 @@ pub fn response_to_bytes( bytes.write_i32::(3).unwrap(); bytes.write_i32::(r.transaction_id.0).unwrap(); - bytes.extend(r.message.as_bytes().iter()); + bytes.write_all(r.message.as_bytes()).unwrap(); }, } - - bytes }