udp load tester: open multiple sockets per worker; minor other fixes

This commit is contained in:
Joakim Frostegård 2024-02-05 23:44:34 +01:00
parent 6745eba2de
commit d8bdcfcf0a
4 changed files with 60 additions and 44 deletions

View file

@ -22,4 +22,5 @@ pub struct Peer {
pub announce_info_hash: InfoHash, pub announce_info_hash: InfoHash,
pub announce_port: Port, pub announce_port: Port,
pub scrape_info_hash_indices: Box<[usize]>, pub scrape_info_hash_indices: Box<[usize]>,
pub socket_index: u8,
} }

View file

@ -60,8 +60,8 @@ pub struct NetworkConfig {
/// ///
/// Setting this to true can cause issues on macOS. /// Setting this to true can cause issues on macOS.
pub multiple_client_ipv4s: bool, pub multiple_client_ipv4s: bool,
/// Number of first client port /// Number of sockets to open per worker
pub first_port: u16, pub sockets_per_worker: u8,
/// Size of socket recv buffer. Use 0 for OS default. /// Size of socket recv buffer. Use 0 for OS default.
/// ///
/// This setting can have a big impact on dropped packages. It might /// This setting can have a big impact on dropped packages. It might
@ -81,7 +81,7 @@ impl Default for NetworkConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
multiple_client_ipv4s: true, multiple_client_ipv4s: true,
first_port: 45_000, sockets_per_worker: 4,
recv_buffer: 8_000_000, recv_buffer: 8_000_000,
} }
} }

View file

@ -54,8 +54,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
// Start workers // Start workers
for (i, peers) in (0..config.workers).zip(peers_by_worker) { for (i, peers) in (0..config.workers).zip(peers_by_worker) {
let port = config.network.first_port + (i as u16);
let ip = if config.server_address.is_ipv6() { let ip = if config.server_address.is_ipv6() {
Ipv6Addr::LOCALHOST.into() Ipv6Addr::LOCALHOST.into()
} else if config.network.multiple_client_ipv4s { } else if config.network.multiple_client_ipv4s {
@ -64,7 +62,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
Ipv4Addr::LOCALHOST.into() Ipv4Addr::LOCALHOST.into()
}; };
let addr = SocketAddr::new(ip, port); let addr = SocketAddr::new(ip, 0);
let config = config.clone(); let config = config.clone();
let state = state.clone(); let state = state.clone();
@ -230,6 +228,7 @@ fn create_peers(config: &Config, info_hash_dist: &InfoHashDist) -> Vec<Box<[Peer
announce_info_hash, announce_info_hash,
announce_port: Port::new(rng.gen()), announce_port: Port::new(rng.gen()),
scrape_info_hash_indices, scrape_info_hash_indices,
socket_index: rng.gen_range(0..config.network.sockets_per_worker),
} }
}) })
.take(config.requests.number_of_peers) .take(config.requests.number_of_peers)

View file

@ -21,7 +21,7 @@ pub struct Worker {
peers: Box<[Peer]>, peers: Box<[Peer]>,
request_type_dist: RequestTypeDist, request_type_dist: RequestTypeDist,
addr: SocketAddr, addr: SocketAddr,
socket: UdpSocket, sockets: Vec<UdpSocket>,
buffer: [u8; MAX_PACKET_SIZE], buffer: [u8; MAX_PACKET_SIZE],
rng: SmallRng, rng: SmallRng,
statistics: LocalStatistics, statistics: LocalStatistics,
@ -29,7 +29,12 @@ pub struct Worker {
impl Worker { impl Worker {
pub fn run(config: Config, shared_state: LoadTestState, peers: Box<[Peer]>, addr: SocketAddr) { pub fn run(config: Config, shared_state: LoadTestState, peers: Box<[Peer]>, addr: SocketAddr) {
let socket = create_socket(&config, addr); let mut sockets = Vec::new();
for _ in 0..config.network.sockets_per_worker {
sockets.push(create_socket(&config, addr));
}
let buffer = [0u8; MAX_PACKET_SIZE]; let buffer = [0u8; MAX_PACKET_SIZE];
let rng = SmallRng::seed_from_u64(0xc3aa8be617b3acce); let rng = SmallRng::seed_from_u64(0xc3aa8be617b3acce);
let statistics = LocalStatistics::default(); let statistics = LocalStatistics::default();
@ -41,7 +46,7 @@ impl Worker {
peers, peers,
request_type_dist, request_type_dist,
addr, addr,
socket, sockets,
buffer, buffer,
rng, rng,
statistics, statistics,
@ -56,48 +61,59 @@ impl Worker {
let mut requests_sent = 0usize; let mut requests_sent = 0usize;
let mut responses_received = 0usize; let mut responses_received = 0usize;
let mut connect_socket_index = 0u8;
let mut peer_index = 0usize; let mut peer_index = 0usize;
let mut loop_index = 0usize; let mut loop_index = 0usize;
loop { loop {
let response_ratio = responses_received as f64 / requests_sent.max(1) as f64; let response_ratio = responses_received as f64 / requests_sent.max(1) as f64;
if response_ratio >= 0.95 || requests_sent == 0 || self.rng.gen::<u8>() == 0 { if response_ratio >= 0.90 || requests_sent == 0 || self.rng.gen::<u8>() == 0 {
match self.request_type_dist.sample(&mut self.rng) { for _ in 0..self.sockets.len() {
RequestType::Connect => { match self.request_type_dist.sample(&mut self.rng) {
self.send_connect_request(u32::MAX - 1); RequestType::Connect => {
} self.send_connect_request(connect_socket_index, u32::MAX - 1);
RequestType::Announce => {
self.send_announce_request(connection_id, peer_index);
peer_index = (peer_index + 1) % self.peers.len(); connect_socket_index = connect_socket_index.wrapping_add(1)
} % self.config.network.sockets_per_worker;
RequestType::Scrape => { }
self.send_scrape_request(connection_id, peer_index); RequestType::Announce => {
self.send_announce_request(connection_id, peer_index);
peer_index = (peer_index + 1) % self.peers.len(); peer_index = (peer_index + 1) % self.peers.len();
}
RequestType::Scrape => {
self.send_scrape_request(connection_id, peer_index);
peer_index = (peer_index + 1) % self.peers.len();
}
} }
requests_sent += 1;
} }
requests_sent += 1;
} }
match self.socket.recv(&mut self.buffer[..]) { for socket_index in 0..self.sockets.len() {
Ok(amt) => { // Do this instead of iterating over Vec to fix borrow checker complaint
match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { let socket = self.sockets.get(socket_index).unwrap();
Ok(response) => {
self.handle_response(response);
}
Err(err) => {
eprintln!("Received invalid response: {:#?}", err);
}
}
responses_received += 1; match socket.recv(&mut self.buffer[..]) {
} Ok(amt) => {
Err(err) if err.kind() == ErrorKind::WouldBlock => (), match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) {
Err(err) => { Ok(response) => {
eprintln!("recv error: {:#}", err); self.handle_response(response);
}
Err(err) => {
eprintln!("Received invalid response: {:#?}", err);
}
}
responses_received += 1;
}
Err(err) if err.kind() == ErrorKind::WouldBlock => (),
Err(err) => {
eprintln!("recv error: {:#}", err);
}
} }
} }
@ -111,10 +127,10 @@ impl Worker {
fn aquire_connection_id(&mut self) -> ConnectionId { fn aquire_connection_id(&mut self) -> ConnectionId {
loop { loop {
self.send_connect_request(u32::MAX); self.send_connect_request(0, u32::MAX);
for _ in 0..100 { for _ in 0..100 {
match self.socket.recv(&mut self.buffer[..]) { match self.sockets[0].recv(&mut self.buffer[..]) {
Ok(amt) => { Ok(amt) => {
match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) { match Response::parse_bytes(&self.buffer[0..amt], self.addr.is_ipv4()) {
Ok(Response::Connect(r)) => { Ok(Response::Connect(r)) => {
@ -139,7 +155,7 @@ impl Worker {
} }
} }
fn send_connect_request(&mut self, transaction_id: u32) { fn send_connect_request(&mut self, socket_index: u8, transaction_id: u32) {
let transaction_id = TransactionId::new(i32::from_ne_bytes(transaction_id.to_ne_bytes())); let transaction_id = TransactionId::new(i32::from_ne_bytes(transaction_id.to_ne_bytes()));
let request = ConnectRequest { transaction_id }; let request = ConnectRequest { transaction_id };
@ -150,7 +166,7 @@ impl Worker {
let position = cursor.position() as usize; let position = cursor.position() as usize;
match self.socket.send(&cursor.get_ref()[..position]) { match self.sockets[socket_index as usize].send(&cursor.get_ref()[..position]) {
Ok(_) => { Ok(_) => {
self.statistics.requests += 1; self.statistics.requests += 1;
} }
@ -199,7 +215,7 @@ impl Worker {
let position = cursor.position() as usize; let position = cursor.position() as usize;
match self.socket.send(&cursor.get_ref()[..position]) { match self.sockets[peer.socket_index as usize].send(&cursor.get_ref()[..position]) {
Ok(_) => { Ok(_) => {
self.statistics.requests += 1; self.statistics.requests += 1;
} }
@ -233,7 +249,7 @@ impl Worker {
let position = cursor.position() as usize; let position = cursor.position() as usize;
match self.socket.send(&cursor.get_ref()[..position]) { match self.sockets[peer.socket_index as usize].send(&cursor.get_ref()[..position]) {
Ok(_) => { Ok(_) => {
self.statistics.requests += 1; self.statistics.requests += 1;
} }