diff --git a/aquatic_cli_helpers/src/lib.rs b/aquatic_cli_helpers/src/lib.rs index 9414854..ed4d66d 100644 --- a/aquatic_cli_helpers/src/lib.rs +++ b/aquatic_cli_helpers/src/lib.rs @@ -4,7 +4,6 @@ use std::io::Read; use anyhow::Context; use gumdrop::Options; use serde::{Serialize, de::DeserializeOwned}; -use toml; #[derive(Debug, Options)] diff --git a/aquatic_http_protocol/src/request.rs b/aquatic_http_protocol/src/request.rs index bf5e3c3..3007c7d 100644 --- a/aquatic_http_protocol/src/request.rs +++ b/aquatic_http_protocol/src/request.rs @@ -179,7 +179,7 @@ impl Request { for equal_sign_index in ::memchr::memchr_iter(b'=', query_string_bytes){ let segment_end = ampersand_iter.next() - .unwrap_or(query_string.len()); + .unwrap_or_else(|| query_string.len()); let key = query_string.get(position..equal_sign_index) .with_context(|| format!("no key at {}..{}", position, equal_sign_index))?; diff --git a/aquatic_udp/src/bin/aquatic_udp.rs b/aquatic_udp/src/bin/aquatic_udp.rs index 21957d5..001b28e 100644 --- a/aquatic_udp/src/bin/aquatic_udp.rs +++ b/aquatic_udp/src/bin/aquatic_udp.rs @@ -1,7 +1,3 @@ -use aquatic_udp; -use aquatic_cli_helpers; - - #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; diff --git a/aquatic_udp/src/lib/common.rs b/aquatic_udp/src/lib/common.rs index 1b0c9c6..148852f 100644 --- a/aquatic_udp/src/lib/common.rs +++ b/aquatic_udp/src/lib/common.rs @@ -146,8 +146,8 @@ pub struct State { } -impl State { - pub fn new() -> Self { +impl Default for State { + fn default() -> Self { Self { connections: Arc::new(Mutex::new(HashMap::new())), torrents: Arc::new(Mutex::new(TorrentMaps::default())), diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index 089d31a..4c4d2c9 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -16,7 +16,7 @@ use common::State; pub fn run(config: Config) -> ::anyhow::Result<()> { - let state = State::new(); + let state = State::default(); let (request_sender, request_receiver) = unbounded(); let (response_sender, response_receiver) = unbounded(); diff --git a/aquatic_udp/src/lib/network.rs b/aquatic_udp/src/lib/network.rs index f058c52..3a74708 100644 --- a/aquatic_udp/src/lib/network.rs +++ b/aquatic_udp/src/lib/network.rs @@ -51,29 +51,27 @@ pub fn run_socket_worker( for event in events.iter(){ let token = event.token(); - if token.0 == token_num { - if event.is_readable(){ - read_requests( - &state, - &config, - &mut socket, - &mut buffer, - &mut requests, - &mut local_responses, - ); + if (token.0 == token_num) & event.is_readable(){ + read_requests( + &state, + &config, + &mut socket, + &mut buffer, + &mut requests, + &mut local_responses, + ); - for r in requests.drain(..){ - if let Err(err) = request_sender.send(r){ - eprintln!("error sending to request_sender: {}", err); - } + for r in requests.drain(..){ + if let Err(err) = request_sender.send(r){ + eprintln!("error sending to request_sender: {}", err); } - - state.statistics.readable_events.fetch_add(1, Ordering::SeqCst); - - poll.registry() - .reregister(&mut socket, token, interests) - .unwrap(); } + + state.statistics.readable_events.fetch_add(1, Ordering::SeqCst); + + poll.registry() + .reregister(&mut socket, token, interests) + .unwrap(); } } @@ -102,8 +100,9 @@ fn create_socket(config: &Config) -> ::std::net::UdpSocket { socket.set_nonblocking(true) .expect("socket: set nonblocking"); - socket.bind(&config.network.address.into()) - .expect(&format!("socket: bind to {}", &config.network.address)); + socket.bind(&config.network.address.into()).unwrap_or_else(|err| + panic!("socket: bind to {}: {:?}", config.network.address, err) + ); let recv_buffer_size = config.network.socket_recv_buffer_size; diff --git a/aquatic_udp_bench/src/announce.rs b/aquatic_udp_bench/src/announce.rs index b1e9793..c35275e 100644 --- a/aquatic_udp_bench/src/announce.rs +++ b/aquatic_udp_bench/src/announce.rs @@ -20,7 +20,7 @@ pub fn bench_announce_handler( request_sender: &Sender<(Request, SocketAddr)>, response_receiver: &Receiver<(Response, SocketAddr)>, rng: &mut impl Rng, - info_hashes: &Vec, + info_hashes: &[InfoHash], ) -> (usize, Duration) { let requests = create_requests( state, @@ -58,15 +58,12 @@ pub fn bench_announce_handler( let total = bench_config.num_announce_requests * (round + 1); while num_responses < total { - match response_receiver.recv(){ - Ok((Response::Announce(r), _)) => { - num_responses += 1; + if let Ok((Response::Announce(r), _)) = response_receiver.recv() { + num_responses += 1; - if let Some(last_peer) = r.peers.last(){ - dummy ^= last_peer.port.0; - } - }, - _ => {} + if let Some(last_peer) = r.peers.last(){ + dummy ^= last_peer.port.0; + } } } } @@ -84,7 +81,7 @@ pub fn bench_announce_handler( pub fn create_requests( state: &State, rng: &mut impl Rng, - info_hashes: &Vec, + info_hashes: &[InfoHash], number: usize, ) -> Vec<(AnnounceRequest, SocketAddr)> { let pareto = Pareto::new(1., PARETO_SHAPE).unwrap(); @@ -100,15 +97,11 @@ pub fn create_requests( .cloned() .collect(); - for i in 0..number { + for connection_key in connection_keys.into_iter(){ let info_hash_index = pareto_usize(rng, pareto, max_index); - // Will panic if less connection requests than announce requests - let connection_id = connection_keys[i].connection_id; - let src = connection_keys[i].socket_addr; - let request = AnnounceRequest { - connection_id, + connection_id: connection_key.connection_id, transaction_id: TransactionId(rng.gen()), info_hash: info_hashes[info_hash_index], peer_id: PeerId(rng.gen()), @@ -122,7 +115,7 @@ pub fn create_requests( port: Port(rng.gen()) }; - requests.push((request, src)); + requests.push((request, connection_key.socket_addr)); } requests diff --git a/aquatic_udp_bench/src/connect.rs b/aquatic_udp_bench/src/connect.rs index c6f589b..458d081 100644 --- a/aquatic_udp_bench/src/connect.rs +++ b/aquatic_udp_bench/src/connect.rs @@ -48,12 +48,9 @@ pub fn bench_connect_handler( let total = bench_config.num_connect_requests * (round + 1); while num_responses < total { - match response_receiver.recv(){ - Ok((Response::Connect(r), _)) => { - num_responses += 1; - dummy ^= r.connection_id.0; - }, - _ => {} + if let Ok((Response::Connect(r), _)) = response_receiver.recv(){ + num_responses += 1; + dummy ^= r.connection_id.0; } } } diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index 0005fa0..0dc0851 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -48,7 +48,7 @@ fn main(){ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { // Setup common state, spawn request handlers - let state = State::new(); + let state = State::default(); let aquatic_config = Config::default(); let (request_sender, request_receiver) = unbounded(); diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index dffa2e3..17c9813 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -20,7 +20,7 @@ pub fn bench_scrape_handler( request_sender: &Sender<(Request, SocketAddr)>, response_receiver: &Receiver<(Response, SocketAddr)>, rng: &mut impl Rng, - info_hashes: &Vec, + info_hashes: &[InfoHash], ) -> (usize, Duration) { let requests = create_requests( state, @@ -59,15 +59,12 @@ pub fn bench_scrape_handler( let total = bench_config.num_scrape_requests * (round + 1); while num_responses < total { - match response_receiver.recv(){ - Ok((Response::Scrape(r), _)) => { - num_responses += 1; + if let Ok((Response::Scrape(r), _)) = response_receiver.recv(){ + num_responses += 1; - if let Some(stat) = r.torrent_stats.last(){ - dummy ^= stat.leechers.0; - } - }, - _ => {} + if let Some(stat) = r.torrent_stats.last(){ + dummy ^= stat.leechers.0; + } } } } @@ -86,7 +83,7 @@ pub fn bench_scrape_handler( pub fn create_requests( state: &State, rng: &mut impl Rng, - info_hashes: &Vec, + info_hashes: &[InfoHash], number: usize, hashes_per_request: usize, ) -> Vec<(ScrapeRequest, SocketAddr)> { @@ -103,7 +100,7 @@ pub fn create_requests( let mut requests = Vec::new(); - for i in 0..number { + for connection_key in connection_keys.into_iter(){ let mut request_info_hashes = Vec::new(); for _ in 0..hashes_per_request { @@ -111,17 +108,13 @@ pub fn create_requests( request_info_hashes.push(info_hashes[info_hash_index]) } - // Will panic if less connection requests than scrape requests - let connection_id = connection_keys[i].connection_id; - let src = connection_keys[i].socket_addr; - let request = ScrapeRequest { - connection_id, + connection_id: connection_key.connection_id, transaction_id: TransactionId(rng.gen()), info_hashes: request_info_hashes, }; - requests.push((request, src)); + requests.push((request, connection_key.socket_addr)); } requests diff --git a/aquatic_udp_load_test/src/handler.rs b/aquatic_udp_load_test/src/handler.rs index b0b0845..1756a74 100644 --- a/aquatic_udp_load_test/src/handler.rs +++ b/aquatic_udp_load_test/src/handler.rs @@ -172,7 +172,7 @@ fn process_response( pareto, info_hashes, r.connection_id - ).to_owned() + ) }); let new_transaction_id = generate_transaction_id(rng); @@ -191,22 +191,22 @@ fn process_response( }, Response::Announce(r) => { - return if_torrent_peer_move_and_create_random_request( + if_torrent_peer_move_and_create_random_request( config, rng, info_hashes, torrent_peers, r.transaction_id - ); + ) }, Response::Scrape(r) => { - return if_torrent_peer_move_and_create_random_request( + if_torrent_peer_move_and_create_random_request( config, rng, info_hashes, torrent_peers, r.transaction_id - ); + ) }, Response::Error(r) => { if !r.message.to_lowercase().contains("connection"){ diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index 807f00a..a01f9b3 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -122,7 +122,7 @@ fn run(config: Config) -> ::anyhow::Result<()> { generate_transaction_id(&mut thread_rng()) ); - sender.send(request.into()) + sender.send(request) .expect("bootstrap: add initial request to request queue"); } diff --git a/aquatic_udp_load_test/src/network.rs b/aquatic_udp_load_test/src/network.rs index 8fdde03..6ff8087 100644 --- a/aquatic_udp_load_test/src/network.rs +++ b/aquatic_udp_load_test/src/network.rs @@ -40,7 +40,7 @@ pub fn create_socket( } socket.bind(&addr.into()) - .expect(&format!("socket: bind to {}", addr)); + .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", addr, err)); socket.connect(&config.server_address.into()) .expect("socket: connect to server"); @@ -80,28 +80,27 @@ pub fn run_socket_thread( .expect("failed polling"); for event in events.iter(){ - if event.token() == token { - if event.is_readable(){ - read_responses( - thread_id, - &socket, - &mut buffer, - &mut local_state, - &mut responses - ); + if (event.token() == token) & event.is_readable(){ + read_responses( + thread_id, + &socket, + &mut buffer, + &mut local_state, + &mut responses + ); - for r in responses.drain(..){ - response_channel_sender.send(r) - .expect(&format!( - "add response to channel in socket worker {}", - thread_id.0 - )); - } - - poll.registry() - .reregister(&mut socket, token, interests) - .unwrap(); + for r in responses.drain(..){ + response_channel_sender.send(r) + .unwrap_or_else(|err| panic!( + "add response to channel in socket worker {}: {:?}", + thread_id.0, + err + )); } + + poll.registry() + .reregister(&mut socket, token, interests) + .unwrap(); } send_requests( diff --git a/aquatic_ws/src/lib/network/mod.rs b/aquatic_ws/src/lib/network/mod.rs index c4706e2..9fb2301 100644 --- a/aquatic_ws/src/lib/network/mod.rs +++ b/aquatic_ws/src/lib/network/mod.rs @@ -272,7 +272,9 @@ pub fn send_out_messages( use ::tungstenite::Error::Io; - match established_ws.ws.write_message(out_message.to_ws_message()){ + let ws_message = out_message.into_ws_message(); + + match established_ws.ws.write_message(ws_message){ Ok(()) => { debug!("sent message"); }, diff --git a/aquatic_ws_load_test/src/network.rs b/aquatic_ws_load_test/src/network.rs index f9d400e..3c434ec 100644 --- a/aquatic_ws_load_test/src/network.rs +++ b/aquatic_ws_load_test/src/network.rs @@ -12,6 +12,8 @@ use crate::config::*; use crate::utils::create_random_request; +// Allow large enum variant WebSocket because it should be very common +#[allow(clippy::large_enum_variant)] pub enum ConnectionState { TcpStream(TcpStream), WebSocket(WebSocket), @@ -321,11 +323,8 @@ pub fn run_socket_thread( } connections.insert(k, connection); - - } else { - if let Some(connection) = connection.advance(config){ - connections.insert(k, connection); - } + } else if let Some(c) = connection.advance(config){ + connections.insert(k, c); } } else { // println!("connection not found for token {}", k); diff --git a/aquatic_ws_protocol/src/lib.rs b/aquatic_ws_protocol/src/lib.rs index 2915647..fec2172 100644 --- a/aquatic_ws_protocol/src/lib.rs +++ b/aquatic_ws_protocol/src/lib.rs @@ -271,7 +271,7 @@ pub enum OutMessage { impl OutMessage { #[inline] - pub fn to_ws_message(self) -> tungstenite::Message { + pub fn into_ws_message(self) -> tungstenite::Message { let json = match self { Self::AnnounceResponse(message) => { serde_json::to_string(