Remove aquatic_http_private

This commit is contained in:
Joakim Frostegård 2023-04-10 00:20:44 +02:00
parent 2a7551d634
commit c8a08cb124
15 changed files with 3 additions and 1673 deletions

471
Cargo.lock generated
View file

@ -147,31 +147,6 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "aquatic_http_private"
version = "0.8.0"
dependencies = [
"anyhow",
"aquatic_common",
"aquatic_http_protocol",
"aquatic_toml_config",
"axum",
"dotenv",
"futures-util",
"hex",
"hyper",
"log",
"mimalloc",
"rand",
"rustls",
"serde",
"signal-hook",
"socket2",
"sqlx",
"tokio",
"tokio-rustls",
]
[[package]] [[package]]
name = "aquatic_http_protocol" name = "aquatic_http_protocol"
version = "0.8.0" version = "0.8.0"
@ -409,15 +384,6 @@ dependencies = [
"tungstenite", "tungstenite",
] ]
[[package]]
name = "atoi"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7c57d12312ff59c811c0643f4d80830505833c9ffaebd193d819392b265be8e"
dependencies = [
"num-traits",
]
[[package]] [[package]]
name = "atty" name = "atty"
version = "0.2.14" version = "0.2.14"
@ -446,7 +412,6 @@ dependencies = [
"bitflags 1.3.2", "bitflags 1.3.2",
"bytes", "bytes",
"futures-util", "futures-util",
"headers",
"http", "http",
"http-body", "http-body",
"hyper", "hyper",
@ -508,12 +473,6 @@ version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a"
[[package]]
name = "base64ct"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
[[package]] [[package]]
name = "bendy" name = "bendy"
version = "0.4.0-beta.2" version = "0.4.0-beta.2"
@ -682,12 +641,6 @@ dependencies = [
"windows-sys 0.42.0", "windows-sys 0.42.0",
] ]
[[package]]
name = "const-oid"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4c78c047431fee22c1a7bb92e00ad095a02a983affe4d8a72e2a2c62c1b94f3"
[[package]] [[package]]
name = "constant_time_eq" name = "constant_time_eq"
version = "0.1.5" version = "0.1.5"
@ -709,21 +662,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "crc"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe"
dependencies = [
"crc-catalog",
]
[[package]]
name = "crc-catalog"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484"
[[package]] [[package]]
name = "crc32fast" name = "crc32fast"
version = "1.3.2" version = "1.3.2"
@ -836,16 +774,6 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "crypto-bigint"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03c6a1d5fa1de37e071642dfa44ec552ca5b299adb128fab16138e24b548fd21"
dependencies = [
"generic-array",
"subtle",
]
[[package]] [[package]]
name = "crypto-common" name = "crypto-common"
version = "0.1.6" version = "0.1.6"
@ -877,17 +805,6 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "der"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6919815d73839e7ad218de758883aae3a257ba6759ce7a9992501efbb53d705c"
dependencies = [
"const-oid",
"crypto-bigint",
"pem-rfc7468",
]
[[package]] [[package]]
name = "digest" name = "digest"
version = "0.9.0" version = "0.9.0"
@ -913,18 +830,6 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
[[package]]
name = "dotenv"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f"
[[package]]
name = "dotenvy"
version = "0.15.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03d8c417d7a8cb362e0c37e5d815f5eb7c37f79ff93707329d5a194e42e54ca0"
[[package]] [[package]]
name = "duplicate" name = "duplicate"
version = "0.4.1" version = "0.4.1"
@ -984,12 +889,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "event-listener"
version = "2.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]] [[package]]
name = "fastrand" name = "fastrand"
version = "1.9.0" version = "1.9.0"
@ -1088,17 +987,6 @@ dependencies = [
"futures-util", "futures-util",
] ]
[[package]]
name = "futures-intrusive"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a604f7a68fbf8103337523b1fadc8ade7361ee3f112f7c680ad179651616aed5"
dependencies = [
"futures-core",
"lock_api",
"parking_lot 0.11.2",
]
[[package]] [[package]]
name = "futures-io" name = "futures-io"
version = "0.3.27" version = "0.3.27"
@ -1294,15 +1182,6 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "hashlink"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69fe1fcf8b4278d860ad0548329f892a3631fb63f82574df68275f34cdbe0ffa"
dependencies = [
"hashbrown 0.12.3",
]
[[package]] [[package]]
name = "hdrhistogram" name = "hdrhistogram"
version = "7.5.2" version = "7.5.2"
@ -1317,39 +1196,11 @@ dependencies = [
"num-traits", "num-traits",
] ]
[[package]]
name = "headers"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3e372db8e5c0d213e0cd0b9be18be2aca3d44cf2fe30a9d46a65581cd454584"
dependencies = [
"base64 0.13.1",
"bitflags 1.3.2",
"bytes",
"headers-core",
"http",
"httpdate",
"mime",
"sha1",
]
[[package]]
name = "headers-core"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429"
dependencies = [
"http",
]
[[package]] [[package]]
name = "heck" name = "heck"
version = "0.4.1" version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
dependencies = [
"unicode-segmentation",
]
[[package]] [[package]]
name = "hermit-abi" name = "hermit-abi"
@ -1558,9 +1409,6 @@ name = "lazy_static"
version = "1.4.0" version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
dependencies = [
"spin 0.5.2",
]
[[package]] [[package]]
name = "libc" name = "libc"
@ -1682,7 +1530,7 @@ dependencies = [
"ipnet", "ipnet",
"metrics", "metrics",
"metrics-util", "metrics-util",
"parking_lot 0.12.1", "parking_lot",
"portable-atomic", "portable-atomic",
"quanta", "quanta",
"thiserror", "thiserror",
@ -1711,7 +1559,7 @@ dependencies = [
"hashbrown 0.12.3", "hashbrown 0.12.3",
"metrics", "metrics",
"num_cpus", "num_cpus",
"parking_lot 0.12.1", "parking_lot",
"portable-atomic", "portable-atomic",
"quanta", "quanta",
"sketches-ddsketch 0.2.0", "sketches-ddsketch 0.2.0",
@ -1822,34 +1670,6 @@ dependencies = [
"num-traits", "num-traits",
] ]
[[package]]
name = "num-bigint"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]]
name = "num-bigint-dig"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2399c9463abc5f909349d8aa9ba080e0b88b3ce2885389b60b993f39b1a56905"
dependencies = [
"byteorder",
"lazy_static",
"libm",
"num-integer",
"num-iter",
"num-traits",
"rand",
"smallvec",
"zeroize",
]
[[package]] [[package]]
name = "num-format" name = "num-format"
version = "0.4.4" version = "0.4.4"
@ -1949,17 +1769,6 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core 0.8.6",
]
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.12.1" version = "0.12.1"
@ -1967,21 +1776,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [ dependencies = [
"lock_api", "lock_api",
"parking_lot_core 0.9.7", "parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc"
dependencies = [
"cfg-if",
"instant",
"libc",
"redox_syscall",
"smallvec",
"winapi 0.3.9",
] ]
[[package]] [[package]]
@ -1997,21 +1792,6 @@ dependencies = [
"windows-sys 0.45.0", "windows-sys 0.45.0",
] ]
[[package]]
name = "paste"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f746c4065a8fa3fe23974dd82f15431cc8d40779821001404d10d2e79ca7d79"
[[package]]
name = "pem-rfc7468"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01de5d978f34aa4b2296576379fcc416034702fd94117c56ffd8a1a767cefb30"
dependencies = [
"base64ct",
]
[[package]] [[package]]
name = "percent-encoding" name = "percent-encoding"
version = "2.2.0" version = "2.2.0"
@ -2050,28 +1830,6 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkcs1"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a78f66c04ccc83dd4486fd46c33896f4e17b24a7a3a6400dedc48ed0ddd72320"
dependencies = [
"der",
"pkcs8",
"zeroize",
]
[[package]]
name = "pkcs8"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cabda3fb821068a9a4fab19a683eac3af12edf0f34b94a8be53c4972b8149d0"
dependencies = [
"der",
"spki",
"zeroize",
]
[[package]] [[package]]
name = "pkg-config" name = "pkg-config"
version = "0.3.26" version = "0.3.26"
@ -2329,26 +2087,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "rsa"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4cf22754c49613d2b3b119f0e5d46e34a2c628a937e3024b8762de4e7d8c710b"
dependencies = [
"byteorder",
"digest 0.10.5",
"num-bigint-dig",
"num-integer",
"num-iter",
"num-traits",
"pkcs1",
"pkcs8",
"rand_core",
"smallvec",
"subtle",
"zeroize",
]
[[package]] [[package]]
name = "rustc-demangle" name = "rustc-demangle"
version = "0.1.21" version = "0.1.21"
@ -2490,17 +2228,6 @@ dependencies = [
"digest 0.10.5", "digest 0.10.5",
] ]
[[package]]
name = "sha2"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0"
dependencies = [
"cfg-if",
"cpufeatures",
"digest 0.10.5",
]
[[package]] [[package]]
name = "signal-hook" name = "signal-hook"
version = "0.3.15" version = "0.3.15"
@ -2626,138 +2353,12 @@ dependencies = [
"lock_api", "lock_api",
] ]
[[package]]
name = "spki"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44d01ac02a6ccf3e07db148d2be087da624fea0221a16152ed01f0496a6b0a27"
dependencies = [
"base64ct",
"der",
]
[[package]]
name = "sqlformat"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c12bc9199d1db8234678b7051747c07f517cdcf019262d1847b94ec8b1aee3e"
dependencies = [
"itertools",
"nom",
"unicode_categories",
]
[[package]]
name = "sqlx"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9249290c05928352f71c077cc44a464d880c63f26f7534728cca008e135c0428"
dependencies = [
"sqlx-core",
"sqlx-macros",
]
[[package]]
name = "sqlx-core"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcbc16ddba161afc99e14d1713a453747a2b07fc097d2009f4c300ec99286105"
dependencies = [
"ahash 0.7.6",
"atoi",
"bitflags 1.3.2",
"byteorder",
"bytes",
"crc",
"crossbeam-queue",
"digest 0.10.5",
"dotenvy",
"either",
"event-listener",
"futures-channel",
"futures-core",
"futures-intrusive",
"futures-util",
"generic-array",
"hashlink",
"hex",
"indexmap",
"itoa",
"libc",
"log",
"memchr",
"num-bigint",
"once_cell",
"paste",
"percent-encoding",
"rand",
"rsa",
"rustls",
"rustls-pemfile",
"sha1",
"sha2",
"smallvec",
"sqlformat",
"sqlx-rt",
"stringprep",
"thiserror",
"tokio-stream",
"url",
"webpki-roots",
]
[[package]]
name = "sqlx-macros"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b850fa514dc11f2ee85be9d055c512aa866746adfacd1cb42d867d68e6a5b0d9"
dependencies = [
"dotenvy",
"either",
"heck",
"once_cell",
"proc-macro2",
"quote",
"sha2",
"sqlx-core",
"sqlx-rt",
"syn",
"url",
]
[[package]]
name = "sqlx-rt"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24c5b2d25fa654cc5f841750b8e1cdedbe21189bf9a9382ee90bfa9dd3562396"
dependencies = [
"once_cell",
"tokio",
"tokio-rustls",
]
[[package]] [[package]]
name = "static_assertions" name = "static_assertions"
version = "1.1.0" version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "stringprep"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]]
name = "subtle"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.109" version = "1.0.109"
@ -2865,52 +2466,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03201d01c3c27a29c8a5cee5b55a93ddae1ccf6f08f65365c2c918f8c1b76f64" checksum = "03201d01c3c27a29c8a5cee5b55a93ddae1ccf6f08f65365c2c918f8c1b76f64"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"bytes",
"libc", "libc",
"memchr",
"mio", "mio",
"num_cpus",
"parking_lot 0.12.1",
"pin-project-lite", "pin-project-lite",
"signal-hook-registry",
"socket2", "socket2",
"tokio-macros",
"windows-sys 0.45.0", "windows-sys 0.45.0",
] ]
[[package]]
name = "tokio-macros"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tokio-rustls"
version = "0.23.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
dependencies = [
"rustls",
"tokio",
"webpki",
]
[[package]]
name = "tokio-stream"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "toml" name = "toml"
version = "0.5.11" version = "0.5.11"
@ -3052,24 +2614,12 @@ dependencies = [
"tinyvec", "tinyvec",
] ]
[[package]]
name = "unicode-segmentation"
version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36"
[[package]] [[package]]
name = "unicode-width" name = "unicode-width"
version = "0.1.10" version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b" checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b"
[[package]]
name = "unicode_categories"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e"
[[package]] [[package]]
name = "untrusted" name = "untrusted"
version = "0.7.1" version = "0.7.1"
@ -3229,15 +2779,6 @@ dependencies = [
"untrusted", "untrusted",
] ]
[[package]]
name = "webpki-roots"
version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87"
dependencies = [
"webpki",
]
[[package]] [[package]]
name = "winapi" name = "winapi"
version = "0.2.8" version = "0.2.8"
@ -3361,9 +2902,3 @@ name = "windows_x86_64_msvc"
version = "0.42.2" version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
[[package]]
name = "zeroize"
version = "1.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c394b5bd0c6f669e7275d9c20aa90ae064cb22e75a1cad54e1b34088034b149f"

View file

@ -4,7 +4,6 @@ members = [
"aquatic_common", "aquatic_common",
"aquatic_http", "aquatic_http",
"aquatic_http_load_test", "aquatic_http_load_test",
"aquatic_http_private",
"aquatic_http_protocol", "aquatic_http_protocol",
"aquatic_toml_config", "aquatic_toml_config",
"aquatic_toml_config_derive", "aquatic_toml_config_derive",

View file

@ -1,38 +0,0 @@
[package]
name = "aquatic_http_private"
keywords = ["http", "benchmark", "peer-to-peer", "torrent", "bittorrent"]
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
readme.workspace = true
rust-version.workspace = true
[lib]
name = "aquatic_http_private"
[[bin]]
name = "aquatic_http_private"
[dependencies]
aquatic_common = { workspace = true, features = ["rustls"] }
aquatic_http_protocol = { workspace = true, features = ["axum"] }
aquatic_toml_config.workspace = true
anyhow = "1"
axum = { version = "0.5", default-features = false, features = ["headers", "http1", "matched-path", "original-uri"] }
dotenv = "0.15"
futures-util = { version = "0.3", default-features = false }
hex = "0.4"
hyper = "0.14"
log = "0.4"
mimalloc = { version = "0.1", default-features = false }
rand = { version = "0.8", features = ["small_rng"] }
rustls = "0.20"
serde = { version = "1", features = ["derive"] }
signal-hook = { version = "0.3" }
socket2 = { version = "0.4", features = ["all"] }
sqlx = { version = "0.6", features = [ "runtime-tokio-rustls" , "mysql" ] }
tokio = { version = "1", features = ["full"] }
tokio-rustls = "0.23"

View file

@ -1,96 +0,0 @@
# aquatic_http_private
HTTP (over TLS) BitTorrent tracker that calls a mysql stored procedure to
determine if requests can proceed.
Work in progress.
## Usage
### Database setup
* Create database (you will typically skip this step and use your own database):
```sql
CREATE DATABASE aquatic_db;
```
* Create aquatic user (use a better password):
```sql
CREATE USER 'aquatic'@localhost IDENTIFIED BY 'aquatic_password';
```
* Create stored procedure `aquatic_announce_v1`:
```sql
-- Create stored procedure called by aquatic for each announce request.
--
-- Set output parameter p_announce_allowed determines to true to allow announce.
CREATE OR REPLACE PROCEDURE aquatic_announce_v1 (
-- Canonical source ip address (IPv4/IPv6)
IN p_source_ip VARBINARY(16),
-- Source port (not port where peer says it will accept BitTorrent requests)
IN p_source_port SMALLINT UNSIGNED,
-- User agent (can be NULL)
IN p_user_agent TEXT,
-- User token extracted from announce url ('/announce/USER_TOKEN/)
IN p_user_token VARCHAR(255),
-- Hex-encoded info hash
IN p_info_hash CHAR(40),
-- Peer ID. BINARY since it can be any bytes according to spec.
IN p_peer_id BINARY(20),
-- Event (started/stopped/completed) (can be NULL)
IN p_event VARCHAR(9),
-- Bytes uploaded. Passed directly from request.
IN p_uploaded BIGINT UNSIGNED,
-- Bytes downloaded. Passed directly from request.
IN p_downloaded BIGINT UNSIGNED,
-- Bytes left
IN p_left BIGINT UNSIGNED,
-- Return true to send annonunce response. Defaults to false if not set.
OUT p_announce_allowed BOOLEAN,
-- Optional failure reason. Defaults to NULL if not set.
OUT p_failure_reason TEXT,
-- Optional warning message. Defaults to NULL if not set.
OUT p_warning_message TEXT
)
MODIFIES SQL DATA
BEGIN
-- Replace with your custom code
SELECT true INTO p_announce_allowed;
END
```
* Give aquatic user permission to call stored procedure:
```sql
GRANT EXECUTE ON PROCEDURE aquatic_db.aquatic_announce_v1 TO 'aquatic'@localhost;
FLUSH PRIVILEGES;
```
`CREATE OR REPLACE PROCEDURE` command, which leaves privileges in place,
requires MariaDB 10.1.3 or later. If your database does not support it,
each time you want to replace the procedure, you need to drop it, then
create it using `CREATE PROCEDURE` and grant execution privileges again.
### Tracker setup
* Install rust compiler and cmake
* Create `.env` file with database credentials:
```sh
DATABASE_URL="mysql://aquatic:aquatic_password@localhost/aquatic_db"
```
* Build and run tracker:
```sh
# Build
cargo build --release -p aquatic_http_private
# Generate config file (remember to set paths to TLS cert and key)
./target/release/aquatic_http_private -p > http-private-config.toml
# Run tracker
./target/release/aquatic_http_private -c http-private-config.toml
```

View file

@ -1,52 +0,0 @@
use tokio::sync::{mpsc, oneshot};
use aquatic_common::CanonicalSocketAddr;
use aquatic_http_protocol::{common::InfoHash, response::Response};
use crate::{config::Config, workers::socket::db::ValidatedAnnounceRequest};
#[derive(Debug)]
pub struct ChannelAnnounceRequest {
pub request: ValidatedAnnounceRequest,
pub source_addr: CanonicalSocketAddr,
pub response_sender: oneshot::Sender<Response>,
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct RequestWorkerIndex(pub usize);
impl RequestWorkerIndex {
pub fn from_info_hash(config: &Config, info_hash: InfoHash) -> Self {
Self(info_hash.0[0] as usize % config.swarm_workers)
}
}
pub struct ChannelRequestSender(Vec<mpsc::Sender<ChannelAnnounceRequest>>);
impl ChannelRequestSender {
pub fn new(senders: Vec<mpsc::Sender<ChannelAnnounceRequest>>) -> Self {
Self(senders)
}
pub async fn send_to(
&self,
index: RequestWorkerIndex,
request: ValidatedAnnounceRequest,
source_addr: CanonicalSocketAddr,
) -> anyhow::Result<oneshot::Receiver<Response>> {
let (response_sender, response_receiver) = oneshot::channel();
let request = ChannelAnnounceRequest {
request,
source_addr,
response_sender,
};
match self.0[index.0].send(request).await {
Ok(()) => Ok(response_receiver),
Err(err) => {
Err(anyhow::Error::new(err).context("error sending ChannelAnnounceRequest"))
}
}
}
}

View file

@ -1,119 +0,0 @@
use std::{net::SocketAddr, path::PathBuf};
use aquatic_common::privileges::PrivilegeConfig;
use aquatic_toml_config::TomlConfig;
use serde::Deserialize;
use aquatic_common::cli::LogLevel;
/// aquatic_http_private configuration
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct Config {
/// Socket workers receive requests from the socket, parse them and send
/// them on to the swarm workers. They then receive responses from the
/// swarm workers, encode them and send them back over the socket.
pub socket_workers: usize,
/// Swarm workers receive a number of requests from socket workers,
/// generate responses and send them back to the socket workers.
pub swarm_workers: usize,
pub worker_channel_size: usize,
/// Number of database connections to establish in each socket worker
pub db_connections_per_worker: u32,
pub log_level: LogLevel,
pub network: NetworkConfig,
pub protocol: ProtocolConfig,
pub cleaning: CleaningConfig,
pub privileges: PrivilegeConfig,
}
impl Default for Config {
fn default() -> Self {
Self {
socket_workers: 1,
swarm_workers: 1,
worker_channel_size: 128,
db_connections_per_worker: 4,
log_level: LogLevel::default(),
network: NetworkConfig::default(),
protocol: ProtocolConfig::default(),
cleaning: CleaningConfig::default(),
privileges: PrivilegeConfig::default(),
}
}
}
impl aquatic_common::cli::Config for Config {
fn get_log_level(&self) -> Option<LogLevel> {
Some(self.log_level)
}
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct NetworkConfig {
/// Bind to this address
pub address: SocketAddr,
/// Path to TLS certificate (DER-encoded X.509)
pub tls_certificate_path: PathBuf,
/// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format)
pub tls_private_key_path: PathBuf,
pub keep_alive: bool,
}
impl Default for NetworkConfig {
fn default() -> Self {
Self {
address: SocketAddr::from(([0, 0, 0, 0], 3000)),
tls_certificate_path: "".into(),
tls_private_key_path: "".into(),
keep_alive: true,
}
}
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct ProtocolConfig {
/// Maximum number of torrents to accept in scrape request
pub max_scrape_torrents: usize,
/// Maximum number of requested peers to accept in announce request
pub max_peers: usize,
/// Ask peers to announce this often (seconds)
pub peer_announce_interval: usize,
}
impl Default for ProtocolConfig {
fn default() -> Self {
Self {
max_scrape_torrents: 100,
max_peers: 50,
peer_announce_interval: 300,
}
}
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct CleaningConfig {
/// Clean peers this often (seconds)
pub torrent_cleaning_interval: u64,
/// Remove peers that have not announced for this long (seconds)
pub max_peer_age: u32,
}
impl Default for CleaningConfig {
fn default() -> Self {
Self {
torrent_cleaning_interval: 30,
max_peer_age: 360,
}
}
}
#[cfg(test)]
mod tests {
use super::Config;
::aquatic_toml_config::gen_serialize_deserialize_test!(Config);
}

View file

@ -1,103 +0,0 @@
mod common;
pub mod config;
mod workers;
use std::{collections::VecDeque, sync::Arc};
use aquatic_common::{
privileges::PrivilegeDropper, rustls_config::create_rustls_config, PanicSentinelWatcher,
ServerStartInstant,
};
use common::ChannelRequestSender;
use dotenv::dotenv;
use signal_hook::{consts::SIGTERM, iterator::Signals};
use tokio::sync::mpsc::channel;
use config::Config;
pub const APP_NAME: &str = "aquatic_http_private: private HTTP/TLS BitTorrent tracker";
pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
pub fn run(config: Config) -> anyhow::Result<()> {
let mut signals = Signals::new([SIGTERM])?;
dotenv().ok();
let tls_config = Arc::new(create_rustls_config(
&config.network.tls_certificate_path,
&config.network.tls_private_key_path,
)?);
let mut request_senders = Vec::new();
let mut request_receivers = VecDeque::new();
for _ in 0..config.swarm_workers {
let (request_sender, request_receiver) = channel(config.worker_channel_size);
request_senders.push(request_sender);
request_receivers.push_back(request_receiver);
}
let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel();
let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
let server_start_instant = ServerStartInstant::new();
let mut handles = Vec::new();
for _ in 0..config.socket_workers {
let sentinel = sentinel.clone();
let config = config.clone();
let tls_config = tls_config.clone();
let request_sender = ChannelRequestSender::new(request_senders.clone());
let priv_dropper = priv_dropper.clone();
let handle = ::std::thread::Builder::new()
.name("socket".into())
.spawn(move || {
workers::socket::run_socket_worker(
sentinel,
config,
tls_config,
request_sender,
priv_dropper,
)
})?;
handles.push(handle);
}
for _ in 0..config.swarm_workers {
let sentinel = sentinel.clone();
let config = config.clone();
let request_receiver = request_receivers.pop_front().unwrap();
let handle = ::std::thread::Builder::new()
.name("request".into())
.spawn(move || {
workers::swarm::run_swarm_worker(
sentinel,
config,
request_receiver,
server_start_instant,
)
})?;
handles.push(handle);
}
for signal in &mut signals {
match signal {
SIGTERM => {
if sentinel_watcher.panic_was_triggered() {
return Err(anyhow::anyhow!("worker thread panicked"));
} else {
return Ok(());
}
}
_ => unreachable!(),
}
}
Ok(())
}

View file

@ -1,14 +0,0 @@
use aquatic_common::cli::run_app_with_cli_and_config;
use aquatic_http_private::config::Config;
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
fn main() {
run_app_with_cli_and_config::<Config>(
aquatic_http_private::APP_NAME,
aquatic_http_private::APP_VERSION,
aquatic_http_private::run,
None,
)
}

View file

@ -1,2 +0,0 @@
pub mod socket;
pub mod swarm;

View file

@ -1,119 +0,0 @@
use std::net::IpAddr;
use aquatic_common::CanonicalSocketAddr;
use aquatic_http_protocol::{request::AnnounceRequest, response::FailureResponse};
use sqlx::{Executor, MySql, Pool};
#[derive(Debug)]
pub struct ValidatedAnnounceRequest(AnnounceRequest);
impl Into<AnnounceRequest> for ValidatedAnnounceRequest {
fn into(self) -> AnnounceRequest {
self.0
}
}
#[derive(Debug, sqlx::FromRow)]
struct AnnounceProcedureResults {
announce_allowed: bool,
failure_reason: Option<String>,
warning_message: Option<String>,
}
pub async fn validate_announce_request(
pool: &Pool<MySql>,
source_addr: CanonicalSocketAddr,
user_agent: Option<String>,
user_token: String,
request: AnnounceRequest,
) -> Result<(ValidatedAnnounceRequest, Option<String>), FailureResponse> {
match call_announce_procedure(pool, source_addr, user_agent, user_token, &request).await {
Ok(results) => {
if results.announce_allowed {
Ok((ValidatedAnnounceRequest(request), results.warning_message))
} else {
Err(FailureResponse::new(
results
.failure_reason
.unwrap_or_else(|| "Not allowed".into()),
))
}
}
Err(err) => {
::log::error!("announce procedure error: {:#}", err);
Err(FailureResponse::new("Internal error"))
}
}
}
async fn call_announce_procedure(
pool: &Pool<MySql>,
source_addr: CanonicalSocketAddr,
user_agent: Option<String>,
user_token: String, // FIXME: length
request: &AnnounceRequest,
) -> anyhow::Result<AnnounceProcedureResults> {
let mut t = pool.begin().await?;
t.execute(
"
SET
@p_announce_allowed = false,
@p_failure_reason = NULL,
@p_warning_message = NULL;
",
)
.await?;
let q = sqlx::query(
"
CALL aquatic_announce_v1(
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
@p_announce_allowed,
@p_failure_reason,
@p_warning_message
);
",
)
.bind(match source_addr.get().ip() {
IpAddr::V4(ip) => Vec::from(ip.octets()),
IpAddr::V6(ip) => Vec::from(ip.octets()),
})
.bind(source_addr.get().port())
.bind(user_agent)
.bind(user_token)
.bind(hex::encode(request.info_hash.0))
.bind(&request.peer_id.0[..])
.bind(request.event.as_str())
.bind(request.bytes_uploaded as u64)
.bind(request.bytes_downloaded as u64)
.bind(request.bytes_left as u64);
t.execute(q).await?;
let response = sqlx::query_as::<_, AnnounceProcedureResults>(
"
SELECT
@p_announce_allowed as announce_allowed,
@p_failure_reason as failure_reason,
@p_warning_message as warning_message;
",
)
.fetch_one(&mut t)
.await?;
t.commit().await?;
Ok(response)
}

View file

@ -1,104 +0,0 @@
pub mod db;
mod routes;
mod tls;
use std::{
net::{SocketAddr, TcpListener},
sync::Arc,
};
use anyhow::Context;
use aquatic_common::{privileges::PrivilegeDropper, rustls_config::RustlsConfig, PanicSentinel};
use axum::{extract::connect_info::Connected, routing::get, Extension, Router};
use hyper::server::conn::AddrIncoming;
use sqlx::mysql::MySqlPoolOptions;
use self::tls::{TlsAcceptor, TlsStream};
use crate::{common::ChannelRequestSender, config::Config};
impl<'a> Connected<&'a tls::TlsStream> for SocketAddr {
fn connect_info(target: &'a TlsStream) -> Self {
target.get_remote_addr()
}
}
pub fn run_socket_worker(
_sentinel: PanicSentinel,
config: Config,
tls_config: Arc<RustlsConfig>,
request_sender: ChannelRequestSender,
priv_dropper: PrivilegeDropper,
) -> anyhow::Result<()> {
let tcp_listener = create_tcp_listener(config.network.address, priv_dropper)?;
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
runtime.block_on(run_app(config, tls_config, tcp_listener, request_sender))?;
Ok(())
}
async fn run_app(
config: Config,
tls_config: Arc<RustlsConfig>,
tcp_listener: TcpListener,
request_sender: ChannelRequestSender,
) -> anyhow::Result<()> {
let db_url =
::std::env::var("DATABASE_URL").with_context(|| "Retrieve env var DATABASE_URL")?;
let tls_acceptor = TlsAcceptor::new(
tls_config,
AddrIncoming::from_listener(tokio::net::TcpListener::from_std(tcp_listener)?)?,
);
let pool = MySqlPoolOptions::new()
.max_connections(config.db_connections_per_worker)
.connect(&db_url)
.await?;
let app = Router::new()
.route("/announce/:user_token/", get(routes::announce))
.layer(Extension(Arc::new(config.clone())))
.layer(Extension(pool))
.layer(Extension(Arc::new(request_sender)));
axum::Server::builder(tls_acceptor)
.http1_keepalive(config.network.keep_alive)
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
.await?;
Ok(())
}
fn create_tcp_listener(
addr: SocketAddr,
priv_dropper: PrivilegeDropper,
) -> anyhow::Result<TcpListener> {
let domain = if addr.is_ipv4() {
socket2::Domain::IPV4
} else {
socket2::Domain::IPV6
};
let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?;
socket
.set_reuse_port(true)
.with_context(|| "set_reuse_port")?;
socket
.set_nonblocking(true)
.with_context(|| "set_nonblocking")?;
socket
.bind(&addr.into())
.with_context(|| format!("bind to {}", addr))?;
socket
.listen(1024)
.with_context(|| format!("listen on {}", addr))?;
priv_dropper.after_socket_creation()?;
Ok(socket.into())
}

View file

@ -1,65 +0,0 @@
use aquatic_common::CanonicalSocketAddr;
use axum::{
extract::{ConnectInfo, Path, RawQuery},
headers::UserAgent,
Extension, TypedHeader,
};
use sqlx::mysql::MySqlPool;
use std::{net::SocketAddr, sync::Arc};
use aquatic_http_protocol::{
request::AnnounceRequest,
response::{FailureResponse, Response},
};
use crate::{
common::{ChannelRequestSender, RequestWorkerIndex},
config::Config,
};
use super::db;
pub async fn announce(
Extension(config): Extension<Arc<Config>>,
Extension(pool): Extension<MySqlPool>,
Extension(request_sender): Extension<Arc<ChannelRequestSender>>,
ConnectInfo(source_addr): ConnectInfo<SocketAddr>,
opt_user_agent: Option<TypedHeader<UserAgent>>,
Path(user_token): Path<String>,
RawQuery(query): RawQuery,
) -> Result<Response, FailureResponse> {
let query = query.ok_or_else(|| FailureResponse::new("Empty query string"))?;
let request = AnnounceRequest::from_query_string(&query)
.map_err(|_| FailureResponse::new("Malformed request"))?;
let swarm_worker_index = RequestWorkerIndex::from_info_hash(&config, request.info_hash);
let opt_user_agent = opt_user_agent.map(|header| header.as_str().to_owned());
let source_addr = CanonicalSocketAddr::new(source_addr);
let (validated_request, opt_warning_message) =
db::validate_announce_request(&pool, source_addr, opt_user_agent, user_token, request)
.await?;
let response_receiver = request_sender
.send_to(swarm_worker_index, validated_request, source_addr)
.await
.map_err(|err| internal_error(format!("Sending request over channel failed: {:#}", err)))?;
let mut response = response_receiver.await.map_err(|err| {
internal_error(format!("Receiving response over channel failed: {:#}", err))
})?;
if let Response::Announce(ref mut r) = response {
r.warning_message = opt_warning_message;
}
Ok(response)
}
fn internal_error(error: String) -> FailureResponse {
::log::error!("{}", error);
FailureResponse::new("Internal error")
}

View file

@ -1,151 +0,0 @@
//! hyper/rustls integration
//!
//! hyper will automatically use HTTP/2 if a client starts talking HTTP/2,
//! otherwise HTTP/1.1 will be used.
//!
//! Based on https://github.com/rustls/hyper-rustls/blob/9b7b1220f74de9b249ce2b8f8b922fd00074c53b/examples/server.rs
// ISC License (ISC)
// Copyright (c) 2016, Joseph Birr-Pixton <jpixton@gmail.com>
//
// Permission to use, copy, modify, and/or distribute this software for
// any purpose with or without fee is hereby granted, provided that the
// above copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
// WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
// AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL
// DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR
// PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
// ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF
// THIS SOFTWARE.
use core::task::{Context, Poll};
use futures_util::ready;
use hyper::server::accept::Accept;
use hyper::server::conn::{AddrIncoming, AddrStream};
use std::future::Future;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_rustls::rustls::ServerConfig;
enum State {
Handshaking(tokio_rustls::Accept<AddrStream>, SocketAddr),
Streaming(tokio_rustls::server::TlsStream<AddrStream>),
}
// tokio_rustls::server::TlsStream doesn't expose constructor methods,
// so we have to TlsAcceptor::accept and handshake to have access to it
// TlsStream implements AsyncRead/AsyncWrite handshaking tokio_rustls::Accept first
pub struct TlsStream {
state: State,
}
impl TlsStream {
fn new(stream: AddrStream, config: Arc<ServerConfig>) -> TlsStream {
let remote_addr = stream.remote_addr();
let accept = tokio_rustls::TlsAcceptor::from(config).accept(stream);
TlsStream {
state: State::Handshaking(accept, remote_addr),
}
}
pub fn get_remote_addr(&self) -> SocketAddr {
match &self.state {
State::Handshaking(_, remote_addr) => *remote_addr,
State::Streaming(stream) => stream.get_ref().0.remote_addr(),
}
}
}
impl AsyncRead for TlsStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut ReadBuf,
) -> Poll<io::Result<()>> {
let pin = self.get_mut();
match pin.state {
State::Handshaking(ref mut accept, ref mut socket_addr) => {
match ready!(Pin::new(accept).poll(cx)) {
Ok(mut stream) => {
*socket_addr = stream.get_ref().0.remote_addr();
let result = Pin::new(&mut stream).poll_read(cx, buf);
pin.state = State::Streaming(stream);
result
}
Err(err) => Poll::Ready(Err(err)),
}
}
State::Streaming(ref mut stream) => Pin::new(stream).poll_read(cx, buf),
}
}
}
impl AsyncWrite for TlsStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let pin = self.get_mut();
match pin.state {
State::Handshaking(ref mut accept, _) => match ready!(Pin::new(accept).poll(cx)) {
Ok(mut stream) => {
let result = Pin::new(&mut stream).poll_write(cx, buf);
pin.state = State::Streaming(stream);
result
}
Err(err) => Poll::Ready(Err(err)),
},
State::Streaming(ref mut stream) => Pin::new(stream).poll_write(cx, buf),
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match self.state {
State::Handshaking(_, _) => Poll::Ready(Ok(())),
State::Streaming(ref mut stream) => Pin::new(stream).poll_flush(cx),
}
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match self.state {
State::Handshaking(_, _) => Poll::Ready(Ok(())),
State::Streaming(ref mut stream) => Pin::new(stream).poll_shutdown(cx),
}
}
}
pub struct TlsAcceptor {
config: Arc<ServerConfig>,
incoming: AddrIncoming,
}
impl TlsAcceptor {
pub fn new(config: Arc<ServerConfig>, incoming: AddrIncoming) -> TlsAcceptor {
TlsAcceptor { config, incoming }
}
}
impl Accept for TlsAcceptor {
type Conn = TlsStream;
type Error = io::Error;
fn poll_accept(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
let pin = self.get_mut();
match ready!(Pin::new(&mut pin.incoming).poll_accept(cx)) {
Some(Ok(sock)) => Poll::Ready(Some(Ok(TlsStream::new(sock, pin.config.clone())))),
Some(Err(e)) => Poll::Ready(Some(Err(e))),
None => Poll::Ready(None),
}
}
}

View file

@ -1,121 +0,0 @@
use std::net::{Ipv4Addr, Ipv6Addr};
use aquatic_common::{IndexMap, SecondsSinceServerStart, ServerStartInstant, ValidUntil};
use aquatic_http_protocol::common::{AnnounceEvent, InfoHash, PeerId};
use aquatic_http_protocol::response::ResponsePeer;
pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {}
impl Ip for Ipv4Addr {}
impl Ip for Ipv6Addr {}
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum PeerStatus {
Seeding,
Leeching,
Stopped,
}
impl PeerStatus {
/// Determine peer status from announce event and number of bytes left.
///
/// Likely, the last branch will be taken most of the time.
#[inline]
pub fn from_event_and_bytes_left(event: AnnounceEvent, opt_bytes_left: Option<usize>) -> Self {
if let AnnounceEvent::Stopped = event {
Self::Stopped
} else if let Some(0) = opt_bytes_left {
Self::Seeding
} else {
Self::Leeching
}
}
}
#[derive(Debug, Clone, Copy)]
pub struct Peer<I: Ip> {
pub ip_address: I,
pub port: u16,
pub status: PeerStatus,
pub valid_until: ValidUntil,
}
impl<I: Ip> Peer<I> {
pub fn to_response_peer(&self) -> ResponsePeer<I> {
ResponsePeer {
ip_address: self.ip_address,
port: self.port,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PeerMapKey<I: Ip> {
pub peer_id: PeerId,
pub ip_address: I,
}
pub type PeerMap<I> = IndexMap<PeerMapKey<I>, Peer<I>>;
pub struct TorrentData<I: Ip> {
pub peers: PeerMap<I>,
pub num_seeders: usize,
pub num_leechers: usize,
}
impl<I: Ip> Default for TorrentData<I> {
#[inline]
fn default() -> Self {
Self {
peers: Default::default(),
num_seeders: 0,
num_leechers: 0,
}
}
}
pub type TorrentMap<I> = IndexMap<InfoHash, TorrentData<I>>;
#[derive(Default)]
pub struct TorrentMaps {
pub ipv4: TorrentMap<Ipv4Addr>,
pub ipv6: TorrentMap<Ipv6Addr>,
}
impl TorrentMaps {
pub fn clean(&mut self, server_start_instant: ServerStartInstant) {
let now = server_start_instant.seconds_elapsed();
Self::clean_torrent_map(&mut self.ipv4, now);
Self::clean_torrent_map(&mut self.ipv6, now);
}
fn clean_torrent_map<I: Ip>(torrent_map: &mut TorrentMap<I>, now: SecondsSinceServerStart) {
torrent_map.retain(|_, torrent_data| {
let num_seeders = &mut torrent_data.num_seeders;
let num_leechers = &mut torrent_data.num_leechers;
torrent_data.peers.retain(|_, peer| {
if peer.valid_until.valid(now) {
true
} else {
match peer.status {
PeerStatus::Seeding => {
*num_seeders -= 1;
}
PeerStatus::Leeching => {
*num_leechers -= 1;
}
_ => (),
};
false
}
});
!torrent_data.peers.is_empty()
});
torrent_map.shrink_to_fit();
}
}

View file

@ -1,220 +0,0 @@
mod common;
use std::cell::RefCell;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::rc::Rc;
use aquatic_http_protocol::request::AnnounceRequest;
use rand::prelude::SmallRng;
use rand::SeedableRng;
use tokio::sync::mpsc::Receiver;
use tokio::task::LocalSet;
use tokio::time;
use aquatic_common::{
extract_response_peers, CanonicalSocketAddr, PanicSentinel, ServerStartInstant, ValidUntil,
};
use aquatic_http_protocol::response::{
AnnounceResponse, Response, ResponsePeer, ResponsePeerListV4, ResponsePeerListV6,
};
use crate::common::ChannelAnnounceRequest;
use crate::config::Config;
use common::*;
pub fn run_swarm_worker(
_sentinel: PanicSentinel,
config: Config,
request_receiver: Receiver<ChannelAnnounceRequest>,
server_start_instant: ServerStartInstant,
) -> anyhow::Result<()> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
runtime.block_on(run_inner(config, request_receiver, server_start_instant))?;
Ok(())
}
async fn run_inner(
config: Config,
mut request_receiver: Receiver<ChannelAnnounceRequest>,
server_start_instant: ServerStartInstant,
) -> anyhow::Result<()> {
let torrents = Rc::new(RefCell::new(TorrentMaps::default()));
let mut rng = SmallRng::from_entropy();
LocalSet::new().spawn_local(periodically_clean_torrents(
config.clone(),
torrents.clone(),
server_start_instant,
));
loop {
let request = request_receiver
.recv()
.await
.ok_or_else(|| anyhow::anyhow!("request channel closed"))?;
let valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age);
let response = handle_announce_request(
&config,
&mut rng,
&mut torrents.borrow_mut(),
valid_until,
request.source_addr,
request.request.into(),
);
let _ = request.response_sender.send(Response::Announce(response));
}
}
async fn periodically_clean_torrents(
config: Config,
torrents: Rc<RefCell<TorrentMaps>>,
server_start_instant: ServerStartInstant,
) {
let mut interval = time::interval(time::Duration::from_secs(
config.cleaning.torrent_cleaning_interval,
));
loop {
interval.tick().await;
torrents.borrow_mut().clean(server_start_instant);
}
}
fn handle_announce_request(
config: &Config,
rng: &mut SmallRng,
torrent_maps: &mut TorrentMaps,
valid_until: ValidUntil,
source_addr: CanonicalSocketAddr,
request: AnnounceRequest,
) -> AnnounceResponse {
match source_addr.get().ip() {
IpAddr::V4(source_ip) => {
let torrent_data: &mut TorrentData<Ipv4Addr> =
torrent_maps.ipv4.entry(request.info_hash).or_default();
let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers(
config,
rng,
torrent_data,
source_ip,
request,
valid_until,
);
let response = AnnounceResponse {
complete: seeders,
incomplete: leechers,
announce_interval: config.protocol.peer_announce_interval,
peers: ResponsePeerListV4(response_peers),
peers6: ResponsePeerListV6(vec![]),
warning_message: None,
};
response
}
IpAddr::V6(source_ip) => {
let torrent_data: &mut TorrentData<Ipv6Addr> =
torrent_maps.ipv6.entry(request.info_hash).or_default();
let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers(
config,
rng,
torrent_data,
source_ip,
request,
valid_until,
);
let response = AnnounceResponse {
complete: seeders,
incomplete: leechers,
announce_interval: config.protocol.peer_announce_interval,
peers: ResponsePeerListV4(vec![]),
peers6: ResponsePeerListV6(response_peers),
warning_message: None,
};
response
}
}
}
/// Insert/update peer. Return num_seeders, num_leechers and response peers
pub fn upsert_peer_and_get_response_peers<I: Ip>(
config: &Config,
rng: &mut SmallRng,
torrent_data: &mut TorrentData<I>,
source_ip: I,
request: AnnounceRequest,
valid_until: ValidUntil,
) -> (usize, usize, Vec<ResponsePeer<I>>) {
// Insert/update/remove peer who sent this request
let peer_status =
PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left));
let peer = Peer {
ip_address: source_ip,
port: request.port,
status: peer_status,
valid_until,
};
let peer_map_key = PeerMapKey {
peer_id: request.peer_id,
ip_address: source_ip,
};
let opt_removed_peer = match peer_status {
PeerStatus::Leeching => {
torrent_data.num_leechers += 1;
torrent_data.peers.insert(peer_map_key.clone(), peer)
}
PeerStatus::Seeding => {
torrent_data.num_seeders += 1;
torrent_data.peers.insert(peer_map_key.clone(), peer)
}
PeerStatus::Stopped => torrent_data.peers.remove(&peer_map_key),
};
match opt_removed_peer.map(|peer| peer.status) {
Some(PeerStatus::Leeching) => {
torrent_data.num_leechers -= 1;
}
Some(PeerStatus::Seeding) => {
torrent_data.num_seeders -= 1;
}
_ => {}
}
let max_num_peers_to_take = match request.numwant {
Some(0) | None => config.protocol.max_peers,
Some(numwant) => numwant.min(config.protocol.max_peers),
};
let response_peers: Vec<ResponsePeer<I>> = extract_response_peers(
rng,
&torrent_data.peers,
max_num_peers_to_take,
peer_map_key,
Peer::to_response_peer,
);
(
torrent_data.num_seeders,
torrent_data.num_leechers,
response_peers,
)
}