add aquatic_load_test for benchmarking aquatic over the network

This commit is contained in:
Joakim Frostegård 2020-04-19 10:21:12 +02:00
parent eb3ba6a2ce
commit f6ed47fec7
9 changed files with 1108 additions and 0 deletions

19
Cargo.lock generated
View file

@ -61,6 +61,25 @@ dependencies = [
"serde",
]
[[package]]
name = "aquatic_load_test"
version = "0.1.0"
dependencies = [
"bittorrent_udp",
"cli_helpers",
"crossbeam-channel",
"hashbrown",
"mimalloc",
"mio",
"net2",
"parking_lot",
"quickcheck",
"quickcheck_macros",
"rand",
"rand_distr",
"serde",
]
[[package]]
name = "arrayvec"
version = "0.4.12"

View file

@ -3,6 +3,7 @@
members = [
"aquatic",
"aquatic_bench",
"aquatic_load_test",
"bittorrent_udp",
"cli_helpers",
]

View file

@ -0,0 +1,25 @@
[package]
name = "aquatic_load_test"
version = "0.1.0"
authors = ["Joakim Frostegård <joakim.frostegard@gmail.com>"]
edition = "2018"
[[bin]]
name = "aquatic_load_test"
[dependencies]
bittorrent_udp = { path = "../bittorrent_udp" }
cli_helpers = { path = "../cli_helpers" }
crossbeam-channel = "0.4"
hashbrown = "0.7"
mimalloc = { version = "0.1", default-features = false }
mio = { version = "0.7", features = ["udp", "os-poll", "os-util"] }
net2 = "0.2"
parking_lot = "0.10"
rand = { version = "0.7", features = ["small_rng"] }
rand_distr = "0.2"
serde = { version = "1", features = ["derive"] }
[dev-dependencies]
quickcheck = "0.9"
quickcheck_macros = "0.9"

View file

@ -0,0 +1,197 @@
use std::net::SocketAddr;
use std::sync::{Arc, atomic::AtomicUsize};
use hashbrown::HashMap;
use parking_lot::Mutex;
use serde::{Serialize, Deserialize};
use bittorrent_udp::types::*;
/// Identifier of a socket worker thread. Also used as the mio poll token
/// and as the index into the per-worker request-channel vector.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub struct ThreadId(pub u8);
/// Top-level load tester configuration.
///
/// Deserialized with serde; `#[serde(default)]` means any missing field
/// falls back to the `Default` implementation below.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
    /// Server address
    pub server_address: SocketAddr,
    /// Number of sockets and socket worker threads
    ///
    /// Sockets will bind to one port each, and with
    /// multiple_client_ips = true, additionally to one IP each.
    pub num_socket_workers: u8,
    /// Number of workers generating requests from responses, as well as
    /// requests not connected to previous ones.
    pub num_request_workers: usize,
    /// Run duration (quit and generate report after this many seconds)
    pub duration: usize,
    /// Socket / polling tuning options
    pub network: NetworkConfig,
    /// Request generation and response handling options
    pub handler: HandlerConfig,
}
/// Socket and polling configuration for the socket worker threads.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct NetworkConfig {
    /// True means bind to one localhost IP per socket. On macOS, this by
    /// default causes all server responses to go to one socket worker.
    /// Default option ("true") can cause issues on macOS.
    ///
    /// The point of multiple IPs is to possibly cause a better distribution
    /// of requests to servers with SO_REUSEPORT option.
    pub multiple_client_ips: bool,
    /// Use Ipv6 only
    pub ipv6_client: bool,
    /// Number of first client port
    pub first_port: u16,
    /// Socket worker poll timeout in microseconds
    pub poll_timeout: u64,
    /// Socket worker polling event number
    pub poll_event_capacity: usize,
    /// Size of socket recv buffer. Use 0 for OS default.
    ///
    /// This setting can have a big impact on dropped packages. It might
    /// require changing system defaults. Some examples of commands to set
    /// recommended values for different operating systems:
    ///
    /// macOS:
    /// $ sudo sysctl net.inet.udp.recvspace=6000000
    /// $ sudo sysctl net.inet.udp.maxdgram=500000 # Not necessary, but recommended
    /// $ sudo sysctl kern.ipc.maxsockbuf=8388608 # Not necessary, but recommended
    ///
    /// Linux:
    /// $ sudo sysctl -w net.core.rmem_max=104857600
    /// $ sudo sysctl -w net.core.rmem_default=104857600
    pub recv_buffer: usize,
}
/// Configuration for request generation and response handling workers.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct HandlerConfig {
    /// Number of torrents to simulate
    pub number_of_torrents: usize,
    /// Maximum number of torrents to ask about in scrape requests
    pub scrape_max_torrents: usize,
    /// Handler: max number of responses to collect for before processing
    pub max_responses_per_iter: usize,
    /// Probability that a generated request is a connect request as part
    /// of sum of the various weight arguments.
    pub weight_connect: usize,
    /// Probability that a generated request is a announce request, as part
    /// of sum of the various weight arguments.
    pub weight_announce: usize,
    /// Probability that a generated request is a scrape request, as part
    /// of sum of the various weight arguments.
    pub weight_scrape: usize,
    /// Handler: max microseconds to wait for single response from channel
    pub channel_timeout: u64,
    /// Pareto shape
    ///
    /// Fake peers choose torrents according to Pareto distribution.
    pub torrent_selection_pareto_shape: f64,
    /// Probability that a generated peer is a seeder
    pub peer_seeder_probability: f64,
    /// Part of additional request creation calculation, meaning requests
    /// which are not dependent on previous responses from server. Higher
    /// means more.
    pub additional_request_factor: f64,
}
impl Default for Config {
fn default() -> Self {
Self {
server_address: "127.0.0.1:3000".parse().unwrap(),
num_socket_workers: 1,
num_request_workers: 1,
duration: 0,
network: NetworkConfig::default(),
handler: HandlerConfig::default(),
}
}
}
impl Default for NetworkConfig {
    fn default() -> Self {
        Self {
            multiple_client_ips: true,
            ipv6_client: false,
            first_port: 45_000,
            // Microseconds; oddly specific values here and below were
            // presumably tuned empirically — TODO confirm
            poll_timeout: 276,
            poll_event_capacity: 2_877,
            // Bytes; see NetworkConfig::recv_buffer docs for sysctl hints
            recv_buffer: 6_000_000,
        }
    }
}
impl Default for HandlerConfig {
fn default() -> Self {
Self {
number_of_torrents: 10_000,
peer_seeder_probability: 0.25,
scrape_max_torrents: 50,
weight_connect: 0,
weight_announce: 1,
weight_scrape: 1,
additional_request_factor: 0.4,
max_responses_per_iter: 10_000,
channel_timeout: 200,
torrent_selection_pareto_shape: 2.0,
}
}
}
/// A simulated peer participating in one torrent.
#[derive(PartialEq, Eq, Clone)]
pub struct TorrentPeer {
    /// Torrent this peer announces for
    pub info_hash: InfoHash,
    /// Indices into the shared info-hash list used when building this
    /// peer's scrape requests. (Spelling of the field name kept as-is;
    /// it is referenced across the crate.)
    pub scrape_hash_indeces: Vec<usize>,
    /// Connection id most recently obtained from a connect response
    pub connection_id: ConnectionId,
    pub peer_id: PeerId,
    pub port: Port,
}

/// In-flight peers keyed by the transaction id of their pending request.
pub type TorrentPeerMap = HashMap<TransactionId, TorrentPeer>;
/// Global counters shared between all workers; periodically read and
/// reset (via fetch_and(0, ..)) by the statistics monitor in main.rs.
#[derive(Default)]
pub struct Statistics {
    /// Requests sent to the server
    pub requests: AtomicUsize,
    /// Total number of peers contained in announce responses
    pub response_peers: AtomicUsize,
    pub responses_connect: AtomicUsize,
    pub responses_announce: AtomicUsize,
    pub responses_scrape: AtomicUsize,
    pub responses_error: AtomicUsize,
}
/// Shared state handed (cloned) to every worker thread; all fields are
/// Arcs, so cloning is cheap.
#[derive(Clone)]
pub struct LoadTestState {
    /// In-flight peers keyed by transaction id
    pub torrent_peers: Arc<Mutex<TorrentPeerMap>>,
    /// Fixed set of simulated torrent info hashes, generated at startup
    pub info_hashes: Arc<Vec<InfoHash>>,
    /// Global counters, flushed by socket workers
    pub statistics: Arc<Statistics>,
}
/// Kind of BitTorrent UDP tracker request to generate next; chosen by
/// weighted sampling in the handler worker.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum RequestType {
    Announce,
    Connect,
    Scrape
}
/// Per-socket-worker counters, accumulated locally (no atomics) and
/// flushed into the shared `Statistics` once per send pass.
#[derive(Default)]
pub struct SocketWorkerLocalStatistics {
    pub requests: usize,
    pub response_peers: usize,
    pub responses_connect: usize,
    pub responses_announce: usize,
    pub responses_scrape: usize,
    pub responses_error: usize,
}

View file

@ -0,0 +1,345 @@
use std::sync::Arc;
use std::time::Duration;
use std::vec::Drain;
use crossbeam_channel::{Receiver, Sender};
use parking_lot::MutexGuard;
use rand::distributions::WeightedIndex;
use rand::prelude::*;
use rand_distr::Pareto;
use bittorrent_udp::types::*;
use crate::common::*;
use crate::utils::*;
/// Request-generating worker loop.
///
/// Repeatedly: collect a batch of responses from the socket workers,
/// convert them into follow-up requests while holding the shared
/// TorrentPeerMap lock, then push those requests — plus a heuristic
/// number of fresh connect requests — onto each socket worker's channel.
pub fn run_handler_thread(
    config: &Config,
    state: LoadTestState,
    pareto: Pareto<f64>,
    request_senders: Vec<Sender<Request>>,
    response_receiver: Receiver<(ThreadId, Response)>,
){
    let state = &state;

    // Two independent RNGs: rng1 for response processing, rng2 for the
    // additional connect requests generated further down
    let mut rng1 = SmallRng::from_rng(thread_rng())
        .expect("create SmallRng from thread_rng()");
    let mut rng2 = SmallRng::from_rng(thread_rng())
        .expect("create SmallRng from thread_rng()");

    let timeout = Duration::from_micros(config.handler.channel_timeout);

    // Reused across iterations to avoid reallocating the batch buffer
    let mut responses = Vec::new();

    loop {
        let mut opt_torrent_peers = None;

        // Collect a maximum number of responses. Stop collecting before that
        // number is reached if having waited for too long for a response, but
        // only if the TorrentPeerMap mutex isn't locked (in which case the
        // guard is kept so processing can start immediately).
        for i in 0..config.handler.max_responses_per_iter {
            let response = if i == 0 {
                // Block indefinitely for the first response of the batch
                match response_receiver.recv(){
                    Ok(r) => r,
                    Err(_) => break, // Really shouldn't happen
                }
            } else {
                match response_receiver.recv_timeout(timeout){
                    Ok(r) => r,
                    Err(_) => {
                        if let Some(guard) = state.torrent_peers.try_lock(){
                            opt_torrent_peers = Some(guard);
                            break
                        } else {
                            continue
                        }
                    },
                }
            };
            responses.push(response);
        }

        // Take the lock now if the timeout branch above didn't already
        let mut torrent_peers: MutexGuard<TorrentPeerMap> = opt_torrent_peers
            .unwrap_or_else(|| state.torrent_peers.lock());

        let requests = process_responses(
            &mut rng1,
            pareto,
            &state.info_hashes,
            config,
            &mut torrent_peers,
            responses.drain(..)
        );

        // Somewhat dubious heuristic for deciding how fast to create
        // and send additional requests (requests not having anything
        // to do with previously sent requests)
        let num_additional_to_send = {
            let num_additional_requests = requests.iter()
                .map(|v| v.len())
                .sum::<usize>() as f64;
            let num_new_requests_per_socket = num_additional_requests /
                config.num_socket_workers as f64;
            ((num_new_requests_per_socket / 1.2) * config.handler.additional_request_factor) as usize + 10
        };

        for (channel_index, new_requests) in requests.into_iter().enumerate(){
            let channel = &request_senders[channel_index];
            // Fresh connect requests keep the request/response cycle alive
            // even when few responses came back
            for _ in 0..num_additional_to_send {
                let request = create_connect_request(
                    generate_transaction_id(&mut rng2)
                );
                channel.send(request)
                    .expect("send request to channel in handler worker");
            }
            for request in new_requests.into_iter(){
                channel.send(request)
                    .expect("send request to channel in handler worker");
            }
        }
    }
}
/// Turn a drained batch of responses into follow-up requests, bucketed by
/// the socket worker (channel index) that produced each response.
fn process_responses(
    rng: &mut impl Rng,
    pareto: Pareto<f64>,
    info_hashes: &Arc<Vec<InfoHash>>,
    config: &Config,
    torrent_peers: &mut TorrentPeerMap,
    responses: Drain<(ThreadId, Response)>
) -> Vec<Vec<Request>> {
    let num_workers = config.num_socket_workers as usize;

    // One request bucket per socket worker
    let mut buckets: Vec<Vec<Request>> = (0..num_workers)
        .map(|_| Vec::new())
        .collect();

    for (socket_thread_id, response) in responses {
        let maybe_request = process_response(
            rng,
            pareto,
            info_hashes,
            config,
            torrent_peers,
            response
        );

        if let Some(request) = maybe_request {
            buckets[usize::from(socket_thread_id.0)].push(request);
        }
    }

    buckets
}
/// Generate the follow-up request (if any) for a single server response,
/// re-keying the affected TorrentPeer under a new transaction id.
fn process_response(
    rng: &mut impl Rng,
    pareto: Pareto<f64>,
    info_hashes: &Arc<Vec<InfoHash>>,
    config: &Config,
    torrent_peers: &mut TorrentPeerMap,
    response: Response
) -> Option<Request> {
    match response {
        Response::Connect(r) => {
            // Fetch the torrent peer or create it if it doesn't exist.
            // Update the connection id if fetched. Create a request and
            // move the torrent peer appropriately.
            //
            // Note: the previous `.to_owned()` on the freshly created
            // TorrentPeer performed a needless extra clone; the value is
            // already owned.
            let torrent_peer = torrent_peers.remove(&r.transaction_id)
                .map(|mut torrent_peer| {
                    torrent_peer.connection_id = r.connection_id;

                    torrent_peer
                })
                .unwrap_or_else(|| {
                    create_torrent_peer(
                        config,
                        rng,
                        pareto,
                        info_hashes,
                        r.connection_id
                    )
                });

            let new_transaction_id = generate_transaction_id(rng);

            let request = create_random_request(
                config,
                rng,
                info_hashes,
                new_transaction_id,
                &torrent_peer
            );

            torrent_peers.insert(new_transaction_id, torrent_peer);

            Some(request)
        },
        // Announce and scrape responses are handled identically: move the
        // peer to a new transaction id and ask for another random request
        Response::Announce(r) => if_torrent_peer_move_and_create_random_request(
            config,
            rng,
            info_hashes,
            torrent_peers,
            r.transaction_id
        ),
        Response::Scrape(r) => if_torrent_peer_move_and_create_random_request(
            config,
            rng,
            info_hashes,
            torrent_peers,
            r.transaction_id
        ),
        Response::Error(r) => {
            // Errors other than stale-connection errors are unexpected
            if !r.message.to_lowercase().contains("connection"){
                eprintln!("Received error response which didn't contain the word 'connection': {}", r.message);
            }

            // Restart the protocol cycle with a connect request, keeping
            // the peer (re-keyed) if we still track it
            if let Some(torrent_peer) = torrent_peers.remove(&r.transaction_id){
                let new_transaction_id = generate_transaction_id(rng);

                torrent_peers.insert(new_transaction_id, torrent_peer);

                Some(create_connect_request(new_transaction_id))
            } else {
                Some(create_connect_request(generate_transaction_id(rng)))
            }
        }
    }
}
/// If a peer is tracked under `transaction_id`, move it to a fresh
/// transaction id and return a new random request for it; otherwise None.
fn if_torrent_peer_move_and_create_random_request(
    config: &Config,
    rng: &mut impl Rng,
    info_hashes: &Arc<Vec<InfoHash>>,
    torrent_peers: &mut TorrentPeerMap,
    transaction_id: TransactionId,
) -> Option<Request> {
    // Unknown transaction id: nothing to follow up on
    let torrent_peer = torrent_peers.remove(&transaction_id)?;

    let new_transaction_id = generate_transaction_id(rng);

    let request = create_random_request(
        config,
        rng,
        info_hashes,
        new_transaction_id,
        &torrent_peer
    );

    torrent_peers.insert(new_transaction_id, torrent_peer);

    Some(request)
}
/// Create a request of a random type (announce/connect/scrape), sampled
/// according to the configured weights, on behalf of `torrent_peer`.
fn create_random_request(
    config: &Config,
    rng: &mut impl Rng,
    info_hashes: &Arc<Vec<InfoHash>>,
    transaction_id: TransactionId,
    torrent_peer: &TorrentPeer
) -> Request {
    // Stack arrays instead of Vecs: this runs once per generated request,
    // so avoid two heap allocations per call
    let weights = [
        config.handler.weight_announce as u32,
        config.handler.weight_connect as u32,
        config.handler.weight_scrape as u32,
    ];
    let items = [
        RequestType::Announce,
        RequestType::Connect,
        RequestType::Scrape,
    ];

    // Panics only if all weights are zero, which `run` rejects at startup
    let dist = WeightedIndex::new(&weights)
        .expect("random request weighted index");

    match items[dist.sample(rng)] {
        RequestType::Announce => create_announce_request(
            config,
            rng,
            torrent_peer,
            transaction_id
        ),
        RequestType::Connect => create_connect_request(transaction_id),
        RequestType::Scrape => create_scrape_request(
            &info_hashes,
            torrent_peer,
            transaction_id
        )
    }
}
/// Build an announce request for the peer. Seeders (chosen with the
/// configured probability) report completion and zero bytes left;
/// leechers report an in-progress download.
fn create_announce_request(
    config: &Config,
    rng: &mut impl Rng,
    torrent_peer: &TorrentPeer,
    transaction_id: TransactionId,
) -> Request {
    let is_seeder = rng.gen_bool(config.handler.peer_seeder_probability);

    let event = if is_seeder {
        AnnounceEvent::Completed
    } else {
        AnnounceEvent::Started
    };
    let bytes_left = if is_seeder {
        NumberOfBytes(0)
    } else {
        NumberOfBytes(50)
    };

    let request = AnnounceRequest {
        connection_id: torrent_peer.connection_id,
        transaction_id,
        info_hash: torrent_peer.info_hash,
        peer_id: torrent_peer.peer_id,
        bytes_downloaded: NumberOfBytes(50),
        bytes_uploaded: NumberOfBytes(50),
        bytes_left,
        event,
        ip_address: None,
        key: PeerKey(12345),
        peers_wanted: NumberOfPeers(100),
        port: torrent_peer.port
    };

    request.into()
}
/// Build a scrape request asking about the peer's pre-selected torrents.
fn create_scrape_request(
    info_hashes: &Arc<Vec<InfoHash>>,
    torrent_peer: &TorrentPeer,
    transaction_id: TransactionId,
) -> Request {
    // Resolve the peer's stored indices against the shared info-hash list
    let scrape_hashes: Vec<InfoHash> = torrent_peer
        .scrape_hash_indeces
        .iter()
        .map(|i| info_hashes[*i].to_owned())
        .collect();

    (ScrapeRequest {
        connection_id: torrent_peer.connection_id,
        transaction_id,
        info_hashes: scrape_hashes,
    }).into()
}

View file

@ -0,0 +1,206 @@
use std::net::{SocketAddr, Ipv4Addr, Ipv6Addr};
use std::thread;
use std::sync::{Arc, atomic::Ordering};
use std::time::{Duration, Instant};
use crossbeam_channel::unbounded;
use hashbrown::HashMap;
use parking_lot::Mutex;
use rand::prelude::*;
use rand_distr::Pareto;
mod common;
mod handler;
mod network;
mod utils;
use common::*;
use utils::*;
use network::*;
use handler::run_handler_thread;
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
/// Entry point: parse CLI arguments / config file via cli_helpers, then
/// hand the resulting Config to `run`.
pub fn main(){
    cli_helpers::run_app_with_cli_and_config::<Config>(
        "aquatic: udp bittorrent tracker: load tester",
        run,
    )
}
/// Validate config, spawn socket workers and request handler workers, seed
/// each request channel with an initial connect request, then block in the
/// statistics monitor loop.
fn run(config: Config){
    // All-zero weights would make WeightedIndex::new panic later; fail fast
    if config.handler.weight_announce + config.handler.weight_connect + config.handler.weight_scrape == 0 {
        panic!("Error: at least one weight must be larger than zero.");
    }

    println!("Starting client with config: {:#?}", config);

    // Pre-generate the full set of simulated torrent info hashes
    let mut info_hashes = Vec::with_capacity(config.handler.number_of_torrents);

    for _ in 0..config.handler.number_of_torrents {
        info_hashes.push(generate_info_hash());
    }

    let state = LoadTestState {
        torrent_peers: Arc::new(Mutex::new(HashMap::new())),
        info_hashes: Arc::new(info_hashes),
        statistics: Arc::new(Statistics::default()),
    };

    // Pareto distribution (scale 1.0) used to skew torrent popularity
    let pareto = Pareto::new(
        1.0,
        config.handler.torrent_selection_pareto_shape
    ).unwrap();

    // Start socket workers

    let (response_sender, response_receiver) = unbounded();
    let mut request_senders = Vec::new();

    for i in 0..config.num_socket_workers {
        let thread_id = ThreadId(i);
        let (sender, receiver) = unbounded();
        // One port per worker; with multiple_client_ips, also one loopback
        // IP per worker (127.0.0.(1+i) resp. ::(1+i)). Since i <= 254
        // (num_socket_workers is u8), 1 + i cannot overflow.
        let port = config.network.first_port + (i as u16);

        let addr = if config.network.multiple_client_ips {
            let ip = if config.network.ipv6_client { // FIXME: test ipv6
                Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1 + i as u16).into()
            } else {
                Ipv4Addr::new(127, 0, 0, 1 + i).into()
            };

            SocketAddr::new(ip, port)
        } else {
            let ip = if config.network.ipv6_client {
                Ipv6Addr::LOCALHOST.into()
            } else {
                Ipv4Addr::LOCALHOST.into()
            };

            SocketAddr::new(ip, port)
        };

        request_senders.push(sender);

        let config = config.clone();
        let response_sender = response_sender.clone();
        let state = state.clone();

        thread::spawn(move || run_socket_thread(
            state,
            response_sender,
            receiver,
            &config,
            addr,
            thread_id
        ));
    }

    // Start handler workers; they share one response channel and clones of
    // all request channels
    for _ in 0..config.num_request_workers {
        let config = config.clone();
        let state= state.clone();
        let request_senders = request_senders.clone();
        let response_receiver = response_receiver.clone();

        thread::spawn(move || run_handler_thread(
            &config,
            state,
            pareto,
            request_senders,
            response_receiver,
        ));
    }

    // Bootstrap request cycle by adding a request to each request channel
    for sender in request_senders.iter(){
        let request = create_connect_request(
            generate_transaction_id(&mut thread_rng())
        );

        sender.send(request.into())
            .expect("bootstrap: add initial request to request queue");
    }

    // Never returns unless config.duration elapses
    monitor_statistics(
        state,
        &config
    )
}
/// Print throughput statistics every `interval` seconds; when the
/// configured duration has elapsed, print a summary report and return.
fn monitor_statistics(
    state: LoadTestState,
    config: &Config,
){
    let start_time = Instant::now();
    // Per-interval average response *rates* (responses/second), used for
    // the final report
    let mut report_avg_response_vec: Vec<f64> = Vec::new();

    let interval = 5;
    let interval_f64 = interval as f64;

    loop {
        thread::sleep(Duration::from_secs(interval));

        let statistics = state.statistics.as_ref();

        // fetch_and(0, ..) atomically reads and zeroes each counter, so
        // every iteration reports only the last interval's traffic
        let responses_announce = statistics.responses_announce
            .fetch_and(0, Ordering::SeqCst) as f64;
        let response_peers = statistics.response_peers
            .fetch_and(0, Ordering::SeqCst) as f64;

        let requests_per_second = statistics.requests
            .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
        let responses_connect_per_second = statistics.responses_connect
            .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
        let responses_scrape_per_second = statistics.responses_scrape
            .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
        let responses_error_per_second = statistics.responses_error
            .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;

        let responses_announce_per_second = responses_announce / interval_f64;

        let responses_per_second =
            responses_connect_per_second +
            responses_announce_per_second +
            responses_scrape_per_second +
            responses_error_per_second;

        // Store the rate itself. (Previously this pushed
        // `responses_per_second * interval_f64`, i.e. a per-interval
        // count, inflating the reported "average responses per second"
        // by a factor of `interval`.)
        report_avg_response_vec.push(responses_per_second);

        println!();
        println!("Requests out: {:.2}/second", requests_per_second);
        println!("Responses in: {:.2}/second", responses_per_second);
        println!(" - Connect responses: {:.2}", responses_connect_per_second);
        println!(" - Announce responses: {:.2}", responses_announce_per_second);
        println!(" - Scrape responses: {:.2}", responses_scrape_per_second);
        println!(" - Error responses: {:.2}", responses_error_per_second);

        // Guard against division by zero (previously printed NaN when no
        // announce responses arrived during the interval)
        if responses_announce > 0.0 {
            println!("Peers per announce response: {:.2}", response_peers / responses_announce);
        }

        let time_elapsed = start_time.elapsed();
        let duration = Duration::from_secs(config.duration as u64);

        // duration == 0 means run forever
        if config.duration != 0 && time_elapsed >= duration {
            // The vec has at least one entry by the time we get here, so
            // the division below is well-defined
            let report_len = report_avg_response_vec.len() as f64;
            let report_sum: f64 = report_avg_response_vec.into_iter().sum();
            let report_avg: f64 = report_sum / report_len;

            println!(
                concat!(
                    "\n# aquatic load test report\n\n",
                    "Test ran for {} seconds.\n",
                    "Average responses per second: {:.2}\n\nConfig: {:#?}\n"
                ),
                time_elapsed.as_secs(),
                report_avg,
                config
            );

            break
        }
    }
}

View file

@ -0,0 +1,216 @@
use std::io::Cursor;
use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::time::Duration;
use crossbeam_channel::{Receiver, Sender};
use mio::{net::UdpSocket, Events, Poll, Interest, Token};
use net2::{UdpSocketExt, UdpBuilder};
use bittorrent_udp::converters::*;
use bittorrent_udp::types::*;
use crate::common::*;
// Upper bound on a single UDP datagram we read from the server
const MAX_PACKET_SIZE: usize = 4096;

/// Create a std UDP socket bound to `addr`: nonblocking, optionally with
/// an enlarged receive buffer, and connected to the configured server.
pub fn create_socket(
    config: &Config,
    addr: SocketAddr
) -> ::std::net::UdpSocket {
    let builder = &{
        if addr.is_ipv4(){
            UdpBuilder::new_v4().expect("socket: build")
        } else {
            UdpBuilder::new_v6().expect("socket: build")
        }
    };

    // unwrap_or_else instead of expect(&format!(..)): the latter formats
    // and allocates the panic message even on success
    let socket = builder.bind(&addr)
        .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", addr, err));

    socket.set_nonblocking(true)
        .expect("socket: set nonblocking");

    // 0 means "keep the OS default" (see NetworkConfig::recv_buffer docs);
    // a failure here is non-fatal, just reported
    if config.network.recv_buffer != 0 {
        if let Err(err) = socket.set_recv_buffer_size(config.network.recv_buffer){
            eprintln!(
                "socket: failed setting recv buffer to {}: {:?}",
                config.network.recv_buffer,
                err
            );
        }
    }

    // Connecting lets us use send()/recv() instead of send_to()/recv_from()
    socket.connect(config.server_address)
        .expect("socket: connect to server");

    socket
}
/// Socket worker loop: poll the UDP socket, read and decode responses,
/// forward them to the handler workers, and send any queued requests.
pub fn run_socket_thread(
    state: LoadTestState,
    response_channel_sender: Sender<(ThreadId, Response)>,
    request_receiver: Receiver<Request>,
    config: &Config,
    addr: SocketAddr,
    thread_id: ThreadId
) {
    let mut socket = UdpSocket::from_std(create_socket(config, addr));
    let mut buffer = [0u8; MAX_PACKET_SIZE];

    let ip_version = match config.server_address {
        SocketAddr::V4(_) => IpVersion::IPv4,
        SocketAddr::V6(_) => IpVersion::IPv6,
    };

    let token = Token(thread_id.0 as usize);
    let interests = Interest::READABLE;
    let timeout = Duration::from_micros(config.network.poll_timeout);

    let mut poll = Poll::new().expect("create poll");

    poll.registry()
        .register(&mut socket, token, interests)
        .unwrap();

    let mut events = Events::with_capacity(config.network.poll_event_capacity);
    let mut local_state = SocketWorkerLocalStatistics::default();
    let mut responses = Vec::new();

    loop {
        poll.poll(&mut events, Some(timeout))
            .expect("failed polling");

        for event in events.iter(){
            if event.token() == token && event.is_readable(){
                read_responses(
                    thread_id,
                    ip_version,
                    &socket,
                    &mut buffer,
                    &mut local_state,
                    &mut responses
                );

                for r in responses.drain(..){
                    // unwrap_or_else avoids allocating the panic message
                    // on every send (expect(&format!(..)) did)
                    response_channel_sender.send(r)
                        .unwrap_or_else(|err| panic!(
                            "add response to channel in socket worker {}: {:?}",
                            thread_id.0,
                            err
                        ));
                }

                // Reregister so further readable events keep arriving
                poll.registry()
                    .reregister(&mut socket, token, interests)
                    .unwrap();
            }
        }

        // Send pending requests once per poll iteration — this covers both
        // the "events arrived" and the "poll timed out" cases. (Previously
        // send_requests was additionally called once per event, which only
        // re-drained the same, already-empty channel.)
        send_requests(
            &state,
            &mut socket,
            &mut buffer,
            &request_receiver,
            &mut local_state
        );
    }
}
fn read_responses(
thread_id: ThreadId,
ip_version: IpVersion,
socket: &UdpSocket,
buffer: &mut [u8],
ls: &mut SocketWorkerLocalStatistics,
responses: &mut Vec<(ThreadId, Response)>,
){
while let Ok(amt) = socket.recv(buffer) {
match response_from_bytes(&buffer[0..amt], ip_version){
Ok(response) => {
match response {
Response::Announce(ref r) => {
ls.responses_announce += 1;
ls.response_peers += r.peers.len();
},
Response::Scrape(_) => {
ls.responses_scrape += 1;
},
Response::Connect(_) => {
ls.responses_connect += 1;
},
Response::Error(_) => {
ls.responses_error += 1;
},
}
responses.push((thread_id, response))
},
Err(err) => {
eprintln!("Received invalid response: {:#?}", err);
}
}
}
}
/// Serialize and send every request currently queued on the channel, then
/// flush the worker's local counters into the shared atomic statistics.
fn send_requests(
    state: &LoadTestState,
    socket: &mut UdpSocket,
    buffer: &mut [u8],
    receiver: &Receiver<Request>,
    statistics: &mut SocketWorkerLocalStatistics,
){
    let mut cursor = Cursor::new(buffer);

    while let Ok(request) = receiver.try_recv() {
        cursor.set_position(0);

        if let Err(err) = request_to_bytes(&mut cursor, request){
            eprintln!("request_to_bytes err: {}", err);

            // Skip the packet: previously the (empty or partially
            // written) buffer was still sent to the server
            continue;
        }

        let position = cursor.position() as usize;
        let inner = cursor.get_ref();

        match socket.send(&inner[..position]) {
            Ok(_) => {
                statistics.requests += 1;
            },
            Err(err) => {
                eprintln!("Couldn't send packet: {:?}", err);
            }
        }
    }

    // Flush local counters into the shared atomics and reset them
    state.statistics.requests
        .fetch_add(statistics.requests, Ordering::SeqCst);
    state.statistics.responses_connect
        .fetch_add(statistics.responses_connect, Ordering::SeqCst);
    state.statistics.responses_announce
        .fetch_add(statistics.responses_announce, Ordering::SeqCst);
    state.statistics.responses_scrape
        .fetch_add(statistics.responses_scrape, Ordering::SeqCst);
    state.statistics.responses_error
        .fetch_add(statistics.responses_error, Ordering::SeqCst);
    state.statistics.response_peers
        .fetch_add(statistics.response_peers, Ordering::SeqCst);

    *statistics = SocketWorkerLocalStatistics::default();
}

View file

@ -0,0 +1,94 @@
use std::sync::Arc;
use rand_distr::{Standard, Pareto};
use rand::prelude::*;
use bittorrent_udp::types::*;
use crate::common::*;
/// Create a new simulated peer: pick its torrent and the torrents it will
/// scrape for (Pareto-distributed popularity), with a random peer id and
/// port.
pub fn create_torrent_peer(
    config: &Config,
    rng: &mut impl Rng,
    pareto: Pareto<f64>,
    info_hashes: &Arc<Vec<InfoHash>>,
    connection_id: ConnectionId
) -> TorrentPeer {
    // How many torrents this peer will ask about in scrape requests
    let num_scrape_hashes = rng.gen_range(1, config.handler.scrape_max_torrents);

    let scrape_hash_indeces: Vec<usize> = (0..num_scrape_hashes)
        .map(|_| select_info_hash_index(config, rng, pareto))
        .collect();

    let info_hash_index = select_info_hash_index(config, rng, pareto);

    TorrentPeer {
        info_hash: info_hashes[info_hash_index],
        scrape_hash_indeces,
        connection_id,
        peer_id: generate_peer_id(),
        port: Port(rand::random())
    }
}
/// Pick a torrent index in [0, number_of_torrents - 1] with
/// Pareto-distributed popularity (low indices are most popular).
fn select_info_hash_index(
    config: &Config,
    rng: &mut impl Rng,
    pareto: Pareto<f64>,
) -> usize {
    pareto_usize(rng, pareto, config.handler.number_of_torrents - 1)
}
/// Map a Pareto sample onto an index in [0, max].
///
/// Samples are clamped at 101.0, shifted/scaled from [1, 101] into
/// [0.0, 1.0] and multiplied by `max`, so all heavy-tail outliers
/// collapse onto `max`.
///
/// NOTE(review): assumes the distribution's scale is 1.0 (as constructed
/// in main.rs) so samples start at 1.0 — verify if reused elsewhere.
pub fn pareto_usize(
    rng: &mut impl Rng,
    pareto: Pareto<f64>,
    max: usize,
) -> usize {
    let p: f64 = rng.sample(pareto);
    let p = (p.min(101.0f64) - 1.0) / 100.0;
    (p * max as f64) as usize
}
/// Generate a random 20-byte peer id.
pub fn generate_peer_id() -> PeerId {
    PeerId(random_20_bytes())
}
/// Generate a random 20-byte torrent info hash.
pub fn generate_info_hash() -> InfoHash {
    InfoHash(random_20_bytes())
}
/// Generate a random transaction id from the caller's RNG.
pub fn generate_transaction_id(rng: &mut impl Rng) -> TransactionId {
    TransactionId(rng.gen())
}
/// Wrap a transaction id in a connect request — the first step of the
/// BitTorrent UDP tracker protocol cycle.
pub fn create_connect_request(transaction_id: TransactionId) -> Request {
    (ConnectRequest { transaction_id }).into()
}
// Don't use SmallRng here for now
fn random_20_bytes() -> [u8; 20] {
let mut bytes = [0; 20];
for (i, b) in rand::thread_rng()
.sample_iter(&Standard)
.enumerate()
.take(20) {
bytes[i] = b
}
bytes
}

5
scripts/run-load-test.sh Executable file
View file

@ -0,0 +1,5 @@
#!/bin/sh

export RUSTFLAGS="-C target-cpu=native"

# Quote "$@" so arguments containing whitespace are forwarded intact
# (unquoted $@ re-splits them into separate words)
cargo run --release --bin aquatic_load_test -- "$@"