diff --git a/CHANGELOG.md b/CHANGELOG.md
index 09708fc..8a8def4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,10 @@
#### Added
* Add support for reporting peer client information
+* Speed up parsing and serialization of requests and responses with
+ [zerocopy](https://crates.io/crates/zerocopy)
+* Store torrents with up to two peers without an extra heap allocation for the
+ peers.
#### Changed
@@ -24,22 +28,18 @@
* Remove support for unbounded worker channels
* Add backpressure in socket workers. They will postpone reading from the
socket if sending a request to a swarm worker failed
-* Reuse allocations in swarm response channel
-* Remove config key `network.poll_event_capacity`
-* Harden ConnectionValidator to make IP spoofing even more costly
* Distribute announce responses from swarm workers over socket workers to
decrease performance loss due to underutilized threads
+* Harden ConnectionValidator to make IP spoofing even more costly
+* Remove config key `network.poll_event_capacity` (always use 1)
### aquatic_http
#### Added
* Reload TLS certificate (and key) on SIGUSR1
-
-#### Changed
-
-* Allow running without TLS
-* Allow running behind reverse proxy
+* Support running without TLS
+* Support running behind reverse proxy
#### Fixed
@@ -64,6 +64,7 @@
#### Fixed
+* Fix memory leak
* Fix bug where clean up after closing connections wasn't always done
* Fix double counting of error responses
* Actually close connections that are too slow to send responses to
diff --git a/Cargo.lock b/Cargo.lock
index 3968b40..53a4b60 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -64,9 +64,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
[[package]]
name = "anstream"
-version = "0.6.5"
+version = "0.6.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d664a92ecae85fd0a7392615844904654d1d5f5514837f471ddef4a057aba1b6"
+checksum = "6e2e1ebcb11de5c03c67de28a7df593d32191b44939c482e97702baaaa6ab6a5"
dependencies = [
"anstyle",
"anstyle-parse",
@@ -134,7 +134,7 @@ dependencies = [
"anyhow",
"aquatic_udp",
"aquatic_udp_load_test",
- "clap 4.4.13",
+ "clap 4.4.18",
"humanize-bytes",
"indexmap 2.1.0",
"indoc",
@@ -293,6 +293,7 @@ dependencies = [
"aquatic_common",
"aquatic_toml_config",
"aquatic_udp_protocol",
+ "arrayvec",
"blake3",
"cfg-if",
"compact_str",
@@ -305,9 +306,9 @@ dependencies = [
"io-uring",
"libc",
"log",
- "metrics 0.21.1",
- "metrics-exporter-prometheus 0.12.2",
- "metrics-util 0.15.1",
+ "metrics 0.22.0",
+ "metrics-exporter-prometheus 0.13.0",
+ "metrics-util 0.16.0",
"mimalloc",
"mio",
"num-format",
@@ -319,7 +320,6 @@ dependencies = [
"slab",
"socket2 0.5.5",
"tempfile",
- "thingbuf",
"time",
"tinytemplate",
]
@@ -498,9 +498,9 @@ dependencies = [
[[package]]
name = "base64"
-version = "0.21.5"
+version = "0.21.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9"
+checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
[[package]]
name = "bendy"
@@ -528,9 +528,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
-version = "2.4.1"
+version = "2.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07"
+checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf"
[[package]]
name = "bitmaps"
@@ -661,9 +661,9 @@ dependencies = [
[[package]]
name = "clap"
-version = "4.4.13"
+version = "4.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "52bdc885e4cacc7f7c9eedc1ef6da641603180c783c41a15c264944deeaab642"
+checksum = "1e578d6ec4194633722ccf9544794b71b1385c3c027efe0c55db226fc880865c"
dependencies = [
"clap_builder",
"clap_derive",
@@ -671,9 +671,9 @@ dependencies = [
[[package]]
name = "clap_builder"
-version = "4.4.12"
+version = "4.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fb7fb5e4e979aec3be7791562fcba452f94ad85e954da024396433e0e25a79e9"
+checksum = "4df4df40ec50c46000231c914968278b1eb05098cf8f1b3a518a95030e71d1c7"
dependencies = [
"anstream",
"anstyle",
@@ -806,7 +806,7 @@ dependencies = [
"anes",
"cast",
"ciborium",
- "clap 4.4.13",
+ "clap 4.4.18",
"criterion-plot",
"is-terminal",
"itertools 0.10.5",
@@ -835,11 +835,10 @@ dependencies = [
[[package]]
name = "crossbeam"
-version = "0.8.3"
+version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6eb9105919ca8e40d437fc9cbb8f1975d916f1bd28afe795a48aae32a2cc8920"
+checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
dependencies = [
- "cfg-if",
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
@@ -849,54 +848,46 @@ dependencies = [
[[package]]
name = "crossbeam-channel"
-version = "0.5.10"
+version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "82a9b73a36529d9c47029b9fb3a6f0ea3cc916a261195352ba19e770fc1748b2"
+checksum = "176dc175b78f56c0f321911d9c8eb2b77a78a4860b9c19db83835fea1a46649b"
dependencies = [
- "cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
-version = "0.8.4"
+version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fca89a0e215bab21874660c67903c5f143333cab1da83d041c7ded6053774751"
+checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d"
dependencies = [
- "cfg-if",
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
-version = "0.9.17"
+version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0e3681d554572a651dda4186cd47240627c3d0114d45a95f6ad27f2f22e7548d"
+checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
- "autocfg",
- "cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
-version = "0.3.10"
+version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "adc6598521bb5a83d491e8c1fe51db7296019d2ca3cb93cc6c2a20369a4d78a2"
+checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35"
dependencies = [
- "cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
-version = "0.8.18"
+version = "0.8.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c3a430a770ebd84726f584a90ee7f020d28db52c6d02138900f22341f866d39c"
-dependencies = [
- "cfg-if",
-]
+checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345"
[[package]]
name = "crypto-common"
@@ -1158,9 +1149,9 @@ dependencies = [
[[package]]
name = "futures-rustls"
-version = "0.25.0"
+version = "0.25.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3afda89bce8f65072d24f8b99a2127e229462d8008182ca93f1d5d2e5df8f22f"
+checksum = "c8d8a2499f0fecc0492eb3e47eab4e92da7875e1028ad2528f214ac3346ca04e"
dependencies = [
"futures-io",
"rustls",
@@ -1209,9 +1200,9 @@ dependencies = [
[[package]]
name = "getrandom"
-version = "0.2.11"
+version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f"
+checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5"
dependencies = [
"cfg-if",
"js-sys",
@@ -1356,9 +1347,9 @@ dependencies = [
[[package]]
name = "hermit-abi"
-version = "0.3.3"
+version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7"
+checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f"
[[package]]
name = "hex"
@@ -1534,7 +1525,7 @@ version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455"
dependencies = [
- "hermit-abi 0.3.3",
+ "hermit-abi 0.3.4",
"rustix",
"windows-sys 0.52.0",
]
@@ -1565,9 +1556,9 @@ checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c"
[[package]]
name = "js-sys"
-version = "0.3.66"
+version = "0.3.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cee9c64da59eae3b50095c18d3e74f8b73c0b86d2792824ff01bbce68ba229ca"
+checksum = "9a1d36f1235bc969acba30b7f5990b864423a6068a10f7c90ae8f0112e3a59d1"
dependencies = [
"wasm-bindgen",
]
@@ -1654,9 +1645,9 @@ dependencies = [
[[package]]
name = "libc"
-version = "0.2.151"
+version = "0.2.152"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4"
+checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7"
[[package]]
name = "libm"
@@ -1676,9 +1667,9 @@ dependencies = [
[[package]]
name = "linux-raw-sys"
-version = "0.4.12"
+version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456"
+checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c"
[[package]]
name = "lock_api"
@@ -1819,16 +1810,12 @@ version = "0.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4de2ed6e491ed114b40b732e4d1659a9d53992ebd87490c44a6ffe23739d973e"
dependencies = [
- "aho-corasick",
"crossbeam-epoch",
"crossbeam-utils",
"hashbrown 0.13.1",
- "indexmap 1.9.3",
"metrics 0.21.1",
"num_cpus",
- "ordered-float 3.9.2",
"quanta 0.11.1",
- "radix_trie",
"sketches-ddsketch 0.2.1",
]
@@ -1845,7 +1832,7 @@ dependencies = [
"indexmap 1.9.3",
"metrics 0.22.0",
"num_cpus",
- "ordered-float 4.2.0",
+ "ordered-float",
"quanta 0.12.2",
"radix_trie",
"sketches-ddsketch 0.2.1",
@@ -2008,7 +1995,7 @@ version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
- "hermit-abi 0.3.3",
+ "hermit-abi 0.3.4",
"libc",
]
@@ -2042,15 +2029,6 @@ version = "11.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
-[[package]]
-name = "ordered-float"
-version = "3.9.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f1e1c390732d15f1d48471625cd92d154e66db2c56645e29a9cd26f4699f72dc"
-dependencies = [
- "num-traits",
-]
-
[[package]]
name = "ordered-float"
version = "4.2.0"
@@ -2078,29 +2056,6 @@ version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae"
-[[package]]
-name = "parking_lot"
-version = "0.12.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
-dependencies = [
- "lock_api",
- "parking_lot_core",
-]
-
-[[package]]
-name = "parking_lot_core"
-version = "0.9.9"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e"
-dependencies = [
- "cfg-if",
- "libc",
- "redox_syscall",
- "smallvec",
- "windows-targets 0.48.5",
-]
-
[[package]]
name = "percent-encoding"
version = "2.3.1"
@@ -2141,9 +2096,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkg-config"
-version = "0.3.28"
+version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a"
+checksum = "2900ede94e305130c13ddd391e0ab7cbaeb783945ae07a279c268cb05109c6cb"
[[package]]
name = "plotters"
@@ -2371,14 +2326,14 @@ version = "11.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d86a7c4638d42c44551f4791a20e687dbb4c3de1f33c43dd71e355cd429def1"
dependencies = [
- "bitflags 2.4.1",
+ "bitflags 2.4.2",
]
[[package]]
name = "rayon"
-version = "1.8.0"
+version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9c27db03db7734835b3f53954b534c91069375ce6ccaa2e065441e07d9b6cdb1"
+checksum = "fa7237101a77a10773db45d62004a272517633fbcc3df19d96455ede1122e051"
dependencies = [
"either",
"rayon-core",
@@ -2386,9 +2341,9 @@ dependencies = [
[[package]]
name = "rayon-core"
-version = "1.12.0"
+version = "1.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5ce3fb6ad83f861aac485e76e1985cd109d9a3713802152be56c3b1f0e0658ed"
+checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
dependencies = [
"crossbeam-deque",
"crossbeam-utils",
@@ -2483,11 +2438,11 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]]
name = "rustix"
-version = "0.38.28"
+version = "0.38.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "72e572a5e8ca657d7366229cdde4bd14c4eb5499a9573d4d366fe1b599daa316"
+checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca"
dependencies = [
- "bitflags 2.4.1",
+ "bitflags 2.4.2",
"errno 0.3.8",
"libc",
"linux-raw-sys",
@@ -2496,9 +2451,9 @@ dependencies = [
[[package]]
name = "rustls"
-version = "0.22.1"
+version = "0.22.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fe6b63262c9fcac8659abfaa96cac103d28166d3ff3eaf8f412e19f3ae9e5a48"
+checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41"
dependencies = [
"log",
"ring",
@@ -2659,9 +2614,9 @@ dependencies = [
[[package]]
name = "simd-json"
-version = "0.13.6"
+version = "0.13.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0c58001aca67fc467da571f35e7e1dc9c094e91b099cc54bd3cead2962db2432"
+checksum = "2faf8f101b9bc484337a6a6b0409cf76c139f2fb70a9e3aee6b6774be7bfbf76"
dependencies = [
"getrandom",
"halfbrown",
@@ -2723,9 +2678,9 @@ dependencies = [
[[package]]
name = "smallvec"
-version = "1.11.2"
+version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970"
+checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7"
[[package]]
name = "smartstring"
@@ -2848,16 +2803,6 @@ version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d"
-[[package]]
-name = "thingbuf"
-version = "0.1.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4706f1bfb859af03f099ada2de3cea3e515843c2d3e93b7893f16d94a37f9415"
-dependencies = [
- "parking_lot",
- "pin-project",
-]
-
[[package]]
name = "thiserror"
version = "1.0.56"
@@ -3061,9 +3006,9 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
[[package]]
name = "unicode-bidi"
-version = "0.3.14"
+version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6f2528f27a9eb2b21e69c95319b30bd0efd85d09c379741b0f78ea1d86be2416"
+checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75"
[[package]]
name = "unicode-ident"
@@ -3166,9 +3111,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
-version = "0.2.89"
+version = "0.2.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0ed0d4f68a3015cc185aff4db9506a015f4b96f95303897bfa23f846db54064e"
+checksum = "b1223296a201415c7fad14792dbefaace9bd52b62d33453ade1c5b5f07555406"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
@@ -3176,9 +3121,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-backend"
-version = "0.2.89"
+version = "0.2.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1b56f625e64f3a1084ded111c4d5f477df9f8c92df113852fa5a374dbda78826"
+checksum = "fcdc935b63408d58a32f8cc9738a0bffd8f05cc7c002086c6ef20b7312ad9dcd"
dependencies = [
"bumpalo",
"log",
@@ -3191,9 +3136,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
-version = "0.2.89"
+version = "0.2.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0162dbf37223cd2afce98f3d0785506dcb8d266223983e4b5b525859e6e182b2"
+checksum = "3e4c238561b2d428924c49815533a8b9121c664599558a5d9ec51f8a1740a999"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@@ -3201,9 +3146,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
-version = "0.2.89"
+version = "0.2.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283"
+checksum = "bae1abb6806dc1ad9e560ed242107c0f6c84335f1749dd4e8ddb012ebd5e25a7"
dependencies = [
"proc-macro2",
"quote",
@@ -3214,15 +3159,15 @@ dependencies = [
[[package]]
name = "wasm-bindgen-shared"
-version = "0.2.89"
+version = "0.2.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f"
+checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b"
[[package]]
name = "web-sys"
-version = "0.3.66"
+version = "0.3.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "50c24a44ec86bb68fbecd1b3efed7e85ea5621b39b35ef2766b66cd984f8010f"
+checksum = "58cd2333b6e0be7a39605f0e255892fd7418a682d8da8fe042fe25128794d2ed"
dependencies = [
"js-sys",
"wasm-bindgen",
@@ -3405,9 +3350,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04"
[[package]]
name = "winnow"
-version = "0.5.33"
+version = "0.5.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b7520bbdec7211caa7c4e682eb1fbe07abe20cee6756b6e00f537c82c11816aa"
+checksum = "b7cf47b659b318dccbd69cc4797a39ae128f533dce7902a1096044d1967b9c16"
dependencies = [
"memchr",
]
diff --git a/crates/bencher/src/common.rs b/crates/bencher/src/common.rs
index 6d17664..4e5b958 100644
--- a/crates/bencher/src/common.rs
+++ b/crates/bencher/src/common.rs
@@ -27,17 +27,11 @@ impl TaskSetCpuList {
let indicator = self.0.iter().map(|indicator| match indicator {
TaskSetCpuIndicator::Single(i) => i.to_string(),
TaskSetCpuIndicator::Range(range) => {
- format!(
- "{}-{}",
- range.start,
- range.clone().into_iter().last().unwrap()
- )
+ format!("{}-{}", range.start, range.clone().last().unwrap())
}
});
- Itertools::intersperse_with(indicator, || ",".to_string())
- .into_iter()
- .collect()
+ Itertools::intersperse_with(indicator, || ",".to_string()).collect()
}
pub fn new(
@@ -163,7 +157,7 @@ pub fn simple_load_test_runs(
workers: &[(usize, Priority)],
) -> Vec<(usize, Priority, TaskSetCpuList)> {
workers
- .into_iter()
+ .iter()
.copied()
.map(|(workers, priority)| {
(
diff --git a/crates/bencher/src/html.rs b/crates/bencher/src/html.rs
index 15a0b93..03278f3 100644
--- a/crates/bencher/src/html.rs
+++ b/crates/bencher/src/html.rs
@@ -191,7 +191,7 @@ pub fn html_all_runs(all_results: &[TrackerCoreCountResults]) -> String {
load_test_key_names = load_test_key_names.iter()
.map(|name| format!("
Load test {} | ", name))
.join("\n"),
- body = results.into_iter().map(|r| {
+ body = results.iter_mut().map(|r| {
formatdoc! {
"
diff --git a/crates/bencher/src/protocols/udp.rs b/crates/bencher/src/protocols/udp.rs
index 5ea7eaf..48ee1bf 100644
--- a/crates/bencher/src/protocols/udp.rs
+++ b/crates/bencher/src/protocols/udp.rs
@@ -259,6 +259,7 @@ struct AquaticUdpRunner {
}
impl AquaticUdpRunner {
+ #[allow(clippy::new_ret_no_self)]
fn new(
socket_workers: usize,
swarm_workers: usize,
@@ -275,6 +276,7 @@ impl AquaticUdpRunner {
impl ProcessRunner for AquaticUdpRunner {
type Command = UdpCommand;
+ #[allow(clippy::field_reassign_with_default)]
fn run(
&self,
command: &Self::Command,
@@ -322,6 +324,7 @@ struct OpenTrackerUdpRunner {
}
impl OpenTrackerUdpRunner {
+ #[allow(clippy::new_ret_no_self)]
fn new(workers: usize, priority: Priority) -> Rc> {
Rc::new(Self { workers, priority })
}
@@ -368,6 +371,7 @@ impl ProcessRunner for OpenTrackerUdpRunner {
struct ChihayaUdpRunner;
impl ChihayaUdpRunner {
+ #[allow(clippy::new_ret_no_self)]
fn new() -> Rc> {
Rc::new(Self {})
}
@@ -426,6 +430,7 @@ struct AquaticUdpLoadTestRunner {
impl ProcessRunner for AquaticUdpLoadTestRunner {
type Command = UdpCommand;
+ #[allow(clippy::field_reassign_with_default)]
fn run(
&self,
command: &Self::Command,
diff --git a/crates/bencher/src/run.rs b/crates/bencher/src/run.rs
index 2757404..a28e62f 100644
--- a/crates/bencher/src/run.rs
+++ b/crates/bencher/src/run.rs
@@ -59,9 +59,7 @@ impl RunConfig {
.run(command, &self.tracker_vcpus, &mut tracker_config_file)
{
Ok(handle) => ChildWrapper(handle),
- Err(err) => {
- return Err(RunErrorResults::new(self).set_error(err.into(), "run tracker"))
- }
+ Err(err) => return Err(RunErrorResults::new(self).set_error(err, "run tracker")),
};
::std::thread::sleep(Duration::from_secs(1));
@@ -74,7 +72,7 @@ impl RunConfig {
Ok(handle) => ChildWrapper(handle),
Err(err) => {
return Err(RunErrorResults::new(self)
- .set_error(err.into(), "run load test")
+ .set_error(err, "run load test")
.set_tracker_outputs(tracker))
}
};
@@ -328,7 +326,7 @@ impl FromStr for ProcessStats {
type Err = ();
fn from_str(s: &str) -> Result {
- let mut parts = s.trim().split_whitespace();
+ let mut parts = s.split_whitespace();
let avg_cpu_utilization = parts.next().ok_or(())?.parse().map_err(|_| ())?;
let peak_rss_kb: f32 = parts.next().ok_or(())?.parse().map_err(|_| ())?;
diff --git a/crates/bencher/src/set.rs b/crates/bencher/src/set.rs
index a34a3e7..7ddeef2 100644
--- a/crates/bencher/src/set.rs
+++ b/crates/bencher/src/set.rs
@@ -73,13 +73,13 @@ pub fn run_sets(
(minutes / 60, minutes % 60)
};
- println!("");
+ println!();
println!("Total number of load test runs: {}", total_num_runs);
println!(
"Estimated duration: {} hours, {} minutes",
estimated_hours, estimated_minutes
);
- println!("");
+ println!();
let results = set_configs
.into_iter()
@@ -115,7 +115,7 @@ pub fn run_sets(
&load_test_gen,
load_test_parameters,
implementation,
- &tracker_run,
+ tracker_run,
tracker_vcpus.clone(),
load_test_vcpus,
)
diff --git a/crates/combined_binary/src/main.rs b/crates/combined_binary/src/main.rs
index 97fba0f..56eeddd 100644
--- a/crates/combined_binary/src/main.rs
+++ b/crates/combined_binary/src/main.rs
@@ -12,12 +12,12 @@ fn main() {
::std::process::exit(match run() {
Ok(()) => 0,
Err(None) => {
- print_help(|| gen_info(), None);
+ print_help(gen_info, None);
0
}
Err(opt_err @ Some(_)) => {
- print_help(|| gen_info(), opt_err);
+ print_help(gen_info, opt_err);
1
}
@@ -62,7 +62,7 @@ fn run() -> Result<(), Option> {
arg => {
let opt_err = if arg == "-h" || arg == "--help" {
None
- } else if arg.chars().next() == Some('-') {
+ } else if arg.starts_with('-') {
Some("First argument must be protocol".to_string())
} else {
Some("Invalid protocol".to_string())
diff --git a/crates/common/src/access_list.rs b/crates/common/src/access_list.rs
index 459c5ca..5ba7ea8 100644
--- a/crates/common/src/access_list.rs
+++ b/crates/common/src/access_list.rs
@@ -71,7 +71,7 @@ impl AccessList {
}
new_list
- .insert_from_line(&line)
+ .insert_from_line(line)
.with_context(|| format!("Invalid line in access list: {}", line))?;
}
@@ -86,6 +86,7 @@ impl AccessList {
}
}
+ #[allow(clippy::len_without_is_empty)]
pub fn len(&self) -> usize {
self.0.len()
}
@@ -155,10 +156,10 @@ mod tests {
fn test_parse_info_hash() {
let f = parse_info_hash;
- assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeee".into()).is_ok());
- assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeeef".into()).is_err());
- assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeee".into()).is_err());
- assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeeö".into()).is_err());
+ assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeee").is_ok());
+ assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeeef").is_err());
+ assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeee").is_err());
+ assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeeö").is_err());
}
#[test]
diff --git a/crates/common/src/cli.rs b/crates/common/src/cli.rs
index db18f09..d935285 100644
--- a/crates/common/src/cli.rs
+++ b/crates/common/src/cli.rs
@@ -47,6 +47,7 @@ impl Options {
{
let mut options = Options::default();
+ #[allow(clippy::while_let_loop)] // False positive
loop {
if let Some(arg) = arg_iter.next() {
match arg.as_str() {
diff --git a/crates/common/src/cpu_pinning.rs b/crates/common/src/cpu_pinning.rs
index 870ea59..529f15c 100644
--- a/crates/common/src/cpu_pinning.rs
+++ b/crates/common/src/cpu_pinning.rs
@@ -296,12 +296,12 @@ pub fn pin_current_if_configured_to(
let cpu_set = core_cpu_sets
.get(core_index)
- .expect(&format!("get cpu set for core {}", core_index))
+ .unwrap_or_else(|| panic!("get cpu set for core {}", core_index))
.to_owned();
topology
.set_cpubind(cpu_set, CPUBIND_THREAD)
- .expect(&format!("bind thread to core {}", core_index));
+ .unwrap_or_else(|err| panic!("bind thread to core {}: {:?}", core_index, err));
::log::info!(
"Pinned worker {:?} to cpu core {}",
diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs
index 44f25f5..6d0dbb6 100644
--- a/crates/common/src/lib.rs
+++ b/crates/common/src/lib.rs
@@ -39,6 +39,7 @@ impl ValidUntil {
pub struct ServerStartInstant(Instant);
impl ServerStartInstant {
+ #[allow(clippy::new_without_default)] // I prefer ::new here
pub fn new() -> Self {
Self(Instant::now())
}
@@ -82,13 +83,11 @@ impl Drop for PanicSentinel {
if ::std::thread::panicking() {
let already_triggered = self.0.fetch_or(true, Ordering::SeqCst);
- if !already_triggered {
- if unsafe { libc::raise(15) } == -1 {
- panic!(
- "Could not raise SIGTERM: {:#}",
- ::std::io::Error::last_os_error()
- )
- }
+ if !already_triggered && unsafe { libc::raise(15) } == -1 {
+ panic!(
+ "Could not raise SIGTERM: {:#}",
+ ::std::io::Error::last_os_error()
+ )
}
}
}
diff --git a/crates/common/src/privileges.rs b/crates/common/src/privileges.rs
index 907d80b..10731ca 100644
--- a/crates/common/src/privileges.rs
+++ b/crates/common/src/privileges.rs
@@ -48,15 +48,13 @@ impl PrivilegeDropper {
}
pub fn after_socket_creation(self) -> anyhow::Result<()> {
- if self.config.drop_privileges {
- if self.barrier.wait().is_leader() {
- PrivDrop::default()
- .chroot(self.config.chroot_path.clone())
- .group(self.config.group.clone())
- .user(self.config.user.clone())
- .apply()
- .with_context(|| "couldn't drop privileges after socket creation")?;
- }
+ if self.config.drop_privileges && self.barrier.wait().is_leader() {
+ PrivDrop::default()
+ .chroot(self.config.chroot_path.clone())
+ .group(self.config.group.clone())
+ .user(self.config.user.clone())
+ .apply()
+ .with_context(|| "couldn't drop privileges after socket creation")?;
}
Ok(())
diff --git a/crates/common/src/rustls_config.rs b/crates/common/src/rustls_config.rs
index 56b8336..86d2bd0 100644
--- a/crates/common/src/rustls_config.rs
+++ b/crates/common/src/rustls_config.rs
@@ -46,6 +46,7 @@ pub fn create_rustls_config(
.next()
.ok_or(anyhow::anyhow!("No private keys in file"))??;
+ #[allow(clippy::let_and_return)] // Using temporary variable fixes lifetime issue
key
};
diff --git a/crates/http/src/workers/socket/connection.rs b/crates/http/src/workers/socket/connection.rs
index 7db5146..d44508a 100644
--- a/crates/http/src/workers/socket/connection.rs
+++ b/crates/http/src/workers/socket/connection.rs
@@ -16,11 +16,9 @@ use aquatic_http_protocol::response::{
use arc_swap::ArcSwap;
use either::Either;
use futures::stream::FuturesUnordered;
-use futures_lite::future::race;
use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
use futures_rustls::TlsAcceptor;
use glommio::channels::channel_mesh::Senders;
-use glommio::channels::local_channel::LocalReceiver;
use glommio::channels::shared_channel::{self, SharedReceiver};
use glommio::net::TcpStream;
use once_cell::sync::Lazy;
@@ -76,7 +74,6 @@ pub(super) async fn run_connection(
server_start_instant: ServerStartInstant,
opt_tls_config: Option>>,
valid_until: Rc>,
- close_conn_receiver: LocalReceiver<()>,
stream: TcpStream,
) -> Result<(), ConnectionError> {
let access_list_cache = create_access_list_cache(&access_list);
@@ -119,7 +116,7 @@ pub(super) async fn run_connection(
stream,
};
- conn.run(close_conn_receiver).await?;
+ conn.run().await
} else {
let mut conn = Connection {
config,
@@ -135,10 +132,8 @@ pub(super) async fn run_connection(
stream,
};
- conn.run(close_conn_receiver).await?;
+ conn.run().await
}
-
- Ok(())
}
struct Connection {
@@ -159,18 +154,7 @@ impl Connection
where
S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static,
{
- async fn run(&mut self, close_conn_receiver: LocalReceiver<()>) -> Result<(), ConnectionError> {
- let f1 = async { self.run_request_response_loop().await };
- let f2 = async {
- close_conn_receiver.recv().await;
-
- Err(ConnectionError::Inactive)
- };
-
- race(f1, f2).await
- }
-
- async fn run_request_response_loop(&mut self) -> Result<(), ConnectionError> {
+ async fn run(&mut self) -> Result<(), ConnectionError> {
loop {
let response = match self.read_request().await? {
Either::Left(response) => Response::Failure(response),
@@ -399,7 +383,7 @@ where
let body_len = response
.write(&mut &mut self.response_buffer[position..])
- .map_err(|err| ConnectionError::ResponseBufferWrite(err))?;
+ .map_err(ConnectionError::ResponseBufferWrite)?;
position += body_len;
@@ -407,7 +391,7 @@ where
return Err(ConnectionError::ResponseBufferFull);
}
- (&mut self.response_buffer[position..position + 2]).copy_from_slice(b"\r\n");
+ self.response_buffer[position..position + 2].copy_from_slice(b"\r\n");
position += 2;
@@ -419,7 +403,7 @@ where
let start = RESPONSE_HEADER_A.len();
let end = start + RESPONSE_HEADER_B.len();
- (&mut self.response_buffer[start..end]).copy_from_slice(RESPONSE_HEADER_B);
+ self.response_buffer[start..end].copy_from_slice(RESPONSE_HEADER_B);
}
// Set content-len header value
@@ -431,7 +415,7 @@ where
let start = RESPONSE_HEADER_A.len();
let end = start + content_len_bytes.len();
- (&mut self.response_buffer[start..end]).copy_from_slice(content_len_bytes);
+ self.response_buffer[start..end].copy_from_slice(content_len_bytes);
}
// Write buffer to stream
diff --git a/crates/http/src/workers/socket/mod.rs b/crates/http/src/workers/socket/mod.rs
index 0baa68b..025b4a4 100644
--- a/crates/http/src/workers/socket/mod.rs
+++ b/crates/http/src/workers/socket/mod.rs
@@ -12,6 +12,7 @@ use aquatic_common::privileges::PrivilegeDropper;
use aquatic_common::rustls_config::RustlsConfig;
use aquatic_common::{CanonicalSocketAddr, PanicSentinel, ServerStartInstant};
use arc_swap::ArcSwap;
+use futures_lite::future::race;
use futures_lite::StreamExt;
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role};
use glommio::channels::local_channel::{new_bounded, LocalSender};
@@ -32,6 +33,7 @@ struct ConnectionHandle {
valid_until: Rc>,
}
+#[allow(clippy::too_many_arguments)]
pub async fn run_socket_worker(
_sentinel: PanicSentinel,
config: Config,
@@ -97,16 +99,23 @@ pub async fn run_socket_worker(
"worker_index" => worker_index.to_string(),
);
- let result = run_connection(
- config,
- access_list,
- request_senders,
- server_start_instant,
- opt_tls_config,
- valid_until.clone(),
- close_conn_receiver,
- stream,
- ).await;
+ let f1 = async { run_connection(
+ config,
+ access_list,
+ request_senders,
+ server_start_instant,
+ opt_tls_config,
+ valid_until.clone(),
+ stream,
+ ).await
+ };
+ let f2 = async {
+ close_conn_receiver.recv().await;
+
+ Err(ConnectionError::Inactive)
+ };
+
+ let result = race(f1, f2).await;
#[cfg(feature = "metrics")]
::metrics::decrement_gauge!(
diff --git a/crates/http/src/workers/socket/request.rs b/crates/http/src/workers/socket/request.rs
index 4412382..ec5f19f 100644
--- a/crates/http/src/workers/socket/request.rs
+++ b/crates/http/src/workers/socket/request.rs
@@ -52,7 +52,7 @@ fn parse_forwarded_header(
header_format: ReverseProxyPeerIpHeaderFormat,
headers: &[httparse::Header<'_>],
) -> anyhow::Result {
- for header in headers.into_iter().rev() {
+ for header in headers.iter().rev() {
if header.name == header_name {
match header_format {
ReverseProxyPeerIpHeaderFormat::LastAddress => {
diff --git a/crates/http/src/workers/swarm/storage.rs b/crates/http/src/workers/swarm/storage.rs
index 1aa9bfe..d9ff867 100644
--- a/crates/http/src/workers/swarm/storage.rs
+++ b/crates/http/src/workers/swarm/storage.rs
@@ -66,16 +66,14 @@ impl TorrentMaps {
valid_until,
);
- let response = AnnounceResponse {
+ AnnounceResponse {
complete: seeders,
incomplete: leechers,
announce_interval: config.protocol.peer_announce_interval,
peers: ResponsePeerListV4(response_peers),
peers6: ResponsePeerListV6(vec![]),
warning_message: None,
- };
-
- response
+ }
}
IpAddr::V6(peer_ip_address) => {
let (seeders, leechers, response_peers) = self
@@ -90,16 +88,14 @@ impl TorrentMaps {
valid_until,
);
- let response = AnnounceResponse {
+ AnnounceResponse {
complete: seeders,
incomplete: leechers,
announce_interval: config.protocol.peer_announce_interval,
peers: ResponsePeerListV4(vec![]),
peers6: ResponsePeerListV6(response_peers),
warning_message: None,
- };
-
- response
+ }
}
}
}
diff --git a/crates/http_load_test/src/utils.rs b/crates/http_load_test/src/utils.rs
index c3dab09..d22c2c1 100644
--- a/crates/http_load_test/src/utils.rs
+++ b/crates/http_load_test/src/utils.rs
@@ -19,7 +19,7 @@ pub fn create_random_request(
let items = [RequestType::Announce, RequestType::Scrape];
- let dist = WeightedIndex::new(&weights).expect("random request weighted index");
+ let dist = WeightedIndex::new(weights).expect("random request weighted index");
match items[dist.sample(rng)] {
RequestType::Announce => create_announce_request(config, state, rng),
@@ -37,7 +37,7 @@ fn create_announce_request(config: &Config, state: &LoadTestState, rng: &mut imp
}
};
- let info_hash_index = select_info_hash_index(config, &state, rng);
+ let info_hash_index = select_info_hash_index(config, state, rng);
Request::Announce(AnnounceRequest {
info_hash: state.info_hashes[info_hash_index],
@@ -57,7 +57,7 @@ fn create_scrape_request(config: &Config, state: &LoadTestState, rng: &mut impl
let mut scrape_hashes = Vec::with_capacity(5);
for _ in 0..5 {
- let info_hash_index = select_info_hash_index(config, &state, rng);
+ let info_hash_index = select_info_hash_index(config, state, rng);
scrape_hashes.push(state.info_hashes[info_hash_index]);
}
diff --git a/crates/http_protocol/src/request.rs b/crates/http_protocol/src/request.rs
index 865a448..2fd175c 100644
--- a/crates/http_protocol/src/request.rs
+++ b/crates/http_protocol/src/request.rs
@@ -86,7 +86,7 @@ impl AnnounceRequest {
let mut position = 0usize;
for equal_sign_index in ::memchr::memchr_iter(b'=', query_string_bytes) {
- let segment_end = ampersand_iter.next().unwrap_or_else(|| query_string.len());
+ let segment_end = ampersand_iter.next().unwrap_or(query_string.len());
let key = query_string
.get(position..equal_sign_index)
@@ -207,7 +207,7 @@ impl ScrapeRequest {
let mut position = 0usize;
for equal_sign_index in ::memchr::memchr_iter(b'=', query_string_bytes) {
- let segment_end = ampersand_iter.next().unwrap_or_else(|| query_string.len());
+ let segment_end = ampersand_iter.next().unwrap_or(query_string.len());
let key = query_string
.get(position..equal_sign_index)
@@ -348,7 +348,7 @@ mod tests {
let mut bytes = Vec::new();
bytes.extend_from_slice(b"GET ");
- bytes.extend_from_slice(&ANNOUNCE_REQUEST_PATH.as_bytes());
+ bytes.extend_from_slice(ANNOUNCE_REQUEST_PATH.as_bytes());
bytes.extend_from_slice(b" HTTP/1.1\r\n\r\n");
let parsed_request = Request::from_bytes(&bytes[..]).unwrap().unwrap();
@@ -362,7 +362,7 @@ mod tests {
let mut bytes = Vec::new();
bytes.extend_from_slice(b"GET ");
- bytes.extend_from_slice(&SCRAPE_REQUEST_PATH.as_bytes());
+ bytes.extend_from_slice(SCRAPE_REQUEST_PATH.as_bytes());
bytes.extend_from_slice(b" HTTP/1.1\r\n\r\n");
let parsed_request = Request::from_bytes(&bytes[..]).unwrap().unwrap();
diff --git a/crates/http_protocol/src/utils.rs b/crates/http_protocol/src/utils.rs
index 618eb02..1668acc 100644
--- a/crates/http_protocol/src/utils.rs
+++ b/crates/http_protocol/src/utils.rs
@@ -43,7 +43,7 @@ pub fn urldecode_20_bytes(value: &str) -> anyhow::Result<[u8; 20]> {
let hex = [first as u8, second as u8];
- hex::decode_to_slice(&hex, &mut out_arr[i..i + 1])
+ hex::decode_to_slice(hex, &mut out_arr[i..i + 1])
.map_err(|err| anyhow::anyhow!("hex decode error: {:?}", err))?;
} else {
out_arr[i] = c as u8;
@@ -280,6 +280,7 @@ mod tests {
}
}
+ #[allow(clippy::too_many_arguments)]
#[quickcheck]
fn test_urlencode_urldecode_20_bytes(
a: u8,
diff --git a/crates/peer_id/src/lib.rs b/crates/peer_id/src/lib.rs
index dbc7fcf..304e1e5 100644
--- a/crates/peer_id/src/lib.rs
+++ b/crates/peer_id/src/lib.rs
@@ -234,7 +234,7 @@ mod tests {
let len = bytes.len();
- (&mut peer_id.0[..len]).copy_from_slice(bytes);
+ peer_id.0[..len].copy_from_slice(bytes);
peer_id
}
diff --git a/crates/toml_config_derive/src/lib.rs b/crates/toml_config_derive/src/lib.rs
index d84e200..89cdff7 100644
--- a/crates/toml_config_derive/src/lib.rs
+++ b/crates/toml_config_derive/src/lib.rs
@@ -146,21 +146,18 @@ fn extract_comment_string(attrs: Vec) -> TokenStream {
}
for token_tree in attr.tokens {
- match token_tree {
- TokenTree::Literal(literal) => {
- let mut comment = format!("{}", literal);
+ if let TokenTree::Literal(literal) = token_tree {
+ let mut comment = format!("{}", literal);
- // Strip leading and trailing quotation marks
- comment.remove(comment.len() - 1);
- comment.remove(0);
+ // Strip leading and trailing quotation marks
+ comment.remove(comment.len() - 1);
+ comment.remove(0);
- // Add toml comment indicator
- comment.insert(0, '#');
+ // Add toml comment indicator
+ comment.insert(0, '#');
- output.push_str(&comment);
- output.push('\n');
- }
- _ => {}
+ output.push_str(&comment);
+ output.push('\n');
}
}
}
diff --git a/crates/udp/Cargo.toml b/crates/udp/Cargo.toml
index 4e9eac3..430ab49 100644
--- a/crates/udp/Cargo.toml
+++ b/crates/udp/Cargo.toml
@@ -32,6 +32,7 @@ aquatic_toml_config.workspace = true
aquatic_udp_protocol.workspace = true
anyhow = "1"
+arrayvec = "0.7"
blake3 = "1"
cfg-if = "1"
compact_str = "0.7"
@@ -51,20 +52,18 @@ serde = { version = "1", features = ["derive"] }
signal-hook = { version = "0.3" }
slab = "0.4"
socket2 = { version = "0.5", features = ["all"] }
-thingbuf = "0.1"
time = { version = "0.3", features = ["formatting"] }
tinytemplate = "1"
# prometheus feature
-metrics = { version = "0.21", optional = true }
-metrics-util = { version = "0.15", optional = true }
-metrics-exporter-prometheus = { version = "0.12", optional = true, default-features = false, features = ["http-listener"] }
+metrics = { version = "0.22", optional = true }
+metrics-util = { version = "0.16", optional = true }
+metrics-exporter-prometheus = { version = "0.13", optional = true, default-features = false, features = ["http-listener"] }
# io-uring feature
io-uring = { version = "0.6", optional = true }
[dev-dependencies]
-hex = "0.4"
tempfile = "3"
quickcheck = "1"
quickcheck_macros = "1"
diff --git a/crates/udp/src/common.rs b/crates/udp/src/common.rs
index 6504f06..f803ea5 100644
--- a/crates/udp/src/common.rs
+++ b/crates/udp/src/common.rs
@@ -1,68 +1,19 @@
-use std::borrow::Cow;
use std::collections::BTreeMap;
use std::hash::Hash;
-use std::io::Write;
-use std::mem::size_of;
-use std::net::{SocketAddr, SocketAddrV4};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
-use crossbeam_channel::{Sender, TrySendError};
+use crossbeam_channel::{Receiver, SendError, Sender, TrySendError};
use aquatic_common::access_list::AccessListArcSwap;
use aquatic_common::CanonicalSocketAddr;
use aquatic_udp_protocol::*;
use hdrhistogram::Histogram;
-use thingbuf::mpsc::blocking::SendRef;
use crate::config::Config;
pub const BUFFER_SIZE: usize = 8192;
-#[derive(PartialEq, Eq, Clone, Debug)]
-pub enum CowResponse<'a> {
- Connect(Cow<'a, ConnectResponse>),
- AnnounceIpv4(Cow<'a, AnnounceResponse>),
- AnnounceIpv6(Cow<'a, AnnounceResponse>),
- Scrape(Cow<'a, ScrapeResponse>),
- Error(Cow<'a, ErrorResponse>),
-}
-
-impl From for CowResponse<'_> {
- fn from(value: Response) -> Self {
- match value {
- Response::AnnounceIpv4(r) => Self::AnnounceIpv4(Cow::Owned(r)),
- Response::AnnounceIpv6(r) => Self::AnnounceIpv6(Cow::Owned(r)),
- Response::Connect(r) => Self::Connect(Cow::Owned(r)),
- Response::Scrape(r) => Self::Scrape(Cow::Owned(r)),
- Response::Error(r) => Self::Error(Cow::Owned(r)),
- }
- }
-}
-
-impl<'a> CowResponse<'a> {
- pub fn into_owned(self) -> Response {
- match self {
- CowResponse::Connect(r) => Response::Connect(r.into_owned()),
- CowResponse::AnnounceIpv4(r) => Response::AnnounceIpv4(r.into_owned()),
- CowResponse::AnnounceIpv6(r) => Response::AnnounceIpv6(r.into_owned()),
- CowResponse::Scrape(r) => Response::Scrape(r.into_owned()),
- CowResponse::Error(r) => Response::Error(r.into_owned()),
- }
- }
-
- #[inline]
- pub fn write(&self, bytes: &mut impl Write) -> Result<(), ::std::io::Error> {
- match self {
- Self::Connect(r) => r.write(bytes),
- Self::AnnounceIpv4(r) => r.write(bytes),
- Self::AnnounceIpv6(r) => r.write(bytes),
- Self::Scrape(r) => r.write(bytes),
- Self::Error(r) => r.write(bytes),
- }
- }
-}
-
#[derive(Debug)]
pub struct PendingScrapeRequest {
pub slab_key: usize,
@@ -88,52 +39,6 @@ pub enum ConnectedResponse {
Scrape(PendingScrapeResponse),
}
-pub enum ConnectedResponseKind {
- AnnounceIpv4,
- AnnounceIpv6,
- Scrape,
-}
-
-pub struct ConnectedResponseWithAddr {
- pub kind: ConnectedResponseKind,
- pub announce_ipv4: AnnounceResponse,
- pub announce_ipv6: AnnounceResponse,
- pub scrape: PendingScrapeResponse,
- pub addr: CanonicalSocketAddr,
-}
-
-impl ConnectedResponseWithAddr {
- pub fn estimated_max_size(config: &Config) -> usize {
- size_of::()
- + config.protocol.max_response_peers
- * (size_of::>()
- + size_of::>())
- }
-}
-
-pub struct Recycler;
-
-impl thingbuf::Recycle for Recycler {
- fn new_element(&self) -> ConnectedResponseWithAddr {
- ConnectedResponseWithAddr {
- kind: ConnectedResponseKind::AnnounceIpv4,
- announce_ipv4: AnnounceResponse::empty(),
- announce_ipv6: AnnounceResponse::empty(),
- scrape: PendingScrapeResponse {
- slab_key: 0,
- torrent_stats: Default::default(),
- },
- addr: CanonicalSocketAddr::new(SocketAddr::V4(SocketAddrV4::new(0.into(), 0))),
- }
- }
- fn recycle(&self, element: &mut ConnectedResponseWithAddr) {
- element.announce_ipv4.peers.clear();
- element.announce_ipv6.peers.clear();
- element.scrape.torrent_stats.clear();
- element.addr = CanonicalSocketAddr::new(SocketAddr::V4(SocketAddrV4::new(0.into(), 0)));
- }
-}
-
#[derive(Clone, Copy, Debug)]
pub struct SocketWorkerIndex(pub usize);
@@ -180,54 +85,73 @@ impl ConnectedRequestSender {
}
pub struct ConnectedResponseSender {
- senders: Vec>,
+ senders: Vec>,
to_any_last_index_picked: usize,
}
impl ConnectedResponseSender {
- pub fn new(
- senders: Vec>,
- ) -> Self {
+ pub fn new(senders: Vec>) -> Self {
Self {
senders,
to_any_last_index_picked: 0,
}
}
- pub fn try_send_ref_to(
+ pub fn try_send_to(
&self,
index: SocketWorkerIndex,
- ) -> Result, thingbuf::mpsc::errors::TrySendError> {
- self.senders[index.0].try_send_ref()
+ addr: CanonicalSocketAddr,
+ response: ConnectedResponse,
+ ) -> Result<(), TrySendError<(CanonicalSocketAddr, ConnectedResponse)>> {
+ self.senders[index.0].try_send((addr, response))
}
- pub fn send_ref_to(
+ pub fn send_to(
&self,
index: SocketWorkerIndex,
- ) -> Result, thingbuf::mpsc::errors::Closed> {
- self.senders[index.0].send_ref()
+ addr: CanonicalSocketAddr,
+ response: ConnectedResponse,
+ ) -> Result<(), SendError<(CanonicalSocketAddr, ConnectedResponse)>> {
+ self.senders[index.0].send((addr, response))
}
- pub fn send_ref_to_any(
+ pub fn send_to_any(
&mut self,
- ) -> Result, thingbuf::mpsc::errors::Closed> {
+ addr: CanonicalSocketAddr,
+ response: ConnectedResponse,
+ ) -> Result<(), SendError<(CanonicalSocketAddr, ConnectedResponse)>> {
let start = self.to_any_last_index_picked + 1;
- for i in (start..start + self.senders.len()).map(|i| i % self.senders.len()) {
- if let Ok(sender) = self.senders[i].try_send_ref() {
- self.to_any_last_index_picked = i;
+ let mut message = Some((addr, response));
- return Ok(sender);
+ for i in (start..start + self.senders.len()).map(|i| i % self.senders.len()) {
+ match self.senders[i].try_send(message.take().unwrap()) {
+ Ok(()) => {
+ self.to_any_last_index_picked = i;
+
+ return Ok(());
+ }
+ Err(TrySendError::Full(msg)) => {
+ message = Some(msg);
+ }
+ Err(TrySendError::Disconnected(_)) => {
+ panic!("ConnectedResponseReceiver disconnected");
+ }
}
}
+ let (addr, response) = message.unwrap();
+
self.to_any_last_index_picked = start % self.senders.len();
- self.send_ref_to(SocketWorkerIndex(self.to_any_last_index_picked))
+ self.send_to(
+ SocketWorkerIndex(self.to_any_last_index_picked),
+ addr,
+ response,
+ )
}
}
-pub type ConnectedResponseReceiver =
- thingbuf::mpsc::blocking::Receiver;
+pub type ConnectedResponseReceiver = Receiver<(CanonicalSocketAddr, ConnectedResponse)>;
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub enum PeerStatus {
@@ -287,7 +211,7 @@ impl Statistics {
}
fn create_atomic_usize_vec(len: usize) -> Vec {
- ::std::iter::repeat_with(|| AtomicUsize::default())
+ ::std::iter::repeat_with(AtomicUsize::default)
.take(len)
.collect()
}
diff --git a/crates/udp/src/lib.rs b/crates/udp/src/lib.rs
index c9871ba..50a0439 100644
--- a/crates/udp/src/lib.rs
+++ b/crates/udp/src/lib.rs
@@ -3,7 +3,6 @@ pub mod config;
pub mod workers;
use std::collections::BTreeMap;
-use std::mem::size_of;
use std::thread::Builder;
use std::time::Duration;
@@ -16,16 +15,14 @@ use aquatic_common::access_list::update_access_list;
#[cfg(feature = "cpu-pinning")]
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
use aquatic_common::privileges::PrivilegeDropper;
-use aquatic_common::{CanonicalSocketAddr, PanicSentinelWatcher, ServerStartInstant};
+use aquatic_common::{PanicSentinelWatcher, ServerStartInstant};
use common::{
- ConnectedRequestSender, ConnectedResponseSender, Recycler, SocketWorkerIndex, State,
- SwarmWorkerIndex,
+ ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State, SwarmWorkerIndex,
};
use config::Config;
use workers::socket::ConnectionValidator;
-
-use crate::common::{ConnectedRequest, ConnectedResponseWithAddr};
+use workers::swarm::SwarmWorker;
pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker";
pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
@@ -33,11 +30,6 @@ pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
pub fn run(config: Config) -> ::anyhow::Result<()> {
let mut signals = Signals::new([SIGUSR1, SIGTERM])?;
- ::log::info!(
- "Estimated max channel memory use: {:.02} MB",
- est_max_total_channel_memory(&config)
- );
-
let state = State::new(config.swarm_workers);
let connection_validator = ConnectionValidator::new(&config)?;
let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel();
@@ -56,19 +48,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let server_start_instant = ServerStartInstant::new();
for i in 0..config.swarm_workers {
- let (request_sender, request_receiver) = if config.worker_channel_size == 0 {
- unbounded()
- } else {
- bounded(config.worker_channel_size)
- };
+ let (request_sender, request_receiver) = bounded(config.worker_channel_size);
request_senders.push(request_sender);
request_receivers.insert(i, request_receiver);
}
for i in 0..config.socket_workers {
- let (response_sender, response_receiver) =
- thingbuf::mpsc::blocking::with_recycle(config.worker_channel_size, Recycler);
+ let (response_sender, response_receiver) = bounded(config.worker_channel_size);
response_senders.push(response_sender);
response_receivers.insert(i, response_receiver);
@@ -93,16 +80,18 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
WorkerIndex::SwarmWorker(i),
);
- workers::swarm::run_swarm_worker(
- sentinel,
+ let mut worker = SwarmWorker {
+ _sentinel: sentinel,
config,
state,
server_start_instant,
request_receiver,
response_sender,
statistics_sender,
- SwarmWorkerIndex(i),
- )
+ worker_index: SwarmWorkerIndex(i),
+ };
+
+ worker.run();
})
.with_context(|| "spawn swarm worker")?;
}
@@ -214,16 +203,3 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
Ok(())
}
-
-fn est_max_total_channel_memory(config: &Config) -> f64 {
- let request_channel_max_size = config.swarm_workers
- * config.worker_channel_size
- * (size_of::()
- + size_of::()
- + size_of::());
- let response_channel_max_size = config.socket_workers
- * config.worker_channel_size
- * ConnectedResponseWithAddr::estimated_max_size(&config);
-
- (request_channel_max_size as u64 + response_channel_max_size as u64) as f64 / (1024.0 * 1024.0)
-}
diff --git a/crates/udp/src/workers/socket/mio.rs b/crates/udp/src/workers/socket/mio.rs
index 64daf5f..070e00b 100644
--- a/crates/udp/src/workers/socket/mio.rs
+++ b/crates/udp/src/workers/socket/mio.rs
@@ -1,4 +1,3 @@
-use std::borrow::Cow;
use std::io::{Cursor, ErrorKind};
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};
@@ -42,7 +41,7 @@ pub struct SocketWorker {
server_start_instant: ServerStartInstant,
pending_scrape_responses: PendingScrapeResponseSlab,
socket: UdpSocket,
- opt_resend_buffer: Option>,
+ opt_resend_buffer: Option>,
buffer: [u8; BUFFER_SIZE],
polling_mode: PollMode,
/// Storage for requests that couldn't be sent to swarm worker because channel was full
@@ -50,6 +49,7 @@ pub struct SocketWorker {
}
impl SocketWorker {
+ #[allow(clippy::too_many_arguments)]
pub fn run(
_sentinel: PanicSentinel,
shared_state: State,
@@ -133,14 +133,14 @@ impl SocketWorker {
// If resend buffer is enabled, send any responses in it
if let Some(resend_buffer) = self.opt_resend_buffer.as_mut() {
- for (response, addr) in resend_buffer.drain(..) {
+ for (addr, response) in resend_buffer.drain(..) {
Self::send_response(
&self.config,
&self.shared_state,
&mut self.socket,
&mut self.buffer,
&mut None,
- response.into(),
+ response,
addr,
);
}
@@ -206,7 +206,7 @@ impl SocketWorker {
if let Err(HandleRequestError::RequestChannelFull(failed_requests)) =
self.handle_request(pending_scrape_valid_until, request, src)
{
- self.pending_requests.extend(failed_requests.into_iter());
+ self.pending_requests.extend(failed_requests);
self.polling_mode = PollMode::SkipReceiving;
break;
@@ -235,7 +235,7 @@ impl SocketWorker {
&mut self.socket,
&mut self.buffer,
&mut self.opt_resend_buffer,
- CowResponse::Error(Cow::Owned(response)),
+ Response::Error(response),
src,
);
}
@@ -310,7 +310,7 @@ impl SocketWorker {
&mut self.socket,
&mut self.buffer,
&mut self.opt_resend_buffer,
- CowResponse::Connect(Cow::Owned(response)),
+ Response::Connect(response),
src,
);
@@ -346,7 +346,7 @@ impl SocketWorker {
&mut self.socket,
&mut self.buffer,
&mut self.opt_resend_buffer,
- CowResponse::Error(Cow::Owned(response)),
+ Response::Error(response),
src,
);
@@ -392,30 +392,20 @@ impl SocketWorker {
}
fn handle_swarm_worker_responses(&mut self) {
- loop {
- let recv_ref = if let Ok(recv_ref) = self.response_receiver.try_recv_ref() {
- recv_ref
- } else {
- break;
- };
-
- let response = match recv_ref.kind {
- ConnectedResponseKind::Scrape => {
+ for (addr, response) in self.response_receiver.try_iter() {
+ let response = match response {
+ ConnectedResponse::Scrape(response) => {
if let Some(r) = self
.pending_scrape_responses
- .add_and_get_finished(&recv_ref.scrape)
+ .add_and_get_finished(&response)
{
- CowResponse::Scrape(Cow::Owned(r))
+ Response::Scrape(r)
} else {
continue;
}
}
- ConnectedResponseKind::AnnounceIpv4 => {
- CowResponse::AnnounceIpv4(Cow::Borrowed(&recv_ref.announce_ipv4))
- }
- ConnectedResponseKind::AnnounceIpv6 => {
- CowResponse::AnnounceIpv6(Cow::Borrowed(&recv_ref.announce_ipv6))
- }
+ ConnectedResponse::AnnounceIpv4(r) => Response::AnnounceIpv4(r),
+ ConnectedResponse::AnnounceIpv6(r) => Response::AnnounceIpv6(r),
};
Self::send_response(
@@ -425,7 +415,7 @@ impl SocketWorker {
&mut self.buffer,
&mut self.opt_resend_buffer,
response,
- recv_ref.addr,
+ addr,
);
}
}
@@ -435,8 +425,8 @@ impl SocketWorker {
shared_state: &State,
socket: &mut UdpSocket,
buffer: &mut [u8],
- opt_resend_buffer: &mut Option>,
- response: CowResponse,
+ opt_resend_buffer: &mut Option>,
+ response: Response,
canonical_addr: CanonicalSocketAddr,
) {
let mut buffer = Cursor::new(&mut buffer[..]);
@@ -478,18 +468,18 @@ impl SocketWorker {
};
match response {
- CowResponse::Connect(_) => {
+ Response::Connect(_) => {
stats.responses_sent_connect.fetch_add(1, Ordering::Relaxed);
}
- CowResponse::AnnounceIpv4(_) | CowResponse::AnnounceIpv6(_) => {
+ Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => {
stats
.responses_sent_announce
.fetch_add(1, Ordering::Relaxed);
}
- CowResponse::Scrape(_) => {
+ Response::Scrape(_) => {
stats.responses_sent_scrape.fetch_add(1, Ordering::Relaxed);
}
- CowResponse::Error(_) => {
+ Response::Error(_) => {
stats.responses_sent_error.fetch_add(1, Ordering::Relaxed);
}
}
@@ -503,7 +493,7 @@ impl SocketWorker {
if resend_buffer.len() < config.network.resend_buffer_max_len {
::log::debug!("Adding response to resend queue, since sending it to {} failed with: {:#}", addr, err);
- resend_buffer.push((response.into_owned(), canonical_addr));
+ resend_buffer.push((canonical_addr, response));
} else {
::log::warn!("Response resend buffer full, dropping response");
}
diff --git a/crates/udp/src/workers/socket/mod.rs b/crates/udp/src/workers/socket/mod.rs
index b683e13..6889c79 100644
--- a/crates/udp/src/workers/socket/mod.rs
+++ b/crates/udp/src/workers/socket/mod.rs
@@ -36,6 +36,7 @@ const EXTRA_PACKET_SIZE_IPV4: usize = 8 + 18 + 20 + 8;
/// - 8 bit udp header
const EXTRA_PACKET_SIZE_IPV6: usize = 8 + 18 + 40 + 8;
+#[allow(clippy::too_many_arguments)]
pub fn run_socket_worker(
sentinel: PanicSentinel,
shared_state: State,
diff --git a/crates/udp/src/workers/socket/storage.rs b/crates/udp/src/workers/socket/storage.rs
index 5ded3ac..84c11a7 100644
--- a/crates/udp/src/workers/socket/storage.rs
+++ b/crates/udp/src/workers/socket/storage.rs
@@ -44,7 +44,7 @@ impl PendingScrapeResponseSlab {
for (i, info_hash) in request.info_hashes.into_iter().enumerate() {
let split_request = split_requests
- .entry(SwarmWorkerIndex::from_info_hash(&config, info_hash))
+ .entry(SwarmWorkerIndex::from_info_hash(config, info_hash))
.or_insert_with(|| PendingScrapeRequest {
slab_key,
info_hashes: BTreeMap::new(),
@@ -130,9 +130,10 @@ mod tests {
return TestResult::discard();
}
- let mut config = Config::default();
-
- config.swarm_workers = swarm_workers as usize;
+ let config = Config {
+ swarm_workers: swarm_workers as usize,
+ ..Default::default()
+ };
let valid_until = ValidUntil::new(ServerStartInstant::new(), 1);
diff --git a/crates/udp/src/workers/socket/uring/mod.rs b/crates/udp/src/workers/socket/uring/mod.rs
index 4ddf875..43c78c9 100644
--- a/crates/udp/src/workers/socket/uring/mod.rs
+++ b/crates/udp/src/workers/socket/uring/mod.rs
@@ -2,7 +2,6 @@ mod buf_ring;
mod recv_helper;
mod send_buffers;
-use std::borrow::Cow;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::net::UdpSocket;
@@ -97,6 +96,7 @@ pub struct SocketWorker {
}
impl SocketWorker {
+ #[allow(clippy::too_many_arguments)]
pub fn run(
_sentinel: PanicSentinel,
shared_state: State,
@@ -137,7 +137,7 @@ impl SocketWorker {
.build()
.unwrap();
- let recv_sqe = recv_helper.create_entry(buf_ring.bgid().try_into().unwrap());
+ let recv_sqe = recv_helper.create_entry(buf_ring.bgid());
// This timeout enables regular updates of pending_scrape_valid_until
// and wakes the main loop to send any pending responses in the case
@@ -210,15 +210,14 @@ impl SocketWorker {
// Enqueue local responses
for _ in 0..sq_space {
if let Some((response, addr)) = self.local_responses.pop_front() {
- match self.send_buffers.prepare_entry(response.into(), addr) {
+ match self.send_buffers.prepare_entry(response, addr) {
Ok(entry) => {
unsafe { ring.submission().push(&entry).unwrap() };
num_send_added += 1;
}
Err(send_buffers::Error::NoBuffers(response)) => {
- self.local_responses
- .push_front((response.into_owned(), addr));
+ self.local_responses.push_front((response, addr));
break;
}
@@ -233,40 +232,32 @@ impl SocketWorker {
// Enqueue swarm worker responses
for _ in 0..(sq_space - num_send_added) {
- let recv_ref = if let Ok(recv_ref) = self.response_receiver.try_recv_ref() {
- recv_ref
+ let (addr, response) = if let Ok(r) = self.response_receiver.try_recv() {
+ r
} else {
break;
};
- let response = match recv_ref.kind {
- ConnectedResponseKind::AnnounceIpv4 => {
- CowResponse::AnnounceIpv4(Cow::Borrowed(&recv_ref.announce_ipv4))
- }
- ConnectedResponseKind::AnnounceIpv6 => {
- CowResponse::AnnounceIpv6(Cow::Borrowed(&recv_ref.announce_ipv6))
- }
- ConnectedResponseKind::Scrape => {
- if let Some(response) = self
- .pending_scrape_responses
- .add_and_get_finished(&recv_ref.scrape)
- {
- CowResponse::Scrape(Cow::Owned(response))
+ let response = match response {
+ ConnectedResponse::AnnounceIpv4(r) => Response::AnnounceIpv4(r),
+ ConnectedResponse::AnnounceIpv6(r) => Response::AnnounceIpv6(r),
+ ConnectedResponse::Scrape(r) => {
+ if let Some(r) = self.pending_scrape_responses.add_and_get_finished(&r) {
+ Response::Scrape(r)
} else {
continue;
}
}
};
- match self.send_buffers.prepare_entry(response, recv_ref.addr) {
+ match self.send_buffers.prepare_entry(response, addr) {
Ok(entry) => {
unsafe { ring.submission().push(&entry).unwrap() };
num_send_added += 1;
}
Err(send_buffers::Error::NoBuffers(response)) => {
- self.local_responses
- .push_back((response.into_owned(), recv_ref.addr));
+ self.local_responses.push_back((response, addr));
break;
}
@@ -481,11 +472,11 @@ impl SocketWorker {
let worker_index =
SwarmWorkerIndex::from_info_hash(&self.config, request.info_hash);
- if let Err(_) = self.request_sender.try_send_to(
- worker_index,
- ConnectedRequest::Announce(request),
- src,
- ) {
+ if self
+ .request_sender
+ .try_send_to(worker_index, ConnectedRequest::Announce(request), src)
+ .is_err()
+ {
::log::warn!("request sender full, dropping request");
}
} else {
@@ -510,11 +501,11 @@ impl SocketWorker {
);
for (swarm_worker_index, request) in split_requests {
- if let Err(_) = self.request_sender.try_send_to(
- swarm_worker_index,
- ConnectedRequest::Scrape(request),
- src,
- ) {
+ if self
+ .request_sender
+ .try_send_to(swarm_worker_index, ConnectedRequest::Scrape(request), src)
+ .is_err()
+ {
::log::warn!("request sender full, dropping request");
}
}
diff --git a/crates/udp/src/workers/socket/uring/recv_helper.rs b/crates/udp/src/workers/socket/uring/recv_helper.rs
index ff6cdde..4a485f6 100644
--- a/crates/udp/src/workers/socket/uring/recv_helper.rs
+++ b/crates/udp/src/workers/socket/uring/recv_helper.rs
@@ -11,6 +11,7 @@ use crate::config::Config;
use super::{SOCKET_IDENTIFIER, USER_DATA_RECV};
+#[allow(clippy::enum_variant_names)]
pub enum Error {
RecvMsgParseError,
RecvMsgTruncated,
diff --git a/crates/udp/src/workers/socket/uring/send_buffers.rs b/crates/udp/src/workers/socket/uring/send_buffers.rs
index b62ca90..458d96f 100644
--- a/crates/udp/src/workers/socket/uring/send_buffers.rs
+++ b/crates/udp/src/workers/socket/uring/send_buffers.rs
@@ -6,14 +6,15 @@ use std::{
};
use aquatic_common::CanonicalSocketAddr;
+use aquatic_udp_protocol::Response;
use io_uring::opcode::SendMsg;
-use crate::{common::CowResponse, config::Config};
+use crate::config::Config;
use super::{RESPONSE_BUF_LEN, SOCKET_IDENTIFIER};
-pub enum Error<'a> {
- NoBuffers(CowResponse<'a>),
+pub enum Error {
+ NoBuffers(Response),
SerializationFailed(std::io::Error),
}
@@ -57,11 +58,11 @@ impl SendBuffers {
self.likely_next_free_index = 0;
}
- pub fn prepare_entry<'a>(
+ pub fn prepare_entry(
&mut self,
- response: CowResponse<'a>,
+ response: Response,
addr: CanonicalSocketAddr,
- ) -> Result> {
+ ) -> Result {
let index = if let Some(index) = self.next_free_index() {
index
} else {
@@ -163,7 +164,7 @@ impl SendBuffer {
fn prepare_entry(
&mut self,
- response: CowResponse,
+ response: Response,
addr: CanonicalSocketAddr,
socket_is_ipv4: bool,
metadata: &mut SendBufferMetadata,
@@ -237,12 +238,12 @@ pub enum ResponseType {
}
impl ResponseType {
- fn from_response(response: &CowResponse) -> Self {
+ fn from_response(response: &Response) -> Self {
match response {
- CowResponse::Connect(_) => Self::Connect,
- CowResponse::AnnounceIpv4(_) | CowResponse::AnnounceIpv6(_) => Self::Announce,
- CowResponse::Scrape(_) => Self::Scrape,
- CowResponse::Error(_) => Self::Error,
+ Response::Connect(_) => Self::Connect,
+ Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => Self::Announce,
+ Response::Scrape(_) => Self::Scrape,
+ Response::Error(_) => Self::Error,
}
}
}
diff --git a/crates/udp/src/workers/socket/validator.rs b/crates/udp/src/workers/socket/validator.rs
index 4b5fe56..c68d1ef 100644
--- a/crates/udp/src/workers/socket/validator.rs
+++ b/crates/udp/src/workers/socket/validator.rs
@@ -59,8 +59,8 @@ impl ConnectionValidator {
let mut connection_id_bytes = [0u8; 8];
- (&mut connection_id_bytes[..4]).copy_from_slice(&elapsed);
- (&mut connection_id_bytes[4..]).copy_from_slice(&hash);
+ connection_id_bytes[..4].copy_from_slice(&elapsed);
+ connection_id_bytes[4..].copy_from_slice(&hash);
ConnectionId::new(i64::from_ne_bytes(connection_id_bytes))
}
@@ -78,7 +78,7 @@ impl ConnectionValidator {
return false;
}
- let tracker_elapsed = u64::from(self.start_time.elapsed().as_secs());
+ let tracker_elapsed = self.start_time.elapsed().as_secs();
let client_elapsed = u64::from(u32::from_ne_bytes(elapsed));
let client_expiration_time = client_elapsed + self.max_connection_age;
diff --git a/crates/udp/src/workers/statistics/collector.rs b/crates/udp/src/workers/statistics/collector.rs
index 33c0a4e..820e0b5 100644
--- a/crates/udp/src/workers/statistics/collector.rs
+++ b/crates/udp/src/workers/statistics/collector.rs
@@ -9,6 +9,18 @@ use serde::Serialize;
use crate::common::Statistics;
use crate::config::Config;
+#[cfg(feature = "prometheus")]
+macro_rules! set_peer_histogram_gauge {
+ ($ip_version:ident, $data:expr, $type_label:expr) => {
+ ::metrics::gauge!(
+ "aquatic_peers_per_torrent",
+ "type" => $type_label,
+ "ip_version" => $ip_version.clone(),
+ )
+ .set($data as f64);
+ };
+}
+
pub struct StatisticsCollector {
shared: Arc,
last_update: Instant,
@@ -79,59 +91,65 @@ impl StatisticsCollector {
if config.statistics.run_prometheus_endpoint {
::metrics::counter!(
"aquatic_requests_total",
- requests_received.try_into().unwrap(),
"ip_version" => self.ip_version.clone(),
- );
+ )
+ .increment(requests_received.try_into().unwrap());
+
::metrics::counter!(
"aquatic_responses_total",
- responses_sent_connect.try_into().unwrap(),
"type" => "connect",
"ip_version" => self.ip_version.clone(),
- );
+ )
+ .increment(responses_sent_connect.try_into().unwrap());
+
::metrics::counter!(
"aquatic_responses_total",
- responses_sent_announce.try_into().unwrap(),
"type" => "announce",
"ip_version" => self.ip_version.clone(),
- );
+ )
+ .increment(responses_sent_announce.try_into().unwrap());
+
::metrics::counter!(
"aquatic_responses_total",
- responses_sent_scrape.try_into().unwrap(),
"type" => "scrape",
"ip_version" => self.ip_version.clone(),
- );
+ )
+ .increment(responses_sent_scrape.try_into().unwrap());
+
::metrics::counter!(
"aquatic_responses_total",
- responses_sent_error.try_into().unwrap(),
"type" => "error",
"ip_version" => self.ip_version.clone(),
- );
+ )
+ .increment(responses_sent_error.try_into().unwrap());
+
::metrics::counter!(
"aquatic_rx_bytes",
- bytes_received.try_into().unwrap(),
"ip_version" => self.ip_version.clone(),
- );
+ )
+ .increment(bytes_received.try_into().unwrap());
+
::metrics::counter!(
"aquatic_tx_bytes",
- bytes_sent.try_into().unwrap(),
"ip_version" => self.ip_version.clone(),
- );
+ )
+ .increment(bytes_sent.try_into().unwrap());
for (worker_index, n) in num_torrents_by_worker.iter().copied().enumerate() {
::metrics::gauge!(
"aquatic_torrents",
- n as f64,
"ip_version" => self.ip_version.clone(),
"worker_index" => worker_index.to_string(),
- );
+ )
+ .set(n as f64);
}
for (worker_index, n) in num_peers_by_worker.iter().copied().enumerate() {
::metrics::gauge!(
"aquatic_peers",
- n as f64,
"ip_version" => self.ip_version.clone(),
"worker_index" => worker_index.to_string(),
- );
+ )
+ .set(n as f64);
}
if config.statistics.torrent_peer_histograms {
@@ -236,83 +254,18 @@ impl PeerHistogramStatistics {
#[cfg(feature = "prometheus")]
fn update_metrics(&self, ip_version: String) {
- ::metrics::gauge!(
- "aquatic_peers_per_torrent",
- self.min as f64,
- "type" => "max",
- "ip_version" => ip_version.clone(),
- );
- ::metrics::gauge!(
- "aquatic_peers_per_torrent",
- self.p10 as f64,
- "type" => "p10",
- "ip_version" => ip_version.clone(),
- );
- ::metrics::gauge!(
- "aquatic_peers_per_torrent",
- self.p20 as f64,
- "type" => "p20",
- "ip_version" => ip_version.clone(),
- );
- ::metrics::gauge!(
- "aquatic_peers_per_torrent",
- self.p30 as f64,
- "type" => "p30",
- "ip_version" => ip_version.clone(),
- );
- ::metrics::gauge!(
- "aquatic_peers_per_torrent",
- self.p40 as f64,
- "type" => "p40",
- "ip_version" => ip_version.clone(),
- );
- ::metrics::gauge!(
- "aquatic_peers_per_torrent",
- self.p50 as f64,
- "type" => "p50",
- "ip_version" => ip_version.clone(),
- );
- ::metrics::gauge!(
- "aquatic_peers_per_torrent",
- self.p60 as f64,
- "type" => "p60",
- "ip_version" => ip_version.clone(),
- );
- ::metrics::gauge!(
- "aquatic_peers_per_torrent",
- self.p70 as f64,
- "type" => "p70",
- "ip_version" => ip_version.clone(),
- );
- ::metrics::gauge!(
- "aquatic_peers_per_torrent",
- self.p80 as f64,
- "type" => "p80",
- "ip_version" => ip_version.clone(),
- );
- ::metrics::gauge!(
- "aquatic_peers_per_torrent",
- self.p90 as f64,
- "type" => "p90",
- "ip_version" => ip_version.clone(),
- );
- ::metrics::gauge!(
- "aquatic_peers_per_torrent",
- self.p99 as f64,
- "type" => "p99",
- "ip_version" => ip_version.clone(),
- );
- ::metrics::gauge!(
- "aquatic_peers_per_torrent",
- self.p999 as f64,
- "type" => "p99.9",
- "ip_version" => ip_version.clone(),
- );
- ::metrics::gauge!(
- "aquatic_peers_per_torrent",
- self.max as f64,
- "type" => "max",
- "ip_version" => ip_version.clone(),
- );
+ set_peer_histogram_gauge!(ip_version, self.min, "min");
+ set_peer_histogram_gauge!(ip_version, self.p10, "p10");
+ set_peer_histogram_gauge!(ip_version, self.p20, "p20");
+ set_peer_histogram_gauge!(ip_version, self.p30, "p30");
+ set_peer_histogram_gauge!(ip_version, self.p40, "p40");
+ set_peer_histogram_gauge!(ip_version, self.p50, "p50");
+ set_peer_histogram_gauge!(ip_version, self.p60, "p60");
+ set_peer_histogram_gauge!(ip_version, self.p70, "p70");
+ set_peer_histogram_gauge!(ip_version, self.p80, "p80");
+ set_peer_histogram_gauge!(ip_version, self.p90, "p90");
+ set_peer_histogram_gauge!(ip_version, self.p99, "p99");
+ set_peer_histogram_gauge!(ip_version, self.p999, "p999");
+ set_peer_histogram_gauge!(ip_version, self.max, "max");
}
}
diff --git a/crates/udp/src/workers/statistics/mod.rs b/crates/udp/src/workers/statistics/mod.rs
index c950850..e9a8565 100644
--- a/crates/udp/src/workers/statistics/mod.rs
+++ b/crates/udp/src/workers/statistics/mod.rs
@@ -152,9 +152,9 @@ pub fn run_statistics_worker(
for (prefix, count) in prefixes {
::metrics::gauge!(
"aquatic_peer_id_prefixes",
- count as f64,
"prefix_hex" => prefix.to_string(),
- );
+ )
+ .set(count as f64);
}
}
@@ -169,9 +169,9 @@ pub fn run_statistics_worker(
if config.statistics.run_prometheus_endpoint {
::metrics::gauge!(
"aquatic_peer_clients",
- count as f64,
"client" => client.to_string(),
- );
+ )
+ .set(count as f64);
}
}
diff --git a/crates/udp/src/workers/swarm/mod.rs b/crates/udp/src/workers/swarm/mod.rs
index ea551f3..d989477 100644
--- a/crates/udp/src/workers/swarm/mod.rs
+++ b/crates/udp/src/workers/swarm/mod.rs
@@ -17,136 +17,128 @@ use crate::config::Config;
use storage::TorrentMaps;
-pub fn run_swarm_worker(
- _sentinel: PanicSentinel,
- config: Config,
- state: State,
- server_start_instant: ServerStartInstant,
- request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>,
- mut response_sender: ConnectedResponseSender,
- statistics_sender: Sender,
- worker_index: SwarmWorkerIndex,
-) {
- let mut torrents = TorrentMaps::default();
- let mut rng = SmallRng::from_entropy();
+pub struct SwarmWorker {
+ pub _sentinel: PanicSentinel,
+ pub config: Config,
+ pub state: State,
+ pub server_start_instant: ServerStartInstant,
+ pub request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>,
+ pub response_sender: ConnectedResponseSender,
+ pub statistics_sender: Sender,
+ pub worker_index: SwarmWorkerIndex,
+}
- let timeout = Duration::from_millis(config.request_channel_recv_timeout_ms);
- let mut peer_valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age);
+impl SwarmWorker {
+ pub fn run(&mut self) {
+ let mut torrents = TorrentMaps::default();
+ let mut rng = SmallRng::from_entropy();
- let cleaning_interval = Duration::from_secs(config.cleaning.torrent_cleaning_interval);
- let statistics_update_interval = Duration::from_secs(config.statistics.interval);
+ let timeout = Duration::from_millis(self.config.request_channel_recv_timeout_ms);
+ let mut peer_valid_until =
+ ValidUntil::new(self.server_start_instant, self.config.cleaning.max_peer_age);
- let mut last_cleaning = Instant::now();
- let mut last_statistics_update = Instant::now();
+ let cleaning_interval = Duration::from_secs(self.config.cleaning.torrent_cleaning_interval);
+ let statistics_update_interval = Duration::from_secs(self.config.statistics.interval);
- let mut iter_counter = 0usize;
+ let mut last_cleaning = Instant::now();
+ let mut last_statistics_update = Instant::now();
- loop {
- if let Ok((sender_index, request, src)) = request_receiver.recv_timeout(timeout) {
- // It is OK to block here as long as we don't also do blocking
- // sends in socket workers (doing both could cause a deadlock)
- match (request, src.get().ip()) {
- (ConnectedRequest::Announce(request), IpAddr::V4(ip)) => {
- // It doesn't matter which socket worker receives announce responses
- let mut send_ref = response_sender
- .send_ref_to_any()
- .expect("swarm response channel is closed");
+ let mut iter_counter = 0usize;
- send_ref.addr = src;
- send_ref.kind = ConnectedResponseKind::AnnounceIpv4;
+ loop {
+ if let Ok((sender_index, request, src)) = self.request_receiver.recv_timeout(timeout) {
+ // It is OK to block here as long as we don't also do blocking
+ // sends in socket workers (doing both could cause a deadlock)
+ match (request, src.get().ip()) {
+ (ConnectedRequest::Announce(request), IpAddr::V4(ip)) => {
+ let response = torrents
+ .ipv4
+ .0
+ .entry(request.info_hash)
+ .or_default()
+ .announce(
+ &self.config,
+ &self.statistics_sender,
+ &mut rng,
+ &request,
+ ip.into(),
+ peer_valid_until,
+ );
- torrents
- .ipv4
- .0
- .entry(request.info_hash)
- .or_default()
- .announce(
- &config,
- &statistics_sender,
- &mut rng,
- &request,
- ip.into(),
- peer_valid_until,
- &mut send_ref.announce_ipv4,
- );
- }
- (ConnectedRequest::Announce(request), IpAddr::V6(ip)) => {
- // It doesn't matter which socket worker receives announce responses
- let mut send_ref = response_sender
- .send_ref_to_any()
- .expect("swarm response channel is closed");
+ // It doesn't matter which socket worker receives announce responses
+ self.response_sender
+ .send_to_any(src, ConnectedResponse::AnnounceIpv4(response))
+ .expect("swarm response channel is closed");
+ }
+ (ConnectedRequest::Announce(request), IpAddr::V6(ip)) => {
+ let response = torrents
+ .ipv6
+ .0
+ .entry(request.info_hash)
+ .or_default()
+ .announce(
+ &self.config,
+ &self.statistics_sender,
+ &mut rng,
+ &request,
+ ip.into(),
+ peer_valid_until,
+ );
- send_ref.addr = src;
- send_ref.kind = ConnectedResponseKind::AnnounceIpv6;
+ // It doesn't matter which socket worker receives announce responses
+ self.response_sender
+ .send_to_any(src, ConnectedResponse::AnnounceIpv6(response))
+ .expect("swarm response channel is closed");
+ }
+ (ConnectedRequest::Scrape(request), IpAddr::V4(_)) => {
+ let response = torrents.ipv4.scrape(request);
- torrents
- .ipv6
- .0
- .entry(request.info_hash)
- .or_default()
- .announce(
- &config,
- &statistics_sender,
- &mut rng,
- &request,
- ip.into(),
- peer_valid_until,
- &mut send_ref.announce_ipv6,
- );
- }
- (ConnectedRequest::Scrape(request), IpAddr::V4(_)) => {
- let mut send_ref = response_sender
- .send_ref_to(sender_index)
- .expect("swarm response channel is closed");
+ self.response_sender
+ .send_to(sender_index, src, ConnectedResponse::Scrape(response))
+ .expect("swarm response channel is closed");
+ }
+ (ConnectedRequest::Scrape(request), IpAddr::V6(_)) => {
+ let response = torrents.ipv6.scrape(request);
- send_ref.addr = src;
- send_ref.kind = ConnectedResponseKind::Scrape;
-
- torrents.ipv4.scrape(request, &mut send_ref.scrape);
- }
- (ConnectedRequest::Scrape(request), IpAddr::V6(_)) => {
- let mut send_ref = response_sender
- .send_ref_to(sender_index)
- .expect("swarm response channel is closed");
-
- send_ref.addr = src;
- send_ref.kind = ConnectedResponseKind::Scrape;
-
- torrents.ipv6.scrape(request, &mut send_ref.scrape);
- }
- };
- }
-
- // Run periodic tasks
- if iter_counter % 128 == 0 {
- let now = Instant::now();
-
- peer_valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age);
-
- if now > last_cleaning + cleaning_interval {
- torrents.clean_and_update_statistics(
- &config,
- &state,
- &statistics_sender,
- &state.access_list,
- server_start_instant,
- worker_index,
- );
-
- last_cleaning = now;
+ self.response_sender
+ .send_to(sender_index, src, ConnectedResponse::Scrape(response))
+ .expect("swarm response channel is closed");
+ }
+ };
}
- if config.statistics.active()
- && now > last_statistics_update + statistics_update_interval
- {
- state.statistics_ipv4.torrents[worker_index.0]
- .store(torrents.ipv4.num_torrents(), Ordering::Release);
- state.statistics_ipv6.torrents[worker_index.0]
- .store(torrents.ipv6.num_torrents(), Ordering::Release);
- last_statistics_update = now;
+ // Run periodic tasks
+ if iter_counter % 128 == 0 {
+ let now = Instant::now();
+
+ peer_valid_until =
+ ValidUntil::new(self.server_start_instant, self.config.cleaning.max_peer_age);
+
+ if now > last_cleaning + cleaning_interval {
+ torrents.clean_and_update_statistics(
+ &self.config,
+ &self.state,
+ &self.statistics_sender,
+ &self.state.access_list,
+ self.server_start_instant,
+ self.worker_index,
+ );
+
+ last_cleaning = now;
+ }
+ if self.config.statistics.active()
+ && now > last_statistics_update + statistics_update_interval
+ {
+ self.state.statistics_ipv4.torrents[self.worker_index.0]
+ .store(torrents.ipv4.num_torrents(), Ordering::Release);
+ self.state.statistics_ipv6.torrents[self.worker_index.0]
+ .store(torrents.ipv6.num_torrents(), Ordering::Release);
+
+ last_statistics_update = now;
+ }
}
- }
- iter_counter = iter_counter.wrapping_add(1);
+ iter_counter = iter_counter.wrapping_add(1);
+ }
}
}
diff --git a/crates/udp/src/workers/swarm/storage.rs b/crates/udp/src/workers/swarm/storage.rs
index 4feb80d..0c1dcc2 100644
--- a/crates/udp/src/workers/swarm/storage.rs
+++ b/crates/udp/src/workers/swarm/storage.rs
@@ -10,6 +10,7 @@ use aquatic_common::{
};
use aquatic_udp_protocol::*;
+use arrayvec::ArrayVec;
use crossbeam_channel::Sender;
use hdrhistogram::Histogram;
use rand::prelude::SmallRng;
@@ -18,6 +19,8 @@ use rand::Rng;
use crate::common::*;
use crate::config::Config;
+const SMALL_PEER_MAP_CAPACITY: usize = 2;
+
pub struct TorrentMaps {
pub ipv4: TorrentMap,
pub ipv6: TorrentMap,
@@ -76,20 +79,29 @@ impl TorrentMaps {
pub struct TorrentMap(pub IndexMap>);
impl TorrentMap {
- pub fn scrape(&mut self, request: PendingScrapeRequest, response: &mut PendingScrapeResponse) {
- response.slab_key = request.slab_key;
+ pub fn scrape(&mut self, request: PendingScrapeRequest) -> PendingScrapeResponse {
+ let torrent_stats = request
+ .info_hashes
+ .into_iter()
+ .map(|(i, info_hash)| {
+ let stats = self
+ .0
+ .get(&info_hash)
+ .map(|torrent_data| torrent_data.scrape_statistics())
+ .unwrap_or_else(|| TorrentScrapeStatistics {
+ seeders: NumberOfPeers::new(0),
+ leechers: NumberOfPeers::new(0),
+ completed: NumberOfDownloads::new(0),
+ });
- let torrent_stats = request.info_hashes.into_iter().map(|(i, info_hash)| {
- let stats = self
- .0
- .get(&info_hash)
- .map(|torrent_data| torrent_data.scrape_statistics())
- .unwrap_or_else(|| create_torrent_scrape_statistics(0, 0));
+ (i, stats)
+ })
+ .collect();
- (i, stats)
- });
-
- response.torrent_stats.extend(torrent_stats);
+ PendingScrapeResponse {
+ slab_key: request.slab_key,
+ torrent_stats,
+ }
}
/// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers
fn clean_and_get_statistics(
@@ -100,7 +112,7 @@ impl TorrentMap {
access_list_mode: AccessListMode,
now: SecondsSinceServerStart,
) -> (usize, Option>) {
- let mut num_peers = 0;
+ let mut total_num_peers = 0;
let mut opt_histogram: Option> = if config.statistics.torrent_peer_histograms
{
@@ -124,17 +136,27 @@ impl TorrentMap {
return false;
}
- torrent.clean(config, statistics_sender, now);
+ let num_peers = match torrent {
+ TorrentData::Small(peer_map) => {
+ peer_map.clean_and_get_num_peers(config, statistics_sender, now)
+ }
+ TorrentData::Large(peer_map) => {
+ let num_peers =
+ peer_map.clean_and_get_num_peers(config, statistics_sender, now);
- num_peers += torrent.peers.len();
+ if let Some(peer_map) = peer_map.try_shrink() {
+ *torrent = TorrentData::Small(peer_map);
+ }
+
+ num_peers
+ }
+ };
+
+ total_num_peers += num_peers;
match opt_histogram {
- Some(ref mut histogram) if torrent.peers.len() != 0 => {
- let n = torrent
- .peers
- .len()
- .try_into()
- .expect("Couldn't fit usize into u64");
+ Some(ref mut histogram) if num_peers > 0 => {
+ let n = num_peers.try_into().expect("Couldn't fit usize into u64");
if let Err(err) = histogram.record(n) {
::log::error!("Couldn't record {} to histogram: {:#}", n, err);
@@ -143,12 +165,12 @@ impl TorrentMap {
_ => (),
}
- !torrent.peers.is_empty()
+ num_peers > 0
});
self.0.shrink_to_fit();
- (num_peers, opt_histogram)
+ (total_num_peers, opt_histogram)
}
pub fn num_torrents(&self) -> usize {
@@ -156,9 +178,9 @@ impl TorrentMap {
}
}
-pub struct TorrentData {
- peers: IndexMap, Peer>,
- num_seeders: usize,
+pub enum TorrentData {
+ Small(SmallPeerMap),
+ Large(LargePeerMap),
}
impl TorrentData {
@@ -170,8 +192,7 @@ impl TorrentData {
request: &AnnounceRequest,
ip_address: I,
valid_until: ValidUntil,
- response: &mut AnnounceResponse,
- ) {
+ ) -> AnnounceResponse {
let max_num_peers_to_take: usize = if request.peers_wanted.0.get() <= 0 {
config.protocol.max_response_peers
} else {
@@ -189,60 +210,77 @@ impl TorrentData {
port: request.port,
};
- let opt_removed_peer = self.peers.remove(&peer_map_key);
-
- if let Some(Peer {
- is_seeder: true, ..
- }) = opt_removed_peer
- {
- self.num_seeders -= 1;
- }
-
// Create the response before inserting the peer. This means that we
// don't have to filter it out from the response peers, and that the
// reported number of seeders/leechers will not include it
+ let (response, opt_removed_peer) = match self {
+ Self::Small(peer_map) => {
+ let opt_removed_peer = peer_map.remove(&peer_map_key);
- response.fixed = AnnounceResponseFixedData {
- transaction_id: request.transaction_id,
- announce_interval: AnnounceInterval::new(config.protocol.peer_announce_interval),
- leechers: NumberOfPeers::new(self.num_leechers().try_into().unwrap_or(i32::MAX)),
- seeders: NumberOfPeers::new(self.num_seeders().try_into().unwrap_or(i32::MAX)),
+ let (seeders, leechers) = peer_map.num_seeders_leechers();
+
+ let response = AnnounceResponse {
+ fixed: AnnounceResponseFixedData {
+ transaction_id: request.transaction_id,
+ announce_interval: AnnounceInterval::new(
+ config.protocol.peer_announce_interval,
+ ),
+ leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)),
+ seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)),
+ },
+ peers: peer_map.extract_response_peers(max_num_peers_to_take),
+ };
+
+ // Convert peer map to large variant if it is full and
+ // announcing peer is not stopped and will therefore be
+ // inserted
+ if peer_map.is_full() && status != PeerStatus::Stopped {
+ *self = Self::Large(peer_map.to_large());
+ }
+
+ (response, opt_removed_peer)
+ }
+ Self::Large(peer_map) => {
+ let opt_removed_peer = peer_map.remove_peer(&peer_map_key);
+
+ let (seeders, leechers) = peer_map.num_seeders_leechers();
+
+ let response = AnnounceResponse {
+ fixed: AnnounceResponseFixedData {
+ transaction_id: request.transaction_id,
+ announce_interval: AnnounceInterval::new(
+ config.protocol.peer_announce_interval,
+ ),
+ leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)),
+ seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)),
+ },
+ peers: peer_map.extract_response_peers(rng, max_num_peers_to_take),
+ };
+
+ // Try shrinking the map if announcing peer is stopped and
+ // will therefore not be inserted
+ if status == PeerStatus::Stopped {
+ if let Some(peer_map) = peer_map.try_shrink() {
+ *self = Self::Small(peer_map);
+ }
+ }
+
+ (response, opt_removed_peer)
+ }
};
- extract_response_peers(
- rng,
- &self.peers,
- max_num_peers_to_take,
- |k, _| *k,
- &mut response.peers,
- );
-
match status {
- PeerStatus::Leeching => {
+ PeerStatus::Leeching | PeerStatus::Seeding => {
let peer = Peer {
peer_id: request.peer_id,
- is_seeder: false,
+ is_seeder: status == PeerStatus::Seeding,
valid_until,
};
- self.peers.insert(peer_map_key, peer);
-
- if config.statistics.peer_clients && opt_removed_peer.is_none() {
- statistics_sender
- .try_send(StatisticsMessage::PeerAdded(request.peer_id))
- .expect("statistics channel should be unbounded");
+ match self {
+ Self::Small(peer_map) => peer_map.insert(peer_map_key, peer),
+ Self::Large(peer_map) => peer_map.insert(peer_map_key, peer),
}
- }
- PeerStatus::Seeding => {
- let peer = Peer {
- peer_id: request.peer_id,
- is_seeder: true,
- valid_until,
- };
-
- self.peers.insert(peer_map_key, peer);
-
- self.num_seeders += 1;
if config.statistics.peer_clients && opt_removed_peer.is_none() {
statistics_sender
@@ -258,30 +296,184 @@ impl TorrentData {
}
}
};
- }
- pub fn num_leechers(&self) -> usize {
- self.peers.len() - self.num_seeders
- }
-
- pub fn num_seeders(&self) -> usize {
- self.num_seeders
+ response
}
pub fn scrape_statistics(&self) -> TorrentScrapeStatistics {
- create_torrent_scrape_statistics(
- self.num_seeders.try_into().unwrap_or(i32::MAX),
- self.num_leechers().try_into().unwrap_or(i32::MAX),
- )
+ let (seeders, leechers) = match self {
+ Self::Small(peer_map) => peer_map.num_seeders_leechers(),
+ Self::Large(peer_map) => peer_map.num_seeders_leechers(),
+ };
+
+ TorrentScrapeStatistics {
+ seeders: NumberOfPeers::new(seeders.try_into().unwrap_or(i32::MAX)),
+ leechers: NumberOfPeers::new(leechers.try_into().unwrap_or(i32::MAX)),
+ completed: NumberOfDownloads::new(0),
+ }
+ }
+}
+
+impl Default for TorrentData {
+ fn default() -> Self {
+ Self::Small(SmallPeerMap(ArrayVec::default()))
+ }
+}
+
+/// Store torrents with up to two peers without an extra heap allocation
+///
+/// On public open trackers, this is likely to be the majority of torrents.
+#[derive(Default, Debug)]
+pub struct SmallPeerMap(ArrayVec<(ResponsePeer, Peer), SMALL_PEER_MAP_CAPACITY>);
+
+impl SmallPeerMap {
+ fn is_full(&self) -> bool {
+ self.0.is_full()
}
- /// Remove inactive peers and reclaim space
- fn clean(
+ fn num_seeders_leechers(&self) -> (usize, usize) {
+ let seeders = self.0.iter().filter(|(_, p)| p.is_seeder).count();
+ let leechers = self.0.len() - seeders;
+
+ (seeders, leechers)
+ }
+
+ fn insert(&mut self, key: ResponsePeer, peer: Peer) {
+ self.0.push((key, peer));
+ }
+
+ fn remove(&mut self, key: &ResponsePeer) -> Option {
+ for (i, (k, _)) in self.0.iter().enumerate() {
+ if k == key {
+ return Some(self.0.remove(i).1);
+ }
+ }
+
+ None
+ }
+
+ fn extract_response_peers(&self, max_num_peers_to_take: usize) -> Vec> {
+ Vec::from_iter(self.0.iter().take(max_num_peers_to_take).map(|(k, _)| *k))
+ }
+
+ fn clean_and_get_num_peers(
&mut self,
config: &Config,
statistics_sender: &Sender,
now: SecondsSinceServerStart,
- ) {
+ ) -> usize {
+ self.0.retain(|(_, peer)| {
+ let keep = peer.valid_until.valid(now);
+
+ if !keep
+ && config.statistics.peer_clients
+ && statistics_sender
+ .try_send(StatisticsMessage::PeerRemoved(peer.peer_id))
+ .is_err()
+ {
+ // Should never happen in practice
+ ::log::error!("Couldn't send StatisticsMessage::PeerRemoved");
+ }
+
+ keep
+ });
+
+ self.0.len()
+ }
+
+ fn to_large(&self) -> LargePeerMap {
+ let (num_seeders, _) = self.num_seeders_leechers();
+ let peers = self.0.iter().copied().collect();
+
+ LargePeerMap { peers, num_seeders }
+ }
+}
+
+#[derive(Default)]
+pub struct LargePeerMap {
+ peers: IndexMap, Peer>,
+ num_seeders: usize,
+}
+
+impl LargePeerMap {
+ fn num_seeders_leechers(&self) -> (usize, usize) {
+ (self.num_seeders, self.peers.len() - self.num_seeders)
+ }
+
+ fn insert(&mut self, key: ResponsePeer, peer: Peer) {
+ if peer.is_seeder {
+ self.num_seeders += 1;
+ }
+
+ self.peers.insert(key, peer);
+ }
+
+ fn remove_peer(&mut self, key: &ResponsePeer) -> Option {
+ let opt_removed_peer = self.peers.remove(key);
+
+ if let Some(Peer {
+ is_seeder: true, ..
+ }) = opt_removed_peer
+ {
+ self.num_seeders -= 1;
+ }
+
+ opt_removed_peer
+ }
+
+ /// Extract response peers
+ ///
+ /// If there are more peers in map than `max_num_peers_to_take`, do a random
+ /// selection of peers from first and second halves of map in order to avoid
+ /// returning too homogeneous peers.
+ ///
+ /// Does NOT filter out announcing peer.
+ pub fn extract_response_peers(
+ &self,
+ rng: &mut impl Rng,
+ max_num_peers_to_take: usize,
+ ) -> Vec> {
+ if self.peers.len() <= max_num_peers_to_take {
+ self.peers.keys().copied().collect()
+ } else {
+ let middle_index = self.peers.len() / 2;
+ let num_to_take_per_half = max_num_peers_to_take / 2;
+
+ let offset_half_one = {
+ let from = 0;
+ let to = usize::max(1, middle_index - num_to_take_per_half);
+
+ rng.gen_range(from..to)
+ };
+ let offset_half_two = {
+ let from = middle_index;
+ let to = usize::max(middle_index + 1, self.peers.len() - num_to_take_per_half);
+
+ rng.gen_range(from..to)
+ };
+
+ let end_half_one = offset_half_one + num_to_take_per_half;
+ let end_half_two = offset_half_two + num_to_take_per_half;
+
+ let mut peers = Vec::with_capacity(max_num_peers_to_take);
+
+ if let Some(slice) = self.peers.get_range(offset_half_one..end_half_one) {
+ peers.extend(slice.keys());
+ }
+ if let Some(slice) = self.peers.get_range(offset_half_two..end_half_two) {
+ peers.extend(slice.keys());
+ }
+
+ peers
+ }
+ }
+
+ fn clean_and_get_num_peers(
+ &mut self,
+ config: &Config,
+ statistics_sender: &Sender,
+ now: SecondsSinceServerStart,
+ ) -> usize {
self.peers.retain(|_, peer| {
let keep = peer.valid_until.valid(now);
@@ -289,13 +481,13 @@ impl TorrentData {
if peer.is_seeder {
self.num_seeders -= 1;
}
- if config.statistics.peer_clients {
- if let Err(_) =
- statistics_sender.try_send(StatisticsMessage::PeerRemoved(peer.peer_id))
- {
- // Should never happen in practice
- ::log::error!("Couldn't send StatisticsMessage::PeerRemoved");
- }
+ if config.statistics.peer_clients
+ && statistics_sender
+ .try_send(StatisticsMessage::PeerRemoved(peer.peer_id))
+ .is_err()
+ {
+ // Should never happen in practice
+ ::log::error!("Couldn't send StatisticsMessage::PeerRemoved");
}
}
@@ -305,79 +497,22 @@ impl TorrentData {
if !self.peers.is_empty() {
self.peers.shrink_to_fit();
}
+
+ self.peers.len()
+ }
+
+ fn try_shrink(&mut self) -> Option> {
+ (self.peers.len() <= SMALL_PEER_MAP_CAPACITY).then(|| {
+ SmallPeerMap(ArrayVec::from_iter(
+ self.peers.iter().map(|(k, v)| (*k, *v)),
+ ))
+ })
}
}
-impl Default for TorrentData {
- fn default() -> Self {
- Self {
- peers: Default::default(),
- num_seeders: 0,
- }
- }
-}
-
-#[derive(Clone, Debug)]
+#[derive(Clone, Copy, Debug)]
struct Peer {
peer_id: PeerId,
is_seeder: bool,
valid_until: ValidUntil,
}
-
-/// Extract response peers
-///
-/// If there are more peers in map than `max_num_peers_to_take`, do a random
-/// selection of peers from first and second halves of map in order to avoid
-/// returning too homogeneous peers.
-///
-/// Does NOT filter out announcing peer.
-#[inline]
-pub fn extract_response_peers(
- rng: &mut impl Rng,
- peer_map: &IndexMap,
- max_num_peers_to_take: usize,
- peer_conversion_function: F,
- peers: &mut Vec,
-) where
- K: Eq + ::std::hash::Hash,
- F: Fn(&K, &V) -> R,
-{
- if peer_map.len() <= max_num_peers_to_take {
- peers.extend(peer_map.iter().map(|(k, v)| peer_conversion_function(k, v)));
- } else {
- let middle_index = peer_map.len() / 2;
- let num_to_take_per_half = max_num_peers_to_take / 2;
-
- let offset_half_one = {
- let from = 0;
- let to = usize::max(1, middle_index - num_to_take_per_half);
-
- rng.gen_range(from..to)
- };
- let offset_half_two = {
- let from = middle_index;
- let to = usize::max(middle_index + 1, peer_map.len() - num_to_take_per_half);
-
- rng.gen_range(from..to)
- };
-
- let end_half_one = offset_half_one + num_to_take_per_half;
- let end_half_two = offset_half_two + num_to_take_per_half;
-
- if let Some(slice) = peer_map.get_range(offset_half_one..end_half_one) {
- peers.extend(slice.iter().map(|(k, v)| peer_conversion_function(k, v)));
- }
- if let Some(slice) = peer_map.get_range(offset_half_two..end_half_two) {
- peers.extend(slice.iter().map(|(k, v)| peer_conversion_function(k, v)));
- }
- }
-}
-
-#[inline(always)]
-fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics {
- TorrentScrapeStatistics {
- seeders: NumberOfPeers::new(seeders),
- completed: NumberOfDownloads::new(0), // No implementation planned
- leechers: NumberOfPeers::new(leechers),
- }
-}
diff --git a/crates/udp/tests/common/mod.rs b/crates/udp/tests/common/mod.rs
index 832d027..ee8e365 100644
--- a/crates/udp/tests/common/mod.rs
+++ b/crates/udp/tests/common/mod.rs
@@ -29,7 +29,7 @@ pub fn connect(socket: &UdpSocket, tracker_addr: SocketAddr) -> anyhow::Result ::anyhow::Result<()> {
let ip = if config.server_address.is_ipv6() {
Ipv6Addr::LOCALHOST.into()
+ } else if config.network.multiple_client_ipv4s {
+ Ipv4Addr::new(127, 0, 0, 1 + i).into()
} else {
- if config.network.multiple_client_ipv4s {
- Ipv4Addr::new(127, 0, 0, 1 + i).into()
- } else {
- Ipv4Addr::LOCALHOST.into()
- }
+ Ipv4Addr::LOCALHOST.into()
};
let addr = SocketAddr::new(ip, port);
diff --git a/crates/udp_protocol/src/common.rs b/crates/udp_protocol/src/common.rs
index a0043df..6c54cb2 100644
--- a/crates/udp_protocol/src/common.rs
+++ b/crates/udp_protocol/src/common.rs
@@ -104,15 +104,15 @@ pub struct Ipv4AddrBytes(pub [u8; 4]);
impl Ip for Ipv4AddrBytes {}
-impl Into for Ipv4AddrBytes {
- fn into(self) -> Ipv4Addr {
- Ipv4Addr::from(self.0)
+impl From for Ipv4Addr {
+ fn from(val: Ipv4AddrBytes) -> Self {
+ Ipv4Addr::from(val.0)
}
}
-impl Into for Ipv4Addr {
- fn into(self) -> Ipv4AddrBytes {
- Ipv4AddrBytes(self.octets())
+impl From for Ipv4AddrBytes {
+ fn from(val: Ipv4Addr) -> Self {
+ Ipv4AddrBytes(val.octets())
}
}
@@ -122,15 +122,15 @@ pub struct Ipv6AddrBytes(pub [u8; 16]);
impl Ip for Ipv6AddrBytes {}
-impl Into for Ipv6AddrBytes {
- fn into(self) -> Ipv6Addr {
- Ipv6Addr::from(self.0)
+impl From for Ipv6Addr {
+ fn from(val: Ipv6AddrBytes) -> Self {
+ Ipv6Addr::from(val.0)
}
}
-impl Into for Ipv6Addr {
- fn into(self) -> Ipv6AddrBytes {
- Ipv6AddrBytes(self.octets())
+impl From for Ipv6AddrBytes {
+ fn from(val: Ipv6Addr) -> Self {
+ Ipv6AddrBytes(val.octets())
}
}
diff --git a/crates/udp_protocol/src/response.rs b/crates/udp_protocol/src/response.rs
index 882a928..4e3353f 100644
--- a/crates/udp_protocol/src/response.rs
+++ b/crates/udp_protocol/src/response.rs
@@ -85,7 +85,7 @@ impl Response {
// Error
3 => {
let transaction_id = read_i32_ne(&mut bytes).map(TransactionId)?;
- let message = String::from_utf8_lossy(&bytes).into_owned().into();
+ let message = String::from_utf8_lossy(bytes).into_owned().into();
Ok((ErrorResponse {
transaction_id,
diff --git a/crates/ws/src/common.rs b/crates/ws/src/common.rs
index 3240546..aced74a 100644
--- a/crates/ws/src/common.rs
+++ b/crates/ws/src/common.rs
@@ -57,12 +57,12 @@ pub struct OutMessageMeta {
pub pending_scrape_id: Option,
}
-impl Into for InMessageMeta {
- fn into(self) -> OutMessageMeta {
+impl From for OutMessageMeta {
+ fn from(val: InMessageMeta) -> Self {
OutMessageMeta {
- out_message_consumer_id: self.out_message_consumer_id,
- connection_id: self.connection_id,
- pending_scrape_id: self.pending_scrape_id,
+ out_message_consumer_id: val.out_message_consumer_id,
+ connection_id: val.connection_id,
+ pending_scrape_id: val.pending_scrape_id,
}
}
}
diff --git a/crates/ws/src/workers/socket/connection.rs b/crates/ws/src/workers/socket/connection.rs
index 82d93ef..00de5c6 100644
--- a/crates/ws/src/workers/socket/connection.rs
+++ b/crates/ws/src/workers/socket/connection.rs
@@ -41,6 +41,9 @@ use crate::workers::socket::calculate_in_message_consumer_index;
#[cfg(feature = "metrics")]
use crate::workers::socket::{ip_version_to_metrics_str, WORKER_INDEX};
+/// Optional second tuple field is for peer id hex representation
+type PeerClientGauge = (Gauge, Option);
+
pub struct ConnectionRunner {
pub config: Rc,
pub access_list: Arc,
@@ -77,7 +80,7 @@ impl ConnectionRunner {
clean_up_data.before_open();
let config = self.config.clone();
- let connection_id = self.connection_id.clone();
+ let connection_id = self.connection_id;
race(
async {
@@ -283,6 +286,8 @@ impl ConnectionReader {
}
}
+ // Silence RefCell lint due to false positives
+ #[allow(clippy::await_holding_refcell_ref)]
async fn handle_announce_request(&mut self, request: AnnounceRequest) -> anyhow::Result<()> {
#[cfg(feature = "metrics")]
self.total_announce_requests_counter.increment(1);
@@ -485,6 +490,8 @@ struct ConnectionWriter {
}
impl ConnectionWriter {
+ // Silence RefCell lint due to false positives
+ #[allow(clippy::await_holding_refcell_ref)]
async fn run_out_message_loop(&mut self) -> anyhow::Result<()> {
loop {
let (meta, out_message) = self.out_message_receiver.recv().await.ok_or_else(|| {
@@ -590,7 +597,7 @@ impl ConnectionWriter {
struct ConnectionCleanupData {
announced_info_hashes: Rc>>,
ip_version: IpVersion,
- opt_peer_client: Rc)>>>,
+ opt_peer_client: Rc>>,
#[cfg(feature = "metrics")]
active_connections_gauge: Gauge,
}
@@ -608,7 +615,7 @@ impl ConnectionCleanupData {
let mut announced_info_hashes = HashMap::new();
for (info_hash, peer_id) in self.announced_info_hashes.take().into_iter() {
- let consumer_index = calculate_in_message_consumer_index(&config, info_hash);
+ let consumer_index = calculate_in_message_consumer_index(config, info_hash);
announced_info_hashes
.entry(consumer_index)
diff --git a/crates/ws/src/workers/socket/mod.rs b/crates/ws/src/workers/socket/mod.rs
index 03e17c5..040fce7 100644
--- a/crates/ws/src/workers/socket/mod.rs
+++ b/crates/ws/src/workers/socket/mod.rs
@@ -48,6 +48,7 @@ struct ConnectionHandle {
valid_until_after_tls_update: Option,
}
+#[allow(clippy::too_many_arguments)]
pub async fn run_socket_worker(
_sentinel: PanicSentinel,
config: Config,
diff --git a/crates/ws/src/workers/swarm/mod.rs b/crates/ws/src/workers/swarm/mod.rs
index 7788d12..f26c34d 100644
--- a/crates/ws/src/workers/swarm/mod.rs
+++ b/crates/ws/src/workers/swarm/mod.rs
@@ -24,6 +24,7 @@ use self::storage::TorrentMaps;
#[cfg(feature = "metrics")]
thread_local! { static WORKER_INDEX: ::std::cell::Cell = Default::default() }
+#[allow(clippy::too_many_arguments)]
pub async fn run_swarm_worker(
_sentinel: PanicSentinel,
config: Config,
@@ -140,7 +141,7 @@ async fn handle_request_stream(
match in_message {
InMessage::AnnounceRequest(request) => {
torrents.borrow_mut().handle_announce_request(
- &config,
+ config,
&mut rng.borrow_mut(),
&mut out_messages,
server_start_instant,
@@ -150,7 +151,7 @@ async fn handle_request_stream(
}
InMessage::ScrapeRequest(request) => torrents
.borrow_mut()
- .handle_scrape_request(&config, &mut out_messages, meta, request),
+ .handle_scrape_request(config, &mut out_messages, meta, request),
};
for (meta, out_message) in out_messages {
diff --git a/crates/ws/src/workers/swarm/storage.rs b/crates/ws/src/workers/swarm/storage.rs
index 37cf153..d579ad9 100644
--- a/crates/ws/src/workers/swarm/storage.rs
+++ b/crates/ws/src/workers/swarm/storage.rs
@@ -248,7 +248,11 @@ impl TorrentMaps {
regarding_offer_id: offer_id,
};
- if let Some(_) = answer_receiver.expecting_answers.remove(&expecting_answer) {
+ if answer_receiver
+ .expecting_answers
+ .remove(&expecting_answer)
+ .is_some()
+ {
let answer_out_message = AnswerOutMessage {
action: AnnounceAction::Announce,
peer_id: request.peer_id,
@@ -426,13 +430,11 @@ impl TorrentMaps {
#[cfg(feature = "metrics")]
self.peers_gauge_ipv4.decrement(1.0);
}
- } else {
- if let Some(torrent_data) = self.ipv6.get_mut(&info_hash) {
- torrent_data.remove_peer(peer_id);
+ } else if let Some(torrent_data) = self.ipv6.get_mut(&info_hash) {
+ torrent_data.remove_peer(peer_id);
- #[cfg(feature = "metrics")]
- self.peers_gauge_ipv6.decrement(1.0);
- }
+ #[cfg(feature = "metrics")]
+ self.peers_gauge_ipv6.decrement(1.0);
}
}
}
diff --git a/crates/ws_load_test/src/network.rs b/crates/ws_load_test/src/network.rs
index 55da243..a0aa125 100644
--- a/crates/ws_load_test/src/network.rs
+++ b/crates/ws_load_test/src/network.rs
@@ -149,6 +149,19 @@ impl Connection {
}
async fn send_message(&mut self) -> anyhow::Result<()> {
+ let request = self.create_request();
+
+ self.stream.send(request.to_ws_message()).await?;
+
+ self.load_test_state
+ .statistics
+ .requests
+ .fetch_add(1, Ordering::Relaxed);
+
+ Ok(())
+ }
+
+ fn create_request(&mut self) -> InMessage {
let mut rng = self.rng.borrow_mut();
let request = match random_request_type(&self.config, &mut *rng) {
@@ -226,18 +239,9 @@ impl Connection {
}
};
- drop(rng);
-
self.can_send_answer = None;
- self.stream.send(request.to_ws_message()).await?;
-
- self.load_test_state
- .statistics
- .requests
- .fetch_add(1, Ordering::Relaxed);
-
- Ok(())
+ request
}
async fn read_message(&mut self) -> anyhow::Result<()> {
@@ -312,7 +316,7 @@ pub fn random_request_type(config: &Config, rng: &mut impl Rng) -> RequestType {
let items = [RequestType::Announce, RequestType::Scrape];
- let dist = WeightedIndex::new(&weights).expect("random request weighted index");
+ let dist = WeightedIndex::new(weights).expect("random request weighted index");
items[dist.sample(rng)]
}
diff --git a/crates/ws_protocol/src/common.rs b/crates/ws_protocol/src/common.rs
index f465f04..5c0a430 100644
--- a/crates/ws_protocol/src/common.rs
+++ b/crates/ws_protocol/src/common.rs
@@ -156,7 +156,7 @@ mod tests {
assert!(bytes.len() == 20);
- arr.copy_from_slice(&bytes[..]);
+ arr.copy_from_slice(bytes);
InfoHash(arr)
}
diff --git a/crates/ws_protocol/src/lib.rs b/crates/ws_protocol/src/lib.rs
index c61681a..2652e86 100644
--- a/crates/ws_protocol/src/lib.rs
+++ b/crates/ws_protocol/src/lib.rs
@@ -270,7 +270,7 @@ mod tests {
assert!(bytes.len() == 20);
- arr.copy_from_slice(&bytes[..]);
+ arr.copy_from_slice(bytes);
InfoHash(arr)
}
@@ -370,10 +370,6 @@ mod tests {
::simd_json::serde::from_str(&mut json).unwrap()
};
- let success = info_hashes == deserialized;
-
- if !success {}
-
- success
+ info_hashes == deserialized
}
}