diff --git a/TODO.md b/TODO.md index f4c6537..cf2d0e0 100644 --- a/TODO.md +++ b/TODO.md @@ -9,11 +9,10 @@ * test * test full torrent transfer (offer-answer exchange) * torrent state cleaning -* config - * limit number of info hashes allowed in scrape requests -* cli, mimalloc * log crate instead of println/eprintln * privdrop +* some config.network fields are actually used in handler. maybe they should + be checked while parsing? not completely clear ## aquatic_udp * mio: set oneshot for epoll and kqueue? otherwise, stop reregistering? diff --git a/aquatic_ws/src/bin/main.rs b/aquatic_ws/src/bin/main.rs index fc15728..9a957b0 100644 --- a/aquatic_ws/src/bin/main.rs +++ b/aquatic_ws/src/bin/main.rs @@ -1,6 +1,14 @@ use aquatic_ws; +use cli_helpers; + + +#[global_allocator] +static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; fn main(){ - aquatic_ws::run(); + cli_helpers::run_app_with_cli_and_config::( + "aquatic: webtorrent tracker", + aquatic_ws::run, + ) } \ No newline at end of file diff --git a/aquatic_ws/src/lib/config.rs b/aquatic_ws/src/lib/config.rs new file mode 100644 index 0000000..5458491 --- /dev/null +++ b/aquatic_ws/src/lib/config.rs @@ -0,0 +1,145 @@ +use std::net::SocketAddr; + +use serde::{Serialize, Deserialize}; + + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct Config { + /// Socket workers receive requests from the socket, parse them and send + /// them on to the request workers. They then recieve responses from the + /// request workers, encode them and send them back over the socket. + pub socket_workers: usize, + /// Request workers receive a number of requests from socket workers, + /// generate responses and send them back to the socket workers. + pub network: NetworkConfig, + pub handlers: HandlerConfig, + pub cleaning: CleaningConfig, + pub privileges: PrivilegeConfig, +} + + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct NetworkConfig { + /// Bind to this address + pub address: SocketAddr, + /// Maximum number of torrents to accept in scrape request + pub max_scrape_torrents: usize, // FIXME: should this really be in NetworkConfig? + /// Maximum number of offers to accept in announce request + pub max_offers: usize, // FIXME: should this really be in NetworkConfig? + /// Ask peers to announce this often (seconds) + pub peer_announce_interval: usize, // FIXME: should this really be in NetworkConfig? + /// Size of socket recv buffer. Use 0 for OS default. + /// + /// This setting can have a big impact on dropped packages. It might + /// require changing system defaults. Some examples of commands to set + /// recommended values for different operating systems: + /// + /// macOS: + /// $ sudo sysctl net.inet.udp.recvspace=6000000 + /// $ sudo sysctl net.inet.udp.maxdgram=500000 # Not necessary, but recommended + /// $ sudo sysctl kern.ipc.maxsockbuf=8388608 # Not necessary, but recommended + /// + /// Linux: + /// $ sudo sysctl -w net.core.rmem_max=104857600 + /// $ sudo sysctl -w net.core.rmem_default=104857600 + pub socket_recv_buffer_size: usize, // FIXME: implement + pub poll_event_capacity: usize, + pub poll_timeout_milliseconds: u64, +} + + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct HandlerConfig { + /// Maximum number of requests to receive from channel before locking + /// mutex and starting work + pub max_requests_per_iter: usize, + pub channel_recv_timeout_microseconds: u64, +} + + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct CleaningConfig { + /// Clean peers this often (seconds) + pub interval: u64, // FIXME: implement + /// Remove peers that haven't announced for this long (seconds) + pub max_peer_age: u64, + /// Remove connections that are older than this (seconds) + pub max_connection_age: u64, +} + + +// FIXME: implement +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct PrivilegeConfig { + /// Chroot and switch user after binding to sockets + pub drop_privileges: bool, + /// Chroot to this path + pub chroot_path: String, + /// User to switch to after chrooting + pub user: String, +} + + +impl Default for Config { + fn default() -> Self { + Self { + socket_workers: 1, + network: NetworkConfig::default(), + handlers: HandlerConfig::default(), + cleaning: CleaningConfig::default(), + privileges: PrivilegeConfig::default(), + } + } +} + + +impl Default for NetworkConfig { + fn default() -> Self { + Self { + address: SocketAddr::from(([127, 0, 0, 1], 3000)), + max_scrape_torrents: 255, + max_offers: 10, + peer_announce_interval: 60 * 15, + poll_event_capacity: 4096, + poll_timeout_milliseconds: 50, + socket_recv_buffer_size: 4096 * 128, + } + } +} + + +impl Default for HandlerConfig { + fn default() -> Self { + Self { + max_requests_per_iter: 10000, + channel_recv_timeout_microseconds: 200, + } + } +} + + +impl Default for CleaningConfig { + fn default() -> Self { + Self { + interval: 30, + max_peer_age: 60 * 20, + max_connection_age: 60 * 5, + } + } +} + + +impl Default for PrivilegeConfig { + fn default() -> Self { + Self { + drop_privileges: false, + chroot_path: ".".to_string(), + user: "nobody".to_string(), + } + } +} \ No newline at end of file diff --git a/aquatic_ws/src/lib/handler.rs b/aquatic_ws/src/lib/handler.rs index 8444040..9427a68 100644 --- a/aquatic_ws/src/lib/handler.rs +++ b/aquatic_ws/src/lib/handler.rs @@ -8,10 +8,12 @@ use rand::{Rng, SeedableRng, rngs::SmallRng}; use aquatic_common::extract_response_peers; use crate::common::*; +use crate::config::Config; use crate::protocol::*; pub fn run_request_worker( + config: Config, state: State, in_message_receiver: InMessageReceiver, out_message_sender: OutMessageSender, @@ -23,12 +25,14 @@ pub fn run_request_worker( let mut rng = SmallRng::from_entropy(); - let timeout = Duration::from_micros(200); // FIXME: config + let timeout = Duration::from_micros( + config.handlers.channel_recv_timeout_microseconds + ); loop { let mut opt_torrent_map_guard: Option> = None; - for i in 0..1000 { // FIXME config + for i in 0..config.handlers.max_requests_per_iter { let opt_in_message = if i == 0 { in_message_receiver.recv().ok() } else { @@ -56,6 +60,7 @@ pub fn run_request_worker( .unwrap_or_else(|| state.torrents.lock()); handle_announce_requests( + &config, &mut rng, &mut torrent_map_guard, &mut out_messages, @@ -63,6 +68,7 @@ pub fn run_request_worker( ); handle_scrape_requests( + &config, &mut torrent_map_guard, &mut out_messages, scrape_requests.drain(..) @@ -78,12 +84,13 @@ pub fn run_request_worker( pub fn handle_announce_requests( + config: &Config, rng: &mut impl Rng, torrents: &mut TorrentMap, messages_out: &mut Vec<(ConnectionMeta, OutMessage)>, requests: Drain<(ConnectionMeta, AnnounceRequest)>, ){ - let valid_until = ValidUntil::new(240); + let valid_until = ValidUntil::new(config.cleaning.max_peer_age); for (sender_meta, request) in requests { let info_hash = request.info_hash; @@ -144,7 +151,9 @@ pub fn handle_announce_requests( // If peer sent offers, send them on to random peers if let Some(offers) = request.offers { - let max_num_peers_to_take = offers.len().min(10); // FIXME: config + // FIXME: config: also maybe check this when parsing request + let max_num_peers_to_take = offers.len() + .min(config.network.max_offers); #[inline] fn f(peer: &Peer) -> Peer { @@ -197,7 +206,7 @@ pub fn handle_announce_requests( info_hash, complete: torrent_data.num_seeders, incomplete: torrent_data.num_leechers, - announce_interval: 120, // FIXME: config + announce_interval: config.network.peer_announce_interval, }); messages_out.push((sender_meta, response)); @@ -206,6 +215,7 @@ pub fn handle_announce_requests( pub fn handle_scrape_requests( + config: &Config, torrents: &mut TorrentMap, messages_out: &mut Vec<(ConnectionMeta, OutMessage)>, requests: Drain<(ConnectionMeta, ScrapeRequest)>, @@ -217,9 +227,11 @@ pub fn handle_scrape_requests( files: HashMap::with_capacity(num_info_hashes), }; + let max_torrents = config.network.max_scrape_torrents; + // If request.info_hashes is empty, don't return scrape for all // torrents, even though reference server does it. It is too expensive. - for info_hash in request.info_hashes { + for info_hash in request.info_hashes.into_iter().take(max_torrents){ if let Some(torrent_data) = torrents.get(&info_hash){ let stats = ScrapeStatistics { complete: torrent_data.num_seeders, diff --git a/aquatic_ws/src/lib/lib.rs b/aquatic_ws/src/lib/lib.rs index a7822ee..48c2769 100644 --- a/aquatic_ws/src/lib/lib.rs +++ b/aquatic_ws/src/lib/lib.rs @@ -2,23 +2,24 @@ //! exact protocol is achieved pub mod common; +pub mod config; pub mod handler; pub mod network; pub mod protocol; use common::*; +use config::Config; -pub fn run(){ - let address: ::std::net::SocketAddr = "0.0.0.0:3000".parse().unwrap(); - +pub fn run(config: Config){ let state = State::default(); let (in_message_sender, in_message_receiver) = ::flume::unbounded(); let mut out_message_senders = Vec::new(); - for i in 0..1 { + for i in 0..config.socket_workers { + let config = config.clone(); let in_message_sender = in_message_sender.clone(); let (out_message_sender, out_message_receiver) = ::flume::unbounded(); @@ -27,7 +28,7 @@ pub fn run(){ ::std::thread::spawn(move || { network::run_socket_worker( - address, + config, i, in_message_sender, out_message_receiver, @@ -39,6 +40,7 @@ pub fn run(){ ::std::thread::spawn(move || { handler::run_request_worker( + config, state, in_message_receiver, out_message_sender, diff --git a/aquatic_ws/src/lib/network.rs b/aquatic_ws/src/lib/network.rs index 1dface2..c536bf3 100644 --- a/aquatic_ws/src/lib/network.rs +++ b/aquatic_ws/src/lib/network.rs @@ -10,6 +10,7 @@ use mio::{Events, Poll, Interest, Token}; use mio::net::{TcpListener, TcpStream}; use crate::common::*; +use crate::config::Config; use crate::protocol::*; @@ -100,16 +101,18 @@ pub fn remove_inactive_connections( pub fn run_socket_worker( - address: SocketAddr, + config: Config, socket_worker_index: usize, in_message_sender: InMessageSender, out_message_receiver: OutMessageReceiver, ){ - let poll_timeout = Duration::from_millis(50); // FIXME: config + let poll_timeout = Duration::from_millis( + config.network.poll_timeout_milliseconds + ); - let mut listener = TcpListener::bind(address).unwrap(); + let mut listener = TcpListener::bind(config.network.address).unwrap(); let mut poll = Poll::new().expect("create poll"); - let mut events = Events::with_capacity(1024); // FIXME: config + let mut events = Events::with_capacity(config.network.poll_event_capacity); poll.registry() .register(&mut listener, Token(0), Interest::READABLE) @@ -124,7 +127,7 @@ pub fn run_socket_worker( poll.poll(&mut events, Some(poll_timeout)) .expect("failed polling"); - let valid_until = ValidUntil::new(180); // FIXME: config + let valid_until = ValidUntil::new(config.cleaning.max_connection_age); for event in events.iter(){ let token = event.token();