udp load test: rewrite network loop, default to weight_announce=5

This commit is contained in:
Joakim Frostegård 2021-11-17 00:58:08 +01:00
parent c5bf3901ea
commit eb511c3a4c
3 changed files with 66 additions and 110 deletions

View file

@ -85,10 +85,6 @@ pub struct HandlerConfig {
pub torrent_selection_pareto_shape: f64, pub torrent_selection_pareto_shape: f64,
/// Probability that a generated peer is a seeder /// Probability that a generated peer is a seeder
pub peer_seeder_probability: f64, pub peer_seeder_probability: f64,
/// Part of additional request creation calculation, meaning requests
/// which are not dependent on previous responses from server. Higher
/// means more.
pub additional_request_factor: f64,
} }
impl Default for Config { impl Default for Config {
@ -125,9 +121,8 @@ impl Default for HandlerConfig {
peer_seeder_probability: 0.25, peer_seeder_probability: 0.25,
scrape_max_torrents: 50, scrape_max_torrents: 50,
weight_connect: 0, weight_connect: 0,
weight_announce: 1, weight_announce: 5,
weight_scrape: 1, weight_scrape: 1,
additional_request_factor: 0.4,
torrent_selection_pareto_shape: 2.0, torrent_selection_pareto_shape: 2.0,
} }
} }
@ -158,7 +153,6 @@ pub struct Statistics {
pub struct LoadTestState { pub struct LoadTestState {
pub info_hashes: Arc<Vec<InfoHash>>, pub info_hashes: Arc<Vec<InfoHash>>,
pub statistics: Arc<Statistics>, pub statistics: Arc<Statistics>,
pub responses: Arc<AtomicUsize>,
} }
#[derive(PartialEq, Eq, Clone, Copy)] #[derive(PartialEq, Eq, Clone, Copy)]

View file

@ -51,7 +51,6 @@ fn run(config: Config) -> ::anyhow::Result<()> {
let state = LoadTestState { let state = LoadTestState {
info_hashes: Arc::new(info_hashes), info_hashes: Arc::new(info_hashes),
statistics: Arc::new(Statistics::default()), statistics: Arc::new(Statistics::default()),
responses: Default::default(),
}; };
let pareto = Pareto::new(1.0, config.handler.torrent_selection_pareto_shape).unwrap(); let pareto = Pareto::new(1.0, config.handler.torrent_selection_pareto_shape).unwrap();

View file

@ -1,7 +1,7 @@
use std::io::Cursor;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::time::Duration; use std::time::Duration;
use std::{io::Cursor, vec::Drain};
use mio::{net::UdpSocket, Events, Interest, Poll, Token}; use mio::{net::UdpSocket, Events, Interest, Poll, Token};
use rand::{prelude::SmallRng, thread_rng, SeedableRng}; use rand::{prelude::SmallRng, thread_rng, SeedableRng};
@ -71,14 +71,11 @@ pub fn run_worker_thread(
let mut events = Events::with_capacity(config.network.poll_event_capacity); let mut events = Events::with_capacity(config.network.poll_event_capacity);
let mut local_state = SocketWorkerLocalStatistics::default(); let mut statistics = SocketWorkerLocalStatistics::default();
let mut responses = Vec::new();
let mut requests = Vec::new();
// Bootstrap request cycle by adding a request // Bootstrap request cycle
requests.push(create_connect_request(generate_transaction_id( let initial_request = create_connect_request(generate_transaction_id(&mut thread_rng()));
&mut thread_rng(), send_request(&mut socket, &mut buffer, &mut statistics, initial_request);
)));
loop { loop {
poll.poll(&mut events, Some(timeout)) poll.poll(&mut events, Some(timeout))
@ -86,30 +83,29 @@ pub fn run_worker_thread(
for event in events.iter() { for event in events.iter() {
if (event.token() == token) & event.is_readable() { if (event.token() == token) & event.is_readable() {
read_responses(&socket, &mut buffer, &mut local_state, &mut responses); while let Ok(amt) = socket.recv(&mut buffer) {
match Response::from_bytes(&buffer[0..amt]) {
Ok(response) => {
match response {
Response::AnnounceIpv4(ref r) => {
statistics.responses_announce += 1;
statistics.response_peers += r.peers.len();
}
Response::AnnounceIpv6(ref r) => {
statistics.responses_announce += 1;
statistics.response_peers += r.peers.len();
}
Response::Scrape(_) => {
statistics.responses_scrape += 1;
}
Response::Connect(_) => {
statistics.responses_connect += 1;
}
Response::Error(_) => {
statistics.responses_error += 1;
} }
} }
let total_responses = responses.len()
+ if thread_id.0 == 0 {
state.responses.fetch_and(0, Ordering::SeqCst)
} else {
state.responses.fetch_add(responses.len(), Ordering::SeqCst)
};
// Somewhat dubious heuristic for deciding how fast to create
// and send additional requests
let num_additional_to_send = {
let n = total_responses as f64 / (config.workers as f64 * 4.0);
(n * config.handler.additional_request_factor) as usize + 10
};
for _ in 0..num_additional_to_send {
requests.push(create_connect_request(generate_transaction_id(&mut rng)));
}
for response in responses.drain(..) {
let opt_request = process_response( let opt_request = process_response(
&mut rng, &mut rng,
pareto, pareto,
@ -119,75 +115,36 @@ pub fn run_worker_thread(
response, response,
); );
if let Some(new_request) = opt_request { if let Some(request) = opt_request {
requests.push(new_request); send_request(&mut socket, &mut buffer, &mut statistics, request);
} }
} }
send_requests(
&state,
&mut socket,
&mut buffer,
&mut local_state,
requests.drain(..),
);
}
}
fn read_responses(
socket: &UdpSocket,
buffer: &mut [u8],
ls: &mut SocketWorkerLocalStatistics,
responses: &mut Vec<Response>,
) {
while let Ok(amt) = socket.recv(buffer) {
match Response::from_bytes(&buffer[0..amt]) {
Ok(response) => {
match response {
Response::AnnounceIpv4(ref r) => {
ls.responses_announce += 1;
ls.response_peers += r.peers.len();
}
Response::AnnounceIpv6(ref r) => {
ls.responses_announce += 1;
ls.response_peers += r.peers.len();
}
Response::Scrape(_) => {
ls.responses_scrape += 1;
}
Response::Connect(_) => {
ls.responses_connect += 1;
}
Response::Error(_) => {
ls.responses_error += 1;
}
}
responses.push(response)
}
Err(err) => { Err(err) => {
eprintln!("Received invalid response: {:#?}", err); eprintln!("Received invalid response: {:#?}", err);
} }
} }
} }
let additional_request = create_connect_request(generate_transaction_id(&mut rng));
send_request(&mut socket, &mut buffer, &mut statistics, additional_request);
update_shared_statistics(&state, &mut statistics);
}
}
}
} }
fn send_requests( fn send_request(
state: &LoadTestState,
socket: &mut UdpSocket, socket: &mut UdpSocket,
buffer: &mut [u8], buffer: &mut [u8],
statistics: &mut SocketWorkerLocalStatistics, statistics: &mut SocketWorkerLocalStatistics,
requests: Drain<Request>, request: Request,
) { ) {
let mut cursor = Cursor::new(buffer); let mut cursor = Cursor::new(buffer);
for request in requests { match request.write(&mut cursor) {
cursor.set_position(0); Ok(()) => {
if let Err(err) = request.write(&mut cursor) {
eprintln!("request_to_bytes err: {}", err);
}
let position = cursor.position() as usize; let position = cursor.position() as usize;
let inner = cursor.get_ref(); let inner = cursor.get_ref();
@ -200,7 +157,13 @@ fn send_requests(
} }
} }
} }
Err(err) => {
eprintln!("request_to_bytes err: {}", err);
}
}
}
fn update_shared_statistics(state: &LoadTestState, statistics: &mut SocketWorkerLocalStatistics) {
state state
.statistics .statistics
.requests .requests