diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 9381d7f..b33e784 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -7,7 +7,7 @@ use crate::config::Config; pub mod handlers; pub mod network; -fn start_workers(config: Config) { +pub fn run(config: Config) -> anyhow::Result<()> { let num_peers = config.socket_workers + config.request_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, 1024); @@ -31,4 +31,6 @@ fn start_workers(config: Config) { response_mesh_builder.clone(), ); } + + Ok(()) } diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index c1aa9e7..fa4d882 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -1,121 +1,27 @@ -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; -use std::thread::Builder; -use std::time::Duration; +use std::sync::Arc; -use anyhow::Context; -use crossbeam_channel::unbounded; -use privdrop::PrivDrop; +use aquatic_common::access_list::{AccessList, AccessListMode}; pub mod common; pub mod config; pub mod glommio; pub mod mio; -pub mod tasks; -use common::State; use config::Config; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub fn run(config: Config) -> ::anyhow::Result<()> { - let state = State::default(); + mio::run(config) +} - tasks::update_access_list(&config, &state.access_list); - - let num_bound_sockets = start_workers(config.clone(), state.clone())?; - - if config.privileges.drop_privileges { - let mut counter = 0usize; - - loop { - let sockets = num_bound_sockets.load(Ordering::SeqCst); - - if sockets == config.socket_workers { - PrivDrop::default() - .chroot(config.privileges.chroot_path.clone()) - .user(config.privileges.user.clone()) - .apply()?; - - break; - } - - ::std::thread::sleep(Duration::from_millis(10)); - - counter += 1; - - if counter == 500 { - panic!("Sockets didn't bind in time for privilege drop."); +pub fn update_access_list(config: &Config, access_list: &Arc) { + match config.access_list.mode { + AccessListMode::White | AccessListMode::Black => { + if let Err(err) = access_list.update_from_path(&config.access_list.path) { + ::log::error!("Update access list from path: {:?}", err); } } - } - - loop { - ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); - - tasks::update_access_list(&config, &state.access_list); - - state.torrents.lock().clean(&config, &state.access_list); + AccessListMode::Off => {} } } - -pub fn start_workers(config: Config, state: State) -> ::anyhow::Result> { - let (request_sender, request_receiver) = unbounded(); - let (response_sender, response_receiver) = unbounded(); - - for i in 0..config.request_workers { - let state = state.clone(); - let config = config.clone(); - let request_receiver = request_receiver.clone(); - let response_sender = response_sender.clone(); - - Builder::new() - .name(format!("request-{:02}", i + 1)) - .spawn(move || { - mio::handlers::run_request_worker(state, config, request_receiver, response_sender) - }) - .with_context(|| "spawn request worker")?; - } - - let num_bound_sockets = Arc::new(AtomicUsize::new(0)); - - for i in 0..config.socket_workers { - let state = state.clone(); - let config = config.clone(); - let request_sender = request_sender.clone(); - let response_receiver = response_receiver.clone(); - let num_bound_sockets = num_bound_sockets.clone(); - - Builder::new() - .name(format!("socket-{:02}", i + 1)) - .spawn(move || { - mio::network::run_socket_worker( - state, - config, - i, - request_sender, - response_receiver, - num_bound_sockets, - ) - }) - .with_context(|| "spawn socket worker")?; - } - - if config.statistics.interval != 0 { - let state = state.clone(); - let config = config.clone(); - - Builder::new() - .name("statistics-collector".to_string()) - .spawn(move || loop { - ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); - - tasks::gather_and_print_statistics(&state, &config); - }) - .with_context(|| "spawn statistics worker")?; - } - - Ok(num_bound_sockets) -} diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index 030b2ee..a59f2ec 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -1,2 +1,122 @@ +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; +use std::thread::Builder; +use std::time::Duration; + +use anyhow::Context; +use crossbeam_channel::unbounded; +use privdrop::PrivDrop; + pub mod handlers; pub mod network; +pub mod tasks; + +use crate::common::State; +use crate::config::Config; +use crate::update_access_list; + +pub fn run(config: Config) -> ::anyhow::Result<()> { + let state = State::default(); + + update_access_list(&config, &state.access_list); + + let num_bound_sockets = Arc::new(AtomicUsize::new(0)); + + start_workers(config.clone(), state.clone(), num_bound_sockets.clone())?; + + if config.privileges.drop_privileges { + let mut counter = 0usize; + + loop { + let sockets = num_bound_sockets.load(Ordering::SeqCst); + + if sockets == config.socket_workers { + PrivDrop::default() + .chroot(config.privileges.chroot_path.clone()) + .user(config.privileges.user.clone()) + .apply()?; + + break; + } + + ::std::thread::sleep(Duration::from_millis(10)); + + counter += 1; + + if counter == 500 { + panic!("Sockets didn't bind in time for privilege drop."); + } + } + } + + loop { + ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); + + update_access_list(&config, &state.access_list); + + state.torrents.lock().clean(&config, &state.access_list); + } +} + +pub fn start_workers( + config: Config, + state: State, + num_bound_sockets: Arc, +) -> ::anyhow::Result<()> { + let (request_sender, request_receiver) = unbounded(); + let (response_sender, response_receiver) = unbounded(); + + for i in 0..config.request_workers { + let state = state.clone(); + let config = config.clone(); + let request_receiver = request_receiver.clone(); + let response_sender = response_sender.clone(); + + Builder::new() + .name(format!("request-{:02}", i + 1)) + .spawn(move || { + handlers::run_request_worker(state, config, request_receiver, response_sender) + }) + .with_context(|| "spawn request worker")?; + } + + for i in 0..config.socket_workers { + let state = state.clone(); + let config = config.clone(); + let request_sender = request_sender.clone(); + let response_receiver = response_receiver.clone(); + let num_bound_sockets = num_bound_sockets.clone(); + + Builder::new() + .name(format!("socket-{:02}", i + 1)) + .spawn(move || { + network::run_socket_worker( + state, + config, + i, + request_sender, + response_receiver, + num_bound_sockets, + ) + }) + .with_context(|| "spawn socket worker")?; + } + + if config.statistics.interval != 0 { + let state = state.clone(); + let config = config.clone(); + + Builder::new() + .name("statistics-collector".to_string()) + .spawn(move || loop { + ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); + + tasks::gather_and_print_statistics(&state, &config); + }) + .with_context(|| "spawn statistics worker")?; + } + + Ok(()) +} diff --git a/aquatic_udp/src/lib/mio/network.rs b/aquatic_udp/src/lib/mio/network.rs index 6171244..7e79b0a 100644 --- a/aquatic_udp/src/lib/mio/network.rs +++ b/aquatic_udp/src/lib/mio/network.rs @@ -4,7 +4,7 @@ use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; -use std::time::{Duration, Instant}; +use std::time::Duration; use std::vec::Drain; use crossbeam_channel::{Receiver, Sender}; diff --git a/aquatic_udp/src/lib/tasks.rs b/aquatic_udp/src/lib/mio/tasks.rs similarity index 84% rename from aquatic_udp/src/lib/tasks.rs rename to aquatic_udp/src/lib/mio/tasks.rs index 2665c45..2fde39d 100644 --- a/aquatic_udp/src/lib/tasks.rs +++ b/aquatic_udp/src/lib/mio/tasks.rs @@ -1,24 +1,10 @@ use std::sync::atomic::Ordering; -use std::sync::Arc; use histogram::Histogram; -use aquatic_common::access_list::AccessListMode; - use crate::common::*; use crate::config::Config; -pub fn update_access_list(config: &Config, access_list: &Arc) { - match config.access_list.mode { - AccessListMode::White | AccessListMode::Black => { - if let Err(err) = access_list.update_from_path(&config.access_list.path) { - ::log::error!("Update access list from path: {:?}", err); - } - } - AccessListMode::Off => {} - } -} - pub fn gather_and_print_statistics(state: &State, config: &Config) { let interval = config.statistics.interval;