From ad7e46478878c9bb341fe42eb7a790562c4db3a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 26 Oct 2021 15:26:06 +0200 Subject: [PATCH] aquatic_http: split into mio and glommio modules --- Cargo.lock | 3 + aquatic_http/Cargo.toml | 22 ++- aquatic_http/src/lib/glommio/mod.rs | 10 ++ aquatic_http/src/lib/lib.rs | 155 ++---------------- aquatic_http/src/lib/{ => mio}/common.rs | 0 aquatic_http/src/lib/{ => mio}/handler.rs | 2 +- aquatic_http/src/lib/mio/mod.rs | 150 +++++++++++++++++ .../src/lib/{ => mio}/network/connection.rs | 2 +- aquatic_http/src/lib/{ => mio}/network/mod.rs | 2 +- .../src/lib/{ => mio}/network/stream.rs | 0 .../src/lib/{ => mio}/network/utils.rs | 0 aquatic_http/src/lib/{ => mio}/tasks.rs | 3 +- 12 files changed, 196 insertions(+), 153 deletions(-) create mode 100644 aquatic_http/src/lib/glommio/mod.rs rename aquatic_http/src/lib/{ => mio}/common.rs (100%) rename aquatic_http/src/lib/{ => mio}/handler.rs (99%) create mode 100644 aquatic_http/src/lib/mio/mod.rs rename aquatic_http/src/lib/{ => mio}/network/connection.rs (99%) rename aquatic_http/src/lib/{ => mio}/network/mod.rs (99%) rename aquatic_http/src/lib/{ => mio}/network/stream.rs (100%) rename aquatic_http/src/lib/{ => mio}/network/utils.rs (100%) rename aquatic_http/src/lib/{ => mio}/tasks.rs (97%) diff --git a/Cargo.lock b/Cargo.lock index c83d439..607e122 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -91,8 +91,11 @@ dependencies = [ "aquatic_cli_helpers", "aquatic_common", "aquatic_http_protocol", + "cfg-if", "crossbeam-channel", "either", + "futures-lite", + "glommio", "hashbrown 0.11.2", "histogram", "indexmap", diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index f604413..092bd5d 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -15,28 +15,40 @@ path = "src/lib/lib.rs" name = "aquatic_http" path = "src/bin/main.rs" +[features] +default = ["with-mio"] +with-glommio = ["glommio", "futures-lite"] +with-mio = ["crossbeam-channel", "histogram", "mio", "native-tls", "socket2"] + [dependencies] anyhow = "1" aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" aquatic_http_protocol = "0.1.0" -crossbeam-channel = "0.5" +cfg-if = "1" either = "1" hashbrown = "0.11.2" -histogram = "0.6" indexmap = "1" itoa = "0.4" log = "0.4" mimalloc = { version = "0.1", default-features = false } memchr = "2" -mio = { version = "0.7", features = ["tcp", "os-poll", "os-util"] } -native-tls = "0.2" parking_lot = "0.11" privdrop = "0.5" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } smartstring = "0.2" -socket2 = { version = "0.4.1", features = ["all"] } + +# mio +crossbeam-channel = { version = "0.5", optional = true } +histogram = { version = "0.6", optional = true } +mio = { version = "0.7", features = ["tcp", "os-poll", "os-util"], optional = true } +native-tls = { version = "0.2", optional = true } +socket2 = { version = "0.4.1", features = ["all"], optional = true } + +# glommio +glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5", optional = true } +futures-lite = { version = "1", optional = true } [dev-dependencies] quickcheck = "1.0" diff --git a/aquatic_http/src/lib/glommio/mod.rs b/aquatic_http/src/lib/glommio/mod.rs new file mode 100644 index 0000000..0f2955f --- /dev/null +++ b/aquatic_http/src/lib/glommio/mod.rs @@ -0,0 +1,10 @@ +use glommio::prelude::*; + +use crate::config::Config; + +pub fn run( + config: Config, +) -> anyhow::Result<()> { + + Ok(()) +} \ No newline at end of file diff --git a/aquatic_http/src/lib/lib.rs b/aquatic_http/src/lib/lib.rs index 3f96ef2..ee6f8e8 100644 --- a/aquatic_http/src/lib/lib.rs +++ b/aquatic_http/src/lib/lib.rs @@ -1,153 +1,20 @@ -use std::sync::Arc; -use std::thread::Builder; -use std::time::Duration; +use cfg_if::cfg_if; -use anyhow::Context; -use mio::{Poll, Waker}; -use parking_lot::Mutex; -use privdrop::PrivDrop; - -pub mod common; pub mod config; -pub mod handler; -pub mod network; -pub mod tasks; -use common::*; -use config::Config; -use network::utils::create_tls_acceptor; +#[cfg(feature = "with-mio")] +pub mod mio; +#[cfg(all(feature = "with-glommio", target_os = "linux"))] +pub mod glommio; pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker"; -pub fn run(config: Config) -> anyhow::Result<()> { - let state = State::default(); - - tasks::update_access_list(&config, &state); - - start_workers(config.clone(), state.clone())?; - - loop { - ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); - - tasks::update_access_list(&config, &state); - - state - .torrent_maps - .lock() - .clean(&config, &state.access_list.load_full()); - } -} - -pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> { - let opt_tls_acceptor = create_tls_acceptor(&config.network.tls)?; - - let (request_channel_sender, request_channel_receiver) = ::crossbeam_channel::unbounded(); - - let mut out_message_senders = Vec::new(); - let mut wakers = Vec::new(); - - let socket_worker_statuses: SocketWorkerStatuses = { - let mut statuses = Vec::new(); - - for _ in 0..config.socket_workers { - statuses.push(None); - } - - Arc::new(Mutex::new(statuses)) - }; - - for i in 0..config.socket_workers { - let config = config.clone(); - let state = state.clone(); - let socket_worker_statuses = socket_worker_statuses.clone(); - let request_channel_sender = request_channel_sender.clone(); - let opt_tls_acceptor = opt_tls_acceptor.clone(); - let poll = Poll::new().expect("create poll"); - let waker = Arc::new(Waker::new(poll.registry(), CHANNEL_TOKEN).expect("create waker")); - - let (response_channel_sender, response_channel_receiver) = ::crossbeam_channel::unbounded(); - - out_message_senders.push(response_channel_sender); - wakers.push(waker); - - Builder::new() - .name(format!("socket-{:02}", i + 1)) - .spawn(move || { - network::run_socket_worker( - config, - state, - i, - socket_worker_statuses, - request_channel_sender, - response_channel_receiver, - opt_tls_acceptor, - poll, - ); - })?; - } - - // Wait for socket worker statuses. On error from any, quit program. - // On success from all, drop privileges if corresponding setting is set - // and continue program. - loop { - ::std::thread::sleep(::std::time::Duration::from_millis(10)); - - if let Some(statuses) = socket_worker_statuses.try_lock() { - for opt_status in statuses.iter() { - if let Some(Err(err)) = opt_status { - return Err(::anyhow::anyhow!(err.to_owned())); - } - } - - if statuses.iter().all(Option::is_some) { - if config.privileges.drop_privileges { - PrivDrop::default() - .chroot(config.privileges.chroot_path.clone()) - .user(config.privileges.user.clone()) - .apply() - .context("Couldn't drop root privileges")?; - } - - break; - } +pub fn run(config: config::Config) -> ::anyhow::Result<()> { + cfg_if! { + if #[cfg(all(feature = "with-glommio", target_os = "linux"))] { + glommio::run(config) + } else { + mio::run(config) } } - - let response_channel_sender = ResponseChannelSender::new(out_message_senders); - - for i in 0..config.request_workers { - let config = config.clone(); - let state = state.clone(); - let request_channel_receiver = request_channel_receiver.clone(); - let response_channel_sender = response_channel_sender.clone(); - let wakers = wakers.clone(); - - Builder::new() - .name(format!("request-{:02}", i + 1)) - .spawn(move || { - handler::run_request_worker( - config, - state, - request_channel_receiver, - response_channel_sender, - wakers, - ); - })?; - } - - if config.statistics.interval != 0 { - let state = state.clone(); - let config = config.clone(); - - Builder::new() - .name("statistics".to_string()) - .spawn(move || loop { - ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); - - tasks::print_statistics(&state); - }) - .expect("spawn statistics thread"); - } - - Ok(()) } diff --git a/aquatic_http/src/lib/common.rs b/aquatic_http/src/lib/mio/common.rs similarity index 100% rename from aquatic_http/src/lib/common.rs rename to aquatic_http/src/lib/mio/common.rs diff --git a/aquatic_http/src/lib/handler.rs b/aquatic_http/src/lib/mio/handler.rs similarity index 99% rename from aquatic_http/src/lib/handler.rs rename to aquatic_http/src/lib/mio/handler.rs index cd823d8..c54df07 100644 --- a/aquatic_http/src/lib/handler.rs +++ b/aquatic_http/src/lib/mio/handler.rs @@ -13,8 +13,8 @@ use aquatic_common::extract_response_peers; use aquatic_http_protocol::request::*; use aquatic_http_protocol::response::*; -use crate::common::*; use crate::config::Config; +use super::common::*; pub fn run_request_worker( config: Config, diff --git a/aquatic_http/src/lib/mio/mod.rs b/aquatic_http/src/lib/mio/mod.rs new file mode 100644 index 0000000..ea61019 --- /dev/null +++ b/aquatic_http/src/lib/mio/mod.rs @@ -0,0 +1,150 @@ +use std::sync::Arc; +use std::thread::Builder; +use std::time::Duration; + +use anyhow::Context; +use mio::{Poll, Waker}; +use parking_lot::Mutex; +use privdrop::PrivDrop; + +pub mod common; +pub mod handler; +pub mod network; +pub mod tasks; + +use crate::config::Config; +use common::*; +use network::utils::create_tls_acceptor; + +pub fn run(config: Config) -> anyhow::Result<()> { + let state = State::default(); + + tasks::update_access_list(&config, &state); + + start_workers(config.clone(), state.clone())?; + + loop { + ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); + + tasks::update_access_list(&config, &state); + + state + .torrent_maps + .lock() + .clean(&config, &state.access_list.load_full()); + } +} + +pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> { + let opt_tls_acceptor = create_tls_acceptor(&config.network.tls)?; + + let (request_channel_sender, request_channel_receiver) = ::crossbeam_channel::unbounded(); + + let mut out_message_senders = Vec::new(); + let mut wakers = Vec::new(); + + let socket_worker_statuses: SocketWorkerStatuses = { + let mut statuses = Vec::new(); + + for _ in 0..config.socket_workers { + statuses.push(None); + } + + Arc::new(Mutex::new(statuses)) + }; + + for i in 0..config.socket_workers { + let config = config.clone(); + let state = state.clone(); + let socket_worker_statuses = socket_worker_statuses.clone(); + let request_channel_sender = request_channel_sender.clone(); + let opt_tls_acceptor = opt_tls_acceptor.clone(); + let poll = Poll::new().expect("create poll"); + let waker = Arc::new(Waker::new(poll.registry(), CHANNEL_TOKEN).expect("create waker")); + + let (response_channel_sender, response_channel_receiver) = ::crossbeam_channel::unbounded(); + + out_message_senders.push(response_channel_sender); + wakers.push(waker); + + Builder::new() + .name(format!("socket-{:02}", i + 1)) + .spawn(move || { + network::run_socket_worker( + config, + state, + i, + socket_worker_statuses, + request_channel_sender, + response_channel_receiver, + opt_tls_acceptor, + poll, + ); + })?; + } + + // Wait for socket worker statuses. On error from any, quit program. + // On success from all, drop privileges if corresponding setting is set + // and continue program. + loop { + ::std::thread::sleep(::std::time::Duration::from_millis(10)); + + if let Some(statuses) = socket_worker_statuses.try_lock() { + for opt_status in statuses.iter() { + if let Some(Err(err)) = opt_status { + return Err(::anyhow::anyhow!(err.to_owned())); + } + } + + if statuses.iter().all(Option::is_some) { + if config.privileges.drop_privileges { + PrivDrop::default() + .chroot(config.privileges.chroot_path.clone()) + .user(config.privileges.user.clone()) + .apply() + .context("Couldn't drop root privileges")?; + } + + break; + } + } + } + + let response_channel_sender = ResponseChannelSender::new(out_message_senders); + + for i in 0..config.request_workers { + let config = config.clone(); + let state = state.clone(); + let request_channel_receiver = request_channel_receiver.clone(); + let response_channel_sender = response_channel_sender.clone(); + let wakers = wakers.clone(); + + Builder::new() + .name(format!("request-{:02}", i + 1)) + .spawn(move || { + handler::run_request_worker( + config, + state, + request_channel_receiver, + response_channel_sender, + wakers, + ); + })?; + } + + if config.statistics.interval != 0 { + let state = state.clone(); + let config = config.clone(); + + Builder::new() + .name("statistics".to_string()) + .spawn(move || loop { + ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); + + tasks::print_statistics(&state); + }) + .expect("spawn statistics thread"); + } + + Ok(()) +} diff --git a/aquatic_http/src/lib/network/connection.rs b/aquatic_http/src/lib/mio/network/connection.rs similarity index 99% rename from aquatic_http/src/lib/network/connection.rs rename to aquatic_http/src/lib/mio/network/connection.rs index 4324dfc..57fee71 100644 --- a/aquatic_http/src/lib/network/connection.rs +++ b/aquatic_http/src/lib/mio/network/connection.rs @@ -10,7 +10,7 @@ use native_tls::{MidHandshakeTlsStream, TlsAcceptor}; use aquatic_http_protocol::request::{Request, RequestParseError}; -use crate::common::*; +use crate::mio::common::*; use super::stream::Stream; diff --git a/aquatic_http/src/lib/network/mod.rs b/aquatic_http/src/lib/mio/network/mod.rs similarity index 99% rename from aquatic_http/src/lib/network/mod.rs rename to aquatic_http/src/lib/mio/network/mod.rs index dd7d889..ed0a552 100644 --- a/aquatic_http/src/lib/network/mod.rs +++ b/aquatic_http/src/lib/mio/network/mod.rs @@ -13,7 +13,7 @@ use native_tls::TlsAcceptor; use aquatic_http_protocol::response::*; -use crate::common::*; +use crate::mio::common::*; use crate::config::Config; pub mod connection; diff --git a/aquatic_http/src/lib/network/stream.rs b/aquatic_http/src/lib/mio/network/stream.rs similarity index 100% rename from aquatic_http/src/lib/network/stream.rs rename to aquatic_http/src/lib/mio/network/stream.rs diff --git a/aquatic_http/src/lib/network/utils.rs b/aquatic_http/src/lib/mio/network/utils.rs similarity index 100% rename from aquatic_http/src/lib/network/utils.rs rename to aquatic_http/src/lib/mio/network/utils.rs diff --git a/aquatic_http/src/lib/tasks.rs b/aquatic_http/src/lib/mio/tasks.rs similarity index 97% rename from aquatic_http/src/lib/tasks.rs rename to aquatic_http/src/lib/mio/tasks.rs index 341a434..67c9158 100644 --- a/aquatic_http/src/lib/tasks.rs +++ b/aquatic_http/src/lib/mio/tasks.rs @@ -2,7 +2,8 @@ use histogram::Histogram; use aquatic_common::access_list::{AccessListMode, AccessListQuery}; -use crate::{common::*, config::Config}; +use crate::config::Config; +use super::common::*; pub fn update_access_list(config: &Config, state: &State) { match config.access_list.mode {