Merge pull request #157 from greatest-ape/fixes-1

http: allow disabling TLS, allow reverse proxies, general fixes; improve ws code; cargo update
This commit is contained in:
Joakim Frostegård 2023-11-17 18:33:40 +01:00 committed by GitHub
commit 3f2a87b10f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 1384 additions and 1060 deletions

View file

@ -62,6 +62,7 @@ echo "log_level = 'debug'
[network] [network]
address = '127.0.0.1:3001' address = '127.0.0.1:3001'
enable_tls = true
tls_certificate_path = './cert.crt' tls_certificate_path = './cert.crt'
tls_private_key_path = './key.pk8' tls_private_key_path = './key.pk8'
" > tls.toml " > tls.toml

View file

@ -20,6 +20,15 @@
* Reload TLS certificate (and key) on SIGUSR1 * Reload TLS certificate (and key) on SIGUSR1
#### Changed
* Allow running without TLS
* Allow running behind reverse proxy
#### Fixed
* Fix bug where clean up after closing connections wasn't always done
### aquatic_ws ### aquatic_ws
#### Added #### Added

327
Cargo.lock generated
View file

@ -19,9 +19,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]] [[package]]
name = "ahash" name = "ahash"
version = "0.7.6" version = "0.7.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" checksum = "5a824f2aa7e75a0c98c5a504fceb80649e9c35265d44525b5f94de4771a395cd"
dependencies = [ dependencies = [
"getrandom", "getrandom",
"once_cell", "once_cell",
@ -30,14 +30,15 @@ dependencies = [
[[package]] [[package]]
name = "ahash" name = "ahash"
version = "0.8.3" version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" checksum = "91429305e9f0a25f6205c5b8e0d2db09e0708a7a6df0f42212bb56c32c8ac97a"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"getrandom", "getrandom",
"once_cell", "once_cell",
"version_check", "version_check",
"zerocopy",
] ]
[[package]] [[package]]
@ -88,17 +89,17 @@ dependencies = [
name = "aquatic_common" name = "aquatic_common"
version = "0.8.0" version = "0.8.0"
dependencies = [ dependencies = [
"ahash 0.8.3", "ahash 0.8.6",
"anyhow", "anyhow",
"aquatic_toml_config", "aquatic_toml_config",
"arc-swap", "arc-swap",
"duplicate", "duplicate",
"git-testament", "git-testament",
"glommio", "glommio",
"hashbrown 0.14.1", "hashbrown 0.14.2",
"hex", "hex",
"hwloc", "hwloc",
"indexmap 2.0.2", "indexmap 2.1.0",
"libc", "libc",
"log", "log",
"privdrop", "privdrop",
@ -125,6 +126,7 @@ dependencies = [
"futures-lite", "futures-lite",
"futures-rustls", "futures-rustls",
"glommio", "glommio",
"httparse",
"itoa", "itoa",
"libc", "libc",
"log", "log",
@ -140,8 +142,9 @@ dependencies = [
"rustls-pemfile", "rustls-pemfile",
"serde", "serde",
"signal-hook", "signal-hook",
"slab", "slotmap",
"socket2 0.5.4", "socket2 0.5.5",
"thiserror",
] ]
[[package]] [[package]]
@ -152,10 +155,11 @@ dependencies = [
"aquatic_common", "aquatic_common",
"aquatic_http_protocol", "aquatic_http_protocol",
"aquatic_toml_config", "aquatic_toml_config",
"futures",
"futures-lite", "futures-lite",
"futures-rustls", "futures-rustls",
"glommio", "glommio",
"hashbrown 0.14.1", "hashbrown 0.14.2",
"log", "log",
"mimalloc", "mimalloc",
"quickcheck", "quickcheck",
@ -231,7 +235,7 @@ dependencies = [
"constant_time_eq", "constant_time_eq",
"crossbeam-channel", "crossbeam-channel",
"getrandom", "getrandom",
"hashbrown 0.14.1", "hashbrown 0.14.2",
"hdrhistogram", "hdrhistogram",
"hex", "hex",
"io-uring", "io-uring",
@ -249,7 +253,7 @@ dependencies = [
"serde", "serde",
"signal-hook", "signal-hook",
"slab", "slab",
"socket2 0.5.4", "socket2 0.5.5",
"tempfile", "tempfile",
"time", "time",
"tinytemplate", "tinytemplate",
@ -281,7 +285,7 @@ dependencies = [
"aquatic_common", "aquatic_common",
"aquatic_toml_config", "aquatic_toml_config",
"aquatic_udp_protocol", "aquatic_udp_protocol",
"hashbrown 0.14.1", "hashbrown 0.14.2",
"mimalloc", "mimalloc",
"mio", "mio",
"quickcheck", "quickcheck",
@ -289,7 +293,7 @@ dependencies = [
"rand", "rand",
"rand_distr", "rand_distr",
"serde", "serde",
"socket2 0.5.4", "socket2 0.5.5",
] ]
[[package]] [[package]]
@ -319,9 +323,9 @@ dependencies = [
"futures-lite", "futures-lite",
"futures-rustls", "futures-rustls",
"glommio", "glommio",
"hashbrown 0.14.1", "hashbrown 0.14.2",
"httparse", "httparse",
"indexmap 2.0.2", "indexmap 2.1.0",
"log", "log",
"metrics", "metrics",
"metrics-exporter-prometheus", "metrics-exporter-prometheus",
@ -337,7 +341,7 @@ dependencies = [
"signal-hook", "signal-hook",
"slab", "slab",
"slotmap", "slotmap",
"socket2 0.5.4", "socket2 0.5.5",
"tungstenite", "tungstenite",
] ]
@ -371,7 +375,7 @@ version = "0.8.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"criterion 0.5.1", "criterion 0.5.1",
"hashbrown 0.14.1", "hashbrown 0.14.2",
"quickcheck", "quickcheck",
"quickcheck_macros", "quickcheck_macros",
"serde", "serde",
@ -451,9 +455,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]] [[package]]
name = "base64" name = "base64"
version = "0.21.4" version = "0.21.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2" checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9"
[[package]] [[package]]
name = "bendy" name = "bendy"
@ -614,21 +618,21 @@ dependencies = [
[[package]] [[package]]
name = "clap" name = "clap"
version = "4.4.6" version = "4.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d04704f56c2cde07f43e8e2c154b43f216dc5c92fc98ada720177362f953b956" checksum = "2275f18819641850fa26c89acc84d465c1bf91ce57bc2748b28c420473352f64"
dependencies = [ dependencies = [
"clap_builder", "clap_builder",
] ]
[[package]] [[package]]
name = "clap_builder" name = "clap_builder"
version = "4.4.6" version = "4.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e231faeaca65ebd1ea3c737966bf858971cd38c3849107aa3ea7de90a804e45" checksum = "07cdf1b148b25c1e1f7a42225e30a0d99a615cd4637eae7365548dd4529b95bc"
dependencies = [ dependencies = [
"anstyle", "anstyle",
"clap_lex 0.5.1", "clap_lex 0.6.0",
] ]
[[package]] [[package]]
@ -642,9 +646,9 @@ dependencies = [
[[package]] [[package]]
name = "clap_lex" name = "clap_lex"
version = "0.5.1" version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd7cc57abe963c6d3b9d8be5b06ba7c8957a930305ca90304f24ef040aa6f961" checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1"
[[package]] [[package]]
name = "colored" name = "colored"
@ -701,9 +705,9 @@ checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2"
[[package]] [[package]]
name = "cpufeatures" name = "cpufeatures"
version = "0.2.9" version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a17b76ff3a4162b0b27f354a0c87015ddad39d35f9c0c36607a3bdd175dde1f1" checksum = "ce420fe07aecd3e67c5f910618fe65e94158f6dcc0adf44e00d69ce2bdfe0fd0"
dependencies = [ dependencies = [
"libc", "libc",
] ]
@ -752,7 +756,7 @@ dependencies = [
"anes", "anes",
"cast", "cast",
"ciborium", "ciborium",
"clap 4.4.6", "clap 4.4.8",
"criterion-plot", "criterion-plot",
"is-terminal", "is-terminal",
"itertools", "itertools",
@ -950,9 +954,9 @@ dependencies = [
[[package]] [[package]]
name = "errno" name = "errno"
version = "0.3.5" version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860" checksum = "f258a7194e7f7c2a7837a8913aeab7fd8c383457034fa20ce4dd3dcb813e8eb8"
dependencies = [ dependencies = [
"libc", "libc",
"windows-sys 0.48.0", "windows-sys 0.48.0",
@ -1012,7 +1016,7 @@ dependencies = [
"futures-sink", "futures-sink",
"nanorand", "nanorand",
"pin-project", "pin-project",
"spin 0.9.8", "spin",
] ]
[[package]] [[package]]
@ -1032,9 +1036,9 @@ dependencies = [
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.28" version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
@ -1047,9 +1051,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.28" version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-sink", "futures-sink",
@ -1057,15 +1061,15 @@ dependencies = [
[[package]] [[package]]
name = "futures-core" name = "futures-core"
version = "0.3.28" version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c"
[[package]] [[package]]
name = "futures-executor" name = "futures-executor"
version = "0.3.28" version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-task", "futures-task",
@ -1074,9 +1078,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-io" name = "futures-io"
version = "0.3.28" version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa"
[[package]] [[package]]
name = "futures-lite" name = "futures-lite"
@ -1095,13 +1099,13 @@ dependencies = [
[[package]] [[package]]
name = "futures-macro" name = "futures-macro"
version = "0.3.28" version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.38", "syn 2.0.39",
] ]
[[package]] [[package]]
@ -1116,21 +1120,21 @@ dependencies = [
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.28" version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817"
[[package]] [[package]]
name = "futures-task" name = "futures-task"
version = "0.3.28" version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2"
[[package]] [[package]]
name = "futures-util" name = "futures-util"
version = "0.3.28" version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
@ -1156,9 +1160,9 @@ dependencies = [
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.2.10" version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"js-sys", "js-sys",
@ -1191,7 +1195,7 @@ dependencies = [
"log", "log",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.38", "syn 2.0.39",
"time", "time",
] ]
@ -1201,7 +1205,7 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac1f09bf53139d5680da6325b4e79c6bc1518e94a65ab74df14b7e3693a8c78b" checksum = "ac1f09bf53139d5680da6325b4e79c6bc1518e94a65ab74df14b7e3693a8c78b"
dependencies = [ dependencies = [
"ahash 0.7.6", "ahash 0.7.7",
"backtrace", "backtrace",
"bitflags 1.3.2", "bitflags 1.3.2",
"bitmaps", "bitmaps",
@ -1225,7 +1229,7 @@ dependencies = [
"signal-hook", "signal-hook",
"sketches-ddsketch 0.1.3", "sketches-ddsketch 0.1.3",
"smallvec", "smallvec",
"socket2 0.4.9", "socket2 0.4.10",
"tracing", "tracing",
"typenum", "typenum",
] ]
@ -1258,25 +1262,25 @@ version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ff8ae62cd3a9102e5637afc8452c55acf3844001bd5374e0b0bd7b6616c038" checksum = "33ff8ae62cd3a9102e5637afc8452c55acf3844001bd5374e0b0bd7b6616c038"
dependencies = [ dependencies = [
"ahash 0.8.3", "ahash 0.8.6",
] ]
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.14.1" version = "0.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" checksum = "f93e7192158dbcda357bdec5fb5788eebf8bbac027f3f33e719d29135ae84156"
dependencies = [ dependencies = [
"ahash 0.8.3", "ahash 0.8.6",
"allocator-api2", "allocator-api2",
"serde", "serde",
] ]
[[package]] [[package]]
name = "hdrhistogram" name = "hdrhistogram"
version = "7.5.2" version = "7.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f19b9f54f7c7f55e31401bb647626ce0cf0f67b0004982ce815b3ee72a02aa8" checksum = "a5b38e5c02b7c7be48c8dc5217c4f1634af2ea221caae2e024bffc7a7651c691"
dependencies = [ dependencies = [
"base64 0.13.1", "base64 0.13.1",
"byteorder", "byteorder",
@ -1315,9 +1319,9 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]] [[package]]
name = "http" name = "http"
version = "0.2.9" version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb"
dependencies = [ dependencies = [
"bytes", "bytes",
"fnv", "fnv",
@ -1378,7 +1382,7 @@ dependencies = [
"httpdate", "httpdate",
"itoa", "itoa",
"pin-project-lite", "pin-project-lite",
"socket2 0.4.9", "socket2 0.4.10",
"tokio", "tokio",
"tower-service", "tower-service",
"tracing", "tracing",
@ -1407,12 +1411,12 @@ dependencies = [
[[package]] [[package]]
name = "indexmap" name = "indexmap"
version = "2.0.2" version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f"
dependencies = [ dependencies = [
"equivalent", "equivalent",
"hashbrown 0.14.1", "hashbrown 0.14.2",
] ]
[[package]] [[package]]
@ -1458,9 +1462,9 @@ dependencies = [
[[package]] [[package]]
name = "ipnet" name = "ipnet"
version = "2.8.0" version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3"
[[package]] [[package]]
name = "is-terminal" name = "is-terminal"
@ -1490,9 +1494,9 @@ checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38"
[[package]] [[package]]
name = "js-sys" name = "js-sys"
version = "0.3.64" version = "0.3.65"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5f195fe497f702db0f318b07fdd68edb16955aed830df8363d837542f8f935a" checksum = "54c0c35952f67de54bb584e9fd912b3023117cbafc0a77d8f3dee1fb5f572fe8"
dependencies = [ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
@ -1579,9 +1583,9 @@ dependencies = [
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.149" version = "0.2.150"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c"
[[package]] [[package]]
name = "libm" name = "libm"
@ -1601,9 +1605,9 @@ dependencies = [
[[package]] [[package]]
name = "linux-raw-sys" name = "linux-raw-sys"
version = "0.4.10" version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f" checksum = "969488b55f8ac402214f3f5fd243ebb7206cf82de60d3172994707a4bcc2b829"
[[package]] [[package]]
name = "lock_api" name = "lock_api"
@ -1678,7 +1682,7 @@ version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fde3af1a009ed76a778cb84fdef9e7dbbdf5775ae3e4cc1f434a6a307f6f76c5" checksum = "fde3af1a009ed76a778cb84fdef9e7dbbdf5775ae3e4cc1f434a6a307f6f76c5"
dependencies = [ dependencies = [
"ahash 0.8.3", "ahash 0.8.6",
"metrics-macros", "metrics-macros",
"portable-atomic", "portable-atomic",
] ]
@ -1689,7 +1693,7 @@ version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a4964177ddfdab1e3a2b37aec7cf320e14169abb0ed73999f558136409178d5" checksum = "8a4964177ddfdab1e3a2b37aec7cf320e14169abb0ed73999f558136409178d5"
dependencies = [ dependencies = [
"base64 0.21.4", "base64 0.21.5",
"hyper", "hyper",
"indexmap 1.9.3", "indexmap 1.9.3",
"ipnet", "ipnet",
@ -1708,7 +1712,7 @@ checksum = "ddece26afd34c31585c74a4db0630c376df271c285d682d1e55012197830b6df"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.38", "syn 2.0.39",
] ]
[[package]] [[package]]
@ -1756,9 +1760,9 @@ dependencies = [
[[package]] [[package]]
name = "mio" name = "mio"
version = "0.8.8" version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0"
dependencies = [ dependencies = [
"libc", "libc",
"log", "log",
@ -1968,7 +1972,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.38", "syn 2.0.39",
] ]
[[package]] [[package]]
@ -2019,9 +2023,9 @@ dependencies = [
[[package]] [[package]]
name = "portable-atomic" name = "portable-atomic"
version = "1.4.3" version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31114a898e107c51bb1609ffaf55a0e011cf6a4d7f1170d0015a165082c0338b" checksum = "3bccab0e7fd7cc19f820a1c8c91720af652d0c88dc9664dd72aef2614f04af3b"
[[package]] [[package]]
name = "powerfmt" name = "powerfmt"
@ -2206,9 +2210,9 @@ dependencies = [
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.3.5" version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa"
dependencies = [ dependencies = [
"bitflags 1.3.2", "bitflags 1.3.2",
] ]
@ -2244,17 +2248,16 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]] [[package]]
name = "ring" name = "ring"
version = "0.16.20" version = "0.17.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" checksum = "fb0205304757e5d899b9c2e448b867ffd03ae7f988002e47cd24954391394d0b"
dependencies = [ dependencies = [
"cc", "cc",
"getrandom",
"libc", "libc",
"once_cell", "spin",
"spin 0.5.2",
"untrusted", "untrusted",
"web-sys", "windows-sys 0.48.0",
"winapi 0.3.9",
] ]
[[package]] [[package]]
@ -2274,12 +2277,12 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]] [[package]]
name = "rustix" name = "rustix"
version = "0.38.19" version = "0.38.24"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "745ecfa778e66b2b63c88a61cb36e0eea109e803b0b86bf9879fbc77c70e86ed" checksum = "9ad981d6c340a49cdc40a1028d9c6084ec7e9fa33fcb839cab656a267071e234"
dependencies = [ dependencies = [
"bitflags 2.4.1", "bitflags 2.4.1",
"errno 0.3.5", "errno 0.3.7",
"libc", "libc",
"linux-raw-sys", "linux-raw-sys",
"windows-sys 0.48.0", "windows-sys 0.48.0",
@ -2287,9 +2290,9 @@ dependencies = [
[[package]] [[package]]
name = "rustls" name = "rustls"
version = "0.21.7" version = "0.21.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8" checksum = "629648aced5775d558af50b2b4c7b02983a04b312126d45eeead26e7caa498b9"
dependencies = [ dependencies = [
"log", "log",
"ring", "ring",
@ -2299,18 +2302,18 @@ dependencies = [
[[package]] [[package]]
name = "rustls-pemfile" name = "rustls-pemfile"
version = "1.0.3" version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c"
dependencies = [ dependencies = [
"base64 0.21.4", "base64 0.21.5",
] ]
[[package]] [[package]]
name = "rustls-webpki" name = "rustls-webpki"
version = "0.101.6" version = "0.101.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c7d5dece342910d9ba34d259310cae3e0154b873b35408b787b59bce53d34fe" checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765"
dependencies = [ dependencies = [
"ring", "ring",
"untrusted", "untrusted",
@ -2351,9 +2354,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]] [[package]]
name = "sct" name = "sct"
version = "0.7.0" version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414"
dependencies = [ dependencies = [
"ring", "ring",
"untrusted", "untrusted",
@ -2361,9 +2364,9 @@ dependencies = [
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.189" version = "1.0.192"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e422a44e74ad4001bdc8eede9a4570ab52f71190e9c076d14369f38b9200537" checksum = "bca2a08484b285dcb282d0f67b26cadc0df8b19f8c12502c13d966bf9482f001"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
@ -2389,20 +2392,20 @@ dependencies = [
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.189" version = "1.0.192"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e48d1f918009ce3145511378cf68d613e3b3d9137d67272562080d68a2b32d5" checksum = "d6c7207fbec9faa48073f3e3074cbe553af6ea512d7c21ba46e434e70ea9fbc1"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.38", "syn 2.0.39",
] ]
[[package]] [[package]]
name = "serde_json" name = "serde_json"
version = "1.0.107" version = "1.0.108"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b"
dependencies = [ dependencies = [
"itoa", "itoa",
"ryu", "ryu",
@ -2504,9 +2507,9 @@ dependencies = [
[[package]] [[package]]
name = "smallvec" name = "smallvec"
version = "1.11.1" version = "1.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970"
[[package]] [[package]]
name = "snafu" name = "snafu"
@ -2532,9 +2535,9 @@ dependencies = [
[[package]] [[package]]
name = "socket2" name = "socket2"
version = "0.4.9" version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d"
dependencies = [ dependencies = [
"libc", "libc",
"winapi 0.3.9", "winapi 0.3.9",
@ -2542,20 +2545,14 @@ dependencies = [
[[package]] [[package]]
name = "socket2" name = "socket2"
version = "0.5.4" version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9"
dependencies = [ dependencies = [
"libc", "libc",
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
[[package]]
name = "spin"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]] [[package]]
name = "spin" name = "spin"
version = "0.9.8" version = "0.9.8"
@ -2584,9 +2581,9 @@ dependencies = [
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.38" version = "2.0.39"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -2595,9 +2592,9 @@ dependencies = [
[[package]] [[package]]
name = "tempfile" name = "tempfile"
version = "3.8.0" version = "3.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"fastrand 2.0.1", "fastrand 2.0.1",
@ -2614,22 +2611,22 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d"
[[package]] [[package]]
name = "thiserror" name = "thiserror"
version = "1.0.49" version = "1.0.50"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1177e8c6d7ede7afde3585fd2513e611227efd6481bd78d2e82ba1ce16557ed4" checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2"
dependencies = [ dependencies = [
"thiserror-impl", "thiserror-impl",
] ]
[[package]] [[package]]
name = "thiserror-impl" name = "thiserror-impl"
version = "1.0.49" version = "1.0.50"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc" checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.38", "syn 2.0.39",
] ]
[[package]] [[package]]
@ -2690,15 +2687,15 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.33.0" version = "1.34.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" checksum = "d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9"
dependencies = [ dependencies = [
"backtrace", "backtrace",
"libc", "libc",
"mio", "mio",
"pin-project-lite", "pin-project-lite",
"socket2 0.5.4", "socket2 0.5.5",
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
@ -2719,9 +2716,9 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
[[package]] [[package]]
name = "tracing" name = "tracing"
version = "0.1.39" version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee2ef2af84856a50c1d430afce2fdded0a4ec7eda868db86409b4543df0797f9" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [ dependencies = [
"pin-project-lite", "pin-project-lite",
"tracing-attributes", "tracing-attributes",
@ -2736,7 +2733,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.38", "syn 2.0.39",
] ]
[[package]] [[package]]
@ -2808,9 +2805,9 @@ checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85"
[[package]] [[package]]
name = "untrusted" name = "untrusted"
version = "0.7.1" version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]] [[package]]
name = "url" name = "url"
@ -2886,9 +2883,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]] [[package]]
name = "wasm-bindgen" name = "wasm-bindgen"
version = "0.2.87" version = "0.2.88"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342" checksum = "7daec296f25a1bae309c0cd5c29c4b260e510e6d813c286b19eaadf409d40fce"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"wasm-bindgen-macro", "wasm-bindgen-macro",
@ -2896,24 +2893,24 @@ dependencies = [
[[package]] [[package]]
name = "wasm-bindgen-backend" name = "wasm-bindgen-backend"
version = "0.2.87" version = "0.2.88"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ef2b6d3c510e9625e5fe6f509ab07d66a760f0885d858736483c32ed7809abd" checksum = "e397f4664c0e4e428e8313a469aaa58310d302159845980fd23b0f22a847f217"
dependencies = [ dependencies = [
"bumpalo", "bumpalo",
"log", "log",
"once_cell", "once_cell",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.38", "syn 2.0.39",
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]
[[package]] [[package]]
name = "wasm-bindgen-macro" name = "wasm-bindgen-macro"
version = "0.2.87" version = "0.2.88"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dee495e55982a3bd48105a7b947fd2a9b4a8ae3010041b9e0faab3f9cd028f1d" checksum = "5961017b3b08ad5f3fe39f1e79877f8ee7c23c5e5fd5eb80de95abc41f1f16b2"
dependencies = [ dependencies = [
"quote", "quote",
"wasm-bindgen-macro-support", "wasm-bindgen-macro-support",
@ -2921,28 +2918,28 @@ dependencies = [
[[package]] [[package]]
name = "wasm-bindgen-macro-support" name = "wasm-bindgen-macro-support"
version = "0.2.87" version = "0.2.88"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" checksum = "c5353b8dab669f5e10f5bd76df26a9360c748f054f862ff5f3f8aae0c7fb3907"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.38", "syn 2.0.39",
"wasm-bindgen-backend", "wasm-bindgen-backend",
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]
[[package]] [[package]]
name = "wasm-bindgen-shared" name = "wasm-bindgen-shared"
version = "0.2.87" version = "0.2.88"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" checksum = "0d046c5d029ba91a1ed14da14dca44b68bf2f124cfbaf741c54151fdb3e0750b"
[[package]] [[package]]
name = "web-sys" name = "web-sys"
version = "0.3.64" version = "0.3.65"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b85cbef8c220a6abc02aefd892dfc0fc23afb1c6a426316ec33253a3877249b" checksum = "5db499c5f66323272151db0e666cd34f78617522fb0c1604d31a27c50c206a85"
dependencies = [ dependencies = [
"js-sys", "js-sys",
"wasm-bindgen", "wasm-bindgen",
@ -3137,3 +3134,23 @@ name = "windows_x86_64_msvc"
version = "0.48.5" version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
[[package]]
name = "zerocopy"
version = "0.7.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e97e415490559a91254a2979b4829267a57d2fcd741a98eee8b722fb57289aa0"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd7e48ccf166952882ca8bd778a43502c64f33bf94c12ebe2a7f08e5a0f6689f"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.39",
]

View file

@ -9,11 +9,11 @@ of sub-implementations for different protocols:
[aquatic_http]: ./crates/http [aquatic_http]: ./crates/http
[aquatic_ws]: ./crates/ws [aquatic_ws]: ./crates/ws
| Name | Protocol | OS requirements | | Name | Protocol | OS requirements |
|----------------|---------------------------------|-----------------| |----------------|-------------------------------------------|-----------------|
| [aquatic_udp] | BitTorrent over UDP | Unix-like | | [aquatic_udp] | BitTorrent over UDP | Unix-like |
| [aquatic_http] | BitTorrent over HTTP over TLS | Linux 5.8+ | | [aquatic_http] | BitTorrent over HTTP, optionally over TLS | Linux 5.8+ |
| [aquatic_ws] | WebTorrent, optionally over TLS | Linux 5.8+ | | [aquatic_ws] | WebTorrent, optionally over TLS | Linux 5.8+ |
Features at a glance: Features at a glance:

View file

@ -5,6 +5,9 @@
* aquatic_ws * aquatic_ws
* Validate SDP data * Validate SDP data
* http
* panic sentinel not working
## Medium priority ## Medium priority
* stagger cleaning tasks? * stagger cleaning tasks?

View file

@ -35,6 +35,7 @@ futures = "0.3"
futures-lite = "1" futures-lite = "1"
futures-rustls = "0.24" futures-rustls = "0.24"
glommio = "0.8" glommio = "0.8"
httparse = "1"
itoa = "1" itoa = "1"
libc = "0.2" libc = "0.2"
log = "0.4" log = "0.4"
@ -48,8 +49,9 @@ rand = { version = "0.8", features = ["small_rng"] }
rustls-pemfile = "1" rustls-pemfile = "1"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
signal-hook = { version = "0.3" } signal-hook = { version = "0.3" }
slab = "0.4" slotmap = "1"
socket2 = { version = "0.5", features = ["all"] } socket2 = { version = "0.5", features = ["all"] }
thiserror = "1"
[dev-dependencies] [dev-dependencies]
quickcheck = "1" quickcheck = "1"

View file

@ -49,12 +49,9 @@ Generate the configuration file:
Make necessary adjustments to the file. You will likely want to adjust `address` Make necessary adjustments to the file. You will likely want to adjust `address`
(listening address) under the `network` section. (listening address) under the `network` section.
`aquatic_http` __only__ runs over TLS, so configuring certificate and private To run over TLS, configure certificate and private key files.
key files is required.
Running behind a reverse proxy is currently not supported due to the Running behind a reverse proxy is supported.
[difficulties of determining the originating IP address](https://adam-p.ca/blog/2022/03/x-forwarded-for/)
without knowing the exact setup.
### Running ### Running

View file

@ -10,12 +10,14 @@ use aquatic_http_protocol::{
response::{AnnounceResponse, ScrapeResponse}, response::{AnnounceResponse, ScrapeResponse},
}; };
use glommio::channels::shared_channel::SharedSender; use glommio::channels::shared_channel::SharedSender;
use slotmap::new_key_type;
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
pub struct ConsumerId(pub usize); pub struct ConsumerId(pub usize);
#[derive(Clone, Copy, Debug)] new_key_type! {
pub struct ConnectionId(pub usize); pub struct ConnectionId;
}
#[derive(Debug)] #[derive(Debug)]
pub enum ChannelRequest { pub enum ChannelRequest {

View file

@ -5,13 +5,18 @@ use aquatic_common::{
privileges::PrivilegeConfig, privileges::PrivilegeConfig,
}; };
use aquatic_toml_config::TomlConfig; use aquatic_toml_config::TomlConfig;
use serde::Deserialize; use serde::{Deserialize, Serialize};
use aquatic_common::cli::LogLevel; use aquatic_common::cli::LogLevel;
#[derive(Clone, Copy, Debug, PartialEq, Serialize, TomlConfig, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum ReverseProxyPeerIpHeaderFormat {
#[default]
LastAddress,
}
/// aquatic_http configuration /// aquatic_http configuration
///
/// Does not support running behind a reverse proxy.
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default, deny_unknown_fields)] #[serde(default, deny_unknown_fields)]
pub struct Config { pub struct Config {
@ -76,29 +81,55 @@ pub struct NetworkConfig {
pub only_ipv6: bool, pub only_ipv6: bool,
/// Maximum number of pending TCP connections /// Maximum number of pending TCP connections
pub tcp_backlog: i32, pub tcp_backlog: i32,
/// Path to TLS certificate (DER-encoded X.509) /// Enable TLS
/// ///
/// The TLS files are read on start and when the program receives `SIGUSR1`. /// The TLS files are read on start and when the program receives `SIGUSR1`.
/// If initial parsing fails, the program exits. Later failures result in /// If initial parsing fails, the program exits. Later failures result in
/// in emitting of an error-level log message, while successful updates /// in emitting of an error-level log message, while successful updates
/// result in emitting of an info-level log message. Updates only affect /// result in emitting of an info-level log message. Updates only affect
/// new connections. /// new connections.
pub enable_tls: bool,
/// Path to TLS certificate (DER-encoded X.509)
pub tls_certificate_path: PathBuf, pub tls_certificate_path: PathBuf,
/// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format) /// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format)
pub tls_private_key_path: PathBuf, pub tls_private_key_path: PathBuf,
/// Keep connections alive after sending a response /// Keep connections alive after sending a response
pub keep_alive: bool, pub keep_alive: bool,
/// Does tracker run behind reverse proxy?
///
/// MUST be set to false if not running behind reverse proxy.
///
/// If set to true, make sure that reverse_proxy_ip_header_name and
/// reverse_proxy_ip_header_format are set to match your reverse proxy
/// setup.
///
/// More info on what can go wrong when running behind reverse proxies:
/// https://adam-p.ca/blog/2022/03/x-forwarded-for/
pub runs_behind_reverse_proxy: bool,
/// Name of header set by reverse proxy to indicate peer ip
pub reverse_proxy_ip_header_name: String,
/// How to extract peer IP from header field
///
/// Options:
/// - last_address: use the last address in the last instance of the
/// header. Works with typical multi-IP setups (e.g., "X-Forwarded-For")
/// as well as for single-IP setups (e.g., nginx "X-Real-IP")
pub reverse_proxy_ip_header_format: ReverseProxyPeerIpHeaderFormat,
} }
impl Default for NetworkConfig { impl Default for NetworkConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
address: SocketAddr::from(([0, 0, 0, 0], 3000)), address: SocketAddr::from(([0, 0, 0, 0], 3000)),
enable_tls: false,
tls_certificate_path: "".into(), tls_certificate_path: "".into(),
tls_private_key_path: "".into(), tls_private_key_path: "".into(),
only_ipv6: false, only_ipv6: false,
tcp_backlog: 1024, tcp_backlog: 1024,
keep_alive: true, keep_alive: true,
runs_behind_reverse_proxy: false,
reverse_proxy_ip_header_name: "X-Forwarded-For".into(),
reverse_proxy_ip_header_format: Default::default(),
} }
} }
} }

View file

@ -24,7 +24,7 @@ mod common;
pub mod config; pub mod config;
mod workers; mod workers;
pub const APP_NAME: &str = "aquatic_http: BitTorrent tracker (HTTP over TLS)"; pub const APP_NAME: &str = "aquatic_http: HTTP BitTorrent tracker";
pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
const SHARED_CHANNEL_SIZE: usize = 1024; const SHARED_CHANNEL_SIZE: usize = 1024;
@ -58,10 +58,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel();
let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
let tls_config = Arc::new(ArcSwap::from_pointee(create_rustls_config( let opt_tls_config = if config.network.enable_tls {
&config.network.tls_certificate_path, Some(Arc::new(ArcSwap::from_pointee(create_rustls_config(
&config.network.tls_private_key_path, &config.network.tls_certificate_path,
)?)); &config.network.tls_private_key_path,
)?)))
} else {
None
};
let server_start_instant = ServerStartInstant::new(); let server_start_instant = ServerStartInstant::new();
@ -71,7 +75,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let sentinel = sentinel.clone(); let sentinel = sentinel.clone();
let config = config.clone(); let config = config.clone();
let state = state.clone(); let state = state.clone();
let tls_config = tls_config.clone(); let opt_tls_config = opt_tls_config.clone();
let request_mesh_builder = request_mesh_builder.clone(); let request_mesh_builder = request_mesh_builder.clone();
let priv_dropper = priv_dropper.clone(); let priv_dropper = priv_dropper.clone();
@ -89,7 +93,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
sentinel, sentinel,
config, config,
state, state,
tls_config, opt_tls_config,
request_mesh_builder, request_mesh_builder,
priv_dropper, priv_dropper,
server_start_instant, server_start_instant,
@ -146,16 +150,18 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
SIGUSR1 => { SIGUSR1 => {
let _ = update_access_list(&config.access_list, &state.access_list); let _ = update_access_list(&config.access_list, &state.access_list);
match create_rustls_config( if let Some(tls_config) = opt_tls_config.as_ref() {
&config.network.tls_certificate_path, match create_rustls_config(
&config.network.tls_private_key_path, &config.network.tls_certificate_path,
) { &config.network.tls_private_key_path,
Ok(config) => { ) {
tls_config.store(Arc::new(config)); Ok(config) => {
tls_config.store(Arc::new(config));
::log::info!("successfully updated tls config"); ::log::info!("successfully updated tls config");
}
Err(err) => ::log::error!("could not update tls config: {:#}", err),
} }
Err(err) => ::log::error!("could not update tls config: {:#}", err),
} }
} }
SIGTERM => { SIGTERM => {

View file

@ -1,576 +0,0 @@
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::os::unix::prelude::{FromRawFd, IntoRawFd};
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::privileges::PrivilegeDropper;
use aquatic_common::rustls_config::RustlsConfig;
use aquatic_common::{CanonicalSocketAddr, PanicSentinel, ServerStartInstant};
use aquatic_http_protocol::common::InfoHash;
use aquatic_http_protocol::request::{Request, RequestParseError, ScrapeRequest};
use aquatic_http_protocol::response::{
FailureResponse, Response, ScrapeResponse, ScrapeStatistics,
};
use arc_swap::ArcSwap;
use either::Either;
use futures::stream::FuturesUnordered;
use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
use futures_rustls::server::TlsStream;
use futures_rustls::TlsAcceptor;
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::channels::shared_channel::{self, SharedReceiver};
use glommio::net::{TcpListener, TcpStream};
use glommio::task::JoinHandle;
use glommio::timer::TimerActionRepeat;
use glommio::{enclose, prelude::*};
use once_cell::sync::Lazy;
use slab::Slab;
use crate::common::*;
use crate::config::Config;
// Fixed sizes of the per-connection request read buffer and response
// write buffer
const REQUEST_BUFFER_SIZE: usize = 2048;
const RESPONSE_BUFFER_SIZE: usize = 4096;

// Response header template parts: status line plus Content-Length header
// name, placeholder padding that the actual length value is later written
// over, and the header/body separator
const RESPONSE_HEADER_A: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Length: ";
const RESPONSE_HEADER_B: &[u8] = b" ";
const RESPONSE_HEADER_C: &[u8] = b"\r\n\r\n";

// Index of this socket worker, used as a metrics label
#[cfg(feature = "metrics")]
thread_local! { static WORKER_INDEX: ::std::cell::Cell<usize> = Default::default() }

// Full pre-built response header template; copied into each connection's
// response buffer once, then patched per response
static RESPONSE_HEADER: Lazy<Vec<u8>> =
    Lazy::new(|| [RESPONSE_HEADER_A, RESPONSE_HEADER_B, RESPONSE_HEADER_C].concat());
/// Accumulator for a scrape request that was split across swarm workers
struct PendingScrapeResponse {
    // Number of swarm workers whose partial response hasn't arrived yet
    pending_worker_responses: usize,
    // Per-info-hash statistics merged from the partial responses so far
    stats: BTreeMap<InfoHash, ScrapeStatistics>,
}
/// Slab entry tracking one live connection on a socket worker
struct ConnectionReference {
    // Handle of the connection's spawned task. None only briefly, between
    // slab insertion and the spawn completing; used by the cleaning task
    // to cancel idle connections
    task_handle: Option<JoinHandle<()>>,
    // The connection counts as idle/expired once this deadline passes
    valid_until: ValidUntil,
}
/// Socket worker: accept TCP connections and spawn one task per connection.
///
/// Every accepted connection gets an entry in a shared slab, so that the
/// periodically scheduled `clean_connections` task can cancel connections
/// that have been idle for longer than `cleaning.max_connection_idle`.
pub async fn run_socket_worker(
    _sentinel: PanicSentinel,
    config: Config,
    state: State,
    tls_config: Arc<ArcSwap<RustlsConfig>>,
    request_mesh_builder: MeshBuilder<ChannelRequest, Partial>,
    priv_dropper: PrivilegeDropper,
    server_start_instant: ServerStartInstant,
    worker_index: usize,
) {
    #[cfg(feature = "metrics")]
    WORKER_INDEX.with(|index| index.set(worker_index));

    let config = Rc::new(config);
    let access_list = state.access_list;

    let listener = create_tcp_listener(&config, priv_dropper).expect("create tcp listener");

    // Join the channel mesh as a producer of requests to swarm workers
    let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap();
    let request_senders = Rc::new(request_senders);

    // Bookkeeping for all live connections on this worker
    let connection_slab = Rc::new(RefCell::new(Slab::new()));

    // Periodically cancel and drop idle connections
    TimerActionRepeat::repeat(enclose!((config, connection_slab) move || {
        clean_connections(
            config.clone(),
            connection_slab.clone(),
            server_start_instant,
        )
    }));

    let mut incoming = listener.incoming();

    while let Some(stream) = incoming.next().await {
        match stream {
            Ok(stream) => {
                // Reserve the slab entry before spawning, so the key can
                // be handed to the connection task (it refreshes its own
                // idle deadline through it)
                let key = connection_slab.borrow_mut().insert(ConnectionReference {
                    task_handle: None,
                    valid_until: ValidUntil::new(
                        server_start_instant,
                        config.cleaning.max_connection_idle,
                    ),
                });

                let task_handle = spawn_local(enclose!((config, access_list, request_senders, tls_config, connection_slab) async move {
                    let result = match stream.peer_addr() {
                        Ok(peer_addr) => {
                            let peer_addr = CanonicalSocketAddr::new(peer_addr);

                            #[cfg(feature = "metrics")]
                            let ip_version_str = peer_addr_to_ip_version_str(&peer_addr);

                            #[cfg(feature = "metrics")]
                            ::metrics::increment_gauge!(
                                "aquatic_active_connections",
                                1.0,
                                "ip_version" => ip_version_str,
                                "worker_index" => worker_index.to_string(),
                            );

                            let result = Connection::run(
                                config,
                                access_list,
                                request_senders,
                                server_start_instant,
                                ConnectionId(key),
                                tls_config,
                                connection_slab.clone(),
                                stream,
                                peer_addr
                            ).await;

                            #[cfg(feature = "metrics")]
                            ::metrics::decrement_gauge!(
                                "aquatic_active_connections",
                                1.0,
                                "ip_version" => ip_version_str,
                                "worker_index" => worker_index.to_string(),
                            );

                            result
                        }
                        Err(err) => {
                            Err(anyhow::anyhow!("Couldn't get peer addr: {:?}", err))
                        }
                    };

                    if let Err(err) = result {
                        ::log::debug!("Connection::run() error: {:?}", err);
                    }

                    // Normal exit: remove own slab entry. NOTE(review): if
                    // this task is cancelled by clean_connections, this line
                    // never runs; in that case the entry is removed by the
                    // cleaning task's retain instead
                    connection_slab.borrow_mut().try_remove(key);
                }))
                .detach();

                // Store the handle so the cleaning task can cancel this
                // connection's task when it expires
                if let Some(reference) = connection_slab.borrow_mut().get_mut(key) {
                    reference.task_handle = Some(task_handle);
                }
            }
            Err(err) => {
                ::log::error!("accept connection: {:?}", err);
            }
        }
    }
}
/// Periodic cleaning task: drop expired connections, cancelling their tasks.
///
/// Returns the delay until the next run, which keeps the repeating timer
/// created in `run_socket_worker` going indefinitely.
async fn clean_connections(
    config: Rc<Config>,
    connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
    server_start_instant: ServerStartInstant,
) -> Option<Duration> {
    let now = server_start_instant.seconds_elapsed();

    connection_slab.borrow_mut().retain(|_, reference| {
        if reference.valid_until.valid(now) {
            true
        } else {
            // Cancel the connection's task; the slab entry itself is
            // dropped by returning false from retain
            if let Some(ref handle) = reference.task_handle {
                handle.cancel();
            }

            false
        }
    });

    // Release slab capacity freed by the removals above
    connection_slab.borrow_mut().shrink_to_fit();

    Some(Duration::from_secs(
        config.cleaning.connection_cleaning_interval,
    ))
}
/// Per-connection state for a TLS connection served by a socket worker
struct Connection {
    config: Rc<Config>,
    // Cached view of the info hash access list
    access_list_cache: AccessListCache,
    // Senders for passing requests on to swarm workers
    request_senders: Rc<Senders<ChannelRequest>>,
    // Slab shared with the socket worker; used to refresh this
    // connection's idle deadline on activity
    connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
    server_start_instant: ServerStartInstant,
    stream: TlsStream<TcpStream>,
    peer_addr: CanonicalSocketAddr,
    // Key of this connection's entry in `connection_slab`
    connection_id: ConnectionId,
    request_buffer: [u8; REQUEST_BUFFER_SIZE],
    // Number of bytes of `request_buffer` currently filled
    request_buffer_position: usize,
    // Pre-filled with RESPONSE_HEADER; body and Content-Length are
    // patched in per response
    response_buffer: [u8; RESPONSE_BUFFER_SIZE],
}
impl Connection {
    /// Perform the TLS handshake on `stream`, set up connection state and
    /// run the request-response loop until it exits.
    ///
    /// The TLS config is loaded once here, so SIGUSR1-triggered reloads
    /// only affect connections accepted afterwards.
    async fn run(
        config: Rc<Config>,
        access_list: Arc<AccessListArcSwap>,
        request_senders: Rc<Senders<ChannelRequest>>,
        server_start_instant: ServerStartInstant,
        connection_id: ConnectionId,
        tls_config: Arc<ArcSwap<RustlsConfig>>,
        connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
        stream: TcpStream,
        peer_addr: CanonicalSocketAddr,
    ) -> anyhow::Result<()> {
        let tls_acceptor: TlsAcceptor = tls_config.load_full().into();

        let stream = tls_acceptor.accept(stream).await?;

        // Pre-fill the response buffer with the static header template;
        // write_response later patches in body and Content-Length value
        let mut response_buffer = [0; RESPONSE_BUFFER_SIZE];

        response_buffer[..RESPONSE_HEADER.len()].copy_from_slice(&RESPONSE_HEADER);

        let mut conn = Connection {
            config: config.clone(),
            access_list_cache: create_access_list_cache(&access_list),
            request_senders: request_senders.clone(),
            connection_slab,
            server_start_instant,
            stream,
            peer_addr,
            connection_id,
            request_buffer: [0; REQUEST_BUFFER_SIZE],
            request_buffer_position: 0,
            response_buffer,
        };

        conn.run_request_response_loop().await?;

        Ok(())
    }

    /// Repeatedly read a request and write its response. Exits (shutting
    /// down the underlying TCP stream) after a failure response or, when
    /// keep-alive is disabled, after any response.
    async fn run_request_response_loop(&mut self) -> anyhow::Result<()> {
        loop {
            let response = match self.read_request().await? {
                Either::Left(response) => Response::Failure(response),
                Either::Right(request) => self.handle_request(request).await?,
            };

            self.write_response(&response).await?;

            if matches!(response, Response::Failure(_)) || !self.config.network.keep_alive {
                let _ = self
                    .stream
                    .get_ref()
                    .0
                    .shutdown(std::net::Shutdown::Both)
                    .await;

                break;
            }
        }

        Ok(())
    }

    /// Read from the stream until a full request has been parsed.
    ///
    /// Returns `Either::Right` with a successfully parsed request, or
    /// `Either::Left` with a failure response on invalid request data.
    /// Errors if the request buffer fills up or the peer closes.
    async fn read_request(&mut self) -> anyhow::Result<Either<FailureResponse, Request>> {
        self.request_buffer_position = 0;

        loop {
            if self.request_buffer_position == self.request_buffer.len() {
                return Err(anyhow::anyhow!("request buffer is full"));
            }

            let bytes_read = self
                .stream
                .read(&mut self.request_buffer[self.request_buffer_position..])
                .await?;

            if bytes_read == 0 {
                return Err(anyhow::anyhow!("peer closed connection"));
            }

            self.request_buffer_position += bytes_read;

            match Request::from_bytes(&self.request_buffer[..self.request_buffer_position]) {
                Ok(request) => {
                    return Ok(Either::Right(request));
                }
                Err(RequestParseError::Invalid(err)) => {
                    let response = FailureResponse {
                        failure_reason: "Invalid request".into(),
                    };

                    ::log::debug!("Invalid request: {:#}", err);

                    return Ok(Either::Left(response));
                }
                Err(RequestParseError::NeedMoreData) => {
                    ::log::debug!(
                        "need more request data. current data: {}",
                        &self.request_buffer[..self.request_buffer_position].escape_ascii()
                    );
                }
            }
        }
    }

    /// Take a request and:
    /// - Update connection ValidUntil
    /// - Return error response if request is not allowed
    /// - If it is an announce request, send it to swarm workers and await
    ///   a response
    /// - If it is a scrape request, split it up, pass on the parts to
    ///   relevant swarm workers and await a response
    async fn handle_request(&mut self, request: Request) -> anyhow::Result<Response> {
        // Refresh the idle deadline so the cleaning task doesn't cancel
        // an active connection
        if let Ok(mut slab) = self.connection_slab.try_borrow_mut() {
            if let Some(reference) = slab.get_mut(self.connection_id.0) {
                reference.valid_until = ValidUntil::new(
                    self.server_start_instant,
                    self.config.cleaning.max_connection_idle,
                );
            }
        }

        match request {
            Request::Announce(request) => {
                #[cfg(feature = "metrics")]
                ::metrics::increment_counter!(
                    "aquatic_requests_total",
                    "type" => "announce",
                    "ip_version" => peer_addr_to_ip_version_str(&self.peer_addr),
                    "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
                );

                let info_hash = request.info_hash;

                if self
                    .access_list_cache
                    .load()
                    .allows(self.config.access_list.mode, &info_hash.0)
                {
                    let (response_sender, response_receiver) = shared_channel::new_bounded(1);

                    let request = ChannelRequest::Announce {
                        request,
                        peer_addr: self.peer_addr,
                        response_sender,
                    };

                    let consumer_index = calculate_request_consumer_index(&self.config, info_hash);

                    // Only fails when receiver is closed
                    self.request_senders
                        .send_to(consumer_index, request)
                        .await
                        .unwrap();

                    response_receiver
                        .connect()
                        .await
                        .recv()
                        .await
                        .ok_or_else(|| anyhow::anyhow!("response sender closed"))
                        .map(Response::Announce)
                } else {
                    let response = Response::Failure(FailureResponse {
                        failure_reason: "Info hash not allowed".into(),
                    });

                    Ok(response)
                }
            }
            Request::Scrape(ScrapeRequest { info_hashes }) => {
                #[cfg(feature = "metrics")]
                ::metrics::increment_counter!(
                    "aquatic_requests_total",
                    "type" => "scrape",
                    "ip_version" => peer_addr_to_ip_version_str(&self.peer_addr),
                    "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
                );

                // Split info hashes up by the swarm worker responsible
                // for each of them
                let mut info_hashes_by_worker: BTreeMap<usize, Vec<InfoHash>> = BTreeMap::new();

                for info_hash in info_hashes.into_iter() {
                    let info_hashes = info_hashes_by_worker
                        .entry(calculate_request_consumer_index(&self.config, info_hash))
                        .or_default();

                    info_hashes.push(info_hash);
                }

                let pending_worker_responses = info_hashes_by_worker.len();
                let mut response_receivers = Vec::with_capacity(pending_worker_responses);

                for (consumer_index, info_hashes) in info_hashes_by_worker {
                    let (response_sender, response_receiver) = shared_channel::new_bounded(1);

                    response_receivers.push(response_receiver);

                    let request = ChannelRequest::Scrape {
                        request: ScrapeRequest { info_hashes },
                        peer_addr: self.peer_addr,
                        response_sender,
                    };

                    // Only fails when receiver is closed
                    self.request_senders
                        .send_to(consumer_index, request)
                        .await
                        .unwrap();
                }

                let pending_scrape_response = PendingScrapeResponse {
                    pending_worker_responses,
                    stats: Default::default(),
                };

                self.wait_for_scrape_responses(response_receivers, pending_scrape_response)
                    .await
            }
        }
    }

    /// Wait for partial scrape responses to arrive,
    /// return full response
    async fn wait_for_scrape_responses(
        &self,
        response_receivers: Vec<SharedReceiver<ScrapeResponse>>,
        mut pending: PendingScrapeResponse,
    ) -> anyhow::Result<Response> {
        let mut responses = response_receivers
            .into_iter()
            .map(|receiver| async { receiver.connect().await.recv().await })
            .collect::<FuturesUnordered<_>>();

        loop {
            let response = responses
                .next()
                .await
                .ok_or_else(|| {
                    anyhow::anyhow!("stream ended before all partial scrape responses received")
                })?
                .ok_or_else(|| {
                    anyhow::anyhow!(
                        "wait_for_scrape_response: can't receive response, sender is closed"
                    )
                })?;

            pending.stats.extend(response.files);
            pending.pending_worker_responses -= 1;

            if pending.pending_worker_responses == 0 {
                let response = Response::Scrape(ScrapeResponse {
                    files: pending.stats,
                });

                break Ok(response);
            }
        }
    }

    /// Serialize `response` into the response buffer after the static
    /// header, patch in the Content-Length value and write the buffer to
    /// the stream.
    async fn write_response(&mut self, response: &Response) -> anyhow::Result<()> {
        // Write body and final newline to response buffer
        let mut position = RESPONSE_HEADER.len();

        let body_len = response.write(&mut &mut self.response_buffer[position..])?;

        position += body_len;

        if position + 2 > self.response_buffer.len() {
            ::log::error!("Response buffer is too short for response");

            return Err(anyhow::anyhow!("Response buffer is too short for response"));
        }

        (&mut self.response_buffer[position..position + 2]).copy_from_slice(b"\r\n");

        position += 2;

        let content_len = body_len + 2;

        // Clear content-len header value
        {
            let start = RESPONSE_HEADER_A.len();
            let end = start + RESPONSE_HEADER_B.len();

            (&mut self.response_buffer[start..end]).copy_from_slice(RESPONSE_HEADER_B);
        }

        // Set content-len header value
        {
            let mut buf = ::itoa::Buffer::new();
            let content_len_bytes = buf.format(content_len).as_bytes();

            let start = RESPONSE_HEADER_A.len();
            let end = start + content_len_bytes.len();

            (&mut self.response_buffer[start..end]).copy_from_slice(content_len_bytes);
        }

        // Write buffer to stream. Fix: use write_all instead of write,
        // since a single write call may perform a partial write and
        // silently truncate the response
        self.stream
            .write_all(&self.response_buffer[..position])
            .await?;
        self.stream.flush().await?;

        #[cfg(feature = "metrics")]
        {
            let response_type = match response {
                Response::Announce(_) => "announce",
                Response::Scrape(_) => "scrape",
                Response::Failure(_) => "error",
            };

            ::metrics::increment_counter!(
                "aquatic_responses_total",
                "type" => response_type,
                "ip_version" => peer_addr_to_ip_version_str(&self.peer_addr),
                "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
            );
        }

        Ok(())
    }
}
/// Map an info hash to the swarm worker responsible for it.
///
/// Uses the first byte of the info hash modulo the number of swarm
/// workers, so requests for the same info hash always reach the same
/// worker.
fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize {
    let first_byte = usize::from(info_hash.0[0]);

    first_byte % config.swarm_workers
}
/// Create the worker's listening TCP socket.
///
/// Built via socket2 so that SO_REUSEPORT (required for multiple socket
/// workers to bind the same address) and IPV6_V6ONLY can be set before
/// binding. Drops privileges after the socket has been created.
fn create_tcp_listener(
    config: &Config,
    priv_dropper: PrivilegeDropper,
) -> anyhow::Result<TcpListener> {
    let address = config.network.address;

    let domain = if address.is_ipv6() {
        socket2::Domain::IPV6
    } else {
        socket2::Domain::IPV4
    };

    let raw_socket =
        socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?;

    if config.network.only_ipv6 {
        raw_socket
            .set_only_v6(true)
            .context("socket: set only ipv6")?;
    }

    raw_socket
        .set_reuse_port(true)
        .context("socket: set reuse port")?;

    raw_socket
        .bind(&address.into())
        .with_context(|| format!("socket: bind to {}", address))?;

    raw_socket
        .listen(config.network.tcp_backlog)
        .with_context(|| format!("socket: listen on {}", address))?;

    priv_dropper.after_socket_creation()?;

    // SAFETY: into_raw_fd transfers ownership of a valid listening fd,
    // which from_raw_fd takes over
    Ok(unsafe { TcpListener::from_raw_fd(raw_socket.into_raw_fd()) })
}
/// Label value ("4" or "6") for the `ip_version` metrics dimension
#[cfg(feature = "metrics")]
fn peer_addr_to_ip_version_str(addr: &CanonicalSocketAddr) -> &'static str {
    match addr.is_ipv4() {
        true => "4",
        false => "6",
    }
}

View file

@ -0,0 +1,471 @@
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::rc::Rc;
use std::sync::Arc;
use anyhow::Context;
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::rustls_config::RustlsConfig;
use aquatic_common::{CanonicalSocketAddr, ServerStartInstant};
use aquatic_http_protocol::common::InfoHash;
use aquatic_http_protocol::request::{Request, ScrapeRequest};
use aquatic_http_protocol::response::{
FailureResponse, Response, ScrapeResponse, ScrapeStatistics,
};
use arc_swap::ArcSwap;
use either::Either;
use futures::stream::FuturesUnordered;
use futures_lite::future::race;
use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
use futures_rustls::TlsAcceptor;
use glommio::channels::channel_mesh::Senders;
use glommio::channels::local_channel::LocalReceiver;
use glommio::channels::shared_channel::{self, SharedReceiver};
use glommio::net::TcpStream;
use once_cell::sync::Lazy;
use crate::common::*;
use crate::config::Config;
use super::request::{parse_request, RequestParseError};
#[cfg(feature = "metrics")]
use super::{peer_addr_to_ip_version_str, WORKER_INDEX};
// Fixed sizes of the per-connection request read buffer and response
// write buffer
const REQUEST_BUFFER_SIZE: usize = 2048;
const RESPONSE_BUFFER_SIZE: usize = 4096;

// Response header template parts: status line plus Content-Length header
// name, placeholder padding that the actual length value is later written
// over, and the header/body separator
const RESPONSE_HEADER_A: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Length: ";
const RESPONSE_HEADER_B: &[u8] = b" ";
const RESPONSE_HEADER_C: &[u8] = b"\r\n\r\n";

// Full pre-built response header template; copied into each connection's
// response buffer once, then patched per response
static RESPONSE_HEADER: Lazy<Vec<u8>> =
    Lazy::new(|| [RESPONSE_HEADER_A, RESPONSE_HEADER_B, RESPONSE_HEADER_C].concat());
/// Accumulator for a scrape request that was split across swarm workers
struct PendingScrapeResponse {
    // Number of swarm workers whose partial response hasn't arrived yet
    pending_worker_responses: usize,
    // Per-info-hash statistics merged from the partial responses so far
    stats: BTreeMap<InfoHash, ScrapeStatistics>,
}
/// Reasons a connection task can terminate
#[derive(Debug, thiserror::Error)]
pub enum ConnectionError {
    /// Connection was idle past its deadline and was told to close
    #[error("inactive")]
    Inactive,
    /// Could not read the peer address from the TCP socket
    #[error("socket peer addr extraction failed")]
    NoSocketPeerAddr(String),
    /// Request did not fit in the fixed-size request buffer
    #[error("request buffer full")]
    RequestBufferFull,
    /// Response did not fit in the fixed-size response buffer
    #[error("response buffer full")]
    ResponseBufferFull,
    #[error("response buffer write error: {0}")]
    ResponseBufferWrite(::std::io::Error),
    /// Peer closed the connection
    #[error("peer closed")]
    PeerClosed,
    /// Swarm worker dropped its response sender
    #[error("response sender closed")]
    ResponseSenderClosed,
    #[error("scrape channel error: {0}")]
    ScrapeChannelError(&'static str),
    /// Any other error, e.g. TLS handshake or I/O failures
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
/// Handle a single accepted TCP connection: optionally perform a TLS
/// handshake, then run the request-response loop until the peer disconnects,
/// an error occurs, or the cleaning task signals the connection to close.
pub(super) async fn run_connection(
    config: Rc<Config>,
    access_list: Arc<AccessListArcSwap>,
    request_senders: Rc<Senders<ChannelRequest>>,
    server_start_instant: ServerStartInstant,
    opt_tls_config: Option<Arc<ArcSwap<RustlsConfig>>>,
    valid_until: Rc<RefCell<ValidUntil>>,
    close_conn_receiver: LocalReceiver<()>,
    stream: TcpStream,
) -> Result<(), ConnectionError> {
    let access_list_cache = create_access_list_cache(&access_list);
    let request_buffer = Box::new([0u8; REQUEST_BUFFER_SIZE]);
    let mut response_buffer = Box::new([0; RESPONSE_BUFFER_SIZE]);

    // Pre-fill the response buffer with the static header template; only the
    // content-length value and the body are rewritten per response
    response_buffer[..RESPONSE_HEADER.len()].copy_from_slice(&RESPONSE_HEADER);

    let remote_addr = stream
        .peer_addr()
        .map_err(|err| ConnectionError::NoSocketPeerAddr(err.to_string()))?;

    // Behind a reverse proxy, the socket peer address is the proxy's, so the
    // real peer IP is extracted from a request header when parsing instead
    let opt_peer_addr = if config.network.runs_behind_reverse_proxy {
        None
    } else {
        Some(CanonicalSocketAddr::new(remote_addr))
    };
    let peer_port = remote_addr.port();

    if let Some(tls_config) = opt_tls_config {
        // Loading from the ArcSwap means certificate reloads (SIGUSR1) take
        // effect for new connections
        let tls_acceptor: TlsAcceptor = tls_config.load_full().into();

        let stream = tls_acceptor
            .accept(stream)
            .await
            .with_context(|| "tls accept")?;

        let mut conn = Connection {
            config,
            access_list_cache,
            request_senders,
            valid_until,
            server_start_instant,
            opt_peer_addr,
            peer_port,
            request_buffer,
            request_buffer_position: 0,
            response_buffer,
            stream,
        };

        conn.run(close_conn_receiver).await?;
    } else {
        // The two branches are duplicated because the stream types differ
        // (TlsStream<TcpStream> vs TcpStream); Connection is generic over S
        let mut conn = Connection {
            config,
            access_list_cache,
            request_senders,
            valid_until,
            server_start_instant,
            opt_peer_addr,
            peer_port,
            request_buffer,
            request_buffer_position: 0,
            response_buffer,
            stream,
        };

        conn.run(close_conn_receiver).await?;
    }

    Ok(())
}
/// Per-connection state, generic over the stream type (plain TCP or TLS)
struct Connection<S> {
    config: Rc<Config>,
    access_list_cache: AccessListCache,
    request_senders: Rc<Senders<ChannelRequest>>,
    // Shared with the socket worker's cleaning task; refreshed on each
    // handled request to keep the connection from being closed as idle
    valid_until: Rc<RefCell<ValidUntil>>,
    server_start_instant: ServerStartInstant,
    // None until the peer address is known. Behind a reverse proxy it is
    // only known after the first request's IP header has been parsed
    opt_peer_addr: Option<CanonicalSocketAddr>,
    peer_port: u16,
    request_buffer: Box<[u8; REQUEST_BUFFER_SIZE]>,
    // Number of bytes of the current request read so far
    request_buffer_position: usize,
    response_buffer: Box<[u8; RESPONSE_BUFFER_SIZE]>,
    stream: S,
}
impl<S> Connection<S>
where
    S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static,
{
    /// Drive the connection: race the request-response loop against the
    /// close signal sent by the socket worker's cleaning task.
    async fn run(&mut self, close_conn_receiver: LocalReceiver<()>) -> Result<(), ConnectionError> {
        let f1 = async { self.run_request_response_loop().await };
        let f2 = async {
            close_conn_receiver.recv().await;

            Err(ConnectionError::Inactive)
        };

        race(f1, f2).await
    }

    async fn run_request_response_loop(&mut self) -> Result<(), ConnectionError> {
        loop {
            let response = match self.read_request().await? {
                Either::Left(response) => Response::Failure(response),
                Either::Right(request) => self.handle_request(request).await?,
            };

            self.write_response(&response).await?;

            // Failure responses close the connection, as does disabled
            // keep-alive
            if matches!(response, Response::Failure(_)) || !self.config.network.keep_alive {
                break;
            }
        }

        Ok(())
    }

    /// Read bytes from the stream until a complete request has been parsed.
    ///
    /// Returns `Either::Left` with a failure response for invalid requests,
    /// `Either::Right` with the parsed request otherwise. When running
    /// behind a reverse proxy, also stores the peer address extracted from
    /// the configured IP header.
    async fn read_request(&mut self) -> Result<Either<FailureResponse, Request>, ConnectionError> {
        self.request_buffer_position = 0;

        loop {
            if self.request_buffer_position == self.request_buffer.len() {
                return Err(ConnectionError::RequestBufferFull);
            }

            let bytes_read = self
                .stream
                .read(&mut self.request_buffer[self.request_buffer_position..])
                .await
                .with_context(|| "read")?;

            if bytes_read == 0 {
                return Err(ConnectionError::PeerClosed);
            }

            self.request_buffer_position += bytes_read;

            let buffer_slice = &self.request_buffer[..self.request_buffer_position];

            match parse_request(&self.config, buffer_slice) {
                Ok((request, opt_peer_ip)) => {
                    if self.config.network.runs_behind_reverse_proxy {
                        let peer_ip = opt_peer_ip
                            .expect("logic error: peer ip must have been extracted at this point");

                        self.opt_peer_addr = Some(CanonicalSocketAddr::new(SocketAddr::new(
                            peer_ip,
                            self.peer_port,
                        )));
                    }

                    return Ok(Either::Right(request));
                }
                // Request incomplete: read more bytes from the peer
                Err(RequestParseError::MoreDataNeeded) => continue,
                Err(RequestParseError::RequiredPeerIpHeaderMissing(err)) => {
                    panic!("Tracker configured as running behind reverse proxy, but no corresponding IP header set in request. Please check your reverse proxy setup as well as your aquatic configuration. Error: {:#}", err);
                }
                Err(RequestParseError::Other(err)) => {
                    ::log::debug!("Failed parsing request: {:#}", err);

                    let response = FailureResponse {
                        failure_reason: "Invalid request".into(),
                    };

                    return Ok(Either::Left(response));
                }
            }
        }
    }

    /// Take a request and:
    /// - Update connection ValidUntil
    /// - Return error response if request is not allowed
    /// - If it is an announce request, send it to swarm workers and await a
    ///   response
    /// - If it is a scrape request, split it up, pass on the parts to
    ///   relevant swarm workers and await a response
    async fn handle_request(&mut self, request: Request) -> Result<Response, ConnectionError> {
        let peer_addr = self
            .opt_peer_addr
            .expect("peer addr should already have been extracted by now");

        // Refresh the activity deadline so the cleaning task keeps this
        // connection alive
        *self.valid_until.borrow_mut() = ValidUntil::new(
            self.server_start_instant,
            self.config.cleaning.max_connection_idle,
        );

        match request {
            Request::Announce(request) => {
                #[cfg(feature = "metrics")]
                ::metrics::increment_counter!(
                    "aquatic_requests_total",
                    "type" => "announce",
                    "ip_version" => peer_addr_to_ip_version_str(&peer_addr),
                    "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
                );

                let info_hash = request.info_hash;

                if self
                    .access_list_cache
                    .load()
                    .allows(self.config.access_list.mode, &info_hash.0)
                {
                    let (response_sender, response_receiver) = shared_channel::new_bounded(1);

                    let request = ChannelRequest::Announce {
                        request,
                        peer_addr,
                        response_sender,
                    };

                    let consumer_index = calculate_request_consumer_index(&self.config, info_hash);

                    // Only fails when receiver is closed
                    self.request_senders
                        .send_to(consumer_index, request)
                        .await
                        .unwrap();

                    response_receiver
                        .connect()
                        .await
                        .recv()
                        .await
                        .ok_or(ConnectionError::ResponseSenderClosed)
                        .map(Response::Announce)
                } else {
                    let response = Response::Failure(FailureResponse {
                        failure_reason: "Info hash not allowed".into(),
                    });

                    Ok(response)
                }
            }
            Request::Scrape(ScrapeRequest { info_hashes }) => {
                #[cfg(feature = "metrics")]
                ::metrics::increment_counter!(
                    "aquatic_requests_total",
                    "type" => "scrape",
                    "ip_version" => peer_addr_to_ip_version_str(&peer_addr),
                    "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
                );

                // Group info hashes by the swarm worker responsible for them
                let mut info_hashes_by_worker: BTreeMap<usize, Vec<InfoHash>> = BTreeMap::new();

                for info_hash in info_hashes.into_iter() {
                    let info_hashes = info_hashes_by_worker
                        .entry(calculate_request_consumer_index(&self.config, info_hash))
                        .or_default();

                    info_hashes.push(info_hash);
                }

                let pending_worker_responses = info_hashes_by_worker.len();
                let mut response_receivers = Vec::with_capacity(pending_worker_responses);

                for (consumer_index, info_hashes) in info_hashes_by_worker {
                    let (response_sender, response_receiver) = shared_channel::new_bounded(1);

                    response_receivers.push(response_receiver);

                    let request = ChannelRequest::Scrape {
                        request: ScrapeRequest { info_hashes },
                        peer_addr,
                        response_sender,
                    };

                    // Only fails when receiver is closed
                    self.request_senders
                        .send_to(consumer_index, request)
                        .await
                        .unwrap();
                }

                let pending_scrape_response = PendingScrapeResponse {
                    pending_worker_responses,
                    stats: Default::default(),
                };

                self.wait_for_scrape_responses(response_receivers, pending_scrape_response)
                    .await
            }
        }
    }

    /// Wait for partial scrape responses to arrive,
    /// return full response
    async fn wait_for_scrape_responses(
        &self,
        response_receivers: Vec<SharedReceiver<ScrapeResponse>>,
        mut pending: PendingScrapeResponse,
    ) -> Result<Response, ConnectionError> {
        let mut responses = response_receivers
            .into_iter()
            .map(|receiver| async { receiver.connect().await.recv().await })
            .collect::<FuturesUnordered<_>>();

        loop {
            let response = responses
                .next()
                .await
                .ok_or_else(|| {
                    ConnectionError::ScrapeChannelError(
                        "stream ended before all partial scrape responses received",
                    )
                })?
                .ok_or_else(|| ConnectionError::ScrapeChannelError("sender is closed"))?;

            pending.stats.extend(response.files);
            pending.pending_worker_responses -= 1;

            if pending.pending_worker_responses == 0 {
                let response = Response::Scrape(ScrapeResponse {
                    files: pending.stats,
                });

                break Ok(response);
            }
        }
    }

    /// Serialize the response into the response buffer (after the
    /// pre-written header template), patch the Content-Length header value
    /// and write the whole buffer to the stream.
    async fn write_response(&mut self, response: &Response) -> Result<(), ConnectionError> {
        // Write body and final newline to response buffer
        let mut position = RESPONSE_HEADER.len();

        let body_len = response
            .write(&mut &mut self.response_buffer[position..])
            .map_err(ConnectionError::ResponseBufferWrite)?;

        position += body_len;

        if position + 2 > self.response_buffer.len() {
            return Err(ConnectionError::ResponseBufferFull);
        }

        self.response_buffer[position..position + 2].copy_from_slice(b"\r\n");

        position += 2;

        let content_len = body_len + 2;

        // Clear content-len header value
        {
            let start = RESPONSE_HEADER_A.len();
            let end = start + RESPONSE_HEADER_B.len();

            self.response_buffer[start..end].copy_from_slice(RESPONSE_HEADER_B);
        }

        // Set content-len header value
        {
            let mut buf = ::itoa::Buffer::new();
            let content_len_bytes = buf.format(content_len).as_bytes();

            let start = RESPONSE_HEADER_A.len();
            let end = start + content_len_bytes.len();

            self.response_buffer[start..end].copy_from_slice(content_len_bytes);
        }

        // Write buffer to stream. write_all is required here: a bare
        // `write` may perform a partial write, silently truncating the
        // response sent to the peer
        self.stream
            .write_all(&self.response_buffer[..position])
            .await
            .with_context(|| "write")?;
        self.stream.flush().await.with_context(|| "flush")?;

        #[cfg(feature = "metrics")]
        {
            let response_type = match response {
                Response::Announce(_) => "announce",
                Response::Scrape(_) => "scrape",
                Response::Failure(_) => "error",
            };

            let peer_addr = self
                .opt_peer_addr
                .expect("peer addr should already have been extracted by now");

            ::metrics::increment_counter!(
                "aquatic_responses_total",
                "type" => response_type,
                "ip_version" => peer_addr_to_ip_version_str(&peer_addr),
                "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
            );
        }

        Ok(())
    }
}
/// Pick which swarm worker is responsible for a torrent, based on the first
/// byte of its info hash
fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize {
    let first_byte = usize::from(info_hash.0[0]);

    first_byte % config.swarm_workers
}

View file

@ -0,0 +1,212 @@
mod connection;
mod request;
use std::cell::RefCell;
use std::os::unix::prelude::{FromRawFd, IntoRawFd};
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use aquatic_common::privileges::PrivilegeDropper;
use aquatic_common::rustls_config::RustlsConfig;
use aquatic_common::{CanonicalSocketAddr, PanicSentinel, ServerStartInstant};
use arc_swap::ArcSwap;
use futures_lite::StreamExt;
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role};
use glommio::channels::local_channel::{new_bounded, LocalSender};
use glommio::net::TcpListener;
use glommio::timer::TimerActionRepeat;
use glommio::{enclose, prelude::*};
use slotmap::HopSlotMap;
use crate::common::*;
use crate::config::Config;
use crate::workers::socket::connection::{run_connection, ConnectionError};
#[cfg(feature = "metrics")]
thread_local! { static WORKER_INDEX: ::std::cell::Cell<usize> = Default::default() }
/// Handle kept by the socket worker for each spawned connection task, used
/// by the periodic cleaning task to detect and close idle connections
struct ConnectionHandle {
    close_conn_sender: LocalSender<()>,
    // Shared with the connection task, which refreshes it on activity
    valid_until: Rc<RefCell<ValidUntil>>,
}
/// Accept incoming TCP connections and spawn a local task per connection.
///
/// Also registers a repeating timer task that signals connections that have
/// been idle too long to close themselves.
pub async fn run_socket_worker(
    _sentinel: PanicSentinel,
    config: Config,
    state: State,
    opt_tls_config: Option<Arc<ArcSwap<RustlsConfig>>>,
    request_mesh_builder: MeshBuilder<ChannelRequest, Partial>,
    priv_dropper: PrivilegeDropper,
    server_start_instant: ServerStartInstant,
    worker_index: usize,
) {
    #[cfg(feature = "metrics")]
    WORKER_INDEX.with(|index| index.set(worker_index));

    let config = Rc::new(config);
    let access_list = state.access_list;

    let listener = create_tcp_listener(&config, priv_dropper).expect("create tcp listener");

    let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap();
    let request_senders = Rc::new(request_senders);

    let connection_handles = Rc::new(RefCell::new(HopSlotMap::with_key()));

    // Periodically close idle connections
    TimerActionRepeat::repeat(enclose!((config, connection_handles) move || {
        clean_connections(
            config.clone(),
            connection_handles.clone(),
            server_start_instant,
        )
    }));

    let mut incoming = listener.incoming();

    while let Some(stream) = incoming.next().await {
        match stream {
            Ok(stream) => {
                // Channel used by the cleaning task to tell this connection
                // to shut down
                let (close_conn_sender, close_conn_receiver) = new_bounded(1);

                let valid_until = Rc::new(RefCell::new(ValidUntil::new(
                    server_start_instant,
                    config.cleaning.max_connection_idle,
                )));

                let connection_id = connection_handles.borrow_mut().insert(ConnectionHandle {
                    close_conn_sender,
                    valid_until: valid_until.clone(),
                });

                spawn_local(enclose!(
                    (
                        config,
                        access_list,
                        request_senders,
                        opt_tls_config,
                        connection_handles,
                        valid_until,
                    )
                    async move {
                        #[cfg(feature = "metrics")]
                        ::metrics::increment_gauge!(
                            "aquatic_active_connections",
                            1.0,
                            "worker_index" => worker_index.to_string(),
                        );

                        let result = run_connection(
                            config,
                            access_list,
                            request_senders,
                            server_start_instant,
                            opt_tls_config,
                            valid_until.clone(),
                            close_conn_receiver,
                            stream,
                        ).await;

                        #[cfg(feature = "metrics")]
                        ::metrics::decrement_gauge!(
                            "aquatic_active_connections",
                            1.0,
                            "worker_index" => worker_index.to_string(),
                        );

                        // Log at a level matching the severity of the cause
                        match result {
                            Ok(()) => (),
                            Err(err@(
                                ConnectionError::ResponseBufferWrite(_) |
                                ConnectionError::ResponseBufferFull |
                                ConnectionError::ScrapeChannelError(_) |
                                ConnectionError::ResponseSenderClosed
                            )) => {
                                ::log::error!("connection closed: {:#}", err);
                            }
                            Err(err@ConnectionError::RequestBufferFull) => {
                                ::log::info!("connection closed: {:#}", err);
                            }
                            Err(err) => {
                                ::log::debug!("connection closed: {:#}", err);
                            }
                        }

                        // Always deregister the handle on exit so the slab
                        // doesn't accumulate dead connections
                        connection_handles.borrow_mut().remove(connection_id);
                    }
                ))
                .detach();
            }
            Err(err) => {
                ::log::error!("accept connection: {:?}", err);
            }
        }
    }
}
/// Tell idle connections to close themselves and drop their handles.
///
/// Returns the interval until the next cleaning run, keeping the
/// TimerActionRepeat going.
async fn clean_connections(
    config: Rc<Config>,
    connection_slab: Rc<RefCell<HopSlotMap<ConnectionId, ConnectionHandle>>>,
    server_start_instant: ServerStartInstant,
) -> Option<Duration> {
    let now = server_start_instant.seconds_elapsed();

    connection_slab.borrow_mut().retain(|_, handle| {
        let still_active = handle.valid_until.borrow().valid(now);

        if !still_active {
            // Signal the connection task to shut down; ignore failure, which
            // just means the task is already gone or was already signalled
            let _ = handle.close_conn_sender.try_send(());
        }

        still_active
    });

    Some(Duration::from_secs(
        config.cleaning.connection_cleaning_interval,
    ))
}
/// Create the worker's listening TCP socket and drop socket-related
/// privileges afterwards.
///
/// SO_REUSEPORT is set so that multiple socket workers can bind the same
/// address and have the kernel distribute connections between them.
fn create_tcp_listener(
    config: &Config,
    priv_dropper: PrivilegeDropper,
) -> anyhow::Result<TcpListener> {
    let domain = if config.network.address.is_ipv4() {
        socket2::Domain::IPV4
    } else {
        socket2::Domain::IPV6
    };

    let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?;

    if config.network.only_ipv6 {
        socket
            .set_only_v6(true)
            .with_context(|| "socket: set only ipv6")?;
    }

    socket
        .set_reuse_port(true)
        .with_context(|| "socket: set reuse port")?;
    socket
        .bind(&config.network.address.into())
        .with_context(|| format!("socket: bind to {}", config.network.address))?;
    socket
        .listen(config.network.tcp_backlog)
        .with_context(|| format!("socket: listen on {}", config.network.address))?;

    priv_dropper.after_socket_creation()?;

    // SAFETY: into_raw_fd transfers ownership of the valid listening fd,
    // which from_raw_fd takes over; no other owner remains
    Ok(unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) })
}
/// Metrics label value ("4" or "6") for the peer's IP version
#[cfg(feature = "metrics")]
fn peer_addr_to_ip_version_str(addr: &CanonicalSocketAddr) -> &'static str {
    match addr.is_ipv4() {
        true => "4",
        false => "6",
    }
}

View file

@ -0,0 +1,147 @@
use std::net::IpAddr;
use anyhow::Context;
use aquatic_http_protocol::request::Request;
use crate::config::{Config, ReverseProxyPeerIpHeaderFormat};
/// Errors returned by `parse_request`
#[derive(Debug, thiserror::Error)]
pub enum RequestParseError {
    // Tracker runs behind a reverse proxy, but the configured peer IP
    // header was missing or could not be parsed. The caller treats this
    // as a fatal configuration error
    #[error("required peer ip header missing or invalid")]
    RequiredPeerIpHeaderMissing(anyhow::Error),
    // Request is incomplete; caller should read more bytes and retry
    #[error("more data needed")]
    MoreDataNeeded,
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
/// Parse a tracker request from raw HTTP request bytes.
///
/// Returns the parsed request, together with the peer IP address extracted
/// from the configured reverse proxy header when
/// `network.runs_behind_reverse_proxy` is enabled (None otherwise).
///
/// Returns `RequestParseError::MoreDataNeeded` if the buffer does not yet
/// contain a complete HTTP request.
pub fn parse_request(
    config: &Config,
    buffer: &[u8],
) -> Result<(Request, Option<IpAddr>), RequestParseError> {
    // NOTE(review): requests with more than 16 headers will fail parsing
    // (httparse TooManyHeaders, surfaced as Other) — confirm this limit is
    // acceptable behind header-adding reverse proxies
    let mut headers = [httparse::EMPTY_HEADER; 16];
    let mut http_request = httparse::Request::new(&mut headers);

    match http_request.parse(buffer).with_context(|| "httparse")? {
        httparse::Status::Complete(_) => {
            let path = http_request.path.ok_or(anyhow::anyhow!("no http path"))?;
            let request = Request::from_http_get_path(path)?;

            let opt_peer_ip = if config.network.runs_behind_reverse_proxy {
                let header_name = &config.network.reverse_proxy_ip_header_name;
                let header_format = config.network.reverse_proxy_ip_header_format;

                match parse_forwarded_header(header_name, header_format, http_request.headers) {
                    Ok(peer_ip) => Some(peer_ip),
                    Err(err) => {
                        return Err(RequestParseError::RequiredPeerIpHeaderMissing(err));
                    }
                }
            } else {
                None
            };

            Ok((request, opt_peer_ip))
        }
        httparse::Status::Partial => Err(RequestParseError::MoreDataNeeded),
    }
}
/// Extract the peer IP address from the configured reverse proxy header.
///
/// Headers are searched in reverse order so that the last occurrence of the
/// header wins; within it, the last comma-separated address is used (the one
/// appended by the closest, trusted proxy).
fn parse_forwarded_header(
    header_name: &str,
    header_format: ReverseProxyPeerIpHeaderFormat,
    headers: &[httparse::Header<'_>],
) -> anyhow::Result<IpAddr> {
    for header in headers.iter().rev() {
        // HTTP header field names are case-insensitive (RFC 9110 §5.1), and
        // proxies may normalize casing, so compare ignoring ASCII case
        if header.name.eq_ignore_ascii_case(header_name) {
            match header_format {
                ReverseProxyPeerIpHeaderFormat::LastAddress => {
                    return ::std::str::from_utf8(header.value)?
                        .split(',')
                        .last()
                        .ok_or_else(|| anyhow::anyhow!("no header value"))?
                        .trim()
                        .parse::<IpAddr>()
                        .with_context(|| "parse ip");
                }
            }
        }
    }

    Err(anyhow::anyhow!("header not present"))
}
#[cfg(test)]
mod tests {
    use super::*;

    // Announce request line plus Host header; individual tests append
    // further headers and the terminating blank line
    const REQUEST_START: &str = "GET /announce?info_hash=%04%0bkV%3f%5cr%14%a6%b7%98%adC%c3%c9.%40%24%00%b9&peer_id=-ABC940-5ert69muw5t8&port=12345&uploaded=1&downloaded=2&left=3&numwant=0&key=4ab4b877&compact=1&supportcrypto=1&event=started HTTP/1.1\r\nHost: example.com\r\n";

    // With repeated headers, the last header occurrence wins, and within it
    // the last comma-separated address is used
    #[test]
    fn test_parse_peer_ip_header_multiple() {
        let mut config = Config::default();

        config.network.runs_behind_reverse_proxy = true;
        config.network.reverse_proxy_ip_header_name = "X-Forwarded-For".into();
        config.network.reverse_proxy_ip_header_format = ReverseProxyPeerIpHeaderFormat::LastAddress;

        let mut request = REQUEST_START.to_string();

        request.push_str("X-Forwarded-For: 200.0.0.1\r\n");
        request.push_str("X-Forwarded-For: 1.2.3.4, 5.6.7.8,9.10.11.12\r\n");
        request.push_str("\r\n");

        let expected_ip = IpAddr::from([9, 10, 11, 12]);

        assert_eq!(
            parse_request(&config, request.as_bytes())
                .unwrap()
                .1
                .unwrap(),
            expected_ip
        )
    }

    // Same as above but with the single-address header last: its only
    // address should be returned
    #[test]
    fn test_parse_peer_ip_header_single() {
        let mut config = Config::default();

        config.network.runs_behind_reverse_proxy = true;
        config.network.reverse_proxy_ip_header_name = "X-Forwarded-For".into();
        config.network.reverse_proxy_ip_header_format = ReverseProxyPeerIpHeaderFormat::LastAddress;

        let mut request = REQUEST_START.to_string();

        request.push_str("X-Forwarded-For: 1.2.3.4, 5.6.7.8,9.10.11.12\r\n");
        request.push_str("X-Forwarded-For: 200.0.0.1\r\n");
        request.push_str("\r\n");

        let expected_ip = IpAddr::from([200, 0, 0, 1]);

        assert_eq!(
            parse_request(&config, request.as_bytes())
                .unwrap()
                .1
                .unwrap(),
            expected_ip
        )
    }

    // Missing configured IP header must surface as
    // RequiredPeerIpHeaderMissing, not as a generic parse error
    #[test]
    fn test_parse_peer_ip_header_no_header() {
        let mut config = Config::default();

        config.network.runs_behind_reverse_proxy = true;

        let mut request = REQUEST_START.to_string();

        request.push_str("\r\n");

        let res = parse_request(&config, request.as_bytes());

        assert!(matches!(
            res,
            Err(RequestParseError::RequiredPeerIpHeaderMissing(_))
        ));
    }
}

View file

@ -19,6 +19,7 @@ aquatic_http_protocol.workspace = true
aquatic_toml_config.workspace = true aquatic_toml_config.workspace = true
anyhow = "1" anyhow = "1"
futures = "0.3"
futures-lite = "1" futures-lite = "1"
futures-rustls = "0.24" futures-rustls = "0.24"
hashbrown = "0.14" hashbrown = "0.14"

View file

@ -23,6 +23,7 @@ pub struct Config {
pub url_suffix: String, pub url_suffix: String,
pub duration: usize, pub duration: usize,
pub keep_alive: bool, pub keep_alive: bool,
pub enable_tls: bool,
pub torrents: TorrentConfig, pub torrents: TorrentConfig,
pub cpu_pinning: CpuPinningConfigDesc, pub cpu_pinning: CpuPinningConfigDesc,
} }
@ -44,6 +45,7 @@ impl Default for Config {
url_suffix: "".into(), url_suffix: "".into(),
duration: 0, duration: 0,
keep_alive: true, keep_alive: true,
enable_tls: true,
torrents: TorrentConfig::default(), torrents: TorrentConfig::default(),
cpu_pinning: Default::default(), cpu_pinning: Default::default(),
} }

View file

@ -59,11 +59,15 @@ fn run(config: Config) -> ::anyhow::Result<()> {
gamma: Arc::new(gamma), gamma: Arc::new(gamma),
}; };
let tls_config = create_tls_config().unwrap(); let opt_tls_config = if config.enable_tls {
Some(create_tls_config().unwrap())
} else {
None
};
for i in 0..config.num_workers { for i in 0..config.num_workers {
let config = config.clone(); let config = config.clone();
let tls_config = tls_config.clone(); let opt_tls_config = opt_tls_config.clone();
let state = state.clone(); let state = state.clone();
let placement = get_worker_placement( let placement = get_worker_placement(
@ -76,7 +80,9 @@ fn run(config: Config) -> ::anyhow::Result<()> {
LocalExecutorBuilder::new(placement) LocalExecutorBuilder::new(placement)
.name("load-test") .name("load-test")
.spawn(move || async move { .spawn(move || async move {
run_socket_thread(config, tls_config, state).await.unwrap(); run_socket_thread(config, opt_tls_config, state)
.await
.unwrap();
}) })
.unwrap(); .unwrap();
} }

View file

@ -9,7 +9,7 @@ use std::{
use aquatic_http_protocol::response::Response; use aquatic_http_protocol::response::Response;
use futures_lite::{AsyncReadExt, AsyncWriteExt}; use futures_lite::{AsyncReadExt, AsyncWriteExt};
use futures_rustls::{client::TlsStream, TlsConnector}; use futures_rustls::TlsConnector;
use glommio::net::TcpStream; use glommio::net::TcpStream;
use glommio::{prelude::*, timer::TimerActionRepeat}; use glommio::{prelude::*, timer::TimerActionRepeat};
use rand::{prelude::SmallRng, SeedableRng}; use rand::{prelude::SmallRng, SeedableRng};
@ -18,7 +18,7 @@ use crate::{common::LoadTestState, config::Config, utils::create_random_request}
pub async fn run_socket_thread( pub async fn run_socket_thread(
config: Config, config: Config,
tls_config: Arc<rustls::ClientConfig>, opt_tls_config: Option<Arc<rustls::ClientConfig>>,
load_test_state: LoadTestState, load_test_state: LoadTestState,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let config = Rc::new(config); let config = Rc::new(config);
@ -30,9 +30,9 @@ pub async fn run_socket_thread(
if interval == 0 { if interval == 0 {
loop { loop {
if *num_active_connections.borrow() < config.num_connections { if *num_active_connections.borrow() < config.num_connections {
if let Err(err) = Connection::run( if let Err(err) = run_connection(
config.clone(), config.clone(),
tls_config.clone(), opt_tls_config.clone(),
load_test_state.clone(), load_test_state.clone(),
num_active_connections.clone(), num_active_connections.clone(),
rng.clone(), rng.clone(),
@ -50,7 +50,7 @@ pub async fn run_socket_thread(
periodically_open_connections( periodically_open_connections(
config.clone(), config.clone(),
interval, interval,
tls_config.clone(), opt_tls_config.clone(),
load_test_state.clone(), load_test_state.clone(),
num_active_connections.clone(), num_active_connections.clone(),
rng.clone(), rng.clone(),
@ -66,16 +66,16 @@ pub async fn run_socket_thread(
async fn periodically_open_connections( async fn periodically_open_connections(
config: Rc<Config>, config: Rc<Config>,
interval: Duration, interval: Duration,
tls_config: Arc<rustls::ClientConfig>, opt_tls_config: Option<Arc<rustls::ClientConfig>>,
load_test_state: LoadTestState, load_test_state: LoadTestState,
num_active_connections: Rc<RefCell<usize>>, num_active_connections: Rc<RefCell<usize>>,
rng: Rc<RefCell<SmallRng>>, rng: Rc<RefCell<SmallRng>>,
) -> Option<Duration> { ) -> Option<Duration> {
if *num_active_connections.borrow() < config.num_connections { if *num_active_connections.borrow() < config.num_connections {
spawn_local(async move { spawn_local(async move {
if let Err(err) = Connection::run( if let Err(err) = run_connection(
config, config,
tls_config, opt_tls_config,
load_test_state, load_test_state,
num_active_connections, num_active_connections,
rng.clone(), rng.clone(),
@ -91,26 +91,18 @@ async fn periodically_open_connections(
Some(interval) Some(interval)
} }
struct Connection { async fn run_connection(
config: Rc<Config>, config: Rc<Config>,
opt_tls_config: Option<Arc<rustls::ClientConfig>>,
load_test_state: LoadTestState, load_test_state: LoadTestState,
num_active_connections: Rc<RefCell<usize>>,
rng: Rc<RefCell<SmallRng>>, rng: Rc<RefCell<SmallRng>>,
stream: TlsStream<TcpStream>, ) -> anyhow::Result<()> {
buffer: [u8; 2048], let stream = TcpStream::connect(config.server_address)
} .await
.map_err(|err| anyhow::anyhow!("connect: {:?}", err))?;
impl Connection {
async fn run(
config: Rc<Config>,
tls_config: Arc<rustls::ClientConfig>,
load_test_state: LoadTestState,
num_active_connections: Rc<RefCell<usize>>,
rng: Rc<RefCell<SmallRng>>,
) -> anyhow::Result<()> {
let stream = TcpStream::connect(config.server_address)
.await
.map_err(|err| anyhow::anyhow!("connect: {:?}", err))?;
if let Some(tls_config) = opt_tls_config {
let stream = TlsConnector::from(tls_config) let stream = TlsConnector::from(tls_config)
.connect("example.com".try_into().unwrap(), stream) .connect("example.com".try_into().unwrap(), stream)
.await?; .await?;
@ -120,18 +112,49 @@ impl Connection {
load_test_state, load_test_state,
rng, rng,
stream, stream,
buffer: [0; 2048], buffer: Box::new([0; 2048]),
}; };
connection.run(num_active_connections).await?;
} else {
let mut connection = Connection {
config,
load_test_state,
rng,
stream,
buffer: Box::new([0; 2048]),
};
connection.run(num_active_connections).await?;
}
Ok(())
}
struct Connection<S> {
config: Rc<Config>,
load_test_state: LoadTestState,
rng: Rc<RefCell<SmallRng>>,
stream: S,
buffer: Box<[u8; 2048]>,
}
impl<S> Connection<S>
where
S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static,
{
async fn run(&mut self, num_active_connections: Rc<RefCell<usize>>) -> anyhow::Result<()> {
*num_active_connections.borrow_mut() += 1; *num_active_connections.borrow_mut() += 1;
if let Err(err) = connection.run_connection_loop().await { let result = self.run_connection_loop().await;
if let Err(err) = &result {
::log::info!("connection error: {:?}", err); ::log::info!("connection error: {:?}", err);
} }
*num_active_connections.borrow_mut() -= 1; *num_active_connections.borrow_mut() -= 1;
Ok(()) result
} }
async fn run_connection_loop(&mut self) -> anyhow::Result<()> { async fn run_connection_loop(&mut self) -> anyhow::Result<()> {

View file

@ -244,23 +244,6 @@ impl ScrapeRequest {
} }
} }
#[derive(Debug)]
pub enum RequestParseError {
NeedMoreData,
Invalid(anyhow::Error),
}
impl ::std::fmt::Display for RequestParseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::NeedMoreData => write!(f, "Incomplete request, more data needed"),
Self::Invalid(err) => write!(f, "Invalid request: {:#}", err),
}
}
}
impl ::std::error::Error for RequestParseError {}
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub enum Request { pub enum Request {
Announce(AnnounceRequest), Announce(AnnounceRequest),
@ -269,20 +252,20 @@ pub enum Request {
impl Request { impl Request {
/// Parse Request from HTTP request bytes /// Parse Request from HTTP request bytes
pub fn from_bytes(bytes: &[u8]) -> Result<Self, RequestParseError> { pub fn from_bytes(bytes: &[u8]) -> anyhow::Result<Option<Self>> {
let mut headers = [httparse::EMPTY_HEADER; 16]; let mut headers = [httparse::EMPTY_HEADER; 16];
let mut http_request = httparse::Request::new(&mut headers); let mut http_request = httparse::Request::new(&mut headers);
match http_request.parse(bytes) { match http_request.parse(bytes) {
Ok(httparse::Status::Complete(_)) => { Ok(httparse::Status::Complete(_)) => {
if let Some(path) = http_request.path { if let Some(path) = http_request.path {
Self::from_http_get_path(path).map_err(RequestParseError::Invalid) Self::from_http_get_path(path).map(Some)
} else { } else {
Err(RequestParseError::Invalid(anyhow::anyhow!("no http path"))) Err(anyhow::anyhow!("no http path"))
} }
} }
Ok(httparse::Status::Partial) => Err(RequestParseError::NeedMoreData), Ok(httparse::Status::Partial) => Ok(None),
Err(err) => Err(RequestParseError::Invalid(anyhow::Error::from(err))), Err(err) => Err(anyhow::Error::from(err)),
} }
} }
@ -368,7 +351,7 @@ mod tests {
bytes.extend_from_slice(&ANNOUNCE_REQUEST_PATH.as_bytes()); bytes.extend_from_slice(&ANNOUNCE_REQUEST_PATH.as_bytes());
bytes.extend_from_slice(b" HTTP/1.1\r\n\r\n"); bytes.extend_from_slice(b" HTTP/1.1\r\n\r\n");
let parsed_request = Request::from_bytes(&bytes[..]).unwrap(); let parsed_request = Request::from_bytes(&bytes[..]).unwrap().unwrap();
let reference_request = get_reference_announce_request(); let reference_request = get_reference_announce_request();
assert_eq!(parsed_request, reference_request); assert_eq!(parsed_request, reference_request);
@ -382,7 +365,7 @@ mod tests {
bytes.extend_from_slice(&SCRAPE_REQUEST_PATH.as_bytes()); bytes.extend_from_slice(&SCRAPE_REQUEST_PATH.as_bytes());
bytes.extend_from_slice(b" HTTP/1.1\r\n\r\n"); bytes.extend_from_slice(b" HTTP/1.1\r\n\r\n");
let parsed_request = Request::from_bytes(&bytes[..]).unwrap(); let parsed_request = Request::from_bytes(&bytes[..]).unwrap().unwrap();
let reference_request = Request::Scrape(ScrapeRequest { let reference_request = Request::Scrape(ScrapeRequest {
info_hashes: vec![InfoHash(REFERENCE_INFO_HASH)], info_hashes: vec![InfoHash(REFERENCE_INFO_HASH)],
}); });
@ -449,7 +432,7 @@ mod tests {
request.write(&mut bytes, &[]).unwrap(); request.write(&mut bytes, &[]).unwrap();
let parsed_request = Request::from_bytes(&bytes[..]).unwrap(); let parsed_request = Request::from_bytes(&bytes[..]).unwrap().unwrap();
let success = request == parsed_request; let success = request == parsed_request;

View file

@ -229,11 +229,14 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
match &message { match &message {
tungstenite::Message::Text(_) | tungstenite::Message::Binary(_) => { tungstenite::Message::Text(_) | tungstenite::Message::Binary(_) => {
match InMessage::from_ws_message(message) { match InMessage::from_ws_message(message) {
Ok(in_message) => { Ok(InMessage::AnnounceRequest(request)) => {
self.handle_in_message(in_message).await?; self.handle_announce_request(request).await?;
}
Ok(InMessage::ScrapeRequest(request)) => {
self.handle_scrape_request(request).await?;
} }
Err(err) => { Err(err) => {
::log::debug!("Couldn't parse in_message: {:?}", err); ::log::debug!("Couldn't parse in_message: {:#}", err);
self.send_error_response("Invalid request".into(), None, None) self.send_error_response("Invalid request".into(), None, None)
.await?; .await?;
@ -261,171 +264,167 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
} }
} }
async fn handle_in_message(&mut self, in_message: InMessage) -> anyhow::Result<()> { async fn handle_announce_request(&mut self, request: AnnounceRequest) -> anyhow::Result<()> {
match in_message { #[cfg(feature = "metrics")]
InMessage::AnnounceRequest(announce_request) => { ::metrics::increment_counter!(
#[cfg(feature = "metrics")] "aquatic_requests_total",
::metrics::increment_counter!( "type" => "announce",
"aquatic_requests_total", "ip_version" => ip_version_to_metrics_str(self.ip_version),
"type" => "announce", "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
"ip_version" => ip_version_to_metrics_str(self.ip_version), );
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
let info_hash = announce_request.info_hash; let info_hash = request.info_hash;
if self if self
.access_list_cache .access_list_cache
.load() .load()
.allows(self.config.access_list.mode, &info_hash.0) .allows(self.config.access_list.mode, &info_hash.0)
{ {
let mut announced_info_hashes = let mut announced_info_hashes = self.clean_up_data.announced_info_hashes.borrow_mut();
self.clean_up_data.announced_info_hashes.borrow_mut();
// Store peer id / check if stored peer id matches // Store peer id / check if stored peer id matches
match announced_info_hashes.entry(announce_request.info_hash) { match announced_info_hashes.entry(request.info_hash) {
Entry::Occupied(entry) => { Entry::Occupied(entry) => {
if *entry.get() != announce_request.peer_id { if *entry.get() != request.peer_id {
// Drop Rc borrow before awaiting // Drop Rc borrow before awaiting
drop(announced_info_hashes); drop(announced_info_hashes);
self.send_error_response( self.send_error_response(
"Only one peer id can be used per torrent".into(), "Only one peer id can be used per torrent".into(),
Some(ErrorResponseAction::Announce), Some(ErrorResponseAction::Announce),
Some(info_hash), Some(info_hash),
)
.await?;
return Err(anyhow::anyhow!(
"Peer used more than one PeerId for a single torrent"
));
}
}
Entry::Vacant(entry) => {
entry.insert(announce_request.peer_id);
// Set peer client info if not set
#[cfg(feature = "metrics")]
if self.config.metrics.run_prometheus_endpoint
&& self.config.metrics.peer_clients
&& self.clean_up_data.opt_peer_client.borrow().is_none()
{
let peer_id = aquatic_peer_id::PeerId(announce_request.peer_id.0);
let client = peer_id.client();
let prefix = peer_id.first_8_bytes_hex().to_string();
::metrics::increment_gauge!(
"aquatic_peer_clients",
1.0,
"client" => client.to_string(),
);
if self.config.metrics.peer_id_prefixes {
::metrics::increment_gauge!(
"aquatic_peer_id_prefixes",
1.0,
"prefix_hex" => prefix.to_string(),
);
}
*self.clean_up_data.opt_peer_client.borrow_mut() =
Some((client, prefix));
};
}
}
if let Some(AnnounceEvent::Stopped) = announce_request.event {
announced_info_hashes.remove(&announce_request.info_hash);
}
// Drop Rc borrow before awaiting
drop(announced_info_hashes);
let in_message = InMessage::AnnounceRequest(announce_request);
let consumer_index =
calculate_in_message_consumer_index(&self.config, info_hash);
// Only fails when receiver is closed
self.in_message_senders
.send_to(
consumer_index,
(self.make_connection_meta(None), in_message),
) )
.await .await?;
.unwrap();
} else { return Err(anyhow::anyhow!(
self.send_error_response( "Peer used more than one PeerId for a single torrent"
"Info hash not allowed".into(), ));
Some(ErrorResponseAction::Announce), }
Some(info_hash), }
) Entry::Vacant(entry) => {
.await?; entry.insert(request.peer_id);
// Set peer client info if not set
#[cfg(feature = "metrics")]
if self.config.metrics.run_prometheus_endpoint
&& self.config.metrics.peer_clients
&& self.clean_up_data.opt_peer_client.borrow().is_none()
{
let peer_id = aquatic_peer_id::PeerId(request.peer_id.0);
let client = peer_id.client();
let prefix = peer_id.first_8_bytes_hex().to_string();
::metrics::increment_gauge!(
"aquatic_peer_clients",
1.0,
"client" => client.to_string(),
);
if self.config.metrics.peer_id_prefixes {
::metrics::increment_gauge!(
"aquatic_peer_id_prefixes",
1.0,
"prefix_hex" => prefix.to_string(),
);
}
*self.clean_up_data.opt_peer_client.borrow_mut() = Some((client, prefix));
};
} }
} }
InMessage::ScrapeRequest(ScrapeRequest { info_hashes, .. }) => {
#[cfg(feature = "metrics")]
::metrics::increment_counter!(
"aquatic_requests_total",
"type" => "scrape",
"ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
let info_hashes = if let Some(info_hashes) = info_hashes { if let Some(AnnounceEvent::Stopped) = request.event {
info_hashes announced_info_hashes.remove(&request.info_hash);
} else {
// If request.info_hashes is empty, don't return scrape for all
// torrents, even though reference server does it. It is too expensive.
self.send_error_response(
"Full scrapes are not allowed".into(),
Some(ErrorResponseAction::Scrape),
None,
)
.await?;
return Ok(());
};
let mut info_hashes_by_worker: BTreeMap<usize, Vec<InfoHash>> = BTreeMap::new();
for info_hash in info_hashes.as_vec() {
let info_hashes = info_hashes_by_worker
.entry(calculate_in_message_consumer_index(&self.config, info_hash))
.or_default();
info_hashes.push(info_hash);
}
let pending_worker_out_messages = info_hashes_by_worker.len();
let pending_scrape_response = PendingScrapeResponse {
pending_worker_out_messages,
stats: Default::default(),
};
let pending_scrape_id: u8 = self
.pending_scrape_slab
.borrow_mut()
.insert(pending_scrape_response)
.try_into()
.with_context(|| "Reached 256 pending scrape responses")?;
let meta = self.make_connection_meta(Some(PendingScrapeId(pending_scrape_id)));
for (consumer_index, info_hashes) in info_hashes_by_worker {
let in_message = InMessage::ScrapeRequest(ScrapeRequest {
action: ScrapeAction::Scrape,
info_hashes: Some(ScrapeRequestInfoHashes::Multiple(info_hashes)),
});
// Only fails when receiver is closed
self.in_message_senders
.send_to(consumer_index, (meta, in_message))
.await
.unwrap();
}
} }
// Drop Rc borrow before awaiting
drop(announced_info_hashes);
let in_message = InMessage::AnnounceRequest(request);
let consumer_index = calculate_in_message_consumer_index(&self.config, info_hash);
// Only fails when receiver is closed
self.in_message_senders
.send_to(
consumer_index,
(self.make_connection_meta(None), in_message),
)
.await
.unwrap();
} else {
self.send_error_response(
"Info hash not allowed".into(),
Some(ErrorResponseAction::Announce),
Some(info_hash),
)
.await?;
}
Ok(())
}
async fn handle_scrape_request(&mut self, request: ScrapeRequest) -> anyhow::Result<()> {
#[cfg(feature = "metrics")]
::metrics::increment_counter!(
"aquatic_requests_total",
"type" => "scrape",
"ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
let info_hashes = if let Some(info_hashes) = request.info_hashes {
info_hashes
} else {
// If request.info_hashes is empty, don't return scrape for all
// torrents, even though reference server does it. It is too expensive.
self.send_error_response(
"Full scrapes are not allowed".into(),
Some(ErrorResponseAction::Scrape),
None,
)
.await?;
return Ok(());
};
let mut info_hashes_by_worker: BTreeMap<usize, Vec<InfoHash>> = BTreeMap::new();
for info_hash in info_hashes.as_vec() {
let info_hashes = info_hashes_by_worker
.entry(calculate_in_message_consumer_index(&self.config, info_hash))
.or_default();
info_hashes.push(info_hash);
}
let pending_worker_out_messages = info_hashes_by_worker.len();
let pending_scrape_response = PendingScrapeResponse {
pending_worker_out_messages,
stats: Default::default(),
};
let pending_scrape_id: u8 = self
.pending_scrape_slab
.borrow_mut()
.insert(pending_scrape_response)
.try_into()
.with_context(|| "Reached 256 pending scrape responses")?;
let meta = self.make_connection_meta(Some(PendingScrapeId(pending_scrape_id)));
for (consumer_index, info_hashes) in info_hashes_by_worker {
let in_message = InMessage::ScrapeRequest(ScrapeRequest {
action: ScrapeAction::Scrape,
info_hashes: Some(ScrapeRequestInfoHashes::Multiple(info_hashes)),
});
// Only fails when receiver is closed
self.in_message_senders
.send_to(consumer_index, (meta, in_message))
.await
.unwrap();
} }
Ok(()) Ok(())
@ -485,31 +484,28 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionWriter<S> {
.pending_scrape_id .pending_scrape_id
.expect("meta.pending_scrape_id not set"); .expect("meta.pending_scrape_id not set");
let finished = if let Some(pending) = Slab::get_mut( let mut pending_responses = self.pending_scrape_slab.borrow_mut();
&mut RefCell::borrow_mut(&self.pending_scrape_slab),
pending_scrape_id.0 as usize,
) {
pending.stats.extend(out_message.files);
pending.pending_worker_out_messages -= 1;
pending.pending_worker_out_messages == 0 let pending_response = pending_responses
} else { .get_mut(pending_scrape_id.0 as usize)
return Err(anyhow::anyhow!("pending scrape not found in slab")); .ok_or(anyhow::anyhow!("pending scrape not found in slab"))?;
};
if finished { pending_response.stats.extend(out_message.files);
let out_message = { pending_response.pending_worker_out_messages -= 1;
let mut slab = RefCell::borrow_mut(&self.pending_scrape_slab);
let pending = slab.remove(pending_scrape_id.0 as usize); if pending_response.pending_worker_out_messages == 0 {
let pending_response =
pending_responses.remove(pending_scrape_id.0 as usize);
slab.shrink_to_fit(); pending_responses.shrink_to_fit();
OutMessage::ScrapeResponse(ScrapeResponse { let out_message = OutMessage::ScrapeResponse(ScrapeResponse {
action: ScrapeAction::Scrape, action: ScrapeAction::Scrape,
files: pending.stats, files: pending_response.stats,
}) });
};
// Drop Rc borrow before awaiting
drop(pending_responses);
self.send_out_message(&out_message).await?; self.send_out_message(&out_message).await?;
} }
@ -522,72 +518,63 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionWriter<S> {
} }
async fn send_out_message(&mut self, out_message: &OutMessage) -> anyhow::Result<()> { async fn send_out_message(&mut self, out_message: &OutMessage) -> anyhow::Result<()> {
let result = timeout(Duration::from_secs(10), async { timeout(Duration::from_secs(10), async {
let result = Ok(futures::SinkExt::send(&mut self.ws_out, out_message.to_ws_message()).await)
futures::SinkExt::send(&mut self.ws_out, out_message.to_ws_message()).await;
Ok(result)
}) })
.await; .await
.map_err(|err| {
anyhow::anyhow!("send_out_message: sending to peer took too long: {:#}", err)
})?
.with_context(|| "send_out_message")?;
match result { if let OutMessage::AnnounceResponse(_) | OutMessage::ScrapeResponse(_) = out_message {
Ok(Ok(())) => { *self.connection_valid_until.borrow_mut() = ValidUntil::new(
if let OutMessage::AnnounceResponse(_) | OutMessage::ScrapeResponse(_) = out_message self.server_start_instant,
{ self.config.cleaning.max_connection_idle,
*self.connection_valid_until.borrow_mut() = ValidUntil::new( );
self.server_start_instant,
self.config.cleaning.max_connection_idle,
);
}
#[cfg(feature = "metrics")]
{
let out_message_type = match &out_message {
OutMessage::OfferOutMessage(_) => "offer",
OutMessage::AnswerOutMessage(_) => "offer_answer",
OutMessage::AnnounceResponse(_) => "announce",
OutMessage::ScrapeResponse(_) => "scrape",
OutMessage::ErrorResponse(_) => "error",
};
::metrics::increment_counter!(
"aquatic_responses_total",
"type" => out_message_type,
"ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
if let Some((peer_client, prefix)) =
self.clean_up_data.opt_peer_client.borrow().as_ref()
{
// As long as connection is still alive, increment peer client
// gauges by zero to prevent them from being removed due to
// idleness
::metrics::increment_gauge!(
"aquatic_peer_clients",
0.0,
"client" => peer_client.to_string(),
);
if self.config.metrics.peer_id_prefixes {
::metrics::increment_gauge!(
"aquatic_peer_id_prefixes",
0.0,
"prefix_hex" => prefix.to_string(),
);
}
}
}
Ok(())
}
Ok(Err(err)) => Err(err.into()),
Err(err) => Err(anyhow::anyhow!(
"send_out_message: sending to peer took too long: {}",
err
)),
} }
#[cfg(feature = "metrics")]
{
let out_message_type = match &out_message {
OutMessage::OfferOutMessage(_) => "offer",
OutMessage::AnswerOutMessage(_) => "offer_answer",
OutMessage::AnnounceResponse(_) => "announce",
OutMessage::ScrapeResponse(_) => "scrape",
OutMessage::ErrorResponse(_) => "error",
};
::metrics::increment_counter!(
"aquatic_responses_total",
"type" => out_message_type,
"ip_version" => ip_version_to_metrics_str(self.ip_version),
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
);
if let Some((peer_client, prefix)) =
self.clean_up_data.opt_peer_client.borrow().as_ref()
{
// As long as connection is still alive, increment peer client
// gauges by zero to prevent them from being removed due to
// idleness
::metrics::increment_gauge!(
"aquatic_peer_clients",
0.0,
"client" => peer_client.to_string(),
);
if self.config.metrics.peer_id_prefixes {
::metrics::increment_gauge!(
"aquatic_peer_id_prefixes",
0.0,
"prefix_hex" => prefix.to_string(),
);
}
}
}
Ok(())
} }
} }