diff --git a/crates/udp_load_test/src/lib.rs b/crates/udp_load_test/src/lib.rs index 0ce9a51..caa26eb 100644 --- a/crates/udp_load_test/src/lib.rs +++ b/crates/udp_load_test/src/lib.rs @@ -84,7 +84,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::SocketWorker(i as usize), ); - run_worker_thread(state, gamma, &config, addr) + Worker::run(state, gamma, config, addr) })?; } diff --git a/crates/udp_load_test/src/worker/mod.rs b/crates/udp_load_test/src/worker/mod.rs index 66c6165..b3be476 100644 --- a/crates/udp_load_test/src/worker/mod.rs +++ b/crates/udp_load_test/src/worker/mod.rs @@ -1,5 +1,3 @@ -mod request_gen; - use std::io::Cursor; use std::net::SocketAddr; use std::sync::atomic::Ordering; @@ -8,87 +6,75 @@ use std::time::Duration; use mio::{net::UdpSocket, Events, Interest, Poll, Token}; use rand::Rng; use rand::{prelude::SmallRng, SeedableRng}; -use rand_distr::Gamma; +use rand_distr::{Distribution, Gamma, WeightedIndex}; use socket2::{Domain, Protocol, Socket, Type}; use aquatic_udp_protocol::*; use crate::config::Config; use crate::{common::*, utils::*}; -use request_gen::process_response; const MAX_PACKET_SIZE: usize = 8192; -pub fn run_worker_thread( - state: LoadTestState, +pub struct Worker { + config: Config, + shared_state: LoadTestState, gamma: Gamma, - config: &Config, addr: SocketAddr, -) { - let mut socket = UdpSocket::from_std(create_socket(config, addr)); - let mut buffer = [0u8; MAX_PACKET_SIZE]; + socket: UdpSocket, + buffer: [u8; MAX_PACKET_SIZE], + rng: SmallRng, + torrent_peers: TorrentPeerMap, + statistics: SocketWorkerLocalStatistics, +} - let mut rng = SmallRng::seed_from_u64(0xc3aa8be617b3acce); - let mut torrent_peers = TorrentPeerMap::default(); +impl Worker { + pub fn run(shared_state: LoadTestState, gamma: Gamma, config: Config, addr: SocketAddr) { + let socket = UdpSocket::from_std(create_socket(&config, addr)); + let buffer = [0u8; MAX_PACKET_SIZE]; + let rng = SmallRng::seed_from_u64(0xc3aa8be617b3acce); + let torrent_peers = TorrentPeerMap::default(); + let statistics = SocketWorkerLocalStatistics::default(); - let token = Token(0); - let interests = Interest::READABLE; - let timeout = Duration::from_micros(config.network.poll_timeout); + let mut instance = Self { + config, + shared_state, + gamma, + addr, + socket, + buffer, + rng, + torrent_peers, + statistics, + }; - let mut poll = Poll::new().expect("create poll"); + instance.run_inner(); + } - poll.registry() - .register(&mut socket, token, interests) - .unwrap(); + fn run_inner(&mut self) { + let mut poll = Poll::new().expect("create poll"); + let mut events = Events::with_capacity(1); - let mut events = Events::with_capacity(1); + poll.registry() + .register(&mut self.socket, Token(0), Interest::READABLE) + .unwrap(); - let mut statistics = SocketWorkerLocalStatistics::default(); + // Bootstrap request cycle + let initial_request = create_connect_request(generate_transaction_id(&mut self.rng)); + self.send_request(initial_request); - // Bootstrap request cycle - let initial_request = create_connect_request(generate_transaction_id(&mut rng)); - send_request(&mut socket, &mut buffer, &mut statistics, initial_request); + let timeout = Duration::from_micros(self.config.network.poll_timeout); - loop { - poll.poll(&mut events, Some(timeout)) - .expect("failed polling"); + loop { + poll.poll(&mut events, Some(timeout)) + .expect("failed polling"); - for event in events.iter() { - if (event.token() == token) & event.is_readable() { - while let Ok(amt) = socket.recv(&mut buffer) { - match Response::from_bytes(&buffer[0..amt], addr.is_ipv4()) { + for _ in events.iter() { + while let Ok(amt) = self.socket.recv(&mut self.buffer) { + match Response::from_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { Ok(response) => { - match response { - Response::AnnounceIpv4(ref r) => { - statistics.responses_announce += 1; - statistics.response_peers += r.peers.len(); - } - Response::AnnounceIpv6(ref r) => { - statistics.responses_announce += 1; - statistics.response_peers += r.peers.len(); - } - Response::Scrape(_) => { - statistics.responses_scrape += 1; - } - Response::Connect(_) => { - statistics.responses_connect += 1; - } - Response::Error(_) => { - statistics.responses_error += 1; - } - } - - let opt_request = process_response( - &mut rng, - gamma, - &state.info_hashes, - &config, - &mut torrent_peers, - response, - ); - - if let Some(request) = opt_request { - send_request(&mut socket, &mut buffer, &mut statistics, request); + if let Some(request) = self.process_response(response) { + self.send_request(request); } } Err(err) => { @@ -97,79 +83,259 @@ pub fn run_worker_thread( } } - if rng.gen::() <= config.requests.additional_request_probability { + if self.rng.gen::() <= self.config.requests.additional_request_probability { let additional_request = - create_connect_request(generate_transaction_id(&mut rng)); + create_connect_request(generate_transaction_id(&mut self.rng)); - send_request( - &mut socket, - &mut buffer, - &mut statistics, - additional_request, + self.send_request(additional_request); + } + + self.update_shared_statistics(); + } + } + } + + fn process_response(&mut self, response: Response) -> Option { + match response { + Response::Connect(r) => { + self.statistics.responses_connect += 1; + + // Fetch the torrent peer or create it if is doesn't exists. Update + // the connection id if fetched. Create a request and move the + // torrent peer appropriately. + + let mut torrent_peer = self + .torrent_peers + .remove(&r.transaction_id) + .unwrap_or_else(|| self.create_torrent_peer(r.connection_id)); + + torrent_peer.connection_id = r.connection_id; + + let new_transaction_id = generate_transaction_id(&mut self.rng); + let request = self.create_random_request(new_transaction_id, &torrent_peer); + + self.torrent_peers.insert(new_transaction_id, torrent_peer); + + Some(request) + } + Response::AnnounceIpv4(r) => { + self.statistics.responses_announce += 1; + self.statistics.response_peers += r.peers.len(); + + self.if_torrent_peer_move_and_create_random_request(r.fixed.transaction_id) + } + Response::AnnounceIpv6(r) => { + self.statistics.responses_announce += 1; + self.statistics.response_peers += r.peers.len(); + + self.if_torrent_peer_move_and_create_random_request(r.fixed.transaction_id) + } + Response::Scrape(r) => { + self.statistics.responses_scrape += 1; + + self.if_torrent_peer_move_and_create_random_request(r.transaction_id) + } + Response::Error(r) => { + self.statistics.responses_error += 1; + + if !r.message.to_lowercase().contains("connection") { + eprintln!( + "Received error response which didn't contain the word 'connection': {}", + r.message ); } - update_shared_statistics(&state, &mut statistics); - } - } - } -} + if let Some(torrent_peer) = self.torrent_peers.remove(&r.transaction_id) { + let new_transaction_id = generate_transaction_id(&mut self.rng); -fn send_request( - socket: &mut UdpSocket, - buffer: &mut [u8], - statistics: &mut SocketWorkerLocalStatistics, - request: Request, -) { - let mut cursor = Cursor::new(buffer); + self.torrent_peers.insert(new_transaction_id, torrent_peer); - match request.write(&mut cursor) { - Ok(()) => { - let position = cursor.position() as usize; - let inner = cursor.get_ref(); - - match socket.send(&inner[..position]) { - Ok(_) => { - statistics.requests += 1; - } - Err(err) => { - eprintln!("Couldn't send packet: {:?}", err); + Some(create_connect_request(new_transaction_id)) + } else { + Some(create_connect_request(generate_transaction_id( + &mut self.rng, + ))) } } } - Err(err) => { - eprintln!("request_to_bytes err: {}", err); + } + + fn if_torrent_peer_move_and_create_random_request( + &mut self, + transaction_id: TransactionId, + ) -> Option { + let torrent_peer = self.torrent_peers.remove(&transaction_id)?; + + let new_transaction_id = generate_transaction_id(&mut self.rng); + + let request = self.create_random_request(new_transaction_id, &torrent_peer); + + self.torrent_peers.insert(new_transaction_id, torrent_peer); + + Some(request) + } + + fn create_torrent_peer(&mut self, connection_id: ConnectionId) -> TorrentPeer { + let num_scrape_hashes = self + .rng + .gen_range(1..self.config.requests.scrape_max_torrents); + + let scrape_hash_indices = (0..num_scrape_hashes) + .map(|_| self.random_info_hash_index()) + .collect::>() + .into_boxed_slice(); + + let info_hash_index = self.random_info_hash_index(); + + TorrentPeer { + info_hash: self.shared_state.info_hashes[info_hash_index], + scrape_hash_indices, + connection_id, + peer_id: generate_peer_id(), + port: Port::new(self.rng.gen()), } } -} -fn update_shared_statistics(state: &LoadTestState, statistics: &mut SocketWorkerLocalStatistics) { - state - .statistics - .requests - .fetch_add(statistics.requests, Ordering::Relaxed); - state - .statistics - .responses_connect - .fetch_add(statistics.responses_connect, Ordering::Relaxed); - state - .statistics - .responses_announce - .fetch_add(statistics.responses_announce, Ordering::Relaxed); - state - .statistics - .responses_scrape - .fetch_add(statistics.responses_scrape, Ordering::Relaxed); - state - .statistics - .responses_error - .fetch_add(statistics.responses_error, Ordering::Relaxed); - state - .statistics - .response_peers - .fetch_add(statistics.response_peers, Ordering::Relaxed); + fn create_random_request( + &mut self, + transaction_id: TransactionId, + torrent_peer: &TorrentPeer, + ) -> Request { + const ITEMS: [RequestType; 3] = [ + RequestType::Announce, + RequestType::Connect, + RequestType::Scrape, + ]; - *statistics = SocketWorkerLocalStatistics::default(); + let weights = [ + self.config.requests.weight_announce as u32, + self.config.requests.weight_connect as u32, + self.config.requests.weight_scrape as u32, + ]; + + let dist = WeightedIndex::new(weights).expect("random request weighted index"); + + match ITEMS[dist.sample(&mut self.rng)] { + RequestType::Announce => self.create_announce_request(torrent_peer, transaction_id), + RequestType::Connect => (ConnectRequest { transaction_id }).into(), + RequestType::Scrape => self.create_scrape_request(torrent_peer, transaction_id), + } + } + + fn create_announce_request( + &mut self, + torrent_peer: &TorrentPeer, + transaction_id: TransactionId, + ) -> Request { + let (event, bytes_left) = { + if self + .rng + .gen_bool(self.config.requests.peer_seeder_probability) + { + (AnnounceEvent::Completed, NumberOfBytes::new(0)) + } else { + (AnnounceEvent::Started, NumberOfBytes::new(50)) + } + }; + + (AnnounceRequest { + connection_id: torrent_peer.connection_id, + action_placeholder: Default::default(), + transaction_id, + info_hash: torrent_peer.info_hash, + peer_id: torrent_peer.peer_id, + bytes_downloaded: NumberOfBytes::new(50), + bytes_uploaded: NumberOfBytes::new(50), + bytes_left, + event: event.into(), + ip_address: Ipv4AddrBytes([0; 4]), + key: PeerKey::new(0), + peers_wanted: NumberOfPeers::new(self.config.requests.announce_peers_wanted), + port: torrent_peer.port, + }) + .into() + } + + fn create_scrape_request( + &self, + torrent_peer: &TorrentPeer, + transaction_id: TransactionId, + ) -> Request { + let indeces = &torrent_peer.scrape_hash_indices; + + let mut scape_hashes = Vec::with_capacity(indeces.len()); + + for i in indeces.iter() { + scape_hashes.push(self.shared_state.info_hashes[*i].to_owned()) + } + + (ScrapeRequest { + connection_id: torrent_peer.connection_id, + transaction_id, + info_hashes: scape_hashes, + }) + .into() + } + + fn random_info_hash_index(&mut self) -> usize { + gamma_usize( + &mut self.rng, + self.gamma, + &self.config.requests.number_of_torrents - 1, + ) + } + + fn send_request(&mut self, request: Request) { + let mut cursor = Cursor::new(self.buffer); + + match request.write(&mut cursor) { + Ok(()) => { + let position = cursor.position() as usize; + let inner = cursor.get_ref(); + + match self.socket.send(&inner[..position]) { + Ok(_) => { + self.statistics.requests += 1; + } + Err(err) => { + eprintln!("Couldn't send packet: {:?}", err); + } + } + } + Err(err) => { + eprintln!("request_to_bytes err: {}", err); + } + } + } + + fn update_shared_statistics(&mut self) { + self.shared_state + .statistics + .requests + .fetch_add(self.statistics.requests, Ordering::Relaxed); + self.shared_state + .statistics + .responses_connect + .fetch_add(self.statistics.responses_connect, Ordering::Relaxed); + self.shared_state + .statistics + .responses_announce + .fetch_add(self.statistics.responses_announce, Ordering::Relaxed); + self.shared_state + .statistics + .responses_scrape + .fetch_add(self.statistics.responses_scrape, Ordering::Relaxed); + self.shared_state + .statistics + .responses_error + .fetch_add(self.statistics.responses_error, Ordering::Relaxed); + self.shared_state + .statistics + .response_peers + .fetch_add(self.statistics.response_peers, Ordering::Relaxed); + + self.statistics = SocketWorkerLocalStatistics::default(); + } } fn create_socket(config: &Config, addr: SocketAddr) -> ::std::net::UdpSocket { diff --git a/crates/udp_load_test/src/worker/request_gen.rs b/crates/udp_load_test/src/worker/request_gen.rs deleted file mode 100644 index 6ce2355..0000000 --- a/crates/udp_load_test/src/worker/request_gen.rs +++ /dev/null @@ -1,218 +0,0 @@ -use std::sync::Arc; - -use rand::distributions::WeightedIndex; -use rand::prelude::*; -use rand_distr::Gamma; - -use aquatic_udp_protocol::*; - -use crate::common::*; -use crate::config::Config; -use crate::utils::*; - -pub fn process_response( - rng: &mut impl Rng, - gamma: Gamma, - info_hashes: &Arc<[InfoHash]>, - config: &Config, - torrent_peers: &mut TorrentPeerMap, - response: Response, -) -> Option { - match response { - Response::Connect(r) => { - // Fetch the torrent peer or create it if is doesn't exists. Update - // the connection id if fetched. Create a request and move the - // torrent peer appropriately. - - let torrent_peer = torrent_peers - .remove(&r.transaction_id) - .map(|mut torrent_peer| { - torrent_peer.connection_id = r.connection_id; - - torrent_peer - }) - .unwrap_or_else(|| { - create_torrent_peer(config, rng, gamma, info_hashes, r.connection_id) - }); - - let new_transaction_id = generate_transaction_id(rng); - - let request = - create_random_request(config, rng, info_hashes, new_transaction_id, &torrent_peer); - - torrent_peers.insert(new_transaction_id, torrent_peer); - - Some(request) - } - Response::AnnounceIpv4(r) => if_torrent_peer_move_and_create_random_request( - config, - rng, - info_hashes, - torrent_peers, - r.fixed.transaction_id, - ), - Response::AnnounceIpv6(r) => if_torrent_peer_move_and_create_random_request( - config, - rng, - info_hashes, - torrent_peers, - r.fixed.transaction_id, - ), - Response::Scrape(r) => if_torrent_peer_move_and_create_random_request( - config, - rng, - info_hashes, - torrent_peers, - r.transaction_id, - ), - Response::Error(r) => { - if !r.message.to_lowercase().contains("connection") { - eprintln!( - "Received error response which didn't contain the word 'connection': {}", - r.message - ); - } - - if let Some(torrent_peer) = torrent_peers.remove(&r.transaction_id) { - let new_transaction_id = generate_transaction_id(rng); - - torrent_peers.insert(new_transaction_id, torrent_peer); - - Some(create_connect_request(new_transaction_id)) - } else { - Some(create_connect_request(generate_transaction_id(rng))) - } - } - } -} - -fn if_torrent_peer_move_and_create_random_request( - config: &Config, - rng: &mut impl Rng, - info_hashes: &Arc<[InfoHash]>, - torrent_peers: &mut TorrentPeerMap, - transaction_id: TransactionId, -) -> Option { - if let Some(torrent_peer) = torrent_peers.remove(&transaction_id) { - let new_transaction_id = generate_transaction_id(rng); - - let request = - create_random_request(config, rng, info_hashes, new_transaction_id, &torrent_peer); - - torrent_peers.insert(new_transaction_id, torrent_peer); - - Some(request) - } else { - None - } -} - -fn create_random_request( - config: &Config, - rng: &mut impl Rng, - info_hashes: &Arc<[InfoHash]>, - transaction_id: TransactionId, - torrent_peer: &TorrentPeer, -) -> Request { - const ITEMS: [RequestType; 3] = [ - RequestType::Announce, - RequestType::Connect, - RequestType::Scrape, - ]; - - let weights = [ - config.requests.weight_announce as u32, - config.requests.weight_connect as u32, - config.requests.weight_scrape as u32, - ]; - - let dist = WeightedIndex::new(weights).expect("random request weighted index"); - - match ITEMS[dist.sample(rng)] { - RequestType::Announce => create_announce_request(config, rng, torrent_peer, transaction_id), - RequestType::Connect => create_connect_request(transaction_id), - RequestType::Scrape => create_scrape_request(&info_hashes, torrent_peer, transaction_id), - } -} - -fn create_announce_request( - config: &Config, - rng: &mut impl Rng, - torrent_peer: &TorrentPeer, - transaction_id: TransactionId, -) -> Request { - let (event, bytes_left) = { - if rng.gen_bool(config.requests.peer_seeder_probability) { - (AnnounceEvent::Completed, NumberOfBytes::new(0)) - } else { - (AnnounceEvent::Started, NumberOfBytes::new(50)) - } - }; - - (AnnounceRequest { - connection_id: torrent_peer.connection_id, - action_placeholder: Default::default(), - transaction_id, - info_hash: torrent_peer.info_hash, - peer_id: torrent_peer.peer_id, - bytes_downloaded: NumberOfBytes::new(50), - bytes_uploaded: NumberOfBytes::new(50), - bytes_left, - event: event.into(), - ip_address: Ipv4AddrBytes([0; 4]), - key: PeerKey::new(0), - peers_wanted: NumberOfPeers::new(config.requests.announce_peers_wanted), - port: torrent_peer.port, - }) - .into() -} - -fn create_scrape_request( - info_hashes: &Arc<[InfoHash]>, - torrent_peer: &TorrentPeer, - transaction_id: TransactionId, -) -> Request { - let indeces = &torrent_peer.scrape_hash_indices; - - let mut scape_hashes = Vec::with_capacity(indeces.len()); - - for i in indeces.iter() { - scape_hashes.push(info_hashes[*i].to_owned()) - } - - (ScrapeRequest { - connection_id: torrent_peer.connection_id, - transaction_id, - info_hashes: scape_hashes, - }) - .into() -} - -fn create_torrent_peer( - config: &Config, - rng: &mut impl Rng, - gamma: Gamma, - info_hashes: &Arc<[InfoHash]>, - connection_id: ConnectionId, -) -> TorrentPeer { - let num_scrape_hashes = rng.gen_range(1..config.requests.scrape_max_torrents); - - let scrape_hash_indices = (0..num_scrape_hashes) - .map(|_| select_info_hash_index(config, rng, gamma)) - .collect::>() - .into_boxed_slice(); - - let info_hash_index = select_info_hash_index(config, rng, gamma); - - TorrentPeer { - info_hash: info_hashes[info_hash_index], - scrape_hash_indices, - connection_id, - peer_id: generate_peer_id(), - port: Port::new(rng.gen()), - } -} - -fn select_info_hash_index(config: &Config, rng: &mut impl Rng, gamma: Gamma) -> usize { - gamma_usize(rng, gamma, config.requests.number_of_torrents - 1) -}