diff --git a/Cargo.lock b/Cargo.lock index 41caaab..a61454d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -335,9 +335,8 @@ dependencies = [ "aquatic_common", "aquatic_toml_config", "aquatic_udp_protocol", - "hashbrown 0.14.3", + "hdrhistogram", "mimalloc", - "mio", "quickcheck", "quickcheck_macros", "rand", diff --git a/crates/udp_load_test/Cargo.toml b/crates/udp_load_test/Cargo.toml index 2eedf92..076810d 100644 --- a/crates/udp_load_test/Cargo.toml +++ b/crates/udp_load_test/Cargo.toml @@ -25,9 +25,8 @@ aquatic_toml_config.workspace = true aquatic_udp_protocol.workspace = true anyhow = "1" -hashbrown = "0.14" +hdrhistogram = "7" mimalloc = { version = "0.1", default-features = false } -mio = { version = "0.8", features = ["net", "os-poll"] } rand_distr = "0.4" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } diff --git a/crates/udp_load_test/src/common.rs b/crates/udp_load_test/src/common.rs index 7cbffbd..7470830 100644 --- a/crates/udp_load_test/src/common.rs +++ b/crates/udp_load_test/src/common.rs @@ -1,22 +1,15 @@ use std::sync::{atomic::AtomicUsize, Arc}; -use hashbrown::HashMap; - use aquatic_udp_protocol::*; -#[derive(PartialEq, Eq, Clone)] -pub struct TorrentPeer { - pub info_hash: InfoHash, - pub scrape_hash_indices: Box<[usize]>, - pub connection_id: ConnectionId, - pub peer_id: PeerId, - pub port: Port, +#[derive(Clone)] +pub struct LoadTestState { + pub info_hashes: Arc<[InfoHash]>, + pub statistics: Arc, } -pub type TorrentPeerMap = HashMap; - #[derive(Default)] -pub struct Statistics { +pub struct SharedStatistics { pub requests: AtomicUsize, pub response_peers: AtomicUsize, pub responses_connect: AtomicUsize, @@ -25,25 +18,8 @@ pub struct Statistics { pub responses_error: AtomicUsize, } -#[derive(Clone)] -pub struct LoadTestState { - pub info_hashes: Arc<[InfoHash]>, - pub statistics: Arc, -} - -#[derive(PartialEq, Eq, Clone, Copy)] -pub enum RequestType { - Announce, - Connect, - Scrape, -} - -#[derive(Default)] -pub struct SocketWorkerLocalStatistics { - pub requests: usize, - pub response_peers: usize, - pub responses_connect: usize, - pub responses_announce: usize, - pub responses_scrape: usize, - pub responses_error: usize, +pub struct Peer { + pub announce_info_hash: InfoHash, + pub announce_port: Port, + pub scrape_info_hash_indices: Box<[usize]>, } diff --git a/crates/udp_load_test/src/config.rs b/crates/udp_load_test/src/config.rs index 5571b11..0f0b701 100644 --- a/crates/udp_load_test/src/config.rs +++ b/crates/udp_load_test/src/config.rs @@ -25,6 +25,8 @@ pub struct Config { /// /// 0 = include whole run pub summarize_last: usize, + /// Display data on number of peers per info hash + pub peer_histogram: bool, pub network: NetworkConfig, pub requests: RequestConfig, #[cfg(feature = "cpu-pinning")] @@ -39,6 +41,7 @@ impl Default for Config { workers: 1, duration: 0, summarize_last: 0, + peer_histogram: true, network: NetworkConfig::default(), requests: RequestConfig::default(), #[cfg(feature = "cpu-pinning")] @@ -59,8 +62,6 @@ pub struct NetworkConfig { pub multiple_client_ipv4s: bool, /// Number of first client port pub first_port: u16, - /// Socket worker poll timeout in microseconds - pub poll_timeout: u64, /// Size of socket recv buffer. Use 0 for OS default. /// /// This setting can have a big impact on dropped packages. It might @@ -81,7 +82,6 @@ impl Default for NetworkConfig { Self { multiple_client_ipv4s: true, first_port: 45_000, - poll_timeout: 1, recv_buffer: 8_000_000, } } @@ -92,6 +92,8 @@ impl Default for NetworkConfig { pub struct RequestConfig { /// Number of torrents to simulate pub number_of_torrents: usize, + /// Number of peers to simulate + pub number_of_peers: usize, /// Maximum number of torrents to ask about in scrape requests pub scrape_max_torrents: usize, /// Ask for this number of peers in announce requests @@ -105,30 +107,21 @@ pub struct RequestConfig { /// Probability that a generated request is a scrape request, as part /// of sum of the various weight arguments. pub weight_scrape: usize, - /// Peers choose torrents according to this Gamma distribution shape - pub torrent_gamma_shape: f64, - /// Peers choose torrents according to this Gamma distribution scale - pub torrent_gamma_scale: f64, /// Probability that a generated peer is a seeder pub peer_seeder_probability: f64, - /// Probability that an additional connect request will be sent for each - /// mio event - pub additional_request_probability: f32, } impl Default for RequestConfig { fn default() -> Self { Self { number_of_torrents: 10_000, + number_of_peers: 100_000, scrape_max_torrents: 10, announce_peers_wanted: 30, weight_connect: 0, weight_announce: 100, weight_scrape: 1, - torrent_gamma_shape: 0.2, - torrent_gamma_scale: 100.0, peer_seeder_probability: 0.75, - additional_request_probability: 0.5, } } } diff --git a/crates/udp_load_test/src/lib.rs b/crates/udp_load_test/src/lib.rs index 42a4ada..61dfbd5 100644 --- a/crates/udp_load_test/src/lib.rs +++ b/crates/udp_load_test/src/lib.rs @@ -1,3 +1,4 @@ +use std::iter::repeat_with; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::atomic::AtomicUsize; use std::sync::{atomic::Ordering, Arc}; @@ -6,16 +7,19 @@ use std::time::{Duration, Instant}; #[cfg(feature = "cpu-pinning")] use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; -use rand_distr::Gamma; +use aquatic_common::IndexMap; +use aquatic_udp_protocol::{InfoHash, Port}; +use hdrhistogram::Histogram; +use rand::rngs::SmallRng; +use rand::{Rng, SeedableRng}; +use rand_distr::{Distribution, WeightedAliasIndex}; -pub mod common; +mod common; pub mod config; -pub mod utils; -pub mod worker; +mod worker; use common::*; use config::Config; -use utils::*; use worker::*; impl aquatic_common::cli::Config for Config { @@ -39,26 +43,17 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { println!("Starting client with config: {:#?}", config); - let mut info_hashes = Vec::with_capacity(config.requests.number_of_torrents); - - for _ in 0..config.requests.number_of_torrents { - info_hashes.push(generate_info_hash()); - } + let info_hash_dist = InfoHashDist::new(&config)?; + let peers_by_worker = create_peers(&config, &info_hash_dist); let state = LoadTestState { - info_hashes: Arc::from(info_hashes.into_boxed_slice()), - statistics: Arc::new(Statistics::default()), + info_hashes: info_hash_dist.into_arc_info_hashes(), + statistics: Arc::new(SharedStatistics::default()), }; - let gamma = Gamma::new( - config.requests.torrent_gamma_shape, - config.requests.torrent_gamma_scale, - ) - .unwrap(); - // Start workers - for i in 0..config.workers { + for (i, peers) in (0..config.workers).zip(peers_by_worker) { let port = config.network.first_port + (i as u16); let ip = if config.server_address.is_ipv6() { @@ -82,7 +77,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::SocketWorker(i as usize), ); - Worker::run(state, gamma, config, addr) + Worker::run(config, state, peers, addr) })?; } @@ -208,3 +203,121 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { fn fetch_and_reset(atomic_usize: &AtomicUsize) -> f64 { atomic_usize.fetch_and(0, Ordering::Relaxed) as f64 } + +fn create_peers(config: &Config, info_hash_dist: &InfoHashDist) -> Vec> { + let mut rng = SmallRng::seed_from_u64(0xc3a58be617b3acce); + + let mut opt_peers_per_info_hash: Option> = + config.peer_histogram.then_some(IndexMap::default()); + + let mut all_peers = repeat_with(|| { + let num_scrape_indices = rng.gen_range(1..config.requests.scrape_max_torrents + 1); + + let scrape_info_hash_indices = repeat_with(|| info_hash_dist.get_random_index(&mut rng)) + .take(num_scrape_indices) + .collect::>() + .into_boxed_slice(); + + let (announce_info_hash_index, announce_info_hash) = info_hash_dist.get_random(&mut rng); + + if let Some(peers_per_info_hash) = opt_peers_per_info_hash.as_mut() { + *peers_per_info_hash + .entry(announce_info_hash_index) + .or_default() += 1; + } + + Peer { + announce_info_hash, + announce_port: Port::new(rng.gen()), + scrape_info_hash_indices, + } + }) + .take(config.requests.number_of_peers) + .collect::>(); + + if let Some(peers_per_info_hash) = opt_peers_per_info_hash { + let mut histogram = Histogram::::new(2).unwrap(); + + for num_peers in peers_per_info_hash.values() { + histogram.record(*num_peers).unwrap(); + } + + let percentiles = [ + 1.0, 10.0, 25.0, 50.0, 75.0, 85.0, 90.0, 95.0, 98.0, 99.9, 100.0, + ]; + + for p in percentiles { + let value = histogram.value_at_percentile(p); + + println!("Peers at info hash percentile {}: {}", p, value); + } + } + + let mut peers_by_worker = Vec::new(); + + let num_peers_per_worker = all_peers.len() / config.workers as usize; + + for _ in 0..(config.workers as usize) { + peers_by_worker.push( + all_peers + .split_off(all_peers.len() - num_peers_per_worker) + .into_boxed_slice(), + ); + + all_peers.shrink_to_fit(); + } + + peers_by_worker +} + +struct InfoHashDist { + info_hashes: Box<[InfoHash]>, + dist: WeightedAliasIndex, +} + +impl InfoHashDist { + fn new(config: &Config) -> anyhow::Result { + let mut rng = SmallRng::seed_from_u64(0xc3aa8be617b3acce); + + let info_hashes = repeat_with(|| { + let mut bytes = [0u8; 20]; + + for byte in bytes.iter_mut() { + *byte = rng.gen(); + } + + InfoHash(bytes) + }) + .take(config.requests.number_of_torrents) + .collect::>() + .into_boxed_slice(); + + let num_torrents = config.requests.number_of_torrents as u32; + + let weights = (0..num_torrents) + .map(|i| { + let floor = num_torrents as f64 / config.requests.number_of_peers as f64; + + floor + (7.0f64 - ((300.0 * f64::from(i)) / f64::from(num_torrents))).exp() + }) + .collect(); + + let dist = WeightedAliasIndex::new(weights)?; + + Ok(Self { info_hashes, dist }) + } + + fn get_random(&self, rng: &mut impl Rng) -> (usize, InfoHash) { + let index = self.dist.sample(rng); + + (index, self.info_hashes[index]) + } + + fn get_random_index(&self, rng: &mut impl Rng) -> usize { + self.dist.sample(rng) + } + + fn into_arc_info_hashes(self) -> Arc<[InfoHash]> { + Arc::from(self.info_hashes) + } +} diff --git a/crates/udp_load_test/src/utils.rs b/crates/udp_load_test/src/utils.rs deleted file mode 100644 index 6b7f748..0000000 --- a/crates/udp_load_test/src/utils.rs +++ /dev/null @@ -1,36 +0,0 @@ -use rand::prelude::*; -use rand_distr::Gamma; - -use aquatic_udp_protocol::*; - -pub fn gamma_usize(rng: &mut impl Rng, gamma: Gamma, max: usize) -> usize { - let p: f64 = rng.sample(gamma); - let p = (p.min(101.0f64) - 1.0) / 100.0; - - (p * max as f64) as usize -} - -pub fn generate_peer_id() -> PeerId { - PeerId(random_20_bytes()) -} - -pub fn generate_info_hash() -> InfoHash { - InfoHash(random_20_bytes()) -} - -pub fn generate_transaction_id(rng: &mut impl Rng) -> TransactionId { - TransactionId::new(rng.gen()) -} - -pub fn create_connect_request(transaction_id: TransactionId) -> Request { - (ConnectRequest { transaction_id }).into() -} - -// Don't use SmallRng here for now -fn random_20_bytes() -> [u8; 20] { - let mut bytes = [0; 20]; - - thread_rng().fill_bytes(&mut bytes[..]); - - bytes -} diff --git a/crates/udp_load_test/src/worker.rs b/crates/udp_load_test/src/worker.rs new file mode 100644 index 0000000..102875c --- /dev/null +++ b/crates/udp_load_test/src/worker.rs @@ -0,0 +1,365 @@ +use std::io::{Cursor, ErrorKind}; +use std::net::{SocketAddr, UdpSocket}; +use std::sync::atomic::Ordering; +use std::time::Duration; + +use rand::Rng; +use rand::{prelude::SmallRng, SeedableRng}; +use rand_distr::{Distribution, WeightedIndex}; +use socket2::{Domain, Protocol, Socket, Type}; + +use aquatic_udp_protocol::*; + +use crate::common::{LoadTestState, Peer}; +use crate::config::Config; + +const MAX_PACKET_SIZE: usize = 8192; + +pub struct Worker { + config: Config, + shared_state: LoadTestState, + peers: Box<[Peer]>, + request_type_dist: RequestTypeDist, + addr: SocketAddr, + socket: UdpSocket, + buffer: [u8; MAX_PACKET_SIZE], + rng: SmallRng, + statistics: LocalStatistics, +} + +impl Worker { + pub fn run(config: Config, shared_state: LoadTestState, peers: Box<[Peer]>, addr: SocketAddr) { + let socket = create_socket(&config, addr); + let buffer = [0u8; MAX_PACKET_SIZE]; + let rng = SmallRng::seed_from_u64(0xc3aa8be617b3acce); + let statistics = LocalStatistics::default(); + let request_type_dist = RequestTypeDist::new(&config).unwrap(); + + let mut instance = Self { + config, + shared_state, + peers, + request_type_dist, + addr, + socket, + buffer, + rng, + statistics, + }; + + instance.run_inner(); + } + + fn run_inner(&mut self) { + let connection_id = self.aquire_connection_id(); + + let mut requests_sent = 0usize; + let mut responses_received = 0usize; + + let mut peer_index = 0usize; + let mut loop_index = 0usize; + + loop { + let response_ratio = responses_received as f64 / requests_sent.max(1) as f64; + + if response_ratio >= 0.95 || requests_sent == 0 || self.rng.gen::() == 0 { + match self.request_type_dist.sample(&mut self.rng) { + RequestType::Connect => { + self.send_connect_request(u32::MAX - 1); + } + RequestType::Announce => { + self.send_announce_request(connection_id, peer_index); + + peer_index = (peer_index + 1) % self.peers.len(); + } + RequestType::Scrape => { + self.send_scrape_request(connection_id, peer_index); + + peer_index = (peer_index + 1) % self.peers.len(); + } + } + + requests_sent += 1; + } + + match self.socket.recv(&mut self.buffer[..]) { + Ok(amt) => { + match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { + Ok(response) => { + self.handle_response(response); + } + Err(err) => { + eprintln!("Received invalid response: {:#?}", err); + } + } + + responses_received += 1; + } + Err(err) if err.kind() == ErrorKind::WouldBlock => (), + Err(err) => { + eprintln!("recv error: {:#}", err); + } + } + + if loop_index % 1024 == 0 { + self.update_shared_statistics(); + } + + loop_index = loop_index.wrapping_add(1); + } + } + + fn aquire_connection_id(&mut self) -> ConnectionId { + loop { + self.send_connect_request(u32::MAX); + + for _ in 0..100 { + match self.socket.recv(&mut self.buffer[..]) { + Ok(amt) => { + match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { + Ok(Response::Connect(r)) => { + return r.connection_id; + } + Ok(r) => { + eprintln!("Received non-connect response: {:?}", r); + } + Err(err) => { + eprintln!("Received invalid response: {:#?}", err); + } + } + } + Err(err) if err.kind() == ErrorKind::WouldBlock => { + ::std::thread::sleep(Duration::from_millis(10)); + } + Err(err) => { + eprintln!("recv error: {:#}", err); + } + }; + } + } + } + + fn send_connect_request(&mut self, transaction_id: u32) { + let transaction_id = TransactionId::new(i32::from_ne_bytes(transaction_id.to_ne_bytes())); + + let request = ConnectRequest { transaction_id }; + + let mut cursor = Cursor::new(self.buffer); + + request.write_bytes(&mut cursor).unwrap(); + + let position = cursor.position() as usize; + + match self.socket.send(&cursor.get_ref()[..position]) { + Ok(_) => { + self.statistics.requests += 1; + } + Err(err) => { + eprintln!("Couldn't send packet: {:?}", err); + } + } + } + + fn send_announce_request(&mut self, connection_id: ConnectionId, peer_index: usize) { + let peer = self.peers.get(peer_index).unwrap(); + + let (event, bytes_left) = { + if self + .rng + .gen_bool(self.config.requests.peer_seeder_probability) + { + (AnnounceEvent::Completed, NumberOfBytes::new(0)) + } else { + (AnnounceEvent::Started, NumberOfBytes::new(50)) + } + }; + + let transaction_id = + TransactionId::new(i32::from_ne_bytes((peer_index as u32).to_ne_bytes())); + + let request = AnnounceRequest { + connection_id, + action_placeholder: Default::default(), + transaction_id, + info_hash: peer.announce_info_hash, + peer_id: PeerId([0; 20]), + bytes_downloaded: NumberOfBytes::new(50), + bytes_uploaded: NumberOfBytes::new(50), + bytes_left, + event: event.into(), + ip_address: Ipv4AddrBytes([0; 4]), + key: PeerKey::new(0), + peers_wanted: NumberOfPeers::new(self.config.requests.announce_peers_wanted), + port: peer.announce_port, + }; + + let mut cursor = Cursor::new(self.buffer); + + request.write_bytes(&mut cursor).unwrap(); + + let position = cursor.position() as usize; + + match self.socket.send(&cursor.get_ref()[..position]) { + Ok(_) => { + self.statistics.requests += 1; + } + Err(err) => { + eprintln!("Couldn't send packet: {:?}", err); + } + } + } + + fn send_scrape_request(&mut self, connection_id: ConnectionId, peer_index: usize) { + let peer = self.peers.get(peer_index).unwrap(); + + let transaction_id = + TransactionId::new(i32::from_ne_bytes((peer_index as u32).to_ne_bytes())); + + let mut info_hashes = Vec::with_capacity(peer.scrape_info_hash_indices.len()); + + for i in peer.scrape_info_hash_indices.iter() { + info_hashes.push(self.shared_state.info_hashes[*i].to_owned()) + } + + let request = ScrapeRequest { + connection_id, + transaction_id, + info_hashes, + }; + + let mut cursor = Cursor::new(self.buffer); + + request.write_bytes(&mut cursor).unwrap(); + + let position = cursor.position() as usize; + + match self.socket.send(&cursor.get_ref()[..position]) { + Ok(_) => { + self.statistics.requests += 1; + } + Err(err) => { + eprintln!("Couldn't send packet: {:?}", err); + } + } + } + + fn handle_response(&mut self, response: Response) { + match response { + Response::Connect(_) => { + self.statistics.responses_connect += 1; + } + Response::AnnounceIpv4(r) => { + self.statistics.responses_announce += 1; + self.statistics.response_peers += r.peers.len(); + } + Response::AnnounceIpv6(r) => { + self.statistics.responses_announce += 1; + self.statistics.response_peers += r.peers.len(); + } + Response::Scrape(_) => { + self.statistics.responses_scrape += 1; + } + Response::Error(_) => { + self.statistics.responses_error += 1; + } + } + } + + fn update_shared_statistics(&mut self) { + let shared_statistics = &self.shared_state.statistics; + + shared_statistics + .requests + .fetch_add(self.statistics.requests, Ordering::Relaxed); + shared_statistics + .responses_connect + .fetch_add(self.statistics.responses_connect, Ordering::Relaxed); + shared_statistics + .responses_announce + .fetch_add(self.statistics.responses_announce, Ordering::Relaxed); + shared_statistics + .responses_scrape + .fetch_add(self.statistics.responses_scrape, Ordering::Relaxed); + shared_statistics + .responses_error + .fetch_add(self.statistics.responses_error, Ordering::Relaxed); + shared_statistics + .response_peers + .fetch_add(self.statistics.response_peers, Ordering::Relaxed); + + self.statistics = LocalStatistics::default(); + } +} + +fn create_socket(config: &Config, addr: SocketAddr) -> ::std::net::UdpSocket { + let socket = if addr.is_ipv4() { + Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) + } else { + Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) + } + .expect("create socket"); + + socket + .set_nonblocking(true) + .expect("socket: set nonblocking"); + + if config.network.recv_buffer != 0 { + if let Err(err) = socket.set_recv_buffer_size(config.network.recv_buffer) { + eprintln!( + "socket: failed setting recv buffer to {}: {:?}", + config.network.recv_buffer, err + ); + } + } + + socket + .bind(&addr.into()) + .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", addr, err)); + + socket + .connect(&config.server_address.into()) + .expect("socket: connect to server"); + + socket.into() +} + +#[derive(PartialEq, Eq, Clone, Copy)] +pub enum RequestType { + Announce, + Connect, + Scrape, +} + +pub struct RequestTypeDist(WeightedIndex); + +impl RequestTypeDist { + fn new(config: &Config) -> anyhow::Result { + let weights = [ + config.requests.weight_announce, + config.requests.weight_connect, + config.requests.weight_scrape, + ]; + + Ok(Self(WeightedIndex::new(weights)?)) + } + + fn sample(&self, rng: &mut impl Rng) -> RequestType { + const ITEMS: [RequestType; 3] = [ + RequestType::Announce, + RequestType::Connect, + RequestType::Scrape, + ]; + + ITEMS[self.0.sample(rng)] + } +} + +#[derive(Default)] +pub struct LocalStatistics { + pub requests: usize, + pub response_peers: usize, + pub responses_connect: usize, + pub responses_announce: usize, + pub responses_scrape: usize, + pub responses_error: usize, +} diff --git a/crates/udp_load_test/src/worker/mod.rs b/crates/udp_load_test/src/worker/mod.rs deleted file mode 100644 index 8c7c606..0000000 --- a/crates/udp_load_test/src/worker/mod.rs +++ /dev/null @@ -1,371 +0,0 @@ -use std::io::Cursor; -use std::net::SocketAddr; -use std::sync::atomic::Ordering; -use std::time::Duration; - -use mio::{net::UdpSocket, Events, Interest, Poll, Token}; -use rand::Rng; -use rand::{prelude::SmallRng, SeedableRng}; -use rand_distr::{Distribution, Gamma, WeightedIndex}; -use socket2::{Domain, Protocol, Socket, Type}; - -use aquatic_udp_protocol::*; - -use crate::config::Config; -use crate::{common::*, utils::*}; - -const MAX_PACKET_SIZE: usize = 8192; - -pub struct Worker { - config: Config, - shared_state: LoadTestState, - gamma: Gamma, - addr: SocketAddr, - socket: UdpSocket, - buffer: [u8; MAX_PACKET_SIZE], - rng: SmallRng, - torrent_peers: TorrentPeerMap, - statistics: SocketWorkerLocalStatistics, -} - -impl Worker { - pub fn run(shared_state: LoadTestState, gamma: Gamma, config: Config, addr: SocketAddr) { - let socket = UdpSocket::from_std(create_socket(&config, addr)); - let buffer = [0u8; MAX_PACKET_SIZE]; - let rng = SmallRng::seed_from_u64(0xc3aa8be617b3acce); - let torrent_peers = TorrentPeerMap::default(); - let statistics = SocketWorkerLocalStatistics::default(); - - let mut instance = Self { - config, - shared_state, - gamma, - addr, - socket, - buffer, - rng, - torrent_peers, - statistics, - }; - - instance.run_inner(); - } - - fn run_inner(&mut self) { - let mut poll = Poll::new().expect("create poll"); - let mut events = Events::with_capacity(1); - - poll.registry() - .register(&mut self.socket, Token(0), Interest::READABLE) - .unwrap(); - - // Bootstrap request cycle - let initial_request = create_connect_request(generate_transaction_id(&mut self.rng)); - self.send_request(initial_request); - - let timeout = Duration::from_micros(self.config.network.poll_timeout); - - loop { - poll.poll(&mut events, Some(timeout)) - .expect("failed polling"); - - for _ in events.iter() { - while let Ok(amt) = self.socket.recv(&mut self.buffer) { - match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { - Ok(response) => { - if let Some(request) = self.process_response(response) { - self.send_request(request); - } - } - Err(err) => { - eprintln!("Received invalid response: {:#?}", err); - } - } - } - - if self.rng.gen::() <= self.config.requests.additional_request_probability { - let additional_request = - create_connect_request(generate_transaction_id(&mut self.rng)); - - self.send_request(additional_request); - } - - self.update_shared_statistics(); - } - } - } - - fn process_response(&mut self, response: Response) -> Option { - match response { - Response::Connect(r) => { - self.statistics.responses_connect += 1; - - // Fetch the torrent peer or create it if is doesn't exists. Update - // the connection id if fetched. Create a request and move the - // torrent peer appropriately. - - let mut torrent_peer = self - .torrent_peers - .remove(&r.transaction_id) - .unwrap_or_else(|| self.create_torrent_peer(r.connection_id)); - - torrent_peer.connection_id = r.connection_id; - - let new_transaction_id = generate_transaction_id(&mut self.rng); - let request = self.create_random_request(new_transaction_id, &torrent_peer); - - self.torrent_peers.insert(new_transaction_id, torrent_peer); - - Some(request) - } - Response::AnnounceIpv4(r) => { - self.statistics.responses_announce += 1; - self.statistics.response_peers += r.peers.len(); - - self.if_torrent_peer_move_and_create_random_request(r.fixed.transaction_id) - } - Response::AnnounceIpv6(r) => { - self.statistics.responses_announce += 1; - self.statistics.response_peers += r.peers.len(); - - self.if_torrent_peer_move_and_create_random_request(r.fixed.transaction_id) - } - Response::Scrape(r) => { - self.statistics.responses_scrape += 1; - - self.if_torrent_peer_move_and_create_random_request(r.transaction_id) - } - Response::Error(r) => { - self.statistics.responses_error += 1; - - if !r.message.to_lowercase().contains("connection") { - eprintln!( - "Received error response which didn't contain the word 'connection': {}", - r.message - ); - } - - if let Some(torrent_peer) = self.torrent_peers.remove(&r.transaction_id) { - let new_transaction_id = generate_transaction_id(&mut self.rng); - - self.torrent_peers.insert(new_transaction_id, torrent_peer); - - Some(create_connect_request(new_transaction_id)) - } else { - Some(create_connect_request(generate_transaction_id( - &mut self.rng, - ))) - } - } - } - } - - fn if_torrent_peer_move_and_create_random_request( - &mut self, - transaction_id: TransactionId, - ) -> Option { - let torrent_peer = self.torrent_peers.remove(&transaction_id)?; - - let new_transaction_id = generate_transaction_id(&mut self.rng); - - let request = self.create_random_request(new_transaction_id, &torrent_peer); - - self.torrent_peers.insert(new_transaction_id, torrent_peer); - - Some(request) - } - - fn create_torrent_peer(&mut self, connection_id: ConnectionId) -> TorrentPeer { - let num_scrape_hashes = self - .rng - .gen_range(1..self.config.requests.scrape_max_torrents); - - let scrape_hash_indices = (0..num_scrape_hashes) - .map(|_| self.random_info_hash_index()) - .collect::>() - .into_boxed_slice(); - - let info_hash_index = self.random_info_hash_index(); - - TorrentPeer { - info_hash: self.shared_state.info_hashes[info_hash_index], - scrape_hash_indices, - connection_id, - peer_id: generate_peer_id(), - port: Port::new(self.rng.gen()), - } - } - - fn create_random_request( - &mut self, - transaction_id: TransactionId, - torrent_peer: &TorrentPeer, - ) -> Request { - const ITEMS: [RequestType; 3] = [ - RequestType::Announce, - RequestType::Connect, - RequestType::Scrape, - ]; - - let weights = [ - self.config.requests.weight_announce as u32, - self.config.requests.weight_connect as u32, - self.config.requests.weight_scrape as u32, - ]; - - let dist = WeightedIndex::new(weights).expect("random request weighted index"); - - match ITEMS[dist.sample(&mut self.rng)] { - RequestType::Announce => self.create_announce_request(torrent_peer, transaction_id), - RequestType::Connect => (ConnectRequest { transaction_id }).into(), - RequestType::Scrape => self.create_scrape_request(torrent_peer, transaction_id), - } - } - - fn create_announce_request( - &mut self, - torrent_peer: &TorrentPeer, - transaction_id: TransactionId, - ) -> Request { - let (event, bytes_left) = { - if self - .rng - .gen_bool(self.config.requests.peer_seeder_probability) - { - (AnnounceEvent::Completed, NumberOfBytes::new(0)) - } else { - (AnnounceEvent::Started, NumberOfBytes::new(50)) - } - }; - - (AnnounceRequest { - connection_id: torrent_peer.connection_id, - action_placeholder: Default::default(), - transaction_id, - info_hash: torrent_peer.info_hash, - peer_id: torrent_peer.peer_id, - bytes_downloaded: NumberOfBytes::new(50), - bytes_uploaded: NumberOfBytes::new(50), - bytes_left, - event: event.into(), - ip_address: Ipv4AddrBytes([0; 4]), - key: PeerKey::new(0), - peers_wanted: NumberOfPeers::new(self.config.requests.announce_peers_wanted), - port: torrent_peer.port, - }) - .into() - } - - fn create_scrape_request( - &self, - torrent_peer: &TorrentPeer, - transaction_id: TransactionId, - ) -> Request { - let indeces = &torrent_peer.scrape_hash_indices; - - let mut scape_hashes = Vec::with_capacity(indeces.len()); - - for i in indeces.iter() { - scape_hashes.push(self.shared_state.info_hashes[*i].to_owned()) - } - - (ScrapeRequest { - connection_id: torrent_peer.connection_id, - transaction_id, - info_hashes: scape_hashes, - }) - .into() - } - - fn random_info_hash_index(&mut self) -> usize { - gamma_usize( - &mut self.rng, - self.gamma, - &self.config.requests.number_of_torrents - 1, - ) - } - - fn send_request(&mut self, request: Request) { - let mut cursor = Cursor::new(self.buffer); - - match request.write_bytes(&mut cursor) { - Ok(()) => { - let position = cursor.position() as usize; - let inner = cursor.get_ref(); - - match self.socket.send(&inner[..position]) { - Ok(_) => { - self.statistics.requests += 1; - } - Err(err) => { - eprintln!("Couldn't send packet: {:?}", err); - } - } - } - Err(err) => { - eprintln!("request_to_bytes err: {}", err); - } - } - } - - fn update_shared_statistics(&mut self) { - self.shared_state - .statistics - .requests - .fetch_add(self.statistics.requests, Ordering::Relaxed); - self.shared_state - .statistics - .responses_connect - .fetch_add(self.statistics.responses_connect, Ordering::Relaxed); - self.shared_state - .statistics - .responses_announce - .fetch_add(self.statistics.responses_announce, Ordering::Relaxed); - self.shared_state - .statistics - .responses_scrape - .fetch_add(self.statistics.responses_scrape, Ordering::Relaxed); - self.shared_state - .statistics - .responses_error - .fetch_add(self.statistics.responses_error, Ordering::Relaxed); - self.shared_state - .statistics - .response_peers - .fetch_add(self.statistics.response_peers, Ordering::Relaxed); - - self.statistics = SocketWorkerLocalStatistics::default(); - } -} - -fn create_socket(config: &Config, addr: SocketAddr) -> ::std::net::UdpSocket { - let socket = if addr.is_ipv4() { - Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) - } else { - Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) - } - .expect("create socket"); - - socket - .set_nonblocking(true) - .expect("socket: set nonblocking"); - - if config.network.recv_buffer != 0 { - if let Err(err) = socket.set_recv_buffer_size(config.network.recv_buffer) { - eprintln!( - "socket: failed setting recv buffer to {}: {:?}", - config.network.recv_buffer, err - ); - } - } - - socket - .bind(&addr.into()) - .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", addr, err)); - - socket - .connect(&config.server_address.into()) - .expect("socket: connect to server"); - - socket.into() -}