aquatic: add Config struct, use in code

This commit is contained in:
Joakim Frostegård 2020-04-06 20:14:24 +02:00
parent ad1fa5b833
commit e4dfe2852c
6 changed files with 51 additions and 21 deletions

22
aquatic/src/lib/config.rs Normal file
View file

@ -0,0 +1,22 @@
use std::net::SocketAddr;
#[derive(Clone)]
pub struct Config {
pub address: SocketAddr,
pub recv_buffer_size: usize,
pub max_scrape_torrents: u8,
pub max_response_peers: usize,
}
impl Default for Config {
fn default() -> Self {
Self {
address: SocketAddr::from(([127, 0, 0, 1], 3000)),
recv_buffer_size: 4096 * 16,
max_scrape_torrents: 255,
max_response_peers: 255,
}
}
}

View file

@ -8,6 +8,7 @@ use rand::{Rng, SeedableRng, rngs::SmallRng, thread_rng};
use bittorrent_udp::types::*; use bittorrent_udp::types::*;
use crate::common::*; use crate::common::*;
use crate::config::Config;
pub fn handle_connect_requests( pub fn handle_connect_requests(
@ -42,6 +43,7 @@ pub fn handle_connect_requests(
pub fn handle_announce_requests( pub fn handle_announce_requests(
state: &State, state: &State,
config: &Config,
responses: &mut Vec<(Response, SocketAddr)>, responses: &mut Vec<(Response, SocketAddr)>,
requests: Drain<(AnnounceRequest, SocketAddr)>, requests: Drain<(AnnounceRequest, SocketAddr)>,
){ ){
@ -100,8 +102,8 @@ pub fn handle_announce_requests(
let response_peers = extract_response_peers( let response_peers = extract_response_peers(
&mut rng, &mut rng,
&torrent_data.peers, &torrent_data.peers,
255 config.max_response_peers,
); // FIXME num peers ); // FIXME: check how many peers announcing peer wants
let response = Response::Announce(AnnounceResponse { let response = Response::Announce(AnnounceResponse {
transaction_id: request.transaction_id, transaction_id: request.transaction_id,

View file

@ -1,25 +1,27 @@
use std::time::Duration; use std::time::Duration;
pub mod common; pub mod common;
pub mod config;
pub mod handlers; pub mod handlers;
pub mod network; pub mod network;
pub mod tasks; pub mod tasks;
use config::Config;
use common::State; use common::State;
pub fn run(){ pub fn run(){
let addr = ([127, 0, 0, 1], 3000).into(); let config = Config::default();
let state = State::new(); let state = State::new();
let socket = network::create_socket(addr, 4096 * 8); let socket = network::create_socket(&config);
let socket_timeout = Duration::from_millis(1000);
for i in 0..4 { for i in 0..4 {
let socket = socket.try_clone().unwrap(); let socket = socket.try_clone().unwrap();
let state = state.clone(); let state = state.clone();
let config = config.clone();
::std::thread::spawn(move || { ::std::thread::spawn(move || {
network::run_event_loop(state, socket, i, socket_timeout); network::run_event_loop(state, config, socket, i);
}); });
} }

View file

@ -1,5 +1,4 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::time::Duration;
use std::io::ErrorKind; use std::io::ErrorKind;
use mio::{Events, Poll, Interest, Token}; use mio::{Events, Poll, Interest, Token};
@ -11,16 +10,13 @@ use bittorrent_udp::types::IpVersion;
use bittorrent_udp::converters::{response_to_bytes, request_from_bytes}; use bittorrent_udp::converters::{response_to_bytes, request_from_bytes};
use crate::common::*; use crate::common::*;
use crate::config::Config;
use crate::handlers::*; use crate::handlers::*;
pub fn create_socket( pub fn create_socket(config: &Config) -> ::std::net::UdpSocket {
addr: SocketAddr,
recv_buffer_size: usize,
) -> ::std::net::UdpSocket {
let mut builder = &{ let mut builder = &{
if addr.is_ipv4(){ if config.address.is_ipv4(){
UdpBuilder::new_v4().expect("socket: build") UdpBuilder::new_v4().expect("socket: build")
} else { } else {
UdpBuilder::new_v6().expect("socket: build") UdpBuilder::new_v6().expect("socket: build")
@ -30,16 +26,16 @@ pub fn create_socket(
builder = builder.reuse_port(true) builder = builder.reuse_port(true)
.expect("socket: set reuse port"); .expect("socket: set reuse port");
let socket = builder.bind(&addr) let socket = builder.bind(&config.address)
.expect(&format!("socket: bind to {}", addr)); .expect(&format!("socket: bind to {}", &config.address));
socket.set_nonblocking(true) socket.set_nonblocking(true)
.expect("socket: set nonblocking"); .expect("socket: set nonblocking");
if let Err(err) = socket.set_recv_buffer_size(recv_buffer_size){ if let Err(err) = socket.set_recv_buffer_size(config.recv_buffer_size){
eprintln!( eprintln!(
"socket: failed setting recv buffer to {}: {:?}", "socket: failed setting recv buffer to {}: {:?}",
recv_buffer_size, config.recv_buffer_size,
err err
); );
} }
@ -50,9 +46,9 @@ pub fn create_socket(
pub fn run_event_loop( pub fn run_event_loop(
state: State, state: State,
config: Config,
socket: ::std::net::UdpSocket, socket: ::std::net::UdpSocket,
token_num: usize, token_num: usize,
poll_timeout: Duration,
){ ){
let mut buffer = [0u8; MAX_PACKET_SIZE]; let mut buffer = [0u8; MAX_PACKET_SIZE];
@ -73,7 +69,7 @@ pub fn run_event_loop(
let mut responses: Vec<(Response, SocketAddr)> = Vec::with_capacity(1024); let mut responses: Vec<(Response, SocketAddr)> = Vec::with_capacity(1024);
loop { loop {
poll.poll(&mut events, Some(poll_timeout)) poll.poll(&mut events, None)
.expect("failed polling"); .expect("failed polling");
for event in events.iter(){ for event in events.iter(){
@ -83,6 +79,7 @@ pub fn run_event_loop(
if event.is_readable(){ if event.is_readable(){
handle_readable_socket( handle_readable_socket(
&state, &state,
&config,
&mut socket, &mut socket,
&mut buffer, &mut buffer,
&mut responses, &mut responses,
@ -104,6 +101,7 @@ pub fn run_event_loop(
/// Read requests, generate and send back responses /// Read requests, generate and send back responses
fn handle_readable_socket( fn handle_readable_socket(
state: &State, state: &State,
config: &Config,
socket: &mut UdpSocket, socket: &mut UdpSocket,
buffer: &mut [u8], buffer: &mut [u8],
responses: &mut Vec<(Response, SocketAddr)>, responses: &mut Vec<(Response, SocketAddr)>,
@ -116,7 +114,7 @@ fn handle_readable_socket(
Ok((amt, src)) => { Ok((amt, src)) => {
let request = request_from_bytes( let request = request_from_bytes(
&buffer[..amt], &buffer[..amt],
255u8 // FIXME config.max_scrape_torrents
); );
match request { match request {
@ -162,6 +160,7 @@ fn handle_readable_socket(
); );
handle_announce_requests( handle_announce_requests(
state, state,
config,
responses, responses,
announce_requests.drain(..), announce_requests.drain(..),
); );

View file

@ -6,6 +6,7 @@ use rand_distr::Pareto;
use aquatic::handlers::*; use aquatic::handlers::*;
use aquatic::common::*; use aquatic::common::*;
use aquatic::config::Config;
use aquatic_bench::*; use aquatic_bench::*;
use crate::common::*; use crate::common::*;
@ -16,6 +17,7 @@ const ANNOUNCE_REQUESTS: usize = 1_000_000;
pub fn bench( pub fn bench(
state: &State, state: &State,
config: &Config,
requests: Vec<(AnnounceRequest, SocketAddr)>, requests: Vec<(AnnounceRequest, SocketAddr)>,
) -> (f64, f64) { ) -> (f64, f64) {
let mut responses = Vec::with_capacity(ANNOUNCE_REQUESTS); let mut responses = Vec::with_capacity(ANNOUNCE_REQUESTS);
@ -26,6 +28,7 @@ pub fn bench(
handle_announce_requests( handle_announce_requests(
&state, &state,
config,
&mut responses, &mut responses,
requests, requests,
); );

View file

@ -16,6 +16,7 @@ use num_format::{Locale, ToFormattedString};
use rand::{Rng, thread_rng, rngs::SmallRng, SeedableRng}; use rand::{Rng, thread_rng, rngs::SmallRng, SeedableRng};
use aquatic::common::*; use aquatic::common::*;
use aquatic::config::Config;
mod announce; mod announce;
@ -76,6 +77,7 @@ fn main(){
let mut rng = SmallRng::from_rng(thread_rng()).unwrap(); let mut rng = SmallRng::from_rng(thread_rng()).unwrap();
let info_hashes = create_info_hashes(&mut rng); let info_hashes = create_info_hashes(&mut rng);
let config = Config::default();
let state_for_scrape: State = { let state_for_scrape: State = {
let requests = announce::create_requests( let requests = announce::create_requests(
@ -103,7 +105,7 @@ fn main(){
state.connections.insert(key, time); state.connections.insert(key, time);
} }
let d = announce::bench(&state, requests.clone()); let d = announce::bench(&state, &config, requests.clone());
announce_data.0 += d.0; announce_data.0 += d.0;
announce_data.1 += d.1; announce_data.1 += d.1;