mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
Merge pull request #176 from greatest-ape/work-2023-01-26
Improve http swarm worker; add plain HTTP transfer CI testing; cargo update
This commit is contained in:
commit
96ec1f659b
8 changed files with 392 additions and 256 deletions
|
|
@ -42,8 +42,7 @@ $SUDO echo "127.0.0.1 example.com" >> /etc/hosts
|
||||||
openssl ecparam -genkey -name prime256v1 -out key.pem
|
openssl ecparam -genkey -name prime256v1 -out key.pem
|
||||||
openssl req -new -sha256 -key key.pem -out csr.csr -subj "/C=GB/ST=Test/L=Test/O=Test/OU=Test/CN=example.com"
|
openssl req -new -sha256 -key key.pem -out csr.csr -subj "/C=GB/ST=Test/L=Test/O=Test/OU=Test/CN=example.com"
|
||||||
openssl req -x509 -sha256 -nodes -days 365 -key key.pem -in csr.csr -out cert.crt
|
openssl req -x509 -sha256 -nodes -days 365 -key key.pem -in csr.csr -out cert.crt
|
||||||
openssl pkcs8 -in key.pem -topk8 -nocrypt -out key.pk8 # rustls
|
openssl pkcs8 -in key.pem -topk8 -nocrypt -out key.pk8
|
||||||
openssl pkcs12 -export -passout "pass:p" -out identity.pfx -inkey key.pem -in cert.crt
|
|
||||||
|
|
||||||
$SUDO cp cert.crt /usr/local/share/ca-certificates/snakeoil.crt
|
$SUDO cp cert.crt /usr/local/share/ca-certificates/snakeoil.crt
|
||||||
$SUDO update-ca-certificates
|
$SUDO update-ca-certificates
|
||||||
|
|
@ -52,11 +51,11 @@ $SUDO update-ca-certificates
|
||||||
|
|
||||||
cargo build --bin aquatic
|
cargo build --bin aquatic
|
||||||
|
|
||||||
# echo "log_level = 'debug'
|
echo "log_level = 'debug'
|
||||||
#
|
|
||||||
# [network]
|
[network]
|
||||||
# address = '127.0.0.1:3000'" > http.toml
|
address = '127.0.0.1:3004'" > http.toml
|
||||||
# ./target/debug/aquatic http -c http.toml > "$HOME/http.log" 2>&1 &
|
./target/debug/aquatic http -c http.toml > "$HOME/http.log" 2>&1 &
|
||||||
|
|
||||||
echo "log_level = 'debug'
|
echo "log_level = 'debug'
|
||||||
|
|
||||||
|
|
@ -69,13 +68,13 @@ tls_private_key_path = './key.pk8'
|
||||||
./target/debug/aquatic http -c tls.toml > "$HOME/tls.log" 2>&1 &
|
./target/debug/aquatic http -c tls.toml > "$HOME/tls.log" 2>&1 &
|
||||||
|
|
||||||
echo "
|
echo "
|
||||||
log_level = 'trace'
|
log_level = 'debug'
|
||||||
|
|
||||||
[network]
|
[network]
|
||||||
address = '127.0.0.1:3000'" > udp.toml
|
address = '127.0.0.1:3000'" > udp.toml
|
||||||
./target/debug/aquatic udp -c udp.toml > "$HOME/udp.log" 2>&1 &
|
./target/debug/aquatic udp -c udp.toml > "$HOME/udp.log" 2>&1 &
|
||||||
|
|
||||||
echo "log_level = 'trace'
|
echo "log_level = 'debug'
|
||||||
|
|
||||||
[network]
|
[network]
|
||||||
address = '127.0.0.1:3002'
|
address = '127.0.0.1:3002'
|
||||||
|
|
@ -85,7 +84,7 @@ tls_private_key_path = './key.pk8'
|
||||||
" > ws-tls.toml
|
" > ws-tls.toml
|
||||||
./target/debug/aquatic ws -c ws-tls.toml > "$HOME/ws-tls.log" 2>&1 &
|
./target/debug/aquatic ws -c ws-tls.toml > "$HOME/ws-tls.log" 2>&1 &
|
||||||
|
|
||||||
echo "log_level = 'trace'
|
echo "log_level = 'debug'
|
||||||
|
|
||||||
[network]
|
[network]
|
||||||
address = '127.0.0.1:3003'
|
address = '127.0.0.1:3003'
|
||||||
|
|
@ -103,13 +102,13 @@ mkdir torrents
|
||||||
|
|
||||||
# Create torrents
|
# Create torrents
|
||||||
|
|
||||||
# echo "http-test-ipv4" > seed/http-test-ipv4
|
echo "http-test-ipv4" > seed/http-test-ipv4
|
||||||
echo "tls-test-ipv4" > seed/tls-test-ipv4
|
echo "tls-test-ipv4" > seed/tls-test-ipv4
|
||||||
echo "udp-test-ipv4" > seed/udp-test-ipv4
|
echo "udp-test-ipv4" > seed/udp-test-ipv4
|
||||||
echo "ws-tls-test-ipv4" > seed/ws-tls-test-ipv4
|
echo "ws-tls-test-ipv4" > seed/ws-tls-test-ipv4
|
||||||
echo "ws-test-ipv4" > seed/ws-test-ipv4
|
echo "ws-test-ipv4" > seed/ws-test-ipv4
|
||||||
|
|
||||||
# mktorrent -p -o "torrents/http-ipv4.torrent" -a "http://127.0.0.1:3000/announce" "seed/http-test-ipv4"
|
mktorrent -p -o "torrents/http-ipv4.torrent" -a "http://127.0.0.1:3004/announce" "seed/http-test-ipv4"
|
||||||
mktorrent -p -o "torrents/tls-ipv4.torrent" -a "https://example.com:3001/announce" "seed/tls-test-ipv4"
|
mktorrent -p -o "torrents/tls-ipv4.torrent" -a "https://example.com:3001/announce" "seed/tls-test-ipv4"
|
||||||
mktorrent -p -o "torrents/udp-ipv4.torrent" -a "udp://127.0.0.1:3000" "seed/udp-test-ipv4"
|
mktorrent -p -o "torrents/udp-ipv4.torrent" -a "udp://127.0.0.1:3000" "seed/udp-test-ipv4"
|
||||||
mktorrent -p -o "torrents/ws-tls-ipv4.torrent" -a "wss://example.com:3002" "seed/ws-tls-test-ipv4"
|
mktorrent -p -o "torrents/ws-tls-ipv4.torrent" -a "wss://example.com:3002" "seed/ws-tls-test-ipv4"
|
||||||
|
|
@ -165,7 +164,7 @@ cd ..
|
||||||
|
|
||||||
# Check for completion
|
# Check for completion
|
||||||
|
|
||||||
# HTTP_IPv4="Ok"
|
HTTP_IPv4="Failed"
|
||||||
TLS_IPv4="Failed"
|
TLS_IPv4="Failed"
|
||||||
UDP_IPv4="Failed"
|
UDP_IPv4="Failed"
|
||||||
WS_TLS_IPv4="Failed"
|
WS_TLS_IPv4="Failed"
|
||||||
|
|
@ -177,14 +176,14 @@ echo "Watching for finished files.."
|
||||||
|
|
||||||
while [ $i -lt 60 ]
|
while [ $i -lt 60 ]
|
||||||
do
|
do
|
||||||
# if test -f "leech/http-test-ipv4"; then
|
if test -f "leech/http-test-ipv4"; then
|
||||||
# if grep -q "http-test-ipv4" "leech/http-test-ipv4"; then
|
if grep -q "http-test-ipv4" "leech/http-test-ipv4"; then
|
||||||
# if [ "$HTTP_IPv4" != "Ok" ]; then
|
if [ "$HTTP_IPv4" != "Ok" ]; then
|
||||||
# HTTP_IPv4="Ok"
|
HTTP_IPv4="Ok"
|
||||||
# echo "HTTP_IPv4 is Ok"
|
echo "HTTP_IPv4 is Ok"
|
||||||
# fi
|
fi
|
||||||
# fi
|
fi
|
||||||
# fi
|
fi
|
||||||
if test -f "leech/tls-test-ipv4"; then
|
if test -f "leech/tls-test-ipv4"; then
|
||||||
if grep -q "tls-test-ipv4" "leech/tls-test-ipv4"; then
|
if grep -q "tls-test-ipv4" "leech/tls-test-ipv4"; then
|
||||||
if [ "$TLS_IPv4" != "Ok" ]; then
|
if [ "$TLS_IPv4" != "Ok" ]; then
|
||||||
|
|
@ -218,7 +217,7 @@ do
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if [ "$TLS_IPv4" = "Ok" ] && [ "$UDP_IPv4" = "Ok" ] && [ "$WS_TLS_IPv4" = "Ok" ] && [ "$WS_IPv4" = "Ok" ]; then
|
if [ "$HTTP_IPv4" = "Ok" ] && [ "$TLS_IPv4" = "Ok" ] && [ "$UDP_IPv4" = "Ok" ] && [ "$WS_TLS_IPv4" = "Ok" ] && [ "$WS_IPv4" = "Ok" ]; then
|
||||||
break
|
break
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
|
@ -229,15 +228,15 @@ done
|
||||||
|
|
||||||
echo "Waited for $i seconds"
|
echo "Waited for $i seconds"
|
||||||
|
|
||||||
# echo "::set-output name=http_ipv4::$HTTP_IPv4"
|
echo "::set-output name=http_ipv4::$HTTP_IPv4"
|
||||||
echo "::set-output name=http_tls_ipv4::$TLS_IPv4"
|
echo "::set-output name=http_tls_ipv4::$TLS_IPv4"
|
||||||
echo "::set-output name=udp_ipv4::$UDP_IPv4"
|
echo "::set-output name=udp_ipv4::$UDP_IPv4"
|
||||||
echo "::set-output name=ws_tls_ipv4::$WS_TLS_IPv4"
|
echo "::set-output name=ws_tls_ipv4::$WS_TLS_IPv4"
|
||||||
echo "::set-output name=ws_ipv4::$WS_IPv4"
|
echo "::set-output name=ws_ipv4::$WS_IPv4"
|
||||||
|
|
||||||
# echo ""
|
echo ""
|
||||||
# echo "# --- HTTP log --- #"
|
echo "# --- HTTP log --- #"
|
||||||
# cat "http.log"
|
cat "http.log"
|
||||||
|
|
||||||
sleep 1
|
sleep 1
|
||||||
|
|
||||||
|
|
@ -291,11 +290,12 @@ sleep 1
|
||||||
|
|
||||||
echo ""
|
echo ""
|
||||||
echo "# --- Test results --- #"
|
echo "# --- Test results --- #"
|
||||||
echo "HTTP over TLS (IPv4): $TLS_IPv4"
|
echo "HTTP: $HTTP_IPv4"
|
||||||
echo "UDP (IPv4): $UDP_IPv4"
|
echo "HTTP (TLS): $TLS_IPv4"
|
||||||
echo "WSS (IPv4): $WS_TLS_IPv4"
|
echo "UDP: $UDP_IPv4"
|
||||||
echo "WS (IPv4): $WS_IPv4"
|
echo "WebTorrent (TLS): $WS_TLS_IPv4"
|
||||||
|
echo "WebTorrent: $WS_IPv4"
|
||||||
|
|
||||||
if [ "$TLS_IPv4" != "Ok" ] || [ "$UDP_IPv4" != "Ok" ] || [ "$WS_TLS_IPv4" != "Ok" ] || [ "$WS_IPv4" != "Ok" ]; then
|
if [ "$HTTP_IPv4" != "Ok" ] || [ "$TLS_IPv4" != "Ok" ] || [ "$UDP_IPv4" != "Ok" ] || [ "$WS_TLS_IPv4" != "Ok" ] || [ "$WS_IPv4" != "Ok" ]; then
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
|
||||||
2
.github/workflows/ci.yml
vendored
2
.github/workflows/ci.yml
vendored
|
|
@ -69,7 +69,7 @@ jobs:
|
||||||
|
|
||||||
test-file-transfers:
|
test-file-transfers:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
name: "Test BitTorrent file transfers over UDP, TLS and WSS"
|
name: "Test BitTorrent file transfers (UDP, HTTP, WebTorrent)"
|
||||||
timeout-minutes: 20
|
timeout-minutes: 20
|
||||||
container:
|
container:
|
||||||
image: rust:1-bullseye
|
image: rust:1-bullseye
|
||||||
|
|
|
||||||
|
|
@ -47,6 +47,8 @@
|
||||||
|
|
||||||
* Index peers by packet source IP and provided port instead of by source ip
|
* Index peers by packet source IP and provided port instead of by source ip
|
||||||
and peer id. This is likely slightly faster.
|
and peer id. This is likely slightly faster.
|
||||||
|
* Avoid a heap allocation for torrents with four or less peers. This can save
|
||||||
|
a lot of memory if many torrents are tracked
|
||||||
* Improve announce performance by avoiding having to filter response peers
|
* Improve announce performance by avoiding having to filter response peers
|
||||||
* In announce response statistics, don't include announcing peer
|
* In announce response statistics, don't include announcing peer
|
||||||
|
|
||||||
|
|
|
||||||
67
Cargo.lock
generated
67
Cargo.lock
generated
|
|
@ -183,6 +183,7 @@ dependencies = [
|
||||||
"aquatic_http_protocol",
|
"aquatic_http_protocol",
|
||||||
"aquatic_toml_config",
|
"aquatic_toml_config",
|
||||||
"arc-swap",
|
"arc-swap",
|
||||||
|
"arrayvec",
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
"either",
|
"either",
|
||||||
"futures",
|
"futures",
|
||||||
|
|
@ -622,9 +623,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ciborium"
|
name = "ciborium"
|
||||||
version = "0.2.1"
|
version = "0.2.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "effd91f6c78e5a4ace8a5d3c0b6bfaec9e2baaef55f3efc00e45fb2e477ee926"
|
checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"ciborium-io",
|
"ciborium-io",
|
||||||
"ciborium-ll",
|
"ciborium-ll",
|
||||||
|
|
@ -633,15 +634,15 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ciborium-io"
|
name = "ciborium-io"
|
||||||
version = "0.2.1"
|
version = "0.2.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "cdf919175532b369853f5d5e20b26b43112613fd6fe7aee757e35f7a44642656"
|
checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ciborium-ll"
|
name = "ciborium-ll"
|
||||||
version = "0.2.1"
|
version = "0.2.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "defaa24ecc093c77630e6c15e17c51f5e187bf35ee514f4e2d67baaa96dae22b"
|
checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"ciborium-io",
|
"ciborium-io",
|
||||||
"half",
|
"half",
|
||||||
|
|
@ -889,6 +890,12 @@ version = "0.8.19"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345"
|
checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "crunchy"
|
||||||
|
version = "0.2.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "crypto-common"
|
name = "crypto-common"
|
||||||
version = "0.1.6"
|
version = "0.1.6"
|
||||||
|
|
@ -1276,9 +1283,13 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "half"
|
name = "half"
|
||||||
version = "1.8.2"
|
version = "2.3.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
|
checksum = "bc52e53916c08643f1b56ec082790d1e86a32e58dc5268f897f313fbae7b4872"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"crunchy",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "halfbrown"
|
name = "halfbrown"
|
||||||
|
|
@ -2001,18 +2012,18 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pin-project"
|
name = "pin-project"
|
||||||
version = "1.1.3"
|
version = "1.1.4"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422"
|
checksum = "0302c4a0442c456bd56f841aee5c3bfd17967563f6fadc9ceb9f9c23cf3807e0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"pin-project-internal",
|
"pin-project-internal",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pin-project-internal"
|
name = "pin-project-internal"
|
||||||
version = "1.1.3"
|
version = "1.1.4"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
|
checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
|
|
@ -2129,9 +2140,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "proc-macro2"
|
name = "proc-macro2"
|
||||||
version = "1.0.76"
|
version = "1.0.78"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "95fc56cda0b5c3325f5fbbd7ff9fda9e02bb00bb3dac51252d2f1bfa1cb8cc8c"
|
checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"unicode-ident",
|
"unicode-ident",
|
||||||
]
|
]
|
||||||
|
|
@ -2292,9 +2303,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "regex"
|
name = "regex"
|
||||||
version = "1.10.2"
|
version = "1.10.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343"
|
checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"aho-corasick",
|
"aho-corasick",
|
||||||
"memchr",
|
"memchr",
|
||||||
|
|
@ -2304,9 +2315,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "regex-automata"
|
name = "regex-automata"
|
||||||
version = "0.4.3"
|
version = "0.4.5"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f"
|
checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"aho-corasick",
|
"aho-corasick",
|
||||||
"memchr",
|
"memchr",
|
||||||
|
|
@ -2437,9 +2448,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde"
|
name = "serde"
|
||||||
version = "1.0.195"
|
version = "1.0.196"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "63261df402c67811e9ac6def069e4786148c4563f4b50fd4bf30aa370d626b02"
|
checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"serde_derive",
|
"serde_derive",
|
||||||
]
|
]
|
||||||
|
|
@ -2465,9 +2476,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde_derive"
|
name = "serde_derive"
|
||||||
version = "1.0.195"
|
version = "1.0.196"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "46fe8f8603d81ba86327b23a2e9cdf49e1255fb94a4c5f297f6ee0547178ea2c"
|
checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
|
|
@ -2476,9 +2487,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde_json"
|
name = "serde_json"
|
||||||
version = "1.0.111"
|
version = "1.0.112"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "176e46fa42316f18edd598015a5166857fc835ec732f5215eac6b7bdbf0a84f4"
|
checksum = "4d1bd37ce2324cf3bf85e5a25f96eb4baf0d5aa6eba43e7ae8958870c4ec48ed"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"itoa",
|
"itoa",
|
||||||
"ryu",
|
"ryu",
|
||||||
|
|
@ -2974,9 +2985,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "value-trait"
|
name = "value-trait"
|
||||||
version = "0.8.0"
|
version = "0.8.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ea87257cfcbedcb9444eda79c59fdfea71217e6305afee8ee33f500375c2ac97"
|
checksum = "dad8db98c1e677797df21ba03fca7d3bf9bec3ca38db930954e4fe6e1ea27eb4"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"float-cmp",
|
"float-cmp",
|
||||||
"halfbrown",
|
"halfbrown",
|
||||||
|
|
@ -3262,9 +3273,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "winnow"
|
name = "winnow"
|
||||||
version = "0.5.34"
|
version = "0.5.35"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b7cf47b659b318dccbd69cc4797a39ae128f533dce7902a1096044d1967b9c16"
|
checksum = "1931d78a9c73861da0134f453bb1f790ce49b2e30eba8410b4b79bac72b46a2d"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"memchr",
|
"memchr",
|
||||||
]
|
]
|
||||||
|
|
|
||||||
4
TODO.md
4
TODO.md
|
|
@ -6,10 +6,6 @@
|
||||||
* add task to generate prometheus exports on regular interval to clean up
|
* add task to generate prometheus exports on regular interval to clean up
|
||||||
data. this is important if peer_clients is activated
|
data. this is important if peer_clients is activated
|
||||||
|
|
||||||
* http
|
|
||||||
* consider storing small number of peers without extra heap allocation
|
|
||||||
* add CI transfer test for http without TLS
|
|
||||||
|
|
||||||
* aquatic_bench
|
* aquatic_bench
|
||||||
* Opentracker "slow to get up to speed", is it due to getting faster once
|
* Opentracker "slow to get up to speed", is it due to getting faster once
|
||||||
inserts are rarely needed since most ip-port combinations have been sent?
|
inserts are rarely needed since most ip-port combinations have been sent?
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,7 @@ aquatic_http_protocol.workspace = true
|
||||||
aquatic_toml_config.workspace = true
|
aquatic_toml_config.workspace = true
|
||||||
|
|
||||||
anyhow = "1"
|
anyhow = "1"
|
||||||
|
arrayvec = "0.7"
|
||||||
arc-swap = "1"
|
arc-swap = "1"
|
||||||
cfg-if = "1"
|
cfg-if = "1"
|
||||||
either = "1"
|
either = "1"
|
||||||
|
|
|
||||||
|
|
@ -58,19 +58,8 @@ pub async fn run_swarm_worker(
|
||||||
// Periodically update torrent count metrics
|
// Periodically update torrent count metrics
|
||||||
#[cfg(feature = "metrics")]
|
#[cfg(feature = "metrics")]
|
||||||
TimerActionRepeat::repeat(enclose!((config, torrents) move || {
|
TimerActionRepeat::repeat(enclose!((config, torrents) move || {
|
||||||
enclose!((config, torrents, worker_index) move || async move {
|
enclose!((config, torrents) move || async move {
|
||||||
let torrents = torrents.borrow_mut();
|
torrents.borrow_mut().update_torrent_metrics();
|
||||||
|
|
||||||
::metrics::gauge!(
|
|
||||||
"aquatic_torrents",
|
|
||||||
"ip_version" => "4",
|
|
||||||
"worker_index" => worker_index.to_string(),
|
|
||||||
).set(torrents.ipv4.len() as f64);
|
|
||||||
::metrics::gauge!(
|
|
||||||
"aquatic_torrents",
|
|
||||||
"ip_version" => "6",
|
|
||||||
"worker_index" => worker_index.to_string(),
|
|
||||||
).set(torrents.ipv6.len() as f64);
|
|
||||||
|
|
||||||
Some(Duration::from_secs(config.metrics.torrent_count_update_interval))
|
Some(Duration::from_secs(config.metrics.torrent_count_update_interval))
|
||||||
})()
|
})()
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ use std::collections::BTreeMap;
|
||||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
|
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use arrayvec::ArrayVec;
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
|
|
||||||
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
|
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
|
||||||
|
|
@ -15,52 +16,23 @@ use aquatic_http_protocol::response::*;
|
||||||
|
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
|
|
||||||
#[cfg(feature = "metrics")]
|
const SMALL_PEER_MAP_CAPACITY: usize = 4;
|
||||||
|
|
||||||
pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {
|
pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {}
|
||||||
#[cfg(feature = "metrics")]
|
|
||||||
fn ip_version_str() -> &'static str;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Ip for Ipv4Addr {
|
impl Ip for Ipv4Addr {}
|
||||||
#[cfg(feature = "metrics")]
|
impl Ip for Ipv6Addr {}
|
||||||
fn ip_version_str() -> &'static str {
|
|
||||||
"4"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl Ip for Ipv6Addr {
|
|
||||||
#[cfg(feature = "metrics")]
|
|
||||||
fn ip_version_str() -> &'static str {
|
|
||||||
"6"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct TorrentMaps {
|
pub struct TorrentMaps {
|
||||||
pub ipv4: TorrentMap<Ipv4Addr>,
|
pub ipv4: TorrentMap<Ipv4Addr>,
|
||||||
pub ipv6: TorrentMap<Ipv6Addr>,
|
pub ipv6: TorrentMap<Ipv6Addr>,
|
||||||
#[cfg(feature = "metrics")]
|
|
||||||
pub ipv4_peer_gauge: metrics::Gauge,
|
|
||||||
#[cfg(feature = "metrics")]
|
|
||||||
pub ipv6_peer_gauge: metrics::Gauge,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TorrentMaps {
|
impl TorrentMaps {
|
||||||
pub fn new(worker_index: usize) -> Self {
|
pub fn new(worker_index: usize) -> Self {
|
||||||
Self {
|
Self {
|
||||||
ipv4: Default::default(),
|
ipv4: TorrentMap::new(worker_index, true),
|
||||||
ipv6: Default::default(),
|
ipv6: TorrentMap::new(worker_index, false),
|
||||||
#[cfg(feature = "metrics")]
|
|
||||||
ipv4_peer_gauge: ::metrics::gauge!(
|
|
||||||
"aquatic_peers",
|
|
||||||
"ip_version" => "4",
|
|
||||||
"worker_index" => worker_index.to_string(),
|
|
||||||
),
|
|
||||||
#[cfg(feature = "metrics")]
|
|
||||||
ipv6_peer_gauge: ::metrics::gauge!(
|
|
||||||
"aquatic_peers",
|
|
||||||
"ip_version" => "6",
|
|
||||||
"worker_index" => worker_index.to_string(),
|
|
||||||
),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -74,18 +46,13 @@ impl TorrentMaps {
|
||||||
) -> AnnounceResponse {
|
) -> AnnounceResponse {
|
||||||
match peer_addr.get().ip() {
|
match peer_addr.get().ip() {
|
||||||
IpAddr::V4(peer_ip_address) => {
|
IpAddr::V4(peer_ip_address) => {
|
||||||
let (seeders, leechers, response_peers) = self
|
let (seeders, leechers, response_peers) =
|
||||||
.ipv4
|
self.ipv4.upsert_peer_and_get_response_peers(
|
||||||
.entry(request.info_hash)
|
|
||||||
.or_default()
|
|
||||||
.upsert_peer_and_get_response_peers(
|
|
||||||
config,
|
config,
|
||||||
rng,
|
rng,
|
||||||
|
valid_until,
|
||||||
peer_ip_address,
|
peer_ip_address,
|
||||||
request,
|
request,
|
||||||
valid_until,
|
|
||||||
#[cfg(feature = "metrics")]
|
|
||||||
&self.ipv4_peer_gauge,
|
|
||||||
);
|
);
|
||||||
|
|
||||||
AnnounceResponse {
|
AnnounceResponse {
|
||||||
|
|
@ -98,18 +65,13 @@ impl TorrentMaps {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
IpAddr::V6(peer_ip_address) => {
|
IpAddr::V6(peer_ip_address) => {
|
||||||
let (seeders, leechers, response_peers) = self
|
let (seeders, leechers, response_peers) =
|
||||||
.ipv6
|
self.ipv6.upsert_peer_and_get_response_peers(
|
||||||
.entry(request.info_hash)
|
|
||||||
.or_default()
|
|
||||||
.upsert_peer_and_get_response_peers(
|
|
||||||
config,
|
config,
|
||||||
rng,
|
rng,
|
||||||
|
valid_until,
|
||||||
peer_ip_address,
|
peer_ip_address,
|
||||||
request,
|
request,
|
||||||
valid_until,
|
|
||||||
#[cfg(feature = "metrics")]
|
|
||||||
&self.ipv6_peer_gauge,
|
|
||||||
);
|
);
|
||||||
|
|
||||||
AnnounceResponse {
|
AnnounceResponse {
|
||||||
|
|
@ -130,46 +92,16 @@ impl TorrentMaps {
|
||||||
peer_addr: CanonicalSocketAddr,
|
peer_addr: CanonicalSocketAddr,
|
||||||
request: ScrapeRequest,
|
request: ScrapeRequest,
|
||||||
) -> ScrapeResponse {
|
) -> ScrapeResponse {
|
||||||
let num_to_take = request
|
if peer_addr.get().ip().is_ipv4() {
|
||||||
.info_hashes
|
self.ipv4.handle_scrape_request(config, request)
|
||||||
.len()
|
|
||||||
.min(config.protocol.max_scrape_torrents);
|
|
||||||
|
|
||||||
let mut response = ScrapeResponse {
|
|
||||||
files: BTreeMap::new(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let peer_ip = peer_addr.get().ip();
|
|
||||||
|
|
||||||
// If request.info_hashes is empty, don't return scrape for all
|
|
||||||
// torrents, even though reference server does it. It is too expensive.
|
|
||||||
if peer_ip.is_ipv4() {
|
|
||||||
for info_hash in request.info_hashes.into_iter().take(num_to_take) {
|
|
||||||
if let Some(torrent_data) = self.ipv4.get(&info_hash) {
|
|
||||||
let stats = ScrapeStatistics {
|
|
||||||
complete: torrent_data.num_seeders,
|
|
||||||
downloaded: 0, // No implementation planned
|
|
||||||
incomplete: torrent_data.num_leechers(),
|
|
||||||
};
|
|
||||||
|
|
||||||
response.files.insert(info_hash, stats);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
for info_hash in request.info_hashes.into_iter().take(num_to_take) {
|
self.ipv6.handle_scrape_request(config, request)
|
||||||
if let Some(torrent_data) = self.ipv6.get(&info_hash) {
|
}
|
||||||
let stats = ScrapeStatistics {
|
}
|
||||||
complete: torrent_data.num_seeders,
|
|
||||||
downloaded: 0, // No implementation planned
|
|
||||||
incomplete: torrent_data.num_leechers(),
|
|
||||||
};
|
|
||||||
|
|
||||||
response.files.insert(info_hash, stats);
|
pub fn update_torrent_metrics(&self) {
|
||||||
}
|
self.ipv4.torrent_gauge.set(self.ipv4.torrents.len() as f64);
|
||||||
}
|
self.ipv6.torrent_gauge.set(self.ipv6.torrents.len() as f64);
|
||||||
};
|
|
||||||
|
|
||||||
response
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn clean(
|
pub fn clean(
|
||||||
|
|
@ -182,32 +114,117 @@ impl TorrentMaps {
|
||||||
|
|
||||||
let now = server_start_instant.seconds_elapsed();
|
let now = server_start_instant.seconds_elapsed();
|
||||||
|
|
||||||
Self::clean_torrent_map(
|
self.ipv4.clean(config, &mut access_list_cache, now);
|
||||||
config,
|
self.ipv6.clean(config, &mut access_list_cache, now);
|
||||||
&mut access_list_cache,
|
}
|
||||||
&mut self.ipv4,
|
}
|
||||||
now,
|
|
||||||
&self.ipv4_peer_gauge,
|
pub struct TorrentMap<I: Ip> {
|
||||||
);
|
torrents: IndexMap<InfoHash, TorrentData<I>>,
|
||||||
Self::clean_torrent_map(
|
#[cfg(feature = "metrics")]
|
||||||
config,
|
peer_gauge: ::metrics::Gauge,
|
||||||
&mut access_list_cache,
|
#[cfg(feature = "metrics")]
|
||||||
&mut self.ipv6,
|
torrent_gauge: ::metrics::Gauge,
|
||||||
now,
|
}
|
||||||
&self.ipv6_peer_gauge,
|
|
||||||
);
|
impl<I: Ip> TorrentMap<I> {
|
||||||
|
fn new(worker_index: usize, ipv4: bool) -> Self {
|
||||||
|
#[cfg(feature = "metrics")]
|
||||||
|
let peer_gauge = if ipv4 {
|
||||||
|
::metrics::gauge!(
|
||||||
|
"aquatic_peers",
|
||||||
|
"ip_version" => "4",
|
||||||
|
"worker_index" => worker_index.to_string(),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
::metrics::gauge!(
|
||||||
|
"aquatic_peers",
|
||||||
|
"ip_version" => "6",
|
||||||
|
"worker_index" => worker_index.to_string(),
|
||||||
|
)
|
||||||
|
};
|
||||||
|
#[cfg(feature = "metrics")]
|
||||||
|
let torrent_gauge = if ipv4 {
|
||||||
|
::metrics::gauge!(
|
||||||
|
"aquatic_torrents",
|
||||||
|
"ip_version" => "4",
|
||||||
|
"worker_index" => worker_index.to_string(),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
::metrics::gauge!(
|
||||||
|
"aquatic_torrents",
|
||||||
|
"ip_version" => "6",
|
||||||
|
"worker_index" => worker_index.to_string(),
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
Self {
|
||||||
|
torrents: Default::default(),
|
||||||
|
#[cfg(feature = "metrics")]
|
||||||
|
peer_gauge,
|
||||||
|
#[cfg(feature = "metrics")]
|
||||||
|
torrent_gauge,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn clean_torrent_map<I: Ip>(
|
fn upsert_peer_and_get_response_peers(
|
||||||
|
&mut self,
|
||||||
|
config: &Config,
|
||||||
|
rng: &mut impl Rng,
|
||||||
|
valid_until: ValidUntil,
|
||||||
|
peer_ip_address: I,
|
||||||
|
request: AnnounceRequest,
|
||||||
|
) -> (usize, usize, Vec<ResponsePeer<I>>) {
|
||||||
|
self.torrents
|
||||||
|
.entry(request.info_hash)
|
||||||
|
.or_default()
|
||||||
|
.upsert_peer_and_get_response_peers(
|
||||||
|
config,
|
||||||
|
rng,
|
||||||
|
request,
|
||||||
|
peer_ip_address,
|
||||||
|
valid_until,
|
||||||
|
#[cfg(feature = "metrics")]
|
||||||
|
&self.peer_gauge,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_scrape_request(&mut self, config: &Config, request: ScrapeRequest) -> ScrapeResponse {
|
||||||
|
let num_to_take = request
|
||||||
|
.info_hashes
|
||||||
|
.len()
|
||||||
|
.min(config.protocol.max_scrape_torrents);
|
||||||
|
|
||||||
|
let mut response = ScrapeResponse {
|
||||||
|
files: BTreeMap::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
for info_hash in request.info_hashes.into_iter().take(num_to_take) {
|
||||||
|
let stats = self
|
||||||
|
.torrents
|
||||||
|
.get(&info_hash)
|
||||||
|
.map(|torrent_data| torrent_data.scrape_statistics())
|
||||||
|
.unwrap_or(ScrapeStatistics {
|
||||||
|
complete: 0,
|
||||||
|
incomplete: 0,
|
||||||
|
downloaded: 0,
|
||||||
|
});
|
||||||
|
|
||||||
|
response.files.insert(info_hash, stats);
|
||||||
|
}
|
||||||
|
|
||||||
|
response
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clean(
|
||||||
|
&mut self,
|
||||||
config: &Config,
|
config: &Config,
|
||||||
access_list_cache: &mut AccessListCache,
|
access_list_cache: &mut AccessListCache,
|
||||||
torrent_map: &mut TorrentMap<I>,
|
|
||||||
now: SecondsSinceServerStart,
|
now: SecondsSinceServerStart,
|
||||||
#[cfg(feature = "metrics")] peer_gauge: &::metrics::Gauge,
|
|
||||||
) {
|
) {
|
||||||
let mut total_num_peers = 0;
|
let mut total_num_peers = 0;
|
||||||
|
|
||||||
torrent_map.retain(|info_hash, torrent_data| {
|
self.torrents.retain(|info_hash, torrent_data| {
|
||||||
if !access_list_cache
|
if !access_list_cache
|
||||||
.load()
|
.load()
|
||||||
.allows(config.access_list.mode, &info_hash.0)
|
.allows(config.access_list.mode, &info_hash.0)
|
||||||
|
|
@ -215,116 +232,216 @@ impl TorrentMaps {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
let num_seeders = &mut torrent_data.num_seeders;
|
let num_peers = match torrent_data {
|
||||||
|
TorrentData::Small(t) => t.clean_and_get_num_peers(now),
|
||||||
|
TorrentData::Large(t) => t.clean_and_get_num_peers(now),
|
||||||
|
};
|
||||||
|
|
||||||
torrent_data.peers.retain(|_, peer| {
|
total_num_peers += num_peers as u64;
|
||||||
let keep = peer.valid_until.valid(now);
|
|
||||||
|
|
||||||
if (!keep) & peer.seeder {
|
num_peers > 0
|
||||||
*num_seeders -= 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
keep
|
|
||||||
});
|
|
||||||
|
|
||||||
total_num_peers += torrent_data.peers.len() as u64;
|
|
||||||
|
|
||||||
!torrent_data.peers.is_empty()
|
|
||||||
});
|
});
|
||||||
|
|
||||||
let total_num_peers = total_num_peers as f64;
|
self.torrents.shrink_to_fit();
|
||||||
|
|
||||||
#[cfg(feature = "metrics")]
|
#[cfg(feature = "metrics")]
|
||||||
peer_gauge.set(total_num_peers);
|
self.peer_gauge.set(total_num_peers as f64);
|
||||||
|
|
||||||
torrent_map.shrink_to_fit();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type TorrentMap<I> = IndexMap<InfoHash, TorrentData<I>>;
|
pub enum TorrentData<I: Ip> {
|
||||||
|
Small(SmallPeerMap<I>),
|
||||||
pub struct TorrentData<I: Ip> {
|
Large(LargePeerMap<I>),
|
||||||
peers: IndexMap<ResponsePeer<I>, Peer>,
|
|
||||||
num_seeders: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<I: Ip> Default for TorrentData<I> {
|
|
||||||
#[inline]
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
peers: Default::default(),
|
|
||||||
num_seeders: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<I: Ip> TorrentData<I> {
|
impl<I: Ip> TorrentData<I> {
|
||||||
fn num_leechers(&self) -> usize {
|
|
||||||
self.peers.len() - self.num_seeders
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Insert/update peer. Return num_seeders, num_leechers and response peers
|
|
||||||
fn upsert_peer_and_get_response_peers(
|
fn upsert_peer_and_get_response_peers(
|
||||||
&mut self,
|
&mut self,
|
||||||
config: &Config,
|
config: &Config,
|
||||||
rng: &mut impl Rng,
|
rng: &mut impl Rng,
|
||||||
peer_ip_address: I,
|
|
||||||
request: AnnounceRequest,
|
request: AnnounceRequest,
|
||||||
|
ip_address: I,
|
||||||
valid_until: ValidUntil,
|
valid_until: ValidUntil,
|
||||||
#[cfg(feature = "metrics")] peer_gauge: &::metrics::Gauge,
|
#[cfg(feature = "metrics")] peer_gauge: &::metrics::Gauge,
|
||||||
) -> (usize, usize, Vec<ResponsePeer<I>>) {
|
) -> (usize, usize, Vec<ResponsePeer<I>>) {
|
||||||
let peer_status =
|
let max_num_peers_to_take = match request.numwant {
|
||||||
PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left));
|
Some(0) | None => config.protocol.max_peers,
|
||||||
|
Some(numwant) => numwant.min(config.protocol.max_peers),
|
||||||
|
};
|
||||||
|
|
||||||
|
let status = PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left);
|
||||||
|
|
||||||
let peer_map_key = ResponsePeer {
|
let peer_map_key = ResponsePeer {
|
||||||
ip_address: peer_ip_address,
|
ip_address,
|
||||||
port: request.port,
|
port: request.port,
|
||||||
};
|
};
|
||||||
|
|
||||||
let opt_removed_peer = self.peers.remove(&peer_map_key);
|
// Create the response before inserting the peer. This means that we
|
||||||
|
// don't have to filter it out from the response peers, and that the
|
||||||
|
// reported number of seeders/leechers will not include it
|
||||||
|
let (response_data, opt_removed_peer) = match self {
|
||||||
|
Self::Small(peer_map) => {
|
||||||
|
let opt_removed_peer = peer_map.remove(&peer_map_key);
|
||||||
|
|
||||||
if let Some(Peer { seeder: true, .. }) = opt_removed_peer.as_ref() {
|
let (seeders, leechers) = peer_map.num_seeders_leechers();
|
||||||
self.num_seeders -= 1;
|
let response_peers = peer_map.extract_response_peers(max_num_peers_to_take);
|
||||||
}
|
|
||||||
|
|
||||||
let response_peers = match peer_status {
|
// Convert peer map to large variant if it is full and
|
||||||
PeerStatus::Seeding | PeerStatus::Leeching => {
|
// announcing peer is not stopped and will therefore be
|
||||||
|
// inserted
|
||||||
|
if peer_map.is_full() && status != PeerStatus::Stopped {
|
||||||
|
*self = Self::Large(peer_map.to_large());
|
||||||
|
}
|
||||||
|
|
||||||
|
((seeders, leechers, response_peers), opt_removed_peer)
|
||||||
|
}
|
||||||
|
Self::Large(peer_map) => {
|
||||||
|
let opt_removed_peer = peer_map.remove_peer(&peer_map_key);
|
||||||
|
|
||||||
|
let (seeders, leechers) = peer_map.num_seeders_leechers();
|
||||||
|
let response_peers = peer_map.extract_response_peers(rng, max_num_peers_to_take);
|
||||||
|
|
||||||
|
// Try shrinking the map if announcing peer is stopped and
|
||||||
|
// will therefore not be inserted
|
||||||
|
if status == PeerStatus::Stopped {
|
||||||
|
if let Some(peer_map) = peer_map.try_shrink() {
|
||||||
|
*self = Self::Small(peer_map);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
((seeders, leechers, response_peers), opt_removed_peer)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match status {
|
||||||
|
PeerStatus::Leeching | PeerStatus::Seeding => {
|
||||||
#[cfg(feature = "metrics")]
|
#[cfg(feature = "metrics")]
|
||||||
if opt_removed_peer.is_none() {
|
if opt_removed_peer.is_none() {
|
||||||
peer_gauge.increment(1.0);
|
peer_gauge.increment(1.0);
|
||||||
}
|
}
|
||||||
|
|
||||||
let max_num_peers_to_take = match request.numwant {
|
|
||||||
Some(0) | None => config.protocol.max_peers,
|
|
||||||
Some(numwant) => numwant.min(config.protocol.max_peers),
|
|
||||||
};
|
|
||||||
|
|
||||||
let response_peers = self.extract_response_peers(rng, max_num_peers_to_take);
|
|
||||||
|
|
||||||
let peer = Peer {
|
let peer = Peer {
|
||||||
|
is_seeder: status == PeerStatus::Seeding,
|
||||||
valid_until,
|
valid_until,
|
||||||
seeder: peer_status == PeerStatus::Seeding,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
self.peers.insert(peer_map_key, peer);
|
match self {
|
||||||
|
Self::Small(peer_map) => peer_map.insert(peer_map_key, peer),
|
||||||
if peer_status == PeerStatus::Seeding {
|
Self::Large(peer_map) => peer_map.insert(peer_map_key, peer),
|
||||||
self.num_seeders += 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
response_peers
|
|
||||||
}
|
}
|
||||||
PeerStatus::Stopped => {
|
PeerStatus::Stopped =>
|
||||||
|
{
|
||||||
#[cfg(feature = "metrics")]
|
#[cfg(feature = "metrics")]
|
||||||
if opt_removed_peer.is_some() {
|
if opt_removed_peer.is_some() {
|
||||||
peer_gauge.decrement(1.0);
|
peer_gauge.decrement(1.0);
|
||||||
}
|
}
|
||||||
|
|
||||||
Vec::new()
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
(self.num_seeders, self.num_leechers(), response_peers)
|
response_data
|
||||||
|
}
|
||||||
|
|
||||||
|
fn scrape_statistics(&self) -> ScrapeStatistics {
|
||||||
|
let (seeders, leechers) = match self {
|
||||||
|
Self::Small(peer_map) => peer_map.num_seeders_leechers(),
|
||||||
|
Self::Large(peer_map) => peer_map.num_seeders_leechers(),
|
||||||
|
};
|
||||||
|
|
||||||
|
ScrapeStatistics {
|
||||||
|
complete: seeders,
|
||||||
|
incomplete: leechers,
|
||||||
|
downloaded: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<I: Ip> Default for TorrentData<I> {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::Small(SmallPeerMap(ArrayVec::default()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Store torrents with very few peers without an extra heap allocation
|
||||||
|
///
|
||||||
|
/// On public open trackers, this is likely to be the majority of torrents.
|
||||||
|
#[derive(Default, Debug)]
|
||||||
|
pub struct SmallPeerMap<I: Ip>(ArrayVec<(ResponsePeer<I>, Peer), SMALL_PEER_MAP_CAPACITY>);
|
||||||
|
|
||||||
|
impl<I: Ip> SmallPeerMap<I> {
|
||||||
|
fn is_full(&self) -> bool {
|
||||||
|
self.0.is_full()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn num_seeders_leechers(&self) -> (usize, usize) {
|
||||||
|
let seeders = self.0.iter().filter(|(_, p)| p.is_seeder).count();
|
||||||
|
let leechers = self.0.len() - seeders;
|
||||||
|
|
||||||
|
(seeders, leechers)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert(&mut self, key: ResponsePeer<I>, peer: Peer) {
|
||||||
|
self.0.push((key, peer));
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove(&mut self, key: &ResponsePeer<I>) -> Option<Peer> {
|
||||||
|
for (i, (k, _)) in self.0.iter().enumerate() {
|
||||||
|
if k == key {
|
||||||
|
return Some(self.0.remove(i).1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
fn extract_response_peers(&self, max_num_peers_to_take: usize) -> Vec<ResponsePeer<I>> {
|
||||||
|
Vec::from_iter(self.0.iter().take(max_num_peers_to_take).map(|(k, _)| *k))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clean_and_get_num_peers(&mut self, now: SecondsSinceServerStart) -> usize {
|
||||||
|
self.0.retain(|(_, peer)| peer.valid_until.valid(now));
|
||||||
|
|
||||||
|
self.0.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn to_large(&self) -> LargePeerMap<I> {
|
||||||
|
let (num_seeders, _) = self.num_seeders_leechers();
|
||||||
|
let peers = self.0.iter().copied().collect();
|
||||||
|
|
||||||
|
LargePeerMap { peers, num_seeders }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct LargePeerMap<I: Ip> {
|
||||||
|
peers: IndexMap<ResponsePeer<I>, Peer>,
|
||||||
|
num_seeders: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<I: Ip> LargePeerMap<I> {
|
||||||
|
fn num_seeders_leechers(&self) -> (usize, usize) {
|
||||||
|
(self.num_seeders, self.peers.len() - self.num_seeders)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn insert(&mut self, key: ResponsePeer<I>, peer: Peer) {
|
||||||
|
if peer.is_seeder {
|
||||||
|
self.num_seeders += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.peers.insert(key, peer);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_peer(&mut self, key: &ResponsePeer<I>) -> Option<Peer> {
|
||||||
|
let opt_removed_peer = self.peers.remove(key);
|
||||||
|
|
||||||
|
if let Some(Peer {
|
||||||
|
is_seeder: true, ..
|
||||||
|
}) = opt_removed_peer
|
||||||
|
{
|
||||||
|
self.num_seeders -= 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
opt_removed_peer
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Extract response peers
|
/// Extract response peers
|
||||||
|
|
@ -373,12 +490,36 @@ impl<I: Ip> TorrentData<I> {
|
||||||
peers
|
peers
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn clean_and_get_num_peers(&mut self, now: SecondsSinceServerStart) -> usize {
|
||||||
|
self.peers.retain(|_, peer| {
|
||||||
|
let keep = peer.valid_until.valid(now);
|
||||||
|
|
||||||
|
if (!keep) & peer.is_seeder {
|
||||||
|
self.num_seeders -= 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
keep
|
||||||
|
});
|
||||||
|
|
||||||
|
self.peers.shrink_to_fit();
|
||||||
|
|
||||||
|
self.peers.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_shrink(&mut self) -> Option<SmallPeerMap<I>> {
|
||||||
|
(self.peers.len() <= SMALL_PEER_MAP_CAPACITY).then(|| {
|
||||||
|
SmallPeerMap(ArrayVec::from_iter(
|
||||||
|
self.peers.iter().map(|(k, v)| (*k, *v)),
|
||||||
|
))
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy)]
|
#[derive(Debug, Clone, Copy)]
|
||||||
struct Peer {
|
struct Peer {
|
||||||
pub valid_until: ValidUntil,
|
pub valid_until: ValidUntil,
|
||||||
pub seeder: bool,
|
pub is_seeder: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
|
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
|
||||||
|
|
@ -389,14 +530,10 @@ enum PeerStatus {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PeerStatus {
|
impl PeerStatus {
|
||||||
/// Determine peer status from announce event and number of bytes left.
|
fn from_event_and_bytes_left(event: AnnounceEvent, bytes_left: usize) -> Self {
|
||||||
///
|
|
||||||
/// Likely, the last branch will be taken most of the time.
|
|
||||||
#[inline]
|
|
||||||
fn from_event_and_bytes_left(event: AnnounceEvent, opt_bytes_left: Option<usize>) -> Self {
|
|
||||||
if let AnnounceEvent::Stopped = event {
|
if let AnnounceEvent::Stopped = event {
|
||||||
Self::Stopped
|
Self::Stopped
|
||||||
} else if let Some(0) = opt_bytes_left {
|
} else if bytes_left == 0 {
|
||||||
Self::Seeding
|
Self::Seeding
|
||||||
} else {
|
} else {
|
||||||
Self::Leeching
|
Self::Leeching
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue