http_private: add config, launch with cli helpers

This commit is contained in:
Joakim Frostegård 2022-04-02 14:35:40 +02:00
parent 6e97bff93f
commit 088daa72ff
8 changed files with 200 additions and 30 deletions

5
Cargo.lock generated
View file

@ -144,10 +144,15 @@ name = "aquatic_http_private"
version = "0.1.0"
dependencies = [
"anyhow",
"aquatic_cli_helpers",
"aquatic_common",
"aquatic_http_protocol",
"aquatic_toml_config",
"axum",
"dotenv",
"hex",
"mimalloc",
"serde",
"socket2 0.4.4",
"sqlx",
"tokio",

View file

@ -3,13 +3,24 @@ name = "aquatic_http_private"
version = "0.1.0"
edition = "2021"
[lib]
name = "aquatic_http_private"
[[bin]]
name = "aquatic_http_private"
[dependencies]
aquatic_cli_helpers = "0.2.0"
aquatic_common = "0.2.0"
aquatic_http_protocol = "0.2.0"
aquatic_toml_config = "0.2.0"
anyhow = "1"
axum = { version = "0.5", default-features = false, features = ["headers", "http1", "matched-path", "original-uri"] }
dotenv = "0.15"
hex = "0.4"
mimalloc = { version = "0.1", default-features = false }
serde = { version = "1", features = ["derive"] }
socket2 = { version = "0.4", features = ["all"] }
sqlx = { version = "0.5", features = [ "runtime-tokio-rustls" , "mysql" ] }
tokio = { version = "1", features = ["full"] }

View file

@ -0,0 +1,121 @@
use std::{net::SocketAddr, path::PathBuf};
use aquatic_common::privileges::PrivilegeConfig;
use aquatic_toml_config::TomlConfig;
use serde::Deserialize;
use aquatic_cli_helpers::LogLevel;
/// aquatic_http_private configuration
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
pub struct Config {
/// Socket workers receive requests from the socket, parse them and send
/// them on to the request workers. They then receive responses from the
/// request workers, encode them and send them back over the socket.
pub socket_workers: usize,
/// Request workers receive a number of requests from socket workers,
/// generate responses and send them back to the socket workers.
pub request_workers: usize,
pub log_level: LogLevel,
pub network: NetworkConfig,
pub protocol: ProtocolConfig,
pub cleaning: CleaningConfig,
pub privileges: PrivilegeConfig,
}
impl Default for Config {
fn default() -> Self {
Self {
socket_workers: 1,
request_workers: 1,
log_level: LogLevel::default(),
network: NetworkConfig::default(),
protocol: ProtocolConfig::default(),
cleaning: CleaningConfig::default(),
privileges: PrivilegeConfig::default(),
}
}
}
impl aquatic_cli_helpers::Config for Config {
fn get_log_level(&self) -> Option<LogLevel> {
Some(self.log_level)
}
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
pub struct NetworkConfig {
/// Bind to this address
pub address: SocketAddr,
/// Path to TLS certificate (DER-encoded X.509)
pub tls_certificate_path: PathBuf,
/// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format)
pub tls_private_key_path: PathBuf,
/// Keep connections alive after sending a response
pub keep_alive: bool,
}
impl Default for NetworkConfig {
fn default() -> Self {
Self {
address: SocketAddr::from(([0, 0, 0, 0], 3000)),
tls_certificate_path: "".into(),
tls_private_key_path: "".into(),
keep_alive: false,
}
}
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
pub struct ProtocolConfig {
/// Maximum number of torrents to accept in scrape request
pub max_scrape_torrents: usize,
/// Maximum number of requested peers to accept in announce request
pub max_peers: usize,
/// Ask peers to announce this often (seconds)
pub peer_announce_interval: usize,
}
impl Default for ProtocolConfig {
fn default() -> Self {
Self {
max_scrape_torrents: 100,
max_peers: 50,
peer_announce_interval: 300,
}
}
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
pub struct CleaningConfig {
/// Clean peers this often (seconds)
pub torrent_cleaning_interval: u64,
/// Clean connections this often (seconds)
pub connection_cleaning_interval: u64,
/// Remove peers that have not announced for this long (seconds)
pub max_peer_age: u64,
/// Remove connections that haven't seen valid requests for this long (seconds)
pub max_connection_idle: u64,
}
impl Default for CleaningConfig {
fn default() -> Self {
Self {
torrent_cleaning_interval: 30,
connection_cleaning_interval: 60,
max_peer_age: 360,
max_connection_idle: 180,
}
}
}
#[cfg(test)]
mod tests {
use super::Config;
::aquatic_toml_config::gen_serialize_deserialize_test!(Config);
}

View file

@ -0,0 +1,41 @@
pub mod config;
mod workers;
use dotenv::dotenv;
pub const APP_NAME: &str = "aquatic_http_private: private HTTP/TLS BitTorrent tracker";
pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
pub fn run(config: config::Config) -> anyhow::Result<()> {
dotenv().ok();
let mut handles = Vec::new();
for _ in 0..config.socket_workers {
let config = config.clone();
let handle = ::std::thread::Builder::new()
.name("socket".into())
.spawn(move || workers::socket::run_socket_worker(config))?;
handles.push(handle);
}
for _ in 0..config.request_workers {
let config = config.clone();
let handle = ::std::thread::Builder::new()
.name("request".into())
.spawn(move || workers::request::run_request_worker(config))?;
handles.push(handle);
}
for handle in handles {
handle
.join()
.map_err(|err| anyhow::anyhow!("thread join error: {:?}", err))??;
}
Ok(())
}

View file

@ -1,24 +1,14 @@
mod workers;
use dotenv::dotenv;
use aquatic_cli_helpers::run_app_with_cli_and_config;
use aquatic_http_private::config::Config;
fn main() -> anyhow::Result<()> {
dotenv().ok();
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
let mut handles = Vec::new();
for _ in 0..2 {
let handle = ::std::thread::Builder::new()
.name("socket".into())
.spawn(move || workers::socket::run_socket_worker())?;
handles.push(handle);
}
for handle in handles {
handle
.join()
.map_err(|err| anyhow::anyhow!("thread join error: {:?}", err))??;
}
Ok(())
fn main() {
run_app_with_cli_and_config::<Config>(
aquatic_http_private::APP_NAME,
aquatic_http_private::APP_VERSION,
aquatic_http_private::run,
None,
)
}

View file

@ -1 +1,2 @@
pub mod request;
pub mod socket;

View file

@ -0,0 +1,5 @@
use crate::config::Config;
pub fn run_request_worker(config: Config) -> anyhow::Result<()> {
Ok(())
}

View file

@ -1,16 +1,16 @@
mod db;
mod routes;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, TcpListener};
use std::net::{SocketAddr, TcpListener};
use anyhow::Context;
use axum::{routing::get, Extension, Router};
use sqlx::mysql::MySqlPoolOptions;
pub fn run_socket_worker() -> anyhow::Result<()> {
let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3000));
use crate::config::Config;
let tcp_listener = create_tcp_listener(addr, false)?;
pub fn run_socket_worker(config: Config) -> anyhow::Result<()> {
let tcp_listener = create_tcp_listener(config.network.address)?;
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
@ -40,7 +40,7 @@ async fn run_app(tcp_listener: TcpListener) -> anyhow::Result<()> {
Ok(())
}
fn create_tcp_listener(addr: SocketAddr, only_ipv6: bool) -> anyhow::Result<TcpListener> {
fn create_tcp_listener(addr: SocketAddr) -> anyhow::Result<TcpListener> {
let domain = if addr.is_ipv4() {
socket2::Domain::IPV4
} else {
@ -49,10 +49,6 @@ fn create_tcp_listener(addr: SocketAddr, only_ipv6: bool) -> anyhow::Result<TcpL
let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?;
if only_ipv6 {
socket.set_only_v6(true).with_context(|| "set only_ipv6")?;
}
socket
.set_reuse_port(true)
.with_context(|| "set_reuse_port")?;