diff --git a/.github/actions/test-transfer/entrypoint.sh b/.github/actions/test-transfer/entrypoint.sh index 1337eca..2fab330 100755 --- a/.github/actions/test-transfer/entrypoint.sh +++ b/.github/actions/test-transfer/entrypoint.sh @@ -82,10 +82,19 @@ echo "log_level = 'trace' [network] address = '127.0.0.1:3002' +enable_tls = true tls_certificate_path = './cert.crt' tls_private_key_path = './key.pk8' +" > ws-tls.toml +./target/debug/aquatic ws -c ws-tls.toml > "$HOME/ws-tls.log" 2>&1 & + +echo "log_level = 'trace' + +[network] +address = '127.0.0.1:3003' +enable_http_health_checks = true " > ws.toml -./target/debug/aquatic ws -c ws.toml > "$HOME/wss.log" 2>&1 & +./target/debug/aquatic ws -c ws.toml > "$HOME/ws.log" 2>&1 & # Setup directories @@ -100,21 +109,30 @@ mkdir torrents # echo "http-test-ipv4" > seed/http-test-ipv4 echo "tls-test-ipv4" > seed/tls-test-ipv4 echo "udp-test-ipv4" > seed/udp-test-ipv4 -echo "wss-test-ipv4" > seed/wss-test-ipv4 +echo "ws-tls-test-ipv4" > seed/ws-tls-test-ipv4 +echo "ws-test-ipv4" > seed/ws-test-ipv4 # mktorrent -p -o "torrents/http-ipv4.torrent" -a "http://127.0.0.1:3000/announce" "seed/http-test-ipv4" mktorrent -p -o "torrents/tls-ipv4.torrent" -a "https://example.com:3001/announce" "seed/tls-test-ipv4" mktorrent -p -o "torrents/udp-ipv4.torrent" -a "udp://127.0.0.1:3000" "seed/udp-test-ipv4" -mktorrent -p -o "torrents/wss-ipv4.torrent" -a "wss://example.com:3002" "seed/wss-test-ipv4" +mktorrent -p -o "torrents/ws-tls-ipv4.torrent" -a "wss://example.com:3002" "seed/ws-tls-test-ipv4" +mktorrent -p -o "torrents/ws-ipv4.torrent" -a "ws://example.com:3003" "seed/ws-test-ipv4" cp -r torrents torrents-seed cp -r torrents torrents-leech -# Setup wss seeding client +# Setup ws-tls seeding client -echo "Starting seeding wss client" +echo "Starting seeding ws-tls (wss) client" cd seed -GOPPROF=http GODEBUG=x509ignoreCN=0 $HOME/gotorrent download --dht=false --tcppeers=false --utppeers=false --pex=false --stats --seed ../torrents/wss-ipv4.torrent > "$HOME/wss-seed.log" 2>&1 & +GOPPROF=http GODEBUG=x509ignoreCN=0 $HOME/gotorrent download --dht=false --tcppeers=false --utppeers=false --pex=false --stats --seed ../torrents/ws-tls-ipv4.torrent > "$HOME/ws-tls-seed.log" 2>&1 & +cd .. + +# Setup ws seeding client + +echo "Starting seeding ws client" +cd seed +GOPPROF=http GODEBUG=x509ignoreCN=0 $HOME/gotorrent download --dht=false --tcppeers=false --utppeers=false --pex=false --stats --seed ../torrents/ws-ipv4.torrent > "$HOME/ws-seed.log" 2>&1 & cd .. # Start seeding rtorrent client @@ -138,9 +156,14 @@ schedule2 = watch_directory,5,5,load.start=$HOME/torrents-leech/*.torrent" > ~/. echo "Starting leeching client.." screen -dmS rtorrent-leech rtorrent -echo "Starting leeching wss client" +echo "Starting leeching ws-tls (wss) client" cd leech -GOPPROF=http GODEBUG=x509ignoreCN=0 $HOME/gotorrent download --dht=false --tcppeers=false --utppeers=false --pex=false --stats --addr ":43000" ../torrents/wss-ipv4.torrent > "$HOME/wss-leech.log" 2>&1 & +GOPPROF=http GODEBUG=x509ignoreCN=0 $HOME/gotorrent download --dht=false --tcppeers=false --utppeers=false --pex=false --stats --addr ":43000" ../torrents/ws-tls-ipv4.torrent > "$HOME/ws-tls-leech.log" 2>&1 & +cd .. + +echo "Starting leeching ws client" +cd leech +GOPPROF=http GODEBUG=x509ignoreCN=0 $HOME/gotorrent download --dht=false --tcppeers=false --utppeers=false --pex=false --stats --addr ":43001" ../torrents/ws-ipv4.torrent > "$HOME/ws-leech.log" 2>&1 & cd .. # Check for completion @@ -148,7 +171,8 @@ cd .. # HTTP_IPv4="Ok" TLS_IPv4="Failed" UDP_IPv4="Failed" -WSS_IPv4="Failed" +WS_TLS_IPv4="Failed" +WS_IPv4="Failed" i="0" @@ -180,17 +204,24 @@ do fi fi fi - if test -f "leech/wss-test-ipv4"; then - if grep -q "wss-test-ipv4" "leech/wss-test-ipv4"; then - if [ "$WSS_IPv4" != "Ok" ]; then - WSS_IPv4="Ok" - echo "WSS_IPv4 is Ok" + if test -f "leech/ws-tls-test-ipv4"; then + if grep -q "ws-tls-test-ipv4" "leech/ws-tls-test-ipv4"; then + if [ "$WS_TLS_IPv4" != "Ok" ]; then + WS_TLS_IPv4="Ok" + echo "WS_TLS_IPv4 is Ok" + fi + fi + fi + if test -f "leech/ws-test-ipv4"; then + if grep -q "ws-test-ipv4" "leech/ws-test-ipv4"; then + if [ "$WS_IPv4" != "Ok" ]; then + WS_IPv4="Ok" + echo "WS_IPv4 is Ok" fi fi fi - # if [ "$HTTP_IPv4" = "Ok" ] && [ "$TLS_IPv4" = "Ok" ] && [ "$UDP_IPv4" = "Ok" ] && [ "$WSS_IPv4" = "Ok" ]; then - if [ "$TLS_IPv4" = "Ok" ] && [ "$UDP_IPv4" = "Ok" ] && [ "$WSS_IPv4" = "Ok" ]; then + if [ "$TLS_IPv4" = "Ok" ] && [ "$UDP_IPv4" = "Ok" ] && [ "$WS_TLS_IPv4" = "Ok" ] && [ "$WS_IPv4" = "Ok" ]; then break fi @@ -204,7 +235,8 @@ echo "Waited for $i seconds" # echo "::set-output name=http_ipv4::$HTTP_IPv4" echo "::set-output name=http_tls_ipv4::$TLS_IPv4" echo "::set-output name=udp_ipv4::$UDP_IPv4" -echo "::set-output name=wss_ipv4::$WSS_IPv4" +echo "::set-output name=ws_tls_ipv4::$WS_TLS_IPv4" +echo "::set-output name=ws_ipv4::$WS_IPv4" # echo "" # echo "# --- HTTP log --- #" @@ -225,31 +257,48 @@ cat "udp.log" sleep 1 echo "" -echo "# --- WSS tracker log --- #" -cat "wss.log" +echo "# --- WS over TLS tracker log --- #" +cat "ws-tls.log" sleep 1 echo "" -echo "# --- WSS seed log --- #" -cat "wss-seed.log" +echo "# --- WS tracker log --- #" +cat "ws.log" sleep 1 echo "" -echo "# --- WSS leech log --- #" -cat "wss-leech.log" +echo "# --- WS over TLS seed log --- #" +cat "ws-tls-seed.log" + +sleep 1 + +echo "" +echo "# --- WS over TLS leech log --- #" +cat "ws-tls-leech.log" + +sleep 1 + +echo "" +echo "# --- WS seed log --- #" +cat "ws-seed.log" + +sleep 1 + +echo "" +echo "# --- WS leech log --- #" +cat "ws-leech.log" sleep 1 echo "" echo "# --- Test results --- #" -# echo "HTTP (IPv4): $HTTP_IPv4" echo "HTTP over TLS (IPv4): $TLS_IPv4" echo "UDP (IPv4): $UDP_IPv4" -echo "WSS (IPv4): $WSS_IPv4" +echo "WSS (IPv4): $WS_TLS_IPv4" +echo "WS (IPv4): $WS_IPv4" -# if [ "$HTTP_IPv4" != "Ok" ] || [ "$TLS_IPv4" != "Ok" ] || [ "$UDP_IPv4" != "Ok" ] || [ "$WSS_IPv4" != "Ok" ]; then -if [ "$TLS_IPv4" != "Ok" ] || [ "$UDP_IPv4" != "Ok" ] || [ "$WSS_IPv4" != "Ok" ]; then +if [ "$TLS_IPv4" != "Ok" ] || [ "$UDP_IPv4" != "Ok" ] || [ "$WS_TLS_IPv4" != "Ok" ] || [ "$WS_IPv4" != "Ok" ]; then exit 1 fi diff --git a/.github/workflows/cargo-build-and-test.yml b/.github/workflows/cargo-build-and-test.yml index bd68943..4f3ede3 100644 --- a/.github/workflows/cargo-build-and-test.yml +++ b/.github/workflows/cargo-build-and-test.yml @@ -29,7 +29,7 @@ jobs: cargo build --verbose -p aquatic_http cargo build --verbose -p aquatic_ws - name: Run tests - run: cargo test --verbose --workspace --all-targets + run: cargo test --verbose --workspace --profile "test-fast" build-macos: diff --git a/Cargo.lock b/Cargo.lock index a2d79a8..7e83c12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -73,7 +73,7 @@ dependencies = [ "log", "privdrop", "rand", - "rustls 0.20.6", + "rustls", "rustls-pemfile", "serde", "simple_logger", @@ -130,7 +130,7 @@ dependencies = [ "quickcheck_macros", "rand", "rand_distr", - "rustls 0.20.6", + "rustls", "serde", ] @@ -150,13 +150,13 @@ dependencies = [ "log", "mimalloc", "rand", - "rustls 0.20.6", + "rustls", "serde", "signal-hook", "socket2 0.4.4", "sqlx", "tokio", - "tokio-rustls 0.23.4", + "tokio-rustls", ] [[package]] @@ -295,13 +295,14 @@ dependencies = [ "futures-rustls", "glommio", "hashbrown 0.12.3", + "httparse", "log", "mimalloc", "privdrop", "quickcheck", "quickcheck_macros", "rand", - "rustls 0.20.6", + "rustls", "rustls-pemfile", "serde", "signal-hook", @@ -328,7 +329,7 @@ dependencies = [ "quickcheck_macros", "rand", "rand_distr", - "rustls 0.20.6", + "rustls", "serde", "serde_json", "tungstenite", @@ -402,9 +403,9 @@ dependencies = [ [[package]] name = "atoi" -version = "0.4.0" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "616896e05fc0e2649463a93a15183c6a16bf03413a7af88ef1285ddedfa9cda5" +checksum = "d7c57d12312ff59c811c0643f4d80830505833c9ffaebd193d819392b265be8e" dependencies = [ "num-traits", ] @@ -587,9 +588,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +checksum = "f0b3de4a0c5e67e16066a0715723abd91edc2f9001d09c46e1dca929351e130e" [[package]] name = "cache-padded" @@ -688,18 +689,18 @@ dependencies = [ [[package]] name = "crc" -version = "2.1.0" +version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49fc9a695bca7f35f5f4c15cddc84415f66a74ea78eef08e90c5024f2b540e23" +checksum = "53757d12b596c16c78b83458d732a5d1a17ab3f53f2f7412f6fb57cc8a140ab3" dependencies = [ "crc-catalog", ] [[package]] name = "crc-catalog" -version = "1.1.1" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccaeedb56da03b09f598226e25e80088cb4cd25f316e6e4df7d695f0feeb1403" +checksum = "2d0165d2900ae6778e36e80bbc4da3b5eefccee9ba939761f9c2882a5d9af3ff" [[package]] name = "criterion" @@ -1102,8 +1103,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01fe9932a224b72b45336d96040aa86386d674a31d0af27d800ea7bc8ca97fe" dependencies = [ "futures-io", - "rustls 0.20.6", - "webpki 0.22.0", + "rustls", + "webpki", ] [[package]] @@ -1255,15 +1256,6 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" -[[package]] -name = "hashbrown" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" -dependencies = [ - "ahash", -] - [[package]] name = "hashbrown" version = "0.12.3" @@ -1276,11 +1268,11 @@ dependencies = [ [[package]] name = "hashlink" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf" +checksum = "d452c155cb93fecdfb02a73dd57b5d8e442c2063bd7aac72f1bc5e4263a43086" dependencies = [ - "hashbrown 0.11.2", + "hashbrown 0.12.3", ] [[package]] @@ -1728,9 +1720,9 @@ dependencies = [ [[package]] name = "num-bigint" -version = "0.3.3" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f6f7833f2cbf2360a6cfd58cd41a53aa7a90bd4c202f5b1c7dd2ed73c57b2c3" +checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f" dependencies = [ "autocfg", "num-integer", @@ -2236,19 +2228,6 @@ version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" -[[package]] -name = "rustls" -version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" -dependencies = [ - "base64", - "log", - "ring", - "sct 0.6.1", - "webpki 0.21.4", -] - [[package]] name = "rustls" version = "0.20.6" @@ -2257,8 +2236,8 @@ checksum = "5aab8ee6c7097ed6057f43c187a62418d0c05a4bd5f18b3571db50ee0f9ce033" dependencies = [ "log", "ring", - "sct 0.7.0", - "webpki 0.22.0", + "sct", + "webpki", ] [[package]] @@ -2297,16 +2276,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" -[[package]] -name = "sct" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "sct" version = "0.7.0" @@ -2458,9 +2427,12 @@ checksum = "04d2ecae5fcf33b122e2e6bd520a57ccf152d2dde3b38c71039df1a6867264ee" [[package]] name = "slab" -version = "0.4.6" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32" +checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef" +dependencies = [ + "autocfg", +] [[package]] name = "smallvec" @@ -2538,9 +2510,9 @@ dependencies = [ [[package]] name = "sqlx" -version = "0.5.13" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "551873805652ba0d912fec5bbb0f8b4cdd96baf8e2ebf5970e5671092966019b" +checksum = "1f82cbe94f41641d6c410ded25bbf5097c240cefdf8e3b06d04198d0a96af6a4" dependencies = [ "sqlx-core", "sqlx-macros", @@ -2548,9 +2520,9 @@ dependencies = [ [[package]] name = "sqlx-core" -version = "0.5.13" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e48c61941ccf5ddcada342cd59e3e5173b007c509e1e8e990dafc830294d9dc5" +checksum = "6b69bf218860335ddda60d6ce85ee39f6cf6e5630e300e19757d1de15886a093" dependencies = [ "ahash", "atoi", @@ -2580,7 +2552,8 @@ dependencies = [ "percent-encoding", "rand", "rsa", - "rustls 0.19.1", + "rustls", + "rustls-pemfile", "sha-1", "sha2", "smallvec", @@ -2590,15 +2563,14 @@ dependencies = [ "thiserror", "tokio-stream", "url", - "webpki 0.21.4", "webpki-roots", ] [[package]] name = "sqlx-macros" -version = "0.5.13" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc0fba2b0cae21fc00fe6046f8baa4c7fcb49e379f0f592b04696607f69ed2e1" +checksum = "f40c63177cf23d356b159b60acd27c54af7423f1736988502e36bae9a712118f" dependencies = [ "dotenv", "either", @@ -2615,13 +2587,13 @@ dependencies = [ [[package]] name = "sqlx-rt" -version = "0.5.13" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4db708cd3e459078f85f39f96a00960bd841f66ee2a669e90bf36907f5a79aae" +checksum = "874e93a365a598dc3dadb197565952cb143ae4aa716f7bcc933a8d836f6bf89f" dependencies = [ "once_cell", "tokio", - "tokio-rustls 0.22.0", + "tokio-rustls", ] [[package]] @@ -2789,26 +2761,15 @@ dependencies = [ "syn", ] -[[package]] -name = "tokio-rustls" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" -dependencies = [ - "rustls 0.19.1", - "tokio", - "webpki 0.21.4", -] - [[package]] name = "tokio-rustls" version = "0.23.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" dependencies = [ - "rustls 0.20.6", + "rustls", "tokio", - "webpki 0.22.0", + "webpki", ] [[package]] @@ -3132,16 +3093,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "webpki" -version = "0.21.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "webpki" version = "0.22.0" @@ -3154,11 +3105,11 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.21.1" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940" +checksum = "f1c760f0d366a6c24a02ed7816e23e691f5d92291f94d15e836006fd11b04daf" dependencies = [ - "webpki 0.21.4", + "webpki", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 2092976..0a7239c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,3 +35,7 @@ inherits = "release-debug" [profile.release-debug] inherits = "release" debug = true + +[profile.test-fast] +inherits = "release" +lto = false diff --git a/README.md b/README.md index 20dc68f..f21c53b 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ of sub-implementations for different protocols: |--------------|--------------------------------------------|------------------------------| | aquatic_udp | [BitTorrent over UDP] | Unix-like (using [mio]) | | aquatic_http | [BitTorrent over HTTP] with TLS ([rustls]) | Linux 5.8+ (using [glommio]) | -| aquatic_ws | [WebTorrent] over TLS ([rustls]) | Linux 5.8+ (using [glommio]) | +| aquatic_ws | [WebTorrent] over TLS ([rustls], optional) | Linux 5.8+ (using [glommio]) | Features at a glance: @@ -65,9 +65,9 @@ Generate configuration files. They come with comments and differ between protoco Make adjustments to the files. You will likely want to adjust `address` (listening address) under the `network` section. -Note that both `aquatic_http` and `aquatic_ws` require configuring TLS -certificate and private key files. More details are available in the -respective configuration files. +Note that both `aquatic_http` and `aquatic_ws` require configuring certificate +and private key files to run over TLS (which is optional for `aquatic_ws`). +More details are available in the respective configuration files. #### Workers diff --git a/TODO.md b/TODO.md index 5396993..1cc19ae 100644 --- a/TODO.md +++ b/TODO.md @@ -2,6 +2,9 @@ ## High priority +* ws + * add integration test for non-TLS configuration, maybe behind reverse proxy + ## Medium priority * quit whole program if any thread panics diff --git a/aquatic_http/src/config.rs b/aquatic_http/src/config.rs index e4a6a35..95bc281 100644 --- a/aquatic_http/src/config.rs +++ b/aquatic_http/src/config.rs @@ -10,6 +10,8 @@ use serde::Deserialize; use aquatic_common::cli::LogLevel; /// aquatic_http configuration +/// +/// Does not support running behind a reverse proxy. #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] #[serde(default, deny_unknown_fields)] pub struct Config { diff --git a/aquatic_http_private/Cargo.toml b/aquatic_http_private/Cargo.toml index 1994dbe..31b4d29 100644 --- a/aquatic_http_private/Cargo.toml +++ b/aquatic_http_private/Cargo.toml @@ -31,6 +31,6 @@ rustls = "0.20" serde = { version = "1", features = ["derive"] } signal-hook = { version = "0.3" } socket2 = { version = "0.4", features = ["all"] } -sqlx = { version = "0.5", features = [ "runtime-tokio-rustls" , "mysql" ] } +sqlx = { version = "0.6", features = [ "runtime-tokio-rustls" , "mysql" ] } tokio = { version = "1", features = ["full"] } tokio-rustls = "0.23" diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index c0d3724..4ea8a6b 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -29,6 +29,7 @@ futures-lite = "1" futures-rustls = "0.22" glommio = "0.7" hashbrown = { version = "0.12", features = ["serde"] } +httparse = "1" log = "0.4" mimalloc = { version = "0.1", default-features = false } privdrop = "0.5" diff --git a/aquatic_ws/src/common.rs b/aquatic_ws/src/common.rs index 27aa134..a099e68 100644 --- a/aquatic_ws/src/common.rs +++ b/aquatic_ws/src/common.rs @@ -1,11 +1,28 @@ -use std::sync::Arc; +use std::{net::IpAddr, sync::Arc}; use aquatic_common::access_list::AccessListArcSwap; -use aquatic_common::CanonicalSocketAddr; pub use aquatic_common::ValidUntil; use aquatic_ws_protocol::{InfoHash, PeerId}; +#[derive(Copy, Clone, Debug)] +pub enum IpVersion { + V4, + V6, +} + +impl IpVersion { + pub fn canonical_from_ip(ip: IpAddr) -> IpVersion { + match ip { + IpAddr::V4(_) => Self::V4, + IpAddr::V6(addr) => match addr.octets() { + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, _, _, _, _] => Self::V4, + _ => Self::V6, + }, + } + } +} + #[derive(Default, Clone)] pub struct State { pub access_list: Arc, @@ -17,7 +34,7 @@ pub struct PendingScrapeId(pub usize); #[derive(Copy, Clone, Debug)] pub struct ConsumerId(pub usize); -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, PartialEq)] pub struct ConnectionId(pub usize); #[derive(Clone, Copy, Debug)] @@ -26,7 +43,7 @@ pub struct ConnectionMeta { /// sending back response through correct channel to correct worker. pub out_message_consumer_id: ConsumerId, pub connection_id: ConnectionId, - pub peer_addr: CanonicalSocketAddr, + pub ip_version: IpVersion, pub pending_scrape_id: Option, } @@ -35,6 +52,6 @@ pub enum SwarmControlMessage { ConnectionClosed { info_hash: InfoHash, peer_id: PeerId, - peer_addr: CanonicalSocketAddr, + ip_version: IpVersion, }, } diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index 7f051a1..b295aa3 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -9,6 +9,9 @@ use aquatic_common::cli::LogLevel; use aquatic_toml_config::TomlConfig; /// aquatic_ws configuration +/// +/// Running behind a reverse proxy is supported, but IPv4 peer requests have +/// to be proxied to IPv4 requests, and IPv6 requests to IPv6 requests. #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] #[serde(default, deny_unknown_fields)] pub struct Config { @@ -60,6 +63,8 @@ pub struct NetworkConfig { /// Maximum number of pending TCP connections pub tcp_backlog: i32, + /// Enable TLS + pub enable_tls: bool, /// Path to TLS certificate (DER-encoded X.509) pub tls_certificate_path: PathBuf, /// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format) @@ -67,6 +72,10 @@ pub struct NetworkConfig { pub websocket_max_message_size: usize, pub websocket_max_frame_size: usize, + + /// Return a HTTP 200 Ok response when receiving GET /health. Can not be + /// combined with enable_tls. + pub enable_http_health_checks: bool, } impl Default for NetworkConfig { @@ -76,11 +85,14 @@ impl Default for NetworkConfig { only_ipv6: false, tcp_backlog: 1024, + enable_tls: false, tls_certificate_path: "".into(), tls_private_key_path: "".into(), websocket_max_message_size: 64 * 1024, websocket_max_frame_size: 16 * 1024, + + enable_http_health_checks: false, } } } diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index 66cc57b..7d48a08 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -4,6 +4,7 @@ pub mod workers; use std::sync::Arc; +use anyhow::Context; use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker}; use aquatic_common::cpu_pinning::WorkerIndex; use aquatic_common::rustls_config::create_rustls_config; @@ -26,6 +27,12 @@ pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); pub const SHARED_IN_CHANNEL_SIZE: usize = 1024; pub fn run(config: Config) -> ::anyhow::Result<()> { + if config.network.enable_tls && config.network.enable_http_health_checks { + return Err(anyhow::anyhow!( + "configuration: network.enable_tls and network.enable_http_health_check can't both be set to true" + )); + } + let mut signals = Signals::new([SIGUSR1, SIGTERM])?; let state = State::default(); @@ -41,10 +48,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); - let tls_config = Arc::new(create_rustls_config( - &config.network.tls_certificate_path, - &config.network.tls_private_key_path, - )?); + let opt_tls_config = if config.network.enable_tls { + Some(Arc::new(create_rustls_config( + &config.network.tls_certificate_path, + &config.network.tls_private_key_path, + ).with_context(|| "create rustls config")?)) + } else { + None + }; let mut executors = Vec::new(); @@ -52,7 +63,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); - let tls_config = tls_config.clone(); + let opt_tls_config = opt_tls_config.clone(); let control_mesh_builder = control_mesh_builder.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); @@ -72,7 +83,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { sentinel, config, state, - tls_config, + opt_tls_config, control_mesh_builder, request_mesh_builder, response_mesh_builder, diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index e21a72d..fdfd5a9 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -10,13 +10,12 @@ use anyhow::Context; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::rustls_config::RustlsConfig; -use aquatic_common::{CanonicalSocketAddr, PanicSentinel}; +use aquatic_common::PanicSentinel; use aquatic_ws_protocol::*; use async_tungstenite::WebSocketStream; use futures::stream::{SplitSink, SplitStream}; -use futures::StreamExt; +use futures::{AsyncWriteExt, StreamExt}; use futures_lite::future::race; -use futures_rustls::server::TlsStream; use futures_rustls::TlsAcceptor; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; @@ -48,14 +47,14 @@ struct ConnectionReference { valid_until: ValidUntil, peer_id: Option, announced_info_hashes: HashSet, - peer_addr: CanonicalSocketAddr, + ip_version: IpVersion, } pub async fn run_socket_worker( _sentinel: PanicSentinel, config: Config, state: State, - tls_config: Arc, + opt_tls_config: Option>, control_message_mesh_builder: MeshBuilder, in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>, out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>, @@ -115,13 +114,10 @@ pub async fn run_socket_worker( while let Some(stream) = incoming.next().await { match stream { Ok(stream) => { - let peer_addr = match stream.peer_addr() { - Ok(peer_addr) => CanonicalSocketAddr::new(peer_addr), + let ip_version = match stream.peer_addr() { + Ok(addr) => IpVersion::canonical_from_ip(addr.ip()), Err(err) => { - ::log::info!( - "could not extract peer address, closing connection: {:#}", - err - ); + ::log::info!("could not extract ip version (v4 or v6): {:#}", err); continue; } @@ -136,12 +132,12 @@ pub async fn run_socket_worker( valid_until: ValidUntil::new(config.cleaning.max_connection_idle), peer_id: None, announced_info_hashes: Default::default(), - peer_addr, + ip_version, }); - ::log::info!("accepting stream: {}", key); + ::log::info!("accepting stream, assigning id {}", key); - let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, tls_config) async move { + let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, opt_tls_config) async move { if let Err(err) = run_connection( config.clone(), access_list, @@ -153,13 +149,15 @@ pub async fn run_socket_worker( out_message_receiver, out_message_consumer_id, ConnectionId(key), - tls_config, + opt_tls_config, + ip_version, stream, - peer_addr, ).await { - ::log::debug!("Connection::run() error: {:?}", err); + ::log::debug!("connection error: {:#}", err); } + // Clean up after closed connection + // Remove reference in separate statement to avoid // multiple RefCell borrows let opt_reference = connection_slab.borrow_mut().try_remove(key); @@ -171,7 +169,7 @@ pub async fn run_socket_worker( let message = SwarmControlMessage::ConnectionClosed { info_hash, peer_id, - peer_addr: reference.peer_addr, + ip_version: reference.ip_version, }; let consumer_index = @@ -268,13 +266,94 @@ async fn run_connection( out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>, out_message_consumer_id: ConsumerId, connection_id: ConnectionId, - tls_config: Arc, - stream: TcpStream, - peer_addr: CanonicalSocketAddr, + opt_tls_config: Option>, + ip_version: IpVersion, + mut stream: TcpStream, ) -> anyhow::Result<()> { - let tls_acceptor: TlsAcceptor = tls_config.into(); - let stream = tls_acceptor.accept(stream).await?; + if let Some(tls_config) = opt_tls_config { + let tls_acceptor: TlsAcceptor = tls_config.into(); + let stream = tls_acceptor.accept(stream).await?; + + run_stream_agnostic_connection( + config.clone(), + access_list, + in_message_senders, + tq_prioritized, + tq_regular, + connection_slab.clone(), + out_message_sender, + out_message_receiver, + out_message_consumer_id, + connection_id, + stream, + ip_version, + ) + .await + } else { + // Implementing this over TLS is too cumbersome, since the crate used + // for TLS streams doesn't support peek and tungstenite doesn't + // properly support sending a HTTP error response in accept_hdr + // callback. + if config.network.enable_http_health_checks { + let mut peek_buf = [0u8; 11]; + + stream + .peek(&mut peek_buf) + .await + .map_err(|err| anyhow::anyhow!("error peeking: {:#}", err))?; + + if &peek_buf == b"GET /health" { + stream + .write_all(b"HTTP/1.1 200 Ok\r\nContent-Length: 2\r\n\r\nOk") + .await + .map_err(|err| { + anyhow::anyhow!("error sending health check response: {:#}", err) + })?; + stream.flush().await.map_err(|err| { + anyhow::anyhow!("error flushing health check response: {:#}", err) + })?; + + return Err(anyhow::anyhow!( + "client requested health check, skipping websocket negotiation" + )); + } + } + + run_stream_agnostic_connection( + config.clone(), + access_list, + in_message_senders, + tq_prioritized, + tq_regular, + connection_slab.clone(), + out_message_sender, + out_message_receiver, + out_message_consumer_id, + connection_id, + stream, + ip_version, + ) + .await + } +} + +async fn run_stream_agnostic_connection< + S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static, +>( + config: Rc, + access_list: Arc, + in_message_senders: Rc>, + tq_prioritized: TaskQueueHandle, + tq_regular: TaskQueueHandle, + connection_slab: Rc>>, + out_message_sender: Rc>, + out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>, + out_message_consumer_id: ConsumerId, + connection_id: ConnectionId, + stream: S, + ip_version: IpVersion, +) -> anyhow::Result<()> { let ws_config = tungstenite::protocol::WebSocketConfig { max_frame_size: Some(config.network.websocket_max_frame_size), max_message_size: Some(config.network.websocket_max_message_size), @@ -299,7 +378,7 @@ async fn run_connection( pending_scrape_slab, out_message_consumer_id, ws_in, - peer_addr, + ip_version, connection_id, }; @@ -320,7 +399,6 @@ async fn run_connection( connection_slab, ws_out, pending_scrape_slab, - peer_addr, connection_id, }; @@ -336,7 +414,7 @@ async fn run_connection( race(reader_handle, writer_handle).await.unwrap() } -struct ConnectionReader { +struct ConnectionReader { config: Rc, access_list_cache: AccessListCache, connection_slab: Rc>>, @@ -344,12 +422,12 @@ struct ConnectionReader { out_message_sender: Rc>, pending_scrape_slab: Rc>>, out_message_consumer_id: ConsumerId, - ws_in: SplitStream>>, - peer_addr: CanonicalSocketAddr, + ws_in: SplitStream>, + ip_version: IpVersion, connection_id: ConnectionId, } -impl ConnectionReader { +impl ConnectionReader { async fn run_in_message_loop(&mut self) -> anyhow::Result<()> { loop { ::log::debug!("read_in_message"); @@ -528,33 +606,28 @@ impl ConnectionReader { ConnectionMeta { connection_id: self.connection_id, out_message_consumer_id: self.out_message_consumer_id, - peer_addr: self.peer_addr, + ip_version: self.ip_version, pending_scrape_id, } } } -struct ConnectionWriter { +struct ConnectionWriter { config: Rc, out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>, connection_slab: Rc>>, - ws_out: SplitSink>, tungstenite::Message>, + ws_out: SplitSink, tungstenite::Message>, pending_scrape_slab: Rc>>, - peer_addr: CanonicalSocketAddr, connection_id: ConnectionId, } -impl ConnectionWriter { +impl ConnectionWriter { async fn run_out_message_loop(&mut self) -> anyhow::Result<()> { loop { let (meta, out_message) = self.out_message_receiver.recv().await.ok_or_else(|| { anyhow::anyhow!("ConnectionWriter couldn't receive message, sender is closed") })?; - if meta.peer_addr != self.peer_addr { - return Err(anyhow::anyhow!("peer addresses didn't match")); - } - match out_message { OutMessage::ScrapeResponse(out_message) => { let pending_scrape_id = meta @@ -623,11 +696,7 @@ impl ConnectionWriter { } Ok(Err(err)) => Err(err.into()), Err(err) => { - ::log::info!( - "send_out_message: send to {} took to long: {}", - self.peer_addr.get(), - err - ); + ::log::info!("send_out_message: sending to peer took to long: {}", err); Ok(()) } diff --git a/aquatic_ws/src/workers/swarm.rs b/aquatic_ws/src/workers/swarm.rs index 0ea07ac..22a9f02 100644 --- a/aquatic_ws/src/workers/swarm.rs +++ b/aquatic_ws/src/workers/swarm.rs @@ -208,14 +208,11 @@ where SwarmControlMessage::ConnectionClosed { info_hash, peer_id, - peer_addr, + ip_version, } => { - ::log::debug!( - "Removing peer {} from torrents because connection was closed", - peer_addr.get() - ); + ::log::debug!("Removing peer from torrents because connection was closed"); - if peer_addr.is_ipv4() { + if let IpVersion::V4 = ip_version { if let Some(torrent_data) = torrents.borrow_mut().ipv4.get_mut(&info_hash) { torrent_data.remove_peer(peer_id); } @@ -305,18 +302,18 @@ fn handle_announce_request( request_sender_meta: ConnectionMeta, request: AnnounceRequest, ) { - let torrent_data: &mut TorrentData = if request_sender_meta.peer_addr.is_ipv4() { + let torrent_data: &mut TorrentData = if let IpVersion::V4 = request_sender_meta.ip_version { torrent_maps.ipv4.entry(request.info_hash).or_default() } else { torrent_maps.ipv6.entry(request.info_hash).or_default() }; - // If there is already a peer with this peer_id, check that socket - // addr is same as that of request sender. Otherwise, ignore request. - // Since peers have access to each others peer_id's, they could send - // requests using them, causing all sorts of issues. + // If there is already a peer with this peer_id, check that connection id + // is same as that of request sender. Otherwise, ignore request. Since + // peers have access to each others peer_id's, they could send requests + // using them, causing all sorts of issues. if let Some(previous_peer) = torrent_data.peers.get(&request.peer_id) { - if request_sender_meta.peer_addr != previous_peer.connection_meta.peer_addr { + if request_sender_meta.connection_id != previous_peer.connection_meta.connection_id { return; } } @@ -454,7 +451,7 @@ fn handle_scrape_request( files: HashMap::with_capacity(num_to_take), }; - let torrent_map: &mut TorrentMap = if meta.peer_addr.is_ipv4() { + let torrent_map: &mut TorrentMap = if let IpVersion::V4 = meta.ip_version { &mut torrent_maps.ipv4 } else { &mut torrent_maps.ipv6