diff --git a/aquatic_udp/src/lib/network_uring.rs b/aquatic_udp/src/lib/network_uring.rs index 22263b3..72c0e28 100644 --- a/aquatic_udp/src/lib/network_uring.rs +++ b/aquatic_udp/src/lib/network_uring.rs @@ -316,8 +316,7 @@ pub fn run_socket_worker( let num_to_queue = (space_in_send_queue).min(local_responses.len()); let drain_from_index = local_responses.len() - num_to_queue; - for (response, addr) in local_responses.drain(drain_from_index..) - { + for (response, addr) in local_responses.drain(drain_from_index..) { queue_response( &config, &mut sq, diff --git a/aquatic_udp_load_test/src/network.rs b/aquatic_udp_load_test/src/network.rs index 79fc3e5..8b816e7 100644 --- a/aquatic_udp_load_test/src/network.rs +++ b/aquatic_udp_load_test/src/network.rs @@ -1,16 +1,16 @@ -use std::{io::Cursor, vec::Drain}; use std::net::SocketAddr; use std::sync::atomic::Ordering; use std::time::Duration; +use std::{io::Cursor, vec::Drain}; use mio::{net::UdpSocket, Events, Interest, Poll, Token}; -use rand::{SeedableRng, prelude::SmallRng, thread_rng}; +use rand::{prelude::SmallRng, thread_rng, SeedableRng}; use rand_distr::Pareto; use socket2::{Domain, Protocol, Socket, Type}; use aquatic_udp_protocol::*; -use crate::{common::*, handler::{process_response}, utils::*}; +use crate::{common::*, handler::process_response, utils::*}; const MAX_PACKET_SIZE: usize = 4096; @@ -76,7 +76,9 @@ pub fn run_worker_thread( let mut requests = Vec::new(); // Bootstrap request cycle by adding a request - requests.push(create_connect_request(generate_transaction_id(&mut thread_rng()))); + requests.push(create_connect_request(generate_transaction_id( + &mut thread_rng(), + ))); loop { poll.poll(&mut events, Some(timeout)) @@ -84,20 +86,16 @@ pub fn run_worker_thread( for event in events.iter() { if (event.token() == token) & event.is_readable() { - read_responses( - &socket, - &mut buffer, - &mut local_state, - &mut responses, - ); + read_responses(&socket, &mut buffer, &mut local_state, &mut responses); } } - let total_responses = responses.len() + if thread_id.0 == 0 { - state.responses.fetch_and(0, Ordering::SeqCst) - } else { - state.responses.fetch_add(responses.len(), Ordering::SeqCst) - }; + let total_responses = responses.len() + + if thread_id.0 == 0 { + state.responses.fetch_and(0, Ordering::SeqCst) + } else { + state.responses.fetch_add(responses.len(), Ordering::SeqCst) + }; // Somewhat dubious heuristic for deciding how fast to create // and send additional requests @@ -112,8 +110,14 @@ pub fn run_worker_thread( } for response in responses.drain(..) { - let opt_request = - process_response(&mut rng, pareto, &state.info_hashes, &config, &mut torrent_peers, response); + let opt_request = process_response( + &mut rng, + pareto, + &state.info_hashes, + &config, + &mut torrent_peers, + response, + ); if let Some(new_request) = opt_request { requests.push(new_request);