diff --git a/aquatic_udp_load_test/src/common.rs b/aquatic_udp_load_test/src/common.rs index 85474fd..fe81667 100644 --- a/aquatic_udp_load_test/src/common.rs +++ b/aquatic_udp_load_test/src/common.rs @@ -85,10 +85,6 @@ pub struct HandlerConfig { pub torrent_selection_pareto_shape: f64, /// Probability that a generated peer is a seeder pub peer_seeder_probability: f64, - /// Part of additional request creation calculation, meaning requests - /// which are not dependent on previous responses from server. Higher - /// means more. - pub additional_request_factor: f64, } impl Default for Config { @@ -125,9 +121,8 @@ impl Default for HandlerConfig { peer_seeder_probability: 0.25, scrape_max_torrents: 50, weight_connect: 0, - weight_announce: 1, + weight_announce: 5, weight_scrape: 1, - additional_request_factor: 0.4, torrent_selection_pareto_shape: 2.0, } } @@ -158,7 +153,6 @@ pub struct Statistics { pub struct LoadTestState { pub info_hashes: Arc>, pub statistics: Arc, - pub responses: Arc, } #[derive(PartialEq, Eq, Clone, Copy)] diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index 50468c8..9bd4dc4 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -51,7 +51,6 @@ fn run(config: Config) -> ::anyhow::Result<()> { let state = LoadTestState { info_hashes: Arc::new(info_hashes), statistics: Arc::new(Statistics::default()), - responses: Default::default(), }; let pareto = Pareto::new(1.0, config.handler.torrent_selection_pareto_shape).unwrap(); diff --git a/aquatic_udp_load_test/src/network.rs b/aquatic_udp_load_test/src/network.rs index 8b816e7..a74e28c 100644 --- a/aquatic_udp_load_test/src/network.rs +++ b/aquatic_udp_load_test/src/network.rs @@ -1,7 +1,7 @@ +use std::io::Cursor; use std::net::SocketAddr; use std::sync::atomic::Ordering; use std::time::Duration; -use std::{io::Cursor, vec::Drain}; use mio::{net::UdpSocket, Events, Interest, Poll, Token}; use rand::{prelude::SmallRng, thread_rng, SeedableRng}; @@ -71,14 +71,11 @@ pub fn run_worker_thread( let mut events = Events::with_capacity(config.network.poll_event_capacity); - let mut local_state = SocketWorkerLocalStatistics::default(); - let mut responses = Vec::new(); - let mut requests = Vec::new(); + let mut statistics = SocketWorkerLocalStatistics::default(); - // Bootstrap request cycle by adding a request - requests.push(create_connect_request(generate_transaction_id( - &mut thread_rng(), - ))); + // Bootstrap request cycle + let initial_request = create_connect_request(generate_transaction_id(&mut thread_rng())); + send_request(&mut socket, &mut buffer, &mut statistics, initial_request); loop { poll.poll(&mut events, Some(timeout)) @@ -86,121 +83,87 @@ pub fn run_worker_thread( for event in events.iter() { if (event.token() == token) & event.is_readable() { - read_responses(&socket, &mut buffer, &mut local_state, &mut responses); - } - } + while let Ok(amt) = socket.recv(&mut buffer) { + match Response::from_bytes(&buffer[0..amt]) { + Ok(response) => { + match response { + Response::AnnounceIpv4(ref r) => { + statistics.responses_announce += 1; + statistics.response_peers += r.peers.len(); + } + Response::AnnounceIpv6(ref r) => { + statistics.responses_announce += 1; + statistics.response_peers += r.peers.len(); + } + Response::Scrape(_) => { + statistics.responses_scrape += 1; + } + Response::Connect(_) => { + statistics.responses_connect += 1; + } + Response::Error(_) => { + statistics.responses_error += 1; + } + } - let total_responses = responses.len() - + if thread_id.0 == 0 { - state.responses.fetch_and(0, Ordering::SeqCst) - } else { - state.responses.fetch_add(responses.len(), Ordering::SeqCst) - }; + let opt_request = process_response( + &mut rng, + pareto, + &state.info_hashes, + &config, + &mut torrent_peers, + response, + ); - // Somewhat dubious heuristic for deciding how fast to create - // and send additional requests - let num_additional_to_send = { - let n = total_responses as f64 / (config.workers as f64 * 4.0); - - (n * config.handler.additional_request_factor) as usize + 10 - }; - - for _ in 0..num_additional_to_send { - requests.push(create_connect_request(generate_transaction_id(&mut rng))); - } - - for response in responses.drain(..) { - let opt_request = process_response( - &mut rng, - pareto, - &state.info_hashes, - &config, - &mut torrent_peers, - response, - ); - - if let Some(new_request) = opt_request { - requests.push(new_request); - } - } - - send_requests( - &state, - &mut socket, - &mut buffer, - &mut local_state, - requests.drain(..), - ); - } -} - -fn read_responses( - socket: &UdpSocket, - buffer: &mut [u8], - ls: &mut SocketWorkerLocalStatistics, - responses: &mut Vec, -) { - while let Ok(amt) = socket.recv(buffer) { - match Response::from_bytes(&buffer[0..amt]) { - Ok(response) => { - match response { - Response::AnnounceIpv4(ref r) => { - ls.responses_announce += 1; - ls.response_peers += r.peers.len(); - } - Response::AnnounceIpv6(ref r) => { - ls.responses_announce += 1; - ls.response_peers += r.peers.len(); - } - Response::Scrape(_) => { - ls.responses_scrape += 1; - } - Response::Connect(_) => { - ls.responses_connect += 1; - } - Response::Error(_) => { - ls.responses_error += 1; + if let Some(request) = opt_request { + send_request(&mut socket, &mut buffer, &mut statistics, request); + } + } + Err(err) => { + eprintln!("Received invalid response: {:#?}", err); + } } } - responses.push(response) - } - Err(err) => { - eprintln!("Received invalid response: {:#?}", err); + let additional_request = create_connect_request(generate_transaction_id(&mut rng)); + + send_request(&mut socket, &mut buffer, &mut statistics, additional_request); + + update_shared_statistics(&state, &mut statistics); } } } } -fn send_requests( - state: &LoadTestState, +fn send_request( socket: &mut UdpSocket, buffer: &mut [u8], statistics: &mut SocketWorkerLocalStatistics, - requests: Drain, + request: Request, ) { let mut cursor = Cursor::new(buffer); - for request in requests { - cursor.set_position(0); + match request.write(&mut cursor) { + Ok(()) => { + let position = cursor.position() as usize; + let inner = cursor.get_ref(); - if let Err(err) = request.write(&mut cursor) { + match socket.send(&inner[..position]) { + Ok(_) => { + statistics.requests += 1; + } + Err(err) => { + eprintln!("Couldn't send packet: {:?}", err); + } + } + } + Err(err) => { eprintln!("request_to_bytes err: {}", err); } - - let position = cursor.position() as usize; - let inner = cursor.get_ref(); - - match socket.send(&inner[..position]) { - Ok(_) => { - statistics.requests += 1; - } - Err(err) => { - eprintln!("Couldn't send packet: {:?}", err); - } - } } +} +fn update_shared_statistics(state: &LoadTestState, statistics: &mut SocketWorkerLocalStatistics) { state .statistics .requests