diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index b966a86..8591911 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -10,14 +10,13 @@ use rand_distr::Pareto; mod common; mod config; -mod handler; -mod network; mod utils; +mod worker; use common::*; use config::Config; -use network::*; use utils::*; +use worker::*; #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; diff --git a/aquatic_udp_load_test/src/utils.rs b/aquatic_udp_load_test/src/utils.rs index aecbe5a..bae1391 100644 --- a/aquatic_udp_load_test/src/utils.rs +++ b/aquatic_udp_load_test/src/utils.rs @@ -1,43 +1,8 @@ -use std::sync::Arc; - use rand::prelude::*; use rand_distr::Pareto; use aquatic_udp_protocol::*; -use crate::common::*; -use crate::config::Config; - -pub fn create_torrent_peer( - config: &Config, - rng: &mut impl Rng, - pareto: Pareto, - info_hashes: &Arc>, - connection_id: ConnectionId, -) -> TorrentPeer { - let num_scape_hashes = rng.gen_range(1..config.requests.scrape_max_torrents); - - let mut scrape_hash_indeces = Vec::new(); - - for _ in 0..num_scape_hashes { - scrape_hash_indeces.push(select_info_hash_index(config, rng, pareto)) - } - - let info_hash_index = select_info_hash_index(config, rng, pareto); - - TorrentPeer { - info_hash: info_hashes[info_hash_index], - scrape_hash_indeces, - connection_id, - peer_id: generate_peer_id(), - port: Port(rand::random()), - } -} - -fn select_info_hash_index(config: &Config, rng: &mut impl Rng, pareto: Pareto) -> usize { - pareto_usize(rng, pareto, config.requests.number_of_torrents - 1) -} - pub fn pareto_usize(rng: &mut impl Rng, pareto: Pareto, max: usize) -> usize { let p: f64 = rng.sample(pareto); let p = (p.min(101.0f64) - 1.0) / 100.0; diff --git a/aquatic_udp_load_test/src/network.rs b/aquatic_udp_load_test/src/worker/mod.rs similarity index 97% rename from aquatic_udp_load_test/src/network.rs rename to aquatic_udp_load_test/src/worker/mod.rs index bd4cf3a..26f5f5a 100644 --- a/aquatic_udp_load_test/src/network.rs +++ b/aquatic_udp_load_test/src/worker/mod.rs @@ -1,3 +1,5 @@ +mod request_gen; + use std::io::Cursor; use std::net::SocketAddr; use std::sync::atomic::Ordering; @@ -12,42 +14,11 @@ use socket2::{Domain, Protocol, Socket, Type}; use aquatic_udp_protocol::*; use crate::config::Config; -use crate::{common::*, handler::process_response, utils::*}; +use crate::{common::*, utils::*}; +use request_gen::process_response; const MAX_PACKET_SIZE: usize = 4096; -pub fn create_socket(config: &Config, addr: SocketAddr) -> ::std::net::UdpSocket { - let socket = if addr.is_ipv4() { - Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) - } else { - Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) - } - .expect("create socket"); - - socket - .set_nonblocking(true) - .expect("socket: set nonblocking"); - - if config.network.recv_buffer != 0 { - if let Err(err) = socket.set_recv_buffer_size(config.network.recv_buffer) { - eprintln!( - "socket: failed setting recv buffer to {}: {:?}", - config.network.recv_buffer, err - ); - } - } - - socket - .bind(&addr.into()) - .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", addr, err)); - - socket - .connect(&config.server_address.into()) - .expect("socket: connect to server"); - - socket.into() -} - pub fn run_worker_thread( state: LoadTestState, pareto: Pareto, @@ -201,3 +172,35 @@ fn update_shared_statistics(state: &LoadTestState, statistics: &mut SocketWorker *statistics = SocketWorkerLocalStatistics::default(); } + +fn create_socket(config: &Config, addr: SocketAddr) -> ::std::net::UdpSocket { + let socket = if addr.is_ipv4() { + Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) + } else { + Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) + } + .expect("create socket"); + + socket + .set_nonblocking(true) + .expect("socket: set nonblocking"); + + if config.network.recv_buffer != 0 { + if let Err(err) = socket.set_recv_buffer_size(config.network.recv_buffer) { + eprintln!( + "socket: failed setting recv buffer to {}: {:?}", + config.network.recv_buffer, err + ); + } + } + + socket + .bind(&addr.into()) + .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", addr, err)); + + socket + .connect(&config.server_address.into()) + .expect("socket: connect to server"); + + socket.into() +} diff --git a/aquatic_udp_load_test/src/handler.rs b/aquatic_udp_load_test/src/worker/request_gen.rs similarity index 86% rename from aquatic_udp_load_test/src/handler.rs rename to aquatic_udp_load_test/src/worker/request_gen.rs index b791c10..98091c8 100644 --- a/aquatic_udp_load_test/src/handler.rs +++ b/aquatic_udp_load_test/src/worker/request_gen.rs @@ -186,3 +186,33 @@ fn create_scrape_request( }) .into() } + +fn create_torrent_peer( + config: &Config, + rng: &mut impl Rng, + pareto: Pareto, + info_hashes: &Arc>, + connection_id: ConnectionId, +) -> TorrentPeer { + let num_scape_hashes = rng.gen_range(1..config.requests.scrape_max_torrents); + + let mut scrape_hash_indeces = Vec::new(); + + for _ in 0..num_scape_hashes { + scrape_hash_indeces.push(select_info_hash_index(config, rng, pareto)) + } + + let info_hash_index = select_info_hash_index(config, rng, pareto); + + TorrentPeer { + info_hash: info_hashes[info_hash_index], + scrape_hash_indeces, + connection_id, + peer_id: generate_peer_id(), + port: Port(rand::random()), + } +} + +fn select_info_hash_index(config: &Config, rng: &mut impl Rng, pareto: Pareto) -> usize { + pareto_usize(rng, pareto, config.requests.number_of_torrents - 1) +}