From e4dfe2852cb0b5c023d986f4c07f8020008a354a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 6 Apr 2020 20:14:24 +0200 Subject: [PATCH] aquatic: add Config struct, use in code --- aquatic/src/lib/config.rs | 22 +++++++++++++++ aquatic/src/lib/handlers.rs | 6 +++-- aquatic/src/lib/lib.rs | 10 ++++--- aquatic/src/lib/network.rs | 27 +++++++++---------- .../src/bin/bench_handlers/announce.rs | 3 +++ aquatic_bench/src/bin/bench_handlers/main.rs | 4 ++- 6 files changed, 51 insertions(+), 21 deletions(-) create mode 100644 aquatic/src/lib/config.rs diff --git a/aquatic/src/lib/config.rs b/aquatic/src/lib/config.rs new file mode 100644 index 0000000..da3677f --- /dev/null +++ b/aquatic/src/lib/config.rs @@ -0,0 +1,22 @@ +use std::net::SocketAddr; + + +#[derive(Clone)] +pub struct Config { + pub address: SocketAddr, + pub recv_buffer_size: usize, + pub max_scrape_torrents: u8, + pub max_response_peers: usize, +} + + +impl Default for Config { + fn default() -> Self { + Self { + address: SocketAddr::from(([127, 0, 0, 1], 3000)), + recv_buffer_size: 4096 * 16, + max_scrape_torrents: 255, + max_response_peers: 255, + } + } +} \ No newline at end of file diff --git a/aquatic/src/lib/handlers.rs b/aquatic/src/lib/handlers.rs index e173300..9fbb976 100644 --- a/aquatic/src/lib/handlers.rs +++ b/aquatic/src/lib/handlers.rs @@ -8,6 +8,7 @@ use rand::{Rng, SeedableRng, rngs::SmallRng, thread_rng}; use bittorrent_udp::types::*; use crate::common::*; +use crate::config::Config; pub fn handle_connect_requests( @@ -42,6 +43,7 @@ pub fn handle_connect_requests( pub fn handle_announce_requests( state: &State, + config: &Config, responses: &mut Vec<(Response, SocketAddr)>, requests: Drain<(AnnounceRequest, SocketAddr)>, ){ @@ -100,8 +102,8 @@ pub fn handle_announce_requests( let response_peers = extract_response_peers( &mut rng, &torrent_data.peers, - 255 - ); // FIXME num peers + config.max_response_peers, + ); // FIXME: check how many peers announcing peer wants let response = Response::Announce(AnnounceResponse { transaction_id: request.transaction_id, diff --git a/aquatic/src/lib/lib.rs b/aquatic/src/lib/lib.rs index a979983..0e7e1d6 100644 --- a/aquatic/src/lib/lib.rs +++ b/aquatic/src/lib/lib.rs @@ -1,25 +1,27 @@ use std::time::Duration; pub mod common; +pub mod config; pub mod handlers; pub mod network; pub mod tasks; +use config::Config; use common::State; pub fn run(){ - let addr = ([127, 0, 0, 1], 3000).into(); + let config = Config::default(); let state = State::new(); - let socket = network::create_socket(addr, 4096 * 8); - let socket_timeout = Duration::from_millis(1000); + let socket = network::create_socket(&config); for i in 0..4 { let socket = socket.try_clone().unwrap(); let state = state.clone(); + let config = config.clone(); ::std::thread::spawn(move || { - network::run_event_loop(state, socket, i, socket_timeout); + network::run_event_loop(state, config, socket, i); }); } diff --git a/aquatic/src/lib/network.rs b/aquatic/src/lib/network.rs index 1afe72d..85f37b9 100644 --- a/aquatic/src/lib/network.rs +++ b/aquatic/src/lib/network.rs @@ -1,5 +1,4 @@ use std::net::SocketAddr; -use std::time::Duration; use std::io::ErrorKind; use mio::{Events, Poll, Interest, Token}; @@ -11,16 +10,13 @@ use bittorrent_udp::types::IpVersion; use bittorrent_udp::converters::{response_to_bytes, request_from_bytes}; use crate::common::*; +use crate::config::Config; use crate::handlers::*; -pub fn create_socket( - addr: SocketAddr, - recv_buffer_size: usize, -) -> ::std::net::UdpSocket { - +pub fn create_socket(config: &Config) -> ::std::net::UdpSocket { let mut builder = &{ - if addr.is_ipv4(){ + if config.address.is_ipv4(){ UdpBuilder::new_v4().expect("socket: build") } else { UdpBuilder::new_v6().expect("socket: build") @@ -30,16 +26,16 @@ pub fn create_socket( builder = builder.reuse_port(true) .expect("socket: set reuse port"); - let socket = builder.bind(&addr) - .expect(&format!("socket: bind to {}", addr)); + let socket = builder.bind(&config.address) + .expect(&format!("socket: bind to {}", &config.address)); socket.set_nonblocking(true) .expect("socket: set nonblocking"); - if let Err(err) = socket.set_recv_buffer_size(recv_buffer_size){ + if let Err(err) = socket.set_recv_buffer_size(config.recv_buffer_size){ eprintln!( "socket: failed setting recv buffer to {}: {:?}", - recv_buffer_size, + config.recv_buffer_size, err ); } @@ -50,9 +46,9 @@ pub fn create_socket( pub fn run_event_loop( state: State, + config: Config, socket: ::std::net::UdpSocket, token_num: usize, - poll_timeout: Duration, ){ let mut buffer = [0u8; MAX_PACKET_SIZE]; @@ -73,7 +69,7 @@ pub fn run_event_loop( let mut responses: Vec<(Response, SocketAddr)> = Vec::with_capacity(1024); loop { - poll.poll(&mut events, Some(poll_timeout)) + poll.poll(&mut events, None) .expect("failed polling"); for event in events.iter(){ @@ -83,6 +79,7 @@ pub fn run_event_loop( if event.is_readable(){ handle_readable_socket( &state, + &config, &mut socket, &mut buffer, &mut responses, @@ -104,6 +101,7 @@ pub fn run_event_loop( /// Read requests, generate and send back responses fn handle_readable_socket( state: &State, + config: &Config, socket: &mut UdpSocket, buffer: &mut [u8], responses: &mut Vec<(Response, SocketAddr)>, @@ -116,7 +114,7 @@ fn handle_readable_socket( Ok((amt, src)) => { let request = request_from_bytes( &buffer[..amt], - 255u8 // FIXME + config.max_scrape_torrents ); match request { @@ -162,6 +160,7 @@ fn handle_readable_socket( ); handle_announce_requests( state, + config, responses, announce_requests.drain(..), ); diff --git a/aquatic_bench/src/bin/bench_handlers/announce.rs b/aquatic_bench/src/bin/bench_handlers/announce.rs index 924dceb..df473ca 100644 --- a/aquatic_bench/src/bin/bench_handlers/announce.rs +++ b/aquatic_bench/src/bin/bench_handlers/announce.rs @@ -6,6 +6,7 @@ use rand_distr::Pareto; use aquatic::handlers::*; use aquatic::common::*; +use aquatic::config::Config; use aquatic_bench::*; use crate::common::*; @@ -16,6 +17,7 @@ const ANNOUNCE_REQUESTS: usize = 1_000_000; pub fn bench( state: &State, + config: &Config, requests: Vec<(AnnounceRequest, SocketAddr)>, ) -> (f64, f64) { let mut responses = Vec::with_capacity(ANNOUNCE_REQUESTS); @@ -26,6 +28,7 @@ pub fn bench( handle_announce_requests( &state, + config, &mut responses, requests, ); diff --git a/aquatic_bench/src/bin/bench_handlers/main.rs b/aquatic_bench/src/bin/bench_handlers/main.rs index d73bacb..7b1c788 100644 --- a/aquatic_bench/src/bin/bench_handlers/main.rs +++ b/aquatic_bench/src/bin/bench_handlers/main.rs @@ -16,6 +16,7 @@ use num_format::{Locale, ToFormattedString}; use rand::{Rng, thread_rng, rngs::SmallRng, SeedableRng}; use aquatic::common::*; +use aquatic::config::Config; mod announce; @@ -76,6 +77,7 @@ fn main(){ let mut rng = SmallRng::from_rng(thread_rng()).unwrap(); let info_hashes = create_info_hashes(&mut rng); + let config = Config::default(); let state_for_scrape: State = { let requests = announce::create_requests( @@ -103,7 +105,7 @@ fn main(){ state.connections.insert(key, time); } - let d = announce::bench(&state, requests.clone()); + let d = announce::bench(&state, &config, requests.clone()); announce_data.0 += d.0; announce_data.1 += d.1;