Merge pull request #53 from greatest-ape/2022-03-18

aquatic_ws: remove mio implementation, update dependencies
This commit is contained in:
Joakim Frostegård 2022-03-18 16:01:45 +01:00 committed by GitHub
commit ae03cfcd03
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 708 additions and 2276 deletions

View file

@ -20,11 +20,8 @@ jobs:
- name: Build - name: Build
run: | run: |
cargo build --verbose -p aquatic_udp --features "cpu-pinning" cargo build --verbose -p aquatic_udp --features "cpu-pinning"
cargo build --verbose -p aquatic_http --features "cpu-pinning" cargo build --verbose -p aquatic_http --features "cpu-pinning"
cargo build --verbose -p aquatic_ws --features "cpu-pinning" cargo build --verbose -p aquatic_ws --features "cpu-pinning"
cargo build --verbose -p aquatic_ws --features "with-glommio cpu-pinning" --no-default-features
- name: Run tests - name: Run tests
run: cargo test --verbose --workspace --all-targets run: cargo test --verbose --workspace --all-targets
@ -35,6 +32,4 @@ jobs:
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- name: Build - name: Build
run: | run: cargo build --verbose -p aquatic_udp
cargo build --verbose -p aquatic_udp
cargo build --verbose -p aquatic_ws

203
Cargo.lock generated
View file

@ -39,9 +39,9 @@ dependencies = [
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.54" version = "1.0.56"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a99269dff3bc004caa411f38845c20303f1e393ca2bd6581576fa3a7f59577d" checksum = "4361135be9122e0870de935d7c439aef945b9f9ddd4199a553b5270b49c82a27"
[[package]] [[package]]
name = "aquatic" name = "aquatic"
@ -264,18 +264,14 @@ dependencies = [
"aquatic_ws_protocol", "aquatic_ws_protocol",
"async-tungstenite", "async-tungstenite",
"cfg-if", "cfg-if",
"crossbeam-channel",
"either", "either",
"futures", "futures",
"futures-lite", "futures-lite",
"futures-rustls", "futures-rustls",
"glommio", "glommio",
"hashbrown 0.12.0", "hashbrown 0.12.0",
"histogram",
"log", "log",
"mimalloc", "mimalloc",
"mio",
"parking_lot",
"privdrop", "privdrop",
"quickcheck", "quickcheck",
"quickcheck_macros", "quickcheck_macros",
@ -285,7 +281,6 @@ dependencies = [
"serde", "serde",
"signal-hook", "signal-hook",
"slab", "slab",
"socket2 0.4.4",
"tungstenite", "tungstenite",
] ]
@ -346,9 +341,9 @@ dependencies = [
[[package]] [[package]]
name = "async-tungstenite" name = "async-tungstenite"
version = "0.17.0" version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3fe458e5f0c283bd6315a884d30f90b654c612b2c16d676730ba71f23cf160a" checksum = "7922abeade7dd8948c20dfa1f85dc48cc952d2e0791f7c42b8b1cbb07a57129d"
dependencies = [ dependencies = [
"futures-io", "futures-io",
"futures-util", "futures-util",
@ -609,9 +604,9 @@ dependencies = [
[[package]] [[package]]
name = "crossbeam-channel" name = "crossbeam-channel"
version = "0.5.2" version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e54ea8bc3fb1ee042f5aace6e3c6e025d3874866da222930f70ce62aceba0bfa" checksum = "fdbfe11fe19ff083c48923cf179540e8cd0535903dc35e178a1fdeeb59aef51f"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"crossbeam-utils", "crossbeam-utils",
@ -630,10 +625,11 @@ dependencies = [
[[package]] [[package]]
name = "crossbeam-epoch" name = "crossbeam-epoch"
version = "0.9.7" version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c00d6d2ea26e8b151d99093005cb442fb9a37aeaca582a03ec70946f49ab5ed9" checksum = "1145cf131a2c6ba0615079ab6a638f7e1973ac9c2634fcbeaaad6114246efe8c"
dependencies = [ dependencies = [
"autocfg",
"cfg-if", "cfg-if",
"crossbeam-utils", "crossbeam-utils",
"lazy_static", "lazy_static",
@ -643,9 +639,9 @@ dependencies = [
[[package]] [[package]]
name = "crossbeam-queue" name = "crossbeam-queue"
version = "0.3.4" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dd435b205a4842da59efd07628f921c096bc1cc0a156835b4fa0bcb9a19bcce" checksum = "1f25d8400f4a7a5778f0e4e52384a48cbd9b5c495d110786187fc750075277a2"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"crossbeam-utils", "crossbeam-utils",
@ -653,9 +649,9 @@ dependencies = [
[[package]] [[package]]
name = "crossbeam-utils" name = "crossbeam-utils"
version = "0.8.7" version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e5bed1f1c269533fa816a0a5492b3545209a205ca1a54842be180eb63a16a6" checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"lazy_static", "lazy_static",
@ -794,9 +790,9 @@ dependencies = [
[[package]] [[package]]
name = "flume" name = "flume"
version = "0.10.11" version = "0.10.12"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b279436a715a9de95dcd26b151db590a71961cc06e54918b24fe0dd5b7d3fc4" checksum = "843c03199d0c0ca54bc1ea90ac0d507274c28abcc4f691ae8b4eaa375087c76a"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-sink", "futures-sink",
@ -948,14 +944,14 @@ dependencies = [
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.2.4" version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c" checksum = "d39cd93900197114fa1fcb7ae84ca742095eed9442088988ae74fa744e930e77"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"js-sys", "js-sys",
"libc", "libc",
"wasi", "wasi 0.10.2+wasi-snapshot-preview1",
"wasm-bindgen", "wasm-bindgen",
] ]
@ -1018,11 +1014,11 @@ checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
[[package]] [[package]]
name = "halfbrown" name = "halfbrown"
version = "0.1.12" version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ed39577259d319b81a15176a32673271be2786cb463889703c58c90fe83c825" checksum = "49e26621a30b9fdb4f949b9c6a7fa42ce88112851c33ac4ca00bfa7848d26fb4"
dependencies = [ dependencies = [
"hashbrown 0.11.2", "hashbrown 0.12.0",
"serde", "serde",
] ]
@ -1032,15 +1028,6 @@ version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04"
[[package]]
name = "hashbrown"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
dependencies = [
"ahash",
]
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.12.0" version = "0.12.0"
@ -1066,12 +1053,6 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "histogram"
version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12cb882ccb290b8646e554b157ab0b71e64e8d5bef775cd66b6531e52d302669"
[[package]] [[package]]
name = "http" name = "http"
version = "0.2.6" version = "0.2.6"
@ -1204,9 +1185,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.118" version = "0.2.120"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06e509672465a0504304aa87f9f176f2b2b716ed8fb105ebe5c02dc6dce96a94" checksum = "ad5c14e80759d0939d013e6ca49930e59fc53dd8e5009132f76240c179380c09"
[[package]] [[package]]
name = "libm" name = "libm"
@ -1313,14 +1294,15 @@ dependencies = [
[[package]] [[package]]
name = "mio" name = "mio"
version = "0.8.0" version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba272f85fa0b41fc91872be579b3bbe0f56b792aa361a380eb669469f68dafb2" checksum = "52da4364ffb0e4fe33a9841a98a3f3014fb964045ce4f7a45a398243c8d6b0c9"
dependencies = [ dependencies = [
"libc", "libc",
"log", "log",
"miow", "miow",
"ntapi", "ntapi",
"wasi 0.11.0+wasi-snapshot-preview1",
"winapi 0.3.9", "winapi 0.3.9",
] ]
@ -1335,9 +1317,9 @@ dependencies = [
[[package]] [[package]]
name = "nanorand" name = "nanorand"
version = "0.6.1" version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "729eb334247daa1803e0a094d0a5c55711b85571179f5ec6e53eccfdf7008958" checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
dependencies = [ dependencies = [
"getrandom", "getrandom",
] ]
@ -1449,9 +1431,9 @@ dependencies = [
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.9.0" version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5" checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9"
[[package]] [[package]]
name = "oorandom" name = "oorandom"
@ -1471,29 +1453,6 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
[[package]]
name = "parking_lot"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28141e0cc4143da2443301914478dc976a61ffdb3f043058310c70df2fed8954"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-sys",
]
[[package]] [[package]]
name = "percent-encoding" name = "percent-encoding"
version = "2.1.0" version = "2.1.0"
@ -1615,9 +1574,9 @@ dependencies = [
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.15" version = "1.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "864d3e96a899863136fc6e99f3d7cae289dafe43bf2c5ac19b70df7210c0a145" checksum = "b4af2ec4714533fcdf07e886f17025ace8b997b9ce51204ee69b6da831c3da57"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
] ]
@ -1687,20 +1646,11 @@ dependencies = [
"num_cpus", "num_cpus",
] ]
[[package]]
name = "redox_syscall"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff"
dependencies = [
"bitflags 1.3.2",
]
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.5.4" version = "1.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286"
dependencies = [ dependencies = [
"aho-corasick", "aho-corasick",
"memchr", "memchr",
@ -1818,9 +1768,9 @@ dependencies = [
[[package]] [[package]]
name = "semver" name = "semver"
version = "1.0.5" version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0486718e92ec9a68fbed73bb5ef687d71103b142595b406835649bebd33f72c7" checksum = "a4a3381e03edd24287172047536f20cabde766e2cd3e65e6b00fb3af51c4f38d"
[[package]] [[package]]
name = "serde" name = "serde"
@ -1962,9 +1912,9 @@ checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
[[package]] [[package]]
name = "smartstring" name = "smartstring"
version = "0.2.9" version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31aa6a31c0c2b21327ce875f7e8952322acfcfd0c27569a6e18a647281352c9b" checksum = "ea958ad90cacc8ece7f238fde3671e1b350ee1741964edf2a22fd16f60224163"
dependencies = [ dependencies = [
"static_assertions", "static_assertions",
] ]
@ -2013,9 +1963,9 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.86" version = "1.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a65b3f4ffa0092e9887669db0eae07941f023991ab58ea44da8fe8e2d511c6b" checksum = "ea297be220d52398dcc07ce15a209fce436d361735ac1db700cab3b6cdfb9f54"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -2036,9 +1986,9 @@ dependencies = [
[[package]] [[package]]
name = "termcolor" name = "termcolor"
version = "1.1.2" version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4" checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755"
dependencies = [ dependencies = [
"winapi-util", "winapi-util",
] ]
@ -2128,9 +2078,9 @@ dependencies = [
[[package]] [[package]]
name = "tracing" name = "tracing"
version = "0.1.31" version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6c650a8ef0cd2dd93736f033d21cbd1224c5a967aa0c258d00fcf7dafef9b9f" checksum = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"pin-project-lite", "pin-project-lite",
@ -2140,9 +2090,9 @@ dependencies = [
[[package]] [[package]]
name = "tracing-attributes" name = "tracing-attributes"
version = "0.1.19" version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8276d9a4a3a558d7b7ad5303ad50b53d58264641b82914b7ada36bd762e7a716" checksum = "2e65ce065b4b5c53e73bb28912318cb8c9e9ad3921f1d669eb0e68b4c8143a2b"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -2151,18 +2101,18 @@ dependencies = [
[[package]] [[package]]
name = "tracing-core" name = "tracing-core"
version = "0.1.22" version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03cfcb51380632a72d3111cb8d3447a8d908e577d31beeac006f836383d29a23" checksum = "aa31669fa42c09c34d94d8165dd2012e8ff3c66aca50f3bb226b68f216f2706c"
dependencies = [ dependencies = [
"lazy_static", "lazy_static",
] ]
[[package]] [[package]]
name = "tungstenite" name = "tungstenite"
version = "0.17.1" version = "0.17.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a5198d211a468fa9573edf4919aa88a17515723c766b3a6b3a10536eb7e1ee0" checksum = "d96a2dea40e7570482f28eb57afbe42d97551905da6a9400acc5c328d24004f5"
dependencies = [ dependencies = [
"base64", "base64",
"byteorder", "byteorder",
@ -2242,9 +2192,9 @@ checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]] [[package]]
name = "value-trait" name = "value-trait"
version = "0.2.9" version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0393efdd7d82f856a927b0fcafa80bca45911f5c89ef6b9d80197bebc284f72e" checksum = "0fe40a74a6f052b10668ef021c8c3ae56ab38269f9c0f401daa6ed36f96662fd"
dependencies = [ dependencies = [
"float-cmp", "float-cmp",
"halfbrown", "halfbrown",
@ -2281,6 +2231,12 @@ version = "0.10.2+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]] [[package]]
name = "wasm-bindgen" name = "wasm-bindgen"
version = "0.2.79" version = "0.2.79"
@ -2397,46 +2353,3 @@ name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0" version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-sys"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3df6e476185f92a12c072be4a189a0210dcdcf512a1891d6dff9edb874deadc6"
dependencies = [
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_msvc"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8e92753b1c443191654ec532f14c199742964a061be25d77d7a96f09db20bf5"
[[package]]
name = "windows_i686_gnu"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a711c68811799e017b6038e0922cb27a5e2f43a2ddb609fe0b6f3eeda9de615"
[[package]]
name = "windows_i686_msvc"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "146c11bb1a02615db74680b32a68e2d61f553cc24c4eb5b4ca10311740e44172"
[[package]]
name = "windows_x86_64_gnu"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c912b12f7454c6620635bbff3450962753834be2a594819bd5e945af18ec64bc"
[[package]]
name = "windows_x86_64_msvc"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316"

View file

@ -13,11 +13,11 @@ of sub-implementations for different protocols:
[mio]: https://github.com/tokio-rs/mio [mio]: https://github.com/tokio-rs/mio
[glommio]: https://github.com/DataDog/glommio [glommio]: https://github.com/DataDog/glommio
| Name | Protocol | OS requirements | | Name | Protocol | OS requirements |
|--------------|--------------------------------------------|------------------------------------------------------------| |--------------|--------------------------------------------|------------------------------|
| aquatic_udp | [BitTorrent over UDP] | Unix-like | | aquatic_udp | [BitTorrent over UDP] | Unix-like (using [mio]) |
| aquatic_http | [BitTorrent over HTTP] with TLS ([rustls]) | Linux 5.8+ | | aquatic_http | [BitTorrent over HTTP] with TLS ([rustls]) | Linux 5.8+ (using [glommio]) |
| aquatic_ws | [WebTorrent] over TLS ([rustls]) | Unix-like with [mio] (default) / Linux 5.8+ with [glommio] | | aquatic_ws | [WebTorrent] over TLS ([rustls]) | Linux 5.8+ (using [glommio]) |
## Usage ## Usage
@ -25,10 +25,9 @@ of sub-implementations for different protocols:
- Install Rust with [rustup](https://rustup.rs/) (stable is recommended) - Install Rust with [rustup](https://rustup.rs/) (stable is recommended)
- Install cmake with your package manager (e.g., `apt-get install cmake`) - Install cmake with your package manager (e.g., `apt-get install cmake`)
- Unless you're planning to only run the cross-platform mio based - Unless you're planning to only run `aquatic_udp`, make sure locked memory
implementations, make sure locked memory limits are sufficient. limits are sufficient. You can do this by adding the following lines to
You can do this by adding the following lines to `/etc/security/limits.conf`, `/etc/security/limits.conf`, and then logging out and back in:
and then logging out and back in:
``` ```
* hard memlock 512 * hard memlock 512
@ -50,7 +49,6 @@ Compile the implementations that you are interested in:
cargo build --release -p aquatic_udp cargo build --release -p aquatic_udp
cargo build --release -p aquatic_http cargo build --release -p aquatic_http
cargo build --release -p aquatic_ws cargo build --release -p aquatic_ws
cargo build --release -p aquatic_ws --features "with-glommio" --no-default-features
``` ```
### Running ### Running

View file

@ -39,7 +39,7 @@ rustls-pemfile = "0.3"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
signal-hook = { version = "0.3" } signal-hook = { version = "0.3" }
slab = "0.4" slab = "0.4"
smartstring = "0.2" smartstring = "1"
[dev-dependencies] [dev-dependencies]
quickcheck = "1" quickcheck = "1"

View file

@ -31,7 +31,7 @@ memchr = "2"
rand = { version = "0.8", features = ["small_rng"] } rand = { version = "0.8", features = ["small_rng"] }
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_bencode = "0.2" serde_bencode = "0.2"
smartstring = "0.2" smartstring = "1"
urlencoding = "2" urlencoding = "2"
[dev-dependencies] [dev-dependencies]

View file

@ -14,10 +14,7 @@ name = "aquatic_ws"
name = "aquatic_ws" name = "aquatic_ws"
[features] [features]
default = ["with-mio"]
cpu-pinning = ["aquatic_common/cpu-pinning"] cpu-pinning = ["aquatic_common/cpu-pinning"]
with-glommio = ["cpu-pinning", "async-tungstenite", "futures-lite", "futures", "futures-rustls", "glommio"]
with-mio = ["crossbeam-channel", "histogram", "mio", "parking_lot", "socket2"]
[dependencies] [dependencies]
aquatic_cli_helpers = "0.1.0" aquatic_cli_helpers = "0.1.0"
@ -26,8 +23,13 @@ aquatic_toml_config = "0.1.0"
aquatic_ws_protocol = "0.1.0" aquatic_ws_protocol = "0.1.0"
anyhow = "1" anyhow = "1"
async-tungstenite = "0.17"
cfg-if = "1" cfg-if = "1"
either = "1" either = "1"
futures = "0.3"
futures-lite = "1"
futures-rustls = "0.22"
glommio = "0.7"
hashbrown = { version = "0.12", features = ["serde"] } hashbrown = { version = "0.12", features = ["serde"] }
log = "0.4" log = "0.4"
mimalloc = { version = "0.1", default-features = false } mimalloc = { version = "0.1", default-features = false }
@ -40,20 +42,6 @@ signal-hook = { version = "0.3" }
slab = "0.4" slab = "0.4"
tungstenite = "0.17" tungstenite = "0.17"
# mio
crossbeam-channel = { version = "0.5", optional = true }
histogram = { version = "0.6", optional = true }
mio = { version = "0.8", features = ["net", "os-poll"], optional = true }
parking_lot = { version = "0.12", optional = true }
socket2 = { version = "0.4", features = ["all"], optional = true }
# glommio
async-tungstenite = { version = "0.17", optional = true }
futures-lite = { version = "1", optional = true }
futures = { version = "0.3", optional = true }
futures-rustls = { version = "0.22", optional = true }
glommio = { version = "0.7", optional = true }
[dev-dependencies] [dev-dependencies]
quickcheck = "1" quickcheck = "1"
quickcheck_macros = "1" quickcheck_macros = "1"

32
aquatic_ws/src/common.rs Normal file
View file

@ -0,0 +1,32 @@
use std::sync::Arc;
use aquatic_common::access_list::AccessListArcSwap;
use aquatic_common::CanonicalSocketAddr;
pub use aquatic_common::ValidUntil;

// Alias for the rustls server config re-exported through futures-rustls.
pub type TlsConfig = futures_rustls::rustls::ServerConfig;

/// Shared tracker state: the torrent access list, swappable at runtime
/// via the contained `ArcSwap`.
#[derive(Default, Clone)]
pub struct State {
    pub access_list: Arc<AccessListArcSwap>,
}

// Opaque index identifying a scrape request awaiting its response.
#[derive(Copy, Clone, Debug)]
pub struct PendingScrapeId(pub usize);

// Opaque index identifying an out-message consumer (socket worker).
#[derive(Copy, Clone, Debug)]
pub struct ConsumerId(pub usize);

// Opaque index identifying a single client connection within a worker.
#[derive(Clone, Copy, Debug)]
pub struct ConnectionId(pub usize);

/// Routing metadata attached to every in-message so responses can be
/// delivered back to the originating connection.
#[derive(Clone, Copy, Debug)]
pub struct ConnectionMeta {
    /// Index of socket worker responsible for this connection. Required for
    /// sending back response through correct channel to correct worker.
    pub out_message_consumer_id: ConsumerId,
    pub connection_id: ConnectionId,
    pub peer_addr: CanonicalSocketAddr,
    // Set when this message belongs to a pending scrape — presumably used
    // to reassemble partial scrape responses; confirm against callers.
    pub pending_scrape_id: Option<PendingScrapeId>,
}

View file

@ -1,187 +0,0 @@
use aquatic_common::extract_response_peers;
use hashbrown::HashMap;
use rand::rngs::SmallRng;
use aquatic_ws_protocol::*;
use crate::common::*;
use crate::config::Config;
/// Handle an announce request from a peer, updating torrent state and
/// queueing resulting out-messages onto `out_messages`.
///
/// Steps:
/// 1. Reject the request if its sender's socket address differs from the
///    address previously registered for the same peer id (spoof guard).
/// 2. Insert/update/remove the sending peer and keep the cached
///    seeder/leecher counts in sync.
/// 3. Forward any offers to randomly selected peers of the torrent.
/// 4. Forward any answer to the peer it is addressed to.
/// 5. Queue an announce response back to the sender.
pub fn handle_announce_request(
    config: &Config,
    rng: &mut SmallRng,
    torrent_maps: &mut TorrentMaps,
    out_messages: &mut Vec<(ConnectionMeta, OutMessage)>,
    valid_until: ValidUntil,
    request_sender_meta: ConnectionMeta,
    request: AnnounceRequest,
) {
    // IPv4 and IPv6 peers are tracked in separate maps; pick by sender addr.
    let torrent_data: &mut TorrentData = if request_sender_meta.peer_addr.is_ipv4() {
        torrent_maps.ipv4.entry(request.info_hash).or_default()
    } else {
        torrent_maps.ipv6.entry(request.info_hash).or_default()
    };

    // If there is already a peer with this peer_id, check that socket
    // addr is same as that of request sender. Otherwise, ignore request.
    // Since peers have access to each others peer_id's, they could send
    // requests using them, causing all sorts of issues.
    if let Some(previous_peer) = torrent_data.peers.get(&request.peer_id) {
        if request_sender_meta.peer_addr != previous_peer.connection_meta.peer_addr {
            return;
        }
    }

    ::log::trace!("received request from {:?}", request_sender_meta);

    // Insert/update/remove peer who sent this request
    {
        let peer_status = PeerStatus::from_event_and_bytes_left(
            request.event.unwrap_or_default(),
            request.bytes_left,
        );

        let peer = Peer {
            connection_meta: request_sender_meta,
            status: peer_status,
            valid_until,
        };

        // Count the new entry first; `insert`/`remove` returns any previous
        // entry for the same peer id.
        let opt_removed_peer = match peer_status {
            PeerStatus::Leeching => {
                torrent_data.num_leechers += 1;

                torrent_data.peers.insert(request.peer_id, peer)
            }
            PeerStatus::Seeding => {
                torrent_data.num_seeders += 1;

                torrent_data.peers.insert(request.peer_id, peer)
            }
            PeerStatus::Stopped => torrent_data.peers.remove(&request.peer_id),
        };

        // Undo the displaced/removed entry's contribution to the counts so
        // they stay consistent with the map contents.
        match opt_removed_peer.map(|peer| peer.status) {
            Some(PeerStatus::Leeching) => {
                torrent_data.num_leechers -= 1;
            }
            Some(PeerStatus::Seeding) => {
                torrent_data.num_seeders -= 1;
            }
            _ => {}
        }
    }

    // If peer sent offers, send them on to random peers
    if let Some(offers) = request.offers {
        // FIXME: config: also maybe check this when parsing request
        let max_num_peers_to_take = offers.len().min(config.protocol.max_offers);

        // Named fn (rather than a closure) passed to extract_response_peers
        // to copy each selected peer out of the map.
        #[inline]
        fn f(peer: &Peer) -> Peer {
            *peer
        }

        let offer_receivers: Vec<Peer> = extract_response_peers(
            rng,
            &torrent_data.peers,
            max_num_peers_to_take,
            request.peer_id,
            f,
        );

        // Pair each offer with one receiver; zip truncates to the shorter side.
        for (offer, offer_receiver) in offers.into_iter().zip(offer_receivers) {
            let middleman_offer = MiddlemanOfferToPeer {
                action: AnnounceAction,
                info_hash: request.info_hash,
                peer_id: request.peer_id,
                offer: offer.offer,
                offer_id: offer.offer_id,
            };

            out_messages.push((
                offer_receiver.connection_meta,
                OutMessage::Offer(middleman_offer),
            ));
            ::log::trace!(
                "sending middleman offer to {:?}",
                offer_receiver.connection_meta
            );
        }
    }

    // If peer sent answer, send it on to relevant peer
    if let (Some(answer), Some(answer_receiver_id), Some(offer_id)) =
        (request.answer, request.to_peer_id, request.offer_id)
    {
        if let Some(answer_receiver) = torrent_data.peers.get(&answer_receiver_id) {
            let middleman_answer = MiddlemanAnswerToPeer {
                action: AnnounceAction,
                peer_id: request.peer_id,
                info_hash: request.info_hash,
                answer,
                offer_id,
            };

            out_messages.push((
                answer_receiver.connection_meta,
                OutMessage::Answer(middleman_answer),
            ));
            ::log::trace!(
                "sending middleman answer to {:?}",
                answer_receiver.connection_meta
            );
        }
    }

    // Finally, answer the sender with the torrent's current statistics.
    let out_message = OutMessage::AnnounceResponse(AnnounceResponse {
        action: AnnounceAction,
        info_hash: request.info_hash,
        complete: torrent_data.num_seeders,
        incomplete: torrent_data.num_leechers,
        announce_interval: config.protocol.peer_announce_interval,
    });

    out_messages.push((request_sender_meta, out_message));
}
/// Handle a scrape request: look up each requested info hash (capped at
/// `config.protocol.max_scrape_torrents`) and queue a response containing
/// per-torrent seeder/leecher statistics.
pub fn handle_scrape_request(
    config: &Config,
    torrent_maps: &mut TorrentMaps,
    out_messages: &mut Vec<(ConnectionMeta, OutMessage)>,
    meta: ConnectionMeta,
    request: ScrapeRequest,
) {
    // Requests without any info hashes are silently ignored.
    let info_hashes = match request.info_hashes {
        Some(info_hashes) => info_hashes.as_vec(),
        None => return,
    };

    // Cap the number of torrents reported per scrape.
    let num_to_take = info_hashes.len().min(config.protocol.max_scrape_torrents);

    let mut response = ScrapeResponse {
        action: ScrapeAction,
        files: HashMap::with_capacity(num_to_take),
    };

    // IPv4 and IPv6 peers live in separate maps; pick by requester addr.
    let torrent_map: &mut TorrentMap = if meta.peer_addr.is_ipv4() {
        &mut torrent_maps.ipv4
    } else {
        &mut torrent_maps.ipv6
    };

    // Unknown info hashes are simply omitted from the response.
    for info_hash in info_hashes.into_iter().take(num_to_take) {
        let torrent_data = match torrent_map.get(&info_hash) {
            Some(torrent_data) => torrent_data,
            None => continue,
        };

        response.files.insert(
            info_hash,
            ScrapeStatistics {
                complete: torrent_data.num_seeders,
                downloaded: 0, // No implementation planned
                incomplete: torrent_data.num_leechers,
            },
        );
    }

    out_messages.push((meta, OutMessage::ScrapeResponse(response)));
}

View file

@ -1,171 +0,0 @@
pub mod handlers;

use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;
use std::time::Instant;

use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::{AHashIndexMap, CanonicalSocketAddr};
pub use aquatic_common::ValidUntil;
use aquatic_ws_protocol::*;

use crate::config::Config;

// Opaque index identifying a scrape request awaiting its response.
#[derive(Copy, Clone, Debug)]
pub struct PendingScrapeId(pub usize);

// Opaque index identifying an out-message consumer (socket worker).
#[derive(Copy, Clone, Debug)]
pub struct ConsumerId(pub usize);

// Opaque index identifying a single client connection within a worker.
#[derive(Clone, Copy, Debug)]
pub struct ConnectionId(pub usize);

/// Routing metadata attached to every in-message so responses can be
/// delivered back to the originating connection.
#[derive(Clone, Copy, Debug)]
pub struct ConnectionMeta {
    /// Index of socket worker responsible for this connection. Required for
    /// sending back response through correct channel to correct worker.
    pub out_message_consumer_id: ConsumerId,
    pub connection_id: ConnectionId,
    pub peer_addr: CanonicalSocketAddr,
    // Set when this message belongs to a pending scrape — presumably used
    // to reassemble partial scrape responses; confirm against callers.
    pub pending_scrape_id: Option<PendingScrapeId>,
}
/// Classification of a peer, derived from its most recent announce.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum PeerStatus {
    Seeding,
    Leeching,
    Stopped,
}

impl PeerStatus {
    /// Determine peer status from announce event and number of bytes left.
    ///
    /// Likely, the last branch will be taken most of the time.
    #[inline]
    pub fn from_event_and_bytes_left(event: AnnounceEvent, opt_bytes_left: Option<usize>) -> Self {
        match (event, opt_bytes_left) {
            // An explicit "stopped" event wins regardless of bytes left.
            (AnnounceEvent::Stopped, _) => Self::Stopped,
            // Zero bytes left means the peer has the full torrent.
            (_, Some(0)) => Self::Seeding,
            // Unknown or non-zero bytes left: treat as leeching.
            _ => Self::Leeching,
        }
    }
}
/// A tracked peer: where to reach it, its current status, and when its
/// registration expires unless refreshed by a new announce.
#[derive(Clone, Copy)]
pub struct Peer {
    pub connection_meta: ConnectionMeta,
    pub status: PeerStatus,
    pub valid_until: ValidUntil,
}

// Peers of one torrent, keyed by peer id.
pub type PeerMap = AHashIndexMap<PeerId, Peer>;
/// Peers of a single torrent, with cached seeder/leecher counts.
///
/// The counts are kept in sync incrementally by the request handlers and
/// the cleaning pass, so responses never need to scan the whole peer map.
///
/// Note: the previous hand-written `Default` impl produced exactly what
/// `#[derive(Default)]` does (empty map, zero counts), so the derive is
/// used instead.
#[derive(Default)]
pub struct TorrentData {
    pub peers: PeerMap,
    pub num_seeders: usize,
    pub num_leechers: usize,
}
// All torrents of one address family, keyed by info hash.
pub type TorrentMap = AHashIndexMap<InfoHash, TorrentData>;

/// Full torrent state, split by address family: IPv4 and IPv6 peers are
/// tracked independently.
#[derive(Default)]
pub struct TorrentMaps {
    pub ipv4: TorrentMap,
    pub ipv6: TorrentMap,
}
impl TorrentMaps {
    /// Periodic cleaning pass: drop disallowed torrents, expired peers,
    /// and torrents left without peers, for both address families.
    pub fn clean(&mut self, config: &Config, access_list: &Arc<AccessListArcSwap>) {
        let mut access_list_cache = create_access_list_cache(access_list);

        Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4);
        Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6);
    }

    /// Clean a single torrent map in place.
    ///
    /// Removes torrents no longer allowed by the access list, evicts peers
    /// whose `valid_until` has passed (decrementing the cached counts as
    /// they go), drops torrents whose peer map ends up empty, and shrinks
    /// the map's allocation afterwards.
    fn clean_torrent_map(
        config: &Config,
        access_list_cache: &mut AccessListCache,
        torrent_map: &mut TorrentMap,
    ) {
        // Single timestamp for the whole pass, so all peers are judged
        // against the same instant.
        let now = Instant::now();

        torrent_map.retain(|info_hash, torrent_data| {
            // Torrents not allowed by the (possibly updated) access list
            // are dropped wholesale.
            if !access_list_cache
                .load()
                .allows(config.access_list.mode, &info_hash.0)
            {
                return false;
            }

            let num_seeders = &mut torrent_data.num_seeders;
            let num_leechers = &mut torrent_data.num_leechers;

            torrent_data.peers.retain(|_, peer| {
                let keep = peer.valid_until.0 >= now;

                // Keep the cached counts consistent with the evictions.
                if !keep {
                    match peer.status {
                        PeerStatus::Seeding => {
                            *num_seeders -= 1;
                        }
                        PeerStatus::Leeching => {
                            *num_leechers -= 1;
                        }
                        _ => (),
                    };
                }

                keep
            });

            // Drop torrents that no longer have any peers.
            !torrent_data.peers.is_empty()
        });

        // Release capacity freed by the removals above.
        torrent_map.shrink_to_fit();
    }
}
/// Build a rustls server config from the PEM certificate chain and PKCS8
/// private key files referenced by `config.network`.
///
/// # Errors
///
/// Returns an error if either file cannot be opened or parsed, if the key
/// file contains no PKCS8 private keys, or if rustls rejects the
/// certificate/key combination.
pub fn create_tls_config(config: &Config) -> anyhow::Result<rustls::ServerConfig> {
    let certs = {
        let f = File::open(&config.network.tls_certificate_path)?;
        let mut f = BufReader::new(f);

        rustls_pemfile::certs(&mut f)?
            .into_iter()
            .map(rustls::Certificate)
            .collect()
    };

    let private_key = {
        let f = File::open(&config.network.tls_private_key_path)?;
        let mut f = BufReader::new(f);

        // Only the first key in the file is used.
        rustls_pemfile::pkcs8_private_keys(&mut f)?
            .first()
            .map(|bytes| rustls::PrivateKey(bytes.clone()))
            // ok_or_else: avoid constructing the error value on the happy path
            .ok_or_else(|| anyhow::anyhow!("No private keys in file"))?
    };

    let tls_config = rustls::ServerConfig::builder()
        .with_safe_defaults()
        .with_no_client_auth()
        .with_single_cert(certs, private_key)?;

    Ok(tls_config)
}

View file

@ -23,15 +23,28 @@ pub struct Config {
pub log_level: LogLevel, pub log_level: LogLevel,
pub network: NetworkConfig, pub network: NetworkConfig,
pub protocol: ProtocolConfig, pub protocol: ProtocolConfig,
#[cfg(feature = "with-mio")]
pub handlers: HandlerConfig,
pub cleaning: CleaningConfig, pub cleaning: CleaningConfig,
pub privileges: PrivilegeConfig, pub privileges: PrivilegeConfig,
pub access_list: AccessListConfig, pub access_list: AccessListConfig,
#[cfg(feature = "cpu-pinning")] #[cfg(feature = "cpu-pinning")]
pub cpu_pinning: CpuPinningConfig, pub cpu_pinning: CpuPinningConfig,
#[cfg(feature = "with-mio")] }
pub statistics: StatisticsConfig,
impl Default for Config {
fn default() -> Self {
Self {
socket_workers: 1,
request_workers: 1,
log_level: LogLevel::default(),
network: NetworkConfig::default(),
protocol: ProtocolConfig::default(),
cleaning: CleaningConfig::default(),
privileges: PrivilegeConfig::default(),
access_list: AccessListConfig::default(),
#[cfg(feature = "cpu-pinning")]
cpu_pinning: Default::default(),
}
}
} }
impl aquatic_cli_helpers::Config for Config { impl aquatic_cli_helpers::Config for Config {
@ -55,81 +68,6 @@ pub struct NetworkConfig {
pub websocket_max_message_size: usize, pub websocket_max_message_size: usize,
pub websocket_max_frame_size: usize, pub websocket_max_frame_size: usize,
#[cfg(feature = "with-mio")]
pub poll_event_capacity: usize,
#[cfg(feature = "with-mio")]
pub poll_timeout_microseconds: u64,
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
pub struct ProtocolConfig {
/// Maximum number of torrents to accept in scrape request
pub max_scrape_torrents: usize,
/// Maximum number of offers to accept in announce request
pub max_offers: usize,
/// Ask peers to announce this often (seconds)
pub peer_announce_interval: usize,
}
#[cfg(feature = "with-mio")]
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
pub struct HandlerConfig {
/// Maximum number of requests to receive from channel before locking
/// mutex and starting work
pub max_requests_per_iter: usize,
pub channel_recv_timeout_microseconds: u64,
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
pub struct CleaningConfig {
/// Clean peers this often (seconds)
pub torrent_cleaning_interval: u64,
/// Remove peers that have not announced for this long (seconds)
pub max_peer_age: u64,
// Clean connections this often (seconds)
#[cfg(feature = "with-glommio")]
pub connection_cleaning_interval: u64,
/// Close connections if no responses have been sent to them for this long (seconds)
#[cfg(feature = "with-glommio")]
pub max_connection_idle: u64,
/// Remove connections that are older than this (seconds)
#[cfg(feature = "with-mio")]
pub max_connection_age: u64,
}
#[cfg(feature = "with-mio")]
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
pub struct StatisticsConfig {
/// Print statistics this often (seconds). Do not print when set to zero.
pub interval: u64,
}
impl Default for Config {
fn default() -> Self {
Self {
socket_workers: 1,
request_workers: 1,
log_level: LogLevel::default(),
network: NetworkConfig::default(),
protocol: ProtocolConfig::default(),
#[cfg(feature = "with-mio")]
handlers: Default::default(),
cleaning: CleaningConfig::default(),
privileges: PrivilegeConfig::default(),
access_list: AccessListConfig::default(),
#[cfg(feature = "cpu-pinning")]
cpu_pinning: Default::default(),
#[cfg(feature = "with-mio")]
statistics: Default::default(),
}
}
} }
impl Default for NetworkConfig { impl Default for NetworkConfig {
@ -143,15 +81,21 @@ impl Default for NetworkConfig {
websocket_max_message_size: 64 * 1024, websocket_max_message_size: 64 * 1024,
websocket_max_frame_size: 16 * 1024, websocket_max_frame_size: 16 * 1024,
#[cfg(feature = "with-mio")]
poll_event_capacity: 4096,
#[cfg(feature = "with-mio")]
poll_timeout_microseconds: 200_000,
} }
} }
} }
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
pub struct ProtocolConfig {
/// Maximum number of torrents to accept in scrape request
pub max_scrape_torrents: usize,
/// Maximum number of offers to accept in announce request
pub max_offers: usize,
/// Ask peers to announce this often (seconds)
pub peer_announce_interval: usize,
}
impl Default for ProtocolConfig { impl Default for ProtocolConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
@ -162,14 +106,17 @@ impl Default for ProtocolConfig {
} }
} }
#[cfg(feature = "with-mio")] #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
impl Default for HandlerConfig { #[serde(default)]
fn default() -> Self { pub struct CleaningConfig {
Self { /// Clean peers this often (seconds)
max_requests_per_iter: 256, pub torrent_cleaning_interval: u64,
channel_recv_timeout_microseconds: 200, /// Remove peers that have not announced for this long (seconds)
} pub max_peer_age: u64,
} // Clean connections this often (seconds)
pub connection_cleaning_interval: u64,
/// Close connections if no responses have been sent to them for this long (seconds)
pub max_connection_idle: u64,
} }
impl Default for CleaningConfig { impl Default for CleaningConfig {
@ -177,24 +124,12 @@ impl Default for CleaningConfig {
Self { Self {
torrent_cleaning_interval: 30, torrent_cleaning_interval: 30,
max_peer_age: 1800, max_peer_age: 1800,
#[cfg(feature = "with-glommio")]
max_connection_idle: 60 * 5, max_connection_idle: 60 * 5,
#[cfg(feature = "with-mio")]
max_connection_age: 1800,
#[cfg(feature = "with-glommio")]
connection_cleaning_interval: 30, connection_cleaning_interval: 30,
} }
} }
} }
#[cfg(feature = "with-mio")]
impl Default for StatisticsConfig {
fn default() -> Self {
Self { interval: 0 }
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::Config; use super::Config;

View file

@ -1,10 +0,0 @@
use std::sync::Arc;
use aquatic_common::access_list::AccessListArcSwap;
pub type TlsConfig = futures_rustls::rustls::ServerConfig;
#[derive(Default, Clone)]
pub struct State {
pub access_list: Arc<AccessListArcSwap>,
}

View file

@ -1,107 +0,0 @@
pub mod common;
pub mod request;
pub mod socket;
use std::sync::{atomic::AtomicUsize, Arc};
use crate::{common::create_tls_config, config::Config};
#[cfg(feature = "cpu-pinning")]
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
use aquatic_common::privileges::drop_privileges_after_socket_binding;
use self::common::*;
use glommio::{channels::channel_mesh::MeshBuilder, prelude::*};
pub const SHARED_IN_CHANNEL_SIZE: usize = 1024;
pub fn run(config: Config, state: State) -> anyhow::Result<()> {
let num_peers = config.socket_workers + config.request_workers;
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE);
let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE * 16);
let num_bound_sockets = Arc::new(AtomicUsize::new(0));
let tls_config = Arc::new(create_tls_config(&config).unwrap());
let mut executors = Vec::new();
for i in 0..(config.socket_workers) {
let config = config.clone();
let state = state.clone();
let tls_config = tls_config.clone();
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
let num_bound_sockets = num_bound_sockets.clone();
let builder = LocalExecutorBuilder::default().name("socket");
let executor = builder.spawn(move || async move {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::SocketWorker(i),
);
socket::run_socket_worker(
config,
state,
tls_config,
request_mesh_builder,
response_mesh_builder,
num_bound_sockets,
)
.await
});
executors.push(executor);
}
for i in 0..(config.request_workers) {
let config = config.clone();
let state = state.clone();
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
let builder = LocalExecutorBuilder::default().name("request");
let executor = builder.spawn(move || async move {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::RequestWorker(i),
);
request::run_request_worker(config, state, request_mesh_builder, response_mesh_builder)
.await
});
executors.push(executor);
}
drop_privileges_after_socket_binding(
&config.privileges,
num_bound_sockets,
config.socket_workers,
)
.unwrap();
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
for executor in executors {
executor
.expect("failed to spawn local executor")
.join()
.unwrap();
}
Ok(())
}

View file

@ -1,128 +0,0 @@
use std::cell::RefCell;
use std::rc::Rc;
use std::time::Duration;
use futures::StreamExt;
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::enclose;
use glommio::prelude::*;
use glommio::timer::TimerActionRepeat;
use rand::{rngs::SmallRng, SeedableRng};
use aquatic_ws_protocol::*;
use crate::common::handlers::*;
use crate::common::*;
use crate::config::Config;
use super::common::State;
use super::SHARED_IN_CHANNEL_SIZE;
pub async fn run_request_worker(
config: Config,
state: State,
in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>,
out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>,
) {
let (_, mut in_message_receivers) = in_message_mesh_builder.join(Role::Consumer).await.unwrap();
let (out_message_senders, _) = out_message_mesh_builder.join(Role::Producer).await.unwrap();
let out_message_senders = Rc::new(out_message_senders);
let torrents = Rc::new(RefCell::new(TorrentMaps::default()));
let access_list = state.access_list;
// Periodically clean torrents
TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || {
enclose!((config, torrents, access_list) move || async move {
torrents.borrow_mut().clean(&config, &access_list);
Some(Duration::from_secs(config.cleaning.torrent_cleaning_interval))
})()
}));
let mut handles = Vec::new();
for (_, receiver) in in_message_receivers.streams() {
let handle = spawn_local(handle_request_stream(
config.clone(),
torrents.clone(),
out_message_senders.clone(),
receiver,
))
.detach();
handles.push(handle);
}
for handle in handles {
handle.await;
}
}
async fn handle_request_stream<S>(
config: Config,
torrents: Rc<RefCell<TorrentMaps>>,
out_message_senders: Rc<Senders<(ConnectionMeta, OutMessage)>>,
stream: S,
) where
S: futures_lite::Stream<Item = (ConnectionMeta, InMessage)> + ::std::marker::Unpin,
{
let rng = Rc::new(RefCell::new(SmallRng::from_entropy()));
let max_peer_age = config.cleaning.max_peer_age;
let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_peer_age)));
TimerActionRepeat::repeat(enclose!((peer_valid_until) move || {
enclose!((peer_valid_until) move || async move {
*peer_valid_until.borrow_mut() = ValidUntil::new(max_peer_age);
Some(Duration::from_secs(1))
})()
}));
let config = &config;
let torrents = &torrents;
let peer_valid_until = &peer_valid_until;
let rng = &rng;
let out_message_senders = &out_message_senders;
stream
.for_each_concurrent(
SHARED_IN_CHANNEL_SIZE,
move |(meta, in_message)| async move {
let mut out_messages = Vec::new();
match in_message {
InMessage::AnnounceRequest(request) => handle_announce_request(
&config,
&mut rng.borrow_mut(),
&mut torrents.borrow_mut(),
&mut out_messages,
peer_valid_until.borrow().to_owned(),
meta,
request,
),
InMessage::ScrapeRequest(request) => handle_scrape_request(
&config,
&mut torrents.borrow_mut(),
&mut out_messages,
meta,
request,
),
};
for (meta, out_message) in out_messages.drain(..) {
::log::info!("request worker trying to send OutMessage to socket worker");
out_message_senders
.send_to(meta.out_message_consumer_id.0, (meta, out_message))
.await
.expect("failed sending out_message to socket worker");
::log::info!("request worker sent OutMessage to socket worker");
}
},
)
.await;
}

View file

@ -1,28 +1,28 @@
pub mod common;
pub mod config;
pub mod workers;
use std::fs::File;
use std::io::BufReader;
use std::sync::{atomic::AtomicUsize, Arc};
use glommio::{channels::channel_mesh::MeshBuilder, prelude::*};
use signal_hook::{consts::SIGUSR1, iterator::Signals};
use aquatic_common::access_list::update_access_list; use aquatic_common::access_list::update_access_list;
#[cfg(feature = "cpu-pinning")] #[cfg(feature = "cpu-pinning")]
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
use cfg_if::cfg_if; use aquatic_common::privileges::drop_privileges_after_socket_binding;
use signal_hook::{consts::SIGUSR1, iterator::Signals};
use crate::config::Config; use common::*;
use config::Config;
pub mod common;
pub mod config;
#[cfg(feature = "with-glommio")]
pub mod glommio;
#[cfg(feature = "with-mio")]
pub mod mio;
pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker"; pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker";
pub const SHARED_IN_CHANNEL_SIZE: usize = 1024;
pub fn run(config: Config) -> ::anyhow::Result<()> { pub fn run(config: Config) -> ::anyhow::Result<()> {
cfg_if!( let state = State::default();
if #[cfg(feature = "with-glommio")] {
let state = glommio::common::State::default();
} else {
let state = mio::common::State::default();
}
);
update_access_list(&config.access_list, &state.access_list)?; update_access_list(&config.access_list, &state.access_list)?;
@ -32,13 +32,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let config = config.clone(); let config = config.clone();
let state = state.clone(); let state = state.clone();
cfg_if!( ::std::thread::spawn(move || run_workers(config, state));
if #[cfg(feature = "with-glommio")] {
::std::thread::spawn(move || glommio::run(config, state));
} else {
::std::thread::spawn(move || mio::run(config, state));
}
);
} }
#[cfg(feature = "cpu-pinning")] #[cfg(feature = "cpu-pinning")]
@ -59,3 +53,128 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
Ok(()) Ok(())
} }
fn run_workers(config: Config, state: State) -> anyhow::Result<()> {
let num_peers = config.socket_workers + config.request_workers;
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE);
let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE * 16);
let num_bound_sockets = Arc::new(AtomicUsize::new(0));
let tls_config = Arc::new(create_tls_config(&config).unwrap());
let mut executors = Vec::new();
for i in 0..(config.socket_workers) {
let config = config.clone();
let state = state.clone();
let tls_config = tls_config.clone();
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
let num_bound_sockets = num_bound_sockets.clone();
let builder = LocalExecutorBuilder::default().name("socket");
let executor = builder.spawn(move || async move {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::SocketWorker(i),
);
workers::socket::run_socket_worker(
config,
state,
tls_config,
request_mesh_builder,
response_mesh_builder,
num_bound_sockets,
)
.await
});
executors.push(executor);
}
for i in 0..(config.request_workers) {
let config = config.clone();
let state = state.clone();
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
let builder = LocalExecutorBuilder::default().name("request");
let executor = builder.spawn(move || async move {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::RequestWorker(i),
);
workers::request::run_request_worker(
config,
state,
request_mesh_builder,
response_mesh_builder,
)
.await
});
executors.push(executor);
}
drop_privileges_after_socket_binding(
&config.privileges,
num_bound_sockets,
config.socket_workers,
)
.unwrap();
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
for executor in executors {
executor
.expect("failed to spawn local executor")
.join()
.unwrap();
}
Ok(())
}
fn create_tls_config(config: &Config) -> anyhow::Result<rustls::ServerConfig> {
let certs = {
let f = File::open(&config.network.tls_certificate_path)?;
let mut f = BufReader::new(f);
rustls_pemfile::certs(&mut f)?
.into_iter()
.map(|bytes| rustls::Certificate(bytes))
.collect()
};
let private_key = {
let f = File::open(&config.network.tls_private_key_path)?;
let mut f = BufReader::new(f);
rustls_pemfile::pkcs8_private_keys(&mut f)?
.first()
.map(|bytes| rustls::PrivateKey(bytes.clone()))
.ok_or(anyhow::anyhow!("No private keys in file"))?
};
let tls_config = rustls::ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(certs, private_key)?;
Ok(tls_config)
}

View file

@ -1,51 +0,0 @@
use std::sync::Arc;
use aquatic_common::access_list::AccessListArcSwap;
use aquatic_ws_protocol::*;
use crossbeam_channel::{Receiver, Sender};
use log::error;
use mio::Token;
use parking_lot::Mutex;
use crate::common::*;
pub const LISTENER_TOKEN: Token = Token(0);
pub const CHANNEL_TOKEN: Token = Token(1);
#[derive(Clone)]
pub struct State {
pub access_list: Arc<AccessListArcSwap>,
pub torrent_maps: Arc<Mutex<TorrentMaps>>,
}
impl Default for State {
fn default() -> Self {
Self {
access_list: Arc::new(Default::default()),
torrent_maps: Arc::new(Mutex::new(TorrentMaps::default())),
}
}
}
pub type InMessageSender = Sender<(ConnectionMeta, InMessage)>;
pub type InMessageReceiver = Receiver<(ConnectionMeta, InMessage)>;
pub type OutMessageReceiver = Receiver<(ConnectionMeta, OutMessage)>;
#[derive(Clone)]
pub struct OutMessageSender(Vec<Sender<(ConnectionMeta, OutMessage)>>);
impl OutMessageSender {
pub fn new(senders: Vec<Sender<(ConnectionMeta, OutMessage)>>) -> Self {
Self(senders)
}
#[inline]
pub fn send(&self, meta: ConnectionMeta, message: OutMessage) {
if let Err(err) = self.0[meta.out_message_consumer_id.0].send((meta, message)) {
error!("OutMessageSender: couldn't send message: {:?}", err);
}
}
}
pub type SocketWorkerStatus = Option<Result<(), String>>;
pub type SocketWorkerStatuses = Arc<Mutex<Vec<SocketWorkerStatus>>>;

View file

@ -1,218 +0,0 @@
use std::sync::Arc;
use std::thread::Builder;
use std::time::Duration;
use anyhow::Context;
#[cfg(feature = "cpu-pinning")]
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
use histogram::Histogram;
use mio::{Poll, Waker};
use parking_lot::Mutex;
use privdrop::PrivDrop;
pub mod common;
pub mod request;
pub mod socket;
use crate::{common::create_tls_config, config::Config};
use common::*;
pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker";
const SHARED_IN_CHANNEL_SIZE: usize = 1024;
pub fn run(config: Config, state: State) -> anyhow::Result<()> {
start_workers(config.clone(), state.clone()).expect("couldn't start workers");
// TODO: privdrop here instead
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
loop {
::std::thread::sleep(Duration::from_secs(
config.cleaning.torrent_cleaning_interval,
));
state.torrent_maps.lock().clean(&config, &state.access_list);
}
}
pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> {
let tls_config = Arc::new(create_tls_config(&config)?);
let (in_message_sender, in_message_receiver) =
::crossbeam_channel::bounded(SHARED_IN_CHANNEL_SIZE);
let mut out_message_senders = Vec::new();
let mut wakers = Vec::new();
let socket_worker_statuses: SocketWorkerStatuses = {
let mut statuses = Vec::new();
for _ in 0..config.socket_workers {
statuses.push(None);
}
Arc::new(Mutex::new(statuses))
};
for i in 0..config.socket_workers {
let config = config.clone();
let state = state.clone();
let socket_worker_statuses = socket_worker_statuses.clone();
let in_message_sender = in_message_sender.clone();
let tls_config = tls_config.clone();
let poll = Poll::new()?;
let waker = Arc::new(Waker::new(poll.registry(), CHANNEL_TOKEN)?);
let (out_message_sender, out_message_receiver) =
::crossbeam_channel::bounded(SHARED_IN_CHANNEL_SIZE * 16);
out_message_senders.push(out_message_sender);
wakers.push(waker);
Builder::new()
.name(format!("socket-{:02}", i + 1))
.spawn(move || {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::SocketWorker(i),
);
socket::run_socket_worker(
config,
state,
i,
socket_worker_statuses,
poll,
in_message_sender,
out_message_receiver,
tls_config,
);
})?;
}
// Wait for socket worker statuses. On error from any, quit program.
// On success from all, drop privileges if corresponding setting is set
// and continue program.
loop {
::std::thread::sleep(::std::time::Duration::from_millis(10));
if let Some(statuses) = socket_worker_statuses.try_lock() {
for opt_status in statuses.iter() {
if let Some(Err(err)) = opt_status {
return Err(::anyhow::anyhow!(err.to_owned()));
}
}
if statuses.iter().all(Option::is_some) {
if config.privileges.drop_privileges {
PrivDrop::default()
.chroot(config.privileges.chroot_path.clone())
.user(config.privileges.user.clone())
.apply()
.context("Couldn't drop root privileges")?;
}
break;
}
}
}
let out_message_sender = OutMessageSender::new(out_message_senders);
for i in 0..config.request_workers {
let config = config.clone();
let state = state.clone();
let in_message_receiver = in_message_receiver.clone();
let out_message_sender = out_message_sender.clone();
let wakers = wakers.clone();
Builder::new()
.name(format!("request-{:02}", i + 1))
.spawn(move || {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::RequestWorker(i),
);
request::run_request_worker(
config,
state,
in_message_receiver,
out_message_sender,
wakers,
);
})?;
}
if config.statistics.interval != 0 {
let state = state.clone();
let config = config.clone();
Builder::new()
.name("statistics".to_string())
.spawn(move || {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
loop {
::std::thread::sleep(Duration::from_secs(config.statistics.interval));
print_statistics(&state);
}
})
.expect("spawn statistics thread");
}
Ok(())
}
fn print_statistics(state: &State) {
let mut peers_per_torrent = Histogram::new();
{
let torrents = &mut state.torrent_maps.lock();
for torrent in torrents.ipv4.values() {
let num_peers = (torrent.num_seeders + torrent.num_leechers) as u64;
if let Err(err) = peers_per_torrent.increment(num_peers) {
eprintln!("error incrementing peers_per_torrent histogram: {}", err)
}
}
for torrent in torrents.ipv6.values() {
let num_peers = (torrent.num_seeders + torrent.num_leechers) as u64;
if let Err(err) = peers_per_torrent.increment(num_peers) {
eprintln!("error incrementing peers_per_torrent histogram: {}", err)
}
}
}
if peers_per_torrent.entries() != 0 {
println!(
"peers per torrent: min: {}, p50: {}, p75: {}, p90: {}, p99: {}, p999: {}, max: {}",
peers_per_torrent.minimum().unwrap(),
peers_per_torrent.percentile(50.0).unwrap(),
peers_per_torrent.percentile(75.0).unwrap(),
peers_per_torrent.percentile(90.0).unwrap(),
peers_per_torrent.percentile(99.0).unwrap(),
peers_per_torrent.percentile(99.9).unwrap(),
peers_per_torrent.maximum().unwrap(),
);
}
}

View file

@ -1,103 +0,0 @@
use std::sync::Arc;
use std::time::Duration;
use mio::Waker;
use parking_lot::MutexGuard;
use rand::{rngs::SmallRng, SeedableRng};
use aquatic_ws_protocol::*;
use crate::common::handlers::{handle_announce_request, handle_scrape_request};
use crate::common::*;
use crate::config::Config;
use super::common::*;
pub fn run_request_worker(
config: Config,
state: State,
in_message_receiver: InMessageReceiver,
out_message_sender: OutMessageSender,
wakers: Vec<Arc<Waker>>,
) {
let mut wake_socket_workers: Vec<bool> = (0..config.socket_workers).map(|_| false).collect();
let mut announce_requests = Vec::new();
let mut scrape_requests = Vec::new();
let mut out_messages = Vec::new();
let mut rng = SmallRng::from_entropy();
let timeout = Duration::from_micros(config.handlers.channel_recv_timeout_microseconds);
loop {
let mut opt_torrent_map_guard: Option<MutexGuard<TorrentMaps>> = None;
for i in 0..config.handlers.max_requests_per_iter {
let opt_in_message = if i == 0 {
in_message_receiver.recv().ok()
} else {
in_message_receiver.recv_timeout(timeout).ok()
};
match opt_in_message {
Some((meta, InMessage::AnnounceRequest(r))) => {
announce_requests.push((meta, r));
}
Some((meta, InMessage::ScrapeRequest(r))) => {
scrape_requests.push((meta, r));
}
None => {
if let Some(torrent_guard) = state.torrent_maps.try_lock() {
opt_torrent_map_guard = Some(torrent_guard);
break;
}
}
}
}
let mut torrent_map_guard =
opt_torrent_map_guard.unwrap_or_else(|| state.torrent_maps.lock());
let valid_until = ValidUntil::new(config.cleaning.max_peer_age);
for (meta, request) in announce_requests.drain(..) {
handle_announce_request(
&config,
&mut rng,
&mut torrent_map_guard,
&mut out_messages,
valid_until,
meta,
request,
);
}
for (meta, request) in scrape_requests.drain(..) {
handle_scrape_request(
&config,
&mut torrent_map_guard,
&mut out_messages,
meta,
request,
);
}
::std::mem::drop(torrent_map_guard);
for (meta, out_message) in out_messages.drain(..) {
wake_socket_workers[meta.out_message_consumer_id.0] = true;
out_message_sender.send(meta, out_message);
}
for (worker_index, wake) in wake_socket_workers.iter_mut().enumerate() {
if *wake {
if let Err(err) = wakers[worker_index].wake() {
::log::error!("request handler couldn't wake poll: {:?}", err);
}
*wake = false;
}
}
}
}

View file

@ -1,577 +0,0 @@
use std::{collections::VecDeque, io::ErrorKind, marker::PhantomData, net::Shutdown, sync::Arc};
use aquatic_common::ValidUntil;
use aquatic_ws_protocol::{InMessage, OutMessage};
use mio::{net::TcpStream, Interest, Poll, Token};
use rustls::{ServerConfig, ServerConnection};
use tungstenite::{
handshake::{server::NoCallback, MidHandshake},
protocol::WebSocketConfig,
HandshakeError, ServerHandshake,
};
use crate::common::ConnectionMeta;
const MAX_PENDING_MESSAGES: usize = 16;
type TlsStream = rustls::StreamOwned<ServerConnection, TcpStream>;
type WsHandshakeResult<S> =
Result<tungstenite::WebSocket<S>, HandshakeError<ServerHandshake<S, NoCallback>>>;
type ConnectionReadResult<T> = ::std::io::Result<ConnectionReadStatus<T>>;
pub trait RegistryStatus {}
pub struct Registered;
impl RegistryStatus for Registered {}
pub struct NotRegistered;
impl RegistryStatus for NotRegistered {}
enum ConnectionReadStatus<T> {
Message(T, InMessage),
Ok(T),
WouldBlock(T),
}
enum ConnectionState<R: RegistryStatus> {
TlsHandshaking(TlsHandshaking<R>),
WsHandshaking(WsHandshaking<R>),
WsConnection(WsConnection<R>),
}
pub struct Connection<R: RegistryStatus> {
pub valid_until: ValidUntil,
meta: ConnectionMeta,
state: ConnectionState<R>,
pub message_queue: VecDeque<OutMessage>,
pub interest: Interest,
phantom_data: PhantomData<R>,
}
impl<R: RegistryStatus> Connection<R> {
pub fn get_meta(&self) -> ConnectionMeta {
self.meta
}
}
impl Connection<NotRegistered> {
pub fn new(
tls_config: Arc<ServerConfig>,
ws_config: WebSocketConfig,
tcp_stream: TcpStream,
valid_until: ValidUntil,
meta: ConnectionMeta,
) -> Self {
let state =
ConnectionState::TlsHandshaking(TlsHandshaking::new(tls_config, ws_config, tcp_stream));
Self {
valid_until,
meta,
state,
message_queue: Default::default(),
interest: Interest::READABLE,
phantom_data: PhantomData::default(),
}
}
/// Read until stream blocks (or error occurs)
///
/// Requires Connection not to be registered, since it might be dropped on errors
pub fn read<F>(
mut self,
message_handler: &mut F,
) -> ::std::io::Result<Connection<NotRegistered>>
where
F: FnMut(ConnectionMeta, InMessage),
{
loop {
let result = match self.state {
ConnectionState::TlsHandshaking(inner) => inner.read(),
ConnectionState::WsHandshaking(inner) => inner.read(),
ConnectionState::WsConnection(inner) => inner.read(),
};
match result {
Ok(ConnectionReadStatus::Message(state, message)) => {
self.state = state;
message_handler(self.meta, message);
// Stop looping even if WouldBlock wasn't necessarily reached. Otherwise,
// we might get stuck reading from this connection only. Since we register
// the connection again upon reinsertion into the ConnectionMap, we should
// be getting new events anyway.
return Ok(self);
}
Ok(ConnectionReadStatus::Ok(state)) => {
self.state = state;
::log::debug!("read connection");
}
Ok(ConnectionReadStatus::WouldBlock(state)) => {
self.state = state;
::log::debug!("reading connection would block");
return Ok(self);
}
Err(err) => {
::log::debug!("Connection::read error: {}", err);
return Err(err);
}
}
}
}
pub fn register(self, poll: &mut Poll, token: Token) -> Connection<Registered> {
let state = match self.state {
ConnectionState::TlsHandshaking(inner) => {
ConnectionState::TlsHandshaking(inner.register(poll, token, self.interest))
}
ConnectionState::WsHandshaking(inner) => {
ConnectionState::WsHandshaking(inner.register(poll, token, self.interest))
}
ConnectionState::WsConnection(inner) => {
ConnectionState::WsConnection(inner.register(poll, token, self.interest))
}
};
Connection {
valid_until: self.valid_until,
meta: self.meta,
state,
message_queue: self.message_queue,
interest: self.interest,
phantom_data: PhantomData::default(),
}
}
pub fn close(self) {
::log::debug!("will close connection to {}", self.meta.peer_addr.get());
match self.state {
ConnectionState::TlsHandshaking(inner) => inner.close(),
ConnectionState::WsHandshaking(inner) => inner.close(),
ConnectionState::WsConnection(inner) => inner.close(),
}
}
}
impl Connection<Registered> {
    /// Try to write `message` to the peer immediately. If the write would
    /// block, queue the message locally (bounded by MAX_PENDING_MESSAGES)
    /// and switch poll interest to WRITABLE so `write` can flush it on the
    /// next writable event.
    pub fn write_or_queue_message(
        &mut self,
        poll: &mut Poll,
        message: OutMessage,
    ) -> ::std::io::Result<()> {
        // Cloned up front because write_message consumes the message, but
        // we still need it if the write would block.
        let message_clone = message.clone();

        match self.write_message(message) {
            Ok(()) => Ok(()),
            Err(err) if err.kind() == ErrorKind::WouldBlock => {
                if self.message_queue.len() < MAX_PENDING_MESSAGES {
                    self.message_queue.push_back(message_clone);

                    if !self.interest.is_writable() {
                        self.interest = Interest::WRITABLE;

                        self.reregister(poll)?;
                    }
                } else {
                    // Bounded queue: drop the message instead of growing
                    // without limit.
                    ::log::info!("Connection::message_queue is full, dropping message");
                }

                Ok(())
            }
            Err(err) => Err(err),
        }
    }

    /// Flush queued messages (called after a writable event). Once the
    /// queue is drained, interest is switched back to READABLE. Errors with
    /// NotConnected if the WebSocket handshake hasn't completed yet.
    pub fn write(&mut self, poll: &mut Poll) -> ::std::io::Result<()> {
        if let ConnectionState::WsConnection(_) = self.state {
            while let Some(message) = self.message_queue.pop_front() {
                let message_clone = message.clone();

                match self.write_message(message) {
                    Ok(()) => {}
                    Err(err) if err.kind() == ErrorKind::WouldBlock => {
                        // Can't make message queue longer than it was before pop_front
                        self.message_queue.push_front(message_clone);

                        return Ok(());
                    }
                    Err(err) => {
                        return Err(err);
                    }
                }
            }

            if self.message_queue.is_empty() {
                self.interest = Interest::READABLE;
            }

            self.reregister(poll)?;

            Ok(())
        } else {
            Err(std::io::Error::new(
                ErrorKind::NotConnected,
                "WebSocket connection not established",
            ))
        }
    }

    /// Write a single message to the WebSocket. Tungstenite's SendQueueFull
    /// is mapped to a WouldBlock io error so callers can queue and retry.
    fn write_message(&mut self, message: OutMessage) -> ::std::io::Result<()> {
        if let ConnectionState::WsConnection(WsConnection {
            ref mut web_socket, ..
        }) = self.state
        {
            match web_socket.write_message(message.to_ws_message()) {
                Ok(_) => {}
                Err(tungstenite::Error::SendQueueFull(_message)) => {
                    return Err(std::io::Error::new(
                        ErrorKind::WouldBlock,
                        "Send queue full",
                    ))
                }
                Err(tungstenite::Error::Io(err)) => return Err(err),
                Err(err) => return Err(std::io::Error::new(ErrorKind::Other, err))?,
            }

            // Attempt to flush pending frames to the underlying stream.
            match web_socket.write_pending() {
                Ok(()) => Ok(()),
                Err(tungstenite::Error::Io(err)) => Err(err),
                Err(err) => Err(std::io::Error::new(ErrorKind::Other, err))?,
            }
        } else {
            Err(std::io::Error::new(
                ErrorKind::NotConnected,
                "WebSocket connection not established",
            ))
        }
    }

    /// Re-register the stream with the poll registry using the current
    /// interest. The token is derived from the connection id.
    pub fn reregister(&mut self, poll: &mut Poll) -> ::std::io::Result<()> {
        let token = Token(self.meta.connection_id.0);

        match self.state {
            ConnectionState::TlsHandshaking(ref mut inner) => {
                inner.reregister(poll, token, self.interest)
            }
            ConnectionState::WsHandshaking(ref mut inner) => {
                inner.reregister(poll, token, self.interest)
            }
            ConnectionState::WsConnection(ref mut inner) => {
                inner.reregister(poll, token, self.interest)
            }
        }
    }

    /// Remove the stream from the poll registry, moving the connection back
    /// to the `NotRegistered` typestate.
    pub fn deregister(self, poll: &mut Poll) -> Connection<NotRegistered> {
        let state = match self.state {
            ConnectionState::TlsHandshaking(inner) => {
                ConnectionState::TlsHandshaking(inner.deregister(poll))
            }
            ConnectionState::WsHandshaking(inner) => {
                ConnectionState::WsHandshaking(inner.deregister(poll))
            }
            ConnectionState::WsConnection(inner) => {
                ConnectionState::WsConnection(inner.deregister(poll))
            }
        };

        Connection {
            valid_until: self.valid_until,
            meta: self.meta,
            state,
            message_queue: self.message_queue,
            interest: self.interest,
            phantom_data: PhantomData::default(),
        }
    }
}
/// Connection phase while the TLS handshake is still in progress.
struct TlsHandshaking<R: RegistryStatus> {
    tls_conn: ServerConnection,
    // Kept around so the WebSocket handshake can be configured once TLS
    // completes.
    ws_config: WebSocketConfig,
    tcp_stream: TcpStream,
    // Typestate marker: Registered / NotRegistered with a mio Poll.
    phantom_data: PhantomData<R>,
}
impl TlsHandshaking<NotRegistered> {
    /// Start a fresh server-side TLS session over `stream`.
    fn new(tls_config: Arc<ServerConfig>, ws_config: WebSocketConfig, stream: TcpStream) -> Self {
        Self {
            tls_conn: ServerConnection::new(tls_config).unwrap(),
            ws_config,
            tcp_stream: stream,
            phantom_data: PhantomData::default(),
        }
    }

    /// Drive the TLS handshake forward by reading available TLS data. When
    /// the handshake completes, immediately begins the WebSocket handshake
    /// over the established TLS stream.
    fn read(mut self) -> ConnectionReadResult<ConnectionState<NotRegistered>> {
        match self.tls_conn.read_tls(&mut self.tcp_stream) {
            Ok(0) => {
                // Zero bytes read: peer closed the connection
                return Err(::std::io::Error::new(
                    ErrorKind::ConnectionReset,
                    "Connection closed",
                ))
            }
            Ok(_) => match self.tls_conn.process_new_packets() {
                Ok(_) => {
                    // Flush any handshake data rustls wants to send
                    while self.tls_conn.wants_write() {
                        self.tls_conn.write_tls(&mut self.tcp_stream)?;
                    }

                    if self.tls_conn.is_handshaking() {
                        Ok(ConnectionReadStatus::WouldBlock(
                            ConnectionState::TlsHandshaking(self),
                        ))
                    } else {
                        // TLS established: hand the stream to tungstenite
                        // for the WebSocket handshake
                        let tls_stream = TlsStream::new(self.tls_conn, self.tcp_stream);

                        WsHandshaking::handle_handshake_result(tungstenite::accept_with_config(
                            tls_stream,
                            Some(self.ws_config),
                        ))
                    }
                }
                Err(err) => {
                    // Best-effort attempt to send any pending TLS data
                    // (e.g. an alert) before reporting the error
                    let _ = self.tls_conn.write_tls(&mut self.tcp_stream);

                    Err(::std::io::Error::new(ErrorKind::InvalidData, err))
                }
            },
            Err(err) if err.kind() == ErrorKind::WouldBlock => {
                return Ok(ConnectionReadStatus::WouldBlock(
                    ConnectionState::TlsHandshaking(self),
                ))
            }
            Err(err) => return Err(err),
        }
    }

    /// Register the TCP stream with the poll registry, moving into the
    /// `Registered` typestate.
    fn register(
        mut self,
        poll: &mut Poll,
        token: Token,
        interest: Interest,
    ) -> TlsHandshaking<Registered> {
        poll.registry()
            .register(&mut self.tcp_stream, token, interest)
            .unwrap();

        TlsHandshaking {
            tls_conn: self.tls_conn,
            ws_config: self.ws_config,
            tcp_stream: self.tcp_stream,
            phantom_data: PhantomData::default(),
        }
    }

    /// Shut the TCP stream down; errors are ignored (best effort).
    fn close(self) {
        ::log::debug!("closing connection (TlsHandshaking state)");

        let _ = self.tcp_stream.shutdown(Shutdown::Both);
    }
}
impl TlsHandshaking<Registered> {
    /// Remove the stream from the poll registry, returning the connection
    /// in the `NotRegistered` typestate.
    fn deregister(self, poll: &mut Poll) -> TlsHandshaking<NotRegistered> {
        let TlsHandshaking {
            tls_conn,
            ws_config,
            mut tcp_stream,
            phantom_data: _,
        } = self;

        poll.registry().deregister(&mut tcp_stream).unwrap();

        TlsHandshaking {
            tls_conn,
            ws_config,
            tcp_stream,
            phantom_data: PhantomData,
        }
    }

    /// Update token and interest for the already-registered stream.
    fn reregister(
        &mut self,
        poll: &mut Poll,
        token: Token,
        interest: Interest,
    ) -> std::io::Result<()> {
        poll.registry()
            .reregister(&mut self.tcp_stream, token, interest)
    }
}
/// Connection phase while the WebSocket handshake is in progress (TLS is
/// already established).
struct WsHandshaking<R: RegistryStatus> {
    // Tungstenite's resumable server handshake state
    mid_handshake: MidHandshake<ServerHandshake<TlsStream, NoCallback>>,
    // Typestate marker: Registered / NotRegistered with a mio Poll.
    phantom_data: PhantomData<R>,
}
impl WsHandshaking<NotRegistered> {
    /// Drive the WebSocket handshake one step further.
    fn read(self) -> ConnectionReadResult<ConnectionState<NotRegistered>> {
        Self::handle_handshake_result(self.mid_handshake.handshake())
    }

    /// Translate a tungstenite handshake result into a connection state:
    /// completed, still in progress (WouldBlock), or failed.
    fn handle_handshake_result(
        handshake_result: WsHandshakeResult<TlsStream>,
    ) -> ConnectionReadResult<ConnectionState<NotRegistered>> {
        match handshake_result {
            Ok(web_socket) => Ok(ConnectionReadStatus::Ok(ConnectionState::WsConnection(
                WsConnection {
                    web_socket,
                    phantom_data: PhantomData,
                },
            ))),
            Err(HandshakeError::Interrupted(mid_handshake)) => {
                Ok(ConnectionReadStatus::WouldBlock(
                    ConnectionState::WsHandshaking(WsHandshaking {
                        mid_handshake,
                        phantom_data: PhantomData,
                    }),
                ))
            }
            Err(HandshakeError::Failure(err)) => {
                Err(std::io::Error::new(ErrorKind::InvalidData, err))
            }
        }
    }

    /// Register the underlying TCP stream with the poll registry.
    fn register(
        mut self,
        poll: &mut Poll,
        token: Token,
        interest: Interest,
    ) -> WsHandshaking<Registered> {
        {
            let tcp_stream = &mut self.mid_handshake.get_mut().get_mut().sock;

            poll.registry()
                .register(tcp_stream, token, interest)
                .unwrap();
        }

        WsHandshaking {
            mid_handshake: self.mid_handshake,
            phantom_data: PhantomData,
        }
    }

    /// Shut the underlying TCP stream down; errors are ignored.
    fn close(mut self) {
        ::log::debug!("closing connection (WsHandshaking state)");

        let _ = self
            .mid_handshake
            .get_mut()
            .get_mut()
            .sock
            .shutdown(Shutdown::Both);
    }
}
impl WsHandshaking<Registered> {
    /// Remove the underlying TCP stream from the poll registry, moving
    /// back to the `NotRegistered` typestate.
    fn deregister(mut self, poll: &mut Poll) -> WsHandshaking<NotRegistered> {
        {
            let tcp_stream = &mut self.mid_handshake.get_mut().get_mut().sock;

            poll.registry().deregister(tcp_stream).unwrap();
        }

        WsHandshaking {
            mid_handshake: self.mid_handshake,
            phantom_data: PhantomData,
        }
    }

    /// Update token and interest for the already-registered stream.
    fn reregister(
        &mut self,
        poll: &mut Poll,
        token: Token,
        interest: Interest,
    ) -> std::io::Result<()> {
        poll.registry().reregister(
            &mut self.mid_handshake.get_mut().get_mut().sock,
            token,
            interest,
        )
    }
}
/// Fully established WebSocket-over-TLS connection.
struct WsConnection<R: RegistryStatus> {
    web_socket: tungstenite::WebSocket<TlsStream>,
    // Typestate marker: Registered / NotRegistered with a mio Poll.
    phantom_data: PhantomData<R>,
}
impl WsConnection<NotRegistered> {
    /// Read one WebSocket message. Text/binary messages are parsed into an
    /// `InMessage`; any other message type is treated as a protocol error.
    fn read(mut self) -> ConnectionReadResult<ConnectionState<NotRegistered>> {
        match self.web_socket.read_message() {
            Ok(
                message @ tungstenite::Message::Text(_) | message @ tungstenite::Message::Binary(_),
            ) => match InMessage::from_ws_message(message) {
                Ok(message) => {
                    ::log::debug!("received WebSocket message");

                    Ok(ConnectionReadStatus::Message(
                        ConnectionState::WsConnection(self),
                        message,
                    ))
                }
                // Message couldn't be parsed as an InMessage
                Err(err) => Err(std::io::Error::new(ErrorKind::InvalidData, err)),
            },
            Ok(message) => {
                ::log::info!("received unexpected WebSocket message: {}", message);

                Err(std::io::Error::new(
                    ErrorKind::InvalidData,
                    "unexpected WebSocket message type",
                ))
            }
            Err(tungstenite::Error::Io(err)) if err.kind() == ErrorKind::WouldBlock => {
                // No complete message available yet
                let conn = ConnectionState::WsConnection(self);

                Ok(ConnectionReadStatus::WouldBlock(conn))
            }
            Err(tungstenite::Error::Io(err)) => Err(err),
            Err(err) => Err(std::io::Error::new(ErrorKind::InvalidData, err)),
        }
    }

    /// Register the underlying stream with the poll registry.
    fn register(
        mut self,
        poll: &mut Poll,
        token: Token,
        interest: Interest,
    ) -> WsConnection<Registered> {
        poll.registry()
            .register(self.web_socket.get_mut().get_mut(), token, interest)
            .unwrap();

        WsConnection {
            web_socket: self.web_socket,
            phantom_data: PhantomData::default(),
        }
    }

    /// Initiate a WebSocket close handshake and attempt to flush it;
    /// failures are ignored (best effort).
    fn close(mut self) {
        ::log::debug!("closing connection (WsConnection state)");

        let _ = self.web_socket.close(None);
        let _ = self.web_socket.write_pending();
    }
}
impl WsConnection<Registered> {
    /// Remove the underlying stream from the poll registry, moving back to
    /// the `NotRegistered` typestate.
    fn deregister(mut self, poll: &mut Poll) -> WsConnection<NotRegistered> {
        let stream = self.web_socket.get_mut().get_mut();

        poll.registry().deregister(stream).unwrap();

        WsConnection {
            web_socket: self.web_socket,
            phantom_data: PhantomData,
        }
    }

    /// Update token and interest for the already-registered stream.
    fn reregister(
        &mut self,
        poll: &mut Poll,
        token: Token,
        interest: Interest,
    ) -> std::io::Result<()> {
        let stream = self.web_socket.get_mut().get_mut();

        poll.registry().reregister(stream, token, interest)
    }
}

View file

@ -1,403 +0,0 @@
use std::io::ErrorKind;
use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::Context;
use aquatic_common::access_list::AccessListQuery;
use aquatic_common::CanonicalSocketAddr;
use hashbrown::HashMap;
use mio::net::TcpListener;
use mio::{Events, Interest, Poll, Token};
use socket2::{Domain, Protocol, Socket, Type};
use tungstenite::protocol::WebSocketConfig;
use aquatic_ws_protocol::*;
use crate::common::*;
use crate::config::Config;
pub mod connection;
use super::common::*;
use connection::{Connection, NotRegistered, Registered};
/// Registry of a socket worker's active connections, keyed by mio token.
struct ConnectionMap {
    // Last token handed out; advanced for each new connection
    token_counter: Token,
    connections: HashMap<Token, Connection<Registered>>,
}
impl Default for ConnectionMap {
    fn default() -> Self {
        Self {
            // Tokens 0 and 1 are reserved (listener and channel), so
            // connection tokens start at 2
            token_counter: Token(2),
            connections: Default::default(),
        }
    }
}
impl ConnectionMap {
    /// Allocate the next free token, create a connection for it via
    /// `connection_creator`, then register and store it.
    fn insert_and_register_new<F>(&mut self, poll: &mut Poll, connection_creator: F)
    where
        F: FnOnce(Token) -> Connection<NotRegistered>,
    {
        self.token_counter.0 = self.token_counter.0.wrapping_add(1);

        // Don't assign LISTENER_TOKEN or CHANNEL_TOKEN
        if self.token_counter.0 < 2 {
            self.token_counter.0 = 2;
        }

        let token = self.token_counter;

        // Remove, deregister and close any existing connection with this token.
        // This shouldn't happen in practice.
        if let Some(connection) = self.connections.remove(&token) {
            ::log::warn!(
                "removing existing connection {} because of token reuse",
                token.0
            );

            connection.deregister(poll).close();
        }

        let connection = connection_creator(token);

        self.insert_and_register(poll, token, connection);
    }

    /// Register `conn` with `poll` under `key` and store it in the map.
    fn insert_and_register(
        &mut self,
        poll: &mut Poll,
        key: Token,
        conn: Connection<NotRegistered>,
    ) {
        self.connections.insert(key, conn.register(poll, key));
    }

    /// Remove the connection stored under `key`, deregistering it from
    /// `poll` first. Returns `None` if no such connection exists.
    fn remove_and_deregister(
        &mut self,
        poll: &mut Poll,
        key: &Token,
    ) -> Option<Connection<NotRegistered>> {
        // Idiomatic equivalent of matching on the Option and wrapping the
        // deregistered connection back in Some
        self.connections
            .remove(key)
            .map(|connection| connection.deregister(poll))
    }

    fn get_mut(&mut self, key: &Token) -> Option<&mut Connection<Registered>> {
        self.connections.get_mut(key)
    }

    /// Close and remove inactive connections
    fn clean(mut self, poll: &mut Poll) -> Self {
        let now = Instant::now();

        let mut retained_connections = HashMap::default();

        for (token, connection) in self.connections.drain() {
            if connection.valid_until.0 < now {
                // Expired: deregister from the poll and shut the socket down
                connection.deregister(poll).close();
            } else {
                retained_connections.insert(token, connection);
            }
        }

        ConnectionMap {
            connections: retained_connections,
            ..self
        }
    }
}
/// Entry point for a socket worker thread. Creates the TCP listener,
/// reports startup success or failure through `socket_worker_statuses`,
/// then runs the poll loop.
pub fn run_socket_worker(
    config: Config,
    state: State,
    socket_worker_index: usize,
    socket_worker_statuses: SocketWorkerStatuses,
    poll: Poll,
    in_message_sender: InMessageSender,
    out_message_receiver: OutMessageReceiver,
    tls_config: Arc<rustls::ServerConfig>,
) {
    match create_listener(&config) {
        Ok(listener) => {
            // Signal successful startup to the supervising thread
            socket_worker_statuses.lock()[socket_worker_index] = Some(Ok(()));

            run_poll_loop(
                config,
                &state,
                socket_worker_index,
                poll,
                in_message_sender,
                out_message_receiver,
                listener,
                tls_config,
            );
        }
        Err(err) => {
            socket_worker_statuses.lock()[socket_worker_index] =
                Some(Err(format!("Couldn't open socket: {:#}", err)));
        }
    }
}
/// Main mio event loop for a socket worker: accepts new streams, reads
/// incoming messages and writes or queues outgoing messages. Loops forever.
fn run_poll_loop(
    config: Config,
    state: &State,
    socket_worker_index: usize,
    mut poll: Poll,
    in_message_sender: InMessageSender,
    out_message_receiver: OutMessageReceiver,
    listener: ::std::net::TcpListener,
    tls_config: Arc<rustls::ServerConfig>,
) {
    let poll_timeout = Duration::from_micros(config.network.poll_timeout_microseconds);

    let ws_config = WebSocketConfig {
        max_message_size: Some(config.network.websocket_max_message_size),
        max_frame_size: Some(config.network.websocket_max_frame_size),
        max_send_queue: Some(2),
        ..Default::default()
    };

    let mut listener = TcpListener::from_std(listener);
    let mut events = Events::with_capacity(config.network.poll_event_capacity);

    poll.registry()
        .register(&mut listener, LISTENER_TOKEN, Interest::READABLE)
        .unwrap();

    let mut connections = ConnectionMap::default();

    // Error responses generated locally (e.g. access-list rejections)
    let mut local_responses = Vec::new();

    let mut iter_counter = 0usize;

    loop {
        poll.poll(&mut events, Some(poll_timeout))
            .expect("failed polling");

        // Fresh deadline assigned to connections active this iteration
        let valid_until = ValidUntil::new(config.cleaning.max_connection_age);

        for event in events.iter() {
            let token = event.token();

            match token {
                LISTENER_TOKEN => {
                    accept_new_streams(
                        &tls_config,
                        ws_config,
                        socket_worker_index,
                        &mut listener,
                        &mut poll,
                        &mut connections,
                        valid_until,
                    );
                }
                CHANNEL_TOKEN => {
                    // Out-messages from request workers are available;
                    // `take(len)` bounds the drain to what's queued now
                    write_or_queue_messages(
                        &mut poll,
                        out_message_receiver
                            .try_iter()
                            .take(out_message_receiver.len()),
                        &mut connections,
                    );
                }
                token => {
                    if event.is_writable() {
                        let mut remove_connection = false;

                        if let Some(connection) = connections.get_mut(&token) {
                            if let Err(err) = connection.write(&mut poll) {
                                ::log::debug!("Connection::write error: {}", err);

                                remove_connection = true;
                            }
                        }

                        if remove_connection {
                            if let Some(connection) =
                                connections.remove_and_deregister(&mut poll, &token)
                            {
                                connection.close();
                            }
                        }
                    }

                    if event.is_readable() {
                        handle_stream_read_event(
                            &config,
                            state,
                            &mut local_responses,
                            &in_message_sender,
                            &mut poll,
                            &mut connections,
                            token,
                            valid_until,
                        );
                    }
                }
            }

            // Flush any locally-generated responses produced by this event
            write_or_queue_messages(&mut poll, local_responses.drain(..), &mut connections);
        }

        // Remove inactive connections, but not every iteration
        if iter_counter % 128 == 0 {
            connections = connections.clean(&mut poll);
        }

        iter_counter = iter_counter.wrapping_add(1);
    }
}
/// Accept all pending streams on the listener (until WouldBlock) and
/// insert each one as a new connection in TLS-handshaking state.
fn accept_new_streams(
    tls_config: &Arc<rustls::ServerConfig>,
    ws_config: WebSocketConfig,
    socket_worker_index: usize,
    listener: &mut TcpListener,
    poll: &mut Poll,
    connections: &mut ConnectionMap,
    valid_until: ValidUntil,
) {
    loop {
        match listener.accept() {
            Ok((stream, _)) => {
                // Skip streams whose peer address can't be determined
                let peer_addr = if let Ok(peer_addr) = stream.peer_addr() {
                    CanonicalSocketAddr::new(peer_addr)
                } else {
                    continue;
                };

                connections.insert_and_register_new(poll, move |token| {
                    let meta = ConnectionMeta {
                        out_message_consumer_id: ConsumerId(socket_worker_index),
                        connection_id: ConnectionId(token.0),
                        peer_addr,
                        pending_scrape_id: None, // FIXME
                    };

                    Connection::new(tls_config.clone(), ws_config, stream, valid_until, meta)
                });
            }
            Err(err) if err.kind() == ErrorKind::WouldBlock => {
                // No more pending streams
                break;
            }
            Err(err) => {
                ::log::info!("error while accepting streams: {}", err);
            }
        }
    }
}
/// Handle a readable event for a connection: read available messages,
/// answer access-list-rejected announces locally with an error response,
/// and forward accepted messages to the request workers.
fn handle_stream_read_event(
    config: &Config,
    state: &State,
    local_responses: &mut Vec<(ConnectionMeta, OutMessage)>,
    in_message_sender: &InMessageSender,
    poll: &mut Poll,
    connections: &mut ConnectionMap,
    token: Token,
    valid_until: ValidUntil,
) {
    let access_list_mode = config.access_list.mode;

    if let Some(mut connection) = connections.remove_and_deregister(poll, &token) {
        let message_handler = &mut |meta, message| match message {
            // Reject announces for disallowed info hashes with a local
            // error response instead of forwarding them
            InMessage::AnnounceRequest(ref request)
                if !state
                    .access_list
                    .allows(access_list_mode, &request.info_hash.0) =>
            {
                let out_message = OutMessage::ErrorResponse(ErrorResponse {
                    failure_reason: "Info hash not allowed".into(),
                    action: Some(ErrorResponseAction::Announce),
                    info_hash: Some(request.info_hash),
                });

                local_responses.push((meta, out_message));
            }
            in_message => {
                if let Err(err) = in_message_sender.send((meta, in_message)) {
                    ::log::info!("InMessageSender: couldn't send message: {:?}", err);
                }
            }
        };

        // Reading counts as activity: extend the connection deadline
        connection.valid_until = valid_until;

        // On success the connection is re-inserted; on error it has been
        // consumed by `read` and is simply dropped (it was already removed
        // from the map above). The empty-Err match arm of the original is
        // replaced by this idiomatic `if let`.
        if let Ok(connection) = connection.read(message_handler) {
            connections.insert_and_register(poll, token, connection);
        }
    }
}
/// Deliver out-messages to their target connections, queueing on
/// WouldBlock. Connections whose peer address doesn't match the message's,
/// or which error on write, are removed and deregistered.
fn write_or_queue_messages<I>(poll: &mut Poll, responses: I, connections: &mut ConnectionMap)
where
    I: Iterator<Item = (ConnectionMeta, OutMessage)>,
{
    for (meta, out_message) in responses {
        let token = Token(meta.connection_id.0);

        let mut remove_connection = false;

        if let Some(connection) = connections.get_mut(&token) {
            if connection.get_meta().peer_addr != meta.peer_addr {
                // NOTE(review): an address mismatch presumably means the
                // token was reused for a new peer after the message was
                // created; don't deliver to the wrong connection
                ::log::warn!(
                    "socket worker error: connection socket addr {} didn't match channel {}. Token: {}.",
                    connection.get_meta().peer_addr.get(),
                    meta.peer_addr.get(),
                    token.0
                );

                remove_connection = true;
            } else {
                match connection.write_or_queue_message(poll, out_message) {
                    Ok(()) => {}
                    Err(err) => {
                        ::log::debug!("Connection::write_or_queue_message error: {}", err);

                        remove_connection = true;
                    }
                }
            }
        }

        if remove_connection {
            connections.remove_and_deregister(poll, &token);
        }
    }
}
/// Create a non-blocking TCP listener bound to the configured address,
/// with SO_REUSEPORT set and optional IPv6-only mode.
pub fn create_listener(config: &Config) -> ::anyhow::Result<::std::net::TcpListener> {
    let domain = if config.network.address.is_ipv4() {
        Domain::IPV4
    } else {
        Domain::IPV6
    };

    let socket =
        Socket::new(domain, Type::STREAM, Some(Protocol::TCP)).context("Couldn't create socket2::Socket")?;

    if config.network.ipv6_only {
        socket
            .set_only_v6(true)
            .context("Couldn't put socket in ipv6 only mode")?;
    }

    socket
        .set_nonblocking(true)
        .context("Couldn't put socket in non-blocking mode")?;
    socket
        .set_reuse_port(true)
        .context("Couldn't put socket in reuse_port mode")?;
    socket
        .bind(&config.network.address.into())
        .with_context(|| format!("Couldn't bind socket to address {}", config.network.address))?;
    socket
        .listen(128)
        .context("Couldn't listen for connections on socket")?;

    Ok(socket.into())
}

View file

@ -0,0 +1,2 @@
// Worker submodules.
pub mod request;
pub mod socket;

View file

@ -0,0 +1,416 @@
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use std::time::{Duration, Instant};
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use futures::StreamExt;
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::enclose;
use glommio::prelude::*;
use glommio::timer::TimerActionRepeat;
use hashbrown::HashMap;
use rand::{rngs::SmallRng, SeedableRng};
use aquatic_common::{extract_response_peers, AHashIndexMap};
use aquatic_ws_protocol::*;
use crate::common::*;
use crate::config::Config;
use crate::SHARED_IN_CHANNEL_SIZE;
/// A peer's role in a swarm, derived from its latest announce
/// (see `from_event_and_bytes_left`).
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
enum PeerStatus {
    Seeding,
    Leeching,
    Stopped,
}
impl PeerStatus {
    /// Determine peer status from announce event and number of bytes left.
    ///
    /// Likely, the last branch will be taken most of the time.
    #[inline]
    fn from_event_and_bytes_left(event: AnnounceEvent, opt_bytes_left: Option<usize>) -> Self {
        match (event, opt_bytes_left) {
            (AnnounceEvent::Stopped, _) => Self::Stopped,
            (_, Some(0)) => Self::Seeding,
            (_, _) => Self::Leeching,
        }
    }
}
/// A peer participating in a torrent swarm.
#[derive(Clone, Copy)]
struct Peer {
    // Used to route offers/answers back through the right socket worker
    // connection
    pub connection_meta: ConnectionMeta,
    pub status: PeerStatus,
    // Peers past this deadline are removed by cleaning
    pub valid_until: ValidUntil,
}

type PeerMap = AHashIndexMap<PeerId, Peer>;
/// Per-torrent peer map with cached seeder/leecher counts.
struct TorrentData {
    pub peers: PeerMap,
    // Kept in sync with `peers` by announce handling and cleaning
    pub num_seeders: usize,
    pub num_leechers: usize,
}
impl Default for TorrentData {
#[inline]
fn default() -> Self {
Self {
peers: Default::default(),
num_seeders: 0,
num_leechers: 0,
}
}
}
type TorrentMap = AHashIndexMap<InfoHash, TorrentData>;

/// Torrent state, split by peer IP version.
#[derive(Default)]
struct TorrentMaps {
    pub ipv4: TorrentMap,
    pub ipv6: TorrentMap,
}
impl TorrentMaps {
    /// Remove disallowed torrents and expired peers from both maps.
    fn clean(&mut self, config: &Config, access_list: &Arc<AccessListArcSwap>) {
        let mut access_list_cache = create_access_list_cache(access_list);

        Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4);
        Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6);
    }

    fn clean_torrent_map(
        config: &Config,
        access_list_cache: &mut AccessListCache,
        torrent_map: &mut TorrentMap,
    ) {
        let now = Instant::now();

        torrent_map.retain(|info_hash, torrent_data| {
            // Drop torrents no longer allowed by the access list
            if !access_list_cache
                .load()
                .allows(config.access_list.mode, &info_hash.0)
            {
                return false;
            }

            let num_seeders = &mut torrent_data.num_seeders;
            let num_leechers = &mut torrent_data.num_leechers;

            torrent_data.peers.retain(|_, peer| {
                let keep = peer.valid_until.0 >= now;

                if !keep {
                    // Keep cached counters in sync with removed peers
                    match peer.status {
                        PeerStatus::Seeding => {
                            *num_seeders -= 1;
                        }
                        PeerStatus::Leeching => {
                            *num_leechers -= 1;
                        }
                        _ => (),
                    };
                }

                keep
            });

            // Drop torrents with no peers left
            !torrent_data.peers.is_empty()
        });

        torrent_map.shrink_to_fit();
    }
}
/// Entry point for a request worker: joins the in/out channel meshes,
/// spawns one handler task per incoming stream and periodically cleans the
/// torrent maps.
pub async fn run_request_worker(
    config: Config,
    state: State,
    in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>,
    out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>,
) {
    let (_, mut in_message_receivers) = in_message_mesh_builder.join(Role::Consumer).await.unwrap();
    let (out_message_senders, _) = out_message_mesh_builder.join(Role::Producer).await.unwrap();

    let out_message_senders = Rc::new(out_message_senders);

    let torrents = Rc::new(RefCell::new(TorrentMaps::default()));
    let access_list = state.access_list;

    // Periodically clean torrents
    TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || {
        enclose!((config, torrents, access_list) move || async move {
            torrents.borrow_mut().clean(&config, &access_list);

            Some(Duration::from_secs(config.cleaning.torrent_cleaning_interval))
        })()
    }));

    let mut handles = Vec::new();

    // One handler task per producer stream in the mesh
    for (_, receiver) in in_message_receivers.streams() {
        let handle = spawn_local(handle_request_stream(
            config.clone(),
            torrents.clone(),
            out_message_senders.clone(),
            receiver,
        ))
        .detach();

        handles.push(handle);
    }

    for handle in handles {
        handle.await;
    }
}
/// Process in-messages from one stream, dispatching announce and scrape
/// requests and sending resulting out-messages back to the socket worker
/// that owns each connection.
async fn handle_request_stream<S>(
    config: Config,
    torrents: Rc<RefCell<TorrentMaps>>,
    out_message_senders: Rc<Senders<(ConnectionMeta, OutMessage)>>,
    stream: S,
) where
    S: futures_lite::Stream<Item = (ConnectionMeta, InMessage)> + ::std::marker::Unpin,
{
    let rng = Rc::new(RefCell::new(SmallRng::from_entropy()));

    let max_peer_age = config.cleaning.max_peer_age;
    let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_peer_age)));

    // Refresh the shared peer deadline once per second rather than
    // computing it for every request
    TimerActionRepeat::repeat(enclose!((peer_valid_until) move || {
        enclose!((peer_valid_until) move || async move {
            *peer_valid_until.borrow_mut() = ValidUntil::new(max_peer_age);

            Some(Duration::from_secs(1))
        })()
    }));

    // Reborrow as references so the for_each_concurrent closures can
    // capture them by reference
    let config = &config;
    let torrents = &torrents;
    let peer_valid_until = &peer_valid_until;
    let rng = &rng;
    let out_message_senders = &out_message_senders;

    stream
        .for_each_concurrent(
            SHARED_IN_CHANNEL_SIZE,
            move |(meta, in_message)| async move {
                let mut out_messages = Vec::new();

                match in_message {
                    InMessage::AnnounceRequest(request) => handle_announce_request(
                        &config,
                        &mut rng.borrow_mut(),
                        &mut torrents.borrow_mut(),
                        &mut out_messages,
                        peer_valid_until.borrow().to_owned(),
                        meta,
                        request,
                    ),
                    InMessage::ScrapeRequest(request) => handle_scrape_request(
                        &config,
                        &mut torrents.borrow_mut(),
                        &mut out_messages,
                        meta,
                        request,
                    ),
                };

                // Route each out-message to the socket worker that owns the
                // target connection
                for (meta, out_message) in out_messages.drain(..) {
                    ::log::info!("request worker trying to send OutMessage to socket worker");

                    out_message_senders
                        .send_to(meta.out_message_consumer_id.0, (meta, out_message))
                        .await
                        .expect("failed sending out_message to socket worker");

                    ::log::info!("request worker sent OutMessage to socket worker");
                }
            },
        )
        .await;
}
/// Handle an announce: update the swarm with the sending peer, forward any
/// offers it sent to randomly chosen peers, forward an answer to its
/// target peer, and reply with an announce response.
fn handle_announce_request(
    config: &Config,
    rng: &mut SmallRng,
    torrent_maps: &mut TorrentMaps,
    out_messages: &mut Vec<(ConnectionMeta, OutMessage)>,
    valid_until: ValidUntil,
    request_sender_meta: ConnectionMeta,
    request: AnnounceRequest,
) {
    // IPv4 and IPv6 peers are kept in separate swarms
    let torrent_data: &mut TorrentData = if request_sender_meta.peer_addr.is_ipv4() {
        torrent_maps.ipv4.entry(request.info_hash).or_default()
    } else {
        torrent_maps.ipv6.entry(request.info_hash).or_default()
    };

    // If there is already a peer with this peer_id, check that socket
    // addr is same as that of request sender. Otherwise, ignore request.
    // Since peers have access to each others peer_id's, they could send
    // requests using them, causing all sorts of issues.
    if let Some(previous_peer) = torrent_data.peers.get(&request.peer_id) {
        if request_sender_meta.peer_addr != previous_peer.connection_meta.peer_addr {
            return;
        }
    }

    ::log::trace!("received request from {:?}", request_sender_meta);

    // Insert/update/remove peer who sent this request
    {
        let peer_status = PeerStatus::from_event_and_bytes_left(
            request.event.unwrap_or_default(),
            request.bytes_left,
        );

        let peer = Peer {
            connection_meta: request_sender_meta,
            status: peer_status,
            valid_until,
        };

        let opt_removed_peer = match peer_status {
            PeerStatus::Leeching => {
                torrent_data.num_leechers += 1;

                torrent_data.peers.insert(request.peer_id, peer)
            }
            PeerStatus::Seeding => {
                torrent_data.num_seeders += 1;

                torrent_data.peers.insert(request.peer_id, peer)
            }
            PeerStatus::Stopped => torrent_data.peers.remove(&request.peer_id),
        };

        // If the peer existed before, undo the counter contribution of its
        // previous status
        match opt_removed_peer.map(|peer| peer.status) {
            Some(PeerStatus::Leeching) => {
                torrent_data.num_leechers -= 1;
            }
            Some(PeerStatus::Seeding) => {
                torrent_data.num_seeders -= 1;
            }
            _ => {}
        }
    }

    // If peer sent offers, send them on to random peers
    if let Some(offers) = request.offers {
        // FIXME: config: also maybe check this when parsing request
        let max_num_peers_to_take = offers.len().min(config.protocol.max_offers);

        #[inline]
        fn f(peer: &Peer) -> Peer {
            *peer
        }

        let offer_receivers: Vec<Peer> = extract_response_peers(
            rng,
            &torrent_data.peers,
            max_num_peers_to_take,
            request.peer_id,
            f,
        );

        // Pair each offer with one randomly selected receiver
        for (offer, offer_receiver) in offers.into_iter().zip(offer_receivers) {
            let middleman_offer = MiddlemanOfferToPeer {
                action: AnnounceAction,
                info_hash: request.info_hash,
                peer_id: request.peer_id,
                offer: offer.offer,
                offer_id: offer.offer_id,
            };

            out_messages.push((
                offer_receiver.connection_meta,
                OutMessage::Offer(middleman_offer),
            ));
            ::log::trace!(
                "sending middleman offer to {:?}",
                offer_receiver.connection_meta
            );
        }
    }

    // If peer sent answer, send it on to relevant peer
    if let (Some(answer), Some(answer_receiver_id), Some(offer_id)) =
        (request.answer, request.to_peer_id, request.offer_id)
    {
        if let Some(answer_receiver) = torrent_data.peers.get(&answer_receiver_id) {
            let middleman_answer = MiddlemanAnswerToPeer {
                action: AnnounceAction,
                peer_id: request.peer_id,
                info_hash: request.info_hash,
                answer,
                offer_id,
            };

            out_messages.push((
                answer_receiver.connection_meta,
                OutMessage::Answer(middleman_answer),
            ));
            ::log::trace!(
                "sending middleman answer to {:?}",
                answer_receiver.connection_meta
            );
        }
    }

    // Finally, answer the sender with the swarm statistics
    let out_message = OutMessage::AnnounceResponse(AnnounceResponse {
        action: AnnounceAction,
        info_hash: request.info_hash,
        complete: torrent_data.num_seeders,
        incomplete: torrent_data.num_leechers,
        announce_interval: config.protocol.peer_announce_interval,
    });

    out_messages.push((request_sender_meta, out_message));
}
/// Build a scrape response covering up to `max_scrape_torrents` of the
/// requested info hashes and push it onto `out_messages`.
fn handle_scrape_request(
    config: &Config,
    torrent_maps: &mut TorrentMaps,
    out_messages: &mut Vec<(ConnectionMeta, OutMessage)>,
    meta: ConnectionMeta,
    request: ScrapeRequest,
) {
    // A scrape without info hashes gets no response at all
    let info_hashes = match request.info_hashes {
        Some(info_hashes) => info_hashes.as_vec(),
        None => return,
    };

    let num_to_take = info_hashes.len().min(config.protocol.max_scrape_torrents);

    // Statistics are looked up in the map matching the requester's IP
    // version
    let torrent_map: &mut TorrentMap = if meta.peer_addr.is_ipv4() {
        &mut torrent_maps.ipv4
    } else {
        &mut torrent_maps.ipv6
    };

    let mut response = ScrapeResponse {
        action: ScrapeAction,
        files: HashMap::with_capacity(num_to_take),
    };

    // Unknown info hashes are silently omitted from the response
    for info_hash in info_hashes.into_iter().take(num_to_take) {
        if let Some(torrent_data) = torrent_map.get(&info_hash) {
            response.files.insert(
                info_hash,
                ScrapeStatistics {
                    complete: torrent_data.num_seeders,
                    downloaded: 0, // No implementation planned
                    incomplete: torrent_data.num_leechers,
                },
            );
        }
    }

    out_messages.push((meta, OutMessage::ScrapeResponse(response)));
}

View file

@ -29,8 +29,6 @@ use crate::config::Config;
use crate::common::*; use crate::common::*;
use super::common::*;
const LOCAL_CHANNEL_SIZE: usize = 16; const LOCAL_CHANNEL_SIZE: usize = 16;
struct PendingScrapeResponse { struct PendingScrapeResponse {

View file

@ -1,14 +1,5 @@
#!/bin/bash #!/bin/sh
. ./scripts/env-native-cpu-without-avx-512 . ./scripts/env-native-cpu-without-avx-512
if [ "$1" != "mio" ] && [ "$1" != "glommio" ]; then cargo run --release --bin aquatic_ws -- $@
echo "Usage: $0 [mio|glommio] [ARGS]"
else
if [ "$1" = "mio" ]; then
cargo run --release --bin aquatic_ws -- "${@:2}"
else
cargo run --release --features "with-glommio" --no-default-features --bin aquatic_ws -- "${@:2}"
fi
fi