Merge pull request #186 from greatest-ape/improved-udp-load-test

Improve udp load test; use IndexMap::swap_remove explicitly; update deps
This commit is contained in:
Joakim Frostegård 2024-02-06 18:58:49 +01:00 committed by GitHub
commit ec5b6460b2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 674 additions and 524 deletions

46
Cargo.lock generated
View file

@ -78,9 +78,9 @@ dependencies = [
[[package]] [[package]]
name = "anstyle" name = "anstyle"
version = "1.0.5" version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2faccea4cc4ab4a667ce676a30e8ec13922a692c99bb8f5b11f1502c72e04220" checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc"
[[package]] [[package]]
name = "anstyle-parse" name = "anstyle-parse"
@ -145,7 +145,7 @@ dependencies = [
"regex", "regex",
"serde", "serde",
"tempfile", "tempfile",
"toml 0.8.9", "toml 0.8.10",
] ]
[[package]] [[package]]
@ -335,9 +335,9 @@ dependencies = [
"aquatic_common", "aquatic_common",
"aquatic_toml_config", "aquatic_toml_config",
"aquatic_udp_protocol", "aquatic_udp_protocol",
"hashbrown 0.14.3", "crossbeam-channel",
"hdrhistogram",
"mimalloc", "mimalloc",
"mio",
"quickcheck", "quickcheck",
"quickcheck_macros", "quickcheck_macros",
"rand", "rand",
@ -1349,9 +1349,9 @@ dependencies = [
[[package]] [[package]]
name = "hermit-abi" name = "hermit-abi"
version = "0.3.4" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f" checksum = "d0c62115964e08cb8039170eb33c1d0e2388a256930279edca206fff675f82c3"
[[package]] [[package]]
name = "hex" name = "hex"
@ -1527,7 +1527,7 @@ version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455" checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455"
dependencies = [ dependencies = [
"hermit-abi 0.3.4", "hermit-abi 0.3.5",
"rustix", "rustix",
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
@ -1774,7 +1774,7 @@ dependencies = [
"ordered-float", "ordered-float",
"quanta", "quanta",
"radix_trie", "radix_trie",
"sketches-ddsketch 0.2.1", "sketches-ddsketch 0.2.2",
] ]
[[package]] [[package]]
@ -1940,7 +1940,7 @@ version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [ dependencies = [
"hermit-abi 0.3.4", "hermit-abi 0.3.5",
"libc", "libc",
] ]
@ -2269,15 +2269,6 @@ dependencies = [
"crossbeam-utils", "crossbeam-utils",
] ]
[[package]]
name = "redox_syscall"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa"
dependencies = [
"bitflags 1.3.2",
]
[[package]] [[package]]
name = "ref-cast" name = "ref-cast"
version = "1.0.22" version = "1.0.22"
@ -2573,9 +2564,9 @@ checksum = "04d2ecae5fcf33b122e2e6bd520a57ccf152d2dde3b38c71039df1a6867264ee"
[[package]] [[package]]
name = "sketches-ddsketch" name = "sketches-ddsketch"
version = "0.2.1" version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68a406c1882ed7f29cd5e248c9848a80e7cb6ae0fea82346d2746f2f941c07e1" checksum = "85636c14b73d81f541e525f585c0a2109e6744e1565b5c1668e31c70c10ed65c"
[[package]] [[package]]
name = "slab" name = "slab"
@ -2705,13 +2696,12 @@ dependencies = [
[[package]] [[package]]
name = "tempfile" name = "tempfile"
version = "3.9.0" version = "3.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01ce4141aa927a6d1bd34a041795abd0db1cccba5d5f24b009f694bdf3a1f3fa" checksum = "a365e8cd18e44762ef95d87f284f4b5cd04107fec2ff3052bd6a3e6069669e67"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"fastrand 2.0.1", "fastrand 2.0.1",
"redox_syscall",
"rustix", "rustix",
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
@ -2834,9 +2824,9 @@ dependencies = [
[[package]] [[package]]
name = "toml" name = "toml"
version = "0.8.9" version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6a4b9e8023eb94392d3dca65d717c53abc5dad49c07cb65bb8fcd87115fa325" checksum = "9a9aad4a3066010876e8dcf5a8a06e70a558751117a145c6ce2b82c2e2054290"
dependencies = [ dependencies = [
"serde", "serde",
"serde_spanned", "serde_spanned",
@ -2855,9 +2845,9 @@ dependencies = [
[[package]] [[package]]
name = "toml_edit" name = "toml_edit"
version = "0.21.1" version = "0.22.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" checksum = "0c9ffdf896f8daaabf9b66ba8e77ea1ed5ed0f72821b398aba62352e95062951"
dependencies = [ dependencies = [
"indexmap 2.2.2", "indexmap 2.2.2",
"serde", "serde",

View file

@ -25,7 +25,7 @@ struct Args {
#[arg(long, default_value_t = Priority::Medium)] #[arg(long, default_value_t = Priority::Medium)]
min_priority: Priority, min_priority: Priority,
/// How long to run each load test for /// How long to run each load test for
#[arg(long, default_value_t = 90)] #[arg(long, default_value_t = 30)]
duration: usize, duration: usize,
/// Only include data for last N seconds of load test runs. /// Only include data for last N seconds of load test runs.
/// ///
@ -33,7 +33,7 @@ struct Args {
/// maximum throughput /// maximum throughput
/// ///
/// 0 = use data for whole run /// 0 = use data for whole run
#[arg(long, default_value_t = 30)] #[arg(long, default_value_t = 0)]
summarize_last: usize, summarize_last: usize,
#[command(subcommand)] #[command(subcommand)]
command: Command, command: Command,

View file

@ -443,6 +443,8 @@ impl ProcessRunner for AquaticUdpLoadTestRunner {
c.duration = self.parameters.duration; c.duration = self.parameters.duration;
c.summarize_last = self.parameters.summarize_last; c.summarize_last = self.parameters.summarize_last;
c.extra_statistics = false;
c.requests.announce_peers_wanted = 30; c.requests.announce_peers_wanted = 30;
c.requests.weight_connect = 0; c.requests.weight_connect = 0;
c.requests.weight_announce = 100; c.requests.weight_announce = 100;

View file

@ -433,7 +433,7 @@ impl<I: Ip> LargePeerMap<I> {
} }
fn remove_peer(&mut self, key: &ResponsePeer<I>) -> Option<Peer> { fn remove_peer(&mut self, key: &ResponsePeer<I>) -> Option<Peer> {
let opt_removed_peer = self.peers.remove(key); let opt_removed_peer = self.peers.swap_remove(key);
if let Some(Peer { if let Some(Peer {
is_seeder: true, .. is_seeder: true, ..

View file

@ -97,7 +97,7 @@ pub fn run_statistics_worker(
*count -= 1; *count -= 1;
if *count == 0 { if *count == 0 {
peers.remove(&peer_id); peers.swap_remove(&peer_id);
} }
} }
} }

View file

@ -407,7 +407,7 @@ impl<I: Ip> LargePeerMap<I> {
} }
fn remove_peer(&mut self, key: &ResponsePeer<I>) -> Option<Peer> { fn remove_peer(&mut self, key: &ResponsePeer<I>) -> Option<Peer> {
let opt_removed_peer = self.peers.remove(key); let opt_removed_peer = self.peers.swap_remove(key);
if let Some(Peer { if let Some(Peer {
is_seeder: true, .. is_seeder: true, ..

View file

@ -25,9 +25,9 @@ aquatic_toml_config.workspace = true
aquatic_udp_protocol.workspace = true aquatic_udp_protocol.workspace = true
anyhow = "1" anyhow = "1"
hashbrown = "0.14" crossbeam-channel = "0.5"
hdrhistogram = "7"
mimalloc = { version = "0.1", default-features = false } mimalloc = { version = "0.1", default-features = false }
mio = { version = "0.8", features = ["net", "os-poll"] }
rand_distr = "0.4" rand_distr = "0.4"
rand = { version = "0.8", features = ["small_rng"] } rand = { version = "0.8", features = ["small_rng"] }
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }

View file

@ -1,22 +1,16 @@
use std::sync::{atomic::AtomicUsize, Arc}; use std::sync::{atomic::AtomicUsize, Arc};
use hashbrown::HashMap; use aquatic_common::IndexMap;
use aquatic_udp_protocol::*; use aquatic_udp_protocol::*;
#[derive(PartialEq, Eq, Clone)] #[derive(Clone)]
pub struct TorrentPeer { pub struct LoadTestState {
pub info_hash: InfoHash, pub info_hashes: Arc<[InfoHash]>,
pub scrape_hash_indices: Box<[usize]>, pub statistics: Arc<SharedStatistics>,
pub connection_id: ConnectionId,
pub peer_id: PeerId,
pub port: Port,
} }
pub type TorrentPeerMap = HashMap<TransactionId, TorrentPeer>;
#[derive(Default)] #[derive(Default)]
pub struct Statistics { pub struct SharedStatistics {
pub requests: AtomicUsize, pub requests: AtomicUsize,
pub response_peers: AtomicUsize, pub response_peers: AtomicUsize,
pub responses_connect: AtomicUsize, pub responses_connect: AtomicUsize,
@ -25,25 +19,14 @@ pub struct Statistics {
pub responses_error: AtomicUsize, pub responses_error: AtomicUsize,
} }
#[derive(Clone)] pub struct Peer {
pub struct LoadTestState { pub announce_info_hash_index: usize,
pub info_hashes: Arc<[InfoHash]>, pub announce_info_hash: InfoHash,
pub statistics: Arc<Statistics>, pub announce_port: Port,
pub scrape_info_hash_indices: Box<[usize]>,
pub socket_index: u8,
} }
#[derive(PartialEq, Eq, Clone, Copy)] pub enum StatisticsMessage {
pub enum RequestType { ResponsesPerInfoHash(IndexMap<usize, u64>),
Announce,
Connect,
Scrape,
}
#[derive(Default)]
pub struct SocketWorkerLocalStatistics {
pub requests: usize,
pub response_peers: usize,
pub responses_connect: usize,
pub responses_announce: usize,
pub responses_scrape: usize,
pub responses_error: usize,
} }

View file

@ -25,6 +25,8 @@ pub struct Config {
/// ///
/// 0 = include whole run /// 0 = include whole run
pub summarize_last: usize, pub summarize_last: usize,
/// Display extra statistics
pub extra_statistics: bool,
pub network: NetworkConfig, pub network: NetworkConfig,
pub requests: RequestConfig, pub requests: RequestConfig,
#[cfg(feature = "cpu-pinning")] #[cfg(feature = "cpu-pinning")]
@ -39,6 +41,7 @@ impl Default for Config {
workers: 1, workers: 1,
duration: 0, duration: 0,
summarize_last: 0, summarize_last: 0,
extra_statistics: true,
network: NetworkConfig::default(), network: NetworkConfig::default(),
requests: RequestConfig::default(), requests: RequestConfig::default(),
#[cfg(feature = "cpu-pinning")] #[cfg(feature = "cpu-pinning")]
@ -47,6 +50,12 @@ impl Default for Config {
} }
} }
impl aquatic_common::cli::Config for Config {
fn get_log_level(&self) -> Option<aquatic_common::cli::LogLevel> {
Some(self.log_level)
}
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)] #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)] #[serde(default, deny_unknown_fields)]
pub struct NetworkConfig { pub struct NetworkConfig {
@ -57,10 +66,8 @@ pub struct NetworkConfig {
/// ///
/// Setting this to true can cause issues on macOS. /// Setting this to true can cause issues on macOS.
pub multiple_client_ipv4s: bool, pub multiple_client_ipv4s: bool,
/// Number of first client port /// Number of sockets to open per worker
pub first_port: u16, pub sockets_per_worker: u8,
/// Socket worker poll timeout in microseconds
pub poll_timeout: u64,
/// Size of socket recv buffer. Use 0 for OS default. /// Size of socket recv buffer. Use 0 for OS default.
/// ///
/// This setting can have a big impact on dropped packages. It might /// This setting can have a big impact on dropped packages. It might
@ -80,8 +87,7 @@ impl Default for NetworkConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
multiple_client_ipv4s: true, multiple_client_ipv4s: true,
first_port: 45_000, sockets_per_worker: 4,
poll_timeout: 1,
recv_buffer: 8_000_000, recv_buffer: 8_000_000,
} }
} }
@ -92,6 +98,8 @@ impl Default for NetworkConfig {
pub struct RequestConfig { pub struct RequestConfig {
/// Number of torrents to simulate /// Number of torrents to simulate
pub number_of_torrents: usize, pub number_of_torrents: usize,
/// Number of peers to simulate
pub number_of_peers: usize,
/// Maximum number of torrents to ask about in scrape requests /// Maximum number of torrents to ask about in scrape requests
pub scrape_max_torrents: usize, pub scrape_max_torrents: usize,
/// Ask for this number of peers in announce requests /// Ask for this number of peers in announce requests
@ -105,30 +113,21 @@ pub struct RequestConfig {
/// Probability that a generated request is a scrape request, as part /// Probability that a generated request is a scrape request, as part
/// of sum of the various weight arguments. /// of sum of the various weight arguments.
pub weight_scrape: usize, pub weight_scrape: usize,
/// Peers choose torrents according to this Gamma distribution shape
pub torrent_gamma_shape: f64,
/// Peers choose torrents according to this Gamma distribution scale
pub torrent_gamma_scale: f64,
/// Probability that a generated peer is a seeder /// Probability that a generated peer is a seeder
pub peer_seeder_probability: f64, pub peer_seeder_probability: f64,
/// Probability that an additional connect request will be sent for each
/// mio event
pub additional_request_probability: f32,
} }
impl Default for RequestConfig { impl Default for RequestConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
number_of_torrents: 10_000, number_of_torrents: 1_000_000,
number_of_peers: 2_000_000,
scrape_max_torrents: 10, scrape_max_torrents: 10,
announce_peers_wanted: 30, announce_peers_wanted: 30,
weight_connect: 0, weight_connect: 1,
weight_announce: 100, weight_announce: 100,
weight_scrape: 1, weight_scrape: 1,
torrent_gamma_shape: 0.2,
torrent_gamma_scale: 100.0,
peer_seeder_probability: 0.75, peer_seeder_probability: 0.75,
additional_request_probability: 0.5,
} }
} }
} }

View file

@ -1,3 +1,4 @@
use std::iter::repeat_with;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::{atomic::Ordering, Arc}; use std::sync::{atomic::Ordering, Arc};
@ -6,23 +7,23 @@ use std::time::{Duration, Instant};
#[cfg(feature = "cpu-pinning")] #[cfg(feature = "cpu-pinning")]
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
use rand_distr::Gamma; use aquatic_common::IndexMap;
use aquatic_udp_protocol::{InfoHash, Port};
use crossbeam_channel::{unbounded, Receiver};
use hdrhistogram::Histogram;
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
use rand_distr::{Distribution, WeightedAliasIndex};
pub mod common; mod common;
pub mod config; pub mod config;
pub mod utils; mod worker;
pub mod worker;
use common::*; use common::*;
use config::Config; use config::Config;
use utils::*;
use worker::*; use worker::*;
impl aquatic_common::cli::Config for Config { const PERCENTILES: &[f64] = &[10.0, 25.0, 50.0, 75.0, 90.0, 95.0, 99.0, 99.9, 100.0];
fn get_log_level(&self) -> Option<aquatic_common::cli::LogLevel> {
Some(self.log_level)
}
}
pub fn run(config: Config) -> ::anyhow::Result<()> { pub fn run(config: Config) -> ::anyhow::Result<()> {
if config.requests.weight_announce if config.requests.weight_announce
@ -37,30 +38,21 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
panic!("Error: report_last_seconds can't be larger than duration"); panic!("Error: report_last_seconds can't be larger than duration");
} }
println!("Starting client with config: {:#?}", config); println!("Starting client with config: {:#?}\n", config);
let mut info_hashes = Vec::with_capacity(config.requests.number_of_torrents); let info_hash_dist = InfoHashDist::new(&config)?;
let peers_by_worker = create_peers(&config, &info_hash_dist);
for _ in 0..config.requests.number_of_torrents {
info_hashes.push(generate_info_hash());
}
let state = LoadTestState { let state = LoadTestState {
info_hashes: Arc::from(info_hashes.into_boxed_slice()), info_hashes: info_hash_dist.into_arc_info_hashes(),
statistics: Arc::new(Statistics::default()), statistics: Arc::new(SharedStatistics::default()),
}; };
let gamma = Gamma::new( let (statistics_sender, statistics_receiver) = unbounded();
config.requests.torrent_gamma_shape,
config.requests.torrent_gamma_scale,
)
.unwrap();
// Start workers // Start workers
for i in 0..config.workers { for (i, peers) in (0..config.workers).zip(peers_by_worker) {
let port = config.network.first_port + (i as u16);
let ip = if config.server_address.is_ipv6() { let ip = if config.server_address.is_ipv6() {
Ipv6Addr::LOCALHOST.into() Ipv6Addr::LOCALHOST.into()
} else if config.network.multiple_client_ipv4s { } else if config.network.multiple_client_ipv4s {
@ -69,9 +61,10 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
Ipv4Addr::LOCALHOST.into() Ipv4Addr::LOCALHOST.into()
}; };
let addr = SocketAddr::new(ip, port); let addr = SocketAddr::new(ip, 0);
let config = config.clone(); let config = config.clone();
let state = state.clone(); let state = state.clone();
let statistics_sender = statistics_sender.clone();
Builder::new().name("load-test".into()).spawn(move || { Builder::new().name("load-test".into()).spawn(move || {
#[cfg(feature = "cpu-pinning")] #[cfg(feature = "cpu-pinning")]
@ -82,7 +75,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
WorkerIndex::SocketWorker(i as usize), WorkerIndex::SocketWorker(i as usize),
); );
Worker::run(state, gamma, config, addr) Worker::run(config, state, statistics_sender, peers, addr)
})?; })?;
} }
@ -94,12 +87,16 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
WorkerIndex::Util, WorkerIndex::Util,
); );
monitor_statistics(state, &config); monitor_statistics(state, &config, statistics_receiver);
Ok(()) Ok(())
} }
fn monitor_statistics(state: LoadTestState, config: &Config) { fn monitor_statistics(
state: LoadTestState,
config: &Config,
statistics_receiver: Receiver<StatisticsMessage>,
) {
let mut report_avg_connect: Vec<f64> = Vec::new(); let mut report_avg_connect: Vec<f64> = Vec::new();
let mut report_avg_announce: Vec<f64> = Vec::new(); let mut report_avg_announce: Vec<f64> = Vec::new();
let mut report_avg_scrape: Vec<f64> = Vec::new(); let mut report_avg_scrape: Vec<f64> = Vec::new();
@ -115,6 +112,21 @@ fn monitor_statistics(state: LoadTestState, config: &Config) {
let time_elapsed = loop { let time_elapsed = loop {
thread::sleep(Duration::from_secs(INTERVAL)); thread::sleep(Duration::from_secs(INTERVAL));
let mut opt_responses_per_info_hash: Option<IndexMap<usize, u64>> =
config.extra_statistics.then_some(Default::default());
for message in statistics_receiver.try_iter() {
match message {
StatisticsMessage::ResponsesPerInfoHash(data) => {
if let Some(responses_per_info_hash) = opt_responses_per_info_hash.as_mut() {
for (k, v) in data {
*responses_per_info_hash.entry(k).or_default() += v;
}
}
}
}
}
let requests = fetch_and_reset(&state.statistics.requests); let requests = fetch_and_reset(&state.statistics.requests);
let response_peers = fetch_and_reset(&state.statistics.response_peers); let response_peers = fetch_and_reset(&state.statistics.response_peers);
let responses_connect = fetch_and_reset(&state.statistics.responses_connect); let responses_connect = fetch_and_reset(&state.statistics.responses_connect);
@ -158,6 +170,20 @@ fn monitor_statistics(state: LoadTestState, config: &Config) {
peers_per_announce_response peers_per_announce_response
); );
if let Some(responses_per_info_hash) = opt_responses_per_info_hash.as_ref() {
let mut histogram = Histogram::<u64>::new(2).unwrap();
for num_responses in responses_per_info_hash.values().copied() {
histogram.record(num_responses).unwrap();
}
println!("Announce responses per info hash:");
for p in PERCENTILES {
println!(" - p{}: {}", p, histogram.value_at_percentile(*p));
}
}
let time_elapsed = start_time.elapsed(); let time_elapsed = start_time.elapsed();
if config.duration != 0 && time_elapsed >= duration { if config.duration != 0 && time_elapsed >= duration {
@ -208,3 +234,121 @@ fn monitor_statistics(state: LoadTestState, config: &Config) {
fn fetch_and_reset(atomic_usize: &AtomicUsize) -> f64 { fn fetch_and_reset(atomic_usize: &AtomicUsize) -> f64 {
atomic_usize.fetch_and(0, Ordering::Relaxed) as f64 atomic_usize.fetch_and(0, Ordering::Relaxed) as f64
} }
fn create_peers(config: &Config, info_hash_dist: &InfoHashDist) -> Vec<Box<[Peer]>> {
let mut rng = SmallRng::seed_from_u64(0xc3a58be617b3acce);
let mut opt_peers_per_info_hash: Option<IndexMap<usize, u64>> =
config.extra_statistics.then_some(IndexMap::default());
let mut all_peers = repeat_with(|| {
let num_scrape_indices = rng.gen_range(1..config.requests.scrape_max_torrents + 1);
let scrape_info_hash_indices = repeat_with(|| info_hash_dist.get_random_index(&mut rng))
.take(num_scrape_indices)
.collect::<Vec<_>>()
.into_boxed_slice();
let (announce_info_hash_index, announce_info_hash) = info_hash_dist.get_random(&mut rng);
if let Some(peers_per_info_hash) = opt_peers_per_info_hash.as_mut() {
*peers_per_info_hash
.entry(announce_info_hash_index)
.or_default() += 1;
}
Peer {
announce_info_hash_index,
announce_info_hash,
announce_port: Port::new(rng.gen()),
scrape_info_hash_indices,
socket_index: rng.gen_range(0..config.network.sockets_per_worker),
}
})
.take(config.requests.number_of_peers)
.collect::<Vec<_>>();
if let Some(peers_per_info_hash) = opt_peers_per_info_hash {
println!("Number of info hashes: {}", peers_per_info_hash.len());
let mut histogram = Histogram::<u64>::new(2).unwrap();
for num_peers in peers_per_info_hash.values() {
histogram.record(*num_peers).unwrap();
}
println!("Peers per info hash:");
for p in PERCENTILES {
println!(" - p{}: {}", p, histogram.value_at_percentile(*p));
}
}
let mut peers_by_worker = Vec::new();
let num_peers_per_worker = all_peers.len() / config.workers as usize;
for _ in 0..(config.workers as usize) {
peers_by_worker.push(
all_peers
.split_off(all_peers.len() - num_peers_per_worker)
.into_boxed_slice(),
);
all_peers.shrink_to_fit();
}
peers_by_worker
}
struct InfoHashDist {
info_hashes: Box<[InfoHash]>,
dist: WeightedAliasIndex<f64>,
}
impl InfoHashDist {
fn new(config: &Config) -> anyhow::Result<Self> {
let mut rng = SmallRng::seed_from_u64(0xc3aa8be617b3acce);
let info_hashes = repeat_with(|| {
let mut bytes = [0u8; 20];
for byte in bytes.iter_mut() {
*byte = rng.gen();
}
InfoHash(bytes)
})
.take(config.requests.number_of_torrents)
.collect::<Vec<InfoHash>>()
.into_boxed_slice();
let num_torrents = config.requests.number_of_torrents as u32;
let weights = (0..num_torrents)
.map(|i| {
let floor = num_torrents as f64 / config.requests.number_of_peers as f64;
floor + (6.5f64 - ((500.0 * f64::from(i)) / f64::from(num_torrents))).exp()
})
.collect();
let dist = WeightedAliasIndex::new(weights)?;
Ok(Self { info_hashes, dist })
}
fn get_random(&self, rng: &mut impl Rng) -> (usize, InfoHash) {
let index = self.dist.sample(rng);
(index, self.info_hashes[index])
}
fn get_random_index(&self, rng: &mut impl Rng) -> usize {
self.dist.sample(rng)
}
fn into_arc_info_hashes(self) -> Arc<[InfoHash]> {
Arc::from(self.info_hashes)
}
}

View file

@ -1,36 +0,0 @@
use rand::prelude::*;
use rand_distr::Gamma;
use aquatic_udp_protocol::*;
pub fn gamma_usize(rng: &mut impl Rng, gamma: Gamma<f64>, max: usize) -> usize {
let p: f64 = rng.sample(gamma);
let p = (p.min(101.0f64) - 1.0) / 100.0;
(p * max as f64) as usize
}
pub fn generate_peer_id() -> PeerId {
PeerId(random_20_bytes())
}
pub fn generate_info_hash() -> InfoHash {
InfoHash(random_20_bytes())
}
pub fn generate_transaction_id(rng: &mut impl Rng) -> TransactionId {
TransactionId::new(rng.gen())
}
pub fn create_connect_request(transaction_id: TransactionId) -> Request {
(ConnectRequest { transaction_id }).into()
}
// Don't use SmallRng here for now
fn random_20_bytes() -> [u8; 20] {
let mut bytes = [0; 20];
thread_rng().fill_bytes(&mut bytes[..]);
bytes
}

View file

@ -0,0 +1,439 @@
use std::io::{Cursor, ErrorKind};
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::Ordering;
use std::time::Duration;
use aquatic_common::IndexMap;
use crossbeam_channel::Sender;
use rand::Rng;
use rand::{prelude::SmallRng, SeedableRng};
use rand_distr::{Distribution, WeightedIndex};
use socket2::{Domain, Protocol, Socket, Type};
use aquatic_udp_protocol::*;
use crate::common::{LoadTestState, Peer};
use crate::config::Config;
use crate::StatisticsMessage;
const MAX_PACKET_SIZE: usize = 8192;
pub struct Worker {
config: Config,
shared_state: LoadTestState,
peers: Box<[Peer]>,
request_type_dist: RequestTypeDist,
addr: SocketAddr,
sockets: Vec<UdpSocket>,
buffer: [u8; MAX_PACKET_SIZE],
rng: SmallRng,
statistics: LocalStatistics,
statistics_sender: Sender<StatisticsMessage>,
announce_responses_per_info_hash: IndexMap<usize, u64>,
}
impl Worker {
pub fn run(
config: Config,
shared_state: LoadTestState,
statistics_sender: Sender<StatisticsMessage>,
peers: Box<[Peer]>,
addr: SocketAddr,
) {
let mut sockets = Vec::new();
for _ in 0..config.network.sockets_per_worker {
sockets.push(create_socket(&config, addr));
}
let buffer = [0u8; MAX_PACKET_SIZE];
let rng = SmallRng::seed_from_u64(0xc3aa8be617b3acce);
let statistics = LocalStatistics::default();
let request_type_dist = RequestTypeDist::new(&config).unwrap();
let mut instance = Self {
config,
shared_state,
peers,
request_type_dist,
addr,
sockets,
buffer,
rng,
statistics,
statistics_sender,
announce_responses_per_info_hash: Default::default(),
};
instance.run_inner();
}
fn run_inner(&mut self) {
let mut connection_ids = Vec::new();
for _ in 0..self.config.network.sockets_per_worker {
connection_ids.push(self.acquire_connection_id());
}
let mut requests_sent = 0usize;
let mut responses_received = 0usize;
let mut connect_socket_index = 0u8;
let mut peer_index = 0usize;
let mut loop_index = 0usize;
loop {
let response_ratio = responses_received as f64 / requests_sent.max(1) as f64;
if response_ratio >= 0.90 || requests_sent == 0 || self.rng.gen::<u8>() == 0 {
for _ in 0..self.sockets.len() {
match self.request_type_dist.sample(&mut self.rng) {
RequestType::Connect => {
self.send_connect_request(
connect_socket_index,
connect_socket_index.into(),
);
connect_socket_index = connect_socket_index.wrapping_add(1)
% self.config.network.sockets_per_worker;
}
RequestType::Announce => {
self.send_announce_request(&connection_ids, peer_index);
peer_index = (peer_index + 1) % self.peers.len();
}
RequestType::Scrape => {
self.send_scrape_request(&connection_ids, peer_index);
peer_index = (peer_index + 1) % self.peers.len();
}
}
requests_sent += 1;
}
}
for socket_index in 0..self.sockets.len() {
// Do this instead of iterating over Vec to fix borrow checker complaint
let socket = self.sockets.get(socket_index).unwrap();
match socket.recv(&mut self.buffer[..]) {
Ok(amt) => {
match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) {
Ok(Response::Connect(r)) => {
// If we're sending connect requests, we might
// as well keep connection IDs valid
let connection_id_index =
u32::from_ne_bytes(r.transaction_id.0.get().to_ne_bytes())
as usize;
connection_ids[connection_id_index] = r.connection_id;
self.handle_response(Response::Connect(r));
}
Ok(response) => {
self.handle_response(response);
}
Err(err) => {
eprintln!("Received invalid response: {:#?}", err);
}
}
responses_received += 1;
}
Err(err) if err.kind() == ErrorKind::WouldBlock => (),
Err(err) => {
eprintln!("recv error: {:#}", err);
}
}
}
if loop_index % 1024 == 0 {
self.update_shared_statistics();
}
loop_index = loop_index.wrapping_add(1);
}
}
fn acquire_connection_id(&mut self) -> ConnectionId {
loop {
self.send_connect_request(0, u32::MAX);
for _ in 0..100 {
match self.sockets[0].recv(&mut self.buffer[..]) {
Ok(amt) => {
match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) {
Ok(Response::Connect(r)) => {
return r.connection_id;
}
Ok(r) => {
eprintln!("Received non-connect response: {:?}", r);
}
Err(err) => {
eprintln!("Received invalid response: {:#?}", err);
}
}
}
Err(err) if err.kind() == ErrorKind::WouldBlock => {
::std::thread::sleep(Duration::from_millis(10));
}
Err(err) => {
eprintln!("recv error: {:#}", err);
}
};
}
}
}
fn send_connect_request(&mut self, socket_index: u8, transaction_id: u32) {
let transaction_id = TransactionId::new(i32::from_ne_bytes(transaction_id.to_ne_bytes()));
let request = ConnectRequest { transaction_id };
let mut cursor = Cursor::new(self.buffer);
request.write_bytes(&mut cursor).unwrap();
let position = cursor.position() as usize;
match self.sockets[socket_index as usize].send(&cursor.get_ref()[..position]) {
Ok(_) => {
self.statistics.requests += 1;
}
Err(err) => {
eprintln!("Couldn't send packet: {:?}", err);
}
}
}
fn send_announce_request(&mut self, connection_ids: &[ConnectionId], peer_index: usize) {
let peer = self.peers.get(peer_index).unwrap();
let (event, bytes_left) = {
if self
.rng
.gen_bool(self.config.requests.peer_seeder_probability)
{
(AnnounceEvent::Completed, NumberOfBytes::new(0))
} else {
(AnnounceEvent::Started, NumberOfBytes::new(50))
}
};
let transaction_id =
TransactionId::new(i32::from_ne_bytes((peer_index as u32).to_ne_bytes()));
let request = AnnounceRequest {
connection_id: connection_ids[peer.socket_index as usize],
action_placeholder: Default::default(),
transaction_id,
info_hash: peer.announce_info_hash,
peer_id: PeerId([0; 20]),
bytes_downloaded: NumberOfBytes::new(50),
bytes_uploaded: NumberOfBytes::new(50),
bytes_left,
event: event.into(),
ip_address: Ipv4AddrBytes([0; 4]),
key: PeerKey::new(0),
peers_wanted: NumberOfPeers::new(self.config.requests.announce_peers_wanted),
port: peer.announce_port,
};
let mut cursor = Cursor::new(self.buffer);
request.write_bytes(&mut cursor).unwrap();
let position = cursor.position() as usize;
match self.sockets[peer.socket_index as usize].send(&cursor.get_ref()[..position]) {
Ok(_) => {
self.statistics.requests += 1;
}
Err(err) => {
eprintln!("Couldn't send packet: {:?}", err);
}
}
}
fn send_scrape_request(&mut self, connection_ids: &[ConnectionId], peer_index: usize) {
let peer = self.peers.get(peer_index).unwrap();
let transaction_id =
TransactionId::new(i32::from_ne_bytes((peer_index as u32).to_ne_bytes()));
let mut info_hashes = Vec::with_capacity(peer.scrape_info_hash_indices.len());
for i in peer.scrape_info_hash_indices.iter() {
info_hashes.push(self.shared_state.info_hashes[*i].to_owned())
}
let request = ScrapeRequest {
connection_id: connection_ids[peer.socket_index as usize],
transaction_id,
info_hashes,
};
let mut cursor = Cursor::new(self.buffer);
request.write_bytes(&mut cursor).unwrap();
let position = cursor.position() as usize;
match self.sockets[peer.socket_index as usize].send(&cursor.get_ref()[..position]) {
Ok(_) => {
self.statistics.requests += 1;
}
Err(err) => {
eprintln!("Couldn't send packet: {:?}", err);
}
}
}
fn handle_response(&mut self, response: Response) {
match response {
Response::Connect(_) => {
self.statistics.responses_connect += 1;
}
Response::AnnounceIpv4(r) => {
self.statistics.responses_announce += 1;
self.statistics.response_peers += r.peers.len();
let peer_index =
u32::from_ne_bytes(r.fixed.transaction_id.0.get().to_ne_bytes()) as usize;
if let Some(peer) = self.peers.get(peer_index) {
*self
.announce_responses_per_info_hash
.entry(peer.announce_info_hash_index)
.or_default() += 1;
}
}
Response::AnnounceIpv6(r) => {
self.statistics.responses_announce += 1;
self.statistics.response_peers += r.peers.len();
let peer_index =
u32::from_ne_bytes(r.fixed.transaction_id.0.get().to_ne_bytes()) as usize;
if let Some(peer) = self.peers.get(peer_index) {
*self
.announce_responses_per_info_hash
.entry(peer.announce_info_hash_index)
.or_default() += 1;
}
}
Response::Scrape(_) => {
self.statistics.responses_scrape += 1;
}
Response::Error(_) => {
self.statistics.responses_error += 1;
}
}
}
fn update_shared_statistics(&mut self) {
let shared_statistics = &self.shared_state.statistics;
shared_statistics
.requests
.fetch_add(self.statistics.requests, Ordering::Relaxed);
shared_statistics
.responses_connect
.fetch_add(self.statistics.responses_connect, Ordering::Relaxed);
shared_statistics
.responses_announce
.fetch_add(self.statistics.responses_announce, Ordering::Relaxed);
shared_statistics
.responses_scrape
.fetch_add(self.statistics.responses_scrape, Ordering::Relaxed);
shared_statistics
.responses_error
.fetch_add(self.statistics.responses_error, Ordering::Relaxed);
shared_statistics
.response_peers
.fetch_add(self.statistics.response_peers, Ordering::Relaxed);
if self.config.extra_statistics {
let message = StatisticsMessage::ResponsesPerInfoHash(
self.announce_responses_per_info_hash.split_off(0),
);
self.statistics_sender.try_send(message).unwrap();
}
self.statistics = LocalStatistics::default();
}
}
fn create_socket(config: &Config, addr: SocketAddr) -> ::std::net::UdpSocket {
let socket = if addr.is_ipv4() {
Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))
} else {
Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))
}
.expect("create socket");
socket
.set_nonblocking(true)
.expect("socket: set nonblocking");
if config.network.recv_buffer != 0 {
if let Err(err) = socket.set_recv_buffer_size(config.network.recv_buffer) {
eprintln!(
"socket: failed setting recv buffer to {}: {:?}",
config.network.recv_buffer, err
);
}
}
socket
.bind(&addr.into())
.unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", addr, err));
socket
.connect(&config.server_address.into())
.expect("socket: connect to server");
socket.into()
}
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum RequestType {
Announce,
Connect,
Scrape,
}
pub struct RequestTypeDist(WeightedIndex<usize>);
impl RequestTypeDist {
fn new(config: &Config) -> anyhow::Result<Self> {
let weights = [
config.requests.weight_announce,
config.requests.weight_connect,
config.requests.weight_scrape,
];
Ok(Self(WeightedIndex::new(weights)?))
}
fn sample(&self, rng: &mut impl Rng) -> RequestType {
const ITEMS: [RequestType; 3] = [
RequestType::Announce,
RequestType::Connect,
RequestType::Scrape,
];
ITEMS[self.0.sample(rng)]
}
}
#[derive(Default)]
pub struct LocalStatistics {
pub requests: usize,
pub response_peers: usize,
pub responses_connect: usize,
pub responses_announce: usize,
pub responses_scrape: usize,
pub responses_error: usize,
}

View file

@ -1,371 +0,0 @@
use std::io::Cursor;
use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::time::Duration;
use mio::{net::UdpSocket, Events, Interest, Poll, Token};
use rand::Rng;
use rand::{prelude::SmallRng, SeedableRng};
use rand_distr::{Distribution, Gamma, WeightedIndex};
use socket2::{Domain, Protocol, Socket, Type};
use aquatic_udp_protocol::*;
use crate::config::Config;
use crate::{common::*, utils::*};
const MAX_PACKET_SIZE: usize = 8192;
pub struct Worker {
config: Config,
shared_state: LoadTestState,
gamma: Gamma<f64>,
addr: SocketAddr,
socket: UdpSocket,
buffer: [u8; MAX_PACKET_SIZE],
rng: SmallRng,
torrent_peers: TorrentPeerMap,
statistics: SocketWorkerLocalStatistics,
}
impl Worker {
pub fn run(shared_state: LoadTestState, gamma: Gamma<f64>, config: Config, addr: SocketAddr) {
let socket = UdpSocket::from_std(create_socket(&config, addr));
let buffer = [0u8; MAX_PACKET_SIZE];
let rng = SmallRng::seed_from_u64(0xc3aa8be617b3acce);
let torrent_peers = TorrentPeerMap::default();
let statistics = SocketWorkerLocalStatistics::default();
let mut instance = Self {
config,
shared_state,
gamma,
addr,
socket,
buffer,
rng,
torrent_peers,
statistics,
};
instance.run_inner();
}
fn run_inner(&mut self) {
let mut poll = Poll::new().expect("create poll");
let mut events = Events::with_capacity(1);
poll.registry()
.register(&mut self.socket, Token(0), Interest::READABLE)
.unwrap();
// Bootstrap request cycle
let initial_request = create_connect_request(generate_transaction_id(&mut self.rng));
self.send_request(initial_request);
let timeout = Duration::from_micros(self.config.network.poll_timeout);
loop {
poll.poll(&mut events, Some(timeout))
.expect("failed polling");
for _ in events.iter() {
while let Ok(amt) = self.socket.recv(&mut self.buffer) {
match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) {
Ok(response) => {
if let Some(request) = self.process_response(response) {
self.send_request(request);
}
}
Err(err) => {
eprintln!("Received invalid response: {:#?}", err);
}
}
}
if self.rng.gen::<f32>() <= self.config.requests.additional_request_probability {
let additional_request =
create_connect_request(generate_transaction_id(&mut self.rng));
self.send_request(additional_request);
}
self.update_shared_statistics();
}
}
}
fn process_response(&mut self, response: Response) -> Option<Request> {
match response {
Response::Connect(r) => {
self.statistics.responses_connect += 1;
// Fetch the torrent peer or create it if is doesn't exists. Update
// the connection id if fetched. Create a request and move the
// torrent peer appropriately.
let mut torrent_peer = self
.torrent_peers
.remove(&r.transaction_id)
.unwrap_or_else(|| self.create_torrent_peer(r.connection_id));
torrent_peer.connection_id = r.connection_id;
let new_transaction_id = generate_transaction_id(&mut self.rng);
let request = self.create_random_request(new_transaction_id, &torrent_peer);
self.torrent_peers.insert(new_transaction_id, torrent_peer);
Some(request)
}
Response::AnnounceIpv4(r) => {
self.statistics.responses_announce += 1;
self.statistics.response_peers += r.peers.len();
self.if_torrent_peer_move_and_create_random_request(r.fixed.transaction_id)
}
Response::AnnounceIpv6(r) => {
self.statistics.responses_announce += 1;
self.statistics.response_peers += r.peers.len();
self.if_torrent_peer_move_and_create_random_request(r.fixed.transaction_id)
}
Response::Scrape(r) => {
self.statistics.responses_scrape += 1;
self.if_torrent_peer_move_and_create_random_request(r.transaction_id)
}
Response::Error(r) => {
self.statistics.responses_error += 1;
if !r.message.to_lowercase().contains("connection") {
eprintln!(
"Received error response which didn't contain the word 'connection': {}",
r.message
);
}
if let Some(torrent_peer) = self.torrent_peers.remove(&r.transaction_id) {
let new_transaction_id = generate_transaction_id(&mut self.rng);
self.torrent_peers.insert(new_transaction_id, torrent_peer);
Some(create_connect_request(new_transaction_id))
} else {
Some(create_connect_request(generate_transaction_id(
&mut self.rng,
)))
}
}
}
}
fn if_torrent_peer_move_and_create_random_request(
&mut self,
transaction_id: TransactionId,
) -> Option<Request> {
let torrent_peer = self.torrent_peers.remove(&transaction_id)?;
let new_transaction_id = generate_transaction_id(&mut self.rng);
let request = self.create_random_request(new_transaction_id, &torrent_peer);
self.torrent_peers.insert(new_transaction_id, torrent_peer);
Some(request)
}
fn create_torrent_peer(&mut self, connection_id: ConnectionId) -> TorrentPeer {
let num_scrape_hashes = self
.rng
.gen_range(1..self.config.requests.scrape_max_torrents);
let scrape_hash_indices = (0..num_scrape_hashes)
.map(|_| self.random_info_hash_index())
.collect::<Vec<_>>()
.into_boxed_slice();
let info_hash_index = self.random_info_hash_index();
TorrentPeer {
info_hash: self.shared_state.info_hashes[info_hash_index],
scrape_hash_indices,
connection_id,
peer_id: generate_peer_id(),
port: Port::new(self.rng.gen()),
}
}
fn create_random_request(
&mut self,
transaction_id: TransactionId,
torrent_peer: &TorrentPeer,
) -> Request {
const ITEMS: [RequestType; 3] = [
RequestType::Announce,
RequestType::Connect,
RequestType::Scrape,
];
let weights = [
self.config.requests.weight_announce as u32,
self.config.requests.weight_connect as u32,
self.config.requests.weight_scrape as u32,
];
let dist = WeightedIndex::new(weights).expect("random request weighted index");
match ITEMS[dist.sample(&mut self.rng)] {
RequestType::Announce => self.create_announce_request(torrent_peer, transaction_id),
RequestType::Connect => (ConnectRequest { transaction_id }).into(),
RequestType::Scrape => self.create_scrape_request(torrent_peer, transaction_id),
}
}
fn create_announce_request(
&mut self,
torrent_peer: &TorrentPeer,
transaction_id: TransactionId,
) -> Request {
let (event, bytes_left) = {
if self
.rng
.gen_bool(self.config.requests.peer_seeder_probability)
{
(AnnounceEvent::Completed, NumberOfBytes::new(0))
} else {
(AnnounceEvent::Started, NumberOfBytes::new(50))
}
};
(AnnounceRequest {
connection_id: torrent_peer.connection_id,
action_placeholder: Default::default(),
transaction_id,
info_hash: torrent_peer.info_hash,
peer_id: torrent_peer.peer_id,
bytes_downloaded: NumberOfBytes::new(50),
bytes_uploaded: NumberOfBytes::new(50),
bytes_left,
event: event.into(),
ip_address: Ipv4AddrBytes([0; 4]),
key: PeerKey::new(0),
peers_wanted: NumberOfPeers::new(self.config.requests.announce_peers_wanted),
port: torrent_peer.port,
})
.into()
}
fn create_scrape_request(
&self,
torrent_peer: &TorrentPeer,
transaction_id: TransactionId,
) -> Request {
let indeces = &torrent_peer.scrape_hash_indices;
let mut scape_hashes = Vec::with_capacity(indeces.len());
for i in indeces.iter() {
scape_hashes.push(self.shared_state.info_hashes[*i].to_owned())
}
(ScrapeRequest {
connection_id: torrent_peer.connection_id,
transaction_id,
info_hashes: scape_hashes,
})
.into()
}
fn random_info_hash_index(&mut self) -> usize {
gamma_usize(
&mut self.rng,
self.gamma,
&self.config.requests.number_of_torrents - 1,
)
}
fn send_request(&mut self, request: Request) {
let mut cursor = Cursor::new(self.buffer);
match request.write_bytes(&mut cursor) {
Ok(()) => {
let position = cursor.position() as usize;
let inner = cursor.get_ref();
match self.socket.send(&inner[..position]) {
Ok(_) => {
self.statistics.requests += 1;
}
Err(err) => {
eprintln!("Couldn't send packet: {:?}", err);
}
}
}
Err(err) => {
eprintln!("request_to_bytes err: {}", err);
}
}
}
fn update_shared_statistics(&mut self) {
self.shared_state
.statistics
.requests
.fetch_add(self.statistics.requests, Ordering::Relaxed);
self.shared_state
.statistics
.responses_connect
.fetch_add(self.statistics.responses_connect, Ordering::Relaxed);
self.shared_state
.statistics
.responses_announce
.fetch_add(self.statistics.responses_announce, Ordering::Relaxed);
self.shared_state
.statistics
.responses_scrape
.fetch_add(self.statistics.responses_scrape, Ordering::Relaxed);
self.shared_state
.statistics
.responses_error
.fetch_add(self.statistics.responses_error, Ordering::Relaxed);
self.shared_state
.statistics
.response_peers
.fetch_add(self.statistics.response_peers, Ordering::Relaxed);
self.statistics = SocketWorkerLocalStatistics::default();
}
}
fn create_socket(config: &Config, addr: SocketAddr) -> ::std::net::UdpSocket {
let socket = if addr.is_ipv4() {
Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))
} else {
Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))
}
.expect("create socket");
socket
.set_nonblocking(true)
.expect("socket: set nonblocking");
if config.network.recv_buffer != 0 {
if let Err(err) = socket.set_recv_buffer_size(config.network.recv_buffer) {
eprintln!(
"socket: failed setting recv buffer to {}: {:?}",
config.network.recv_buffer, err
);
}
}
socket
.bind(&addr.into())
.unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", addr, err));
socket
.connect(&config.server_address.into())
.expect("socket: connect to server");
socket.into()
}

View file

@ -356,7 +356,7 @@ impl TorrentData {
peer.valid_until = valid_until; peer.valid_until = valid_until;
} }
PeerStatus::Stopped => { PeerStatus::Stopped => {
let peer = entry.remove(); let peer = entry.swap_remove();
if peer.seeder { if peer.seeder {
self.num_seeders -= 1; self.num_seeders -= 1;
@ -477,7 +477,7 @@ impl TorrentData {
if answer_receiver if answer_receiver
.expecting_answers .expecting_answers
.remove(&expecting_answer) .swap_remove(&expecting_answer)
.is_some() .is_some()
{ {
let answer_out_message = AnswerOutMessage { let answer_out_message = AnswerOutMessage {
@ -519,7 +519,7 @@ impl TorrentData {
peer_id: PeerId, peer_id: PeerId,
#[cfg(feature = "metrics")] peer_gauge: &::metrics::Gauge, #[cfg(feature = "metrics")] peer_gauge: &::metrics::Gauge,
) { ) {
if let Some(peer) = self.peers.remove(&peer_id) { if let Some(peer) = self.peers.swap_remove(&peer_id) {
if peer.seeder { if peer.seeder {
self.num_seeders -= 1; self.num_seeders -= 1;
} }