diff --git a/Cargo.lock b/Cargo.lock index 55fb5fc..0fee5ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,6 +30,7 @@ dependencies = [ "quickcheck", "quickcheck_macros", "rand", + "rand_distr", ] [[package]] @@ -294,6 +295,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rand_distr" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96977acbdd3a6576fb1d27391900035bf3863d4a16422973a409b488cf29ffb2" +dependencies = [ + "rand", +] + [[package]] name = "rand_hc" version = "0.2.0" diff --git a/aquatic/Cargo.toml b/aquatic/Cargo.toml index 5964c37..3c93f86 100644 --- a/aquatic/Cargo.toml +++ b/aquatic/Cargo.toml @@ -4,24 +4,34 @@ version = "0.1.0" authors = ["Joakim FrostegÄrd "] edition = "2018" +[lib] +name = "aquatic" +path = "src/lib/lib.rs" + [[bin]] name = "aquatic" -path = "src/main.rs" + +[[bin]] +name = "bench_connect" + +[[bin]] +name = "bench_announce" [dependencies] bittorrent_udp = { path = "../bittorrent_udp" } dashmap = "3" indexmap = "1" net2 = "0.2" - -[dependencies.rand] -version = "0.7" -features = ["small_rng"] +rand_distr = "0.2" [dependencies.mio] version = "0.7" features = ["udp", "os-poll", "os-util"] +[dependencies.rand] +version = "0.7" +features = ["small_rng"] + [dev-dependencies] quickcheck = "0.9" quickcheck_macros = "0.9" \ No newline at end of file diff --git a/aquatic/src/bin/aquatic.rs b/aquatic/src/bin/aquatic.rs new file mode 100644 index 0000000..834b5b5 --- /dev/null +++ b/aquatic/src/bin/aquatic.rs @@ -0,0 +1,5 @@ +use aquatic; + +fn main(){ + aquatic::run(); +} \ No newline at end of file diff --git a/aquatic/src/bin/bench_announce.rs b/aquatic/src/bin/bench_announce.rs new file mode 100644 index 0000000..ee84280 --- /dev/null +++ b/aquatic/src/bin/bench_announce.rs @@ -0,0 +1,128 @@ +use std::time::{Duration, Instant}; +use std::net::SocketAddr; + +use rand::{Rng, thread_rng, rngs::SmallRng, SeedableRng}; +use rand_distr::Pareto; + +use aquatic::common::*; +use aquatic::handler::handle_announce_requests; + + +const ITERATIONS: usize = 5_000_000; +const NUM_INFO_HASHES: usize = ITERATIONS; + + +fn main(){ + println!("benchmark: handle_announce_requests\n"); + + println!("generating data.."); + + let state = State::new(); + let mut responses = Vec::new(); + + let mut requests = create_requests(); + + let time = Time(Instant::now()); + + for (request, src) in requests.iter() { + let key = ConnectionKey { + connection_id: request.connection_id, + socket_addr: *src, + }; + + state.connections.insert(key, time); + } + + let requests = requests.drain(..); + + ::std::thread::sleep(Duration::from_secs(1)); + + let now = Instant::now(); + + println!("running benchmark.."); + + handle_announce_requests( + &state, + &mut responses, + requests, + ); + + let duration = Instant::now() - now; + + println!("\nrequests/second: {:.2}", ITERATIONS as f64 / (duration.as_millis() as f64 / 1000.0)); + println!("time per request: {:.2}ns", duration.as_nanos() as f64 / ITERATIONS as f64); + + let mut total_num_peers = 0.0f64; + let mut max_num_peers = 0.0f64; + + for (response, _src) in responses { + if let Response::Announce(response) = response { + let n = response.peers.len() as f64; + + total_num_peers += n; + max_num_peers = max_num_peers.max(n); + } + } + + println!("avg num peers: {:.2}", total_num_peers / ITERATIONS as f64); + println!("max num peers: {:.2}", max_num_peers); +} + + +fn create_requests() -> Vec<(AnnounceRequest, SocketAddr)> { + let mut rng = SmallRng::from_rng(thread_rng()).unwrap(); + let pareto = Pareto::new(1., 6.).unwrap(); + + let info_hashes = create_info_hashes(&mut rng); + let max_index = info_hashes.len() - 1; + + let mut requests = Vec::new(); + + for _ in 0..ITERATIONS { + let info_hash_index = pareto_usize(&mut rng, pareto, max_index); + + let request = AnnounceRequest { + connection_id: ConnectionId(rng.gen()), + transaction_id: TransactionId(rng.gen()), + info_hash: info_hashes[info_hash_index], + peer_id: PeerId(rng.gen()), + bytes_downloaded: NumberOfBytes(rng.gen()), + bytes_uploaded: NumberOfBytes(rng.gen()), + bytes_left: NumberOfBytes(rng.gen()), + event: AnnounceEvent::Started, + ip_address: None, + key: PeerKey(rng.gen()), + peers_wanted: NumberOfPeers(rng.gen()), + port: Port(rng.gen()) + }; + + let src = SocketAddr::from(([rng.gen(), rng.gen(), rng.gen(), rng.gen()], rng.gen())); + + requests.push((request, src)); + } + + requests +} + + +fn create_info_hashes(rng: &mut impl Rng) -> Vec { + let mut info_hashes = Vec::new(); + + for _ in 0..NUM_INFO_HASHES { + info_hashes.push(InfoHash(rng.gen())); + } + + info_hashes +} + + +fn pareto_usize( + rng: &mut impl Rng, + pareto: Pareto, + max: usize, +) -> usize { + let p: f64 = rng.sample(pareto); + let p = (p.min(101.0f64) - 1.0) / 100.0; + + (p * max as f64) as usize +} \ No newline at end of file diff --git a/aquatic/src/bin/bench_connect.rs b/aquatic/src/bin/bench_connect.rs new file mode 100644 index 0000000..0bf3a99 --- /dev/null +++ b/aquatic/src/bin/bench_connect.rs @@ -0,0 +1,65 @@ +use std::time::Instant; +use std::net::SocketAddr; + +use rand::{Rng, thread_rng, rngs::SmallRng, SeedableRng}; + +use aquatic::common::*; +use aquatic::handler::handle_connect_requests; + + +const ITERATIONS: usize = 10_000_000; + + +fn main(){ + println!("benchmark: handle_connect_requests\n"); + + let state = State::new(); + let mut responses = Vec::new(); + + let mut requests = create_requests(); + let requests = requests.drain(..); + + println!("running benchmark.."); + + let now = Instant::now(); + + handle_connect_requests(&state, &mut responses, requests); + + let duration = Instant::now() - now; + + println!("\nrequests/second: {:.2}", ITERATIONS as f64 / (duration.as_millis() as f64 / 1000.0)); + println!("time per request: {:.2}ns", duration.as_nanos() as f64 / ITERATIONS as f64); + + let mut dummy = 0usize; + + for (response, _src) in responses { + if let Response::Connect(response) = response { + if response.connection_id.0 > 0 { + dummy += 1; + } + } + } + + if dummy == ITERATIONS { + println!("dummy test output: {}", dummy); + } +} + + +fn create_requests() -> Vec<(ConnectRequest, SocketAddr)> { + let mut rng = SmallRng::from_rng(thread_rng()).unwrap(); + + let mut requests = Vec::new(); + + for _ in 0..ITERATIONS { + let request = ConnectRequest { + transaction_id: TransactionId(rng.gen()), + }; + + let src = SocketAddr::from(([rng.gen(), rng.gen(), rng.gen(), rng.gen()], rng.gen())); + + requests.push((request, src)); + } + + requests +} \ No newline at end of file diff --git a/aquatic/src/common.rs b/aquatic/src/lib/common.rs similarity index 100% rename from aquatic/src/common.rs rename to aquatic/src/lib/common.rs diff --git a/aquatic/src/handler.rs b/aquatic/src/lib/handler.rs similarity index 99% rename from aquatic/src/handler.rs rename to aquatic/src/lib/handler.rs index d9e0887..7b6f35e 100644 --- a/aquatic/src/handler.rs +++ b/aquatic/src/lib/handler.rs @@ -96,7 +96,7 @@ pub fn handle_announce_requests( } } - let response_peers = extract_response_peers(&torrent_data.peers, 100); // FIXME num peers + let response_peers = extract_response_peers(&torrent_data.peers, 255); // FIXME num peers let response = Response::Announce(AnnounceResponse { transaction_id: request.transaction_id, diff --git a/aquatic/src/main.rs b/aquatic/src/lib/lib.rs similarity index 88% rename from aquatic/src/main.rs rename to aquatic/src/lib/lib.rs index 5cd949a..a1d7761 100644 --- a/aquatic/src/main.rs +++ b/aquatic/src/lib/lib.rs @@ -1,14 +1,14 @@ use std::time::Duration; -mod common; -mod handler; -mod network; -mod tasks; +pub mod common; +pub mod handler; +pub mod network; +pub mod tasks; use common::State; -fn main(){ +pub fn run(){ let addr = ([127, 0, 0, 1], 3000).into(); let state = State::new(); let socket = network::create_socket(addr, 4096 * 8); diff --git a/aquatic/src/network.rs b/aquatic/src/lib/network.rs similarity index 100% rename from aquatic/src/network.rs rename to aquatic/src/lib/network.rs diff --git a/aquatic/src/tasks.rs b/aquatic/src/lib/tasks.rs similarity index 100% rename from aquatic/src/tasks.rs rename to aquatic/src/lib/tasks.rs