WIP: aquatic_ws: add half-crappy load test, fix infinite loop bug

This commit is contained in:
Joakim Frostegård 2020-08-01 03:35:00 +02:00
parent c7be84a61e
commit a5108f813d
17 changed files with 821 additions and 10 deletions

View file

@ -0,0 +1,26 @@
[package]
name = "aquatic_ws_load_test"
version = "0.1.0"
authors = ["Joakim Frostegård <joakim.frostegard@gmail.com>"]
edition = "2018"
license = "Apache-2.0"

[[bin]]
name = "aquatic_ws_load_test"

[dependencies]
anyhow = "1"
aquatic_cli_helpers = { path = "../aquatic_cli_helpers" }
aquatic_ws_protocol = { path = "../aquatic_ws_protocol" }
hashbrown = { version = "0.8", features = ["serde"] }
mimalloc = { version = "0.1", default-features = false }
# "tcp" feature added: this load tester connects via mio::net::TcpStream,
# which mio 0.7 gates behind the "tcp" feature ("udp" alone does not
# provide it).
mio = { version = "0.7", features = ["tcp", "udp", "os-poll", "os-util"] }
rand = { version = "0.7", features = ["small_rng"] }
rand_distr = "0.2"
serde = { version = "1", features = ["derive"] }
slab = "0.4"
tungstenite = "0.11"

[dev-dependencies]
quickcheck = "0.9"
quickcheck_macros = "0.9"

View file

@ -0,0 +1,41 @@
use std::sync::{Arc, atomic::AtomicUsize};
use rand_distr::Pareto;
pub use aquatic_ws_protocol::*;
/// A fake peer taking part in the load test.
///
/// NOTE(review): not referenced by any other code in this change —
/// presumably intended for later use; confirm before removing.
#[derive(PartialEq, Eq, Clone)]
pub struct TorrentPeer {
    /// Torrent this peer announces for.
    pub info_hash: InfoHash,
    /// Indices into the shared info hash list, for scrape requests.
    /// (sic: "indeces")
    pub scrape_hash_indeces: Vec<usize>,
    pub peer_id: PeerId,
    /// Port the peer claims to listen on.
    pub port: u16,
}
/// Shared atomic counters, incremented by the socket workers and
/// periodically drained (read and reset to zero) by the statistics
/// monitor in main.rs.
#[derive(Default)]
pub struct Statistics {
    /// Requests sent.
    pub requests: AtomicUsize,
    /// Peers received in announce responses (only referenced by
    /// commented-out reporting code at the moment).
    pub response_peers: AtomicUsize,
    /// Announce responses received.
    pub responses_announce: AtomicUsize,
    /// Scrape responses received.
    pub responses_scrape: AtomicUsize,
    /// Failure responses received.
    pub responses_failure: AtomicUsize,
    /// Bytes sent / received — read by the monitor; no visible code
    /// increments these yet (TODO confirm).
    pub bytes_sent: AtomicUsize,
    pub bytes_received: AtomicUsize,
}
/// State shared between all worker threads. Cheap to clone: every field
/// is behind an `Arc`, so cloning only bumps reference counts.
#[derive(Clone)]
pub struct LoadTestState {
    /// Pre-generated info hashes that fake peers announce/scrape for.
    pub info_hashes: Arc<Vec<InfoHash>>,
    /// Shared counters, drained by the statistics monitor.
    pub statistics: Arc<Statistics>,
    /// Pareto distribution used to pick which torrent a request targets.
    pub pareto: Arc<Pareto<f64>>,
}
/// Which kind of request to generate next (chosen by weighted sampling
/// in utils.rs).
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum RequestType {
    Announce,
    Scrape
}

View file

@ -0,0 +1,76 @@
use std::net::SocketAddr;
use serde::{Serialize, Deserialize};
/// Top-level load test configuration. `#[serde(default)]` means every
/// field falls back to its `Default` value when absent from the config
/// file.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
    /// Address of the tracker under test.
    pub server_address: SocketAddr,
    /// Number of socket worker threads to spawn.
    pub num_workers: u8,
    /// Test duration in seconds; 0 means run until interrupted
    /// (the monitor only stops when duration != 0).
    pub duration: usize,
    pub network: NetworkConfig,
    pub torrents: TorrentConfig,
}
/// mio polling parameters for the socket worker threads.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct NetworkConfig {
    /// Timeout passed to each `poll` call, in microseconds.
    pub poll_timeout_microseconds: u64,
    /// Capacity of the mio event buffer.
    pub poll_event_capacity: usize,
}
/// Parameters controlling the fake torrents and request mix.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct TorrentConfig {
    /// Number of distinct torrents (info hashes) to pre-generate.
    pub number_of_torrents: usize,
    /// Pareto shape
    ///
    /// Fake peers choose torrents according to Pareto distribution.
    pub torrent_selection_pareto_shape: f64,
    /// Probability that a generated peer is a seeder
    pub peer_seeder_probability: f64,
    /// Probability that a generated request is an announce request, as part
    /// of sum of the various weight arguments.
    pub weight_announce: usize,
    /// Probability that a generated request is a scrape request, as part
    /// of sum of the various weight arguments.
    pub weight_scrape: usize,
}
impl Default for Config {
fn default() -> Self {
Self {
server_address: "127.0.0.1:3000".parse().unwrap(),
num_workers: 1,
duration: 0,
network: NetworkConfig::default(),
torrents: TorrentConfig::default(),
}
}
}
impl Default for NetworkConfig {
fn default() -> Self {
Self {
poll_timeout_microseconds: 1000,
poll_event_capacity: 4096,
}
}
}
impl Default for TorrentConfig {
fn default() -> Self {
Self {
number_of_torrents: 10_000,
peer_seeder_probability: 0.25,
torrent_selection_pareto_shape: 2.0,
weight_announce: 5,
weight_scrape: 0,
}
}
}

View file

@ -0,0 +1,156 @@
use std::thread;
use std::sync::{Arc, atomic::Ordering};
use std::time::{Duration, Instant};
use rand::prelude::*;
use rand_distr::Pareto;
mod common;
mod config;
mod network;
mod utils;
use common::*;
use config::*;
use network::*;
// Use mimalloc as the global allocator (better multithreaded allocation
// behavior than the system default).
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

/// Multiply bytes during a second with this to get Mbit/s
const MBITS_FACTOR: f64 = 1.0 / ((1024.0 * 1024.0) / 8.0);
/// Entry point: parse CLI arguments / config file and run the load test.
pub fn main() {
    aquatic_cli_helpers::run_app_with_cli_and_config::<Config>(
        // Fixed description: this is the WebSocket load tester (the
        // previous text said "udp", copied from the UDP load tester).
        "aquatic: ws bittorrent tracker: load tester",
        run,
    )
}
/// Validate the config, build the shared state, spawn the socket worker
/// threads and then block on the statistics monitor until the configured
/// duration (if any) has elapsed.
///
/// Returns an error instead of panicking on invalid configuration, since
/// the function already returns `anyhow::Result` to the CLI helper.
fn run(config: Config) -> ::anyhow::Result<()> {
    // With both weights zero no request type could ever be generated.
    if config.torrents.weight_announce + config.torrents.weight_scrape == 0 {
        return Err(::anyhow::anyhow!(
            "Error: at least one weight must be larger than zero."
        ));
    }

    println!("Starting client with config: {:#?}", config);

    // Pre-generate the full set of info hashes fake peers will use.
    let mut rng = SmallRng::from_entropy();
    let mut info_hashes = Vec::with_capacity(config.torrents.number_of_torrents);

    for _ in 0..config.torrents.number_of_torrents {
        info_hashes.push(InfoHash(rng.gen()));
    }

    // Pareto distribution for skewed torrent popularity. The shape comes
    // from user config, so surface a proper error rather than panicking.
    let pareto = Pareto::new(1.0, config.torrents.torrent_selection_pareto_shape)
        .map_err(|err| ::anyhow::anyhow!("invalid pareto shape: {:?}", err))?;

    let state = LoadTestState {
        info_hashes: Arc::new(info_hashes),
        statistics: Arc::new(Statistics::default()),
        pareto: Arc::new(pareto),
    };

    // Start socket workers, each beginning with one initial connection.
    for _ in 0..config.num_workers {
        let config = config.clone();
        let state = state.clone();

        thread::spawn(move || run_socket_thread(&config, state, 1));
    }

    monitor_statistics(state, &config);

    Ok(())
}
/// Print load statistics every five seconds; when the configured duration
/// (if non-zero) has elapsed, print a summary report and return.
///
/// Each interval the shared counters are atomically read and reset with
/// `swap(0, ..)` (clearer than the previous `fetch_and(0, ..)`, which
/// relied on `x & 0 == 0` for the same effect), so the socket workers can
/// keep incrementing concurrently without losing counts.
fn monitor_statistics(
    state: LoadTestState,
    config: &Config,
) {
    let start_time = Instant::now();
    let mut report_avg_response_vec: Vec<f64> = Vec::new();

    // Reporting interval in seconds.
    let interval = 5;
    let interval_f64 = interval as f64;

    loop {
        thread::sleep(Duration::from_secs(interval));

        let statistics = state.statistics.as_ref();

        // Atomically drain each counter for this interval.
        let responses_announce = statistics.responses_announce
            .swap(0, Ordering::SeqCst) as f64;
        // let response_peers = statistics.response_peers
        //     .swap(0, Ordering::SeqCst) as f64;
        let requests_per_second = statistics.requests
            .swap(0, Ordering::SeqCst) as f64 / interval_f64;
        let responses_scrape_per_second = statistics.responses_scrape
            .swap(0, Ordering::SeqCst) as f64 / interval_f64;
        let responses_failure_per_second = statistics.responses_failure
            .swap(0, Ordering::SeqCst) as f64 / interval_f64;
        let bytes_sent_per_second = statistics.bytes_sent
            .swap(0, Ordering::SeqCst) as f64 / interval_f64;
        let bytes_received_per_second = statistics.bytes_received
            .swap(0, Ordering::SeqCst) as f64 / interval_f64;

        let responses_announce_per_second = responses_announce / interval_f64;

        let responses_per_second =
            responses_announce_per_second +
            responses_scrape_per_second +
            responses_failure_per_second;

        report_avg_response_vec.push(responses_per_second);

        println!();
        println!("Requests out: {:.2}/second", requests_per_second);
        println!("Responses in: {:.2}/second", responses_per_second);
        println!("  - Announce responses: {:.2}", responses_announce_per_second);
        println!("  - Scrape responses:   {:.2}", responses_scrape_per_second);
        println!("  - Failure responses:  {:.2}", responses_failure_per_second);
        //println!("Peers per announce response: {:.2}", response_peers / responses_announce);
        println!("Bandwidth out: {:.2}Mbit/s", bytes_sent_per_second * MBITS_FACTOR);
        println!("Bandwidth in:  {:.2}Mbit/s", bytes_received_per_second * MBITS_FACTOR);

        let time_elapsed = start_time.elapsed();
        let duration = Duration::from_secs(config.duration as u64);

        // duration == 0 means "run forever"; otherwise stop once elapsed.
        if config.duration != 0 && time_elapsed >= duration {
            let report_len = report_avg_response_vec.len() as f64;
            let report_sum: f64 = report_avg_response_vec.into_iter().sum();
            let report_avg: f64 = report_sum / report_len;

            println!(
                concat!(
                    "\n# aquatic load test report\n\n",
                    "Test ran for {} seconds.\n",
                    "Average responses per second: {:.2}\n\nConfig: {:#?}\n"
                ),
                time_elapsed.as_secs(),
                report_avg,
                config
            );

            break
        }
    }
}

View file

@ -0,0 +1,323 @@
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::io::{Read, Write, ErrorKind, Cursor};
use hashbrown::HashMap;
use mio::{net::TcpStream, Events, Poll, Interest, Token};
use rand::{rngs::SmallRng, prelude::*};
use tungstenite::{WebSocket, HandshakeError, ClientHandshake, handshake::MidHandshake};
use crate::common::*;
use crate::config::*;
use crate::utils::create_random_request;
// NOTE(review): this alias appears unused in this file — consider using it
// to tidy the handshake match arms, or removing it. TODO confirm no other
// module references it.
type HandshakeResult<T> = std::result::Result<
    (tungstenite::protocol::WebSocket<TcpStream>, T),
    tungstenite::handshake::HandshakeError<tungstenite::handshake::client::ClientHandshake<TcpStream>>
>;

/// State machine for one client connection: a raw TCP stream before the
/// WebSocket handshake has started, a handshake interrupted by WouldBlock,
/// or an established WebSocket.
pub enum ConnectionState {
    TcpStream(TcpStream),
    WebSocket(WebSocket<TcpStream>),
    MidHandshake(MidHandshake<ClientHandshake<TcpStream>>)
}
impl ConnectionState {
    /// Drive the connection one step towards an established WebSocket.
    ///
    /// Consumes the state and returns the next one; returns `None` when
    /// the handshake failed outright, signalling the caller to drop the
    /// connection.
    fn advance(self, config: &Config) -> Option<Self> {
        match self {
            Self::TcpStream(stream) => {
                // Build the ws:// URL from the configured server address.
                let req = format!(
                    "ws://{}:{}",
                    config.server_address.ip(),
                    config.server_address.port()
                );

                // Begin the client handshake over the (non-blocking) stream.
                match ::tungstenite::client(req, stream){
                    Ok((ws, _)) => {
                        Some(ConnectionState::WebSocket(ws))
                    },
                    // Handshake couldn't finish without blocking: keep the
                    // mid-handshake state and resume on a later event.
                    Err(HandshakeError::Interrupted(handshake)) => {
                        Some(ConnectionState::MidHandshake(handshake))
                    },
                    Err(HandshakeError::Failure(err)) => {
                        eprintln!("handshake error: {:?}", err);
                        None
                    }
                }
            },
            Self::MidHandshake(handshake) => {
                // Resume a previously interrupted handshake.
                match handshake.handshake() {
                    Ok((ws, _)) => {
                        Some(ConnectionState::WebSocket(ws))
                    },
                    Err(HandshakeError::Interrupted(handshake)) => {
                        Some(ConnectionState::MidHandshake(handshake))
                    },
                    Err(HandshakeError::Failure(err)) => {
                        eprintln!("handshake error: {:?}", err);
                        None
                    }
                }
            },
            // Already established: nothing to advance.
            Self::WebSocket(ws) => Some(Self::WebSocket(ws)),
        }
    }
}
/// One load-test connection plus the scheduling flags used by the socket
/// thread's main loop.
pub struct Connection {
    stream: ConnectionState,
    // Set by the main loop once the handshake has completed, clearing the
    // connection to send its first request; reset after any send/read.
    can_send_initial: bool,
    // Set once this connection has been counted in the loop's
    // `num_completed` tally, so it is only counted once.
    marked_as_complete: bool
}
impl Connection {
    /// Open a non-blocking TCP connection to the tracker, register it for
    /// readability with the poll instance under the next free token, and
    /// insert it into the connection map.
    ///
    /// Increments `token_counter` on success.
    pub fn create_and_register(
        config: &Config,
        connections: &mut ConnectionMap,
        poll: &mut Poll,
        token_counter: &mut usize,
    ) -> anyhow::Result<()> {
        let mut stream = TcpStream::connect(config.server_address)?;

        poll.registry()
            .register(&mut stream, Token(*token_counter), Interest::READABLE)
            .unwrap();

        let connection = Connection {
            stream: ConnectionState::TcpStream(stream),
            can_send_initial: false,
            marked_as_complete: false,
        };

        connections.insert(*token_counter, connection);

        *token_counter = *token_counter + 1;

        Ok(())
    }

    /// Advance the underlying state machine one step (see
    /// `ConnectionState::advance`); `None` means the connection failed
    /// and should be dropped.
    ///
    /// NOTE(review): `marked_as_complete` is reset to `false` here even
    /// when the state did not change — this makes the main loop re-count
    /// the connection later. Confirm that is intended.
    pub fn advance(self, config: &Config) -> Option<Self> {
        if let Some(stream) = self.stream.advance(config){
            Some(Self {
                stream,
                can_send_initial: self.can_send_initial,
                marked_as_complete: false,
            })
        } else {
            None
        }
    }

    /// Read one WebSocket message, record it in the statistics, then
    /// immediately send a new random request on the same connection.
    ///
    /// No-op unless the connection is an established WebSocket. On
    /// WouldBlock (no complete message buffered yet) it returns without
    /// sending, to be retried on the next readable event.
    pub fn read_response_and_send_request(
        &mut self,
        config: &Config,
        state: &LoadTestState,
        rng: &mut impl Rng,
    ){
        if let ConnectionState::WebSocket(ref mut ws) = self.stream {
            // Every arm breaks or returns, so at most one message is
            // consumed per call despite the loop.
            loop {
                match ws.read_message(){
                    Ok(message) => {
                        Self::register_response_type(state, message);

                        break;
                    },
                    Err(tungstenite::Error::Io(err)) if err.kind() == ErrorKind::WouldBlock => {
                        self.can_send_initial = false;

                        eprintln!("handle_read_event error would block: {}", err);

                        return;
                    },
                    Err(err) => {
                        eprintln!("handle_read_event error: {}", err);

                        return;
                    }
                }
            };

            self.send_request(
                config,
                state,
                rng,
            );
        }
    }

    /// Record a received message in the shared statistics.
    ///
    /// FIXME (pre-existing): every message is counted as an announce
    /// response regardless of its actual type; `message` is unused.
    fn register_response_type(
        state: &LoadTestState,
        message: ::tungstenite::Message,
    ){
        state.statistics.responses_announce.fetch_add(1, Ordering::SeqCst); // FIXME
    }

    /// Generate one random request and write it to the WebSocket,
    /// counting it in the statistics on success. Always clears
    /// `can_send_initial` afterwards.
    pub fn send_request(
        &mut self,
        config: &Config,
        state: &LoadTestState,
        rng: &mut impl Rng,
    ){
        if let ConnectionState::WebSocket(ref mut ws) = self.stream {
            let request = create_random_request(
                &config,
                &state,
                rng
            );

            let message = request.to_ws_message();

            match ws.write_message(message){
                Ok(_) => {
                    state.statistics.requests.fetch_add(1, Ordering::SeqCst);
                },
                Err(err) => {
                    eprintln!("send request error: {}", err);
                }
            }

            self.can_send_initial = false;
        } else {
            println!("send request can't send to non-ws stream");
        }
    }
}
/// Connections keyed by their mio poll token value.
pub type ConnectionMap = HashMap<usize, Connection>;

/// Maximum number of simultaneous connections per socket thread.
const NUM_CONNECTIONS: usize = 5;

// NOTE(review): `^` is bitwise XOR in Rust, so `2 ^ 30` evaluates to 28,
// NOT 2^30. New connections are therefore created every 28 loop
// iterations. If 2^30 was intended, write `1 << 30` — but confirm intent
// first: with 2^30 the thread would effectively never reach
// NUM_CONNECTIONS, so the current small value may be what makes the test
// work at all.
const CREATE_CONN_INTERVAL: usize = 2 ^ 30;
/// Socket worker: maintains up to `NUM_CONNECTIONS` WebSocket connections
/// to the tracker, drives their handshakes, sends randomized requests and
/// records statistics. Loops forever — the overall test duration is
/// enforced by the statistics monitor thread, not here.
pub fn run_socket_thread(
    config: &Config,
    state: LoadTestState,
    num_initial_requests: usize,
) {
    let timeout = Duration::from_micros(config.network.poll_timeout_microseconds);
    let mut connections: ConnectionMap = HashMap::with_capacity(NUM_CONNECTIONS);
    let mut poll = Poll::new().expect("create poll");
    let mut events = Events::with_capacity(config.network.poll_event_capacity);
    let mut rng = SmallRng::from_entropy();

    // Tokens are allocated sequentially, so token_counter is both the next
    // free token and the number of connections ever created.
    let mut token_counter = 0usize;

    for _ in 0..num_initial_requests {
        Connection::create_and_register(
            config,
            &mut connections,
            &mut poll,
            &mut token_counter,
        ).unwrap();
    }

    println!("num connections in map: {}", connections.len());

    let mut initial_sent = false;
    let mut iter_counter = 0usize;
    let mut num_completed = 0usize;

    loop {
        poll.poll(&mut events, Some(timeout))
            .expect("failed polling");

        for event in events.iter(){
            if event.is_readable(){
                let token = event.token();
                let mut run_advance = false;

                if let Some(connection) = connections.get_mut(&token.0){
                    if let ConnectionState::WebSocket(_) = connection.stream {
                        // Established: consume the response and reply with
                        // a fresh request.
                        connection.read_response_and_send_request(
                            config,
                            &state,
                            &mut rng,
                        );
                    } else {
                        // Still handshaking. advance() consumes the
                        // connection, so it has to happen outside this
                        // `get_mut` borrow.
                        run_advance = true;
                        println!("set run_advance=true");
                    }
                } else {
                    eprintln!("connection not found: {:?}", token);
                }

                if run_advance {
                    let connection = connections.remove(&token.0).unwrap();

                    if let Some(connection) = connection.advance(config){
                        println!("advanced connection");
                        connections.insert(token.0, connection);
                    }
                    // else: handshake failed, connection dropped (its
                    // token is never reused).
                }
            }
        }

        // Scan for connections whose handshake completed since the last
        // pass: mark them complete, clear them to send their first
        // request, and keep nudging the ones still mid-handshake.
        // NOTE(review): num_completed is compared against token_counter,
        // which also counts dropped connections — confirm this cannot
        // leave the scan running (or skipped) incorrectly.
        if num_completed != token_counter {
            for k in 0..token_counter {
                if let Some(mut connection) = connections.remove(&k){
                    if let ConnectionState::WebSocket(_) = connection.stream {
                        if !connection.marked_as_complete {
                            connection.can_send_initial = true;
                            connection.marked_as_complete = true;

                            // Force the send pass below to run again.
                            initial_sent = false;
                            num_completed += 1;
                        }

                        connections.insert(k, connection);
                    } else {
                        if let Some(connection) = connection.advance(config){
                            connections.insert(k, connection);
                        }
                    }
                } else {
                    // println!("connection not found for token {}", k);
                }
            }
        }

        // Send the first request on any connection cleared above.
        if !initial_sent {
            for (_, connection) in connections.iter_mut(){
                if connection.can_send_initial {
                    connection.send_request(
                        config,
                        &state,
                        &mut rng,
                    );

                    initial_sent = true;
                }
            }
        }

        // Slowly create new connections
        if token_counter < NUM_CONNECTIONS && iter_counter % CREATE_CONN_INTERVAL == 0 {
            let res = Connection::create_and_register(
                config,
                &mut connections,
                &mut poll,
                &mut token_counter,
            );

            if let Err(err) = res {
                eprintln!("create connection error: {}", err);
            }

            // initial_sent = false;
        }

        iter_counter = iter_counter.wrapping_add(1);
    }
}

View file

@ -0,0 +1,114 @@
use std::sync::Arc;
use rand::distributions::WeightedIndex;
use rand_distr::Pareto;
use rand::prelude::*;
use crate::common::*;
use crate::config::*;
/// Generate one random request message, picking announce vs. scrape
/// according to the configured weights.
pub fn create_random_request(
    config: &Config,
    state: &LoadTestState,
    rng: &mut impl Rng,
) -> InMessage {
    let announce_weight = config.torrents.weight_announce as u32;
    let scrape_weight = config.torrents.weight_scrape as u32;

    // Weighted choice between the two request kinds. Index order matches
    // the kinds array below.
    let dist = WeightedIndex::new(&[announce_weight, scrape_weight])
        .expect("random request weighted index");

    let kinds = [RequestType::Announce, RequestType::Scrape];

    match kinds[dist.sample(rng)] {
        RequestType::Announce => create_announce_request(config, state, rng),
        RequestType::Scrape => create_scrape_request(config, state, rng),
    }
}
/// Build a random announce request. With the configured probability the
/// fake peer is a seeder (Completed event, zero bytes left); otherwise it
/// is a leecher (Started event, 50 bytes left).
#[inline]
fn create_announce_request(
    config: &Config,
    state: &LoadTestState,
    rng: &mut impl Rng,
) -> InMessage {
    let is_seeder = rng.gen_bool(config.torrents.peer_seeder_probability);

    let event = if is_seeder {
        AnnounceEvent::Completed
    } else {
        AnnounceEvent::Started
    };
    let bytes_left = if is_seeder { 0 } else { 50 };

    let index = select_info_hash_index(config, state, rng);

    InMessage::AnnounceRequest(AnnounceRequest {
        info_hash: state.info_hashes[index],
        peer_id: PeerId(rng.gen()),
        bytes_left: Some(bytes_left),
        event,
        numwant: None,
        offers: None, // FIXME
        answer: None,
        to_peer_id: None,
        offer_id: None,
    })
}
#[inline]
fn create_scrape_request(
config: &Config,
state: &LoadTestState,
rng: &mut impl Rng,
) -> InMessage {
let mut scrape_hashes = Vec::with_capacity(5);
for _ in 0..5 {
let info_hash_index = select_info_hash_index(config, &state, rng);
scrape_hashes.push(state.info_hashes[info_hash_index]);
}
InMessage::ScrapeRequest(ScrapeRequest {
info_hashes: scrape_hashes,
})
}
/// Pick an index into the shared info hash list, skewed towards low
/// indices via the Pareto distribution (a few "popular" torrents receive
/// most of the traffic).
///
/// NOTE(review): underflows if `number_of_torrents` is configured as 0 —
/// confirm whether that needs guarding.
#[inline]
fn select_info_hash_index(
    config: &Config,
    state: &LoadTestState,
    rng: &mut impl Rng,
) -> usize {
    pareto_usize(rng, &state.pareto, config.torrents.number_of_torrents - 1)
}
/// Sample an index in `[0, max]` from the Pareto distribution.
///
/// The raw sample (support starts at 1.0) is clamped to 101.0 and then
/// linearly mapped onto `[0, max]`, so the tail of the distribution all
/// collapses onto `max`.
///
/// Takes `&Pareto<f64>` instead of `&Arc<Pareto<f64>>` — callers passing
/// `&state.pareto` still compile via deref coercion, and the function no
/// longer cares how the distribution is shared.
#[inline]
fn pareto_usize(
    rng: &mut impl Rng,
    pareto: &Pareto<f64>,
    max: usize,
) -> usize {
    let sample: f64 = pareto.sample(rng);

    // Map [1.0, 101.0] onto [0.0, 1.0].
    let normalized = (sample.min(101.0f64) - 1.0) / 100.0;

    (normalized * max as f64) as usize
}