fix more clippy warnings

This commit is contained in:
Joakim Frostegård 2020-08-02 00:36:56 +02:00
parent aabdf76a5d
commit 561cc3db55
16 changed files with 83 additions and 106 deletions

View file

@ -4,7 +4,6 @@ use std::io::Read;
use anyhow::Context; use anyhow::Context;
use gumdrop::Options; use gumdrop::Options;
use serde::{Serialize, de::DeserializeOwned}; use serde::{Serialize, de::DeserializeOwned};
use toml;
#[derive(Debug, Options)] #[derive(Debug, Options)]

View file

@ -179,7 +179,7 @@ impl Request {
for equal_sign_index in ::memchr::memchr_iter(b'=', query_string_bytes){ for equal_sign_index in ::memchr::memchr_iter(b'=', query_string_bytes){
let segment_end = ampersand_iter.next() let segment_end = ampersand_iter.next()
.unwrap_or(query_string.len()); .unwrap_or_else(|| query_string.len());
let key = query_string.get(position..equal_sign_index) let key = query_string.get(position..equal_sign_index)
.with_context(|| format!("no key at {}..{}", position, equal_sign_index))?; .with_context(|| format!("no key at {}..{}", position, equal_sign_index))?;

View file

@ -1,7 +1,3 @@
use aquatic_udp;
use aquatic_cli_helpers;
#[global_allocator] #[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

View file

@ -146,8 +146,8 @@ pub struct State {
} }
impl State { impl Default for State {
pub fn new() -> Self { fn default() -> Self {
Self { Self {
connections: Arc::new(Mutex::new(HashMap::new())), connections: Arc::new(Mutex::new(HashMap::new())),
torrents: Arc::new(Mutex::new(TorrentMaps::default())), torrents: Arc::new(Mutex::new(TorrentMaps::default())),

View file

@ -16,7 +16,7 @@ use common::State;
pub fn run(config: Config) -> ::anyhow::Result<()> { pub fn run(config: Config) -> ::anyhow::Result<()> {
let state = State::new(); let state = State::default();
let (request_sender, request_receiver) = unbounded(); let (request_sender, request_receiver) = unbounded();
let (response_sender, response_receiver) = unbounded(); let (response_sender, response_receiver) = unbounded();

View file

@ -51,29 +51,27 @@ pub fn run_socket_worker(
for event in events.iter(){ for event in events.iter(){
let token = event.token(); let token = event.token();
if token.0 == token_num { if (token.0 == token_num) & event.is_readable(){
if event.is_readable(){ read_requests(
read_requests( &state,
&state, &config,
&config, &mut socket,
&mut socket, &mut buffer,
&mut buffer, &mut requests,
&mut requests, &mut local_responses,
&mut local_responses, );
);
for r in requests.drain(..){ for r in requests.drain(..){
if let Err(err) = request_sender.send(r){ if let Err(err) = request_sender.send(r){
eprintln!("error sending to request_sender: {}", err); eprintln!("error sending to request_sender: {}", err);
}
} }
state.statistics.readable_events.fetch_add(1, Ordering::SeqCst);
poll.registry()
.reregister(&mut socket, token, interests)
.unwrap();
} }
state.statistics.readable_events.fetch_add(1, Ordering::SeqCst);
poll.registry()
.reregister(&mut socket, token, interests)
.unwrap();
} }
} }
@ -102,8 +100,9 @@ fn create_socket(config: &Config) -> ::std::net::UdpSocket {
socket.set_nonblocking(true) socket.set_nonblocking(true)
.expect("socket: set nonblocking"); .expect("socket: set nonblocking");
socket.bind(&config.network.address.into()) socket.bind(&config.network.address.into()).unwrap_or_else(|err|
.expect(&format!("socket: bind to {}", &config.network.address)); panic!("socket: bind to {}: {:?}", config.network.address, err)
);
let recv_buffer_size = config.network.socket_recv_buffer_size; let recv_buffer_size = config.network.socket_recv_buffer_size;

View file

@ -20,7 +20,7 @@ pub fn bench_announce_handler(
request_sender: &Sender<(Request, SocketAddr)>, request_sender: &Sender<(Request, SocketAddr)>,
response_receiver: &Receiver<(Response, SocketAddr)>, response_receiver: &Receiver<(Response, SocketAddr)>,
rng: &mut impl Rng, rng: &mut impl Rng,
info_hashes: &Vec<InfoHash>, info_hashes: &[InfoHash],
) -> (usize, Duration) { ) -> (usize, Duration) {
let requests = create_requests( let requests = create_requests(
state, state,
@ -58,15 +58,12 @@ pub fn bench_announce_handler(
let total = bench_config.num_announce_requests * (round + 1); let total = bench_config.num_announce_requests * (round + 1);
while num_responses < total { while num_responses < total {
match response_receiver.recv(){ if let Ok((Response::Announce(r), _)) = response_receiver.recv() {
Ok((Response::Announce(r), _)) => { num_responses += 1;
num_responses += 1;
if let Some(last_peer) = r.peers.last(){ if let Some(last_peer) = r.peers.last(){
dummy ^= last_peer.port.0; dummy ^= last_peer.port.0;
} }
},
_ => {}
} }
} }
} }
@ -84,7 +81,7 @@ pub fn bench_announce_handler(
pub fn create_requests( pub fn create_requests(
state: &State, state: &State,
rng: &mut impl Rng, rng: &mut impl Rng,
info_hashes: &Vec<InfoHash>, info_hashes: &[InfoHash],
number: usize, number: usize,
) -> Vec<(AnnounceRequest, SocketAddr)> { ) -> Vec<(AnnounceRequest, SocketAddr)> {
let pareto = Pareto::new(1., PARETO_SHAPE).unwrap(); let pareto = Pareto::new(1., PARETO_SHAPE).unwrap();
@ -100,15 +97,11 @@ pub fn create_requests(
.cloned() .cloned()
.collect(); .collect();
for i in 0..number { for connection_key in connection_keys.into_iter(){
let info_hash_index = pareto_usize(rng, pareto, max_index); let info_hash_index = pareto_usize(rng, pareto, max_index);
// Will panic if less connection requests than announce requests
let connection_id = connection_keys[i].connection_id;
let src = connection_keys[i].socket_addr;
let request = AnnounceRequest { let request = AnnounceRequest {
connection_id, connection_id: connection_key.connection_id,
transaction_id: TransactionId(rng.gen()), transaction_id: TransactionId(rng.gen()),
info_hash: info_hashes[info_hash_index], info_hash: info_hashes[info_hash_index],
peer_id: PeerId(rng.gen()), peer_id: PeerId(rng.gen()),
@ -122,7 +115,7 @@ pub fn create_requests(
port: Port(rng.gen()) port: Port(rng.gen())
}; };
requests.push((request, src)); requests.push((request, connection_key.socket_addr));
} }
requests requests

View file

@ -48,12 +48,9 @@ pub fn bench_connect_handler(
let total = bench_config.num_connect_requests * (round + 1); let total = bench_config.num_connect_requests * (round + 1);
while num_responses < total { while num_responses < total {
match response_receiver.recv(){ if let Ok((Response::Connect(r), _)) = response_receiver.recv(){
Ok((Response::Connect(r), _)) => { num_responses += 1;
num_responses += 1; dummy ^= r.connection_id.0;
dummy ^= r.connection_id.0;
},
_ => {}
} }
} }
} }

View file

@ -48,7 +48,7 @@ fn main(){
pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> {
// Setup common state, spawn request handlers // Setup common state, spawn request handlers
let state = State::new(); let state = State::default();
let aquatic_config = Config::default(); let aquatic_config = Config::default();
let (request_sender, request_receiver) = unbounded(); let (request_sender, request_receiver) = unbounded();

View file

@ -20,7 +20,7 @@ pub fn bench_scrape_handler(
request_sender: &Sender<(Request, SocketAddr)>, request_sender: &Sender<(Request, SocketAddr)>,
response_receiver: &Receiver<(Response, SocketAddr)>, response_receiver: &Receiver<(Response, SocketAddr)>,
rng: &mut impl Rng, rng: &mut impl Rng,
info_hashes: &Vec<InfoHash>, info_hashes: &[InfoHash],
) -> (usize, Duration) { ) -> (usize, Duration) {
let requests = create_requests( let requests = create_requests(
state, state,
@ -59,15 +59,12 @@ pub fn bench_scrape_handler(
let total = bench_config.num_scrape_requests * (round + 1); let total = bench_config.num_scrape_requests * (round + 1);
while num_responses < total { while num_responses < total {
match response_receiver.recv(){ if let Ok((Response::Scrape(r), _)) = response_receiver.recv(){
Ok((Response::Scrape(r), _)) => { num_responses += 1;
num_responses += 1;
if let Some(stat) = r.torrent_stats.last(){ if let Some(stat) = r.torrent_stats.last(){
dummy ^= stat.leechers.0; dummy ^= stat.leechers.0;
} }
},
_ => {}
} }
} }
} }
@ -86,7 +83,7 @@ pub fn bench_scrape_handler(
pub fn create_requests( pub fn create_requests(
state: &State, state: &State,
rng: &mut impl Rng, rng: &mut impl Rng,
info_hashes: &Vec<InfoHash>, info_hashes: &[InfoHash],
number: usize, number: usize,
hashes_per_request: usize, hashes_per_request: usize,
) -> Vec<(ScrapeRequest, SocketAddr)> { ) -> Vec<(ScrapeRequest, SocketAddr)> {
@ -103,7 +100,7 @@ pub fn create_requests(
let mut requests = Vec::new(); let mut requests = Vec::new();
for i in 0..number { for connection_key in connection_keys.into_iter(){
let mut request_info_hashes = Vec::new(); let mut request_info_hashes = Vec::new();
for _ in 0..hashes_per_request { for _ in 0..hashes_per_request {
@ -111,17 +108,13 @@ pub fn create_requests(
request_info_hashes.push(info_hashes[info_hash_index]) request_info_hashes.push(info_hashes[info_hash_index])
} }
// Will panic if less connection requests than scrape requests
let connection_id = connection_keys[i].connection_id;
let src = connection_keys[i].socket_addr;
let request = ScrapeRequest { let request = ScrapeRequest {
connection_id, connection_id: connection_key.connection_id,
transaction_id: TransactionId(rng.gen()), transaction_id: TransactionId(rng.gen()),
info_hashes: request_info_hashes, info_hashes: request_info_hashes,
}; };
requests.push((request, src)); requests.push((request, connection_key.socket_addr));
} }
requests requests

View file

@ -172,7 +172,7 @@ fn process_response(
pareto, pareto,
info_hashes, info_hashes,
r.connection_id r.connection_id
).to_owned() )
}); });
let new_transaction_id = generate_transaction_id(rng); let new_transaction_id = generate_transaction_id(rng);
@ -191,22 +191,22 @@ fn process_response(
}, },
Response::Announce(r) => { Response::Announce(r) => {
return if_torrent_peer_move_and_create_random_request( if_torrent_peer_move_and_create_random_request(
config, config,
rng, rng,
info_hashes, info_hashes,
torrent_peers, torrent_peers,
r.transaction_id r.transaction_id
); )
}, },
Response::Scrape(r) => { Response::Scrape(r) => {
return if_torrent_peer_move_and_create_random_request( if_torrent_peer_move_and_create_random_request(
config, config,
rng, rng,
info_hashes, info_hashes,
torrent_peers, torrent_peers,
r.transaction_id r.transaction_id
); )
}, },
Response::Error(r) => { Response::Error(r) => {
if !r.message.to_lowercase().contains("connection"){ if !r.message.to_lowercase().contains("connection"){

View file

@ -122,7 +122,7 @@ fn run(config: Config) -> ::anyhow::Result<()> {
generate_transaction_id(&mut thread_rng()) generate_transaction_id(&mut thread_rng())
); );
sender.send(request.into()) sender.send(request)
.expect("bootstrap: add initial request to request queue"); .expect("bootstrap: add initial request to request queue");
} }

View file

@ -40,7 +40,7 @@ pub fn create_socket(
} }
socket.bind(&addr.into()) socket.bind(&addr.into())
.expect(&format!("socket: bind to {}", addr)); .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", addr, err));
socket.connect(&config.server_address.into()) socket.connect(&config.server_address.into())
.expect("socket: connect to server"); .expect("socket: connect to server");
@ -80,28 +80,27 @@ pub fn run_socket_thread(
.expect("failed polling"); .expect("failed polling");
for event in events.iter(){ for event in events.iter(){
if event.token() == token { if (event.token() == token) & event.is_readable(){
if event.is_readable(){ read_responses(
read_responses( thread_id,
thread_id, &socket,
&socket, &mut buffer,
&mut buffer, &mut local_state,
&mut local_state, &mut responses
&mut responses );
);
for r in responses.drain(..){ for r in responses.drain(..){
response_channel_sender.send(r) response_channel_sender.send(r)
.expect(&format!( .unwrap_or_else(|err| panic!(
"add response to channel in socket worker {}", "add response to channel in socket worker {}: {:?}",
thread_id.0 thread_id.0,
)); err
} ));
poll.registry()
.reregister(&mut socket, token, interests)
.unwrap();
} }
poll.registry()
.reregister(&mut socket, token, interests)
.unwrap();
} }
send_requests( send_requests(

View file

@ -272,7 +272,9 @@ pub fn send_out_messages(
use ::tungstenite::Error::Io; use ::tungstenite::Error::Io;
match established_ws.ws.write_message(out_message.to_ws_message()){ let ws_message = out_message.into_ws_message();
match established_ws.ws.write_message(ws_message){
Ok(()) => { Ok(()) => {
debug!("sent message"); debug!("sent message");
}, },

View file

@ -12,6 +12,8 @@ use crate::config::*;
use crate::utils::create_random_request; use crate::utils::create_random_request;
// Allow large enum variant WebSocket because it should be very common
#[allow(clippy::large_enum_variant)]
pub enum ConnectionState { pub enum ConnectionState {
TcpStream(TcpStream), TcpStream(TcpStream),
WebSocket(WebSocket<TcpStream>), WebSocket(WebSocket<TcpStream>),
@ -321,11 +323,8 @@ pub fn run_socket_thread(
} }
connections.insert(k, connection); connections.insert(k, connection);
} else if let Some(c) = connection.advance(config){
} else { connections.insert(k, c);
if let Some(connection) = connection.advance(config){
connections.insert(k, connection);
}
} }
} else { } else {
// println!("connection not found for token {}", k); // println!("connection not found for token {}", k);

View file

@ -271,7 +271,7 @@ pub enum OutMessage {
impl OutMessage { impl OutMessage {
#[inline] #[inline]
pub fn to_ws_message(self) -> tungstenite::Message { pub fn into_ws_message(self) -> tungstenite::Message {
let json = match self { let json = match self {
Self::AnnounceResponse(message) => { Self::AnnounceResponse(message) => {
serde_json::to_string( serde_json::to_string(