diff --git a/.gitignore b/.gitignore index 3de5924..0446378 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ **/criterion/*/new .DS_Store +.env \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 5eaf471..0a8e40a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -83,6 +83,8 @@ dependencies = [ "log", "privdrop", "rand", + "rustls 0.20.4", + "rustls-pemfile", "serde", ] @@ -135,15 +137,41 @@ dependencies = [ "quickcheck_macros", "rand", "rand_distr", - "rustls", + "rustls 0.20.4", "serde", ] +[[package]] +name = "aquatic_http_private" +version = "0.2.0" +dependencies = [ + "anyhow", + "aquatic_cli_helpers", + "aquatic_common", + "aquatic_http_protocol", + "aquatic_toml_config", + "axum", + "dotenv", + "futures-util", + "hex", + "hyper", + "log", + "mimalloc", + "rand", + "rustls 0.20.4", + "serde", + "socket2 0.4.4", + "sqlx", + "tokio", + "tokio-rustls 0.23.3", +] + [[package]] name = "aquatic_http_protocol" version = "0.2.0" dependencies = [ "anyhow", + "axum", "bendy", "criterion", "hex", @@ -279,7 +307,7 @@ dependencies = [ "quickcheck", "quickcheck_macros", "rand", - "rustls", + "rustls 0.20.4", "rustls-pemfile", "serde", "signal-hook", @@ -307,7 +335,7 @@ dependencies = [ "quickcheck_macros", "rand", "rand_distr", - "rustls", + "rustls 0.20.4", "serde", "serde_json", "tungstenite", @@ -343,6 +371,17 @@ dependencies = [ "nodrop", ] +[[package]] +name = "async-trait" +version = "0.1.53" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed6aa3524a2dfcf9fe180c51eae2b58738348d819517ceadf95789c51fff7600" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-tungstenite" version = "0.17.2" @@ -356,6 +395,15 @@ dependencies = [ "tungstenite", ] +[[package]] +name = "atoi" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "616896e05fc0e2649463a93a15183c6a16bf03413a7af88ef1285ddedfa9cda5" +dependencies = [ + "num-traits", +] + [[package]] name = "atone" version = "0.3.5" @@ -373,12 +421,65 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "autocfg" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dde43e75fd43e8a1bf86103336bc699aa8d17ad1be60c76c0bdfd4828e19b78" +dependencies = [ + "autocfg 1.1.0", +] + [[package]] name = "autocfg" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5611d4977882c5af1c0f7a34d51b5d87f784f86912bb543986b014ea4995ef93" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "headers", + "http", + "http-body", + "hyper", + "itoa 1.0.1", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde", + "sync_wrapper", + "tokio", + "tower", + "tower-http", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95cd109b3e93c9541dcce5b0219dcf89169dcc58c1bebed65082808324258afb" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", +] + [[package]] name = "backtrace" version = "0.3.64" @@ -400,6 +501,12 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" +[[package]] +name = "base64ct" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6b4d9b1225d28d360ec6a231d65af1fd99a2a095154c8040689617290569c5c" + [[package]] name = "bendy" version = "0.3.3" @@ -429,6 +536,15 @@ version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "303cec55cd9c5fde944b061b902f142b52a8bb5438cc822481ea1e3ebc96bbcb" +[[package]] +name = "block-buffer" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" +dependencies = [ + "generic-array", +] + [[package]] name = "block-buffer" version = "0.10.2" @@ -545,6 +661,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "const-oid" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d6f2aa4d0537bcc1c74df8755072bd31c1ef1a3a1b85a68e8404a8c353b7b8b" + [[package]] name = "cpufeatures" version = "0.2.2" @@ -554,6 +676,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49fc9a695bca7f35f5f4c15cddc84415f66a74ea78eef08e90c5024f2b540e23" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccaeedb56da03b09f598226e25e80088cb4cd25f316e6e4df7d695f0feeb1403" + [[package]] name = "criterion" version = "0.3.5" @@ -631,7 +768,7 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1145cf131a2c6ba0615079ab6a638f7e1973ac9c2634fcbeaaad6114246efe8c" dependencies = [ - "autocfg", + "autocfg 1.1.0", "cfg-if", "crossbeam-utils", "lazy_static", @@ -659,6 +796,17 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "crypto-bigint" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83bd3bb4314701c568e340cd8cf78c975aa0ca79e03d3f6d1677d5b0c9c0c03" +dependencies = [ + "generic-array", + "rand_core", + "subtle", +] + [[package]] name = "crypto-common" version = "0.1.3" @@ -691,16 +839,41 @@ dependencies = [ "memchr", ] +[[package]] +name = "der" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79b71cca7d95d7681a4b3b9cdf63c8dbc3730d0584c2c74e31416d64a90493f4" +dependencies = [ + "const-oid", + "crypto-bigint", +] + +[[package]] +name = "digest" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" +dependencies = [ + "generic-array", +] + [[package]] name = "digest" version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506" dependencies = [ - "block-buffer", + "block-buffer 0.10.2", "crypto-common", ] +[[package]] +name = "dotenv" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" + [[package]] name = "either" version = "1.6.1" @@ -861,6 +1034,17 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-intrusive" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62007592ac46aa7c2b6416f7deb9a8a8f63a01e0f1d6e1787d5630170db2b63e" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot 0.11.2", +] + [[package]] name = "futures-io" version = "0.3.21" @@ -900,8 +1084,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01fe9932a224b72b45336d96040aa86386d674a31d0af27d800ea7bc8ca97fe" dependencies = [ "futures-io", - "rustls", - "webpki", + "rustls 0.20.4", + "webpki 0.22.0", ] [[package]] @@ -1053,6 +1237,15 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" +[[package]] +name = "hashbrown" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" +dependencies = [ + "ahash", +] + [[package]] name = "hashbrown" version = "0.12.0" @@ -1063,6 +1256,49 @@ dependencies = [ "serde", ] +[[package]] +name = "hashlink" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf" +dependencies = [ + "hashbrown 0.11.2", +] + +[[package]] +name = "headers" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cff78e5788be1e0ab65b04d306b2ed5092c815ec97ec70f4ebd5aee158aa55d" +dependencies = [ + "base64", + "bitflags 1.3.2", + "bytes", + "headers-core", + "http", + "httpdate", + "mime", + "sha-1 0.10.0", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http", +] + +[[package]] +name = "heck" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1089,12 +1325,35 @@ dependencies = [ "itoa 1.0.1", ] +[[package]] +name = "http-body" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + +[[package]] +name = "http-range-header" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" + [[package]] name = "httparse" version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9100414882e15fb7feccb4897e5f0ff0ff1ca7d1a86a23208ada4d7a18e6c6c4" +[[package]] +name = "httpdate" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" + [[package]] name = "hwloc" version = "0.5.0" @@ -1110,6 +1369,29 @@ dependencies = [ "winapi 0.2.8", ] +[[package]] +name = "hyper" +version = "0.14.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http", + "http-body", + "httparse", + "httpdate", + "itoa 1.0.1", + "pin-project-lite", + "socket2 0.4.4", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "idna" version = "0.2.3" @@ -1121,6 +1403,16 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "indexmap" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f647032dfaa1f8b6dc29bd3edb7bbef4861b8b8007ebb118d6db284fd59f6ee" +dependencies = [ + "autocfg 1.1.0", + "hashbrown 0.11.2", +] + [[package]] name = "indexmap-amortized" version = "1.6.1" @@ -1128,7 +1420,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81b5a05ffb45214e51fdd40c1f773ab57c74d2a7b41cfadc7ea443acf0359df1" dependencies = [ "atone", - "autocfg", + "autocfg 1.1.0", "griddle", ] @@ -1207,6 +1499,9 @@ name = "lazy_static" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +dependencies = [ + "spin 0.5.2", +] [[package]] name = "libc" @@ -1235,7 +1530,7 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53" dependencies = [ - "autocfg", + "autocfg 1.1.0", "scopeguard", ] @@ -1263,11 +1558,16 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" +[[package]] +name = "matchit" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" + [[package]] name = "membarrier" version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "925b0811d7e5fb2b666b5906c5047b7ec23aab78edc4d51b7b0f82dc5c955b1c" +source = "git+https://github.com/glommer/membarrier-rs.git?branch=issue-22#a79ea2d9b6e976b83b7fd709073cf977b1e47581" dependencies = [ "cfg-if", "kernel32-sys", @@ -1287,7 +1587,7 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "043175f069eda7b85febe4a74abbaeff828d9f8b448515d3151a14a3542811aa" dependencies = [ - "autocfg", + "autocfg 1.1.0", ] [[package]] @@ -1296,7 +1596,7 @@ version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce" dependencies = [ - "autocfg", + "autocfg 1.1.0", ] [[package]] @@ -1308,6 +1608,18 @@ dependencies = [ "libmimalloc-sys", ] +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.4.4" @@ -1315,7 +1627,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b" dependencies = [ "adler", - "autocfg", + "autocfg 1.1.0", ] [[package]] @@ -1375,6 +1687,16 @@ version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb" +[[package]] +name = "nom" +version = "7.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8903e5a29a317527874d0402f867152a3d21c908bb0b933e416c65e301d4c36" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "ntapi" version = "0.3.7" @@ -1395,6 +1717,35 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-bigint" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f6f7833f2cbf2360a6cfd58cd41a53aa7a90bd4c202f5b1c7dd2ed73c57b2c3" +dependencies = [ + "autocfg 1.1.0", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-bigint-dig" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4547ee5541c18742396ae2c895d0717d0f886d8823b8399cdaf7b07d63ad0480" +dependencies = [ + "autocfg 0.1.8", + "byteorder", + "lazy_static", + "libm", + "num-integer", + "num-iter", + "num-traits", + "rand", + "smallvec", + "zeroize", +] + [[package]] name = "num-format" version = "0.4.0" @@ -1411,7 +1762,7 @@ version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" dependencies = [ - "autocfg", + "autocfg 1.1.0", "num-traits", ] @@ -1421,7 +1772,7 @@ version = "0.1.42" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2021c8337a54d21aca0d59a92577a029af9431cb59b909b03252b9c164fad59" dependencies = [ - "autocfg", + "autocfg 1.1.0", "num-integer", "num-traits", ] @@ -1432,7 +1783,7 @@ version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" dependencies = [ - "autocfg", + "autocfg 1.1.0", "libm", ] @@ -1482,6 +1833,12 @@ version = "11.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" +[[package]] +name = "opaque-debug" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" + [[package]] name = "owned-alloc" version = "0.2.0" @@ -1494,6 +1851,69 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.5", +] + +[[package]] +name = "parking_lot" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" +dependencies = [ + "lock_api", + "parking_lot_core 0.9.2", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi 0.3.9", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "995f667a6c822200b0433ac218e05582f0e2efa1b922a3fd2fbaadc5f87bab37" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-sys", +] + +[[package]] +name = "paste" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c520e05135d6e763148b6426a837e239041653ba7becd2e538c076c738025fc" + +[[package]] +name = "pem-rfc7468" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84e93a3b1cc0510b03020f33f21e62acdde3dcaef432edc95bea377fbd4c2cd4" +dependencies = [ + "base64ct", +] + [[package]] name = "percent-encoding" version = "2.1.0" @@ -1532,6 +1952,30 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkcs1" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "116bee8279d783c0cf370efa1a94632f2108e5ef0bb32df31f051647810a4e2c" +dependencies = [ + "der", + "pem-rfc7468", + "zeroize", +] + +[[package]] +name = "pkcs8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee3ef9b64d26bad0536099c816c6734379e45bbd5f14798def6809e5cc350447" +dependencies = [ + "der", + "pem-rfc7468", + "pkcs1", + "spki", + "zeroize", +] + [[package]] name = "pkg-config" version = "0.3.25" @@ -1668,7 +2112,7 @@ version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90" dependencies = [ - "autocfg", + "autocfg 1.1.0", "crossbeam-deque", "either", "rayon-core", @@ -1687,6 +2131,15 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "redox_syscall" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62f25bc4c7e55e0b0b7a1d43fb893f4fa1361d0abe38b9ce4f323c2adfe6ef42" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "regex" version = "1.5.5" @@ -1734,6 +2187,26 @@ dependencies = [ "libc", ] +[[package]] +name = "rsa" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c2603e2823634ab331437001b411b9ed11660fbc4066f3908c84a9439260d" +dependencies = [ + "byteorder", + "digest 0.9.0", + "lazy_static", + "num-bigint-dig", + "num-integer", + "num-iter", + "num-traits", + "pkcs1", + "pkcs8", + "rand", + "subtle", + "zeroize", +] + [[package]] name = "rustc-demangle" version = "0.1.21" @@ -1749,6 +2222,19 @@ dependencies = [ "semver", ] +[[package]] +name = "rustls" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" +dependencies = [ + "base64", + "log", + "ring", + "sct 0.6.1", + "webpki 0.21.4", +] + [[package]] name = "rustls" version = "0.20.4" @@ -1757,8 +2243,8 @@ checksum = "4fbfeb8d0ddb84706bc597a5574ab8912817c52a397f819e5b614e2265206921" dependencies = [ "log", "ring", - "sct", - "webpki", + "sct 0.7.0", + "webpki 0.22.0", ] [[package]] @@ -1797,6 +2283,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "sct" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "sct" version = "0.7.0" @@ -1873,6 +2369,19 @@ dependencies = [ "serde", ] +[[package]] +name = "sha-1" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6" +dependencies = [ + "block-buffer 0.9.0", + "cfg-if", + "cpufeatures", + "digest 0.9.0", + "opaque-debug", +] + [[package]] name = "sha-1" version = "0.10.0" @@ -1881,7 +2390,20 @@ checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f" dependencies = [ "cfg-if", "cpufeatures", - "digest", + "digest 0.10.3", +] + +[[package]] +name = "sha2" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800" +dependencies = [ + "block-buffer 0.9.0", + "cfg-if", + "cpufeatures", + "digest 0.9.0", + "opaque-debug", ] [[package]] @@ -1959,7 +2481,7 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fb72c633efbaa2dd666986505016c32c3044395ceaf881518399d2f4127ee29" dependencies = [ - "autocfg", + "autocfg 1.1.0", "static_assertions", "version_check", ] @@ -2000,12 +2522,135 @@ dependencies = [ "lock_api", ] +[[package]] +name = "spki" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c01a0c15da1b0b0e1494112e7af814a678fec9bd157881b49beac661e9b6f32" +dependencies = [ + "der", +] + +[[package]] +name = "sqlformat" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4b7922be017ee70900be125523f38bdd644f4f06a1b16e8fa5a8ee8c34bffd4" +dependencies = [ + "itertools", + "nom", + "unicode_categories", +] + +[[package]] +name = "sqlx" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc15591eb44ffb5816a4a70a7efd5dd87bfd3aa84c4c200401c4396140525826" +dependencies = [ + "sqlx-core", + "sqlx-macros", +] + +[[package]] +name = "sqlx-core" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "195183bf6ff8328bb82c0511a83faf60aacf75840103388851db61d7a9854ae3" +dependencies = [ + "ahash", + "atoi", + "bitflags 1.3.2", + "byteorder", + "bytes", + "crc", + "crossbeam-queue", + "digest 0.9.0", + "either", + "futures-channel", + "futures-core", + "futures-intrusive", + "futures-util", + "generic-array", + "hashlink", + "hex", + "indexmap", + "itoa 1.0.1", + "libc", + "log", + "memchr", + "num-bigint", + "once_cell", + "paste", + "percent-encoding", + "rand", + "rsa", + "rustls 0.19.1", + "sha-1 0.9.8", + "sha2", + "smallvec", + "sqlformat", + "sqlx-rt", + "stringprep", + "thiserror", + "tokio-stream", + "url", + "webpki 0.21.4", + "webpki-roots", +] + +[[package]] +name = "sqlx-macros" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eee35713129561f5e55c554bba1c378e2a7e67f81257b7311183de98c50e6f94" +dependencies = [ + "dotenv", + "either", + "heck", + "once_cell", + "proc-macro2", + "quote", + "sha2", + "sqlx-core", + "sqlx-rt", + "syn", + "url", +] + +[[package]] +name = "sqlx-rt" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b555e70fbbf84e269ec3858b7a6515bcfe7a166a7cc9c636dd6efd20431678b6" +dependencies = [ + "once_cell", + "tokio", + "tokio-rustls 0.22.0", +] + [[package]] name = "static_assertions" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "stringprep" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + +[[package]] +name = "subtle" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" + [[package]] name = "syn" version = "1.0.90" @@ -2017,6 +2662,12 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "sync_wrapper" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" + [[package]] name = "synstructure" version = "0.12.6" @@ -2111,6 +2762,83 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" +[[package]] +name = "tokio" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" +dependencies = [ + "bytes", + "libc", + "memchr", + "mio", + "num_cpus", + "once_cell", + "parking_lot 0.12.0", + "pin-project-lite", + "signal-hook-registry", + "socket2 0.4.4", + "tokio-macros", + "winapi 0.3.9", +] + +[[package]] +name = "tokio-macros" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-rustls" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" +dependencies = [ + "rustls 0.19.1", + "tokio", + "webpki 0.21.4", +] + +[[package]] +name = "tokio-rustls" +version = "0.23.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4151fda0cf2798550ad0b34bcfc9b9dcc2a9d2471c895c68f3a8818e54f2389e" +dependencies = [ + "rustls 0.20.4", + "tokio", + "webpki 0.22.0", +] + +[[package]] +name = "tokio-stream" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0edfdeb067411dba2044da6d1cb2df793dd35add7888d73c16e3381ded401764" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "toml" version = "0.5.8" @@ -2120,6 +2848,54 @@ dependencies = [ "serde", ] +[[package]] +name = "tower" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a89fd63ad6adf737582df5db40d286574513c69a11dac5214dc3b5603d6713e" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aba3f3efabf7fb41fae8534fc20a817013dd1c12cb45441efb6c82e6556b4cd8" +dependencies = [ + "bitflags 1.3.2", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62" + +[[package]] +name = "tower-service" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" + [[package]] name = "tracing" version = "0.1.32" @@ -2127,6 +2903,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f" dependencies = [ "cfg-if", + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -2152,6 +2929,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + [[package]] name = "tungstenite" version = "0.17.2" @@ -2165,7 +2948,7 @@ dependencies = [ "httparse", "log", "rand", - "sha-1", + "sha-1 0.10.0", "thiserror", "url", "utf-8", @@ -2192,6 +2975,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-segmentation" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e8820f5d777f6224dc4be3632222971ac30164d4a258d595640799554ebfd99" + [[package]] name = "unicode-width" version = "0.1.9" @@ -2204,6 +2993,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" +[[package]] +name = "unicode_categories" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" + [[package]] name = "untrusted" version = "0.7.1" @@ -2269,6 +3064,16 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + [[package]] name = "wasi" version = "0.10.2+wasi-snapshot-preview1" @@ -2345,6 +3150,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki" +version = "0.21.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "webpki" version = "0.22.0" @@ -2355,6 +3170,15 @@ dependencies = [ "untrusted", ] +[[package]] +name = "webpki-roots" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940" +dependencies = [ + "webpki 0.21.4", +] + [[package]] name = "winapi" version = "0.2.8" @@ -2397,3 +3221,67 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5acdd78cb4ba54c0045ac14f62d8f94a03d10047904ae2a40afa1e99d8f70825" +dependencies = [ + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_msvc" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17cffbe740121affb56fad0fc0e421804adf0ae00891205213b5cecd30db881d" + +[[package]] +name = "windows_i686_gnu" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2564fde759adb79129d9b4f54be42b32c89970c18ebf93124ca8870a498688ed" + +[[package]] +name = "windows_i686_msvc" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cd9d32ba70453522332c14d38814bceeb747d80b3958676007acadd7e166956" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfce6deae227ee8d356d19effc141a509cc503dfd1f850622ec4b0f84428e1f4" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d19538ccc21819d01deaf88d6a17eae6596a12e9aafdbb97916fb49896d89de9" + +[[package]] +name = "zeroize" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d68d9dcec5f9b43a30d38c49f91dfedfaac384cb8f085faca366c26207dd1619" +dependencies = [ + "zeroize_derive", +] + +[[package]] +name = "zeroize_derive" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f8f187641dad4f680d25c4bfc4225b418165984179f26ca76ec4fb6441d3a17" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] diff --git a/Cargo.toml b/Cargo.toml index 92ec3a4..0ee9fc5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "aquatic_common", "aquatic_http", "aquatic_http_load_test", + "aquatic_http_private", "aquatic_http_protocol", "aquatic_toml_config", "aquatic_toml_config_derive", @@ -19,30 +20,19 @@ members = [ ] [patch.crates-io] -aquatic = { path = "aquatic" } -aquatic_cli_helpers = { path = "aquatic_cli_helpers" } -aquatic_common = { path = "aquatic_common" } -aquatic_http_load_test = { path = "aquatic_http_load_test" } -aquatic_http = { path = "aquatic_http" } -aquatic_http_protocol = { path = "aquatic_http_protocol" } -aquatic_toml_config_derive = { path = "aquatic_toml_config_derive" } -aquatic_toml_config = { path = "aquatic_toml_config" } -aquatic_udp_bench = { path = "aquatic_udp_bench" } -aquatic_udp_load_test = { path = "aquatic_udp_load_test" } -aquatic_udp = { path = "aquatic_udp" } -aquatic_udp_protocol = { path = "aquatic_udp_protocol" } -aquatic_ws_load_test = { path = "aquatic_ws_load_test" } -aquatic_ws = { path = "aquatic_ws" } -aquatic_ws_protocol = { path = "aquatic_ws_protocol" } +membarrier = { git = "https://github.com/glommer/membarrier-rs.git", branch = "issue-22" } [profile.release] -debug = true -lto = true +debug = false +lto = "thin" +opt-level = 3 [profile.test] -opt-level = 3 +inherits = "release-debug" [profile.bench] +inherits = "release-debug" + +[profile.release-debug] +inherits = "release" debug = true -opt-level = 3 -lto = true \ No newline at end of file diff --git a/TODO.md b/TODO.md index 24186ca..aeb510a 100644 --- a/TODO.md +++ b/TODO.md @@ -2,10 +2,15 @@ ## High priority +* aquatic_http_private + * Consider not setting Content-type: text/plain for responses and send vec as default octet stream instead + * stored procedure + * test ip format + * site will likely want num_seeders and num_leechers for all torrents.. + ## Medium priority -* Use thin LTO? -* Add release-debug profile? +* rename request workers to swarm workers * quit whole program if any thread panics * config: fail on unrecognized keys? * Run cargo-deny in CI @@ -26,8 +31,6 @@ * add flag to print parsed config when starting * aquatic_udp - * look at proper cpu pinning (check that one thread gets bound per core) - * then consider so_attach_reuseport_cbpf * what poll event capacity is actually needed? * stagger connection cleaning intervals? * load test @@ -35,11 +38,9 @@ with probability 0.2 * aquatic_ws - * glommio - * proper cpu set pinning - * general - * large amount of temporary allocations in serialize_20_bytes, pretty many in deserialize_20_bytes + * large amount of temporary allocations in serialize_20_bytes, pretty many in deserialize_20_bytes +* so_attach_reuseport_cbpf * extract response peers: extract "one extra" to compensate for removal, of sender if present in selection? diff --git a/aquatic/Cargo.toml b/aquatic/Cargo.toml index f58064d..2fbe263 100644 --- a/aquatic/Cargo.toml +++ b/aquatic/Cargo.toml @@ -13,8 +13,8 @@ readme = "../README.md" name = "aquatic" [dependencies] -aquatic_cli_helpers = "0.2.0" -aquatic_http = "0.2.0" -aquatic_udp = "0.2.0" -aquatic_ws = "0.2.0" +aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } +aquatic_http = { version = "0.2.0", path = "../aquatic_http" } +aquatic_udp = { version = "0.2.0", path = "../aquatic_udp" } +aquatic_ws = { version = "0.2.0", path = "../aquatic_ws" } mimalloc = { version = "0.1", default-features = false } diff --git a/aquatic_cli_helpers/Cargo.toml b/aquatic_cli_helpers/Cargo.toml index 64f7a74..7e93b6c 100644 --- a/aquatic_cli_helpers/Cargo.toml +++ b/aquatic_cli_helpers/Cargo.toml @@ -9,7 +9,7 @@ repository = "https://github.com/greatest-ape/aquatic" readme = "../README.md" [dependencies] -aquatic_toml_config = "0.2.0" +aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } anyhow = "1" git-testament = "0.2" diff --git a/aquatic_common/Cargo.toml b/aquatic_common/Cargo.toml index 1a1790f..bb12b7f 100644 --- a/aquatic_common/Cargo.toml +++ b/aquatic_common/Cargo.toml @@ -13,9 +13,10 @@ name = "aquatic_common" [features] cpu-pinning = ["hwloc", "libc"] +rustls-config = ["rustls", "rustls-pemfile"] [dependencies] -aquatic_toml_config = "0.2.0" +aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } ahash = "0.7" anyhow = "1" @@ -30,4 +31,8 @@ serde = { version = "1", features = ["derive"] } # cpu-pinning hwloc = { version = "0.5", optional = true } -libc = { version = "0.2", optional = true } \ No newline at end of file +libc = { version = "0.2", optional = true } + +# rustls-config +rustls = { version = "0.20", optional = true } +rustls-pemfile = { version = "0.3", optional = true } \ No newline at end of file diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs index 6995382..687a547 100644 --- a/aquatic_common/src/lib.rs +++ b/aquatic_common/src/lib.rs @@ -8,6 +8,8 @@ pub mod access_list; #[cfg(feature = "cpu-pinning")] pub mod cpu_pinning; pub mod privileges; +#[cfg(feature = "rustls-config")] +pub mod rustls_config; /// Amortized IndexMap using AHash hasher pub type AmortizedIndexMap = indexmap_amortized::IndexMap; diff --git a/aquatic_common/src/rustls_config.rs b/aquatic_common/src/rustls_config.rs new file mode 100644 index 0000000..b852ec1 --- /dev/null +++ b/aquatic_common/src/rustls_config.rs @@ -0,0 +1,35 @@ +use std::{fs::File, io::BufReader, path::Path}; + +pub type RustlsConfig = rustls::ServerConfig; + +pub fn create_rustls_config( + tls_certificate_path: &Path, + tls_private_key_path: &Path, +) -> anyhow::Result { + let certs = { + let f = File::open(tls_certificate_path)?; + let mut f = BufReader::new(f); + + rustls_pemfile::certs(&mut f)? + .into_iter() + .map(|bytes| rustls::Certificate(bytes)) + .collect() + }; + + let private_key = { + let f = File::open(tls_private_key_path)?; + let mut f = BufReader::new(f); + + rustls_pemfile::pkcs8_private_keys(&mut f)? + .first() + .map(|bytes| rustls::PrivateKey(bytes.clone())) + .ok_or(anyhow::anyhow!("No private keys in file"))? + }; + + let tls_config = rustls::ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth() + .with_single_cert(certs, private_key)?; + + Ok(tls_config) +} diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index 86b4b5a..c654787 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -19,10 +19,10 @@ name = "aquatic_http" cpu-pinning = ["aquatic_common/cpu-pinning"] [dependencies] -aquatic_cli_helpers = "0.2.0" -aquatic_common = "0.2.0" -aquatic_http_protocol = "0.2.0" -aquatic_toml_config = "0.2.0" +aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } +aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["rustls-config"] } +aquatic_http_protocol = { version = "0.2.0", path = "../aquatic_http_protocol" } +aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } anyhow = "1" cfg-if = "1" diff --git a/aquatic_http/src/common.rs b/aquatic_http/src/common.rs index 5bfa9b1..8088f03 100644 --- a/aquatic_http/src/common.rs +++ b/aquatic_http/src/common.rs @@ -10,8 +10,6 @@ use aquatic_http_protocol::{ response::{AnnounceResponse, ScrapeResponse}, }; -pub type TlsConfig = futures_rustls::rustls::ServerConfig; - #[derive(Copy, Clone, Debug)] pub struct ConsumerId(pub usize); diff --git a/aquatic_http/src/lib.rs b/aquatic_http/src/lib.rs index b484a53..0f92da5 100644 --- a/aquatic_http/src/lib.rs +++ b/aquatic_http/src/lib.rs @@ -2,15 +2,12 @@ use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::{ access_list::update_access_list, privileges::drop_privileges_after_socket_binding, + rustls_config::create_rustls_config, }; -use common::{State, TlsConfig}; +use common::State; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; use signal_hook::{consts::SIGUSR1, iterator::Signals}; -use std::{ - fs::File, - io::BufReader, - sync::{atomic::AtomicUsize, Arc}, -}; +use std::sync::{atomic::AtomicUsize, Arc}; use crate::config::Config; @@ -64,7 +61,10 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { let num_bound_sockets = Arc::new(AtomicUsize::new(0)); - let tls_config = Arc::new(create_tls_config(&config).unwrap()); + let tls_config = Arc::new(create_rustls_config( + &config.network.tls_certificate_path, + &config.network.tls_private_key_path, + )?); let mut executors = Vec::new(); @@ -151,32 +151,3 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { Ok(()) } - -fn create_tls_config(config: &Config) -> anyhow::Result { - let certs = { - let f = File::open(&config.network.tls_certificate_path)?; - let mut f = BufReader::new(f); - - rustls_pemfile::certs(&mut f)? - .into_iter() - .map(|bytes| futures_rustls::rustls::Certificate(bytes)) - .collect() - }; - - let private_key = { - let f = File::open(&config.network.tls_private_key_path)?; - let mut f = BufReader::new(f); - - rustls_pemfile::pkcs8_private_keys(&mut f)? - .first() - .map(|bytes| futures_rustls::rustls::PrivateKey(bytes.clone())) - .ok_or(anyhow::anyhow!("No private keys in file"))? - }; - - let tls_config = futures_rustls::rustls::ServerConfig::builder() - .with_safe_defaults() - .with_no_client_auth() - .with_single_cert(certs, private_key)?; - - Ok(tls_config) -} diff --git a/aquatic_http/src/workers/request.rs b/aquatic_http/src/workers/request.rs index 4dd36aa..8ac677e 100644 --- a/aquatic_http/src/workers/request.rs +++ b/aquatic_http/src/workers/request.rs @@ -337,6 +337,7 @@ pub fn handle_announce_request( announce_interval: config.protocol.peer_announce_interval, peers: ResponsePeerListV4(response_peers), peers6: ResponsePeerListV6(vec![]), + warning_message: None, }; response @@ -366,6 +367,7 @@ pub fn handle_announce_request( announce_interval: config.protocol.peer_announce_interval, peers: ResponsePeerListV4(vec![]), peers6: ResponsePeerListV6(response_peers), + warning_message: None, }; response diff --git a/aquatic_http/src/workers/socket.rs b/aquatic_http/src/workers/socket.rs index 56e91bd..3992551 100644 --- a/aquatic_http/src/workers/socket.rs +++ b/aquatic_http/src/workers/socket.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; +use aquatic_common::rustls_config::RustlsConfig; use aquatic_common::CanonicalSocketAddr; use aquatic_http_protocol::common::InfoHash; use aquatic_http_protocol::request::{Request, RequestParseError, ScrapeRequest}; @@ -54,7 +55,7 @@ struct ConnectionReference { pub async fn run_socket_worker( config: Config, state: State, - tls_config: Arc, + tls_config: Arc, request_mesh_builder: MeshBuilder, response_mesh_builder: MeshBuilder, num_bound_sockets: Arc, @@ -195,7 +196,7 @@ impl Connection { response_receiver: LocalReceiver, response_consumer_id: ConsumerId, connection_id: ConnectionId, - tls_config: Arc, + tls_config: Arc, connection_slab: Rc>>, stream: TcpStream, ) -> anyhow::Result<()> { diff --git a/aquatic_http_load_test/Cargo.toml b/aquatic_http_load_test/Cargo.toml index 18200ce..ca4f073 100644 --- a/aquatic_http_load_test/Cargo.toml +++ b/aquatic_http_load_test/Cargo.toml @@ -16,10 +16,10 @@ name = "aquatic_http_load_test" cpu-pinning = ["aquatic_common/cpu-pinning"] [dependencies] -aquatic_cli_helpers = "0.2.0" -aquatic_common = "0.2.0" -aquatic_http_protocol = "0.2.0" -aquatic_toml_config = "0.2.0" +aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } +aquatic_common = { version = "0.2.0", path = "../aquatic_common" } +aquatic_http_protocol = { version = "0.2.0", path = "../aquatic_http_protocol" } +aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } anyhow = "1" futures-lite = "1" diff --git a/aquatic_http_load_test/src/config.rs b/aquatic_http_load_test/src/config.rs index 6f88e79..a581e89 100644 --- a/aquatic_http_load_test/src/config.rs +++ b/aquatic_http_load_test/src/config.rs @@ -18,6 +18,8 @@ pub struct Config { /// opened as quickly as possible, which is useful when the tracker /// does not keep connections alive. pub connection_creation_interval_ms: u64, + /// Announce/scrape url suffix. Use `/my_token/` to get `/announce/my_token/` + pub url_suffix: String, pub duration: usize, pub torrents: TorrentConfig, #[cfg(feature = "cpu-pinning")] @@ -56,6 +58,7 @@ impl Default for Config { num_workers: 1, num_connections: 128, connection_creation_interval_ms: 10, + url_suffix: "".into(), duration: 0, torrents: TorrentConfig::default(), #[cfg(feature = "cpu-pinning")] diff --git a/aquatic_http_load_test/src/network.rs b/aquatic_http_load_test/src/network.rs index fdae866..a8fc57a 100644 --- a/aquatic_http_load_test/src/network.rs +++ b/aquatic_http_load_test/src/network.rs @@ -135,7 +135,7 @@ impl Connection { let request = create_random_request(&self.config, &self.load_test_state, &mut self.rng); - request.write(&mut self.tls.writer())?; + request.write(&mut self.tls.writer(), self.config.url_suffix.as_bytes())?; self.queued_responses += 1; self.send_new_request = false; @@ -213,9 +213,7 @@ impl Connection { } if let Some(body_start_index) = opt_body_start_index { - let interesting_bytes = &interesting_bytes[body_start_index..]; - - match Response::from_bytes(interesting_bytes) { + match Response::from_bytes(&interesting_bytes[body_start_index..]) { Ok(response) => { match response { Response::Announce(_) => { diff --git a/aquatic_http_load_test/src/utils.rs b/aquatic_http_load_test/src/utils.rs index 774f276..c6d9e54 100644 --- a/aquatic_http_load_test/src/utils.rs +++ b/aquatic_http_load_test/src/utils.rs @@ -46,8 +46,9 @@ fn create_announce_request(config: &Config, state: &LoadTestState, rng: &mut imp event, key: None, numwant: None, - compact: true, port: rng.gen(), + bytes_uploaded: 0, + bytes_downloaded: 0, }) } diff --git a/aquatic_http_private/Cargo.toml b/aquatic_http_private/Cargo.toml new file mode 100644 index 0000000..0d59f6a --- /dev/null +++ b/aquatic_http_private/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "aquatic_http_private" +version = "0.2.0" +edition = "2021" +authors = ["Joakim FrostegÄrd "] +license = "Apache-2.0" +repository = "https://github.com/greatest-ape/aquatic" +keywords = ["http", "benchmark", "peer-to-peer", "torrent", "bittorrent"] + +[lib] +name = "aquatic_http_private" + +[[bin]] +name = "aquatic_http_private" + +[dependencies] +aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } +aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["rustls-config"] } +aquatic_http_protocol = { version = "0.2.0", path = "../aquatic_http_protocol", features = ["with-axum"] } +aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } + +anyhow = "1" +axum = { version = "0.5", default-features = false, features = ["headers", "http1", "matched-path", "original-uri"] } +dotenv = "0.15" +futures-util = { version = "0.3", default-features = false } +hex = "0.4" +hyper = "0.14" +log = "0.4" +mimalloc = { version = "0.1", default-features = false } +rand = { version = "0.8", features = ["small_rng"] } +rustls = "0.20" +serde = { version = "1", features = ["derive"] } +socket2 = { version = "0.4", features = ["all"] } +sqlx = { version = "0.5", features = [ "runtime-tokio-rustls" , "mysql" ] } +tokio = { version = "1", features = ["full"] } +tokio-rustls = "0.23" diff --git a/aquatic_http_private/README.md b/aquatic_http_private/README.md new file mode 100644 index 0000000..615d6b2 --- /dev/null +++ b/aquatic_http_private/README.md @@ -0,0 +1,96 @@ +# aquatic_http_private + +HTTP (over TLS) BitTorrent tracker that calls a mysql stored procedure to +determine if requests can proceed. + +Work in progress. + +## Usage + +### Database setup + +* Create database (you will typically skip this step and use your own database): + +```sql +CREATE DATABASE aquatic_db; +``` + +* Create aquatic user (use a better password): + +```sql +CREATE USER 'aquatic'@localhost IDENTIFIED BY 'aquatic_password'; +``` + +* Create stored procedure `aquatic_announce_v1`: + +```sql +-- Create stored procedure called by aquatic for each announce request. +-- +-- Set output parameter p_announce_allowed determines to true to allow announce. +CREATE OR REPLACE PROCEDURE aquatic_announce_v1 ( + -- Canonical source ip address (IPv4/IPv6) + IN p_source_ip VARBINARY(16), + -- Source port (not port where peer says it will accept BitTorrent requests) + IN p_source_port SMALLINT UNSIGNED, + -- User agent (can be NULL) + IN p_user_agent TEXT, + -- User token extracted from announce url ('/announce/USER_TOKEN/) + IN p_user_token VARCHAR(255), + -- Hex-encoded info hash + IN p_info_hash CHAR(40), + -- Peer ID. BINARY since it can be any bytes according to spec. + IN p_peer_id BINARY(20), + -- Event (started/stopped/completed) (can be NULL) + IN p_event VARCHAR(9), + -- Bytes uploaded. Passed directly from request. + IN p_uploaded BIGINT UNSIGNED, + -- Bytes downloaded. Passed directly from request. + IN p_downloaded BIGINT UNSIGNED, + -- Bytes left + IN p_left BIGINT UNSIGNED, + -- Return true to send annonunce response. Defaults to false if not set. + OUT p_announce_allowed BOOLEAN, + -- Optional failure reason. Defaults to NULL if not set. + OUT p_failure_reason TEXT, + -- Optional warning message. Defaults to NULL if not set. + OUT p_warning_message TEXT +) +MODIFIES SQL DATA +BEGIN + -- Replace with your custom code + SELECT true INTO p_announce_allowed; +END +``` + +* Give aquatic user permission to call stored procedure: + +```sql +GRANT EXECUTE ON PROCEDURE aquatic_db.aquatic_announce_v1 TO 'aquatic'@localhost; +FLUSH PRIVILEGES; +``` + +`CREATE OR REPLACE PROCEDURE` command, which leaves privileges in place, +requires MariaDB 10.1.3 or later. If your database does not support it, +each time you want to replace the procedure, you need to drop it, then +create it using `CREATE PROCEDURE` and grant execution privileges again. + +### Tracker setup + +* Install rust compiler and cmake + +* Create `.env` file with database credentials: + +```sh +DATABASE_URL="mysql://aquatic:aquatic_password@localhost/aquatic_db" +``` + +* Build and run tracker: + +```sh +# Build +cargo build --release -p aquatic_http_private +# Generate config file (remember to set paths to TLS cert and key) +./target/release/aquatic_http_private -p > http-private-config.toml +# Run tracker +./target/release/aquatic_http_private -c http-private-config.toml +``` diff --git a/aquatic_http_private/src/common.rs b/aquatic_http_private/src/common.rs new file mode 100644 index 0000000..092d09a --- /dev/null +++ b/aquatic_http_private/src/common.rs @@ -0,0 +1,52 @@ +use tokio::sync::{mpsc, oneshot}; + +use aquatic_common::CanonicalSocketAddr; +use aquatic_http_protocol::{common::InfoHash, response::Response}; + +use crate::{config::Config, workers::socket::db::ValidatedAnnounceRequest}; + +#[derive(Debug)] +pub struct ChannelAnnounceRequest { + pub request: ValidatedAnnounceRequest, + pub source_addr: CanonicalSocketAddr, + pub response_sender: oneshot::Sender, +} + +#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] +pub struct RequestWorkerIndex(pub usize); + +impl RequestWorkerIndex { + pub fn from_info_hash(config: &Config, info_hash: InfoHash) -> Self { + Self(info_hash.0[0] as usize % config.request_workers) + } +} + +pub struct ChannelRequestSender(Vec>); + +impl ChannelRequestSender { + pub fn new(senders: Vec>) -> Self { + Self(senders) + } + + pub async fn send_to( + &self, + index: RequestWorkerIndex, + request: ValidatedAnnounceRequest, + source_addr: CanonicalSocketAddr, + ) -> anyhow::Result> { + let (response_sender, response_receiver) = oneshot::channel(); + + let request = ChannelAnnounceRequest { + request, + source_addr, + response_sender, + }; + + match self.0[index.0].send(request).await { + Ok(()) => Ok(response_receiver), + Err(err) => { + Err(anyhow::Error::new(err).context("error sending ChannelAnnounceRequest")) + } + } + } +} diff --git a/aquatic_http_private/src/config.rs b/aquatic_http_private/src/config.rs new file mode 100644 index 0000000..094c15e --- /dev/null +++ b/aquatic_http_private/src/config.rs @@ -0,0 +1,118 @@ +use std::{net::SocketAddr, path::PathBuf}; + +use aquatic_common::privileges::PrivilegeConfig; +use aquatic_toml_config::TomlConfig; +use serde::Deserialize; + +use aquatic_cli_helpers::LogLevel; + +/// aquatic_http_private configuration +#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] +#[serde(default)] +pub struct Config { + /// Socket workers receive requests from the socket, parse them and send + /// them on to the request workers. They then receive responses from the + /// request workers, encode them and send them back over the socket. + pub socket_workers: usize, + /// Request workers receive a number of requests from socket workers, + /// generate responses and send them back to the socket workers. + pub request_workers: usize, + pub worker_channel_size: usize, + pub db_connections_per_worker: u32, + pub log_level: LogLevel, + pub network: NetworkConfig, + pub protocol: ProtocolConfig, + pub cleaning: CleaningConfig, + pub privileges: PrivilegeConfig, +} + +impl Default for Config { + fn default() -> Self { + Self { + socket_workers: 1, + request_workers: 1, + worker_channel_size: 128, + db_connections_per_worker: 1, + log_level: LogLevel::default(), + network: NetworkConfig::default(), + protocol: ProtocolConfig::default(), + cleaning: CleaningConfig::default(), + privileges: PrivilegeConfig::default(), + } + } +} + +impl aquatic_cli_helpers::Config for Config { + fn get_log_level(&self) -> Option { + Some(self.log_level) + } +} + +#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] +#[serde(default)] +pub struct NetworkConfig { + /// Bind to this address + pub address: SocketAddr, + /// Path to TLS certificate (DER-encoded X.509) + pub tls_certificate_path: PathBuf, + /// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format) + pub tls_private_key_path: PathBuf, + pub keep_alive: bool, +} + +impl Default for NetworkConfig { + fn default() -> Self { + Self { + address: SocketAddr::from(([0, 0, 0, 0], 3000)), + tls_certificate_path: "".into(), + tls_private_key_path: "".into(), + keep_alive: true, + } + } +} + +#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] +#[serde(default)] +pub struct ProtocolConfig { + /// Maximum number of torrents to accept in scrape request + pub max_scrape_torrents: usize, + /// Maximum number of requested peers to accept in announce request + pub max_peers: usize, + /// Ask peers to announce this often (seconds) + pub peer_announce_interval: usize, +} + +impl Default for ProtocolConfig { + fn default() -> Self { + Self { + max_scrape_torrents: 100, + max_peers: 50, + peer_announce_interval: 300, + } + } +} + +#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] +#[serde(default)] +pub struct CleaningConfig { + /// Clean peers this often (seconds) + pub torrent_cleaning_interval: u64, + /// Remove peers that have not announced for this long (seconds) + pub max_peer_age: u64, +} + +impl Default for CleaningConfig { + fn default() -> Self { + Self { + torrent_cleaning_interval: 30, + max_peer_age: 360, + } + } +} + +#[cfg(test)] +mod tests { + use super::Config; + + ::aquatic_toml_config::gen_serialize_deserialize_test!(Config); +} diff --git a/aquatic_http_private/src/lib.rs b/aquatic_http_private/src/lib.rs new file mode 100644 index 0000000..8e9d7a7 --- /dev/null +++ b/aquatic_http_private/src/lib.rs @@ -0,0 +1,69 @@ +mod common; +pub mod config; +mod workers; + +use std::{collections::VecDeque, sync::Arc}; + +use aquatic_common::rustls_config::create_rustls_config; +use common::ChannelRequestSender; +use dotenv::dotenv; +use tokio::sync::mpsc::channel; + +use config::Config; + +pub const APP_NAME: &str = "aquatic_http_private: private HTTP/TLS BitTorrent tracker"; +pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); + +pub fn run(config: Config) -> anyhow::Result<()> { + dotenv().ok(); + + let tls_config = Arc::new(create_rustls_config( + &config.network.tls_certificate_path, + &config.network.tls_private_key_path, + )?); + + let mut request_senders = Vec::new(); + let mut request_receivers = VecDeque::new(); + + for _ in 0..config.request_workers { + let (request_sender, request_receiver) = channel(config.worker_channel_size); + + request_senders.push(request_sender); + request_receivers.push_back(request_receiver); + } + + let mut handles = Vec::new(); + + for _ in 0..config.socket_workers { + let config = config.clone(); + let tls_config = tls_config.clone(); + let request_sender = ChannelRequestSender::new(request_senders.clone()); + + let handle = ::std::thread::Builder::new() + .name("socket".into()) + .spawn(move || { + workers::socket::run_socket_worker(config, tls_config, request_sender) + })?; + + handles.push(handle); + } + + for _ in 0..config.request_workers { + let config = config.clone(); + let request_receiver = request_receivers.pop_front().unwrap(); + + let handle = ::std::thread::Builder::new() + .name("request".into()) + .spawn(move || workers::request::run_request_worker(config, request_receiver))?; + + handles.push(handle); + } + + for handle in handles { + handle + .join() + .map_err(|err| anyhow::anyhow!("thread join error: {:?}", err))??; + } + + Ok(()) +} diff --git a/aquatic_http_private/src/main.rs b/aquatic_http_private/src/main.rs new file mode 100644 index 0000000..c26aaeb --- /dev/null +++ b/aquatic_http_private/src/main.rs @@ -0,0 +1,14 @@ +use aquatic_cli_helpers::run_app_with_cli_and_config; +use aquatic_http_private::config::Config; + +#[global_allocator] +static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; + +fn main() { + run_app_with_cli_and_config::( + aquatic_http_private::APP_NAME, + aquatic_http_private::APP_VERSION, + aquatic_http_private::run, + None, + ) +} diff --git a/aquatic_http_private/src/workers/mod.rs b/aquatic_http_private/src/workers/mod.rs new file mode 100644 index 0000000..63fc0ec --- /dev/null +++ b/aquatic_http_private/src/workers/mod.rs @@ -0,0 +1,2 @@ +pub mod request; +pub mod socket; diff --git a/aquatic_http_private/src/workers/request/common.rs b/aquatic_http_private/src/workers/request/common.rs new file mode 100644 index 0000000..abbcbc3 --- /dev/null +++ b/aquatic_http_private/src/workers/request/common.rs @@ -0,0 +1,122 @@ +use std::net::{Ipv4Addr, Ipv6Addr}; +use std::time::Instant; + +use aquatic_common::{AmortizedIndexMap, ValidUntil}; +use aquatic_http_protocol::common::{AnnounceEvent, InfoHash, PeerId}; +use aquatic_http_protocol::response::ResponsePeer; + +pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {} + +impl Ip for Ipv4Addr {} +impl Ip for Ipv6Addr {} + +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub enum PeerStatus { + Seeding, + Leeching, + Stopped, +} + +impl PeerStatus { + /// Determine peer status from announce event and number of bytes left. + /// + /// Likely, the last branch will be taken most of the time. + #[inline] + pub fn from_event_and_bytes_left(event: AnnounceEvent, opt_bytes_left: Option) -> Self { + if let AnnounceEvent::Stopped = event { + Self::Stopped + } else if let Some(0) = opt_bytes_left { + Self::Seeding + } else { + Self::Leeching + } + } +} + +#[derive(Debug, Clone, Copy)] +pub struct Peer { + pub ip_address: I, + pub port: u16, + pub status: PeerStatus, + pub valid_until: ValidUntil, +} + +impl Peer { + pub fn to_response_peer(&self) -> ResponsePeer { + ResponsePeer { + ip_address: self.ip_address, + port: self.port, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct PeerMapKey { + pub peer_id: PeerId, + pub ip_address: I, +} + +pub type PeerMap = AmortizedIndexMap, Peer>; + +pub struct TorrentData { + pub peers: PeerMap, + pub num_seeders: usize, + pub num_leechers: usize, +} + +impl Default for TorrentData { + #[inline] + fn default() -> Self { + Self { + peers: Default::default(), + num_seeders: 0, + num_leechers: 0, + } + } +} + +pub type TorrentMap = AmortizedIndexMap>; + +#[derive(Default)] +pub struct TorrentMaps { + pub ipv4: TorrentMap, + pub ipv6: TorrentMap, +} + +impl TorrentMaps { + pub fn clean(&mut self) { + Self::clean_torrent_map(&mut self.ipv4); + Self::clean_torrent_map(&mut self.ipv6); + } + + fn clean_torrent_map(torrent_map: &mut TorrentMap) { + let now = Instant::now(); + + torrent_map.retain(|_, torrent_data| { + let num_seeders = &mut torrent_data.num_seeders; + let num_leechers = &mut torrent_data.num_leechers; + + torrent_data.peers.retain(|_, peer| { + if peer.valid_until.0 >= now { + true + } else { + match peer.status { + PeerStatus::Seeding => { + *num_seeders -= 1; + } + PeerStatus::Leeching => { + *num_leechers -= 1; + } + _ => (), + }; + + false + } + }); + + !torrent_data.peers.is_empty() + }); + + torrent_map.shrink_to_fit(); + } +} diff --git a/aquatic_http_private/src/workers/request/mod.rs b/aquatic_http_private/src/workers/request/mod.rs new file mode 100644 index 0000000..358ead6 --- /dev/null +++ b/aquatic_http_private/src/workers/request/mod.rs @@ -0,0 +1,210 @@ +mod common; + +use std::cell::RefCell; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::rc::Rc; + +use aquatic_http_protocol::request::AnnounceRequest; +use rand::prelude::SmallRng; +use rand::SeedableRng; +use tokio::sync::mpsc::Receiver; +use tokio::task::LocalSet; +use tokio::time; + +use aquatic_common::{extract_response_peers, CanonicalSocketAddr, ValidUntil}; +use aquatic_http_protocol::response::{ + AnnounceResponse, Response, ResponsePeer, ResponsePeerListV4, ResponsePeerListV6, +}; + +use crate::common::ChannelAnnounceRequest; +use crate::config::Config; + +use common::*; + +pub fn run_request_worker( + config: Config, + request_receiver: Receiver, +) -> anyhow::Result<()> { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + + runtime.block_on(run_inner(config, request_receiver))?; + + Ok(()) +} + +async fn run_inner( + config: Config, + mut request_receiver: Receiver, +) -> anyhow::Result<()> { + let torrents = Rc::new(RefCell::new(TorrentMaps::default())); + let mut rng = SmallRng::from_entropy(); + + LocalSet::new().spawn_local(periodically_clean_torrents( + config.clone(), + torrents.clone(), + )); + + loop { + let request = request_receiver + .recv() + .await + .ok_or_else(|| anyhow::anyhow!("request channel closed"))?; + + let valid_until = ValidUntil::new(config.cleaning.max_peer_age); + + let response = handle_announce_request( + &config, + &mut rng, + &mut torrents.borrow_mut(), + valid_until, + request.source_addr, + request.request.into(), + ); + + let _ = request.response_sender.send(Response::Announce(response)); + } +} + +async fn periodically_clean_torrents(config: Config, torrents: Rc>) { + let mut interval = time::interval(time::Duration::from_secs( + config.cleaning.torrent_cleaning_interval, + )); + + loop { + interval.tick().await; + + torrents.borrow_mut().clean(); + } +} + +fn handle_announce_request( + config: &Config, + rng: &mut SmallRng, + torrent_maps: &mut TorrentMaps, + valid_until: ValidUntil, + source_addr: CanonicalSocketAddr, + request: AnnounceRequest, +) -> AnnounceResponse { + match source_addr.get().ip() { + IpAddr::V4(source_ip) => { + let torrent_data: &mut TorrentData = + torrent_maps.ipv4.entry(request.info_hash).or_default(); + + let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers( + config, + rng, + torrent_data, + source_ip, + request, + valid_until, + ); + + let response = AnnounceResponse { + complete: seeders, + incomplete: leechers, + announce_interval: config.protocol.peer_announce_interval, + peers: ResponsePeerListV4(response_peers), + peers6: ResponsePeerListV6(vec![]), + warning_message: None, + }; + + response + } + IpAddr::V6(source_ip) => { + let torrent_data: &mut TorrentData = + torrent_maps.ipv6.entry(request.info_hash).or_default(); + + let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers( + config, + rng, + torrent_data, + source_ip, + request, + valid_until, + ); + + let response = AnnounceResponse { + complete: seeders, + incomplete: leechers, + announce_interval: config.protocol.peer_announce_interval, + peers: ResponsePeerListV4(vec![]), + peers6: ResponsePeerListV6(response_peers), + warning_message: None, + }; + + response + } + } +} + +/// Insert/update peer. Return num_seeders, num_leechers and response peers +pub fn upsert_peer_and_get_response_peers( + config: &Config, + rng: &mut SmallRng, + torrent_data: &mut TorrentData, + source_ip: I, + request: AnnounceRequest, + valid_until: ValidUntil, +) -> (usize, usize, Vec>) { + // Insert/update/remove peer who sent this request + + let peer_status = + PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left)); + + let peer = Peer { + ip_address: source_ip, + port: request.port, + status: peer_status, + valid_until, + }; + + let peer_map_key = PeerMapKey { + peer_id: request.peer_id, + ip_address: source_ip, + }; + + let opt_removed_peer = match peer_status { + PeerStatus::Leeching => { + torrent_data.num_leechers += 1; + + torrent_data.peers.insert(peer_map_key.clone(), peer) + } + PeerStatus::Seeding => { + torrent_data.num_seeders += 1; + + torrent_data.peers.insert(peer_map_key.clone(), peer) + } + PeerStatus::Stopped => torrent_data.peers.remove(&peer_map_key), + }; + + match opt_removed_peer.map(|peer| peer.status) { + Some(PeerStatus::Leeching) => { + torrent_data.num_leechers -= 1; + } + Some(PeerStatus::Seeding) => { + torrent_data.num_seeders -= 1; + } + _ => {} + } + + let max_num_peers_to_take = match request.numwant { + Some(0) | None => config.protocol.max_peers, + Some(numwant) => numwant.min(config.protocol.max_peers), + }; + + let response_peers: Vec> = extract_response_peers( + rng, + &torrent_data.peers, + max_num_peers_to_take, + peer_map_key, + Peer::to_response_peer, + ); + + ( + torrent_data.num_seeders, + torrent_data.num_leechers, + response_peers, + ) +} diff --git a/aquatic_http_private/src/workers/socket/db.rs b/aquatic_http_private/src/workers/socket/db.rs new file mode 100644 index 0000000..89b7396 --- /dev/null +++ b/aquatic_http_private/src/workers/socket/db.rs @@ -0,0 +1,119 @@ +use std::net::IpAddr; + +use aquatic_common::CanonicalSocketAddr; +use aquatic_http_protocol::{request::AnnounceRequest, response::FailureResponse}; +use sqlx::{Executor, MySql, Pool}; + +#[derive(Debug)] +pub struct ValidatedAnnounceRequest(AnnounceRequest); + +impl Into for ValidatedAnnounceRequest { + fn into(self) -> AnnounceRequest { + self.0 + } +} + +#[derive(Debug, sqlx::FromRow)] +struct AnnounceProcedureResults { + announce_allowed: bool, + failure_reason: Option, + warning_message: Option, +} + +pub async fn validate_announce_request( + pool: &Pool, + source_addr: CanonicalSocketAddr, + user_agent: Option, + user_token: String, + request: AnnounceRequest, +) -> Result<(ValidatedAnnounceRequest, Option), FailureResponse> { + match call_announce_procedure(pool, source_addr, user_agent, user_token, &request).await { + Ok(results) => { + if results.announce_allowed { + Ok((ValidatedAnnounceRequest(request), results.warning_message)) + } else { + Err(FailureResponse::new( + results + .failure_reason + .unwrap_or_else(|| "Not allowed".into()), + )) + } + } + Err(err) => { + ::log::error!("announce procedure error: {:#}", err); + + Err(FailureResponse::new("Internal error")) + } + } +} + +async fn call_announce_procedure( + pool: &Pool, + source_addr: CanonicalSocketAddr, + user_agent: Option, + user_token: String, // FIXME: length + request: &AnnounceRequest, +) -> anyhow::Result { + let mut t = pool.begin().await?; + + t.execute( + " + SET + @p_announce_allowed = false, + @p_failure_reason = NULL, + @p_warning_message = NULL; + ", + ) + .await?; + + let q = sqlx::query( + " + CALL aquatic_announce_v1( + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + @p_announce_allowed, + @p_failure_reason, + @p_warning_message + ); + ", + ) + .bind(match source_addr.get().ip() { + IpAddr::V4(ip) => Vec::from(ip.octets()), + IpAddr::V6(ip) => Vec::from(ip.octets()), + }) + .bind(source_addr.get().port()) + .bind(user_agent) + .bind(user_token) + .bind(hex::encode(request.info_hash.0)) + .bind(&request.peer_id.0[..]) + .bind(request.event.as_str()) + .bind(request.bytes_uploaded as u64) + .bind(request.bytes_downloaded as u64) + .bind(request.bytes_left as u64); + + t.execute(q).await?; + + let response = sqlx::query_as::<_, AnnounceProcedureResults>( + " + SELECT + @p_announce_allowed as announce_allowed, + @p_failure_reason as failure_reason, + @p_warning_message as warning_message; + + ", + ) + .fetch_one(&mut t) + .await?; + + t.commit().await?; + + Ok(response) +} diff --git a/aquatic_http_private/src/workers/socket/mod.rs b/aquatic_http_private/src/workers/socket/mod.rs new file mode 100644 index 0000000..2b142c7 --- /dev/null +++ b/aquatic_http_private/src/workers/socket/mod.rs @@ -0,0 +1,97 @@ +pub mod db; +mod routes; +mod tls; + +use std::{ + net::{SocketAddr, TcpListener}, + sync::Arc, +}; + +use anyhow::Context; +use aquatic_common::rustls_config::RustlsConfig; +use axum::{extract::connect_info::Connected, routing::get, Extension, Router}; +use hyper::server::conn::AddrIncoming; +use sqlx::mysql::MySqlPoolOptions; + +use self::tls::{TlsAcceptor, TlsStream}; +use crate::{common::ChannelRequestSender, config::Config}; + +impl<'a> Connected<&'a tls::TlsStream> for SocketAddr { + fn connect_info(target: &'a TlsStream) -> Self { + target.get_remote_addr() + } +} + +pub fn run_socket_worker( + config: Config, + tls_config: Arc, + request_sender: ChannelRequestSender, +) -> anyhow::Result<()> { + let tcp_listener = create_tcp_listener(config.network.address)?; + + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + + runtime.block_on(run_app(config, tls_config, tcp_listener, request_sender))?; + + Ok(()) +} + +async fn run_app( + config: Config, + tls_config: Arc, + tcp_listener: TcpListener, + request_sender: ChannelRequestSender, +) -> anyhow::Result<()> { + let db_url = + ::std::env::var("DATABASE_URL").with_context(|| "Retrieve env var DATABASE_URL")?; + + let tls_acceptor = TlsAcceptor::new( + tls_config, + AddrIncoming::from_listener(tokio::net::TcpListener::from_std(tcp_listener)?)?, + ); + + let pool = MySqlPoolOptions::new() + .max_connections(config.db_connections_per_worker) + .connect(&db_url) + .await?; + + let app = Router::new() + .route("/announce/:user_token/", get(routes::announce)) + .layer(Extension(Arc::new(config.clone()))) + .layer(Extension(pool)) + .layer(Extension(Arc::new(request_sender))); + + axum::Server::builder(tls_acceptor) + .http1_keepalive(config.network.keep_alive) + .serve(app.into_make_service_with_connect_info::()) + .await?; + + Ok(()) +} + +fn create_tcp_listener(addr: SocketAddr) -> anyhow::Result { + let domain = if addr.is_ipv4() { + socket2::Domain::IPV4 + } else { + socket2::Domain::IPV6 + }; + + let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?; + + socket + .set_reuse_port(true) + .with_context(|| "set_reuse_port")?; + socket + .set_nonblocking(true) + .with_context(|| "set_nonblocking")?; + socket + .bind(&addr.into()) + .with_context(|| format!("bind to {}", addr))?; + socket + .listen(1024) + .with_context(|| format!("listen on {}", addr))?; + + Ok(socket.into()) +} diff --git a/aquatic_http_private/src/workers/socket/routes.rs b/aquatic_http_private/src/workers/socket/routes.rs new file mode 100644 index 0000000..8fd139b --- /dev/null +++ b/aquatic_http_private/src/workers/socket/routes.rs @@ -0,0 +1,65 @@ +use aquatic_common::CanonicalSocketAddr; +use axum::{ + extract::{ConnectInfo, Path, RawQuery}, + headers::UserAgent, + Extension, TypedHeader, +}; +use sqlx::mysql::MySqlPool; +use std::{net::SocketAddr, sync::Arc}; + +use aquatic_http_protocol::{ + request::AnnounceRequest, + response::{FailureResponse, Response}, +}; + +use crate::{ + common::{ChannelRequestSender, RequestWorkerIndex}, + config::Config, +}; + +use super::db; + +pub async fn announce( + Extension(config): Extension>, + Extension(pool): Extension, + Extension(request_sender): Extension>, + ConnectInfo(source_addr): ConnectInfo, + opt_user_agent: Option>, + Path(user_token): Path, + RawQuery(query): RawQuery, +) -> Result { + let query = query.ok_or_else(|| FailureResponse::new("Empty query string"))?; + + let request = AnnounceRequest::from_query_string(&query) + .map_err(|_| FailureResponse::new("Malformed request"))?; + + let request_worker_index = RequestWorkerIndex::from_info_hash(&config, request.info_hash); + let opt_user_agent = opt_user_agent.map(|header| header.as_str().to_owned()); + + let source_addr = CanonicalSocketAddr::new(source_addr); + + let (validated_request, opt_warning_message) = + db::validate_announce_request(&pool, source_addr, opt_user_agent, user_token, request) + .await?; + + let response_receiver = request_sender + .send_to(request_worker_index, validated_request, source_addr) + .await + .map_err(|err| internal_error(format!("Sending request over channel failed: {:#}", err)))?; + + let mut response = response_receiver.await.map_err(|err| { + internal_error(format!("Receiving response over channel failed: {:#}", err)) + })?; + + if let Response::Announce(ref mut r) = response { + r.warning_message = opt_warning_message; + } + + Ok(response) +} + +fn internal_error(error: String) -> FailureResponse { + ::log::error!("{}", error); + + FailureResponse::new("Internal error") +} diff --git a/aquatic_http_private/src/workers/socket/tls.rs b/aquatic_http_private/src/workers/socket/tls.rs new file mode 100644 index 0000000..3828b29 --- /dev/null +++ b/aquatic_http_private/src/workers/socket/tls.rs @@ -0,0 +1,151 @@ +//! hyper/rustls integration +//! +//! hyper will automatically use HTTP/2 if a client starts talking HTTP/2, +//! otherwise HTTP/1.1 will be used. +//! +//! Based on https://github.com/rustls/hyper-rustls/blob/9b7b1220f74de9b249ce2b8f8b922fd00074c53b/examples/server.rs + +// ISC License (ISC) +// Copyright (c) 2016, Joseph Birr-Pixton +// +// Permission to use, copy, modify, and/or distribute this software for +// any purpose with or without fee is hereby granted, provided that the +// above copyright notice and this permission notice appear in all copies. +// +// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL +// WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE +// AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL +// DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR +// PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +// ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF +// THIS SOFTWARE. + +use core::task::{Context, Poll}; +use futures_util::ready; +use hyper::server::accept::Accept; +use hyper::server::conn::{AddrIncoming, AddrStream}; +use std::future::Future; +use std::io; +use std::net::SocketAddr; +use std::pin::Pin; +use std::sync::Arc; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio_rustls::rustls::ServerConfig; + +enum State { + Handshaking(tokio_rustls::Accept, SocketAddr), + Streaming(tokio_rustls::server::TlsStream), +} + +// tokio_rustls::server::TlsStream doesn't expose constructor methods, +// so we have to TlsAcceptor::accept and handshake to have access to it +// TlsStream implements AsyncRead/AsyncWrite handshaking tokio_rustls::Accept first +pub struct TlsStream { + state: State, +} + +impl TlsStream { + fn new(stream: AddrStream, config: Arc) -> TlsStream { + let remote_addr = stream.remote_addr(); + let accept = tokio_rustls::TlsAcceptor::from(config).accept(stream); + + TlsStream { + state: State::Handshaking(accept, remote_addr), + } + } + + pub fn get_remote_addr(&self) -> SocketAddr { + match &self.state { + State::Handshaking(_, remote_addr) => *remote_addr, + State::Streaming(stream) => stream.get_ref().0.remote_addr(), + } + } +} + +impl AsyncRead for TlsStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut ReadBuf, + ) -> Poll> { + let pin = self.get_mut(); + match pin.state { + State::Handshaking(ref mut accept, ref mut socket_addr) => { + match ready!(Pin::new(accept).poll(cx)) { + Ok(mut stream) => { + *socket_addr = stream.get_ref().0.remote_addr(); + let result = Pin::new(&mut stream).poll_read(cx, buf); + pin.state = State::Streaming(stream); + result + } + Err(err) => Poll::Ready(Err(err)), + } + } + State::Streaming(ref mut stream) => Pin::new(stream).poll_read(cx, buf), + } + } +} + +impl AsyncWrite for TlsStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let pin = self.get_mut(); + match pin.state { + State::Handshaking(ref mut accept, _) => match ready!(Pin::new(accept).poll(cx)) { + Ok(mut stream) => { + let result = Pin::new(&mut stream).poll_write(cx, buf); + pin.state = State::Streaming(stream); + result + } + Err(err) => Poll::Ready(Err(err)), + }, + State::Streaming(ref mut stream) => Pin::new(stream).poll_write(cx, buf), + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.state { + State::Handshaking(_, _) => Poll::Ready(Ok(())), + State::Streaming(ref mut stream) => Pin::new(stream).poll_flush(cx), + } + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.state { + State::Handshaking(_, _) => Poll::Ready(Ok(())), + State::Streaming(ref mut stream) => Pin::new(stream).poll_shutdown(cx), + } + } +} + +pub struct TlsAcceptor { + config: Arc, + incoming: AddrIncoming, +} + +impl TlsAcceptor { + pub fn new(config: Arc, incoming: AddrIncoming) -> TlsAcceptor { + TlsAcceptor { config, incoming } + } +} + +impl Accept for TlsAcceptor { + type Conn = TlsStream; + type Error = io::Error; + + fn poll_accept( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let pin = self.get_mut(); + match ready!(Pin::new(&mut pin.incoming).poll_accept(cx)) { + Some(Ok(sock)) => Poll::Ready(Some(Ok(TlsStream::new(sock, pin.config.clone())))), + Some(Err(e)) => Poll::Ready(Some(Err(e))), + None => Poll::Ready(None), + } + } +} diff --git a/aquatic_http_protocol/Cargo.toml b/aquatic_http_protocol/Cargo.toml index 440ea54..0b31222 100644 --- a/aquatic_http_protocol/Cargo.toml +++ b/aquatic_http_protocol/Cargo.toml @@ -22,8 +22,12 @@ name = "bench_announce_response_to_bytes" path = "benches/bench_announce_response_to_bytes.rs" harness = false +[features] +with-axum = ["axum"] + [dependencies] anyhow = "1" +axum = { version = "0.5", optional = true, default-features = false } hex = { version = "0.4", default-features = false } httparse = "1" itoa = "1" diff --git a/aquatic_http_protocol/benches/bench_announce_response_to_bytes.rs b/aquatic_http_protocol/benches/bench_announce_response_to_bytes.rs index 03a8008..b871cf0 100644 --- a/aquatic_http_protocol/benches/bench_announce_response_to_bytes.rs +++ b/aquatic_http_protocol/benches/bench_announce_response_to_bytes.rs @@ -21,6 +21,7 @@ pub fn bench(c: &mut Criterion) { incomplete: 500, peers: ResponsePeerListV4(peers), peers6: ResponsePeerListV6(Vec::new()), + warning_message: None, }; let response = Response::Announce(announce_response); diff --git a/aquatic_http_protocol/src/common.rs b/aquatic_http_protocol/src/common.rs index bf77550..bfb48b5 100644 --- a/aquatic_http_protocol/src/common.rs +++ b/aquatic_http_protocol/src/common.rs @@ -24,7 +24,7 @@ pub struct InfoHash( pub [u8; 20], ); -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum AnnounceEvent { Started, Stopped, @@ -52,6 +52,17 @@ impl FromStr for AnnounceEvent { } } +impl AnnounceEvent { + pub fn as_str(&self) -> Option<&str> { + match self { + Self::Started => Some("started"), + Self::Stopped => Some("stopped"), + Self::Completed => Some("completed"), + Self::Empty => None, + } + } +} + #[cfg(test)] impl quickcheck::Arbitrary for InfoHash { fn arbitrary(g: &mut quickcheck::Gen) -> Self { diff --git a/aquatic_http_protocol/src/request.rs b/aquatic_http_protocol/src/request.rs index 3197fa7..22fe7ed 100644 --- a/aquatic_http_protocol/src/request.rs +++ b/aquatic_http_protocol/src/request.rs @@ -11,17 +11,20 @@ pub struct AnnounceRequest { pub info_hash: InfoHash, pub peer_id: PeerId, pub port: u16, + pub bytes_uploaded: usize, + pub bytes_downloaded: usize, pub bytes_left: usize, pub event: AnnounceEvent, - pub compact: bool, /// Number of response peers wanted pub numwant: Option, pub key: Option>, } impl AnnounceRequest { - fn write(&self, output: &mut W) -> ::std::io::Result<()> { - output.write_all(b"GET /announce?info_hash=")?; + fn write(&self, output: &mut W, url_suffix: &[u8]) -> ::std::io::Result<()> { + output.write_all(b"GET /announce")?; + output.write_all(url_suffix)?; + output.write_all(b"?info_hash=")?; urlencode_20_bytes(self.info_hash.0, output)?; output.write_all(b"&peer_id=")?; @@ -30,7 +33,13 @@ impl AnnounceRequest { output.write_all(b"&port=")?; output.write_all(itoa::Buffer::new().format(self.port).as_bytes())?; - output.write_all(b"&uploaded=0&downloaded=0&left=")?; + output.write_all(b"&uploaded=")?; + output.write_all(itoa::Buffer::new().format(self.bytes_uploaded).as_bytes())?; + + output.write_all(b"&downloaded=")?; + output.write_all(itoa::Buffer::new().format(self.bytes_downloaded).as_bytes())?; + + output.write_all(b"&left=")?; output.write_all(itoa::Buffer::new().format(self.bytes_left).as_bytes())?; match self.event { @@ -40,9 +49,6 @@ impl AnnounceRequest { AnnounceEvent::Empty => (), }; - output.write_all(b"&compact=")?; - output.write_all(itoa::Buffer::new().format(self.compact as u8).as_bytes())?; - if let Some(numwant) = self.numwant { output.write_all(b"&numwant=")?; output.write_all(itoa::Buffer::new().format(numwant).as_bytes())?; @@ -57,6 +63,105 @@ impl AnnounceRequest { Ok(()) } + + pub fn from_query_string(query_string: &str) -> anyhow::Result { + // -- Parse key-value pairs + + let mut opt_info_hash = None; + let mut opt_peer_id = None; + let mut opt_port = None; + let mut opt_bytes_left = None; + let mut opt_bytes_uploaded = None; + let mut opt_bytes_downloaded = None; + let mut event = AnnounceEvent::default(); + let mut opt_numwant = None; + let mut opt_key = None; + + let query_string_bytes = query_string.as_bytes(); + + let mut ampersand_iter = ::memchr::memchr_iter(b'&', query_string_bytes); + let mut position = 0usize; + + for equal_sign_index in ::memchr::memchr_iter(b'=', query_string_bytes) { + let segment_end = ampersand_iter.next().unwrap_or_else(|| query_string.len()); + + let key = query_string + .get(position..equal_sign_index) + .with_context(|| format!("no key at {}..{}", position, equal_sign_index))?; + let value = query_string + .get(equal_sign_index + 1..segment_end) + .with_context(|| { + format!("no value at {}..{}", equal_sign_index + 1, segment_end) + })?; + + match key { + "info_hash" => { + let value = urldecode_20_bytes(value)?; + + opt_info_hash = Some(InfoHash(value)); + } + "peer_id" => { + let value = urldecode_20_bytes(value)?; + + opt_peer_id = Some(PeerId(value)); + } + "port" => { + opt_port = Some(value.parse::().with_context(|| "parse port")?); + } + "left" => { + opt_bytes_left = Some(value.parse::().with_context(|| "parse left")?); + } + "uploaded" => { + opt_bytes_uploaded = + Some(value.parse::().with_context(|| "parse uploaded")?); + } + "downloaded" => { + opt_bytes_downloaded = + Some(value.parse::().with_context(|| "parse downloaded")?); + } + "event" => { + event = value + .parse::() + .map_err(|err| anyhow::anyhow!("invalid event: {}", err))?; + } + "compact" => { + if value != "1" { + return Err(anyhow::anyhow!("compact set, but not to 1")); + } + } + "numwant" => { + opt_numwant = Some(value.parse::().with_context(|| "parse numwant")?); + } + "key" => { + if value.len() > 100 { + return Err(anyhow::anyhow!("'key' is too long")); + } + opt_key = Some(::urlencoding::decode(value)?.into()); + } + k => { + ::log::debug!("ignored unrecognized key: {}", k) + } + } + + if segment_end == query_string.len() { + break; + } else { + position = segment_end + 1; + } + } + + Ok(AnnounceRequest { + info_hash: opt_info_hash.with_context(|| "no info_hash")?, + peer_id: opt_peer_id.with_context(|| "no peer_id")?, + port: opt_port.with_context(|| "no port")?, + bytes_uploaded: opt_bytes_uploaded.with_context(|| "no uploaded")?, + bytes_downloaded: opt_bytes_downloaded.with_context(|| "no downloaded")?, + bytes_left: opt_bytes_left.with_context(|| "no left")?, + event, + numwant: opt_numwant, + key: opt_key, + }) + } } #[derive(Debug, Clone, PartialEq, Eq)] @@ -65,8 +170,10 @@ pub struct ScrapeRequest { } impl ScrapeRequest { - fn write(&self, output: &mut W) -> ::std::io::Result<()> { - output.write_all(b"GET /scrape?")?; + fn write(&self, output: &mut W, url_suffix: &[u8]) -> ::std::io::Result<()> { + output.write_all(b"GET /scrape")?; + output.write_all(url_suffix)?; + output.write_all(b"?")?; let mut first = true; @@ -85,6 +192,53 @@ impl ScrapeRequest { Ok(()) } + + pub fn from_query_string(query_string: &str) -> anyhow::Result { + // -- Parse key-value pairs + + let mut info_hashes = Vec::new(); + + let query_string_bytes = query_string.as_bytes(); + + let mut ampersand_iter = ::memchr::memchr_iter(b'&', query_string_bytes); + let mut position = 0usize; + + for equal_sign_index in ::memchr::memchr_iter(b'=', query_string_bytes) { + let segment_end = ampersand_iter.next().unwrap_or_else(|| query_string.len()); + + let key = query_string + .get(position..equal_sign_index) + .with_context(|| format!("no key at {}..{}", position, equal_sign_index))?; + let value = query_string + .get(equal_sign_index + 1..segment_end) + .with_context(|| { + format!("no value at {}..{}", equal_sign_index + 1, segment_end) + })?; + + match key { + "info_hash" => { + let value = urldecode_20_bytes(value)?; + + info_hashes.push(InfoHash(value)); + } + k => { + ::log::debug!("ignored unrecognized key: {}", k) + } + } + + if segment_end == query_string.len() { + break; + } else { + position = segment_end + 1; + } + } + + if info_hashes.is_empty() { + return Err(anyhow::anyhow!("No info hashes sent")); + } + + Ok(ScrapeRequest { info_hashes }) + } } #[derive(Debug)] @@ -147,111 +301,21 @@ impl Request { let location = split_parts.next().with_context(|| "no location")?; let query_string = split_parts.next().with_context(|| "no query string")?; - // -- Parse key-value pairs - - let mut info_hashes = Vec::new(); - let mut opt_peer_id = None; - let mut opt_port = None; - let mut opt_bytes_left = None; - let mut event = AnnounceEvent::default(); - let mut opt_numwant = None; - let mut opt_key = None; - - let query_string_bytes = query_string.as_bytes(); - - let mut ampersand_iter = ::memchr::memchr_iter(b'&', query_string_bytes); - let mut position = 0usize; - - for equal_sign_index in ::memchr::memchr_iter(b'=', query_string_bytes) { - let segment_end = ampersand_iter.next().unwrap_or_else(|| query_string.len()); - - let key = query_string - .get(position..equal_sign_index) - .with_context(|| format!("no key at {}..{}", position, equal_sign_index))?; - let value = query_string - .get(equal_sign_index + 1..segment_end) - .with_context(|| { - format!("no value at {}..{}", equal_sign_index + 1, segment_end) - })?; - - match key { - "info_hash" => { - let value = urldecode_20_bytes(value)?; - - info_hashes.push(InfoHash(value)); - } - "peer_id" => { - let value = urldecode_20_bytes(value)?; - - opt_peer_id = Some(PeerId(value)); - } - "port" => { - opt_port = Some(value.parse::().with_context(|| "parse port")?); - } - "left" => { - opt_bytes_left = Some(value.parse::().with_context(|| "parse left")?); - } - "event" => { - event = value - .parse::() - .map_err(|err| anyhow::anyhow!("invalid event: {}", err))?; - } - "compact" => { - if value != "1" { - return Err(anyhow::anyhow!("compact set, but not to 1")); - } - } - "numwant" => { - opt_numwant = Some(value.parse::().with_context(|| "parse numwant")?); - } - "key" => { - if value.len() > 100 { - return Err(anyhow::anyhow!("'key' is too long")); - } - opt_key = Some(::urlencoding::decode(value)?.into()); - } - k => { - ::log::debug!("ignored unrecognized key: {}", k) - } - } - - if segment_end == query_string.len() { - break; - } else { - position = segment_end + 1; - } - } - - // -- Put together request - if location == "/announce" { - let request = AnnounceRequest { - info_hash: info_hashes.pop().with_context(|| "no info_hash")?, - peer_id: opt_peer_id.with_context(|| "no peer_id")?, - port: opt_port.with_context(|| "no port")?, - bytes_left: opt_bytes_left.with_context(|| "no left")?, - event, - compact: true, - numwant: opt_numwant, - key: opt_key, - }; - - Ok(Request::Announce(request)) + Ok(Request::Announce(AnnounceRequest::from_query_string( + query_string, + )?)) } else { - if info_hashes.is_empty() { - return Err(anyhow::anyhow!("No info hashes sent")); - } - - let request = ScrapeRequest { info_hashes }; - - Ok(Request::Scrape(request)) + Ok(Request::Scrape(ScrapeRequest::from_query_string( + query_string, + )?)) } } - pub fn write(&self, output: &mut W) -> ::std::io::Result<()> { + pub fn write(&self, output: &mut W, url_suffix: &[u8]) -> ::std::io::Result<()> { match self { - Self::Announce(r) => r.write(output), - Self::Scrape(r) => r.write(output), + Self::Announce(r) => r.write(output, url_suffix), + Self::Scrape(r) => r.write(output, url_suffix), } } } @@ -262,7 +326,7 @@ mod tests { use super::*; - static ANNOUNCE_REQUEST_PATH: &str = "/announce?info_hash=%04%0bkV%3f%5cr%14%a6%b7%98%adC%c3%c9.%40%24%00%b9&peer_id=-ABC940-5ert69muw5t8&port=12345&uploaded=0&downloaded=0&left=1&numwant=0&key=4ab4b877&compact=1&supportcrypto=1&event=started"; + static ANNOUNCE_REQUEST_PATH: &str = "/announce?info_hash=%04%0bkV%3f%5cr%14%a6%b7%98%adC%c3%c9.%40%24%00%b9&peer_id=-ABC940-5ert69muw5t8&port=12345&uploaded=1&downloaded=2&left=3&numwant=0&key=4ab4b877&compact=1&supportcrypto=1&event=started"; static SCRAPE_REQUEST_PATH: &str = "/scrape?info_hash=%04%0bkV%3f%5cr%14%a6%b7%98%adC%c3%c9.%40%24%00%b9"; static REFERENCE_INFO_HASH: [u8; 20] = [ @@ -279,9 +343,10 @@ mod tests { info_hash: InfoHash(REFERENCE_INFO_HASH), peer_id: PeerId(REFERENCE_PEER_ID), port: 12345, - bytes_left: 1, + bytes_uploaded: 1, + bytes_downloaded: 2, + bytes_left: 3, event: AnnounceEvent::Started, - compact: true, numwant: Some(0), key: Some("4ab4b877".into()), }) @@ -325,9 +390,10 @@ mod tests { info_hash: Arbitrary::arbitrary(g), peer_id: Arbitrary::arbitrary(g), port: Arbitrary::arbitrary(g), + bytes_uploaded: Arbitrary::arbitrary(g), + bytes_downloaded: Arbitrary::arbitrary(g), bytes_left: Arbitrary::arbitrary(g), event: Arbitrary::arbitrary(g), - compact: true, numwant: Arbitrary::arbitrary(g), key: key.map(|key| key.into()), } @@ -373,7 +439,7 @@ mod tests { let mut bytes = Vec::new(); - request.write(&mut bytes).unwrap(); + request.write(&mut bytes, &[]).unwrap(); let parsed_request = Request::from_bytes(&bytes[..]).unwrap(); diff --git a/aquatic_http_protocol/src/response.rs b/aquatic_http_protocol/src/response.rs index e14e661..b770658 100644 --- a/aquatic_http_protocol/src/response.rs +++ b/aquatic_http_protocol/src/response.rs @@ -51,10 +51,17 @@ pub struct AnnounceResponse { pub peers: ResponsePeerListV4, #[serde(default)] pub peers6: ResponsePeerListV6, + // Serialize as string if Some, otherwise skip + #[serde( + rename = "warning message", + skip_serializing_if = "Option::is_none", + serialize_with = "serialize_optional_string" + )] + pub warning_message: Option, } impl AnnounceResponse { - fn write(&self, output: &mut W) -> ::std::io::Result { + pub fn write(&self, output: &mut W) -> ::std::io::Result { let mut bytes_written = 0usize; bytes_written += output.write(b"d8:completei")?; @@ -93,12 +100,34 @@ impl AnnounceResponse { bytes_written += output.write(&u128::from(peer.ip_address).to_be_bytes())?; bytes_written += output.write(&peer.port.to_be_bytes())?; } + + if let Some(ref warning_message) = self.warning_message { + let message_bytes = warning_message.as_bytes(); + + bytes_written += output.write(b"15:warning message")?; + bytes_written += + output.write(itoa::Buffer::new().format(message_bytes.len()).as_bytes())?; + bytes_written += output.write(b":")?; + bytes_written += output.write(message_bytes)?; + } + bytes_written += output.write(b"e")?; Ok(bytes_written) } } +#[cfg(feature = "with-axum")] +impl axum::response::IntoResponse for AnnounceResponse { + fn into_response(self) -> axum::response::Response { + let mut response_bytes = Vec::with_capacity(128); + + self.write(&mut response_bytes).unwrap(); + + ([("Content-type", "text/plain")], response_bytes).into_response() + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ScrapeResponse { /// BTreeMap instead of HashMap since keys need to be serialized in order @@ -106,7 +135,7 @@ pub struct ScrapeResponse { } impl ScrapeResponse { - fn write(&self, output: &mut W) -> ::std::io::Result { + pub fn write(&self, output: &mut W) -> ::std::io::Result { let mut bytes_written = 0usize; bytes_written += output.write(b"d5:filesd")?; @@ -129,6 +158,17 @@ impl ScrapeResponse { } } +#[cfg(feature = "with-axum")] +impl axum::response::IntoResponse for ScrapeResponse { + fn into_response(self) -> axum::response::Response { + let mut response_bytes = Vec::with_capacity(128); + + self.write(&mut response_bytes).unwrap(); + + ([("Content-type", "text/plain")], response_bytes).into_response() + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FailureResponse { #[serde(rename = "failure reason")] @@ -142,7 +182,7 @@ impl FailureResponse { } } - fn write(&self, output: &mut W) -> ::std::io::Result { + pub fn write(&self, output: &mut W) -> ::std::io::Result { let mut bytes_written = 0usize; let reason_bytes = self.failure_reason.as_bytes(); @@ -157,6 +197,17 @@ impl FailureResponse { } } +#[cfg(feature = "with-axum")] +impl axum::response::IntoResponse for FailureResponse { + fn into_response(self) -> axum::response::Response { + let mut response_bytes = Vec::with_capacity(64); + + self.write(&mut response_bytes).unwrap(); + + ([("Content-type", "text/plain")], response_bytes).into_response() + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(untagged)] pub enum Response { @@ -178,6 +229,17 @@ impl Response { } } +#[cfg(feature = "with-axum")] +impl axum::response::IntoResponse for Response { + fn into_response(self) -> axum::response::Response { + match self { + Self::Announce(r) => r.into_response(), + Self::Scrape(r) => r.into_response(), + Self::Failure(r) => r.into_response(), + } + } +} + #[cfg(test)] impl quickcheck::Arbitrary for ResponsePeer { fn arbitrary(g: &mut quickcheck::Gen) -> Self { @@ -232,6 +294,7 @@ impl quickcheck::Arbitrary for AnnounceResponse { incomplete: usize::arbitrary(g), peers: ResponsePeerListV4::arbitrary(g), peers6: ResponsePeerListV6::arbitrary(g), + warning_message: quickcheck::Arbitrary::arbitrary(g), } } } @@ -264,11 +327,18 @@ mod tests { fn test_announce_response_to_bytes(response: AnnounceResponse) -> bool { let reference = bendy::serde::to_bytes(&Response::Announce(response.clone())).unwrap(); - let mut output = Vec::new(); + let mut hand_written = Vec::new(); - response.write(&mut output).unwrap(); + response.write(&mut hand_written).unwrap(); - output == reference + let success = hand_written == reference; + + if !success { + println!("reference: {}", String::from_utf8_lossy(&reference)); + println!("hand_written: {}", String::from_utf8_lossy(&hand_written)); + } + + success } #[quickcheck] diff --git a/aquatic_http_protocol/src/utils.rs b/aquatic_http_protocol/src/utils.rs index 070e2bb..618eb02 100644 --- a/aquatic_http_protocol/src/utils.rs +++ b/aquatic_http_protocol/src/utils.rs @@ -57,6 +57,17 @@ pub fn urldecode_20_bytes(value: &str) -> anyhow::Result<[u8; 20]> { Ok(out_arr) } +#[inline] +pub fn serialize_optional_string(v: &Option, serializer: S) -> Result +where + S: Serializer, +{ + match v { + Some(s) => serializer.serialize_str(s.as_str()), + None => Err(serde::ser::Error::custom("use skip_serializing_if")), + } +} + #[inline] pub fn serialize_20_bytes(bytes: &[u8; 20], serializer: S) -> Result where diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index e8b2e8c..b98e16e 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -19,10 +19,10 @@ name = "aquatic_udp" cpu-pinning = ["aquatic_common/cpu-pinning"] [dependencies] -aquatic_cli_helpers = "0.2.0" -aquatic_common = "0.2.0" -aquatic_toml_config = "0.2.0" -aquatic_udp_protocol = "0.2.0" +aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } +aquatic_common = { version = "0.2.0", path = "../aquatic_common" } +aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } +aquatic_udp_protocol = { version = "0.2.0", path = "../aquatic_udp_protocol" } anyhow = "1" cfg-if = "1" diff --git a/aquatic_udp_bench/Cargo.toml b/aquatic_udp_bench/Cargo.toml index 6350d61..6a6e7e3 100644 --- a/aquatic_udp_bench/Cargo.toml +++ b/aquatic_udp_bench/Cargo.toml @@ -11,11 +11,11 @@ readme = "../README.md" name = "aquatic_udp_bench" [dependencies] -aquatic_cli_helpers = "0.2.0" -aquatic_common = "0.2.0" -aquatic_toml_config = "0.2.0" -aquatic_udp = "0.2.0" -aquatic_udp_protocol = "0.2.0" +aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } +aquatic_common = { version = "0.2.0", path = "../aquatic_common" } +aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } +aquatic_udp = { version = "0.2.0", path = "../aquatic_udp" } +aquatic_udp_protocol = { version = "0.2.0", path = "../aquatic_udp_protocol" } anyhow = "1" crossbeam-channel = "0.5" diff --git a/aquatic_udp_load_test/Cargo.toml b/aquatic_udp_load_test/Cargo.toml index 93f5c40..50c215b 100644 --- a/aquatic_udp_load_test/Cargo.toml +++ b/aquatic_udp_load_test/Cargo.toml @@ -16,10 +16,10 @@ name = "aquatic_udp_load_test" cpu-pinning = ["aquatic_common/cpu-pinning"] [dependencies] -aquatic_cli_helpers = "0.2.0" -aquatic_common = "0.2.0" -aquatic_toml_config = "0.2.0" -aquatic_udp_protocol = "0.2.0" +aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } +aquatic_common = { version = "0.2.0", path = "../aquatic_common" } +aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } +aquatic_udp_protocol = { version = "0.2.0", path = "../aquatic_udp_protocol" } anyhow = "1" hashbrown = "0.12" diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index af02221..ee23f08 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -20,10 +20,10 @@ name = "aquatic_ws" cpu-pinning = ["aquatic_common/cpu-pinning"] [dependencies] -aquatic_cli_helpers = "0.2.0" -aquatic_common = "0.2.0" -aquatic_toml_config = "0.2.0" -aquatic_ws_protocol = "0.2.0" +aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } +aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["rustls-config"] } +aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } +aquatic_ws_protocol = { version = "0.2.0", path = "../aquatic_ws_protocol" } anyhow = "1" async-tungstenite = "0.17" diff --git a/aquatic_ws/src/common.rs b/aquatic_ws/src/common.rs index 2d2f834..006ceb8 100644 --- a/aquatic_ws/src/common.rs +++ b/aquatic_ws/src/common.rs @@ -5,8 +5,6 @@ use aquatic_common::CanonicalSocketAddr; pub use aquatic_common::ValidUntil; -pub type TlsConfig = futures_rustls::rustls::ServerConfig; - #[derive(Default, Clone)] pub struct State { pub access_list: Arc, diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index 9e3deab..ad8028d 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -2,10 +2,9 @@ pub mod common; pub mod config; pub mod workers; -use std::fs::File; -use std::io::BufReader; use std::sync::{atomic::AtomicUsize, Arc}; +use aquatic_common::rustls_config::create_rustls_config; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; use signal_hook::{consts::SIGUSR1, iterator::Signals}; @@ -63,7 +62,10 @@ fn run_workers(config: Config, state: State) -> anyhow::Result<()> { let num_bound_sockets = Arc::new(AtomicUsize::new(0)); - let tls_config = Arc::new(create_tls_config(&config).unwrap()); + let tls_config = Arc::new(create_rustls_config( + &config.network.tls_certificate_path, + &config.network.tls_private_key_path, + )?); let mut executors = Vec::new(); @@ -150,32 +152,3 @@ fn run_workers(config: Config, state: State) -> anyhow::Result<()> { Ok(()) } - -fn create_tls_config(config: &Config) -> anyhow::Result { - let certs = { - let f = File::open(&config.network.tls_certificate_path)?; - let mut f = BufReader::new(f); - - rustls_pemfile::certs(&mut f)? - .into_iter() - .map(|bytes| rustls::Certificate(bytes)) - .collect() - }; - - let private_key = { - let f = File::open(&config.network.tls_private_key_path)?; - let mut f = BufReader::new(f); - - rustls_pemfile::pkcs8_private_keys(&mut f)? - .first() - .map(|bytes| rustls::PrivateKey(bytes.clone())) - .ok_or(anyhow::anyhow!("No private keys in file"))? - }; - - let tls_config = rustls::ServerConfig::builder() - .with_safe_defaults() - .with_no_client_auth() - .with_single_cert(certs, private_key)?; - - Ok(tls_config) -} diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index 4557f78..7c121d4 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; +use aquatic_common::rustls_config::RustlsConfig; use aquatic_common::CanonicalSocketAddr; use aquatic_ws_protocol::*; use async_tungstenite::WebSocketStream; @@ -49,7 +50,7 @@ struct ConnectionReference { pub async fn run_socket_worker( config: Config, state: State, - tls_config: Arc, + tls_config: Arc, in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>, out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>, num_bound_sockets: Arc, @@ -214,7 +215,7 @@ async fn run_connection( out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>, out_message_consumer_id: ConsumerId, connection_id: ConnectionId, - tls_config: Arc, + tls_config: Arc, stream: TcpStream, ) -> anyhow::Result<()> { let peer_addr = stream diff --git a/aquatic_ws_load_test/Cargo.toml b/aquatic_ws_load_test/Cargo.toml index e6b58d4..f2d521e 100644 --- a/aquatic_ws_load_test/Cargo.toml +++ b/aquatic_ws_load_test/Cargo.toml @@ -16,13 +16,13 @@ name = "aquatic_ws_load_test" cpu-pinning = ["aquatic_common/cpu-pinning"] [dependencies] -async-tungstenite = "0.17" -aquatic_cli_helpers = "0.2.0" -aquatic_common = "0.2.0" -aquatic_toml_config = "0.2.0" -aquatic_ws_protocol = "0.2.0" +aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" } +aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["rustls-config"] } +aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" } +aquatic_ws_protocol = { version = "0.2.0", path = "../aquatic_ws_protocol" } anyhow = "1" +async-tungstenite = "0.17" futures = "0.3" futures-rustls = "0.22" glommio = "0.7" diff --git a/scripts/run-aquatic-http-private.sh b/scripts/run-aquatic-http-private.sh new file mode 100755 index 0000000..0a9ed31 --- /dev/null +++ b/scripts/run-aquatic-http-private.sh @@ -0,0 +1,5 @@ +#!/bin/sh + +. ./scripts/env-native-cpu-without-avx-512 + +cargo run --profile "release-debug" --bin aquatic_http_private -- $@ diff --git a/scripts/run-aquatic-http.sh b/scripts/run-aquatic-http.sh index 495a259..e693fca 100755 --- a/scripts/run-aquatic-http.sh +++ b/scripts/run-aquatic-http.sh @@ -2,4 +2,4 @@ . ./scripts/env-native-cpu-without-avx-512 -cargo run --release --bin aquatic_http -- $@ +cargo run --profile "release-debug" --bin aquatic_http -- $@ diff --git a/scripts/run-aquatic-udp.sh b/scripts/run-aquatic-udp.sh index 256322f..0814661 100755 --- a/scripts/run-aquatic-udp.sh +++ b/scripts/run-aquatic-udp.sh @@ -2,4 +2,4 @@ . ./scripts/env-native-cpu-without-avx-512 -cargo run --release --bin aquatic_udp -- $@ +cargo run --profile "release-debug" --bin aquatic_udp -- $@ diff --git a/scripts/run-aquatic-ws.sh b/scripts/run-aquatic-ws.sh index 40be253..297c71d 100755 --- a/scripts/run-aquatic-ws.sh +++ b/scripts/run-aquatic-ws.sh @@ -2,4 +2,4 @@ . ./scripts/env-native-cpu-without-avx-512 -cargo run --release --bin aquatic_ws -- $@ +cargo run --profile "release-debug" --bin aquatic_ws -- $@ diff --git a/scripts/run-aquatic.sh b/scripts/run-aquatic.sh index fac0706..1ce91c4 100755 --- a/scripts/run-aquatic.sh +++ b/scripts/run-aquatic.sh @@ -2,4 +2,4 @@ . ./scripts/env-native-cpu-without-avx-512 -cargo run --release --bin aquatic -- $@ +cargo run --profile "release-debug" --bin aquatic -- $@ diff --git a/scripts/run-load-test-http.sh b/scripts/run-load-test-http.sh index 86edc66..58bfe4e 100755 --- a/scripts/run-load-test-http.sh +++ b/scripts/run-load-test-http.sh @@ -2,4 +2,4 @@ . ./scripts/env-native-cpu-without-avx-512 -cargo run --release --bin aquatic_http_load_test -- $@ \ No newline at end of file +cargo run --profile "release-debug" --bin aquatic_http_load_test -- $@ \ No newline at end of file diff --git a/scripts/run-load-test-udp.sh b/scripts/run-load-test-udp.sh index 3946878..b307628 100755 --- a/scripts/run-load-test-udp.sh +++ b/scripts/run-load-test-udp.sh @@ -2,4 +2,4 @@ . ./scripts/env-native-cpu-without-avx-512 -cargo run --release --bin aquatic_udp_load_test -- $@ +cargo run --profile "release-debug" --bin aquatic_udp_load_test -- $@ diff --git a/scripts/run-load-test-ws.sh b/scripts/run-load-test-ws.sh index 823c195..79dd720 100755 --- a/scripts/run-load-test-ws.sh +++ b/scripts/run-load-test-ws.sh @@ -2,4 +2,4 @@ . ./scripts/env-native-cpu-without-avx-512 -cargo run --release --bin aquatic_ws_load_test -- $@ \ No newline at end of file +cargo run --profile "release-debug" --bin aquatic_ws_load_test -- $@ \ No newline at end of file