Merge pull request #84 from greatest-ape/work-2022-07-18_2

upgrade dependencies; ws: make TLS optional, support reverse proxies, add HTTP health check route when running without TLS
This commit is contained in:
Joakim Frostegård 2022-07-19 18:04:09 +02:00 committed by GitHub
commit 97fa699476
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 310 additions and 194 deletions

View file

@ -82,10 +82,19 @@ echo "log_level = 'trace'
[network]
address = '127.0.0.1:3002'
enable_tls = true
tls_certificate_path = './cert.crt'
tls_private_key_path = './key.pk8'
" > ws-tls.toml
./target/debug/aquatic ws -c ws-tls.toml > "$HOME/ws-tls.log" 2>&1 &
echo "log_level = 'trace'
[network]
address = '127.0.0.1:3003'
enable_http_health_checks = true
" > ws.toml
./target/debug/aquatic ws -c ws.toml > "$HOME/wss.log" 2>&1 &
./target/debug/aquatic ws -c ws.toml > "$HOME/ws.log" 2>&1 &
# Setup directories
@ -100,21 +109,30 @@ mkdir torrents
# echo "http-test-ipv4" > seed/http-test-ipv4
echo "tls-test-ipv4" > seed/tls-test-ipv4
echo "udp-test-ipv4" > seed/udp-test-ipv4
echo "wss-test-ipv4" > seed/wss-test-ipv4
echo "ws-tls-test-ipv4" > seed/ws-tls-test-ipv4
echo "ws-test-ipv4" > seed/ws-test-ipv4
# mktorrent -p -o "torrents/http-ipv4.torrent" -a "http://127.0.0.1:3000/announce" "seed/http-test-ipv4"
mktorrent -p -o "torrents/tls-ipv4.torrent" -a "https://example.com:3001/announce" "seed/tls-test-ipv4"
mktorrent -p -o "torrents/udp-ipv4.torrent" -a "udp://127.0.0.1:3000" "seed/udp-test-ipv4"
mktorrent -p -o "torrents/wss-ipv4.torrent" -a "wss://example.com:3002" "seed/wss-test-ipv4"
mktorrent -p -o "torrents/ws-tls-ipv4.torrent" -a "wss://example.com:3002" "seed/ws-tls-test-ipv4"
mktorrent -p -o "torrents/ws-ipv4.torrent" -a "ws://example.com:3003" "seed/ws-test-ipv4"
cp -r torrents torrents-seed
cp -r torrents torrents-leech
# Setup wss seeding client
# Setup ws-tls seeding client
echo "Starting seeding wss client"
echo "Starting seeding ws-tls (wss) client"
cd seed
GOPPROF=http GODEBUG=x509ignoreCN=0 $HOME/gotorrent download --dht=false --tcppeers=false --utppeers=false --pex=false --stats --seed ../torrents/wss-ipv4.torrent > "$HOME/wss-seed.log" 2>&1 &
GOPPROF=http GODEBUG=x509ignoreCN=0 $HOME/gotorrent download --dht=false --tcppeers=false --utppeers=false --pex=false --stats --seed ../torrents/ws-tls-ipv4.torrent > "$HOME/ws-tls-seed.log" 2>&1 &
cd ..
# Setup ws seeding client
echo "Starting seeding ws client"
cd seed
GOPPROF=http GODEBUG=x509ignoreCN=0 $HOME/gotorrent download --dht=false --tcppeers=false --utppeers=false --pex=false --stats --seed ../torrents/ws-ipv4.torrent > "$HOME/ws-seed.log" 2>&1 &
cd ..
# Start seeding rtorrent client
@ -138,9 +156,14 @@ schedule2 = watch_directory,5,5,load.start=$HOME/torrents-leech/*.torrent" > ~/.
echo "Starting leeching client.."
screen -dmS rtorrent-leech rtorrent
echo "Starting leeching wss client"
echo "Starting leeching ws-tls (wss) client"
cd leech
GOPPROF=http GODEBUG=x509ignoreCN=0 $HOME/gotorrent download --dht=false --tcppeers=false --utppeers=false --pex=false --stats --addr ":43000" ../torrents/wss-ipv4.torrent > "$HOME/wss-leech.log" 2>&1 &
GOPPROF=http GODEBUG=x509ignoreCN=0 $HOME/gotorrent download --dht=false --tcppeers=false --utppeers=false --pex=false --stats --addr ":43000" ../torrents/ws-tls-ipv4.torrent > "$HOME/ws-tls-leech.log" 2>&1 &
cd ..
echo "Starting leeching ws client"
cd leech
GOPPROF=http GODEBUG=x509ignoreCN=0 $HOME/gotorrent download --dht=false --tcppeers=false --utppeers=false --pex=false --stats --addr ":43001" ../torrents/ws-ipv4.torrent > "$HOME/ws-leech.log" 2>&1 &
cd ..
# Check for completion
@ -148,7 +171,8 @@ cd ..
# HTTP_IPv4="Ok"
TLS_IPv4="Failed"
UDP_IPv4="Failed"
WSS_IPv4="Failed"
WS_TLS_IPv4="Failed"
WS_IPv4="Failed"
i="0"
@ -180,17 +204,24 @@ do
fi
fi
fi
if test -f "leech/wss-test-ipv4"; then
if grep -q "wss-test-ipv4" "leech/wss-test-ipv4"; then
if [ "$WSS_IPv4" != "Ok" ]; then
WSS_IPv4="Ok"
echo "WSS_IPv4 is Ok"
if test -f "leech/ws-tls-test-ipv4"; then
if grep -q "ws-tls-test-ipv4" "leech/ws-tls-test-ipv4"; then
if [ "$WS_TLS_IPv4" != "Ok" ]; then
WS_TLS_IPv4="Ok"
echo "WS_TLS_IPv4 is Ok"
fi
fi
fi
if test -f "leech/ws-test-ipv4"; then
if grep -q "ws-test-ipv4" "leech/ws-test-ipv4"; then
if [ "$WS_IPv4" != "Ok" ]; then
WS_IPv4="Ok"
echo "WS_IPv4 is Ok"
fi
fi
fi
# if [ "$HTTP_IPv4" = "Ok" ] && [ "$TLS_IPv4" = "Ok" ] && [ "$UDP_IPv4" = "Ok" ] && [ "$WSS_IPv4" = "Ok" ]; then
if [ "$TLS_IPv4" = "Ok" ] && [ "$UDP_IPv4" = "Ok" ] && [ "$WSS_IPv4" = "Ok" ]; then
if [ "$TLS_IPv4" = "Ok" ] && [ "$UDP_IPv4" = "Ok" ] && [ "$WS_TLS_IPv4" = "Ok" ] && [ "$WS_IPv4" = "Ok" ]; then
break
fi
@ -204,7 +235,8 @@ echo "Waited for $i seconds"
# echo "::set-output name=http_ipv4::$HTTP_IPv4"
echo "::set-output name=http_tls_ipv4::$TLS_IPv4"
echo "::set-output name=udp_ipv4::$UDP_IPv4"
echo "::set-output name=wss_ipv4::$WSS_IPv4"
echo "::set-output name=ws_tls_ipv4::$WS_TLS_IPv4"
echo "::set-output name=ws_ipv4::$WS_IPv4"
# echo ""
# echo "# --- HTTP log --- #"
@ -225,31 +257,48 @@ cat "udp.log"
sleep 1
echo ""
echo "# --- WSS tracker log --- #"
cat "wss.log"
echo "# --- WS over TLS tracker log --- #"
cat "ws-tls.log"
sleep 1
echo ""
echo "# --- WSS seed log --- #"
cat "wss-seed.log"
echo "# --- WS tracker log --- #"
cat "ws.log"
sleep 1
echo ""
echo "# --- WSS leech log --- #"
cat "wss-leech.log"
echo "# --- WS over TLS seed log --- #"
cat "ws-tls-seed.log"
sleep 1
echo ""
echo "# --- WS over TLS leech log --- #"
cat "ws-tls-leech.log"
sleep 1
echo ""
echo "# --- WS seed log --- #"
cat "ws-seed.log"
sleep 1
echo ""
echo "# --- WS leech log --- #"
cat "ws-leech.log"
sleep 1
echo ""
echo "# --- Test results --- #"
# echo "HTTP (IPv4): $HTTP_IPv4"
echo "HTTP over TLS (IPv4): $TLS_IPv4"
echo "UDP (IPv4): $UDP_IPv4"
echo "WSS (IPv4): $WSS_IPv4"
echo "WSS (IPv4): $WS_TLS_IPv4"
echo "WS (IPv4): $WS_IPv4"
# if [ "$HTTP_IPv4" != "Ok" ] || [ "$TLS_IPv4" != "Ok" ] || [ "$UDP_IPv4" != "Ok" ] || [ "$WSS_IPv4" != "Ok" ]; then
if [ "$TLS_IPv4" != "Ok" ] || [ "$UDP_IPv4" != "Ok" ] || [ "$WSS_IPv4" != "Ok" ]; then
if [ "$TLS_IPv4" != "Ok" ] || [ "$UDP_IPv4" != "Ok" ] || [ "$WS_TLS_IPv4" != "Ok" ] || [ "$WS_IPv4" != "Ok" ]; then
exit 1
fi

View file

@ -29,7 +29,7 @@ jobs:
cargo build --verbose -p aquatic_http
cargo build --verbose -p aquatic_ws
- name: Run tests
run: cargo test --verbose --workspace --all-targets
run: cargo test --verbose --workspace --profile "test-fast"
build-macos:

139
Cargo.lock generated
View file

@ -73,7 +73,7 @@ dependencies = [
"log",
"privdrop",
"rand",
"rustls 0.20.6",
"rustls",
"rustls-pemfile",
"serde",
"simple_logger",
@ -130,7 +130,7 @@ dependencies = [
"quickcheck_macros",
"rand",
"rand_distr",
"rustls 0.20.6",
"rustls",
"serde",
]
@ -150,13 +150,13 @@ dependencies = [
"log",
"mimalloc",
"rand",
"rustls 0.20.6",
"rustls",
"serde",
"signal-hook",
"socket2 0.4.4",
"sqlx",
"tokio",
"tokio-rustls 0.23.4",
"tokio-rustls",
]
[[package]]
@ -295,13 +295,14 @@ dependencies = [
"futures-rustls",
"glommio",
"hashbrown 0.12.3",
"httparse",
"log",
"mimalloc",
"privdrop",
"quickcheck",
"quickcheck_macros",
"rand",
"rustls 0.20.6",
"rustls",
"rustls-pemfile",
"serde",
"signal-hook",
@ -328,7 +329,7 @@ dependencies = [
"quickcheck_macros",
"rand",
"rand_distr",
"rustls 0.20.6",
"rustls",
"serde",
"serde_json",
"tungstenite",
@ -402,9 +403,9 @@ dependencies = [
[[package]]
name = "atoi"
version = "0.4.0"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "616896e05fc0e2649463a93a15183c6a16bf03413a7af88ef1285ddedfa9cda5"
checksum = "d7c57d12312ff59c811c0643f4d80830505833c9ffaebd193d819392b265be8e"
dependencies = [
"num-traits",
]
@ -587,9 +588,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "1.1.0"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
checksum = "f0b3de4a0c5e67e16066a0715723abd91edc2f9001d09c46e1dca929351e130e"
[[package]]
name = "cache-padded"
@ -688,18 +689,18 @@ dependencies = [
[[package]]
name = "crc"
version = "2.1.0"
version = "3.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49fc9a695bca7f35f5f4c15cddc84415f66a74ea78eef08e90c5024f2b540e23"
checksum = "53757d12b596c16c78b83458d732a5d1a17ab3f53f2f7412f6fb57cc8a140ab3"
dependencies = [
"crc-catalog",
]
[[package]]
name = "crc-catalog"
version = "1.1.1"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccaeedb56da03b09f598226e25e80088cb4cd25f316e6e4df7d695f0feeb1403"
checksum = "2d0165d2900ae6778e36e80bbc4da3b5eefccee9ba939761f9c2882a5d9af3ff"
[[package]]
name = "criterion"
@ -1102,8 +1103,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e01fe9932a224b72b45336d96040aa86386d674a31d0af27d800ea7bc8ca97fe"
dependencies = [
"futures-io",
"rustls 0.20.6",
"webpki 0.22.0",
"rustls",
"webpki",
]
[[package]]
@ -1255,15 +1256,6 @@ version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04"
[[package]]
name = "hashbrown"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
dependencies = [
"ahash",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
@ -1276,11 +1268,11 @@ dependencies = [
[[package]]
name = "hashlink"
version = "0.7.0"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf"
checksum = "d452c155cb93fecdfb02a73dd57b5d8e442c2063bd7aac72f1bc5e4263a43086"
dependencies = [
"hashbrown 0.11.2",
"hashbrown 0.12.3",
]
[[package]]
@ -1728,9 +1720,9 @@ dependencies = [
[[package]]
name = "num-bigint"
version = "0.3.3"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f6f7833f2cbf2360a6cfd58cd41a53aa7a90bd4c202f5b1c7dd2ed73c57b2c3"
checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f"
dependencies = [
"autocfg",
"num-integer",
@ -2236,19 +2228,6 @@ version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342"
[[package]]
name = "rustls"
version = "0.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7"
dependencies = [
"base64",
"log",
"ring",
"sct 0.6.1",
"webpki 0.21.4",
]
[[package]]
name = "rustls"
version = "0.20.6"
@ -2257,8 +2236,8 @@ checksum = "5aab8ee6c7097ed6057f43c187a62418d0c05a4bd5f18b3571db50ee0f9ce033"
dependencies = [
"log",
"ring",
"sct 0.7.0",
"webpki 0.22.0",
"sct",
"webpki",
]
[[package]]
@ -2297,16 +2276,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "sct"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "sct"
version = "0.7.0"
@ -2458,9 +2427,12 @@ checksum = "04d2ecae5fcf33b122e2e6bd520a57ccf152d2dde3b38c71039df1a6867264ee"
[[package]]
name = "slab"
version = "0.4.6"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32"
checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef"
dependencies = [
"autocfg",
]
[[package]]
name = "smallvec"
@ -2538,9 +2510,9 @@ dependencies = [
[[package]]
name = "sqlx"
version = "0.5.13"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "551873805652ba0d912fec5bbb0f8b4cdd96baf8e2ebf5970e5671092966019b"
checksum = "1f82cbe94f41641d6c410ded25bbf5097c240cefdf8e3b06d04198d0a96af6a4"
dependencies = [
"sqlx-core",
"sqlx-macros",
@ -2548,9 +2520,9 @@ dependencies = [
[[package]]
name = "sqlx-core"
version = "0.5.13"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e48c61941ccf5ddcada342cd59e3e5173b007c509e1e8e990dafc830294d9dc5"
checksum = "6b69bf218860335ddda60d6ce85ee39f6cf6e5630e300e19757d1de15886a093"
dependencies = [
"ahash",
"atoi",
@ -2580,7 +2552,8 @@ dependencies = [
"percent-encoding",
"rand",
"rsa",
"rustls 0.19.1",
"rustls",
"rustls-pemfile",
"sha-1",
"sha2",
"smallvec",
@ -2590,15 +2563,14 @@ dependencies = [
"thiserror",
"tokio-stream",
"url",
"webpki 0.21.4",
"webpki-roots",
]
[[package]]
name = "sqlx-macros"
version = "0.5.13"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc0fba2b0cae21fc00fe6046f8baa4c7fcb49e379f0f592b04696607f69ed2e1"
checksum = "f40c63177cf23d356b159b60acd27c54af7423f1736988502e36bae9a712118f"
dependencies = [
"dotenv",
"either",
@ -2615,13 +2587,13 @@ dependencies = [
[[package]]
name = "sqlx-rt"
version = "0.5.13"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4db708cd3e459078f85f39f96a00960bd841f66ee2a669e90bf36907f5a79aae"
checksum = "874e93a365a598dc3dadb197565952cb143ae4aa716f7bcc933a8d836f6bf89f"
dependencies = [
"once_cell",
"tokio",
"tokio-rustls 0.22.0",
"tokio-rustls",
]
[[package]]
@ -2789,26 +2761,15 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-rustls"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6"
dependencies = [
"rustls 0.19.1",
"tokio",
"webpki 0.21.4",
]
[[package]]
name = "tokio-rustls"
version = "0.23.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
dependencies = [
"rustls 0.20.6",
"rustls",
"tokio",
"webpki 0.22.0",
"webpki",
]
[[package]]
@ -3132,16 +3093,6 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "webpki"
version = "0.21.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "webpki"
version = "0.22.0"
@ -3154,11 +3105,11 @@ dependencies = [
[[package]]
name = "webpki-roots"
version = "0.21.1"
version = "0.22.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940"
checksum = "f1c760f0d366a6c24a02ed7816e23e691f5d92291f94d15e836006fd11b04daf"
dependencies = [
"webpki 0.21.4",
"webpki",
]
[[package]]

View file

@ -35,3 +35,7 @@ inherits = "release-debug"
[profile.release-debug]
inherits = "release"
debug = true
[profile.test-fast]
inherits = "release"
lto = false

View file

@ -17,7 +17,7 @@ of sub-implementations for different protocols:
|--------------|--------------------------------------------|------------------------------|
| aquatic_udp | [BitTorrent over UDP] | Unix-like (using [mio]) |
| aquatic_http | [BitTorrent over HTTP] with TLS ([rustls]) | Linux 5.8+ (using [glommio]) |
| aquatic_ws | [WebTorrent] over TLS ([rustls]) | Linux 5.8+ (using [glommio]) |
| aquatic_ws | [WebTorrent] over TLS ([rustls], optional) | Linux 5.8+ (using [glommio]) |
Features at a glance:
@ -65,9 +65,9 @@ Generate configuration files. They come with comments and differ between protoco
Make adjustments to the files. You will likely want to adjust `address`
(listening address) under the `network` section.
Note that both `aquatic_http` and `aquatic_ws` require configuring TLS
certificate and private key files. More details are available in the
respective configuration files.
Note that both `aquatic_http` and `aquatic_ws` require configuring certificate
and private key files to run over TLS (which is optional for `aquatic_ws`).
More details are available in the respective configuration files.
#### Workers

View file

@ -2,6 +2,9 @@
## High priority
* ws
* add integration test for non-TLS configuration, maybe behind reverse proxy
## Medium priority
* quit whole program if any thread panics

View file

@ -10,6 +10,8 @@ use serde::Deserialize;
use aquatic_common::cli::LogLevel;
/// aquatic_http configuration
///
/// Does not support running behind a reverse proxy.
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct Config {

View file

@ -31,6 +31,6 @@ rustls = "0.20"
serde = { version = "1", features = ["derive"] }
signal-hook = { version = "0.3" }
socket2 = { version = "0.4", features = ["all"] }
sqlx = { version = "0.5", features = [ "runtime-tokio-rustls" , "mysql" ] }
sqlx = { version = "0.6", features = [ "runtime-tokio-rustls" , "mysql" ] }
tokio = { version = "1", features = ["full"] }
tokio-rustls = "0.23"

View file

@ -29,6 +29,7 @@ futures-lite = "1"
futures-rustls = "0.22"
glommio = "0.7"
hashbrown = { version = "0.12", features = ["serde"] }
httparse = "1"
log = "0.4"
mimalloc = { version = "0.1", default-features = false }
privdrop = "0.5"

View file

@ -1,11 +1,28 @@
use std::sync::Arc;
use std::{net::IpAddr, sync::Arc};
use aquatic_common::access_list::AccessListArcSwap;
use aquatic_common::CanonicalSocketAddr;
pub use aquatic_common::ValidUntil;
use aquatic_ws_protocol::{InfoHash, PeerId};
#[derive(Copy, Clone, Debug)]
pub enum IpVersion {
V4,
V6,
}
impl IpVersion {
pub fn canonical_from_ip(ip: IpAddr) -> IpVersion {
match ip {
IpAddr::V4(_) => Self::V4,
IpAddr::V6(addr) => match addr.octets() {
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, _, _, _, _] => Self::V4,
_ => Self::V6,
},
}
}
}
#[derive(Default, Clone)]
pub struct State {
pub access_list: Arc<AccessListArcSwap>,
@ -17,7 +34,7 @@ pub struct PendingScrapeId(pub usize);
#[derive(Copy, Clone, Debug)]
pub struct ConsumerId(pub usize);
#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct ConnectionId(pub usize);
#[derive(Clone, Copy, Debug)]
@ -26,7 +43,7 @@ pub struct ConnectionMeta {
/// sending back response through correct channel to correct worker.
pub out_message_consumer_id: ConsumerId,
pub connection_id: ConnectionId,
pub peer_addr: CanonicalSocketAddr,
pub ip_version: IpVersion,
pub pending_scrape_id: Option<PendingScrapeId>,
}
@ -35,6 +52,6 @@ pub enum SwarmControlMessage {
ConnectionClosed {
info_hash: InfoHash,
peer_id: PeerId,
peer_addr: CanonicalSocketAddr,
ip_version: IpVersion,
},
}

View file

@ -9,6 +9,9 @@ use aquatic_common::cli::LogLevel;
use aquatic_toml_config::TomlConfig;
/// aquatic_ws configuration
///
/// Running behind a reverse proxy is supported, but IPv4 peer requests have
/// to be proxied to IPv4 requests, and IPv6 requests to IPv6 requests.
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct Config {
@ -60,6 +63,8 @@ pub struct NetworkConfig {
/// Maximum number of pending TCP connections
pub tcp_backlog: i32,
/// Enable TLS
pub enable_tls: bool,
/// Path to TLS certificate (DER-encoded X.509)
pub tls_certificate_path: PathBuf,
/// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format)
@ -67,6 +72,10 @@ pub struct NetworkConfig {
pub websocket_max_message_size: usize,
pub websocket_max_frame_size: usize,
/// Return a HTTP 200 Ok response when receiving GET /health. Can not be
/// combined with enable_tls.
pub enable_http_health_checks: bool,
}
impl Default for NetworkConfig {
@ -76,11 +85,14 @@ impl Default for NetworkConfig {
only_ipv6: false,
tcp_backlog: 1024,
enable_tls: false,
tls_certificate_path: "".into(),
tls_private_key_path: "".into(),
websocket_max_message_size: 64 * 1024,
websocket_max_frame_size: 16 * 1024,
enable_http_health_checks: false,
}
}
}

View file

@ -4,6 +4,7 @@ pub mod workers;
use std::sync::Arc;
use anyhow::Context;
use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker};
use aquatic_common::cpu_pinning::WorkerIndex;
use aquatic_common::rustls_config::create_rustls_config;
@ -26,6 +27,12 @@ pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
pub const SHARED_IN_CHANNEL_SIZE: usize = 1024;
pub fn run(config: Config) -> ::anyhow::Result<()> {
if config.network.enable_tls && config.network.enable_http_health_checks {
return Err(anyhow::anyhow!(
"configuration: network.enable_tls and network.enable_http_health_check can't both be set to true"
));
}
let mut signals = Signals::new([SIGUSR1, SIGTERM])?;
let state = State::default();
@ -41,10 +48,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel();
let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
let tls_config = Arc::new(create_rustls_config(
&config.network.tls_certificate_path,
&config.network.tls_private_key_path,
)?);
let opt_tls_config = if config.network.enable_tls {
Some(Arc::new(create_rustls_config(
&config.network.tls_certificate_path,
&config.network.tls_private_key_path,
).with_context(|| "create rustls config")?))
} else {
None
};
let mut executors = Vec::new();
@ -52,7 +63,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let sentinel = sentinel.clone();
let config = config.clone();
let state = state.clone();
let tls_config = tls_config.clone();
let opt_tls_config = opt_tls_config.clone();
let control_mesh_builder = control_mesh_builder.clone();
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
@ -72,7 +83,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
sentinel,
config,
state,
tls_config,
opt_tls_config,
control_mesh_builder,
request_mesh_builder,
response_mesh_builder,

View file

@ -10,13 +10,12 @@ use anyhow::Context;
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::privileges::PrivilegeDropper;
use aquatic_common::rustls_config::RustlsConfig;
use aquatic_common::{CanonicalSocketAddr, PanicSentinel};
use aquatic_common::PanicSentinel;
use aquatic_ws_protocol::*;
use async_tungstenite::WebSocketStream;
use futures::stream::{SplitSink, SplitStream};
use futures::StreamExt;
use futures::{AsyncWriteExt, StreamExt};
use futures_lite::future::race;
use futures_rustls::server::TlsStream;
use futures_rustls::TlsAcceptor;
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender};
@ -48,14 +47,14 @@ struct ConnectionReference {
valid_until: ValidUntil,
peer_id: Option<PeerId>,
announced_info_hashes: HashSet<InfoHash>,
peer_addr: CanonicalSocketAddr,
ip_version: IpVersion,
}
pub async fn run_socket_worker(
_sentinel: PanicSentinel,
config: Config,
state: State,
tls_config: Arc<RustlsConfig>,
opt_tls_config: Option<Arc<RustlsConfig>>,
control_message_mesh_builder: MeshBuilder<SwarmControlMessage, Partial>,
in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>,
out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>,
@ -115,13 +114,10 @@ pub async fn run_socket_worker(
while let Some(stream) = incoming.next().await {
match stream {
Ok(stream) => {
let peer_addr = match stream.peer_addr() {
Ok(peer_addr) => CanonicalSocketAddr::new(peer_addr),
let ip_version = match stream.peer_addr() {
Ok(addr) => IpVersion::canonical_from_ip(addr.ip()),
Err(err) => {
::log::info!(
"could not extract peer address, closing connection: {:#}",
err
);
::log::info!("could not extract ip version (v4 or v6): {:#}", err);
continue;
}
@ -136,12 +132,12 @@ pub async fn run_socket_worker(
valid_until: ValidUntil::new(config.cleaning.max_connection_idle),
peer_id: None,
announced_info_hashes: Default::default(),
peer_addr,
ip_version,
});
::log::info!("accepting stream: {}", key);
::log::info!("accepting stream, assigning id {}", key);
let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, tls_config) async move {
let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, opt_tls_config) async move {
if let Err(err) = run_connection(
config.clone(),
access_list,
@ -153,13 +149,15 @@ pub async fn run_socket_worker(
out_message_receiver,
out_message_consumer_id,
ConnectionId(key),
tls_config,
opt_tls_config,
ip_version,
stream,
peer_addr,
).await {
::log::debug!("Connection::run() error: {:?}", err);
::log::debug!("connection error: {:#}", err);
}
// Clean up after closed connection
// Remove reference in separate statement to avoid
// multiple RefCell borrows
let opt_reference = connection_slab.borrow_mut().try_remove(key);
@ -171,7 +169,7 @@ pub async fn run_socket_worker(
let message = SwarmControlMessage::ConnectionClosed {
info_hash,
peer_id,
peer_addr: reference.peer_addr,
ip_version: reference.ip_version,
};
let consumer_index =
@ -268,13 +266,94 @@ async fn run_connection(
out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>,
out_message_consumer_id: ConsumerId,
connection_id: ConnectionId,
tls_config: Arc<RustlsConfig>,
stream: TcpStream,
peer_addr: CanonicalSocketAddr,
opt_tls_config: Option<Arc<RustlsConfig>>,
ip_version: IpVersion,
mut stream: TcpStream,
) -> anyhow::Result<()> {
let tls_acceptor: TlsAcceptor = tls_config.into();
let stream = tls_acceptor.accept(stream).await?;
if let Some(tls_config) = opt_tls_config {
let tls_acceptor: TlsAcceptor = tls_config.into();
let stream = tls_acceptor.accept(stream).await?;
run_stream_agnostic_connection(
config.clone(),
access_list,
in_message_senders,
tq_prioritized,
tq_regular,
connection_slab.clone(),
out_message_sender,
out_message_receiver,
out_message_consumer_id,
connection_id,
stream,
ip_version,
)
.await
} else {
// Implementing this over TLS is too cumbersome, since the crate used
// for TLS streams doesn't support peek and tungstenite doesn't
// properly support sending a HTTP error response in accept_hdr
// callback.
if config.network.enable_http_health_checks {
let mut peek_buf = [0u8; 11];
stream
.peek(&mut peek_buf)
.await
.map_err(|err| anyhow::anyhow!("error peeking: {:#}", err))?;
if &peek_buf == b"GET /health" {
stream
.write_all(b"HTTP/1.1 200 Ok\r\nContent-Length: 2\r\n\r\nOk")
.await
.map_err(|err| {
anyhow::anyhow!("error sending health check response: {:#}", err)
})?;
stream.flush().await.map_err(|err| {
anyhow::anyhow!("error flushing health check response: {:#}", err)
})?;
return Err(anyhow::anyhow!(
"client requested health check, skipping websocket negotiation"
));
}
}
run_stream_agnostic_connection(
config.clone(),
access_list,
in_message_senders,
tq_prioritized,
tq_regular,
connection_slab.clone(),
out_message_sender,
out_message_receiver,
out_message_consumer_id,
connection_id,
stream,
ip_version,
)
.await
}
}
async fn run_stream_agnostic_connection<
S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static,
>(
config: Rc<Config>,
access_list: Arc<AccessListArcSwap>,
in_message_senders: Rc<Senders<(ConnectionMeta, InMessage)>>,
tq_prioritized: TaskQueueHandle,
tq_regular: TaskQueueHandle,
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
out_message_sender: Rc<LocalSender<(ConnectionMeta, OutMessage)>>,
out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>,
out_message_consumer_id: ConsumerId,
connection_id: ConnectionId,
stream: S,
ip_version: IpVersion,
) -> anyhow::Result<()> {
let ws_config = tungstenite::protocol::WebSocketConfig {
max_frame_size: Some(config.network.websocket_max_frame_size),
max_message_size: Some(config.network.websocket_max_message_size),
@ -299,7 +378,7 @@ async fn run_connection(
pending_scrape_slab,
out_message_consumer_id,
ws_in,
peer_addr,
ip_version,
connection_id,
};
@ -320,7 +399,6 @@ async fn run_connection(
connection_slab,
ws_out,
pending_scrape_slab,
peer_addr,
connection_id,
};
@ -336,7 +414,7 @@ async fn run_connection(
race(reader_handle, writer_handle).await.unwrap()
}
struct ConnectionReader {
struct ConnectionReader<S> {
config: Rc<Config>,
access_list_cache: AccessListCache,
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
@ -344,12 +422,12 @@ struct ConnectionReader {
out_message_sender: Rc<LocalSender<(ConnectionMeta, OutMessage)>>,
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
out_message_consumer_id: ConsumerId,
ws_in: SplitStream<WebSocketStream<TlsStream<TcpStream>>>,
peer_addr: CanonicalSocketAddr,
ws_in: SplitStream<WebSocketStream<S>>,
ip_version: IpVersion,
connection_id: ConnectionId,
}
impl ConnectionReader {
impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
async fn run_in_message_loop(&mut self) -> anyhow::Result<()> {
loop {
::log::debug!("read_in_message");
@ -528,33 +606,28 @@ impl ConnectionReader {
ConnectionMeta {
connection_id: self.connection_id,
out_message_consumer_id: self.out_message_consumer_id,
peer_addr: self.peer_addr,
ip_version: self.ip_version,
pending_scrape_id,
}
}
}
struct ConnectionWriter {
struct ConnectionWriter<S> {
config: Rc<Config>,
out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>,
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
ws_out: SplitSink<WebSocketStream<TlsStream<TcpStream>>, tungstenite::Message>,
ws_out: SplitSink<WebSocketStream<S>, tungstenite::Message>,
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
peer_addr: CanonicalSocketAddr,
connection_id: ConnectionId,
}
impl ConnectionWriter {
impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionWriter<S> {
async fn run_out_message_loop(&mut self) -> anyhow::Result<()> {
loop {
let (meta, out_message) = self.out_message_receiver.recv().await.ok_or_else(|| {
anyhow::anyhow!("ConnectionWriter couldn't receive message, sender is closed")
})?;
if meta.peer_addr != self.peer_addr {
return Err(anyhow::anyhow!("peer addresses didn't match"));
}
match out_message {
OutMessage::ScrapeResponse(out_message) => {
let pending_scrape_id = meta
@ -623,11 +696,7 @@ impl ConnectionWriter {
}
Ok(Err(err)) => Err(err.into()),
Err(err) => {
::log::info!(
"send_out_message: send to {} took to long: {}",
self.peer_addr.get(),
err
);
::log::info!("send_out_message: sending to peer took to long: {}", err);
Ok(())
}

View file

@ -208,14 +208,11 @@ where
SwarmControlMessage::ConnectionClosed {
info_hash,
peer_id,
peer_addr,
ip_version,
} => {
::log::debug!(
"Removing peer {} from torrents because connection was closed",
peer_addr.get()
);
::log::debug!("Removing peer from torrents because connection was closed");
if peer_addr.is_ipv4() {
if let IpVersion::V4 = ip_version {
if let Some(torrent_data) = torrents.borrow_mut().ipv4.get_mut(&info_hash) {
torrent_data.remove_peer(peer_id);
}
@ -305,18 +302,18 @@ fn handle_announce_request(
request_sender_meta: ConnectionMeta,
request: AnnounceRequest,
) {
let torrent_data: &mut TorrentData = if request_sender_meta.peer_addr.is_ipv4() {
let torrent_data: &mut TorrentData = if let IpVersion::V4 = request_sender_meta.ip_version {
torrent_maps.ipv4.entry(request.info_hash).or_default()
} else {
torrent_maps.ipv6.entry(request.info_hash).or_default()
};
// If there is already a peer with this peer_id, check that socket
// addr is same as that of request sender. Otherwise, ignore request.
// Since peers have access to each others peer_id's, they could send
// requests using them, causing all sorts of issues.
// If there is already a peer with this peer_id, check that connection id
// is same as that of request sender. Otherwise, ignore request. Since
// peers have access to each others peer_id's, they could send requests
// using them, causing all sorts of issues.
if let Some(previous_peer) = torrent_data.peers.get(&request.peer_id) {
if request_sender_meta.peer_addr != previous_peer.connection_meta.peer_addr {
if request_sender_meta.connection_id != previous_peer.connection_meta.connection_id {
return;
}
}
@ -454,7 +451,7 @@ fn handle_scrape_request(
files: HashMap::with_capacity(num_to_take),
};
let torrent_map: &mut TorrentMap = if meta.peer_addr.is_ipv4() {
let torrent_map: &mut TorrentMap = if let IpVersion::V4 = meta.ip_version {
&mut torrent_maps.ipv4
} else {
&mut torrent_maps.ipv6