From 5b0d364ccffadd39147f26e04807b47b92dbce1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 19 Jul 2020 23:03:30 +0200 Subject: [PATCH] WIP: start work on porting udp load test to http --- Cargo.lock | 18 +++ Cargo.toml | 1 + aquatic_http_load_test/Cargo.toml | 25 ++++ aquatic_http_load_test/src/common.rs | 187 +++++++++++++++++++++++ aquatic_http_load_test/src/handler.rs | 191 ++++++++++++++++++++++++ aquatic_http_load_test/src/main.rs | 204 ++++++++++++++++++++++++++ aquatic_http_load_test/src/network.rs | 198 +++++++++++++++++++++++++ aquatic_http_load_test/src/utils.rs | 53 +++++++ 8 files changed, 877 insertions(+) create mode 100644 aquatic_http_load_test/Cargo.toml create mode 100644 aquatic_http_load_test/src/common.rs create mode 100644 aquatic_http_load_test/src/handler.rs create mode 100644 aquatic_http_load_test/src/main.rs create mode 100644 aquatic_http_load_test/src/network.rs create mode 100644 aquatic_http_load_test/src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 0bc7e1b..0ed52cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -97,6 +97,24 @@ dependencies = [ "smartstring", ] +[[package]] +name = "aquatic_http_load_test" +version = "0.1.0" +dependencies = [ + "anyhow", + "aquatic_cli_helpers", + "aquatic_http_protocol", + "crossbeam-channel", + "mimalloc", + "mio", + "quickcheck", + "quickcheck_macros", + "rand", + "rand_distr", + "serde", + "socket2", +] + [[package]] name = "aquatic_http_protocol" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index abbcad2..45307c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "aquatic_common", "aquatic_common_tcp", "aquatic_http", + "aquatic_http_load_test", "aquatic_http_protocol", "aquatic_udp", "aquatic_udp_bench", diff --git a/aquatic_http_load_test/Cargo.toml b/aquatic_http_load_test/Cargo.toml new file mode 100644 index 0000000..e6377b8 --- /dev/null +++ b/aquatic_http_load_test/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "aquatic_http_load_test" +version = "0.1.0" +authors = ["Joakim FrostegÄrd "] +edition = "2018" +license = "Apache-2.0" + +[[bin]] +name = "aquatic_http_load_test" + +[dependencies] +anyhow = "1" +aquatic_cli_helpers = { path = "../aquatic_cli_helpers" } +aquatic_http_protocol = { path = "../aquatic_http_protocol" } +crossbeam-channel = "0.4" +mimalloc = { version = "0.1", default-features = false } +mio = { version = "0.7", features = ["udp", "os-poll", "os-util"] } +rand = { version = "0.7", features = ["small_rng"] } +rand_distr = "0.2" +serde = { version = "1", features = ["derive"] } +socket2 = { version = "0.3", features = ["reuseport"] } + +[dev-dependencies] +quickcheck = "0.9" +quickcheck_macros = "0.9" diff --git a/aquatic_http_load_test/src/common.rs b/aquatic_http_load_test/src/common.rs new file mode 100644 index 0000000..e24ef7d --- /dev/null +++ b/aquatic_http_load_test/src/common.rs @@ -0,0 +1,187 @@ +use std::net::SocketAddr; +use std::sync::{Arc, atomic::AtomicUsize}; + +use serde::{Serialize, Deserialize}; +use rand_distr::Pareto; + +pub use aquatic_http_protocol::common::*; +pub use aquatic_http_protocol::response::*; +pub use aquatic_http_protocol::request::*; + + +#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] +pub struct ThreadId(pub u8); + + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct Config { + /// Server address + pub server_address: SocketAddr, + /// Number of sockets and socket worker threads + /// + /// Sockets will bind to one port each, and with + /// multiple_client_ips = true, additionally to one IP each. + pub num_socket_workers: u8, + /// Number of workers generating requests from responses, as well as + /// requests not connected to previous ones. + pub num_request_workers: usize, + /// Run duration (quit and generate report after this many seconds) + pub duration: usize, + pub network: NetworkConfig, + pub handler: HandlerConfig, +} + + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct NetworkConfig { + /// True means bind to one localhost IP per socket. On macOS, this by + /// default causes all server responses to go to one socket worker. + /// Default option ("true") can cause issues on macOS. + /// + /// The point of multiple IPs is to possibly cause a better distribution + /// of requests to servers with SO_REUSEPORT option. + pub multiple_client_ips: bool, + /// Use Ipv6 only + pub ipv6_client: bool, + /// Number of first client port + pub first_port: u16, + /// Socket worker poll timeout in microseconds + pub poll_timeout: u64, + /// Socket worker polling event number + pub poll_event_capacity: usize, + /// Size of socket recv buffer. Use 0 for OS default. + /// + /// This setting can have a big impact on dropped packages. It might + /// require changing system defaults. Some examples of commands to set + /// recommended values for different operating systems: + /// + /// macOS: + /// $ sudo sysctl net.inet.udp.recvspace=6000000 + /// $ sudo sysctl net.inet.udp.maxdgram=500000 # Not necessary, but recommended + /// $ sudo sysctl kern.ipc.maxsockbuf=8388608 # Not necessary, but recommended + /// + /// Linux: + /// $ sudo sysctl -w net.core.rmem_max=104857600 + /// $ sudo sysctl -w net.core.rmem_default=104857600 + pub recv_buffer: usize, +} + + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct HandlerConfig { + /// Number of torrents to simulate + pub number_of_torrents: usize, + /// Maximum number of torrents to ask about in scrape requests + pub scrape_max_torrents: usize, + /// Handler: max number of responses to collect for before processing + pub max_responses_per_iter: usize, + /// Probability that a generated request is a announce request, as part + /// of sum of the various weight arguments. + pub weight_announce: usize, + /// Probability that a generated request is a scrape request, as part + /// of sum of the various weight arguments. + pub weight_scrape: usize, + /// Handler: max microseconds to wait for single response from channel + pub channel_timeout: u64, + /// Pareto shape + /// + /// Fake peers choose torrents according to Pareto distribution. + pub torrent_selection_pareto_shape: f64, + /// Probability that a generated peer is a seeder + pub peer_seeder_probability: f64, + /// Part of additional request creation calculation, meaning requests + /// which are not dependent on previous responses from server. Higher + /// means more. + pub additional_request_factor: f64, +} + + +impl Default for Config { + fn default() -> Self { + Self { + server_address: "127.0.0.1:3000".parse().unwrap(), + num_socket_workers: 1, + num_request_workers: 1, + duration: 0, + network: NetworkConfig::default(), + handler: HandlerConfig::default(), + } + } +} + +impl Default for NetworkConfig { + fn default() -> Self { + Self { + multiple_client_ips: true, + ipv6_client: false, + first_port: 45_000, + poll_timeout: 276, + poll_event_capacity: 2_877, + recv_buffer: 6_000_000, + } + } +} + + +impl Default for HandlerConfig { + fn default() -> Self { + Self { + number_of_torrents: 10_000, + peer_seeder_probability: 0.25, + scrape_max_torrents: 50, + weight_announce: 5, + weight_scrape: 1, + additional_request_factor: 0.4, + max_responses_per_iter: 10_000, + channel_timeout: 200, + torrent_selection_pareto_shape: 2.0, + } + } +} + + +#[derive(PartialEq, Eq, Clone)] +pub struct TorrentPeer { + pub info_hash: InfoHash, + pub scrape_hash_indeces: Vec, + pub peer_id: PeerId, + pub port: u16, +} + + +#[derive(Default)] +pub struct Statistics { + pub requests: AtomicUsize, + pub response_peers: AtomicUsize, + pub responses_announce: AtomicUsize, + pub responses_scrape: AtomicUsize, + pub responses_failure: AtomicUsize, +} + + +#[derive(Clone)] +pub struct LoadTestState { + pub info_hashes: Arc>, + pub statistics: Arc, + pub pareto: Arc>, +} + + +#[derive(PartialEq, Eq, Clone, Copy)] +pub enum RequestType { + Announce, + Scrape +} + + +#[derive(Default)] +pub struct SocketWorkerLocalStatistics { + pub requests: usize, + pub response_peers: usize, + pub responses_announce: usize, + pub responses_scrape: usize, + pub responses_failure: usize, +} \ No newline at end of file diff --git a/aquatic_http_load_test/src/handler.rs b/aquatic_http_load_test/src/handler.rs new file mode 100644 index 0000000..02e076a --- /dev/null +++ b/aquatic_http_load_test/src/handler.rs @@ -0,0 +1,191 @@ +use std::time::Duration; +use std::vec::Drain; + +use crossbeam_channel::{Receiver, Sender}; +use rand::distributions::WeightedIndex; +use rand::prelude::*; + +use crate::common::*; +use crate::utils::*; + + +pub fn run_handler_thread( + config: &Config, + state: LoadTestState, + request_senders: Vec>, + response_receiver: Receiver<(ThreadId, Response)>, +){ + let state = &state; + + let mut rng1 = SmallRng::from_rng(thread_rng()) + .expect("create SmallRng from thread_rng()"); + let mut rng2 = SmallRng::from_rng(thread_rng()) + .expect("create SmallRng from thread_rng()"); + + let timeout = Duration::from_micros(config.handler.channel_timeout); + + let mut responses = Vec::new(); + + loop { + for i in 0..config.handler.max_responses_per_iter { + let response = if i == 0 { + match response_receiver.recv(){ + Ok(r) => r, + Err(_) => break, // Really shouldn't happen + } + } else { + match response_receiver.recv_timeout(timeout){ + Ok(r) => r, + Err(_) => break, + } + }; + + responses.push(response); + } + + let requests = process_responses( + config, + &state, + &mut rng1, + responses.drain(..) + ); + + // Somewhat dubious heuristic for deciding how fast to create + // and send additional requests (requests not having anything + // to do with previously sent requests) + let num_additional_to_send = { + let num_additional_requests = requests.iter() + .map(|v| v.len()) + .sum::() as f64; + + let num_new_requests_per_socket = num_additional_requests / + config.num_socket_workers as f64; + + ((num_new_requests_per_socket / 1.2) * config.handler.additional_request_factor) as usize + 10 + }; + + for (channel_index, new_requests) in requests.into_iter().enumerate(){ + let channel = &request_senders[channel_index]; + + for _ in 0..num_additional_to_send { + let request = create_random_request( + &config, + &state, + &mut rng2, + ); + + channel.send(request) + .expect("send request to channel in handler worker"); + } + + for request in new_requests.into_iter(){ + channel.send(request) + .expect("send request to channel in handler worker"); + } + } + } +} + + +fn process_responses( + config: &Config, + state: &LoadTestState, + rng: &mut impl Rng, + responses: Drain<(ThreadId, Response)> +) -> Vec> { + let mut new_requests = Vec::with_capacity( + config.num_socket_workers as usize + ); + + for _ in 0..config.num_socket_workers { + new_requests.push(Vec::new()); + } + + for (socket_thread_id, response) in responses.into_iter() { + let new_request = create_random_request(config, state, rng); + + new_requests[socket_thread_id.0 as usize].push(new_request); + } + + new_requests +} + + +pub fn create_random_request( + config: &Config, + state: &LoadTestState, + rng: &mut impl Rng, +) -> Request { + let weights = vec![ + config.handler.weight_announce as u32, + config.handler.weight_scrape as u32, + ]; + + let items = vec![ + RequestType::Announce, + RequestType::Scrape, + ]; + + let dist = WeightedIndex::new(&weights) + .expect("random request weighted index"); + + match items[dist.sample(rng)] { + RequestType::Announce => create_announce_request( + config, + state, + rng, + ), + RequestType::Scrape => create_scrape_request( + config, + state, + rng, + ) + } +} + + +fn create_announce_request( + config: &Config, + state: &LoadTestState, + rng: &mut impl Rng, +) -> Request { + let (event, bytes_left) = { + if rng.gen_bool(config.handler.peer_seeder_probability) { + (AnnounceEvent::Completed, 0) + } else { + (AnnounceEvent::Started, 50) + } + }; + + let info_hash_index = select_info_hash_index(config, &state, rng); + + Request::Announce(AnnounceRequest { + info_hash: state.info_hashes[info_hash_index], + peer_id: PeerId(rng.gen()), + bytes_left, + event, + key: None, + numwant: None, + compact: true, + port: rng.gen() + }) +} + + +fn create_scrape_request( + config: &Config, + state: &LoadTestState, + rng: &mut impl Rng, +) -> Request { + let mut scrape_hashes = Vec::new(); + + for _ in 0..20 { + let info_hash_index = select_info_hash_index(config, &state, rng); + + scrape_hashes.push(state.info_hashes[info_hash_index]); + } + + Request::Scrape(ScrapeRequest { + info_hashes: scrape_hashes, + }) +} \ No newline at end of file diff --git a/aquatic_http_load_test/src/main.rs b/aquatic_http_load_test/src/main.rs new file mode 100644 index 0000000..388ed06 --- /dev/null +++ b/aquatic_http_load_test/src/main.rs @@ -0,0 +1,204 @@ +use std::net::{SocketAddr, Ipv4Addr, Ipv6Addr}; +use std::thread; +use std::sync::{Arc, atomic::Ordering}; +use std::time::{Duration, Instant}; + +use crossbeam_channel::unbounded; +use rand::prelude::*; +use rand_distr::Pareto; + +mod common; +mod handler; +mod network; +mod utils; + +use common::*; +use utils::*; +use handler::create_random_request; +use network::*; +use handler::run_handler_thread; + + +#[global_allocator] +static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; + + +pub fn main(){ + aquatic_cli_helpers::run_app_with_cli_and_config::( + "aquatic: udp bittorrent tracker: load tester", + run, + ) +} + + +fn run(config: Config) -> ::anyhow::Result<()> { + if config.handler.weight_announce + config.handler.weight_scrape == 0 { + panic!("Error: at least one weight must be larger than zero."); + } + + println!("Starting client with config: {:#?}", config); + + let mut info_hashes = Vec::with_capacity(config.handler.number_of_torrents); + + for _ in 0..config.handler.number_of_torrents { + info_hashes.push(generate_info_hash()); + } + + let pareto = Pareto::new( + 1.0, + config.handler.torrent_selection_pareto_shape + ).unwrap(); + + let state = LoadTestState { + info_hashes: Arc::new(info_hashes), + statistics: Arc::new(Statistics::default()), + pareto: Arc::new(pareto), + }; + + // Start socket workers + + let (response_sender, response_receiver) = unbounded(); + + let mut request_senders = Vec::new(); + + for i in 0..config.num_socket_workers { + let thread_id = ThreadId(i); + let (sender, receiver) = unbounded(); + let port = config.network.first_port + (i as u16); + + let addr = if config.network.multiple_client_ips { + let ip = if config.network.ipv6_client { // FIXME: test ipv6 + Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1 + i as u16).into() + } else { + Ipv4Addr::new(127, 0, 0, 1 + i).into() + }; + + SocketAddr::new(ip, port) + } else { + let ip = if config.network.ipv6_client { + Ipv6Addr::LOCALHOST.into() + } else { + Ipv4Addr::LOCALHOST.into() + }; + + SocketAddr::new(ip, port) + }; + + request_senders.push(sender); + + let config = config.clone(); + let response_sender = response_sender.clone(); + let state = state.clone(); + + thread::spawn(move || run_socket_thread( + state, + response_sender, + receiver, + &config, + addr, + thread_id + )); + } + + for _ in 0..config.num_request_workers { + let config = config.clone(); + let state= state.clone(); + let request_senders = request_senders.clone(); + let response_receiver = response_receiver.clone(); + + thread::spawn(move || run_handler_thread( + &config, + state, + request_senders, + response_receiver, + )); + } + + // Bootstrap request cycle by adding a request to each request channel + for sender in request_senders.iter(){ + let request = create_random_request( + &config, + &state, + &mut thread_rng() + ); + + sender.send(request.into()) + .expect("bootstrap: add initial request to request queue"); + } + + monitor_statistics( + state, + &config + ); + + Ok(()) +} + + +fn monitor_statistics( + state: LoadTestState, + config: &Config, +){ + let start_time = Instant::now(); + let mut report_avg_response_vec: Vec = Vec::new(); + + let interval = 5; + let interval_f64 = interval as f64; + + loop { + thread::sleep(Duration::from_secs(interval)); + + let statistics = state.statistics.as_ref(); + + let responses_announce = statistics.responses_announce + .fetch_and(0, Ordering::SeqCst) as f64; + let response_peers = statistics.response_peers + .fetch_and(0, Ordering::SeqCst) as f64; + + let requests_per_second = statistics.requests + .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + let responses_scrape_per_second = statistics.responses_scrape + .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + let responses_failure_per_second = statistics.responses_failure + .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + + let responses_announce_per_second = responses_announce / interval_f64; + + let responses_per_second = + responses_announce_per_second + + responses_scrape_per_second + + responses_failure_per_second; + + report_avg_response_vec.push(responses_per_second); + + println!(); + println!("Requests out: {:.2}/second", requests_per_second); + println!("Responses in: {:.2}/second", responses_per_second); + println!(" - Announce responses: {:.2}", responses_announce_per_second); + println!(" - Scrape responses: {:.2}", responses_scrape_per_second); + println!(" - Failure responses: {:.2}", responses_failure_per_second); + println!("Peers per announce response: {:.2}", response_peers / responses_announce); + + let time_elapsed = start_time.elapsed(); + let duration = Duration::from_secs(config.duration as u64); + + if config.duration != 0 && time_elapsed >= duration { + let report_len = report_avg_response_vec.len() as f64; + let report_sum: f64 = report_avg_response_vec.into_iter().sum(); + let report_avg: f64 = report_sum / report_len; + + println!( + concat!( + "\n# aquatic load test report\n\n", + "Test ran for {} seconds.\n", + "Average responses per second: {:.2}\n\nConfig: {:#?}\n" + ), + time_elapsed.as_secs(), + report_avg, + config + ); + + break + } + } +} diff --git a/aquatic_http_load_test/src/network.rs b/aquatic_http_load_test/src/network.rs new file mode 100644 index 0000000..334964c --- /dev/null +++ b/aquatic_http_load_test/src/network.rs @@ -0,0 +1,198 @@ +use std::io::Cursor; +use std::net::SocketAddr; +use std::sync::atomic::Ordering; +use std::time::Duration; + +use crossbeam_channel::{Receiver, Sender}; +use mio::{net::UdpSocket, Events, Poll, Interest, Token}; +use socket2::{Socket, Domain, Type, Protocol}; + +use crate::common::*; + + +const MAX_PACKET_SIZE: usize = 4096; + + +pub fn create_socket( + config: &Config, + addr: SocketAddr +) -> ::std::net::UdpSocket { + let socket = if addr.is_ipv4(){ + Socket::new(Domain::ipv4(), Type::dgram(), Some(Protocol::udp())) + } else { + Socket::new(Domain::ipv6(), Type::dgram(), Some(Protocol::udp())) + }.expect("create socket"); + + socket.set_nonblocking(true) + .expect("socket: set nonblocking"); + + if config.network.recv_buffer != 0 { + if let Err(err) = socket.set_recv_buffer_size(config.network.recv_buffer){ + eprintln!( + "socket: failed setting recv buffer to {}: {:?}", + config.network.recv_buffer, + err + ); + } + } + + socket.bind(&addr.into()) + .expect(&format!("socket: bind to {}", addr)); + + socket.connect(&config.server_address.into()) + .expect("socket: connect to server"); + + socket.into_udp_socket() +} + + +pub fn run_socket_thread( + state: LoadTestState, + response_channel_sender: Sender<(ThreadId, Response)>, + request_receiver: Receiver, + config: &Config, + addr: SocketAddr, + thread_id: ThreadId +) { + let mut socket = UdpSocket::from_std(create_socket(config, addr)); + let mut buffer = [0u8; MAX_PACKET_SIZE]; + + let token = Token(thread_id.0 as usize); + let interests = Interest::READABLE; + let timeout = Duration::from_micros(config.network.poll_timeout); + + let mut poll = Poll::new().expect("create poll"); + + poll.registry() + .register(&mut socket, token, interests) + .unwrap(); + + let mut events = Events::with_capacity(config.network.poll_event_capacity); + + let mut statistics = SocketWorkerLocalStatistics::default(); + let mut responses = Vec::new(); + + loop { + poll.poll(&mut events, Some(timeout)) + .expect("failed polling"); + + for event in events.iter(){ + if event.token() == token { + if event.is_readable(){ + read_responses( + thread_id, + &socket, + &mut buffer, + &mut statistics, + &mut responses + ); + + for r in responses.drain(..){ + response_channel_sender.send(r) + .expect(&format!( + "add response to channel in socket worker {}", + thread_id.0 + )); + } + + poll.registry() + .reregister(&mut socket, token, interests) + .unwrap(); + } + } + + send_requests( + &state, + &mut socket, + &mut buffer, + &request_receiver, + &mut statistics + ); + } + + send_requests( + &state, + &mut socket, + &mut buffer, + &request_receiver, + &mut statistics + ); + + state.statistics.requests + .fetch_add(statistics.requests, Ordering::SeqCst); + state.statistics.responses_announce + .fetch_add(statistics.responses_announce, Ordering::SeqCst); + state.statistics.responses_scrape + .fetch_add(statistics.responses_scrape, Ordering::SeqCst); + state.statistics.responses_failure + .fetch_add(statistics.responses_failure, Ordering::SeqCst); + state.statistics.response_peers + .fetch_add(statistics.response_peers, Ordering::SeqCst); + + statistics = SocketWorkerLocalStatistics::default(); + } +} + + +fn read_responses( + thread_id: ThreadId, + socket: &UdpSocket, + buffer: &mut [u8], + ls: &mut SocketWorkerLocalStatistics, + responses: &mut Vec<(ThreadId, Response)>, +){ + while let Ok(amt) = socket.recv(buffer) { + match response_from_bytes(&buffer[0..amt]){ + Ok(response) => { + match response { + Response::Announce(ref r) => { + ls.responses_announce += 1; + ls.response_peers += r.peers.len(); + }, + Response::Scrape(_) => { + ls.responses_scrape += 1; + }, + Response::Failure(_) => { + ls.responses_failure += 1; + }, + } + + responses.push((thread_id, response)) + }, + Err(err) => { + eprintln!("Received invalid response: {:#?}", err); + } + } + } +} + + +fn send_requests( + state: &LoadTestState, + socket: &mut UdpSocket, + buffer: &mut [u8], + receiver: &Receiver, + statistics: &mut SocketWorkerLocalStatistics, +){ + let mut cursor = Cursor::new(buffer); + + while let Ok(request) = receiver.try_recv() { + cursor.set_position(0); + + if let Err(err) = request_to_bytes(&mut cursor, request){ + eprintln!("request_to_bytes err: {}", err); + } + + let position = cursor.position() as usize; + let inner = cursor.get_ref(); + + match socket.send(&inner[..position]) { + Ok(_) => { + statistics.requests += 1; + }, + Err(err) => { + eprintln!("Couldn't send packet: {:?}", err); + } + } + } +} \ No newline at end of file diff --git a/aquatic_http_load_test/src/utils.rs b/aquatic_http_load_test/src/utils.rs new file mode 100644 index 0000000..b28eb9f --- /dev/null +++ b/aquatic_http_load_test/src/utils.rs @@ -0,0 +1,53 @@ +use std::sync::Arc; + +use rand_distr::{Standard, Pareto}; +use rand::prelude::*; + +use crate::common::*; + + +pub fn select_info_hash_index( + config: &Config, + state: &LoadTestState, + rng: &mut impl Rng, +) -> usize { + pareto_usize(rng, state.pareto, config.handler.number_of_torrents - 1) +} + + +pub fn pareto_usize( + rng: &mut impl Rng, + pareto: Arc>, + max: usize, +) -> usize { + let p: f64 = pareto.sample(rng); + let p = (p.min(101.0f64) - 1.0) / 100.0; + + (p * max as f64) as usize +} + + +pub fn generate_peer_id() -> PeerId { + PeerId(random_20_bytes()) +} + + +pub fn generate_info_hash() -> InfoHash { + InfoHash(random_20_bytes()) +} + + +// Don't use SmallRng here for now +fn random_20_bytes() -> [u8; 20] { + let mut bytes = [0; 20]; + + for (i, b) in rand::thread_rng() + .sample_iter(&Standard) + .enumerate() + .take(20) { + + bytes[i] = b + } + + bytes +} \ No newline at end of file