From c8a08cb124f526d37d7cfbb5a0a76bd51dd14006 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 10 Apr 2023 00:20:44 +0200 Subject: [PATCH] Remove aquatic_http_private --- Cargo.lock | 471 +----------------- Cargo.toml | 1 - aquatic_http_private/Cargo.toml | 38 -- aquatic_http_private/README.md | 96 ---- aquatic_http_private/src/common.rs | 52 -- aquatic_http_private/src/config.rs | 119 ----- aquatic_http_private/src/lib.rs | 103 ---- aquatic_http_private/src/main.rs | 14 - aquatic_http_private/src/workers/mod.rs | 2 - aquatic_http_private/src/workers/socket/db.rs | 119 ----- .../src/workers/socket/mod.rs | 104 ---- .../src/workers/socket/routes.rs | 65 --- .../src/workers/socket/tls.rs | 151 ------ .../src/workers/swarm/common.rs | 121 ----- aquatic_http_private/src/workers/swarm/mod.rs | 220 -------- 15 files changed, 3 insertions(+), 1673 deletions(-) delete mode 100644 aquatic_http_private/Cargo.toml delete mode 100644 aquatic_http_private/README.md delete mode 100644 aquatic_http_private/src/common.rs delete mode 100644 aquatic_http_private/src/config.rs delete mode 100644 aquatic_http_private/src/lib.rs delete mode 100644 aquatic_http_private/src/main.rs delete mode 100644 aquatic_http_private/src/workers/mod.rs delete mode 100644 aquatic_http_private/src/workers/socket/db.rs delete mode 100644 aquatic_http_private/src/workers/socket/mod.rs delete mode 100644 aquatic_http_private/src/workers/socket/routes.rs delete mode 100644 aquatic_http_private/src/workers/socket/tls.rs delete mode 100644 aquatic_http_private/src/workers/swarm/common.rs delete mode 100644 aquatic_http_private/src/workers/swarm/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 8425a34..8fe8394 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -147,31 +147,6 @@ dependencies = [ "serde", ] -[[package]] -name = "aquatic_http_private" -version = "0.8.0" -dependencies = [ - "anyhow", - "aquatic_common", - "aquatic_http_protocol", - "aquatic_toml_config", - "axum", - "dotenv", - "futures-util", - "hex", - "hyper", - "log", - "mimalloc", - "rand", - "rustls", - "serde", - "signal-hook", - "socket2", - "sqlx", - "tokio", - "tokio-rustls", -] - [[package]] name = "aquatic_http_protocol" version = "0.8.0" @@ -409,15 +384,6 @@ dependencies = [ "tungstenite", ] -[[package]] -name = "atoi" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7c57d12312ff59c811c0643f4d80830505833c9ffaebd193d819392b265be8e" -dependencies = [ - "num-traits", -] - [[package]] name = "atty" version = "0.2.14" @@ -446,7 +412,6 @@ dependencies = [ "bitflags 1.3.2", "bytes", "futures-util", - "headers", "http", "http-body", "hyper", @@ -508,12 +473,6 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" -[[package]] -name = "base64ct" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" - [[package]] name = "bendy" version = "0.4.0-beta.2" @@ -682,12 +641,6 @@ dependencies = [ "windows-sys 0.42.0", ] -[[package]] -name = "const-oid" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4c78c047431fee22c1a7bb92e00ad095a02a983affe4d8a72e2a2c62c1b94f3" - [[package]] name = "constant_time_eq" version = "0.1.5" @@ -709,21 +662,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crc" -version = "3.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe" -dependencies = [ - "crc-catalog", -] - -[[package]] -name = "crc-catalog" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484" - [[package]] name = "crc32fast" version = "1.3.2" @@ -836,16 +774,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crypto-bigint" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03c6a1d5fa1de37e071642dfa44ec552ca5b299adb128fab16138e24b548fd21" -dependencies = [ - "generic-array", - "subtle", -] - [[package]] name = "crypto-common" version = "0.1.6" @@ -877,17 +805,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "der" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6919815d73839e7ad218de758883aae3a257ba6759ce7a9992501efbb53d705c" -dependencies = [ - "const-oid", - "crypto-bigint", - "pem-rfc7468", -] - [[package]] name = "digest" version = "0.9.0" @@ -913,18 +830,6 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" -[[package]] -name = "dotenv" -version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" - -[[package]] -name = "dotenvy" -version = "0.15.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03d8c417d7a8cb362e0c37e5d815f5eb7c37f79ff93707329d5a194e42e54ca0" - [[package]] name = "duplicate" version = "0.4.1" @@ -984,12 +889,6 @@ dependencies = [ "libc", ] -[[package]] -name = "event-listener" -version = "2.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" - [[package]] name = "fastrand" version = "1.9.0" @@ -1088,17 +987,6 @@ dependencies = [ "futures-util", ] -[[package]] -name = "futures-intrusive" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a604f7a68fbf8103337523b1fadc8ade7361ee3f112f7c680ad179651616aed5" -dependencies = [ - "futures-core", - "lock_api", - "parking_lot 0.11.2", -] - [[package]] name = "futures-io" version = "0.3.27" @@ -1294,15 +1182,6 @@ dependencies = [ "serde", ] -[[package]] -name = "hashlink" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69fe1fcf8b4278d860ad0548329f892a3631fb63f82574df68275f34cdbe0ffa" -dependencies = [ - "hashbrown 0.12.3", -] - [[package]] name = "hdrhistogram" version = "7.5.2" @@ -1317,39 +1196,11 @@ dependencies = [ "num-traits", ] -[[package]] -name = "headers" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3e372db8e5c0d213e0cd0b9be18be2aca3d44cf2fe30a9d46a65581cd454584" -dependencies = [ - "base64 0.13.1", - "bitflags 1.3.2", - "bytes", - "headers-core", - "http", - "httpdate", - "mime", - "sha1", -] - -[[package]] -name = "headers-core" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" -dependencies = [ - "http", -] - [[package]] name = "heck" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" -dependencies = [ - "unicode-segmentation", -] [[package]] name = "hermit-abi" @@ -1558,9 +1409,6 @@ name = "lazy_static" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" -dependencies = [ - "spin 0.5.2", -] [[package]] name = "libc" @@ -1682,7 +1530,7 @@ dependencies = [ "ipnet", "metrics", "metrics-util", - "parking_lot 0.12.1", + "parking_lot", "portable-atomic", "quanta", "thiserror", @@ -1711,7 +1559,7 @@ dependencies = [ "hashbrown 0.12.3", "metrics", "num_cpus", - "parking_lot 0.12.1", + "parking_lot", "portable-atomic", "quanta", "sketches-ddsketch 0.2.0", @@ -1822,34 +1670,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "num-bigint" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f" -dependencies = [ - "autocfg", - "num-integer", - "num-traits", -] - -[[package]] -name = "num-bigint-dig" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2399c9463abc5f909349d8aa9ba080e0b88b3ce2885389b60b993f39b1a56905" -dependencies = [ - "byteorder", - "lazy_static", - "libm", - "num-integer", - "num-iter", - "num-traits", - "rand", - "smallvec", - "zeroize", -] - [[package]] name = "num-format" version = "0.4.4" @@ -1949,17 +1769,6 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" -[[package]] -name = "parking_lot" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" -dependencies = [ - "instant", - "lock_api", - "parking_lot_core 0.8.6", -] - [[package]] name = "parking_lot" version = "0.12.1" @@ -1967,21 +1776,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core 0.9.7", -] - -[[package]] -name = "parking_lot_core" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" -dependencies = [ - "cfg-if", - "instant", - "libc", - "redox_syscall", - "smallvec", - "winapi 0.3.9", + "parking_lot_core", ] [[package]] @@ -1997,21 +1792,6 @@ dependencies = [ "windows-sys 0.45.0", ] -[[package]] -name = "paste" -version = "1.0.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f746c4065a8fa3fe23974dd82f15431cc8d40779821001404d10d2e79ca7d79" - -[[package]] -name = "pem-rfc7468" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01de5d978f34aa4b2296576379fcc416034702fd94117c56ffd8a1a767cefb30" -dependencies = [ - "base64ct", -] - [[package]] name = "percent-encoding" version = "2.2.0" @@ -2050,28 +1830,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" -[[package]] -name = "pkcs1" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a78f66c04ccc83dd4486fd46c33896f4e17b24a7a3a6400dedc48ed0ddd72320" -dependencies = [ - "der", - "pkcs8", - "zeroize", -] - -[[package]] -name = "pkcs8" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cabda3fb821068a9a4fab19a683eac3af12edf0f34b94a8be53c4972b8149d0" -dependencies = [ - "der", - "spki", - "zeroize", -] - [[package]] name = "pkg-config" version = "0.3.26" @@ -2329,26 +2087,6 @@ dependencies = [ "libc", ] -[[package]] -name = "rsa" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cf22754c49613d2b3b119f0e5d46e34a2c628a937e3024b8762de4e7d8c710b" -dependencies = [ - "byteorder", - "digest 0.10.5", - "num-bigint-dig", - "num-integer", - "num-iter", - "num-traits", - "pkcs1", - "pkcs8", - "rand_core", - "smallvec", - "subtle", - "zeroize", -] - [[package]] name = "rustc-demangle" version = "0.1.21" @@ -2490,17 +2228,6 @@ dependencies = [ "digest 0.10.5", ] -[[package]] -name = "sha2" -version = "0.10.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0" -dependencies = [ - "cfg-if", - "cpufeatures", - "digest 0.10.5", -] - [[package]] name = "signal-hook" version = "0.3.15" @@ -2626,138 +2353,12 @@ dependencies = [ "lock_api", ] -[[package]] -name = "spki" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44d01ac02a6ccf3e07db148d2be087da624fea0221a16152ed01f0496a6b0a27" -dependencies = [ - "base64ct", - "der", -] - -[[package]] -name = "sqlformat" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c12bc9199d1db8234678b7051747c07f517cdcf019262d1847b94ec8b1aee3e" -dependencies = [ - "itertools", - "nom", - "unicode_categories", -] - -[[package]] -name = "sqlx" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9249290c05928352f71c077cc44a464d880c63f26f7534728cca008e135c0428" -dependencies = [ - "sqlx-core", - "sqlx-macros", -] - -[[package]] -name = "sqlx-core" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcbc16ddba161afc99e14d1713a453747a2b07fc097d2009f4c300ec99286105" -dependencies = [ - "ahash 0.7.6", - "atoi", - "bitflags 1.3.2", - "byteorder", - "bytes", - "crc", - "crossbeam-queue", - "digest 0.10.5", - "dotenvy", - "either", - "event-listener", - "futures-channel", - "futures-core", - "futures-intrusive", - "futures-util", - "generic-array", - "hashlink", - "hex", - "indexmap", - "itoa", - "libc", - "log", - "memchr", - "num-bigint", - "once_cell", - "paste", - "percent-encoding", - "rand", - "rsa", - "rustls", - "rustls-pemfile", - "sha1", - "sha2", - "smallvec", - "sqlformat", - "sqlx-rt", - "stringprep", - "thiserror", - "tokio-stream", - "url", - "webpki-roots", -] - -[[package]] -name = "sqlx-macros" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b850fa514dc11f2ee85be9d055c512aa866746adfacd1cb42d867d68e6a5b0d9" -dependencies = [ - "dotenvy", - "either", - "heck", - "once_cell", - "proc-macro2", - "quote", - "sha2", - "sqlx-core", - "sqlx-rt", - "syn", - "url", -] - -[[package]] -name = "sqlx-rt" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24c5b2d25fa654cc5f841750b8e1cdedbe21189bf9a9382ee90bfa9dd3562396" -dependencies = [ - "once_cell", - "tokio", - "tokio-rustls", -] - [[package]] name = "static_assertions" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" -[[package]] -name = "stringprep" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1" -dependencies = [ - "unicode-bidi", - "unicode-normalization", -] - -[[package]] -name = "subtle" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" - [[package]] name = "syn" version = "1.0.109" @@ -2865,52 +2466,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03201d01c3c27a29c8a5cee5b55a93ddae1ccf6f08f65365c2c918f8c1b76f64" dependencies = [ "autocfg", - "bytes", "libc", - "memchr", "mio", - "num_cpus", - "parking_lot 0.12.1", "pin-project-lite", - "signal-hook-registry", "socket2", - "tokio-macros", "windows-sys 0.45.0", ] -[[package]] -name = "tokio-macros" -version = "1.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "tokio-rustls" -version = "0.23.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" -dependencies = [ - "rustls", - "tokio", - "webpki", -] - -[[package]] -name = "tokio-stream" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313" -dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", -] - [[package]] name = "toml" version = "0.5.11" @@ -3052,24 +2614,12 @@ dependencies = [ "tinyvec", ] -[[package]] -name = "unicode-segmentation" -version = "1.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36" - [[package]] name = "unicode-width" version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b" -[[package]] -name = "unicode_categories" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" - [[package]] name = "untrusted" version = "0.7.1" @@ -3229,15 +2779,6 @@ dependencies = [ "untrusted", ] -[[package]] -name = "webpki-roots" -version = "0.22.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87" -dependencies = [ - "webpki", -] - [[package]] name = "winapi" version = "0.2.8" @@ -3361,9 +2902,3 @@ name = "windows_x86_64_msvc" version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" - -[[package]] -name = "zeroize" -version = "1.5.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c394b5bd0c6f669e7275d9c20aa90ae064cb22e75a1cad54e1b34088034b149f" diff --git a/Cargo.toml b/Cargo.toml index a69cb14..1bd11d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,6 @@ members = [ "aquatic_common", "aquatic_http", "aquatic_http_load_test", - "aquatic_http_private", "aquatic_http_protocol", "aquatic_toml_config", "aquatic_toml_config_derive", diff --git a/aquatic_http_private/Cargo.toml b/aquatic_http_private/Cargo.toml deleted file mode 100644 index 2bcde27..0000000 --- a/aquatic_http_private/Cargo.toml +++ /dev/null @@ -1,38 +0,0 @@ -[package] -name = "aquatic_http_private" -keywords = ["http", "benchmark", "peer-to-peer", "torrent", "bittorrent"] -version.workspace = true -authors.workspace = true -edition.workspace = true -license.workspace = true -repository.workspace = true -readme.workspace = true -rust-version.workspace = true - -[lib] -name = "aquatic_http_private" - -[[bin]] -name = "aquatic_http_private" - -[dependencies] -aquatic_common = { workspace = true, features = ["rustls"] } -aquatic_http_protocol = { workspace = true, features = ["axum"] } -aquatic_toml_config.workspace = true - -anyhow = "1" -axum = { version = "0.5", default-features = false, features = ["headers", "http1", "matched-path", "original-uri"] } -dotenv = "0.15" -futures-util = { version = "0.3", default-features = false } -hex = "0.4" -hyper = "0.14" -log = "0.4" -mimalloc = { version = "0.1", default-features = false } -rand = { version = "0.8", features = ["small_rng"] } -rustls = "0.20" -serde = { version = "1", features = ["derive"] } -signal-hook = { version = "0.3" } -socket2 = { version = "0.4", features = ["all"] } -sqlx = { version = "0.6", features = [ "runtime-tokio-rustls" , "mysql" ] } -tokio = { version = "1", features = ["full"] } -tokio-rustls = "0.23" diff --git a/aquatic_http_private/README.md b/aquatic_http_private/README.md deleted file mode 100644 index 615d6b2..0000000 --- a/aquatic_http_private/README.md +++ /dev/null @@ -1,96 +0,0 @@ -# aquatic_http_private - -HTTP (over TLS) BitTorrent tracker that calls a mysql stored procedure to -determine if requests can proceed. - -Work in progress. - -## Usage - -### Database setup - -* Create database (you will typically skip this step and use your own database): - -```sql -CREATE DATABASE aquatic_db; -``` - -* Create aquatic user (use a better password): - -```sql -CREATE USER 'aquatic'@localhost IDENTIFIED BY 'aquatic_password'; -``` - -* Create stored procedure `aquatic_announce_v1`: - -```sql --- Create stored procedure called by aquatic for each announce request. --- --- Set output parameter p_announce_allowed determines to true to allow announce. -CREATE OR REPLACE PROCEDURE aquatic_announce_v1 ( - -- Canonical source ip address (IPv4/IPv6) - IN p_source_ip VARBINARY(16), - -- Source port (not port where peer says it will accept BitTorrent requests) - IN p_source_port SMALLINT UNSIGNED, - -- User agent (can be NULL) - IN p_user_agent TEXT, - -- User token extracted from announce url ('/announce/USER_TOKEN/) - IN p_user_token VARCHAR(255), - -- Hex-encoded info hash - IN p_info_hash CHAR(40), - -- Peer ID. BINARY since it can be any bytes according to spec. - IN p_peer_id BINARY(20), - -- Event (started/stopped/completed) (can be NULL) - IN p_event VARCHAR(9), - -- Bytes uploaded. Passed directly from request. - IN p_uploaded BIGINT UNSIGNED, - -- Bytes downloaded. Passed directly from request. - IN p_downloaded BIGINT UNSIGNED, - -- Bytes left - IN p_left BIGINT UNSIGNED, - -- Return true to send annonunce response. Defaults to false if not set. - OUT p_announce_allowed BOOLEAN, - -- Optional failure reason. Defaults to NULL if not set. - OUT p_failure_reason TEXT, - -- Optional warning message. Defaults to NULL if not set. - OUT p_warning_message TEXT -) -MODIFIES SQL DATA -BEGIN - -- Replace with your custom code - SELECT true INTO p_announce_allowed; -END -``` - -* Give aquatic user permission to call stored procedure: - -```sql -GRANT EXECUTE ON PROCEDURE aquatic_db.aquatic_announce_v1 TO 'aquatic'@localhost; -FLUSH PRIVILEGES; -``` - -`CREATE OR REPLACE PROCEDURE` command, which leaves privileges in place, -requires MariaDB 10.1.3 or later. If your database does not support it, -each time you want to replace the procedure, you need to drop it, then -create it using `CREATE PROCEDURE` and grant execution privileges again. - -### Tracker setup - -* Install rust compiler and cmake - -* Create `.env` file with database credentials: - -```sh -DATABASE_URL="mysql://aquatic:aquatic_password@localhost/aquatic_db" -``` - -* Build and run tracker: - -```sh -# Build -cargo build --release -p aquatic_http_private -# Generate config file (remember to set paths to TLS cert and key) -./target/release/aquatic_http_private -p > http-private-config.toml -# Run tracker -./target/release/aquatic_http_private -c http-private-config.toml -``` diff --git a/aquatic_http_private/src/common.rs b/aquatic_http_private/src/common.rs deleted file mode 100644 index b1b247b..0000000 --- a/aquatic_http_private/src/common.rs +++ /dev/null @@ -1,52 +0,0 @@ -use tokio::sync::{mpsc, oneshot}; - -use aquatic_common::CanonicalSocketAddr; -use aquatic_http_protocol::{common::InfoHash, response::Response}; - -use crate::{config::Config, workers::socket::db::ValidatedAnnounceRequest}; - -#[derive(Debug)] -pub struct ChannelAnnounceRequest { - pub request: ValidatedAnnounceRequest, - pub source_addr: CanonicalSocketAddr, - pub response_sender: oneshot::Sender, -} - -#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] -pub struct RequestWorkerIndex(pub usize); - -impl RequestWorkerIndex { - pub fn from_info_hash(config: &Config, info_hash: InfoHash) -> Self { - Self(info_hash.0[0] as usize % config.swarm_workers) - } -} - -pub struct ChannelRequestSender(Vec>); - -impl ChannelRequestSender { - pub fn new(senders: Vec>) -> Self { - Self(senders) - } - - pub async fn send_to( - &self, - index: RequestWorkerIndex, - request: ValidatedAnnounceRequest, - source_addr: CanonicalSocketAddr, - ) -> anyhow::Result> { - let (response_sender, response_receiver) = oneshot::channel(); - - let request = ChannelAnnounceRequest { - request, - source_addr, - response_sender, - }; - - match self.0[index.0].send(request).await { - Ok(()) => Ok(response_receiver), - Err(err) => { - Err(anyhow::Error::new(err).context("error sending ChannelAnnounceRequest")) - } - } - } -} diff --git a/aquatic_http_private/src/config.rs b/aquatic_http_private/src/config.rs deleted file mode 100644 index d678956..0000000 --- a/aquatic_http_private/src/config.rs +++ /dev/null @@ -1,119 +0,0 @@ -use std::{net::SocketAddr, path::PathBuf}; - -use aquatic_common::privileges::PrivilegeConfig; -use aquatic_toml_config::TomlConfig; -use serde::Deserialize; - -use aquatic_common::cli::LogLevel; - -/// aquatic_http_private configuration -#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default, deny_unknown_fields)] -pub struct Config { - /// Socket workers receive requests from the socket, parse them and send - /// them on to the swarm workers. They then receive responses from the - /// swarm workers, encode them and send them back over the socket. - pub socket_workers: usize, - /// Swarm workers receive a number of requests from socket workers, - /// generate responses and send them back to the socket workers. - pub swarm_workers: usize, - pub worker_channel_size: usize, - /// Number of database connections to establish in each socket worker - pub db_connections_per_worker: u32, - pub log_level: LogLevel, - pub network: NetworkConfig, - pub protocol: ProtocolConfig, - pub cleaning: CleaningConfig, - pub privileges: PrivilegeConfig, -} - -impl Default for Config { - fn default() -> Self { - Self { - socket_workers: 1, - swarm_workers: 1, - worker_channel_size: 128, - db_connections_per_worker: 4, - log_level: LogLevel::default(), - network: NetworkConfig::default(), - protocol: ProtocolConfig::default(), - cleaning: CleaningConfig::default(), - privileges: PrivilegeConfig::default(), - } - } -} - -impl aquatic_common::cli::Config for Config { - fn get_log_level(&self) -> Option { - Some(self.log_level) - } -} - -#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default, deny_unknown_fields)] -pub struct NetworkConfig { - /// Bind to this address - pub address: SocketAddr, - /// Path to TLS certificate (DER-encoded X.509) - pub tls_certificate_path: PathBuf, - /// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format) - pub tls_private_key_path: PathBuf, - pub keep_alive: bool, -} - -impl Default for NetworkConfig { - fn default() -> Self { - Self { - address: SocketAddr::from(([0, 0, 0, 0], 3000)), - tls_certificate_path: "".into(), - tls_private_key_path: "".into(), - keep_alive: true, - } - } -} - -#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default, deny_unknown_fields)] -pub struct ProtocolConfig { - /// Maximum number of torrents to accept in scrape request - pub max_scrape_torrents: usize, - /// Maximum number of requested peers to accept in announce request - pub max_peers: usize, - /// Ask peers to announce this often (seconds) - pub peer_announce_interval: usize, -} - -impl Default for ProtocolConfig { - fn default() -> Self { - Self { - max_scrape_torrents: 100, - max_peers: 50, - peer_announce_interval: 300, - } - } -} - -#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default, deny_unknown_fields)] -pub struct CleaningConfig { - /// Clean peers this often (seconds) - pub torrent_cleaning_interval: u64, - /// Remove peers that have not announced for this long (seconds) - pub max_peer_age: u32, -} - -impl Default for CleaningConfig { - fn default() -> Self { - Self { - torrent_cleaning_interval: 30, - max_peer_age: 360, - } - } -} - -#[cfg(test)] -mod tests { - use super::Config; - - ::aquatic_toml_config::gen_serialize_deserialize_test!(Config); -} diff --git a/aquatic_http_private/src/lib.rs b/aquatic_http_private/src/lib.rs deleted file mode 100644 index 4193025..0000000 --- a/aquatic_http_private/src/lib.rs +++ /dev/null @@ -1,103 +0,0 @@ -mod common; -pub mod config; -mod workers; - -use std::{collections::VecDeque, sync::Arc}; - -use aquatic_common::{ - privileges::PrivilegeDropper, rustls_config::create_rustls_config, PanicSentinelWatcher, - ServerStartInstant, -}; -use common::ChannelRequestSender; -use dotenv::dotenv; -use signal_hook::{consts::SIGTERM, iterator::Signals}; -use tokio::sync::mpsc::channel; - -use config::Config; - -pub const APP_NAME: &str = "aquatic_http_private: private HTTP/TLS BitTorrent tracker"; -pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); - -pub fn run(config: Config) -> anyhow::Result<()> { - let mut signals = Signals::new([SIGTERM])?; - - dotenv().ok(); - - let tls_config = Arc::new(create_rustls_config( - &config.network.tls_certificate_path, - &config.network.tls_private_key_path, - )?); - - let mut request_senders = Vec::new(); - let mut request_receivers = VecDeque::new(); - - for _ in 0..config.swarm_workers { - let (request_sender, request_receiver) = channel(config.worker_channel_size); - - request_senders.push(request_sender); - request_receivers.push_back(request_receiver); - } - - let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); - let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); - - let server_start_instant = ServerStartInstant::new(); - - let mut handles = Vec::new(); - - for _ in 0..config.socket_workers { - let sentinel = sentinel.clone(); - let config = config.clone(); - let tls_config = tls_config.clone(); - let request_sender = ChannelRequestSender::new(request_senders.clone()); - let priv_dropper = priv_dropper.clone(); - - let handle = ::std::thread::Builder::new() - .name("socket".into()) - .spawn(move || { - workers::socket::run_socket_worker( - sentinel, - config, - tls_config, - request_sender, - priv_dropper, - ) - })?; - - handles.push(handle); - } - - for _ in 0..config.swarm_workers { - let sentinel = sentinel.clone(); - let config = config.clone(); - let request_receiver = request_receivers.pop_front().unwrap(); - - let handle = ::std::thread::Builder::new() - .name("request".into()) - .spawn(move || { - workers::swarm::run_swarm_worker( - sentinel, - config, - request_receiver, - server_start_instant, - ) - })?; - - handles.push(handle); - } - - for signal in &mut signals { - match signal { - SIGTERM => { - if sentinel_watcher.panic_was_triggered() { - return Err(anyhow::anyhow!("worker thread panicked")); - } else { - return Ok(()); - } - } - _ => unreachable!(), - } - } - - Ok(()) -} diff --git a/aquatic_http_private/src/main.rs b/aquatic_http_private/src/main.rs deleted file mode 100644 index caf3cbc..0000000 --- a/aquatic_http_private/src/main.rs +++ /dev/null @@ -1,14 +0,0 @@ -use aquatic_common::cli::run_app_with_cli_and_config; -use aquatic_http_private::config::Config; - -#[global_allocator] -static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; - -fn main() { - run_app_with_cli_and_config::( - aquatic_http_private::APP_NAME, - aquatic_http_private::APP_VERSION, - aquatic_http_private::run, - None, - ) -} diff --git a/aquatic_http_private/src/workers/mod.rs b/aquatic_http_private/src/workers/mod.rs deleted file mode 100644 index 28ef095..0000000 --- a/aquatic_http_private/src/workers/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod socket; -pub mod swarm; diff --git a/aquatic_http_private/src/workers/socket/db.rs b/aquatic_http_private/src/workers/socket/db.rs deleted file mode 100644 index 89b7396..0000000 --- a/aquatic_http_private/src/workers/socket/db.rs +++ /dev/null @@ -1,119 +0,0 @@ -use std::net::IpAddr; - -use aquatic_common::CanonicalSocketAddr; -use aquatic_http_protocol::{request::AnnounceRequest, response::FailureResponse}; -use sqlx::{Executor, MySql, Pool}; - -#[derive(Debug)] -pub struct ValidatedAnnounceRequest(AnnounceRequest); - -impl Into for ValidatedAnnounceRequest { - fn into(self) -> AnnounceRequest { - self.0 - } -} - -#[derive(Debug, sqlx::FromRow)] -struct AnnounceProcedureResults { - announce_allowed: bool, - failure_reason: Option, - warning_message: Option, -} - -pub async fn validate_announce_request( - pool: &Pool, - source_addr: CanonicalSocketAddr, - user_agent: Option, - user_token: String, - request: AnnounceRequest, -) -> Result<(ValidatedAnnounceRequest, Option), FailureResponse> { - match call_announce_procedure(pool, source_addr, user_agent, user_token, &request).await { - Ok(results) => { - if results.announce_allowed { - Ok((ValidatedAnnounceRequest(request), results.warning_message)) - } else { - Err(FailureResponse::new( - results - .failure_reason - .unwrap_or_else(|| "Not allowed".into()), - )) - } - } - Err(err) => { - ::log::error!("announce procedure error: {:#}", err); - - Err(FailureResponse::new("Internal error")) - } - } -} - -async fn call_announce_procedure( - pool: &Pool, - source_addr: CanonicalSocketAddr, - user_agent: Option, - user_token: String, // FIXME: length - request: &AnnounceRequest, -) -> anyhow::Result { - let mut t = pool.begin().await?; - - t.execute( - " - SET - @p_announce_allowed = false, - @p_failure_reason = NULL, - @p_warning_message = NULL; - ", - ) - .await?; - - let q = sqlx::query( - " - CALL aquatic_announce_v1( - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - ?, - @p_announce_allowed, - @p_failure_reason, - @p_warning_message - ); - ", - ) - .bind(match source_addr.get().ip() { - IpAddr::V4(ip) => Vec::from(ip.octets()), - IpAddr::V6(ip) => Vec::from(ip.octets()), - }) - .bind(source_addr.get().port()) - .bind(user_agent) - .bind(user_token) - .bind(hex::encode(request.info_hash.0)) - .bind(&request.peer_id.0[..]) - .bind(request.event.as_str()) - .bind(request.bytes_uploaded as u64) - .bind(request.bytes_downloaded as u64) - .bind(request.bytes_left as u64); - - t.execute(q).await?; - - let response = sqlx::query_as::<_, AnnounceProcedureResults>( - " - SELECT - @p_announce_allowed as announce_allowed, - @p_failure_reason as failure_reason, - @p_warning_message as warning_message; - - ", - ) - .fetch_one(&mut t) - .await?; - - t.commit().await?; - - Ok(response) -} diff --git a/aquatic_http_private/src/workers/socket/mod.rs b/aquatic_http_private/src/workers/socket/mod.rs deleted file mode 100644 index 24e561b..0000000 --- a/aquatic_http_private/src/workers/socket/mod.rs +++ /dev/null @@ -1,104 +0,0 @@ -pub mod db; -mod routes; -mod tls; - -use std::{ - net::{SocketAddr, TcpListener}, - sync::Arc, -}; - -use anyhow::Context; -use aquatic_common::{privileges::PrivilegeDropper, rustls_config::RustlsConfig, PanicSentinel}; -use axum::{extract::connect_info::Connected, routing::get, Extension, Router}; -use hyper::server::conn::AddrIncoming; -use sqlx::mysql::MySqlPoolOptions; - -use self::tls::{TlsAcceptor, TlsStream}; -use crate::{common::ChannelRequestSender, config::Config}; - -impl<'a> Connected<&'a tls::TlsStream> for SocketAddr { - fn connect_info(target: &'a TlsStream) -> Self { - target.get_remote_addr() - } -} - -pub fn run_socket_worker( - _sentinel: PanicSentinel, - config: Config, - tls_config: Arc, - request_sender: ChannelRequestSender, - priv_dropper: PrivilegeDropper, -) -> anyhow::Result<()> { - let tcp_listener = create_tcp_listener(config.network.address, priv_dropper)?; - - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - - runtime.block_on(run_app(config, tls_config, tcp_listener, request_sender))?; - - Ok(()) -} - -async fn run_app( - config: Config, - tls_config: Arc, - tcp_listener: TcpListener, - request_sender: ChannelRequestSender, -) -> anyhow::Result<()> { - let db_url = - ::std::env::var("DATABASE_URL").with_context(|| "Retrieve env var DATABASE_URL")?; - - let tls_acceptor = TlsAcceptor::new( - tls_config, - AddrIncoming::from_listener(tokio::net::TcpListener::from_std(tcp_listener)?)?, - ); - - let pool = MySqlPoolOptions::new() - .max_connections(config.db_connections_per_worker) - .connect(&db_url) - .await?; - - let app = Router::new() - .route("/announce/:user_token/", get(routes::announce)) - .layer(Extension(Arc::new(config.clone()))) - .layer(Extension(pool)) - .layer(Extension(Arc::new(request_sender))); - - axum::Server::builder(tls_acceptor) - .http1_keepalive(config.network.keep_alive) - .serve(app.into_make_service_with_connect_info::()) - .await?; - - Ok(()) -} - -fn create_tcp_listener( - addr: SocketAddr, - priv_dropper: PrivilegeDropper, -) -> anyhow::Result { - let domain = if addr.is_ipv4() { - socket2::Domain::IPV4 - } else { - socket2::Domain::IPV6 - }; - - let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?; - - socket - .set_reuse_port(true) - .with_context(|| "set_reuse_port")?; - socket - .set_nonblocking(true) - .with_context(|| "set_nonblocking")?; - socket - .bind(&addr.into()) - .with_context(|| format!("bind to {}", addr))?; - socket - .listen(1024) - .with_context(|| format!("listen on {}", addr))?; - - priv_dropper.after_socket_creation()?; - - Ok(socket.into()) -} diff --git a/aquatic_http_private/src/workers/socket/routes.rs b/aquatic_http_private/src/workers/socket/routes.rs deleted file mode 100644 index 1d83f29..0000000 --- a/aquatic_http_private/src/workers/socket/routes.rs +++ /dev/null @@ -1,65 +0,0 @@ -use aquatic_common::CanonicalSocketAddr; -use axum::{ - extract::{ConnectInfo, Path, RawQuery}, - headers::UserAgent, - Extension, TypedHeader, -}; -use sqlx::mysql::MySqlPool; -use std::{net::SocketAddr, sync::Arc}; - -use aquatic_http_protocol::{ - request::AnnounceRequest, - response::{FailureResponse, Response}, -}; - -use crate::{ - common::{ChannelRequestSender, RequestWorkerIndex}, - config::Config, -}; - -use super::db; - -pub async fn announce( - Extension(config): Extension>, - Extension(pool): Extension, - Extension(request_sender): Extension>, - ConnectInfo(source_addr): ConnectInfo, - opt_user_agent: Option>, - Path(user_token): Path, - RawQuery(query): RawQuery, -) -> Result { - let query = query.ok_or_else(|| FailureResponse::new("Empty query string"))?; - - let request = AnnounceRequest::from_query_string(&query) - .map_err(|_| FailureResponse::new("Malformed request"))?; - - let swarm_worker_index = RequestWorkerIndex::from_info_hash(&config, request.info_hash); - let opt_user_agent = opt_user_agent.map(|header| header.as_str().to_owned()); - - let source_addr = CanonicalSocketAddr::new(source_addr); - - let (validated_request, opt_warning_message) = - db::validate_announce_request(&pool, source_addr, opt_user_agent, user_token, request) - .await?; - - let response_receiver = request_sender - .send_to(swarm_worker_index, validated_request, source_addr) - .await - .map_err(|err| internal_error(format!("Sending request over channel failed: {:#}", err)))?; - - let mut response = response_receiver.await.map_err(|err| { - internal_error(format!("Receiving response over channel failed: {:#}", err)) - })?; - - if let Response::Announce(ref mut r) = response { - r.warning_message = opt_warning_message; - } - - Ok(response) -} - -fn internal_error(error: String) -> FailureResponse { - ::log::error!("{}", error); - - FailureResponse::new("Internal error") -} diff --git a/aquatic_http_private/src/workers/socket/tls.rs b/aquatic_http_private/src/workers/socket/tls.rs deleted file mode 100644 index 3828b29..0000000 --- a/aquatic_http_private/src/workers/socket/tls.rs +++ /dev/null @@ -1,151 +0,0 @@ -//! hyper/rustls integration -//! -//! hyper will automatically use HTTP/2 if a client starts talking HTTP/2, -//! otherwise HTTP/1.1 will be used. -//! -//! Based on https://github.com/rustls/hyper-rustls/blob/9b7b1220f74de9b249ce2b8f8b922fd00074c53b/examples/server.rs - -// ISC License (ISC) -// Copyright (c) 2016, Joseph Birr-Pixton -// -// Permission to use, copy, modify, and/or distribute this software for -// any purpose with or without fee is hereby granted, provided that the -// above copyright notice and this permission notice appear in all copies. -// -// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL -// WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED -// WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE -// AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL -// DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR -// PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS -// ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF -// THIS SOFTWARE. - -use core::task::{Context, Poll}; -use futures_util::ready; -use hyper::server::accept::Accept; -use hyper::server::conn::{AddrIncoming, AddrStream}; -use std::future::Future; -use std::io; -use std::net::SocketAddr; -use std::pin::Pin; -use std::sync::Arc; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use tokio_rustls::rustls::ServerConfig; - -enum State { - Handshaking(tokio_rustls::Accept, SocketAddr), - Streaming(tokio_rustls::server::TlsStream), -} - -// tokio_rustls::server::TlsStream doesn't expose constructor methods, -// so we have to TlsAcceptor::accept and handshake to have access to it -// TlsStream implements AsyncRead/AsyncWrite handshaking tokio_rustls::Accept first -pub struct TlsStream { - state: State, -} - -impl TlsStream { - fn new(stream: AddrStream, config: Arc) -> TlsStream { - let remote_addr = stream.remote_addr(); - let accept = tokio_rustls::TlsAcceptor::from(config).accept(stream); - - TlsStream { - state: State::Handshaking(accept, remote_addr), - } - } - - pub fn get_remote_addr(&self) -> SocketAddr { - match &self.state { - State::Handshaking(_, remote_addr) => *remote_addr, - State::Streaming(stream) => stream.get_ref().0.remote_addr(), - } - } -} - -impl AsyncRead for TlsStream { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &mut ReadBuf, - ) -> Poll> { - let pin = self.get_mut(); - match pin.state { - State::Handshaking(ref mut accept, ref mut socket_addr) => { - match ready!(Pin::new(accept).poll(cx)) { - Ok(mut stream) => { - *socket_addr = stream.get_ref().0.remote_addr(); - let result = Pin::new(&mut stream).poll_read(cx, buf); - pin.state = State::Streaming(stream); - result - } - Err(err) => Poll::Ready(Err(err)), - } - } - State::Streaming(ref mut stream) => Pin::new(stream).poll_read(cx, buf), - } - } -} - -impl AsyncWrite for TlsStream { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - let pin = self.get_mut(); - match pin.state { - State::Handshaking(ref mut accept, _) => match ready!(Pin::new(accept).poll(cx)) { - Ok(mut stream) => { - let result = Pin::new(&mut stream).poll_write(cx, buf); - pin.state = State::Streaming(stream); - result - } - Err(err) => Poll::Ready(Err(err)), - }, - State::Streaming(ref mut stream) => Pin::new(stream).poll_write(cx, buf), - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.state { - State::Handshaking(_, _) => Poll::Ready(Ok(())), - State::Streaming(ref mut stream) => Pin::new(stream).poll_flush(cx), - } - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.state { - State::Handshaking(_, _) => Poll::Ready(Ok(())), - State::Streaming(ref mut stream) => Pin::new(stream).poll_shutdown(cx), - } - } -} - -pub struct TlsAcceptor { - config: Arc, - incoming: AddrIncoming, -} - -impl TlsAcceptor { - pub fn new(config: Arc, incoming: AddrIncoming) -> TlsAcceptor { - TlsAcceptor { config, incoming } - } -} - -impl Accept for TlsAcceptor { - type Conn = TlsStream; - type Error = io::Error; - - fn poll_accept( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - let pin = self.get_mut(); - match ready!(Pin::new(&mut pin.incoming).poll_accept(cx)) { - Some(Ok(sock)) => Poll::Ready(Some(Ok(TlsStream::new(sock, pin.config.clone())))), - Some(Err(e)) => Poll::Ready(Some(Err(e))), - None => Poll::Ready(None), - } - } -} diff --git a/aquatic_http_private/src/workers/swarm/common.rs b/aquatic_http_private/src/workers/swarm/common.rs deleted file mode 100644 index 22ad497..0000000 --- a/aquatic_http_private/src/workers/swarm/common.rs +++ /dev/null @@ -1,121 +0,0 @@ -use std::net::{Ipv4Addr, Ipv6Addr}; - -use aquatic_common::{IndexMap, SecondsSinceServerStart, ServerStartInstant, ValidUntil}; -use aquatic_http_protocol::common::{AnnounceEvent, InfoHash, PeerId}; -use aquatic_http_protocol::response::ResponsePeer; - -pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {} - -impl Ip for Ipv4Addr {} -impl Ip for Ipv6Addr {} - -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -pub enum PeerStatus { - Seeding, - Leeching, - Stopped, -} - -impl PeerStatus { - /// Determine peer status from announce event and number of bytes left. - /// - /// Likely, the last branch will be taken most of the time. - #[inline] - pub fn from_event_and_bytes_left(event: AnnounceEvent, opt_bytes_left: Option) -> Self { - if let AnnounceEvent::Stopped = event { - Self::Stopped - } else if let Some(0) = opt_bytes_left { - Self::Seeding - } else { - Self::Leeching - } - } -} - -#[derive(Debug, Clone, Copy)] -pub struct Peer { - pub ip_address: I, - pub port: u16, - pub status: PeerStatus, - pub valid_until: ValidUntil, -} - -impl Peer { - pub fn to_response_peer(&self) -> ResponsePeer { - ResponsePeer { - ip_address: self.ip_address, - port: self.port, - } - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct PeerMapKey { - pub peer_id: PeerId, - pub ip_address: I, -} - -pub type PeerMap = IndexMap, Peer>; - -pub struct TorrentData { - pub peers: PeerMap, - pub num_seeders: usize, - pub num_leechers: usize, -} - -impl Default for TorrentData { - #[inline] - fn default() -> Self { - Self { - peers: Default::default(), - num_seeders: 0, - num_leechers: 0, - } - } -} - -pub type TorrentMap = IndexMap>; - -#[derive(Default)] -pub struct TorrentMaps { - pub ipv4: TorrentMap, - pub ipv6: TorrentMap, -} - -impl TorrentMaps { - pub fn clean(&mut self, server_start_instant: ServerStartInstant) { - let now = server_start_instant.seconds_elapsed(); - - Self::clean_torrent_map(&mut self.ipv4, now); - Self::clean_torrent_map(&mut self.ipv6, now); - } - - fn clean_torrent_map(torrent_map: &mut TorrentMap, now: SecondsSinceServerStart) { - torrent_map.retain(|_, torrent_data| { - let num_seeders = &mut torrent_data.num_seeders; - let num_leechers = &mut torrent_data.num_leechers; - - torrent_data.peers.retain(|_, peer| { - if peer.valid_until.valid(now) { - true - } else { - match peer.status { - PeerStatus::Seeding => { - *num_seeders -= 1; - } - PeerStatus::Leeching => { - *num_leechers -= 1; - } - _ => (), - }; - - false - } - }); - - !torrent_data.peers.is_empty() - }); - - torrent_map.shrink_to_fit(); - } -} diff --git a/aquatic_http_private/src/workers/swarm/mod.rs b/aquatic_http_private/src/workers/swarm/mod.rs deleted file mode 100644 index 45fb5fb..0000000 --- a/aquatic_http_private/src/workers/swarm/mod.rs +++ /dev/null @@ -1,220 +0,0 @@ -mod common; - -use std::cell::RefCell; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; -use std::rc::Rc; - -use aquatic_http_protocol::request::AnnounceRequest; -use rand::prelude::SmallRng; -use rand::SeedableRng; -use tokio::sync::mpsc::Receiver; -use tokio::task::LocalSet; -use tokio::time; - -use aquatic_common::{ - extract_response_peers, CanonicalSocketAddr, PanicSentinel, ServerStartInstant, ValidUntil, -}; -use aquatic_http_protocol::response::{ - AnnounceResponse, Response, ResponsePeer, ResponsePeerListV4, ResponsePeerListV6, -}; - -use crate::common::ChannelAnnounceRequest; -use crate::config::Config; - -use common::*; - -pub fn run_swarm_worker( - _sentinel: PanicSentinel, - config: Config, - request_receiver: Receiver, - server_start_instant: ServerStartInstant, -) -> anyhow::Result<()> { - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - - runtime.block_on(run_inner(config, request_receiver, server_start_instant))?; - - Ok(()) -} - -async fn run_inner( - config: Config, - mut request_receiver: Receiver, - server_start_instant: ServerStartInstant, -) -> anyhow::Result<()> { - let torrents = Rc::new(RefCell::new(TorrentMaps::default())); - let mut rng = SmallRng::from_entropy(); - - LocalSet::new().spawn_local(periodically_clean_torrents( - config.clone(), - torrents.clone(), - server_start_instant, - )); - - loop { - let request = request_receiver - .recv() - .await - .ok_or_else(|| anyhow::anyhow!("request channel closed"))?; - - let valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); - - let response = handle_announce_request( - &config, - &mut rng, - &mut torrents.borrow_mut(), - valid_until, - request.source_addr, - request.request.into(), - ); - - let _ = request.response_sender.send(Response::Announce(response)); - } -} - -async fn periodically_clean_torrents( - config: Config, - torrents: Rc>, - server_start_instant: ServerStartInstant, -) { - let mut interval = time::interval(time::Duration::from_secs( - config.cleaning.torrent_cleaning_interval, - )); - - loop { - interval.tick().await; - - torrents.borrow_mut().clean(server_start_instant); - } -} - -fn handle_announce_request( - config: &Config, - rng: &mut SmallRng, - torrent_maps: &mut TorrentMaps, - valid_until: ValidUntil, - source_addr: CanonicalSocketAddr, - request: AnnounceRequest, -) -> AnnounceResponse { - match source_addr.get().ip() { - IpAddr::V4(source_ip) => { - let torrent_data: &mut TorrentData = - torrent_maps.ipv4.entry(request.info_hash).or_default(); - - let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers( - config, - rng, - torrent_data, - source_ip, - request, - valid_until, - ); - - let response = AnnounceResponse { - complete: seeders, - incomplete: leechers, - announce_interval: config.protocol.peer_announce_interval, - peers: ResponsePeerListV4(response_peers), - peers6: ResponsePeerListV6(vec![]), - warning_message: None, - }; - - response - } - IpAddr::V6(source_ip) => { - let torrent_data: &mut TorrentData = - torrent_maps.ipv6.entry(request.info_hash).or_default(); - - let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers( - config, - rng, - torrent_data, - source_ip, - request, - valid_until, - ); - - let response = AnnounceResponse { - complete: seeders, - incomplete: leechers, - announce_interval: config.protocol.peer_announce_interval, - peers: ResponsePeerListV4(vec![]), - peers6: ResponsePeerListV6(response_peers), - warning_message: None, - }; - - response - } - } -} - -/// Insert/update peer. Return num_seeders, num_leechers and response peers -pub fn upsert_peer_and_get_response_peers( - config: &Config, - rng: &mut SmallRng, - torrent_data: &mut TorrentData, - source_ip: I, - request: AnnounceRequest, - valid_until: ValidUntil, -) -> (usize, usize, Vec>) { - // Insert/update/remove peer who sent this request - - let peer_status = - PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left)); - - let peer = Peer { - ip_address: source_ip, - port: request.port, - status: peer_status, - valid_until, - }; - - let peer_map_key = PeerMapKey { - peer_id: request.peer_id, - ip_address: source_ip, - }; - - let opt_removed_peer = match peer_status { - PeerStatus::Leeching => { - torrent_data.num_leechers += 1; - - torrent_data.peers.insert(peer_map_key.clone(), peer) - } - PeerStatus::Seeding => { - torrent_data.num_seeders += 1; - - torrent_data.peers.insert(peer_map_key.clone(), peer) - } - PeerStatus::Stopped => torrent_data.peers.remove(&peer_map_key), - }; - - match opt_removed_peer.map(|peer| peer.status) { - Some(PeerStatus::Leeching) => { - torrent_data.num_leechers -= 1; - } - Some(PeerStatus::Seeding) => { - torrent_data.num_seeders -= 1; - } - _ => {} - } - - let max_num_peers_to_take = match request.numwant { - Some(0) | None => config.protocol.max_peers, - Some(numwant) => numwant.min(config.protocol.max_peers), - }; - - let response_peers: Vec> = extract_response_peers( - rng, - &torrent_data.peers, - max_num_peers_to_take, - peer_map_key, - Peer::to_response_peer, - ); - - ( - torrent_data.num_seeders, - torrent_data.num_leechers, - response_peers, - ) -}