diff --git a/TODO.md b/TODO.md
index e5acc4b..6172add 100644
--- a/TODO.md
+++ b/TODO.md
@@ -23,8 +23,6 @@
     without sharding. io_uring impl is slightly behind or slightly ahead of
     mio, but nothing justifying code complexity and unsafety
   * clean torrent map in workers, remove it from shared state
-  * consider rewriting load test to just have one worker type. Connection state
-    should/could be divided by socket worker anyway?
   * mio
     * stagger connection cleaning intervals?
   * uring
diff --git a/aquatic_udp_load_test/src/common.rs b/aquatic_udp_load_test/src/common.rs
index 6339f32..85474fd 100644
--- a/aquatic_udp_load_test/src/common.rs
+++ b/aquatic_udp_load_test/src/common.rs
@@ -5,7 +5,6 @@ use aquatic_cli_helpers::LogLevel;
 #[cfg(feature = "cpu-pinning")]
 use aquatic_common::cpu_pinning::CpuPinningConfig;
 use hashbrown::HashMap;
-use parking_lot::Mutex;
 use serde::{Deserialize, Serialize};
 
 use aquatic_udp_protocol::*;
@@ -22,11 +21,7 @@ pub struct Config {
     /// address here.
     pub server_address: SocketAddr,
     pub log_level: LogLevel,
-    /// Number of sockets and socket worker threads
-    pub num_socket_workers: u8,
-    /// Number of workers generating requests from responses, as well as
-    /// requests not connected to previous ones.
-    pub num_request_workers: usize,
+    pub workers: u8,
     /// Run duration (quit and generate report after this many seconds)
    pub duration: usize,
     pub network: NetworkConfig,
@@ -75,8 +70,6 @@ pub struct HandlerConfig {
     pub number_of_torrents: usize,
     /// Maximum number of torrents to ask about in scrape requests
     pub scrape_max_torrents: usize,
-    /// Handler: max number of responses to collect for before processing
-    pub max_responses_per_iter: usize,
     /// Probability that a generated request is a connect request as part
     /// of sum of the various weight arguments.
     pub weight_connect: usize,
@@ -86,8 +79,6 @@ pub struct HandlerConfig {
     /// Probability that a generated request is a scrape request, as part
     /// of sum of the various weight arguments.
     pub weight_scrape: usize,
-    /// Handler: max microseconds to wait for single response from channel
-    pub channel_timeout: u64,
     /// Pareto shape
     ///
     /// Fake peers choose torrents according to Pareto distribution.
@@ -105,8 +96,7 @@ impl Default for Config {
         Self {
             server_address: "127.0.0.1:3000".parse().unwrap(),
             log_level: LogLevel::Error,
-            num_socket_workers: 1,
-            num_request_workers: 1,
+            workers: 1,
             duration: 0,
             network: NetworkConfig::default(),
             handler: HandlerConfig::default(),
@@ -138,8 +128,6 @@ impl Default for HandlerConfig {
             weight_announce: 1,
             weight_scrape: 1,
             additional_request_factor: 0.4,
-            max_responses_per_iter: 10_000,
-            channel_timeout: 200,
             torrent_selection_pareto_shape: 2.0,
         }
     }
@@ -168,9 +156,9 @@ pub struct Statistics {
 
 #[derive(Clone)]
 pub struct LoadTestState {
-    pub torrent_peers: Arc<Mutex<TorrentPeerMap>>,
     pub info_hashes: Arc<Vec<InfoHash>>,
     pub statistics: Arc<Statistics>,
+    pub responses: Arc<AtomicUsize>,
 }
 
 #[derive(PartialEq, Eq, Clone, Copy)]
diff --git a/aquatic_udp_load_test/src/handler.rs b/aquatic_udp_load_test/src/handler.rs
index e690ca4..cc98c27 100644
--- a/aquatic_udp_load_test/src/handler.rs
+++ b/aquatic_udp_load_test/src/handler.rs
@@ -1,9 +1,5 @@
 use std::sync::Arc;
-use std::time::Duration;
-use std::vec::Drain;
 
-use crossbeam_channel::{Receiver, Sender};
-use parking_lot::MutexGuard;
 use rand::distributions::WeightedIndex;
 use rand::prelude::*;
 use rand_distr::Pareto;
@@ -13,125 +9,7 @@ use aquatic_udp_protocol::*;
 use crate::common::*;
 use crate::utils::*;
 
-pub fn run_handler_thread(
-    config: &Config,
-    state: LoadTestState,
-    pareto: Pareto<f64>,
-    request_senders: Vec<Sender<Request>>,
-    response_receiver: Receiver<(ThreadId, Response)>,
-) {
-    let state = &state;
-
-    let mut rng1 = SmallRng::from_rng(thread_rng()).expect("create SmallRng from thread_rng()");
-    let mut rng2 = SmallRng::from_rng(thread_rng()).expect("create SmallRng from thread_rng()");
-
-    let timeout = Duration::from_micros(config.handler.channel_timeout);
-
-    let mut responses = Vec::new();
-
-    loop {
-        let mut opt_torrent_peers = None;
-
-        // Collect a maximum number of responses. Stop collecting before that
-        // number is reached if having waited for too long for a request, but
-        // only if ConnectionMap mutex isn't locked.
-        for i in 0..config.handler.max_responses_per_iter {
-            let response = if i == 0 {
-                match response_receiver.recv() {
-                    Ok(r) => r,
-                    Err(_) => break, // Really shouldn't happen
-                }
-            } else {
-                match response_receiver.recv_timeout(timeout) {
-                    Ok(r) => r,
-                    Err(_) => {
-                        if let Some(guard) = state.torrent_peers.try_lock() {
-                            opt_torrent_peers = Some(guard);
-
-                            break;
-                        } else {
-                            continue;
-                        }
-                    }
-                }
-            };
-
-            responses.push(response);
-        }
-
-        let mut torrent_peers: MutexGuard<TorrentPeerMap> =
-            opt_torrent_peers.unwrap_or_else(|| state.torrent_peers.lock());
-
-        let requests = process_responses(
-            &mut rng1,
-            pareto,
-            &state.info_hashes,
-            config,
-            &mut torrent_peers,
-            responses.drain(..),
-        );
-
-        // Somewhat dubious heuristic for deciding how fast to create
-        // and send additional requests (requests not having anything
-        // to do with previously sent requests)
-        let num_additional_to_send = {
-            let num_additional_requests = requests.iter().map(|v| v.len()).sum::<usize>() as f64;
-
-            let num_new_requests_per_socket =
-                num_additional_requests / config.num_socket_workers as f64;
-
-            ((num_new_requests_per_socket / 1.2) * config.handler.additional_request_factor)
-                as usize
-                + 10
-        };
-
-        for (channel_index, new_requests) in requests.into_iter().enumerate() {
-            let channel = &request_senders[channel_index];
-
-            for _ in 0..num_additional_to_send {
-                let request = create_connect_request(generate_transaction_id(&mut rng2));
-
-                channel
-                    .send(request)
-                    .expect("send request to channel in handler worker");
-            }
-
-            for request in new_requests.into_iter() {
-                channel
-                    .send(request)
-                    .expect("send request to channel in handler worker");
-            }
-        }
-    }
-}
-
-fn process_responses(
-    rng: &mut impl Rng,
-    pareto: Pareto<f64>,
-    info_hashes: &Arc<Vec<InfoHash>>,
-    config: &Config,
-    torrent_peers: &mut TorrentPeerMap,
-    responses: Drain<(ThreadId, Response)>,
-) -> Vec<Vec<Request>> {
-    let mut new_requests = Vec::with_capacity(config.num_socket_workers as usize);
-
-    for _ in 0..config.num_socket_workers {
-        new_requests.push(Vec::new());
-    }
-
-    for (socket_thread_id, response) in responses.into_iter() {
-        let opt_request =
-            process_response(rng, pareto, info_hashes, &config, torrent_peers, response);
-
-        if let Some(new_request) = opt_request {
-            new_requests[socket_thread_id.0 as usize].push(new_request);
-        }
-    }
-
-    new_requests
-}
-
-fn process_response(
+pub fn process_response(
     rng: &mut impl Rng,
     pareto: Pareto<f64>,
     info_hashes: &Arc<Vec<InfoHash>>,
diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs
index 0b34bb2..50468c8 100644
--- a/aquatic_udp_load_test/src/main.rs
+++ b/aquatic_udp_load_test/src/main.rs
@@ -5,10 +5,6 @@ use std::time::{Duration, Instant};
 
 #[cfg(feature = "cpu-pinning")]
 use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
-use crossbeam_channel::unbounded;
-use hashbrown::HashMap;
-use parking_lot::Mutex;
-use rand::prelude::*;
 use rand_distr::Pareto;
 
 mod common;
@@ -17,7 +13,6 @@ mod network;
 mod utils;
 
 use common::*;
-use handler::run_handler_thread;
 use network::*;
 use utils::*;
 
@@ -54,22 +49,17 @@ fn run(config: Config) -> ::anyhow::Result<()> {
     }
 
     let state = LoadTestState {
-        torrent_peers: Arc::new(Mutex::new(HashMap::new())),
         info_hashes: Arc::new(info_hashes),
         statistics: Arc::new(Statistics::default()),
+        responses: Default::default(),
     };
 
     let pareto = Pareto::new(1.0, config.handler.torrent_selection_pareto_shape).unwrap();
 
-    // Start socket workers
+    // Start workers
 
-    let (response_sender, response_receiver) = unbounded();
-
-    let mut request_senders = Vec::new();
-
-    for i in 0..config.num_socket_workers {
+    for i in 0..config.workers {
         let thread_id = ThreadId(i);
-        let (sender, receiver) = unbounded();
         let port = config.network.first_port + (i as u16);
 
         let ip = if config.server_address.is_ipv6() {
@@ -83,55 +73,25 @@ fn run(config: Config) -> ::anyhow::Result<()> {
         };
 
         let addr = SocketAddr::new(ip, port);
-
-        request_senders.push(sender);
-
         let config = config.clone();
-        let response_sender = response_sender.clone();
         let state = state.clone();
 
         thread::spawn(move || {
             #[cfg(feature = "cpu-pinning")]
             pin_current_if_configured_to(
                 &config.cpu_pinning,
-                config.num_socket_workers as usize,
+                config.workers as usize,
                 WorkerIndex::SocketWorker(i as usize),
             );
 
-            run_socket_thread(state, response_sender, receiver, &config, addr, thread_id)
+            run_worker_thread(state, pareto, &config, addr, thread_id)
         });
     }
 
-    for i in 0..config.num_request_workers {
-        let config = config.clone();
-        let state = state.clone();
-        let request_senders = request_senders.clone();
-        let response_receiver = response_receiver.clone();
-
-        thread::spawn(move || {
-            #[cfg(feature = "cpu-pinning")]
-            pin_current_if_configured_to(
-                &config.cpu_pinning,
-                config.num_socket_workers as usize,
-                WorkerIndex::RequestWorker(i as usize),
-            );
-            run_handler_thread(&config, state, pareto, request_senders, response_receiver)
-        });
-    }
-
-    // Bootstrap request cycle by adding a request to each request channel
-    for sender in request_senders.iter() {
-        let request = create_connect_request(generate_transaction_id(&mut thread_rng()));
-
-        sender
-            .send(request)
-            .expect("bootstrap: add initial request to request queue");
-    }
-
     #[cfg(feature = "cpu-pinning")]
     pin_current_if_configured_to(
         &config.cpu_pinning,
-        config.num_socket_workers as usize,
+        config.workers as usize,
         WorkerIndex::Other,
     );
 
diff --git a/aquatic_udp_load_test/src/network.rs b/aquatic_udp_load_test/src/network.rs
index 358a27b..79fc3e5 100644
--- a/aquatic_udp_load_test/src/network.rs
+++ b/aquatic_udp_load_test/src/network.rs
@@ -1,15 +1,16 @@
-use std::io::Cursor;
+use std::{io::Cursor, vec::Drain};
 use std::net::SocketAddr;
 use std::sync::atomic::Ordering;
 use std::time::Duration;
 
-use crossbeam_channel::{Receiver, Sender};
 use mio::{net::UdpSocket, Events, Interest, Poll, Token};
+use rand::{SeedableRng, prelude::SmallRng, thread_rng};
+use rand_distr::Pareto;
 use socket2::{Domain, Protocol, Socket, Type};
 
 use aquatic_udp_protocol::*;
 
-use crate::common::*;
+use crate::{common::*, handler::{process_response}, utils::*};
 
 const MAX_PACKET_SIZE: usize = 4096;
 
@@ -45,10 +46,9 @@ pub fn create_socket(config: &Config, addr: SocketAddr) -> ::std::net::UdpSocket
     socket.into()
 }
 
-pub fn run_socket_thread(
+pub fn run_worker_thread(
     state: LoadTestState,
-    response_channel_sender: Sender<(ThreadId, Response)>,
-    request_receiver: Receiver<Request>,
+    pareto: Pareto<f64>,
     config: &Config,
     addr: SocketAddr,
     thread_id: ThreadId,
@@ -56,6 +56,9 @@ pub fn run_socket_thread(
     let mut socket = UdpSocket::from_std(create_socket(config, addr));
     let mut buffer = [0u8; MAX_PACKET_SIZE];
 
+    let mut rng = SmallRng::from_rng(thread_rng()).expect("create SmallRng from thread_rng()");
+    let mut torrent_peers = TorrentPeerMap::default();
+
     let token = Token(thread_id.0 as usize);
     let interests = Interest::READABLE;
     let timeout = Duration::from_micros(config.network.poll_timeout);
@@ -70,6 +73,10 @@ pub fn run_socket_thread(
 
     let mut local_state = SocketWorkerLocalStatistics::default();
     let mut responses = Vec::new();
+    let mut requests = Vec::new();
+
+    // Bootstrap request cycle by adding a request
+    requests.push(create_connect_request(generate_transaction_id(&mut thread_rng())));
 
     loop {
         poll.poll(&mut events, Some(timeout))
@@ -78,52 +85,56 @@
         for event in events.iter() {
             if (event.token() == token) & event.is_readable() {
                 read_responses(
-                    thread_id,
                     &socket,
                     &mut buffer,
                     &mut local_state,
                     &mut responses,
                 );
-
-                for r in responses.drain(..) {
-                    response_channel_sender.send(r).unwrap_or_else(|err| {
-                        panic!(
-                            "add response to channel in socket worker {}: {:?}",
-                            thread_id.0, err
-                        )
-                    });
-                }
-
-                poll.registry()
-                    .reregister(&mut socket, token, interests)
-                    .unwrap();
             }
+        }
 
-            send_requests(
-                &state,
-                &mut socket,
-                &mut buffer,
-                &request_receiver,
-                &mut local_state,
-            );
+        let total_responses = responses.len() + if thread_id.0 == 0 {
+            state.responses.fetch_and(0, Ordering::SeqCst)
+        } else {
+            state.responses.fetch_add(responses.len(), Ordering::SeqCst)
+        };
+
+        // Somewhat dubious heuristic for deciding how fast to create
+        // and send additional requests
+        let num_additional_to_send = {
+            let n = total_responses as f64 / (config.workers as f64 * 4.0);
+
+            (n * config.handler.additional_request_factor) as usize + 10
+        };
+
+        for _ in 0..num_additional_to_send {
+            requests.push(create_connect_request(generate_transaction_id(&mut rng)));
+        }
+
+        for response in responses.drain(..) {
+            let opt_request =
+                process_response(&mut rng, pareto, &state.info_hashes, &config, &mut torrent_peers, response);
+
+            if let Some(new_request) = opt_request {
+                requests.push(new_request);
+            }
         }
 
         send_requests(
             &state,
             &mut socket,
             &mut buffer,
-            &request_receiver,
             &mut local_state,
+            requests.drain(..),
         );
     }
 }
 
 fn read_responses(
-    thread_id: ThreadId,
     socket: &UdpSocket,
     buffer: &mut [u8],
     ls: &mut SocketWorkerLocalStatistics,
-    responses: &mut Vec<(ThreadId, Response)>,
+    responses: &mut Vec<Response>,
 ) {
     while let Ok(amt) = socket.recv(buffer) {
         match Response::from_bytes(&buffer[0..amt]) {
@@ -148,7 +159,7 @@
                     }
                 }
 
-                responses.push((thread_id, response))
+                responses.push(response)
             }
             Err(err) => {
                 eprintln!("Received invalid response: {:#?}", err);
@@ -161,12 +172,12 @@ fn send_requests(
     state: &LoadTestState,
     socket: &mut UdpSocket,
     buffer: &mut [u8],
-    receiver: &Receiver<Request>,
     statistics: &mut SocketWorkerLocalStatistics,
+    requests: Drain<Request>,
 ) {
     let mut cursor = Cursor::new(buffer);
 
-    while let Ok(request) = receiver.try_recv() {
+    for request in requests {
         cursor.set_position(0);
 
         if let Err(err) = request.write(&mut cursor) {