diff --git a/aquatic_http/src/lib.rs b/aquatic_http/src/lib.rs index 54ed266..1975aac 100644 --- a/aquatic_http/src/lib.rs +++ b/aquatic_http/src/lib.rs @@ -15,8 +15,8 @@ use std::{ use crate::config::Config; mod common; -mod workers; pub mod config; +mod workers; pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker"; @@ -115,8 +115,13 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { WorkerIndex::RequestWorker(i), ); - workers::request::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) - .await + workers::request::run_request_worker( + config, + state, + request_mesh_builder, + response_mesh_builder, + ) + .await }); executors.push(executor); diff --git a/aquatic_http/src/workers/mod.rs b/aquatic_http/src/workers/mod.rs index 5f41ab1..63fc0ec 100644 --- a/aquatic_http/src/workers/mod.rs +++ b/aquatic_http/src/workers/mod.rs @@ -1,2 +1,2 @@ pub mod request; -pub mod socket; \ No newline at end of file +pub mod socket; diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index b0f9f2e..06c1e75 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -5,7 +5,7 @@ use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::time::Instant; -use crossbeam_channel::Sender; +use crossbeam_channel::{Sender, TrySendError}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap}; use aquatic_common::AHashIndexMap; @@ -88,8 +88,14 @@ impl ConnectedRequestSender { request: ConnectedRequest, addr: SocketAddr, ) { - if let Err(err) = self.senders[index.0].try_send((self.index, request, addr)) { - ::log::warn!("request_sender.try_send failed: {:?}", err) + match self.senders[index.0].try_send((self.index, request, addr)) { + Ok(()) => {} + Err(TrySendError::Full(_)) => { + ::log::error!("Request channel {} is full, dropping request. Try increasing number of request workers or raising config.worker_channel_size.", index.0) + } + Err(TrySendError::Disconnected(_)) => { + panic!("Request channel {} is disconnected", index.0); + } } } } @@ -109,8 +115,14 @@ impl ConnectedResponseSender { response: ConnectedResponse, addr: SocketAddr, ) { - if let Err(err) = self.senders[index.0].try_send((response, addr)) { - ::log::warn!("request_sender.try_send failed: {:?}", err) + match self.senders[index.0].try_send((response, addr)) { + Ok(()) => {} + Err(TrySendError::Full(_)) => { + ::log::error!("Response channel {} is full, dropping response. Try increasing number of socket workers or raising config.worker_channel_size.", index.0) + } + Err(TrySendError::Disconnected(_)) => { + panic!("Response channel {} is disconnected", index.0); + } } } } diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 35f2e0a..279d6e8 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -16,9 +16,18 @@ pub struct Config { /// generate responses and send them back to the socket workers. pub request_workers: usize, pub log_level: LogLevel, + /// Maximum number of items in each channel passing requests/responses + /// between workers. A value of zero means that the channel will be of + /// unbounded size. + pub worker_channel_size: usize, + /// How long to block waiting for requests in request workers. Higher + /// values means that with zero traffic, the worker will not unnecessarily + /// cause the CPU to wake up as often. However, high values (something like + /// larger than 1000) combined with very low traffic can cause delays + /// in torrent cleaning. + pub request_channel_recv_timeout_ms: u64, pub network: NetworkConfig, pub protocol: ProtocolConfig, - pub handlers: HandlerConfig, pub statistics: StatisticsConfig, pub cleaning: CleaningConfig, pub privileges: PrivilegeConfig, @@ -33,9 +42,10 @@ impl Default for Config { socket_workers: 1, request_workers: 1, log_level: LogLevel::Error, + worker_channel_size: 0, + request_channel_recv_timeout_ms: 100, network: NetworkConfig::default(), protocol: ProtocolConfig::default(), - handlers: HandlerConfig::default(), statistics: StatisticsConfig::default(), cleaning: CleaningConfig::default(), privileges: PrivilegeConfig::default(), @@ -110,20 +120,6 @@ impl Default for ProtocolConfig { } } -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(default)] -pub struct HandlerConfig { - pub channel_recv_timeout_ms: u64, -} - -impl Default for HandlerConfig { - fn default() -> Self { - Self { - channel_recv_timeout_ms: 100, - } - } -} - #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct StatisticsConfig { diff --git a/aquatic_udp/src/lib.rs b/aquatic_udp/src/lib.rs index 1e7e604..3c77751 100644 --- a/aquatic_udp/src/lib.rs +++ b/aquatic_udp/src/lib.rs @@ -12,7 +12,7 @@ use anyhow::Context; #[cfg(feature = "cpu-pinning")] use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::privileges::drop_privileges_after_socket_binding; -use crossbeam_channel::unbounded; +use crossbeam_channel::{bounded, unbounded}; use aquatic_common::access_list::update_access_list; use signal_hook::consts::SIGUSR1; @@ -40,14 +40,22 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let mut response_receivers = BTreeMap::new(); for i in 0..config.request_workers { - let (request_sender, request_receiver) = unbounded(); + let (request_sender, request_receiver) = if config.worker_channel_size == 0 { + unbounded() + } else { + bounded(config.worker_channel_size) + }; request_senders.push(request_sender); request_receivers.insert(i, request_receiver); } for i in 0..config.socket_workers { - let (response_sender, response_receiver) = unbounded(); + let (response_sender, response_receiver) = if config.worker_channel_size == 0 { + unbounded() + } else { + bounded(config.worker_channel_size) + }; response_senders.push(response_sender); response_receivers.insert(i, response_receiver); diff --git a/aquatic_udp/src/workers/request.rs b/aquatic_udp/src/workers/request.rs index dd0498b..ea81d29 100644 --- a/aquatic_udp/src/workers/request.rs +++ b/aquatic_udp/src/workers/request.rs @@ -90,7 +90,7 @@ pub fn run_request_worker( let mut torrents = TorrentMaps::default(); let mut small_rng = SmallRng::from_entropy(); - let timeout = Duration::from_millis(config.handlers.channel_recv_timeout_ms); + let timeout = Duration::from_millis(config.request_channel_recv_timeout_ms); let mut peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); let cleaning_interval = Duration::from_secs(config.cleaning.torrent_cleaning_interval); diff --git a/aquatic_udp_load_test/src/common.rs b/aquatic_udp_load_test/src/common.rs index 276c324..6f40729 100644 --- a/aquatic_udp_load_test/src/common.rs +++ b/aquatic_udp_load_test/src/common.rs @@ -4,9 +4,6 @@ use hashbrown::HashMap; use aquatic_udp_protocol::*; -#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] -pub struct ThreadId(pub u8); - #[derive(PartialEq, Eq, Clone)] pub struct TorrentPeer { pub info_hash: InfoHash, diff --git a/aquatic_udp_load_test/src/config.rs b/aquatic_udp_load_test/src/config.rs index cc63900..55ee0e2 100644 --- a/aquatic_udp_load_test/src/config.rs +++ b/aquatic_udp_load_test/src/config.rs @@ -18,15 +18,27 @@ pub struct Config { pub workers: u8, /// Run duration (quit and generate report after this many seconds) pub duration: usize, - /// Probability that an additional connect request will be sent for each - /// mio event - pub additional_request_probability: f32, pub network: NetworkConfig, - pub handler: HandlerConfig, + pub requests: RequestConfig, #[cfg(feature = "cpu-pinning")] pub cpu_pinning: CpuPinningConfig, } +impl Default for Config { + fn default() -> Self { + Self { + server_address: "127.0.0.1:3000".parse().unwrap(), + log_level: LogLevel::Error, + workers: 1, + duration: 0, + network: NetworkConfig::default(), + requests: RequestConfig::default(), + #[cfg(feature = "cpu-pinning")] + cpu_pinning: CpuPinningConfig::default_for_load_test(), + } + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct NetworkConfig { @@ -60,9 +72,21 @@ pub struct NetworkConfig { pub recv_buffer: usize, } +impl Default for NetworkConfig { + fn default() -> Self { + Self { + multiple_client_ipv4s: true, + first_port: 45_000, + poll_timeout: 276, + poll_event_capacity: 2_877, + recv_buffer: 6_000_000, + } + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] -pub struct HandlerConfig { +pub struct RequestConfig { /// Number of torrents to simulate pub number_of_torrents: usize, /// Maximum number of torrents to ask about in scrape requests @@ -82,37 +106,12 @@ pub struct HandlerConfig { pub torrent_selection_pareto_shape: f64, /// Probability that a generated peer is a seeder pub peer_seeder_probability: f64, + /// Probability that an additional connect request will be sent for each + /// mio event + pub additional_request_probability: f32, } -impl Default for Config { - fn default() -> Self { - Self { - server_address: "127.0.0.1:3000".parse().unwrap(), - log_level: LogLevel::Error, - workers: 1, - duration: 0, - additional_request_probability: 0.5, - network: NetworkConfig::default(), - handler: HandlerConfig::default(), - #[cfg(feature = "cpu-pinning")] - cpu_pinning: CpuPinningConfig::default_for_load_test(), - } - } -} - -impl Default for NetworkConfig { - fn default() -> Self { - Self { - multiple_client_ipv4s: true, - first_port: 45_000, - poll_timeout: 276, - poll_event_capacity: 2_877, - recv_buffer: 6_000_000, - } - } -} - -impl Default for HandlerConfig { +impl Default for RequestConfig { fn default() -> Self { Self { number_of_torrents: 10_000, @@ -122,6 +121,7 @@ impl Default for HandlerConfig { weight_announce: 5, weight_scrape: 1, torrent_selection_pareto_shape: 2.0, + additional_request_probability: 0.5, } } } diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index 957fcbc..2488659 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -10,14 +10,13 @@ use rand_distr::Pareto; mod common; mod config; -mod handler; -mod network; mod utils; +mod worker; use common::*; use config::Config; -use network::*; use utils::*; +use worker::*; #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; @@ -37,7 +36,9 @@ impl aquatic_cli_helpers::Config for Config { } fn run(config: Config) -> ::anyhow::Result<()> { - if config.handler.weight_announce + config.handler.weight_connect + config.handler.weight_scrape + if config.requests.weight_announce + + config.requests.weight_connect + + config.requests.weight_scrape == 0 { panic!("Error: at least one weight must be larger than zero."); @@ -45,9 +46,9 @@ fn run(config: Config) -> ::anyhow::Result<()> { println!("Starting client with config: {:#?}", config); - let mut info_hashes = Vec::with_capacity(config.handler.number_of_torrents); + let mut info_hashes = Vec::with_capacity(config.requests.number_of_torrents); - for _ in 0..config.handler.number_of_torrents { + for _ in 0..config.requests.number_of_torrents { info_hashes.push(generate_info_hash()); } @@ -56,12 +57,11 @@ fn run(config: Config) -> ::anyhow::Result<()> { statistics: Arc::new(Statistics::default()), }; - let pareto = Pareto::new(1.0, config.handler.torrent_selection_pareto_shape).unwrap(); + let pareto = Pareto::new(1.0, config.requests.torrent_selection_pareto_shape).unwrap(); // Start workers for i in 0..config.workers { - let thread_id = ThreadId(i); let port = config.network.first_port + (i as u16); let ip = if config.server_address.is_ipv6() { @@ -86,7 +86,7 @@ fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::SocketWorker(i as usize), ); - run_worker_thread(state, pareto, &config, addr, thread_id) + run_worker_thread(state, pareto, &config, addr) }); } diff --git a/aquatic_udp_load_test/src/utils.rs b/aquatic_udp_load_test/src/utils.rs index f88c211..bae1391 100644 --- a/aquatic_udp_load_test/src/utils.rs +++ b/aquatic_udp_load_test/src/utils.rs @@ -1,43 +1,8 @@ -use std::sync::Arc; - use rand::prelude::*; use rand_distr::Pareto; use aquatic_udp_protocol::*; -use crate::common::*; -use crate::config::Config; - -pub fn create_torrent_peer( - config: &Config, - rng: &mut impl Rng, - pareto: Pareto, - info_hashes: &Arc>, - connection_id: ConnectionId, -) -> TorrentPeer { - let num_scape_hashes = rng.gen_range(1..config.handler.scrape_max_torrents); - - let mut scrape_hash_indeces = Vec::new(); - - for _ in 0..num_scape_hashes { - scrape_hash_indeces.push(select_info_hash_index(config, rng, pareto)) - } - - let info_hash_index = select_info_hash_index(config, rng, pareto); - - TorrentPeer { - info_hash: info_hashes[info_hash_index], - scrape_hash_indeces, - connection_id, - peer_id: generate_peer_id(), - port: Port(rand::random()), - } -} - -fn select_info_hash_index(config: &Config, rng: &mut impl Rng, pareto: Pareto) -> usize { - pareto_usize(rng, pareto, config.handler.number_of_torrents - 1) -} - pub fn pareto_usize(rng: &mut impl Rng, pareto: Pareto, max: usize) -> usize { let p: f64 = rng.sample(pareto); let p = (p.min(101.0f64) - 1.0) / 100.0; diff --git a/aquatic_udp_load_test/src/network.rs b/aquatic_udp_load_test/src/worker/mod.rs similarity index 95% rename from aquatic_udp_load_test/src/network.rs rename to aquatic_udp_load_test/src/worker/mod.rs index 3a61804..816dafc 100644 --- a/aquatic_udp_load_test/src/network.rs +++ b/aquatic_udp_load_test/src/worker/mod.rs @@ -1,3 +1,5 @@ +mod request_gen; + use std::io::Cursor; use std::net::SocketAddr; use std::sync::atomic::Ordering; @@ -12,48 +14,16 @@ use socket2::{Domain, Protocol, Socket, Type}; use aquatic_udp_protocol::*; use crate::config::Config; -use crate::{common::*, handler::process_response, utils::*}; +use crate::{common::*, utils::*}; +use request_gen::process_response; -const MAX_PACKET_SIZE: usize = 4096; - -pub fn create_socket(config: &Config, addr: SocketAddr) -> ::std::net::UdpSocket { - let socket = if addr.is_ipv4() { - Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) - } else { - Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) - } - .expect("create socket"); - - socket - .set_nonblocking(true) - .expect("socket: set nonblocking"); - - if config.network.recv_buffer != 0 { - if let Err(err) = socket.set_recv_buffer_size(config.network.recv_buffer) { - eprintln!( - "socket: failed setting recv buffer to {}: {:?}", - config.network.recv_buffer, err - ); - } - } - - socket - .bind(&addr.into()) - .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", addr, err)); - - socket - .connect(&config.server_address.into()) - .expect("socket: connect to server"); - - socket.into() -} +const MAX_PACKET_SIZE: usize = 8192; pub fn run_worker_thread( state: LoadTestState, pareto: Pareto, config: &Config, addr: SocketAddr, - thread_id: ThreadId, ) { let mut socket = UdpSocket::from_std(create_socket(config, addr)); let mut buffer = [0u8; MAX_PACKET_SIZE]; @@ -61,7 +31,7 @@ pub fn run_worker_thread( let mut rng = SmallRng::from_rng(thread_rng()).expect("create SmallRng from thread_rng()"); let mut torrent_peers = TorrentPeerMap::default(); - let token = Token(thread_id.0 as usize); + let token = Token(0); let interests = Interest::READABLE; let timeout = Duration::from_micros(config.network.poll_timeout); @@ -127,7 +97,7 @@ pub fn run_worker_thread( } } - if rng.gen::() <= config.additional_request_probability { + if rng.gen::() <= config.requests.additional_request_probability { let additional_request = create_connect_request(generate_transaction_id(&mut rng)); @@ -201,3 +171,35 @@ fn update_shared_statistics(state: &LoadTestState, statistics: &mut SocketWorker *statistics = SocketWorkerLocalStatistics::default(); } + +fn create_socket(config: &Config, addr: SocketAddr) -> ::std::net::UdpSocket { + let socket = if addr.is_ipv4() { + Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) + } else { + Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) + } + .expect("create socket"); + + socket + .set_nonblocking(true) + .expect("socket: set nonblocking"); + + if config.network.recv_buffer != 0 { + if let Err(err) = socket.set_recv_buffer_size(config.network.recv_buffer) { + eprintln!( + "socket: failed setting recv buffer to {}: {:?}", + config.network.recv_buffer, err + ); + } + } + + socket + .bind(&addr.into()) + .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", addr, err)); + + socket + .connect(&config.server_address.into()) + .expect("socket: connect to server"); + + socket.into() +} diff --git a/aquatic_udp_load_test/src/handler.rs b/aquatic_udp_load_test/src/worker/request_gen.rs similarity index 83% rename from aquatic_udp_load_test/src/handler.rs rename to aquatic_udp_load_test/src/worker/request_gen.rs index 2611341..98091c8 100644 --- a/aquatic_udp_load_test/src/handler.rs +++ b/aquatic_udp_load_test/src/worker/request_gen.rs @@ -115,9 +115,9 @@ fn create_random_request( torrent_peer: &TorrentPeer, ) -> Request { let weights = vec![ - config.handler.weight_announce as u32, - config.handler.weight_connect as u32, - config.handler.weight_scrape as u32, + config.requests.weight_announce as u32, + config.requests.weight_connect as u32, + config.requests.weight_scrape as u32, ]; let items = vec![ @@ -142,7 +142,7 @@ fn create_announce_request( transaction_id: TransactionId, ) -> Request { let (event, bytes_left) = { - if rng.gen_bool(config.handler.peer_seeder_probability) { + if rng.gen_bool(config.requests.peer_seeder_probability) { (AnnounceEvent::Completed, NumberOfBytes(0)) } else { (AnnounceEvent::Started, NumberOfBytes(50)) @@ -186,3 +186,33 @@ fn create_scrape_request( }) .into() } + +fn create_torrent_peer( + config: &Config, + rng: &mut impl Rng, + pareto: Pareto, + info_hashes: &Arc>, + connection_id: ConnectionId, +) -> TorrentPeer { + let num_scape_hashes = rng.gen_range(1..config.requests.scrape_max_torrents); + + let mut scrape_hash_indeces = Vec::new(); + + for _ in 0..num_scape_hashes { + scrape_hash_indeces.push(select_info_hash_index(config, rng, pareto)) + } + + let info_hash_index = select_info_hash_index(config, rng, pareto); + + TorrentPeer { + info_hash: info_hashes[info_hash_index], + scrape_hash_indeces, + connection_id, + peer_id: generate_peer_id(), + port: Port(rand::random()), + } +} + +fn select_info_hash_index(config: &Config, rng: &mut impl Rng, pareto: Pareto) -> usize { + pareto_usize(rng, pareto, config.requests.number_of_torrents - 1) +}