mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-04-02 18:55:32 +00:00
Merge pull request #35 from greatest-ape/fixes-2021-11-28
udp and udp load test improvements, run cargo fmt
This commit is contained in:
commit
cf595f53e8
12 changed files with 166 additions and 151 deletions
|
|
@ -15,8 +15,8 @@ use std::{
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
|
|
||||||
mod common;
|
mod common;
|
||||||
mod workers;
|
|
||||||
pub mod config;
|
pub mod config;
|
||||||
|
mod workers;
|
||||||
|
|
||||||
pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker";
|
pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker";
|
||||||
|
|
||||||
|
|
@ -115,7 +115,12 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
|
||||||
WorkerIndex::RequestWorker(i),
|
WorkerIndex::RequestWorker(i),
|
||||||
);
|
);
|
||||||
|
|
||||||
workers::request::run_request_worker(config, state, request_mesh_builder, response_mesh_builder)
|
workers::request::run_request_worker(
|
||||||
|
config,
|
||||||
|
state,
|
||||||
|
request_mesh_builder,
|
||||||
|
response_mesh_builder,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@ use std::sync::atomic::AtomicUsize;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
use crossbeam_channel::Sender;
|
use crossbeam_channel::{Sender, TrySendError};
|
||||||
|
|
||||||
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap};
|
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap};
|
||||||
use aquatic_common::AHashIndexMap;
|
use aquatic_common::AHashIndexMap;
|
||||||
|
|
@ -88,8 +88,14 @@ impl ConnectedRequestSender {
|
||||||
request: ConnectedRequest,
|
request: ConnectedRequest,
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
) {
|
) {
|
||||||
if let Err(err) = self.senders[index.0].try_send((self.index, request, addr)) {
|
match self.senders[index.0].try_send((self.index, request, addr)) {
|
||||||
::log::warn!("request_sender.try_send failed: {:?}", err)
|
Ok(()) => {}
|
||||||
|
Err(TrySendError::Full(_)) => {
|
||||||
|
::log::error!("Request channel {} is full, dropping request. Try increasing number of request workers or raising config.worker_channel_size.", index.0)
|
||||||
|
}
|
||||||
|
Err(TrySendError::Disconnected(_)) => {
|
||||||
|
panic!("Request channel {} is disconnected", index.0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -109,8 +115,14 @@ impl ConnectedResponseSender {
|
||||||
response: ConnectedResponse,
|
response: ConnectedResponse,
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
) {
|
) {
|
||||||
if let Err(err) = self.senders[index.0].try_send((response, addr)) {
|
match self.senders[index.0].try_send((response, addr)) {
|
||||||
::log::warn!("request_sender.try_send failed: {:?}", err)
|
Ok(()) => {}
|
||||||
|
Err(TrySendError::Full(_)) => {
|
||||||
|
::log::error!("Response channel {} is full, dropping response. Try increasing number of socket workers or raising config.worker_channel_size.", index.0)
|
||||||
|
}
|
||||||
|
Err(TrySendError::Disconnected(_)) => {
|
||||||
|
panic!("Response channel {} is disconnected", index.0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,9 +16,18 @@ pub struct Config {
|
||||||
/// generate responses and send them back to the socket workers.
|
/// generate responses and send them back to the socket workers.
|
||||||
pub request_workers: usize,
|
pub request_workers: usize,
|
||||||
pub log_level: LogLevel,
|
pub log_level: LogLevel,
|
||||||
|
/// Maximum number of items in each channel passing requests/responses
|
||||||
|
/// between workers. A value of zero means that the channel will be of
|
||||||
|
/// unbounded size.
|
||||||
|
pub worker_channel_size: usize,
|
||||||
|
/// How long to block waiting for requests in request workers. Higher
|
||||||
|
/// values means that with zero traffic, the worker will not unnecessarily
|
||||||
|
/// cause the CPU to wake up as often. However, high values (something like
|
||||||
|
/// larger than 1000) combined with very low traffic can cause delays
|
||||||
|
/// in torrent cleaning.
|
||||||
|
pub request_channel_recv_timeout_ms: u64,
|
||||||
pub network: NetworkConfig,
|
pub network: NetworkConfig,
|
||||||
pub protocol: ProtocolConfig,
|
pub protocol: ProtocolConfig,
|
||||||
pub handlers: HandlerConfig,
|
|
||||||
pub statistics: StatisticsConfig,
|
pub statistics: StatisticsConfig,
|
||||||
pub cleaning: CleaningConfig,
|
pub cleaning: CleaningConfig,
|
||||||
pub privileges: PrivilegeConfig,
|
pub privileges: PrivilegeConfig,
|
||||||
|
|
@ -33,9 +42,10 @@ impl Default for Config {
|
||||||
socket_workers: 1,
|
socket_workers: 1,
|
||||||
request_workers: 1,
|
request_workers: 1,
|
||||||
log_level: LogLevel::Error,
|
log_level: LogLevel::Error,
|
||||||
|
worker_channel_size: 0,
|
||||||
|
request_channel_recv_timeout_ms: 100,
|
||||||
network: NetworkConfig::default(),
|
network: NetworkConfig::default(),
|
||||||
protocol: ProtocolConfig::default(),
|
protocol: ProtocolConfig::default(),
|
||||||
handlers: HandlerConfig::default(),
|
|
||||||
statistics: StatisticsConfig::default(),
|
statistics: StatisticsConfig::default(),
|
||||||
cleaning: CleaningConfig::default(),
|
cleaning: CleaningConfig::default(),
|
||||||
privileges: PrivilegeConfig::default(),
|
privileges: PrivilegeConfig::default(),
|
||||||
|
|
@ -110,20 +120,6 @@ impl Default for ProtocolConfig {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
|
||||||
#[serde(default)]
|
|
||||||
pub struct HandlerConfig {
|
|
||||||
pub channel_recv_timeout_ms: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for HandlerConfig {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
channel_recv_timeout_ms: 100,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub struct StatisticsConfig {
|
pub struct StatisticsConfig {
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,7 @@ use anyhow::Context;
|
||||||
#[cfg(feature = "cpu-pinning")]
|
#[cfg(feature = "cpu-pinning")]
|
||||||
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
|
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
|
||||||
use aquatic_common::privileges::drop_privileges_after_socket_binding;
|
use aquatic_common::privileges::drop_privileges_after_socket_binding;
|
||||||
use crossbeam_channel::unbounded;
|
use crossbeam_channel::{bounded, unbounded};
|
||||||
|
|
||||||
use aquatic_common::access_list::update_access_list;
|
use aquatic_common::access_list::update_access_list;
|
||||||
use signal_hook::consts::SIGUSR1;
|
use signal_hook::consts::SIGUSR1;
|
||||||
|
|
@ -40,14 +40,22 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
let mut response_receivers = BTreeMap::new();
|
let mut response_receivers = BTreeMap::new();
|
||||||
|
|
||||||
for i in 0..config.request_workers {
|
for i in 0..config.request_workers {
|
||||||
let (request_sender, request_receiver) = unbounded();
|
let (request_sender, request_receiver) = if config.worker_channel_size == 0 {
|
||||||
|
unbounded()
|
||||||
|
} else {
|
||||||
|
bounded(config.worker_channel_size)
|
||||||
|
};
|
||||||
|
|
||||||
request_senders.push(request_sender);
|
request_senders.push(request_sender);
|
||||||
request_receivers.insert(i, request_receiver);
|
request_receivers.insert(i, request_receiver);
|
||||||
}
|
}
|
||||||
|
|
||||||
for i in 0..config.socket_workers {
|
for i in 0..config.socket_workers {
|
||||||
let (response_sender, response_receiver) = unbounded();
|
let (response_sender, response_receiver) = if config.worker_channel_size == 0 {
|
||||||
|
unbounded()
|
||||||
|
} else {
|
||||||
|
bounded(config.worker_channel_size)
|
||||||
|
};
|
||||||
|
|
||||||
response_senders.push(response_sender);
|
response_senders.push(response_sender);
|
||||||
response_receivers.insert(i, response_receiver);
|
response_receivers.insert(i, response_receiver);
|
||||||
|
|
|
||||||
|
|
@ -90,7 +90,7 @@ pub fn run_request_worker(
|
||||||
let mut torrents = TorrentMaps::default();
|
let mut torrents = TorrentMaps::default();
|
||||||
let mut small_rng = SmallRng::from_entropy();
|
let mut small_rng = SmallRng::from_entropy();
|
||||||
|
|
||||||
let timeout = Duration::from_millis(config.handlers.channel_recv_timeout_ms);
|
let timeout = Duration::from_millis(config.request_channel_recv_timeout_ms);
|
||||||
let mut peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age);
|
let mut peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age);
|
||||||
|
|
||||||
let cleaning_interval = Duration::from_secs(config.cleaning.torrent_cleaning_interval);
|
let cleaning_interval = Duration::from_secs(config.cleaning.torrent_cleaning_interval);
|
||||||
|
|
|
||||||
|
|
@ -4,9 +4,6 @@ use hashbrown::HashMap;
|
||||||
|
|
||||||
use aquatic_udp_protocol::*;
|
use aquatic_udp_protocol::*;
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
|
|
||||||
pub struct ThreadId(pub u8);
|
|
||||||
|
|
||||||
#[derive(PartialEq, Eq, Clone)]
|
#[derive(PartialEq, Eq, Clone)]
|
||||||
pub struct TorrentPeer {
|
pub struct TorrentPeer {
|
||||||
pub info_hash: InfoHash,
|
pub info_hash: InfoHash,
|
||||||
|
|
|
||||||
|
|
@ -18,15 +18,27 @@ pub struct Config {
|
||||||
pub workers: u8,
|
pub workers: u8,
|
||||||
/// Run duration (quit and generate report after this many seconds)
|
/// Run duration (quit and generate report after this many seconds)
|
||||||
pub duration: usize,
|
pub duration: usize,
|
||||||
/// Probability that an additional connect request will be sent for each
|
|
||||||
/// mio event
|
|
||||||
pub additional_request_probability: f32,
|
|
||||||
pub network: NetworkConfig,
|
pub network: NetworkConfig,
|
||||||
pub handler: HandlerConfig,
|
pub requests: RequestConfig,
|
||||||
#[cfg(feature = "cpu-pinning")]
|
#[cfg(feature = "cpu-pinning")]
|
||||||
pub cpu_pinning: CpuPinningConfig,
|
pub cpu_pinning: CpuPinningConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Default for Config {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
server_address: "127.0.0.1:3000".parse().unwrap(),
|
||||||
|
log_level: LogLevel::Error,
|
||||||
|
workers: 1,
|
||||||
|
duration: 0,
|
||||||
|
network: NetworkConfig::default(),
|
||||||
|
requests: RequestConfig::default(),
|
||||||
|
#[cfg(feature = "cpu-pinning")]
|
||||||
|
cpu_pinning: CpuPinningConfig::default_for_load_test(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub struct NetworkConfig {
|
pub struct NetworkConfig {
|
||||||
|
|
@ -60,9 +72,21 @@ pub struct NetworkConfig {
|
||||||
pub recv_buffer: usize,
|
pub recv_buffer: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Default for NetworkConfig {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
multiple_client_ipv4s: true,
|
||||||
|
first_port: 45_000,
|
||||||
|
poll_timeout: 276,
|
||||||
|
poll_event_capacity: 2_877,
|
||||||
|
recv_buffer: 6_000_000,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub struct HandlerConfig {
|
pub struct RequestConfig {
|
||||||
/// Number of torrents to simulate
|
/// Number of torrents to simulate
|
||||||
pub number_of_torrents: usize,
|
pub number_of_torrents: usize,
|
||||||
/// Maximum number of torrents to ask about in scrape requests
|
/// Maximum number of torrents to ask about in scrape requests
|
||||||
|
|
@ -82,37 +106,12 @@ pub struct HandlerConfig {
|
||||||
pub torrent_selection_pareto_shape: f64,
|
pub torrent_selection_pareto_shape: f64,
|
||||||
/// Probability that a generated peer is a seeder
|
/// Probability that a generated peer is a seeder
|
||||||
pub peer_seeder_probability: f64,
|
pub peer_seeder_probability: f64,
|
||||||
|
/// Probability that an additional connect request will be sent for each
|
||||||
|
/// mio event
|
||||||
|
pub additional_request_probability: f32,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Config {
|
impl Default for RequestConfig {
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
server_address: "127.0.0.1:3000".parse().unwrap(),
|
|
||||||
log_level: LogLevel::Error,
|
|
||||||
workers: 1,
|
|
||||||
duration: 0,
|
|
||||||
additional_request_probability: 0.5,
|
|
||||||
network: NetworkConfig::default(),
|
|
||||||
handler: HandlerConfig::default(),
|
|
||||||
#[cfg(feature = "cpu-pinning")]
|
|
||||||
cpu_pinning: CpuPinningConfig::default_for_load_test(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for NetworkConfig {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
multiple_client_ipv4s: true,
|
|
||||||
first_port: 45_000,
|
|
||||||
poll_timeout: 276,
|
|
||||||
poll_event_capacity: 2_877,
|
|
||||||
recv_buffer: 6_000_000,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for HandlerConfig {
|
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
number_of_torrents: 10_000,
|
number_of_torrents: 10_000,
|
||||||
|
|
@ -122,6 +121,7 @@ impl Default for HandlerConfig {
|
||||||
weight_announce: 5,
|
weight_announce: 5,
|
||||||
weight_scrape: 1,
|
weight_scrape: 1,
|
||||||
torrent_selection_pareto_shape: 2.0,
|
torrent_selection_pareto_shape: 2.0,
|
||||||
|
additional_request_probability: 0.5,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -10,14 +10,13 @@ use rand_distr::Pareto;
|
||||||
|
|
||||||
mod common;
|
mod common;
|
||||||
mod config;
|
mod config;
|
||||||
mod handler;
|
|
||||||
mod network;
|
|
||||||
mod utils;
|
mod utils;
|
||||||
|
mod worker;
|
||||||
|
|
||||||
use common::*;
|
use common::*;
|
||||||
use config::Config;
|
use config::Config;
|
||||||
use network::*;
|
|
||||||
use utils::*;
|
use utils::*;
|
||||||
|
use worker::*;
|
||||||
|
|
||||||
#[global_allocator]
|
#[global_allocator]
|
||||||
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
|
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
|
||||||
|
|
@ -37,7 +36,9 @@ impl aquatic_cli_helpers::Config for Config {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run(config: Config) -> ::anyhow::Result<()> {
|
fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
if config.handler.weight_announce + config.handler.weight_connect + config.handler.weight_scrape
|
if config.requests.weight_announce
|
||||||
|
+ config.requests.weight_connect
|
||||||
|
+ config.requests.weight_scrape
|
||||||
== 0
|
== 0
|
||||||
{
|
{
|
||||||
panic!("Error: at least one weight must be larger than zero.");
|
panic!("Error: at least one weight must be larger than zero.");
|
||||||
|
|
@ -45,9 +46,9 @@ fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
|
|
||||||
println!("Starting client with config: {:#?}", config);
|
println!("Starting client with config: {:#?}", config);
|
||||||
|
|
||||||
let mut info_hashes = Vec::with_capacity(config.handler.number_of_torrents);
|
let mut info_hashes = Vec::with_capacity(config.requests.number_of_torrents);
|
||||||
|
|
||||||
for _ in 0..config.handler.number_of_torrents {
|
for _ in 0..config.requests.number_of_torrents {
|
||||||
info_hashes.push(generate_info_hash());
|
info_hashes.push(generate_info_hash());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -56,12 +57,11 @@ fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
statistics: Arc::new(Statistics::default()),
|
statistics: Arc::new(Statistics::default()),
|
||||||
};
|
};
|
||||||
|
|
||||||
let pareto = Pareto::new(1.0, config.handler.torrent_selection_pareto_shape).unwrap();
|
let pareto = Pareto::new(1.0, config.requests.torrent_selection_pareto_shape).unwrap();
|
||||||
|
|
||||||
// Start workers
|
// Start workers
|
||||||
|
|
||||||
for i in 0..config.workers {
|
for i in 0..config.workers {
|
||||||
let thread_id = ThreadId(i);
|
|
||||||
let port = config.network.first_port + (i as u16);
|
let port = config.network.first_port + (i as u16);
|
||||||
|
|
||||||
let ip = if config.server_address.is_ipv6() {
|
let ip = if config.server_address.is_ipv6() {
|
||||||
|
|
@ -86,7 +86,7 @@ fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
WorkerIndex::SocketWorker(i as usize),
|
WorkerIndex::SocketWorker(i as usize),
|
||||||
);
|
);
|
||||||
|
|
||||||
run_worker_thread(state, pareto, &config, addr, thread_id)
|
run_worker_thread(state, pareto, &config, addr)
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,43 +1,8 @@
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use rand::prelude::*;
|
use rand::prelude::*;
|
||||||
use rand_distr::Pareto;
|
use rand_distr::Pareto;
|
||||||
|
|
||||||
use aquatic_udp_protocol::*;
|
use aquatic_udp_protocol::*;
|
||||||
|
|
||||||
use crate::common::*;
|
|
||||||
use crate::config::Config;
|
|
||||||
|
|
||||||
pub fn create_torrent_peer(
|
|
||||||
config: &Config,
|
|
||||||
rng: &mut impl Rng,
|
|
||||||
pareto: Pareto<f64>,
|
|
||||||
info_hashes: &Arc<Vec<InfoHash>>,
|
|
||||||
connection_id: ConnectionId,
|
|
||||||
) -> TorrentPeer {
|
|
||||||
let num_scape_hashes = rng.gen_range(1..config.handler.scrape_max_torrents);
|
|
||||||
|
|
||||||
let mut scrape_hash_indeces = Vec::new();
|
|
||||||
|
|
||||||
for _ in 0..num_scape_hashes {
|
|
||||||
scrape_hash_indeces.push(select_info_hash_index(config, rng, pareto))
|
|
||||||
}
|
|
||||||
|
|
||||||
let info_hash_index = select_info_hash_index(config, rng, pareto);
|
|
||||||
|
|
||||||
TorrentPeer {
|
|
||||||
info_hash: info_hashes[info_hash_index],
|
|
||||||
scrape_hash_indeces,
|
|
||||||
connection_id,
|
|
||||||
peer_id: generate_peer_id(),
|
|
||||||
port: Port(rand::random()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn select_info_hash_index(config: &Config, rng: &mut impl Rng, pareto: Pareto<f64>) -> usize {
|
|
||||||
pareto_usize(rng, pareto, config.handler.number_of_torrents - 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn pareto_usize(rng: &mut impl Rng, pareto: Pareto<f64>, max: usize) -> usize {
|
pub fn pareto_usize(rng: &mut impl Rng, pareto: Pareto<f64>, max: usize) -> usize {
|
||||||
let p: f64 = rng.sample(pareto);
|
let p: f64 = rng.sample(pareto);
|
||||||
let p = (p.min(101.0f64) - 1.0) / 100.0;
|
let p = (p.min(101.0f64) - 1.0) / 100.0;
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,5 @@
|
||||||
|
mod request_gen;
|
||||||
|
|
||||||
use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
|
|
@ -12,48 +14,16 @@ use socket2::{Domain, Protocol, Socket, Type};
|
||||||
use aquatic_udp_protocol::*;
|
use aquatic_udp_protocol::*;
|
||||||
|
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
use crate::{common::*, handler::process_response, utils::*};
|
use crate::{common::*, utils::*};
|
||||||
|
use request_gen::process_response;
|
||||||
|
|
||||||
const MAX_PACKET_SIZE: usize = 4096;
|
const MAX_PACKET_SIZE: usize = 8192;
|
||||||
|
|
||||||
pub fn create_socket(config: &Config, addr: SocketAddr) -> ::std::net::UdpSocket {
|
|
||||||
let socket = if addr.is_ipv4() {
|
|
||||||
Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))
|
|
||||||
} else {
|
|
||||||
Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))
|
|
||||||
}
|
|
||||||
.expect("create socket");
|
|
||||||
|
|
||||||
socket
|
|
||||||
.set_nonblocking(true)
|
|
||||||
.expect("socket: set nonblocking");
|
|
||||||
|
|
||||||
if config.network.recv_buffer != 0 {
|
|
||||||
if let Err(err) = socket.set_recv_buffer_size(config.network.recv_buffer) {
|
|
||||||
eprintln!(
|
|
||||||
"socket: failed setting recv buffer to {}: {:?}",
|
|
||||||
config.network.recv_buffer, err
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
socket
|
|
||||||
.bind(&addr.into())
|
|
||||||
.unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", addr, err));
|
|
||||||
|
|
||||||
socket
|
|
||||||
.connect(&config.server_address.into())
|
|
||||||
.expect("socket: connect to server");
|
|
||||||
|
|
||||||
socket.into()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn run_worker_thread(
|
pub fn run_worker_thread(
|
||||||
state: LoadTestState,
|
state: LoadTestState,
|
||||||
pareto: Pareto<f64>,
|
pareto: Pareto<f64>,
|
||||||
config: &Config,
|
config: &Config,
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
thread_id: ThreadId,
|
|
||||||
) {
|
) {
|
||||||
let mut socket = UdpSocket::from_std(create_socket(config, addr));
|
let mut socket = UdpSocket::from_std(create_socket(config, addr));
|
||||||
let mut buffer = [0u8; MAX_PACKET_SIZE];
|
let mut buffer = [0u8; MAX_PACKET_SIZE];
|
||||||
|
|
@ -61,7 +31,7 @@ pub fn run_worker_thread(
|
||||||
let mut rng = SmallRng::from_rng(thread_rng()).expect("create SmallRng from thread_rng()");
|
let mut rng = SmallRng::from_rng(thread_rng()).expect("create SmallRng from thread_rng()");
|
||||||
let mut torrent_peers = TorrentPeerMap::default();
|
let mut torrent_peers = TorrentPeerMap::default();
|
||||||
|
|
||||||
let token = Token(thread_id.0 as usize);
|
let token = Token(0);
|
||||||
let interests = Interest::READABLE;
|
let interests = Interest::READABLE;
|
||||||
let timeout = Duration::from_micros(config.network.poll_timeout);
|
let timeout = Duration::from_micros(config.network.poll_timeout);
|
||||||
|
|
||||||
|
|
@ -127,7 +97,7 @@ pub fn run_worker_thread(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if rng.gen::<f32>() <= config.additional_request_probability {
|
if rng.gen::<f32>() <= config.requests.additional_request_probability {
|
||||||
let additional_request =
|
let additional_request =
|
||||||
create_connect_request(generate_transaction_id(&mut rng));
|
create_connect_request(generate_transaction_id(&mut rng));
|
||||||
|
|
||||||
|
|
@ -201,3 +171,35 @@ fn update_shared_statistics(state: &LoadTestState, statistics: &mut SocketWorker
|
||||||
|
|
||||||
*statistics = SocketWorkerLocalStatistics::default();
|
*statistics = SocketWorkerLocalStatistics::default();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn create_socket(config: &Config, addr: SocketAddr) -> ::std::net::UdpSocket {
|
||||||
|
let socket = if addr.is_ipv4() {
|
||||||
|
Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))
|
||||||
|
} else {
|
||||||
|
Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))
|
||||||
|
}
|
||||||
|
.expect("create socket");
|
||||||
|
|
||||||
|
socket
|
||||||
|
.set_nonblocking(true)
|
||||||
|
.expect("socket: set nonblocking");
|
||||||
|
|
||||||
|
if config.network.recv_buffer != 0 {
|
||||||
|
if let Err(err) = socket.set_recv_buffer_size(config.network.recv_buffer) {
|
||||||
|
eprintln!(
|
||||||
|
"socket: failed setting recv buffer to {}: {:?}",
|
||||||
|
config.network.recv_buffer, err
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
socket
|
||||||
|
.bind(&addr.into())
|
||||||
|
.unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", addr, err));
|
||||||
|
|
||||||
|
socket
|
||||||
|
.connect(&config.server_address.into())
|
||||||
|
.expect("socket: connect to server");
|
||||||
|
|
||||||
|
socket.into()
|
||||||
|
}
|
||||||
|
|
@ -115,9 +115,9 @@ fn create_random_request(
|
||||||
torrent_peer: &TorrentPeer,
|
torrent_peer: &TorrentPeer,
|
||||||
) -> Request {
|
) -> Request {
|
||||||
let weights = vec![
|
let weights = vec![
|
||||||
config.handler.weight_announce as u32,
|
config.requests.weight_announce as u32,
|
||||||
config.handler.weight_connect as u32,
|
config.requests.weight_connect as u32,
|
||||||
config.handler.weight_scrape as u32,
|
config.requests.weight_scrape as u32,
|
||||||
];
|
];
|
||||||
|
|
||||||
let items = vec![
|
let items = vec![
|
||||||
|
|
@ -142,7 +142,7 @@ fn create_announce_request(
|
||||||
transaction_id: TransactionId,
|
transaction_id: TransactionId,
|
||||||
) -> Request {
|
) -> Request {
|
||||||
let (event, bytes_left) = {
|
let (event, bytes_left) = {
|
||||||
if rng.gen_bool(config.handler.peer_seeder_probability) {
|
if rng.gen_bool(config.requests.peer_seeder_probability) {
|
||||||
(AnnounceEvent::Completed, NumberOfBytes(0))
|
(AnnounceEvent::Completed, NumberOfBytes(0))
|
||||||
} else {
|
} else {
|
||||||
(AnnounceEvent::Started, NumberOfBytes(50))
|
(AnnounceEvent::Started, NumberOfBytes(50))
|
||||||
|
|
@ -186,3 +186,33 @@ fn create_scrape_request(
|
||||||
})
|
})
|
||||||
.into()
|
.into()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn create_torrent_peer(
|
||||||
|
config: &Config,
|
||||||
|
rng: &mut impl Rng,
|
||||||
|
pareto: Pareto<f64>,
|
||||||
|
info_hashes: &Arc<Vec<InfoHash>>,
|
||||||
|
connection_id: ConnectionId,
|
||||||
|
) -> TorrentPeer {
|
||||||
|
let num_scape_hashes = rng.gen_range(1..config.requests.scrape_max_torrents);
|
||||||
|
|
||||||
|
let mut scrape_hash_indeces = Vec::new();
|
||||||
|
|
||||||
|
for _ in 0..num_scape_hashes {
|
||||||
|
scrape_hash_indeces.push(select_info_hash_index(config, rng, pareto))
|
||||||
|
}
|
||||||
|
|
||||||
|
let info_hash_index = select_info_hash_index(config, rng, pareto);
|
||||||
|
|
||||||
|
TorrentPeer {
|
||||||
|
info_hash: info_hashes[info_hash_index],
|
||||||
|
scrape_hash_indeces,
|
||||||
|
connection_id,
|
||||||
|
peer_id: generate_peer_id(),
|
||||||
|
port: Port(rand::random()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn select_info_hash_index(config: &Config, rng: &mut impl Rng, pareto: Pareto<f64>) -> usize {
|
||||||
|
pareto_usize(rng, pareto, config.requests.number_of_torrents - 1)
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue