diff --git a/Cargo.lock b/Cargo.lock index 63ee8fd..3326978 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -144,10 +144,15 @@ name = "aquatic_http_private" version = "0.1.0" dependencies = [ "anyhow", + "aquatic_cli_helpers", + "aquatic_common", "aquatic_http_protocol", + "aquatic_toml_config", "axum", "dotenv", "hex", + "mimalloc", + "serde", "socket2 0.4.4", "sqlx", "tokio", diff --git a/aquatic_http_private/Cargo.toml b/aquatic_http_private/Cargo.toml index eeab0a5..a2665fd 100644 --- a/aquatic_http_private/Cargo.toml +++ b/aquatic_http_private/Cargo.toml @@ -3,13 +3,24 @@ name = "aquatic_http_private" version = "0.1.0" edition = "2021" +[lib] +name = "aquatic_http_private" + +[[bin]] +name = "aquatic_http_private" + [dependencies] +aquatic_cli_helpers = "0.2.0" +aquatic_common = "0.2.0" aquatic_http_protocol = "0.2.0" +aquatic_toml_config = "0.2.0" anyhow = "1" axum = { version = "0.5", default-features = false, features = ["headers", "http1", "matched-path", "original-uri"] } dotenv = "0.15" hex = "0.4" +mimalloc = { version = "0.1", default-features = false } +serde = { version = "1", features = ["derive"] } socket2 = { version = "0.4", features = ["all"] } sqlx = { version = "0.5", features = [ "runtime-tokio-rustls" , "mysql" ] } tokio = { version = "1", features = ["full"] } diff --git a/aquatic_http_private/src/config.rs b/aquatic_http_private/src/config.rs new file mode 100644 index 0000000..bad3d72 --- /dev/null +++ b/aquatic_http_private/src/config.rs @@ -0,0 +1,121 @@ +use std::{net::SocketAddr, path::PathBuf}; + +use aquatic_common::privileges::PrivilegeConfig; +use aquatic_toml_config::TomlConfig; +use serde::Deserialize; + +use aquatic_cli_helpers::LogLevel; + +/// aquatic_http_private configuration +#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] +#[serde(default)] +pub struct Config { + /// Socket workers receive requests from the socket, parse them and send + /// them on to the request workers. They then receive responses from the + /// request workers, encode them and send them back over the socket. + pub socket_workers: usize, + /// Request workers receive a number of requests from socket workers, + /// generate responses and send them back to the socket workers. + pub request_workers: usize, + pub log_level: LogLevel, + pub network: NetworkConfig, + pub protocol: ProtocolConfig, + pub cleaning: CleaningConfig, + pub privileges: PrivilegeConfig, +} + +impl Default for Config { + fn default() -> Self { + Self { + socket_workers: 1, + request_workers: 1, + log_level: LogLevel::default(), + network: NetworkConfig::default(), + protocol: ProtocolConfig::default(), + cleaning: CleaningConfig::default(), + privileges: PrivilegeConfig::default(), + } + } +} + +impl aquatic_cli_helpers::Config for Config { + fn get_log_level(&self) -> Option { + Some(self.log_level) + } +} + +#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] +#[serde(default)] +pub struct NetworkConfig { + /// Bind to this address + pub address: SocketAddr, + /// Path to TLS certificate (DER-encoded X.509) + pub tls_certificate_path: PathBuf, + /// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format) + pub tls_private_key_path: PathBuf, + /// Keep connections alive after sending a response + pub keep_alive: bool, +} + +impl Default for NetworkConfig { + fn default() -> Self { + Self { + address: SocketAddr::from(([0, 0, 0, 0], 3000)), + tls_certificate_path: "".into(), + tls_private_key_path: "".into(), + keep_alive: false, + } + } +} + +#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] +#[serde(default)] +pub struct ProtocolConfig { + /// Maximum number of torrents to accept in scrape request + pub max_scrape_torrents: usize, + /// Maximum number of requested peers to accept in announce request + pub max_peers: usize, + /// Ask peers to announce this often (seconds) + pub peer_announce_interval: usize, +} + +impl Default for ProtocolConfig { + fn default() -> Self { + Self { + max_scrape_torrents: 100, + max_peers: 50, + peer_announce_interval: 300, + } + } +} + +#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] +#[serde(default)] +pub struct CleaningConfig { + /// Clean peers this often (seconds) + pub torrent_cleaning_interval: u64, + /// Clean connections this often (seconds) + pub connection_cleaning_interval: u64, + /// Remove peers that have not announced for this long (seconds) + pub max_peer_age: u64, + /// Remove connections that haven't seen valid requests for this long (seconds) + pub max_connection_idle: u64, +} + +impl Default for CleaningConfig { + fn default() -> Self { + Self { + torrent_cleaning_interval: 30, + connection_cleaning_interval: 60, + max_peer_age: 360, + max_connection_idle: 180, + } + } +} + +#[cfg(test)] +mod tests { + use super::Config; + + ::aquatic_toml_config::gen_serialize_deserialize_test!(Config); +} diff --git a/aquatic_http_private/src/lib.rs b/aquatic_http_private/src/lib.rs new file mode 100644 index 0000000..f0ce764 --- /dev/null +++ b/aquatic_http_private/src/lib.rs @@ -0,0 +1,41 @@ +pub mod config; +mod workers; + +use dotenv::dotenv; + +pub const APP_NAME: &str = "aquatic_http_private: private HTTP/TLS BitTorrent tracker"; +pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); + +pub fn run(config: config::Config) -> anyhow::Result<()> { + dotenv().ok(); + + let mut handles = Vec::new(); + + for _ in 0..config.socket_workers { + let config = config.clone(); + + let handle = ::std::thread::Builder::new() + .name("socket".into()) + .spawn(move || workers::socket::run_socket_worker(config))?; + + handles.push(handle); + } + + for _ in 0..config.request_workers { + let config = config.clone(); + + let handle = ::std::thread::Builder::new() + .name("request".into()) + .spawn(move || workers::request::run_request_worker(config))?; + + handles.push(handle); + } + + for handle in handles { + handle + .join() + .map_err(|err| anyhow::anyhow!("thread join error: {:?}", err))??; + } + + Ok(()) +} diff --git a/aquatic_http_private/src/main.rs b/aquatic_http_private/src/main.rs index 0768d00..c26aaeb 100644 --- a/aquatic_http_private/src/main.rs +++ b/aquatic_http_private/src/main.rs @@ -1,24 +1,14 @@ -mod workers; -use dotenv::dotenv; +use aquatic_cli_helpers::run_app_with_cli_and_config; +use aquatic_http_private::config::Config; -fn main() -> anyhow::Result<()> { - dotenv().ok(); +#[global_allocator] +static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; - let mut handles = Vec::new(); - - for _ in 0..2 { - let handle = ::std::thread::Builder::new() - .name("socket".into()) - .spawn(move || workers::socket::run_socket_worker())?; - - handles.push(handle); - } - - for handle in handles { - handle - .join() - .map_err(|err| anyhow::anyhow!("thread join error: {:?}", err))??; - } - - Ok(()) +fn main() { + run_app_with_cli_and_config::( + aquatic_http_private::APP_NAME, + aquatic_http_private::APP_VERSION, + aquatic_http_private::run, + None, + ) } diff --git a/aquatic_http_private/src/workers/mod.rs b/aquatic_http_private/src/workers/mod.rs index d22cc84..63fc0ec 100644 --- a/aquatic_http_private/src/workers/mod.rs +++ b/aquatic_http_private/src/workers/mod.rs @@ -1 +1,2 @@ +pub mod request; pub mod socket; diff --git a/aquatic_http_private/src/workers/request.rs b/aquatic_http_private/src/workers/request.rs new file mode 100644 index 0000000..068f293 --- /dev/null +++ b/aquatic_http_private/src/workers/request.rs @@ -0,0 +1,5 @@ +use crate::config::Config; + +pub fn run_request_worker(config: Config) -> anyhow::Result<()> { + Ok(()) +} diff --git a/aquatic_http_private/src/workers/socket/mod.rs b/aquatic_http_private/src/workers/socket/mod.rs index 696d5b9..f986816 100644 --- a/aquatic_http_private/src/workers/socket/mod.rs +++ b/aquatic_http_private/src/workers/socket/mod.rs @@ -1,16 +1,16 @@ mod db; mod routes; -use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, TcpListener}; +use std::net::{SocketAddr, TcpListener}; use anyhow::Context; use axum::{routing::get, Extension, Router}; use sqlx::mysql::MySqlPoolOptions; -pub fn run_socket_worker() -> anyhow::Result<()> { - let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3000)); +use crate::config::Config; - let tcp_listener = create_tcp_listener(addr, false)?; +pub fn run_socket_worker(config: Config) -> anyhow::Result<()> { + let tcp_listener = create_tcp_listener(config.network.address)?; let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -40,7 +40,7 @@ async fn run_app(tcp_listener: TcpListener) -> anyhow::Result<()> { Ok(()) } -fn create_tcp_listener(addr: SocketAddr, only_ipv6: bool) -> anyhow::Result { +fn create_tcp_listener(addr: SocketAddr) -> anyhow::Result { let domain = if addr.is_ipv4() { socket2::Domain::IPV4 } else { @@ -49,10 +49,6 @@ fn create_tcp_listener(addr: SocketAddr, only_ipv6: bool) -> anyhow::Result