diff --git a/crates/udp_load_test/src/common.rs b/crates/udp_load_test/src/common.rs index 7470830..7a17374 100644 --- a/crates/udp_load_test/src/common.rs +++ b/crates/udp_load_test/src/common.rs @@ -22,4 +22,5 @@ pub struct Peer { pub announce_info_hash: InfoHash, pub announce_port: Port, pub scrape_info_hash_indices: Box<[usize]>, + pub socket_index: u8, } diff --git a/crates/udp_load_test/src/config.rs b/crates/udp_load_test/src/config.rs index 0f0b701..7513d41 100644 --- a/crates/udp_load_test/src/config.rs +++ b/crates/udp_load_test/src/config.rs @@ -60,8 +60,8 @@ pub struct NetworkConfig { /// /// Setting this to true can cause issues on macOS. pub multiple_client_ipv4s: bool, - /// Number of first client port - pub first_port: u16, + /// Number of sockets to open per worker + pub sockets_per_worker: u8, /// Size of socket recv buffer. Use 0 for OS default. /// /// This setting can have a big impact on dropped packages. It might @@ -81,7 +81,7 @@ impl Default for NetworkConfig { fn default() -> Self { Self { multiple_client_ipv4s: true, - first_port: 45_000, + sockets_per_worker: 4, recv_buffer: 8_000_000, } } diff --git a/crates/udp_load_test/src/lib.rs b/crates/udp_load_test/src/lib.rs index 61dfbd5..ee6b658 100644 --- a/crates/udp_load_test/src/lib.rs +++ b/crates/udp_load_test/src/lib.rs @@ -54,8 +54,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { // Start workers for (i, peers) in (0..config.workers).zip(peers_by_worker) { - let port = config.network.first_port + (i as u16); - let ip = if config.server_address.is_ipv6() { Ipv6Addr::LOCALHOST.into() } else if config.network.multiple_client_ipv4s { @@ -64,7 +62,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { Ipv4Addr::LOCALHOST.into() }; - let addr = SocketAddr::new(ip, port); + let addr = SocketAddr::new(ip, 0); let config = config.clone(); let state = state.clone(); @@ -230,6 +228,7 @@ fn create_peers(config: &Config, info_hash_dist: &InfoHashDist) -> Vec, request_type_dist: RequestTypeDist, addr: SocketAddr, - socket: UdpSocket, + sockets: Vec, buffer: [u8; MAX_PACKET_SIZE], rng: SmallRng, statistics: LocalStatistics, @@ -29,7 +29,12 @@ pub struct Worker { impl Worker { pub fn run(config: Config, shared_state: LoadTestState, peers: Box<[Peer]>, addr: SocketAddr) { - let socket = create_socket(&config, addr); + let mut sockets = Vec::new(); + + for _ in 0..config.network.sockets_per_worker { + sockets.push(create_socket(&config, addr)); + } + let buffer = [0u8; MAX_PACKET_SIZE]; let rng = SmallRng::seed_from_u64(0xc3aa8be617b3acce); let statistics = LocalStatistics::default(); @@ -41,7 +46,7 @@ impl Worker { peers, request_type_dist, addr, - socket, + sockets, buffer, rng, statistics, @@ -56,48 +61,59 @@ impl Worker { let mut requests_sent = 0usize; let mut responses_received = 0usize; + let mut connect_socket_index = 0u8; let mut peer_index = 0usize; let mut loop_index = 0usize; loop { let response_ratio = responses_received as f64 / requests_sent.max(1) as f64; - if response_ratio >= 0.95 || requests_sent == 0 || self.rng.gen::() == 0 { - match self.request_type_dist.sample(&mut self.rng) { - RequestType::Connect => { - self.send_connect_request(u32::MAX - 1); - } - RequestType::Announce => { - self.send_announce_request(connection_id, peer_index); + if response_ratio >= 0.90 || requests_sent == 0 || self.rng.gen::() == 0 { + for _ in 0..self.sockets.len() { + match self.request_type_dist.sample(&mut self.rng) { + RequestType::Connect => { + self.send_connect_request(connect_socket_index, u32::MAX - 1); - peer_index = (peer_index + 1) % self.peers.len(); - } - RequestType::Scrape => { - self.send_scrape_request(connection_id, peer_index); + connect_socket_index = connect_socket_index.wrapping_add(1) + % self.config.network.sockets_per_worker; + } + RequestType::Announce => { + self.send_announce_request(connection_id, peer_index); - peer_index = (peer_index + 1) % self.peers.len(); + peer_index = (peer_index + 1) % self.peers.len(); + } + RequestType::Scrape => { + self.send_scrape_request(connection_id, peer_index); + + peer_index = (peer_index + 1) % self.peers.len(); + } } + + requests_sent += 1; } - - requests_sent += 1; } - match self.socket.recv(&mut self.buffer[..]) { - Ok(amt) => { - match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { - Ok(response) => { - self.handle_response(response); - } - Err(err) => { - eprintln!("Received invalid response: {:#?}", err); - } - } + for socket_index in 0..self.sockets.len() { + // Do this instead of iterating over Vec to fix borrow checker complaint + let socket = self.sockets.get(socket_index).unwrap(); - responses_received += 1; - } - Err(err) if err.kind() == ErrorKind::WouldBlock => (), - Err(err) => { - eprintln!("recv error: {:#}", err); + match socket.recv(&mut self.buffer[..]) { + Ok(amt) => { + match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { + Ok(response) => { + self.handle_response(response); + } + Err(err) => { + eprintln!("Received invalid response: {:#?}", err); + } + } + + responses_received += 1; + } + Err(err) if err.kind() == ErrorKind::WouldBlock => (), + Err(err) => { + eprintln!("recv error: {:#}", err); + } } } @@ -111,10 +127,10 @@ impl Worker { fn aquire_connection_id(&mut self) -> ConnectionId { loop { - self.send_connect_request(u32::MAX); + self.send_connect_request(0, u32::MAX); for _ in 0..100 { - match self.socket.recv(&mut self.buffer[..]) { + match self.sockets[0].recv(&mut self.buffer[..]) { Ok(amt) => { match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { Ok(Response::Connect(r)) => { @@ -139,7 +155,7 @@ impl Worker { } } - fn send_connect_request(&mut self, transaction_id: u32) { + fn send_connect_request(&mut self, socket_index: u8, transaction_id: u32) { let transaction_id = TransactionId::new(i32::from_ne_bytes(transaction_id.to_ne_bytes())); let request = ConnectRequest { transaction_id }; @@ -150,7 +166,7 @@ impl Worker { let position = cursor.position() as usize; - match self.socket.send(&cursor.get_ref()[..position]) { + match self.sockets[socket_index as usize].send(&cursor.get_ref()[..position]) { Ok(_) => { self.statistics.requests += 1; } @@ -199,7 +215,7 @@ impl Worker { let position = cursor.position() as usize; - match self.socket.send(&cursor.get_ref()[..position]) { + match self.sockets[peer.socket_index as usize].send(&cursor.get_ref()[..position]) { Ok(_) => { self.statistics.requests += 1; } @@ -233,7 +249,7 @@ impl Worker { let position = cursor.position() as usize; - match self.socket.send(&cursor.get_ref()[..position]) { + match self.sockets[peer.socket_index as usize].send(&cursor.get_ref()[..position]) { Ok(_) => { self.statistics.requests += 1; }