udp load test: use only one type of worker for better performance

This commit is contained in:
Joakim Frostegård 2021-11-17 00:51:15 +01:00
parent 2aa94d050b
commit 5440157a95
5 changed files with 54 additions and 219 deletions

View file

@ -23,8 +23,6 @@
without sharding. io_uring impl is slightly behind or slightly ahead of mio, but without sharding. io_uring impl is slightly behind or slightly ahead of mio, but
nothing justifying code complexity and unsafety nothing justifying code complexity and unsafety
* clean torrent map in workers, remove it from shared state * clean torrent map in workers, remove it from shared state
* consider rewriting load test to just have one worker type. Connection state
should/could be divided by socket worker anyway?
* mio * mio
* stagger connection cleaning intervals? * stagger connection cleaning intervals?
* uring * uring

View file

@ -5,7 +5,6 @@ use aquatic_cli_helpers::LogLevel;
#[cfg(feature = "cpu-pinning")] #[cfg(feature = "cpu-pinning")]
use aquatic_common::cpu_pinning::CpuPinningConfig; use aquatic_common::cpu_pinning::CpuPinningConfig;
use hashbrown::HashMap; use hashbrown::HashMap;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use aquatic_udp_protocol::*; use aquatic_udp_protocol::*;
@ -22,11 +21,7 @@ pub struct Config {
/// address here. /// address here.
pub server_address: SocketAddr, pub server_address: SocketAddr,
pub log_level: LogLevel, pub log_level: LogLevel,
/// Number of sockets and socket worker threads pub workers: u8,
pub num_socket_workers: u8,
/// Number of workers generating requests from responses, as well as
/// requests not connected to previous ones.
pub num_request_workers: usize,
/// Run duration (quit and generate report after this many seconds) /// Run duration (quit and generate report after this many seconds)
pub duration: usize, pub duration: usize,
pub network: NetworkConfig, pub network: NetworkConfig,
@ -75,8 +70,6 @@ pub struct HandlerConfig {
pub number_of_torrents: usize, pub number_of_torrents: usize,
/// Maximum number of torrents to ask about in scrape requests /// Maximum number of torrents to ask about in scrape requests
pub scrape_max_torrents: usize, pub scrape_max_torrents: usize,
/// Handler: max number of responses to collect for before processing
pub max_responses_per_iter: usize,
/// Probability that a generated request is a connect request as part /// Probability that a generated request is a connect request as part
/// of sum of the various weight arguments. /// of sum of the various weight arguments.
pub weight_connect: usize, pub weight_connect: usize,
@ -86,8 +79,6 @@ pub struct HandlerConfig {
/// Probability that a generated request is a scrape request, as part /// Probability that a generated request is a scrape request, as part
/// of sum of the various weight arguments. /// of sum of the various weight arguments.
pub weight_scrape: usize, pub weight_scrape: usize,
/// Handler: max microseconds to wait for single response from channel
pub channel_timeout: u64,
/// Pareto shape /// Pareto shape
/// ///
/// Fake peers choose torrents according to Pareto distribution. /// Fake peers choose torrents according to Pareto distribution.
@ -105,8 +96,7 @@ impl Default for Config {
Self { Self {
server_address: "127.0.0.1:3000".parse().unwrap(), server_address: "127.0.0.1:3000".parse().unwrap(),
log_level: LogLevel::Error, log_level: LogLevel::Error,
num_socket_workers: 1, workers: 1,
num_request_workers: 1,
duration: 0, duration: 0,
network: NetworkConfig::default(), network: NetworkConfig::default(),
handler: HandlerConfig::default(), handler: HandlerConfig::default(),
@ -138,8 +128,6 @@ impl Default for HandlerConfig {
weight_announce: 1, weight_announce: 1,
weight_scrape: 1, weight_scrape: 1,
additional_request_factor: 0.4, additional_request_factor: 0.4,
max_responses_per_iter: 10_000,
channel_timeout: 200,
torrent_selection_pareto_shape: 2.0, torrent_selection_pareto_shape: 2.0,
} }
} }
@ -168,9 +156,9 @@ pub struct Statistics {
#[derive(Clone)] #[derive(Clone)]
pub struct LoadTestState { pub struct LoadTestState {
pub torrent_peers: Arc<Mutex<TorrentPeerMap>>,
pub info_hashes: Arc<Vec<InfoHash>>, pub info_hashes: Arc<Vec<InfoHash>>,
pub statistics: Arc<Statistics>, pub statistics: Arc<Statistics>,
pub responses: Arc<AtomicUsize>,
} }
#[derive(PartialEq, Eq, Clone, Copy)] #[derive(PartialEq, Eq, Clone, Copy)]

View file

@ -1,9 +1,5 @@
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use std::vec::Drain;
use crossbeam_channel::{Receiver, Sender};
use parking_lot::MutexGuard;
use rand::distributions::WeightedIndex; use rand::distributions::WeightedIndex;
use rand::prelude::*; use rand::prelude::*;
use rand_distr::Pareto; use rand_distr::Pareto;
@ -13,125 +9,7 @@ use aquatic_udp_protocol::*;
use crate::common::*; use crate::common::*;
use crate::utils::*; use crate::utils::*;
pub fn run_handler_thread( pub fn process_response(
config: &Config,
state: LoadTestState,
pareto: Pareto<f64>,
request_senders: Vec<Sender<Request>>,
response_receiver: Receiver<(ThreadId, Response)>,
) {
let state = &state;
let mut rng1 = SmallRng::from_rng(thread_rng()).expect("create SmallRng from thread_rng()");
let mut rng2 = SmallRng::from_rng(thread_rng()).expect("create SmallRng from thread_rng()");
let timeout = Duration::from_micros(config.handler.channel_timeout);
let mut responses = Vec::new();
loop {
let mut opt_torrent_peers = None;
// Collect a maximum number of responses. Stop collecting before that
// number is reached if having waited for too long for a request, but
// only if ConnectionMap mutex isn't locked.
for i in 0..config.handler.max_responses_per_iter {
let response = if i == 0 {
match response_receiver.recv() {
Ok(r) => r,
Err(_) => break, // Really shouldn't happen
}
} else {
match response_receiver.recv_timeout(timeout) {
Ok(r) => r,
Err(_) => {
if let Some(guard) = state.torrent_peers.try_lock() {
opt_torrent_peers = Some(guard);
break;
} else {
continue;
}
}
}
};
responses.push(response);
}
let mut torrent_peers: MutexGuard<TorrentPeerMap> =
opt_torrent_peers.unwrap_or_else(|| state.torrent_peers.lock());
let requests = process_responses(
&mut rng1,
pareto,
&state.info_hashes,
config,
&mut torrent_peers,
responses.drain(..),
);
// Somewhat dubious heuristic for deciding how fast to create
// and send additional requests (requests not having anything
// to do with previously sent requests)
let num_additional_to_send = {
let num_additional_requests = requests.iter().map(|v| v.len()).sum::<usize>() as f64;
let num_new_requests_per_socket =
num_additional_requests / config.num_socket_workers as f64;
((num_new_requests_per_socket / 1.2) * config.handler.additional_request_factor)
as usize
+ 10
};
for (channel_index, new_requests) in requests.into_iter().enumerate() {
let channel = &request_senders[channel_index];
for _ in 0..num_additional_to_send {
let request = create_connect_request(generate_transaction_id(&mut rng2));
channel
.send(request)
.expect("send request to channel in handler worker");
}
for request in new_requests.into_iter() {
channel
.send(request)
.expect("send request to channel in handler worker");
}
}
}
}
fn process_responses(
rng: &mut impl Rng,
pareto: Pareto<f64>,
info_hashes: &Arc<Vec<InfoHash>>,
config: &Config,
torrent_peers: &mut TorrentPeerMap,
responses: Drain<(ThreadId, Response)>,
) -> Vec<Vec<Request>> {
let mut new_requests = Vec::with_capacity(config.num_socket_workers as usize);
for _ in 0..config.num_socket_workers {
new_requests.push(Vec::new());
}
for (socket_thread_id, response) in responses.into_iter() {
let opt_request =
process_response(rng, pareto, info_hashes, &config, torrent_peers, response);
if let Some(new_request) = opt_request {
new_requests[socket_thread_id.0 as usize].push(new_request);
}
}
new_requests
}
fn process_response(
rng: &mut impl Rng, rng: &mut impl Rng,
pareto: Pareto<f64>, pareto: Pareto<f64>,
info_hashes: &Arc<Vec<InfoHash>>, info_hashes: &Arc<Vec<InfoHash>>,

View file

@ -5,10 +5,6 @@ use std::time::{Duration, Instant};
#[cfg(feature = "cpu-pinning")] #[cfg(feature = "cpu-pinning")]
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
use crossbeam_channel::unbounded;
use hashbrown::HashMap;
use parking_lot::Mutex;
use rand::prelude::*;
use rand_distr::Pareto; use rand_distr::Pareto;
mod common; mod common;
@ -17,7 +13,6 @@ mod network;
mod utils; mod utils;
use common::*; use common::*;
use handler::run_handler_thread;
use network::*; use network::*;
use utils::*; use utils::*;
@ -54,22 +49,17 @@ fn run(config: Config) -> ::anyhow::Result<()> {
} }
let state = LoadTestState { let state = LoadTestState {
torrent_peers: Arc::new(Mutex::new(HashMap::new())),
info_hashes: Arc::new(info_hashes), info_hashes: Arc::new(info_hashes),
statistics: Arc::new(Statistics::default()), statistics: Arc::new(Statistics::default()),
responses: Default::default(),
}; };
let pareto = Pareto::new(1.0, config.handler.torrent_selection_pareto_shape).unwrap(); let pareto = Pareto::new(1.0, config.handler.torrent_selection_pareto_shape).unwrap();
// Start socket workers // Start workers
let (response_sender, response_receiver) = unbounded(); for i in 0..config.workers {
let mut request_senders = Vec::new();
for i in 0..config.num_socket_workers {
let thread_id = ThreadId(i); let thread_id = ThreadId(i);
let (sender, receiver) = unbounded();
let port = config.network.first_port + (i as u16); let port = config.network.first_port + (i as u16);
let ip = if config.server_address.is_ipv6() { let ip = if config.server_address.is_ipv6() {
@ -83,55 +73,25 @@ fn run(config: Config) -> ::anyhow::Result<()> {
}; };
let addr = SocketAddr::new(ip, port); let addr = SocketAddr::new(ip, port);
request_senders.push(sender);
let config = config.clone(); let config = config.clone();
let response_sender = response_sender.clone();
let state = state.clone(); let state = state.clone();
thread::spawn(move || { thread::spawn(move || {
#[cfg(feature = "cpu-pinning")] #[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to( pin_current_if_configured_to(
&config.cpu_pinning, &config.cpu_pinning,
config.num_socket_workers as usize, config.workers as usize,
WorkerIndex::SocketWorker(i as usize), WorkerIndex::SocketWorker(i as usize),
); );
run_socket_thread(state, response_sender, receiver, &config, addr, thread_id) run_worker_thread(state, pareto, &config, addr, thread_id)
}); });
} }
for i in 0..config.num_request_workers {
let config = config.clone();
let state = state.clone();
let request_senders = request_senders.clone();
let response_receiver = response_receiver.clone();
thread::spawn(move || {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.num_socket_workers as usize,
WorkerIndex::RequestWorker(i as usize),
);
run_handler_thread(&config, state, pareto, request_senders, response_receiver)
});
}
// Bootstrap request cycle by adding a request to each request channel
for sender in request_senders.iter() {
let request = create_connect_request(generate_transaction_id(&mut thread_rng()));
sender
.send(request)
.expect("bootstrap: add initial request to request queue");
}
#[cfg(feature = "cpu-pinning")] #[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to( pin_current_if_configured_to(
&config.cpu_pinning, &config.cpu_pinning,
config.num_socket_workers as usize, config.workers as usize,
WorkerIndex::Other, WorkerIndex::Other,
); );

View file

@ -1,15 +1,16 @@
use std::io::Cursor; use std::{io::Cursor, vec::Drain};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::time::Duration; use std::time::Duration;
use crossbeam_channel::{Receiver, Sender};
use mio::{net::UdpSocket, Events, Interest, Poll, Token}; use mio::{net::UdpSocket, Events, Interest, Poll, Token};
use rand::{SeedableRng, prelude::SmallRng, thread_rng};
use rand_distr::Pareto;
use socket2::{Domain, Protocol, Socket, Type}; use socket2::{Domain, Protocol, Socket, Type};
use aquatic_udp_protocol::*; use aquatic_udp_protocol::*;
use crate::common::*; use crate::{common::*, handler::{process_response}, utils::*};
const MAX_PACKET_SIZE: usize = 4096; const MAX_PACKET_SIZE: usize = 4096;
@ -45,10 +46,9 @@ pub fn create_socket(config: &Config, addr: SocketAddr) -> ::std::net::UdpSocket
socket.into() socket.into()
} }
pub fn run_socket_thread( pub fn run_worker_thread(
state: LoadTestState, state: LoadTestState,
response_channel_sender: Sender<(ThreadId, Response)>, pareto: Pareto<f64>,
request_receiver: Receiver<Request>,
config: &Config, config: &Config,
addr: SocketAddr, addr: SocketAddr,
thread_id: ThreadId, thread_id: ThreadId,
@ -56,6 +56,9 @@ pub fn run_socket_thread(
let mut socket = UdpSocket::from_std(create_socket(config, addr)); let mut socket = UdpSocket::from_std(create_socket(config, addr));
let mut buffer = [0u8; MAX_PACKET_SIZE]; let mut buffer = [0u8; MAX_PACKET_SIZE];
let mut rng = SmallRng::from_rng(thread_rng()).expect("create SmallRng from thread_rng()");
let mut torrent_peers = TorrentPeerMap::default();
let token = Token(thread_id.0 as usize); let token = Token(thread_id.0 as usize);
let interests = Interest::READABLE; let interests = Interest::READABLE;
let timeout = Duration::from_micros(config.network.poll_timeout); let timeout = Duration::from_micros(config.network.poll_timeout);
@ -70,6 +73,10 @@ pub fn run_socket_thread(
let mut local_state = SocketWorkerLocalStatistics::default(); let mut local_state = SocketWorkerLocalStatistics::default();
let mut responses = Vec::new(); let mut responses = Vec::new();
let mut requests = Vec::new();
// Bootstrap request cycle by adding a request
requests.push(create_connect_request(generate_transaction_id(&mut thread_rng())));
loop { loop {
poll.poll(&mut events, Some(timeout)) poll.poll(&mut events, Some(timeout))
@ -78,52 +85,56 @@ pub fn run_socket_thread(
for event in events.iter() { for event in events.iter() {
if (event.token() == token) & event.is_readable() { if (event.token() == token) & event.is_readable() {
read_responses( read_responses(
thread_id,
&socket, &socket,
&mut buffer, &mut buffer,
&mut local_state, &mut local_state,
&mut responses, &mut responses,
); );
for r in responses.drain(..) {
response_channel_sender.send(r).unwrap_or_else(|err| {
panic!(
"add response to channel in socket worker {}: {:?}",
thread_id.0, err
)
});
}
poll.registry()
.reregister(&mut socket, token, interests)
.unwrap();
} }
}
send_requests( let total_responses = responses.len() + if thread_id.0 == 0 {
&state, state.responses.fetch_and(0, Ordering::SeqCst)
&mut socket, } else {
&mut buffer, state.responses.fetch_add(responses.len(), Ordering::SeqCst)
&request_receiver, };
&mut local_state,
); // Somewhat dubious heuristic for deciding how fast to create
// and send additional requests
let num_additional_to_send = {
let n = total_responses as f64 / (config.workers as f64 * 4.0);
(n * config.handler.additional_request_factor) as usize + 10
};
for _ in 0..num_additional_to_send {
requests.push(create_connect_request(generate_transaction_id(&mut rng)));
}
for response in responses.drain(..) {
let opt_request =
process_response(&mut rng, pareto, &state.info_hashes, &config, &mut torrent_peers, response);
if let Some(new_request) = opt_request {
requests.push(new_request);
}
} }
send_requests( send_requests(
&state, &state,
&mut socket, &mut socket,
&mut buffer, &mut buffer,
&request_receiver,
&mut local_state, &mut local_state,
requests.drain(..),
); );
} }
} }
fn read_responses( fn read_responses(
thread_id: ThreadId,
socket: &UdpSocket, socket: &UdpSocket,
buffer: &mut [u8], buffer: &mut [u8],
ls: &mut SocketWorkerLocalStatistics, ls: &mut SocketWorkerLocalStatistics,
responses: &mut Vec<(ThreadId, Response)>, responses: &mut Vec<Response>,
) { ) {
while let Ok(amt) = socket.recv(buffer) { while let Ok(amt) = socket.recv(buffer) {
match Response::from_bytes(&buffer[0..amt]) { match Response::from_bytes(&buffer[0..amt]) {
@ -148,7 +159,7 @@ fn read_responses(
} }
} }
responses.push((thread_id, response)) responses.push(response)
} }
Err(err) => { Err(err) => {
eprintln!("Received invalid response: {:#?}", err); eprintln!("Received invalid response: {:#?}", err);
@ -161,12 +172,12 @@ fn send_requests(
state: &LoadTestState, state: &LoadTestState,
socket: &mut UdpSocket, socket: &mut UdpSocket,
buffer: &mut [u8], buffer: &mut [u8],
receiver: &Receiver<Request>,
statistics: &mut SocketWorkerLocalStatistics, statistics: &mut SocketWorkerLocalStatistics,
requests: Drain<Request>,
) { ) {
let mut cursor = Cursor::new(buffer); let mut cursor = Cursor::new(buffer);
while let Ok(request) = receiver.try_recv() { for request in requests {
cursor.set_position(0); cursor.set_position(0);
if let Err(err) = request.write(&mut cursor) { if let Err(err) = request.write(&mut cursor) {