mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
WIP: start work on porting udp load test to http
This commit is contained in:
parent
4ac2012a2a
commit
5b0d364ccf
8 changed files with 877 additions and 0 deletions
18
Cargo.lock
generated
18
Cargo.lock
generated
|
|
@ -97,6 +97,24 @@ dependencies = [
|
||||||
"smartstring",
|
"smartstring",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "aquatic_http_load_test"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"anyhow",
|
||||||
|
"aquatic_cli_helpers",
|
||||||
|
"aquatic_http_protocol",
|
||||||
|
"crossbeam-channel",
|
||||||
|
"mimalloc",
|
||||||
|
"mio",
|
||||||
|
"quickcheck",
|
||||||
|
"quickcheck_macros",
|
||||||
|
"rand",
|
||||||
|
"rand_distr",
|
||||||
|
"serde",
|
||||||
|
"socket2",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "aquatic_http_protocol"
|
name = "aquatic_http_protocol"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ members = [
|
||||||
"aquatic_common",
|
"aquatic_common",
|
||||||
"aquatic_common_tcp",
|
"aquatic_common_tcp",
|
||||||
"aquatic_http",
|
"aquatic_http",
|
||||||
|
"aquatic_http_load_test",
|
||||||
"aquatic_http_protocol",
|
"aquatic_http_protocol",
|
||||||
"aquatic_udp",
|
"aquatic_udp",
|
||||||
"aquatic_udp_bench",
|
"aquatic_udp_bench",
|
||||||
|
|
|
||||||
25
aquatic_http_load_test/Cargo.toml
Normal file
25
aquatic_http_load_test/Cargo.toml
Normal file
|
|
@ -0,0 +1,25 @@
|
||||||
|
[package]
|
||||||
|
name = "aquatic_http_load_test"
|
||||||
|
version = "0.1.0"
|
||||||
|
authors = ["Joakim Frostegård <joakim.frostegard@gmail.com>"]
|
||||||
|
edition = "2018"
|
||||||
|
license = "Apache-2.0"
|
||||||
|
|
||||||
|
[[bin]]
|
||||||
|
name = "aquatic_http_load_test"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
anyhow = "1"
|
||||||
|
aquatic_cli_helpers = { path = "../aquatic_cli_helpers" }
|
||||||
|
aquatic_http_protocol = { path = "../aquatic_http_protocol" }
|
||||||
|
crossbeam-channel = "0.4"
|
||||||
|
mimalloc = { version = "0.1", default-features = false }
|
||||||
|
mio = { version = "0.7", features = ["udp", "os-poll", "os-util"] }
|
||||||
|
rand = { version = "0.7", features = ["small_rng"] }
|
||||||
|
rand_distr = "0.2"
|
||||||
|
serde = { version = "1", features = ["derive"] }
|
||||||
|
socket2 = { version = "0.3", features = ["reuseport"] }
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
quickcheck = "0.9"
|
||||||
|
quickcheck_macros = "0.9"
|
||||||
187
aquatic_http_load_test/src/common.rs
Normal file
187
aquatic_http_load_test/src/common.rs
Normal file
|
|
@ -0,0 +1,187 @@
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::sync::{Arc, atomic::AtomicUsize};
|
||||||
|
|
||||||
|
use serde::{Serialize, Deserialize};
|
||||||
|
use rand_distr::Pareto;
|
||||||
|
|
||||||
|
pub use aquatic_http_protocol::common::*;
|
||||||
|
pub use aquatic_http_protocol::response::*;
|
||||||
|
pub use aquatic_http_protocol::request::*;
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
|
||||||
|
pub struct ThreadId(pub u8);
|
||||||
|
|
||||||
|
|
||||||
|
/// Top-level load-tester configuration, deserialized via serde.
///
/// `#[serde(default)]` means any field missing from the config file
/// falls back to the value from `Config::default()`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
    /// Server address
    pub server_address: SocketAddr,
    /// Number of sockets and socket worker threads
    ///
    /// Sockets will bind to one port each, and with
    /// multiple_client_ips = true, additionally to one IP each.
    pub num_socket_workers: u8,
    /// Number of workers generating requests from responses, as well as
    /// requests not connected to previous ones.
    pub num_request_workers: usize,
    /// Run duration (quit and generate report after this many seconds)
    pub duration: usize,
    /// Socket/polling settings.
    pub network: NetworkConfig,
    /// Request-generation settings.
    pub handler: HandlerConfig,
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Socket and polling configuration for the socket worker threads.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct NetworkConfig {
    /// True means bind to one localhost IP per socket. On macOS, this by
    /// default causes all server responses to go to one socket worker.
    /// Default option ("true") can cause issues on macOS.
    ///
    /// The point of multiple IPs is to possibly cause a better distribution
    /// of requests to servers with SO_REUSEPORT option.
    pub multiple_client_ips: bool,
    /// Use Ipv6 only
    pub ipv6_client: bool,
    /// Number of first client port
    pub first_port: u16,
    /// Socket worker poll timeout in microseconds
    pub poll_timeout: u64,
    /// Socket worker polling event number
    pub poll_event_capacity: usize,
    /// Size of socket recv buffer. Use 0 for OS default.
    ///
    /// This setting can have a big impact on dropped packages. It might
    /// require changing system defaults. Some examples of commands to set
    /// recommended values for different operating systems:
    ///
    /// macOS:
    /// $ sudo sysctl net.inet.udp.recvspace=6000000
    /// $ sudo sysctl net.inet.udp.maxdgram=500000 # Not necessary, but recommended
    /// $ sudo sysctl kern.ipc.maxsockbuf=8388608 # Not necessary, but recommended
    ///
    /// Linux:
    /// $ sudo sysctl -w net.core.rmem_max=104857600
    /// $ sudo sysctl -w net.core.rmem_default=104857600
    pub recv_buffer: usize,
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Configuration for the request-generating handler workers.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct HandlerConfig {
    /// Number of torrents to simulate
    pub number_of_torrents: usize,
    /// Maximum number of torrents to ask about in scrape requests
    pub scrape_max_torrents: usize,
    /// Handler: max number of responses to collect for before processing
    pub max_responses_per_iter: usize,
    /// Probability that a generated request is a announce request, as part
    /// of sum of the various weight arguments.
    pub weight_announce: usize,
    /// Probability that a generated request is a scrape request, as part
    /// of sum of the various weight arguments.
    pub weight_scrape: usize,
    /// Handler: max microseconds to wait for single response from channel
    pub channel_timeout: u64,
    /// Pareto shape
    ///
    /// Fake peers choose torrents according to Pareto distribution.
    pub torrent_selection_pareto_shape: f64,
    /// Probability that a generated peer is a seeder
    pub peer_seeder_probability: f64,
    /// Part of additional request creation calculation, meaning requests
    /// which are not dependent on previous responses from server. Higher
    /// means more.
    pub additional_request_factor: f64,
}
|
||||||
|
|
||||||
|
|
||||||
|
impl Default for Config {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
server_address: "127.0.0.1:3000".parse().unwrap(),
|
||||||
|
num_socket_workers: 1,
|
||||||
|
num_request_workers: 1,
|
||||||
|
duration: 0,
|
||||||
|
network: NetworkConfig::default(),
|
||||||
|
handler: HandlerConfig::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for NetworkConfig {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
multiple_client_ips: true,
|
||||||
|
ipv6_client: false,
|
||||||
|
first_port: 45_000,
|
||||||
|
poll_timeout: 276,
|
||||||
|
poll_event_capacity: 2_877,
|
||||||
|
recv_buffer: 6_000_000,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl Default for HandlerConfig {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
number_of_torrents: 10_000,
|
||||||
|
peer_seeder_probability: 0.25,
|
||||||
|
scrape_max_torrents: 50,
|
||||||
|
weight_announce: 5,
|
||||||
|
weight_scrape: 1,
|
||||||
|
additional_request_factor: 0.4,
|
||||||
|
max_responses_per_iter: 10_000,
|
||||||
|
channel_timeout: 200,
|
||||||
|
torrent_selection_pareto_shape: 2.0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// A simulated peer on one torrent.
///
/// NOTE(review): not referenced anywhere in this commit's visible code —
/// presumably intended for later stages of the HTTP port; confirm.
/// (Field name "indeces" [sic] is part of the public interface.)
#[derive(PartialEq, Eq, Clone)]
pub struct TorrentPeer {
    pub info_hash: InfoHash,
    pub scrape_hash_indeces: Vec<usize>,
    pub peer_id: PeerId,
    pub port: u16,
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Global counters shared between all workers via `Arc`.
///
/// Socket workers add their local per-iteration counts with `fetch_add`;
/// `monitor_statistics` reads and resets them each reporting interval.
#[derive(Default)]
pub struct Statistics {
    // Requests sent to the server
    pub requests: AtomicUsize,
    // Total number of peers seen in announce responses
    pub response_peers: AtomicUsize,
    pub responses_announce: AtomicUsize,
    pub responses_scrape: AtomicUsize,
    pub responses_failure: AtomicUsize,
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Shared immutable state handed (cheaply, via `Arc` clones) to every
/// worker thread.
#[derive(Clone)]
pub struct LoadTestState {
    // Pre-generated set of simulated torrent info hashes
    pub info_hashes: Arc<Vec<InfoHash>>,
    pub statistics: Arc<Statistics>,
    // Pareto distribution used to bias torrent selection
    pub pareto: Arc<Pareto<f64>>,
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Kind of tracker request to generate, sampled according to the
/// configured announce/scrape weights.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum RequestType {
    Announce,
    Scrape
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Per-socket-worker counters, accumulated locally without atomics and
/// periodically flushed into the shared `Statistics` (see network.rs).
#[derive(Default)]
pub struct SocketWorkerLocalStatistics {
    pub requests: usize,
    pub response_peers: usize,
    pub responses_announce: usize,
    pub responses_scrape: usize,
    pub responses_failure: usize,
}
|
||||||
191
aquatic_http_load_test/src/handler.rs
Normal file
191
aquatic_http_load_test/src/handler.rs
Normal file
|
|
@ -0,0 +1,191 @@
|
||||||
|
use std::time::Duration;
|
||||||
|
use std::vec::Drain;
|
||||||
|
|
||||||
|
use crossbeam_channel::{Receiver, Sender};
|
||||||
|
use rand::distributions::WeightedIndex;
|
||||||
|
use rand::prelude::*;
|
||||||
|
|
||||||
|
use crate::common::*;
|
||||||
|
use crate::utils::*;
|
||||||
|
|
||||||
|
|
||||||
|
/// Main loop of a request-generator worker.
///
/// Repeatedly collects a batch of responses from the socket workers,
/// converts each response into one follow-up request, then additionally
/// sends a heuristically-sized number of unrelated ("additional")
/// requests to each socket worker channel. Never returns.
pub fn run_handler_thread(
    config: &Config,
    state: LoadTestState,
    request_senders: Vec<Sender<Request>>,
    response_receiver: Receiver<(ThreadId, Response)>,
){
    let state = &state;

    // Two independent RNGs: one for response-driven requests, one for
    // the additional requests, so the borrows don't conflict.
    let mut rng1 = SmallRng::from_rng(thread_rng())
        .expect("create SmallRng from thread_rng()");
    let mut rng2 = SmallRng::from_rng(thread_rng())
        .expect("create SmallRng from thread_rng()");

    let timeout = Duration::from_micros(config.handler.channel_timeout);

    let mut responses = Vec::new();

    loop {
        // Collect up to max_responses_per_iter responses. The first recv
        // blocks until something arrives; later recvs use a short timeout
        // so a sparse stream doesn't stall the batch.
        // NOTE(review): `break` here only exits this collection loop; if
        // all senders are dropped the outer loop spins — confirm intended.
        for i in 0..config.handler.max_responses_per_iter {
            let response = if i == 0 {
                match response_receiver.recv(){
                    Ok(r) => r,
                    Err(_) => break, // Really shouldn't happen
                }
            } else {
                match response_receiver.recv_timeout(timeout){
                    Ok(r) => r,
                    Err(_) => break,
                }
            };

            responses.push(response);
        }

        // One follow-up request per response, grouped per socket worker.
        let requests = process_responses(
            config,
            &state,
            &mut rng1,
            responses.drain(..)
        );

        // Somewhat dubious heuristic for deciding how fast to create
        // and send additional requests (requests not having anything
        // to do with previously sent requests)
        let num_additional_to_send = {
            let num_additional_requests = requests.iter()
                .map(|v| v.len())
                .sum::<usize>() as f64;

            let num_new_requests_per_socket = num_additional_requests /
                config.num_socket_workers as f64;

            // "+ 10" guarantees some traffic even when no responses came in
            ((num_new_requests_per_socket / 1.2) * config.handler.additional_request_factor) as usize + 10
        };

        for (channel_index, new_requests) in requests.into_iter().enumerate(){
            let channel = &request_senders[channel_index];

            // Unrelated random requests first...
            for _ in 0..num_additional_to_send {
                let request = create_random_request(
                    &config,
                    &state,
                    &mut rng2,
                );

                channel.send(request)
                    .expect("send request to channel in handler worker");
            }

            // ...then the response-driven follow-ups.
            for request in new_requests.into_iter(){
                channel.send(request)
                    .expect("send request to channel in handler worker");
            }
        }
    }
}
|
||||||
|
|
||||||
|
|
||||||
|
fn process_responses(
|
||||||
|
config: &Config,
|
||||||
|
state: &LoadTestState,
|
||||||
|
rng: &mut impl Rng,
|
||||||
|
responses: Drain<(ThreadId, Response)>
|
||||||
|
) -> Vec<Vec<Request>> {
|
||||||
|
let mut new_requests = Vec::with_capacity(
|
||||||
|
config.num_socket_workers as usize
|
||||||
|
);
|
||||||
|
|
||||||
|
for _ in 0..config.num_socket_workers {
|
||||||
|
new_requests.push(Vec::new());
|
||||||
|
}
|
||||||
|
|
||||||
|
for (socket_thread_id, response) in responses.into_iter() {
|
||||||
|
let new_request = create_random_request(config, state, rng);
|
||||||
|
|
||||||
|
new_requests[socket_thread_id.0 as usize].push(new_request);
|
||||||
|
}
|
||||||
|
|
||||||
|
new_requests
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub fn create_random_request(
|
||||||
|
config: &Config,
|
||||||
|
state: &LoadTestState,
|
||||||
|
rng: &mut impl Rng,
|
||||||
|
) -> Request {
|
||||||
|
let weights = vec![
|
||||||
|
config.handler.weight_announce as u32,
|
||||||
|
config.handler.weight_scrape as u32,
|
||||||
|
];
|
||||||
|
|
||||||
|
let items = vec![
|
||||||
|
RequestType::Announce,
|
||||||
|
RequestType::Scrape,
|
||||||
|
];
|
||||||
|
|
||||||
|
let dist = WeightedIndex::new(&weights)
|
||||||
|
.expect("random request weighted index");
|
||||||
|
|
||||||
|
match items[dist.sample(rng)] {
|
||||||
|
RequestType::Announce => create_announce_request(
|
||||||
|
config,
|
||||||
|
state,
|
||||||
|
rng,
|
||||||
|
),
|
||||||
|
RequestType::Scrape => create_scrape_request(
|
||||||
|
config,
|
||||||
|
state,
|
||||||
|
rng,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
fn create_announce_request(
|
||||||
|
config: &Config,
|
||||||
|
state: &LoadTestState,
|
||||||
|
rng: &mut impl Rng,
|
||||||
|
) -> Request {
|
||||||
|
let (event, bytes_left) = {
|
||||||
|
if rng.gen_bool(config.handler.peer_seeder_probability) {
|
||||||
|
(AnnounceEvent::Completed, 0)
|
||||||
|
} else {
|
||||||
|
(AnnounceEvent::Started, 50)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let info_hash_index = select_info_hash_index(config, &state, rng);
|
||||||
|
|
||||||
|
Request::Announce(AnnounceRequest {
|
||||||
|
info_hash: state.info_hashes[info_hash_index],
|
||||||
|
peer_id: PeerId(rng.gen()),
|
||||||
|
bytes_left,
|
||||||
|
event,
|
||||||
|
key: None,
|
||||||
|
numwant: None,
|
||||||
|
compact: true,
|
||||||
|
port: rng.gen()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
fn create_scrape_request(
|
||||||
|
config: &Config,
|
||||||
|
state: &LoadTestState,
|
||||||
|
rng: &mut impl Rng,
|
||||||
|
) -> Request {
|
||||||
|
let mut scrape_hashes = Vec::new();
|
||||||
|
|
||||||
|
for _ in 0..20 {
|
||||||
|
let info_hash_index = select_info_hash_index(config, &state, rng);
|
||||||
|
|
||||||
|
scrape_hashes.push(state.info_hashes[info_hash_index]);
|
||||||
|
}
|
||||||
|
|
||||||
|
Request::Scrape(ScrapeRequest {
|
||||||
|
info_hashes: scrape_hashes,
|
||||||
|
})
|
||||||
|
}
|
||||||
204
aquatic_http_load_test/src/main.rs
Normal file
204
aquatic_http_load_test/src/main.rs
Normal file
|
|
@ -0,0 +1,204 @@
|
||||||
|
use std::net::{SocketAddr, Ipv4Addr, Ipv6Addr};
|
||||||
|
use std::thread;
|
||||||
|
use std::sync::{Arc, atomic::Ordering};
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use crossbeam_channel::unbounded;
|
||||||
|
use rand::prelude::*;
|
||||||
|
use rand_distr::Pareto;
|
||||||
|
|
||||||
|
mod common;
|
||||||
|
mod handler;
|
||||||
|
mod network;
|
||||||
|
mod utils;
|
||||||
|
|
||||||
|
use common::*;
|
||||||
|
use utils::*;
|
||||||
|
use handler::create_random_request;
|
||||||
|
use network::*;
|
||||||
|
use handler::run_handler_thread;
|
||||||
|
|
||||||
|
|
||||||
|
// Use mimalloc as the process-wide allocator — presumably chosen for
// faster multi-threaded allocation in a load tester; confirm benchmarks.
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
|
||||||
|
|
||||||
|
|
||||||
|
pub fn main(){
|
||||||
|
aquatic_cli_helpers::run_app_with_cli_and_config::<Config>(
|
||||||
|
"aquatic: udp bittorrent tracker: load tester",
|
||||||
|
run,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Set up shared state, spawn socket and handler workers, seed each
/// request channel with one initial request, then block in the
/// statistics monitor until the configured duration elapses.
fn run(config: Config) -> ::anyhow::Result<()> {
    // gen_ratio/WeightedIndex sampling needs a nonzero total weight.
    if config.handler.weight_announce + config.handler.weight_scrape == 0 {
        panic!("Error: at least one weight must be larger than zero.");
    }

    println!("Starting client with config: {:#?}", config);

    // Pre-generate the simulated torrent swarm.
    let mut info_hashes = Vec::with_capacity(config.handler.number_of_torrents);

    for _ in 0..config.handler.number_of_torrents {
        info_hashes.push(generate_info_hash());
    }

    // Pareto(scale = 1.0, configured shape) biases torrent selection
    // towards a few "popular" torrents.
    let pareto = Pareto::new(
        1.0,
        config.handler.torrent_selection_pareto_shape
    ).unwrap();

    let state = LoadTestState {
        info_hashes: Arc::new(info_hashes),
        statistics: Arc::new(Statistics::default()),
        pareto: Arc::new(pareto),
    };

    // Start socket workers

    // Single shared response channel; one request channel per worker.
    let (response_sender, response_receiver) = unbounded();

    let mut request_senders = Vec::new();

    for i in 0..config.num_socket_workers {
        let thread_id = ThreadId(i);
        let (sender, receiver) = unbounded();
        // Each worker binds its own port.
        let port = config.network.first_port + (i as u16);

        let addr = if config.network.multiple_client_ips {
            // One distinct loopback IP per worker, for better SO_REUSEPORT
            // distribution on the server side.
            let ip = if config.network.ipv6_client { // FIXME: test ipv6
                // NOTE(review): ::2, ::3 … are not loopback addresses on
                // most systems — confirm this works beyond worker 0.
                Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1 + i as u16).into()
            } else {
                // NOTE(review): `1 + i` overflows u8 for i == 255; fine
                // while num_socket_workers stays small — confirm bound.
                Ipv4Addr::new(127, 0, 0, 1 + i).into()
            };

            SocketAddr::new(ip, port)
        } else {
            let ip = if config.network.ipv6_client {
                Ipv6Addr::LOCALHOST.into()
            } else {
                Ipv4Addr::LOCALHOST.into()
            };

            SocketAddr::new(ip, port)
        };

        request_senders.push(sender);

        let config = config.clone();
        let response_sender = response_sender.clone();
        let state = state.clone();

        thread::spawn(move || run_socket_thread(
            state,
            response_sender,
            receiver,
            &config,
            addr,
            thread_id
        ));
    }

    // Start handler (request-generator) workers, all sharing the same
    // response receiver and the full set of request senders.
    for _ in 0..config.num_request_workers {
        let config = config.clone();
        let state = state.clone();
        let request_senders = request_senders.clone();
        let response_receiver = response_receiver.clone();

        thread::spawn(move || run_handler_thread(
            &config,
            state,
            request_senders,
            response_receiver,
        ));
    }

    // Bootstrap request cycle by adding a request to each request channel
    for sender in request_senders.iter(){
        let request = create_random_request(
            &config,
            &state,
            &mut thread_rng()
        );

        sender.send(request.into())
            .expect("bootstrap: add initial request to request queue");
    }

    // Blocks until the configured duration has elapsed (or forever).
    monitor_statistics(
        state,
        &config
    );

    Ok(())
}
|
||||||
|
|
||||||
|
|
||||||
|
fn monitor_statistics(
|
||||||
|
state: LoadTestState,
|
||||||
|
config: &Config,
|
||||||
|
){
|
||||||
|
let start_time = Instant::now();
|
||||||
|
let mut report_avg_response_vec: Vec<f64> = Vec::new();
|
||||||
|
|
||||||
|
let interval = 5;
|
||||||
|
let interval_f64 = interval as f64;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
thread::sleep(Duration::from_secs(interval));
|
||||||
|
|
||||||
|
let statistics = state.statistics.as_ref();
|
||||||
|
|
||||||
|
let responses_announce = statistics.responses_announce
|
||||||
|
.fetch_and(0, Ordering::SeqCst) as f64;
|
||||||
|
let response_peers = statistics.response_peers
|
||||||
|
.fetch_and(0, Ordering::SeqCst) as f64;
|
||||||
|
|
||||||
|
let requests_per_second = statistics.requests
|
||||||
|
.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
|
||||||
|
let responses_scrape_per_second = statistics.responses_scrape
|
||||||
|
.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
|
||||||
|
let responses_failure_per_second = statistics.responses_failure
|
||||||
|
.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
|
||||||
|
|
||||||
|
let responses_announce_per_second = responses_announce / interval_f64;
|
||||||
|
|
||||||
|
let responses_per_second =
|
||||||
|
responses_announce_per_second +
|
||||||
|
responses_scrape_per_second +
|
||||||
|
responses_failure_per_second;
|
||||||
|
|
||||||
|
report_avg_response_vec.push(responses_per_second);
|
||||||
|
|
||||||
|
println!();
|
||||||
|
println!("Requests out: {:.2}/second", requests_per_second);
|
||||||
|
println!("Responses in: {:.2}/second", responses_per_second);
|
||||||
|
println!(" - Announce responses: {:.2}", responses_announce_per_second);
|
||||||
|
println!(" - Scrape responses: {:.2}", responses_scrape_per_second);
|
||||||
|
println!(" - Failure responses: {:.2}", responses_failure_per_second);
|
||||||
|
println!("Peers per announce response: {:.2}", response_peers / responses_announce);
|
||||||
|
|
||||||
|
let time_elapsed = start_time.elapsed();
|
||||||
|
let duration = Duration::from_secs(config.duration as u64);
|
||||||
|
|
||||||
|
if config.duration != 0 && time_elapsed >= duration {
|
||||||
|
let report_len = report_avg_response_vec.len() as f64;
|
||||||
|
let report_sum: f64 = report_avg_response_vec.into_iter().sum();
|
||||||
|
let report_avg: f64 = report_sum / report_len;
|
||||||
|
|
||||||
|
println!(
|
||||||
|
concat!(
|
||||||
|
"\n# aquatic load test report\n\n",
|
||||||
|
"Test ran for {} seconds.\n",
|
||||||
|
"Average responses per second: {:.2}\n\nConfig: {:#?}\n"
|
||||||
|
),
|
||||||
|
time_elapsed.as_secs(),
|
||||||
|
report_avg,
|
||||||
|
config
|
||||||
|
);
|
||||||
|
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
198
aquatic_http_load_test/src/network.rs
Normal file
198
aquatic_http_load_test/src/network.rs
Normal file
|
|
@ -0,0 +1,198 @@
|
||||||
|
use std::io::Cursor;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::sync::atomic::Ordering;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use crossbeam_channel::{Receiver, Sender};
|
||||||
|
use mio::{net::UdpSocket, Events, Poll, Interest, Token};
|
||||||
|
use socket2::{Socket, Domain, Type, Protocol};
|
||||||
|
|
||||||
|
use crate::common::*;
|
||||||
|
|
||||||
|
|
||||||
|
/// Size of the reusable recv/send buffer — the largest single datagram
/// this worker will read or serialize.
const MAX_PACKET_SIZE: usize = 4096;
|
||||||
|
|
||||||
|
|
||||||
|
pub fn create_socket(
|
||||||
|
config: &Config,
|
||||||
|
addr: SocketAddr
|
||||||
|
) -> ::std::net::UdpSocket {
|
||||||
|
let socket = if addr.is_ipv4(){
|
||||||
|
Socket::new(Domain::ipv4(), Type::dgram(), Some(Protocol::udp()))
|
||||||
|
} else {
|
||||||
|
Socket::new(Domain::ipv6(), Type::dgram(), Some(Protocol::udp()))
|
||||||
|
}.expect("create socket");
|
||||||
|
|
||||||
|
socket.set_nonblocking(true)
|
||||||
|
.expect("socket: set nonblocking");
|
||||||
|
|
||||||
|
if config.network.recv_buffer != 0 {
|
||||||
|
if let Err(err) = socket.set_recv_buffer_size(config.network.recv_buffer){
|
||||||
|
eprintln!(
|
||||||
|
"socket: failed setting recv buffer to {}: {:?}",
|
||||||
|
config.network.recv_buffer,
|
||||||
|
err
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
socket.bind(&addr.into())
|
||||||
|
.expect(&format!("socket: bind to {}", addr));
|
||||||
|
|
||||||
|
socket.connect(&config.server_address.into())
|
||||||
|
.expect("socket: connect to server");
|
||||||
|
|
||||||
|
socket.into_udp_socket()
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Main loop of a socket worker: poll the UDP socket, parse incoming
/// responses and forward them (tagged with this worker's id) to the
/// handler workers, and send any requests queued on this worker's
/// channel. Flushes local statistics into the shared counters once per
/// poll iteration. Never returns.
pub fn run_socket_thread(
    state: LoadTestState,
    response_channel_sender: Sender<(ThreadId, Response)>,
    request_receiver: Receiver<Request>,
    config: &Config,
    addr: SocketAddr,
    thread_id: ThreadId
) {
    let mut socket = UdpSocket::from_std(create_socket(config, addr));
    // Single reusable buffer for both receiving and serializing.
    let mut buffer = [0u8; MAX_PACKET_SIZE];

    let token = Token(thread_id.0 as usize);
    let interests = Interest::READABLE;
    let timeout = Duration::from_micros(config.network.poll_timeout);

    let mut poll = Poll::new().expect("create poll");

    poll.registry()
        .register(&mut socket, token, interests)
        .unwrap();

    let mut events = Events::with_capacity(config.network.poll_event_capacity);

    let mut statistics = SocketWorkerLocalStatistics::default();
    let mut responses = Vec::new();

    loop {
        poll.poll(&mut events, Some(timeout))
            .expect("failed polling");

        for event in events.iter(){
            if event.token() == token {
                if event.is_readable(){
                    // Drain all readable datagrams into `responses`.
                    read_responses(
                        thread_id,
                        &socket,
                        &mut buffer,
                        &mut statistics,
                        &mut responses
                    );

                    for r in responses.drain(..){
                        response_channel_sender.send(r)
                            .expect(&format!(
                                "add response to channel in socket worker {}",
                                thread_id.0
                            ));
                    }

                    // Re-arm interest after consuming the readiness event.
                    poll.registry()
                        .reregister(&mut socket, token, interests)
                        .unwrap();
                }
            }

            // Also flush queued requests once per received event.
            send_requests(
                &state,
                &mut socket,
                &mut buffer,
                &request_receiver,
                &mut statistics
            );
        }

        // And once per poll iteration, so requests flow even when the
        // poll timed out with no events.
        send_requests(
            &state,
            &mut socket,
            &mut buffer,
            &request_receiver,
            &mut statistics
        );

        // Flush local counters into the shared statistics, then reset.
        state.statistics.requests
            .fetch_add(statistics.requests, Ordering::SeqCst);
        state.statistics.responses_announce
            .fetch_add(statistics.responses_announce, Ordering::SeqCst);
        state.statistics.responses_scrape
            .fetch_add(statistics.responses_scrape, Ordering::SeqCst);
        state.statistics.responses_failure
            .fetch_add(statistics.responses_failure, Ordering::SeqCst);
        state.statistics.response_peers
            .fetch_add(statistics.response_peers, Ordering::SeqCst);

        statistics = SocketWorkerLocalStatistics::default();
    }
}
|
||||||
|
|
||||||
|
|
||||||
|
fn read_responses(
|
||||||
|
thread_id: ThreadId,
|
||||||
|
socket: &UdpSocket,
|
||||||
|
buffer: &mut [u8],
|
||||||
|
ls: &mut SocketWorkerLocalStatistics,
|
||||||
|
responses: &mut Vec<(ThreadId, Response)>,
|
||||||
|
){
|
||||||
|
while let Ok(amt) = socket.recv(buffer) {
|
||||||
|
match response_from_bytes(&buffer[0..amt]){
|
||||||
|
Ok(response) => {
|
||||||
|
match response {
|
||||||
|
Response::Announce(ref r) => {
|
||||||
|
ls.responses_announce += 1;
|
||||||
|
ls.response_peers += r.peers.len();
|
||||||
|
},
|
||||||
|
Response::Scrape(_) => {
|
||||||
|
ls.responses_scrape += 1;
|
||||||
|
},
|
||||||
|
Response::Failure(_) => {
|
||||||
|
ls.responses_failure += 1;
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
responses.push((thread_id, response))
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Received invalid response: {:#?}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
fn send_requests(
|
||||||
|
state: &LoadTestState,
|
||||||
|
socket: &mut UdpSocket,
|
||||||
|
buffer: &mut [u8],
|
||||||
|
receiver: &Receiver<Request>,
|
||||||
|
statistics: &mut SocketWorkerLocalStatistics,
|
||||||
|
){
|
||||||
|
let mut cursor = Cursor::new(buffer);
|
||||||
|
|
||||||
|
while let Ok(request) = receiver.try_recv() {
|
||||||
|
cursor.set_position(0);
|
||||||
|
|
||||||
|
if let Err(err) = request_to_bytes(&mut cursor, request){
|
||||||
|
eprintln!("request_to_bytes err: {}", err);
|
||||||
|
}
|
||||||
|
|
||||||
|
let position = cursor.position() as usize;
|
||||||
|
let inner = cursor.get_ref();
|
||||||
|
|
||||||
|
match socket.send(&inner[..position]) {
|
||||||
|
Ok(_) => {
|
||||||
|
statistics.requests += 1;
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Couldn't send packet: {:?}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
53
aquatic_http_load_test/src/utils.rs
Normal file
53
aquatic_http_load_test/src/utils.rs
Normal file
|
|
@ -0,0 +1,53 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use rand_distr::{Standard, Pareto};
|
||||||
|
use rand::prelude::*;
|
||||||
|
|
||||||
|
use crate::common::*;
|
||||||
|
|
||||||
|
|
||||||
|
pub fn select_info_hash_index(
|
||||||
|
config: &Config,
|
||||||
|
state: &LoadTestState,
|
||||||
|
rng: &mut impl Rng,
|
||||||
|
) -> usize {
|
||||||
|
pareto_usize(rng, state.pareto, config.handler.number_of_torrents - 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub fn pareto_usize(
|
||||||
|
rng: &mut impl Rng,
|
||||||
|
pareto: Arc<Pareto<f64>>,
|
||||||
|
max: usize,
|
||||||
|
) -> usize {
|
||||||
|
let p: f64 = pareto.sample(rng);
|
||||||
|
let p = (p.min(101.0f64) - 1.0) / 100.0;
|
||||||
|
|
||||||
|
(p * max as f64) as usize
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Generate a random 20-byte peer id.
pub fn generate_peer_id() -> PeerId {
    PeerId(random_20_bytes())
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Generate a random 20-byte torrent info hash.
pub fn generate_info_hash() -> InfoHash {
    InfoHash(random_20_bytes())
}
|
||||||
|
|
||||||
|
|
||||||
|
// Don't use SmallRng here for now
|
||||||
|
fn random_20_bytes() -> [u8; 20] {
|
||||||
|
let mut bytes = [0; 20];
|
||||||
|
|
||||||
|
for (i, b) in rand::thread_rng()
|
||||||
|
.sample_iter(&Standard)
|
||||||
|
.enumerate()
|
||||||
|
.take(20) {
|
||||||
|
|
||||||
|
bytes[i] = b
|
||||||
|
}
|
||||||
|
|
||||||
|
bytes
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue