diff --git a/Cargo.lock b/Cargo.lock index c580828..669f9bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -51,9 +51,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.65" +version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98161a4e3e2184da77bb14f02184cdd111e83bbbcc9979dfee3c44b9a85f5602" +checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" [[package]] name = "aquatic" @@ -376,15 +376,6 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544" -[[package]] -name = "arrayvec" -version = "0.4.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd9fd44efafa8690358b7408d253adf110036b88f55672a933f01d616ad9b1b9" -dependencies = [ - "nodrop", -] - [[package]] name = "arrayvec" version = "0.7.2" @@ -393,9 +384,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "async-trait" -version = "0.1.57" +version = "0.1.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76464446b8bc32758d7e88ee1a804d9914cd9b1cb264c029899680b0be29826f" +checksum = "1e805d94e6b5001b651426cf4cd446b1ab5f319d27bab5c644f61de0a804360c" dependencies = [ "proc-macro2", "quote", @@ -449,9 +440,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.5.16" +version = "0.5.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9e3356844c4d6a6d6467b8da2cffb4a2820be256f50a3a386c9d152bab31043" +checksum = "acee9fd5073ab6b045a275b3e709c163dd36c90685219cb21804a147b58dba43" dependencies = [ "async-trait", "axum-core", @@ -479,9 +470,9 @@ dependencies = [ [[package]] name = "axum-core" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9f0c0a60006f2a293d82d571f635042a72edf927539b7685bd62d361963839b" +checksum = "37e5939e02c56fecd5c017c37df4238c0a839fa76b7f97acdd7efb804fd181cc" dependencies = [ "async-trait", "bytes", @@ -510,15 +501,15 @@ dependencies = [ [[package]] name = "base64" -version = "0.13.0" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" [[package]] name = "base64ct" -version = "1.5.2" +version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea2b2456fd614d856680dcd9fcc660a51a820fa09daef2e49772b56a193c8474" +checksum = "b645a089122eccb6111b4f81cbc1a49f5900ac4666bb93ac027feaecf15607bf" [[package]] name = "bendy" @@ -556,7 +547,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a08e53fc5a564bb15bfe6fae56bd71522205f1f91893f9c0116edad6496c183f" dependencies = [ "arrayref", - "arrayvec 0.7.2", + "arrayvec", "cc", "cfg-if", "constant_time_eq 0.1.5", @@ -592,9 +583,9 @@ checksum = "8ff9f338986406db85e2b5deb40a9255b796ca03a194c7457403d215173f3fd5" [[package]] name = "bumpalo" -version = "3.11.0" +version = "3.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1ad822118d20d2c234f427000d5acc36eabe1e29a348c89b63dd60b13f28e5d" +checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba" [[package]] name = "byteorder" @@ -894,26 +885,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "dirs" -version = "4.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" -dependencies = [ - "dirs-sys", -] - -[[package]] -name = "dirs-sys" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" -dependencies = [ - "libc", - "redox_users", - "winapi 0.3.9", -] - [[package]] name = "dotenv" version = "0.15.0" @@ -922,12 +893,9 @@ checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" [[package]] name = "dotenvy" -version = "0.15.5" +version = "0.15.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed9155c8f4dc55c7470ae9da3f63c6785245093b3f6aeb0f5bf2e968efbba314" -dependencies = [ - "dirs", -] +checksum = "03d8c417d7a8cb362e0c37e5d815f5eb7c37f79ff93707329d5a194e42e54ca0" [[package]] name = "duplicate" @@ -1074,9 +1042,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f21eda599937fba36daeb58a22e8f5cee2d14c4a17b5b7739c7c8e5e3b8230c" +checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0" dependencies = [ "futures-channel", "futures-core", @@ -1089,9 +1057,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30bdd20c28fadd505d0fd6712cdfcb0d4b5648baf45faef7f852afb2399bb050" +checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed" dependencies = [ "futures-core", "futures-sink", @@ -1099,15 +1067,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf" +checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac" [[package]] name = "futures-executor" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ff63c23854bee61b6e9cd331d523909f238fc7636290b96826e9cfa5faa00ab" +checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2" dependencies = [ "futures-core", "futures-task", @@ -1116,20 +1084,20 @@ dependencies = [ [[package]] name = "futures-intrusive" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62007592ac46aa7c2b6416f7deb9a8a8f63a01e0f1d6e1787d5630170db2b63e" +checksum = "1b6bdbb8c5a42b2bb5ee8dd9dc2c7d73ce3e15d26dfe100fb347ffa3f58c672b" dependencies = [ "futures-core", "lock_api", - "parking_lot 0.11.2", + "parking_lot", ] [[package]] name = "futures-io" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbf4d2a7a308fd4578637c0b17c7e1c7ba127b8f6ba00b29f717e9655d85eb68" +checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" [[package]] name = "futures-lite" @@ -1148,9 +1116,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42cd15d1c7456c04dbdf7e88bcd69760d74f3a798d6444e16974b505b0e62f17" +checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" dependencies = [ "proc-macro2", "quote", @@ -1170,21 +1138,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21b20ba5a92e727ba30e72834706623d94ac93a725410b6a6b6fbc1b07f7ba56" +checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9" [[package]] name = "futures-task" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6508c467c73851293f390476d4491cf4d227dbabcd4170f3bb6044959b294f1" +checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea" [[package]] name = "futures-util" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44fb6cb1be61cc1d2e43b262516aafcf63b241cffdb1d3fa115f91d9c7b09c90" +checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6" dependencies = [ "futures-channel", "futures-core", @@ -1210,9 +1178,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.7" +version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6" +checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" dependencies = [ "cfg-if", "js-sys", @@ -1589,9 +1557,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.134" +version = "0.2.136" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "329c933548736bc49fd575ee68c89e8be4d260064184389a5b77517cddd99ffb" +checksum = "55edcf6c0bb319052dea84732cf99db461780fd5e8d3eb46ab6ff312ab31f197" [[package]] name = "libm" @@ -1601,9 +1569,9 @@ checksum = "292a948cd991e376cf75541fe5b97a1081d713c618b4f1b9500f8844e49eb565" [[package]] name = "libmimalloc-sys" -version = "0.1.25" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11ca136052550448f55df7898c6dbe651c6b574fe38a0d9ea687a9f8088a2e2c" +checksum = "8fc093ab289b0bfda3aa1bdfab9c9542be29c7ef385cfcbe77f8c9813588eb48" dependencies = [ "cc", ] @@ -1679,9 +1647,9 @@ dependencies = [ [[package]] name = "mimalloc" -version = "0.1.29" +version = "0.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f64ad83c969af2e732e907564deb0d0ed393cec4af80776f77dd77a1a427698" +checksum = "76ce6a4b40d3bff9eb3ce9881ca0737a85072f9f975886082640cd46a75cdb35" dependencies = [ "libmimalloc-sys", ] @@ -1709,9 +1677,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf" +checksum = "e5d732bc30207a6423068df043e3d02e0735b155ad7ce1a6f76fe2baa5b158de" dependencies = [ "libc", "log", @@ -1759,12 +1727,6 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" -[[package]] -name = "nodrop" -version = "0.1.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb" - [[package]] name = "nom" version = "7.1.1" @@ -1816,12 +1778,12 @@ dependencies = [ [[package]] name = "num-format" -version = "0.4.0" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bafe4179722c2894288ee77a9f044f02811c86af699344c498b0840c698a2465" +checksum = "54b862ff8df690cf089058c98b183676a7ed0f974cc08b426800093227cbff3b" dependencies = [ - "arrayvec 0.4.12", - "itoa 0.4.8", + "arrayvec", + "itoa 1.0.4", ] [[package]] @@ -1913,17 +1875,6 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" -[[package]] -name = "parking_lot" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" -dependencies = [ - "instant", - "lock_api", - "parking_lot_core 0.8.5", -] - [[package]] name = "parking_lot" version = "0.12.1" @@ -1931,28 +1882,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core 0.9.3", + "parking_lot_core", ] [[package]] name = "parking_lot_core" -version = "0.8.5" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" -dependencies = [ - "cfg-if", - "instant", - "libc", - "redox_syscall", - "smallvec", - "winapi 0.3.9", -] - -[[package]] -name = "parking_lot_core" -version = "0.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929" +checksum = "4dc9e0dc2adc1c69d09143aff38d3d30c5c3f0df0dad82e6d25547af174ebec0" dependencies = [ "cfg-if", "libc", @@ -2112,9 +2049,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.46" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94e2ef8dbfc347b10c094890f778ee2e36ca9bb4262e86dc99cd217e35f3470b" +checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725" dependencies = [ "unicode-ident", ] @@ -2223,17 +2160,6 @@ dependencies = [ "bitflags 1.3.2", ] -[[package]] -name = "redox_users" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b" -dependencies = [ - "getrandom", - "redox_syscall", - "thiserror", -] - [[package]] name = "regex" version = "1.6.0" @@ -2309,9 +2235,9 @@ checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" [[package]] name = "rustls" -version = "0.20.6" +version = "0.20.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aab8ee6c7097ed6057f43c187a62418d0c05a4bd5f18b3571db50ee0f9ce033" +checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c" dependencies = [ "log", "ring", @@ -2367,9 +2293,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.145" +version = "1.0.147" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "728eb6351430bccb993660dfffc5a72f91ccc1295abaa8ce19b27ebe4f75568b" +checksum = "d193d69bae983fc11a79df82342761dfbf28a99fc8d203dca4c3c1b590948965" dependencies = [ "serde_derive", ] @@ -2405,9 +2331,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.145" +version = "1.0.147" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81fa1584d3d1bcacd84c277a0dfe21f5b0f6accf4a23d04d4c6d61f1af522b4c" +checksum = "4f1d362ca8fc9c3e3a7484440752472d68a6caa98f1ab81d99b5dfe517cec852" dependencies = [ "proc-macro2", "quote", @@ -2416,9 +2342,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.86" +version = "1.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41feea4228a6f1cd09ec7a3593a682276702cd67b5273544757dae23c096f074" +checksum = "6ce777b7b150d76b9cf60d28b55f5847135a003f7d7350c6be7a773508ce7d45" dependencies = [ "itoa 1.0.4", "ryu", @@ -2711,9 +2637,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.102" +version = "1.0.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fcd952facd492f9be3ef0d0b7032a6e442ee9b361d4acc2b1d0c4aaa5f613a1" +checksum = "a864042229133ada95abf3b54fdc62ef5ccabe9515b64717bcb9a1919e59445d" dependencies = [ "proc-macro2", "quote", @@ -2779,21 +2705,32 @@ dependencies = [ [[package]] name = "time" -version = "0.3.15" +version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d634a985c4d4238ec39cacaed2e7ae552fbd3c476b552c1deac3021b7d7eaf0c" +checksum = "0fab5c8b9980850e06d92ddbe3ab839c062c801f3927c0fb8abd6fc8e918fbca" dependencies = [ "itoa 1.0.4", "libc", "num_threads", + "serde", + "time-core", "time-macros", ] [[package]] -name = "time-macros" -version = "0.2.4" +name = "time-core" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42657b1a6f4d817cda8e7a0ace261fe0cc946cf3a80314390b22cc61ae080792" +checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd" + +[[package]] +name = "time-macros" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bb801831d812c562ae7d2bfb531f26e66e4e1f6b17307ba4149c5064710e5b" +dependencies = [ + "time-core", +] [[package]] name = "tinytemplate" @@ -2832,7 +2769,7 @@ dependencies = [ "memchr", "mio", "num_cpus", - "parking_lot 0.12.1", + "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2 0.4.7", @@ -2864,9 +2801,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.10" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6edf2d6bc038a43d31353570e27270603f4648d18f5ed10c0e179abe43255af" +checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce" dependencies = [ "futures-core", "pin-project-lite", @@ -2919,9 +2856,9 @@ dependencies = [ [[package]] name = "tower-layer" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" [[package]] name = "tower-service" @@ -3246,46 +3183,60 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "windows-sys" -version = "0.36.1" +version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" +checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" dependencies = [ + "windows_aarch64_gnullvm", "windows_aarch64_msvc", "windows_i686_gnu", "windows_i686_msvc", "windows_x86_64_gnu", + "windows_x86_64_gnullvm", "windows_x86_64_msvc", ] [[package]] -name = "windows_aarch64_msvc" -version = "0.36.1" +name = "windows_aarch64_gnullvm" +version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" +checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4" [[package]] name = "windows_i686_gnu" -version = "0.36.1" +version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" +checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7" [[package]] name = "windows_i686_msvc" -version = "0.36.1" +version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" +checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246" [[package]] name = "windows_x86_64_gnu" -version = "0.36.1" +version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" +checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028" [[package]] name = "windows_x86_64_msvc" -version = "0.36.1" +version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" +checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" [[package]] name = "zeroize" diff --git a/aquatic_http_load_test/src/common.rs b/aquatic_http_load_test/src/common.rs index ad5ad6b..f187ad5 100644 --- a/aquatic_http_load_test/src/common.rs +++ b/aquatic_http_load_test/src/common.rs @@ -1,6 +1,6 @@ use std::sync::{atomic::AtomicUsize, Arc}; -use rand_distr::Pareto; +use rand_distr::Gamma; pub use aquatic_http_protocol::common::*; pub use aquatic_http_protocol::request::*; @@ -29,7 +29,7 @@ pub struct Statistics { pub struct LoadTestState { pub info_hashes: Arc>, pub statistics: Arc, - pub pareto: Arc>, + pub gamma: Arc>, } #[derive(PartialEq, Eq, Clone, Copy)] diff --git a/aquatic_http_load_test/src/config.rs b/aquatic_http_load_test/src/config.rs index 5c1a35d..060b401 100644 --- a/aquatic_http_load_test/src/config.rs +++ b/aquatic_http_load_test/src/config.rs @@ -33,24 +33,6 @@ impl aquatic_common::cli::Config for Config { } } -#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default, deny_unknown_fields)] -pub struct TorrentConfig { - pub number_of_torrents: usize, - /// Pareto shape - /// - /// Fake peers choose torrents according to Pareto distribution. - pub torrent_selection_pareto_shape: f64, - /// Probability that a generated peer is a seeder - pub peer_seeder_probability: f64, - /// Probability that a generated request is a announce request, as part - /// of sum of the various weight arguments. - pub weight_announce: usize, - /// Probability that a generated request is a scrape request, as part - /// of sum of the various weight arguments. - pub weight_scrape: usize, -} - impl Default for Config { fn default() -> Self { Self { @@ -68,14 +50,33 @@ impl Default for Config { } } +#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] +#[serde(default, deny_unknown_fields)] +pub struct TorrentConfig { + pub number_of_torrents: usize, + /// Probability that a generated peer is a seeder + pub peer_seeder_probability: f64, + /// Probability that a generated request is a announce request, as part + /// of sum of the various weight arguments. + pub weight_announce: usize, + /// Probability that a generated request is a scrape request, as part + /// of sum of the various weight arguments. + pub weight_scrape: usize, + /// Peers choose torrents according to this Gamma distribution shape + pub torrent_gamma_shape: f64, + /// Peers choose torrents according to this Gamma distribution scale + pub torrent_gamma_scale: f64, +} + impl Default for TorrentConfig { fn default() -> Self { Self { number_of_torrents: 10_000, peer_seeder_probability: 0.25, - torrent_selection_pareto_shape: 2.0, weight_announce: 5, weight_scrape: 0, + torrent_gamma_shape: 0.2, + torrent_gamma_scale: 100.0, } } } diff --git a/aquatic_http_load_test/src/main.rs b/aquatic_http_load_test/src/main.rs index 1955885..5dfc0d0 100644 --- a/aquatic_http_load_test/src/main.rs +++ b/aquatic_http_load_test/src/main.rs @@ -6,7 +6,7 @@ use ::glommio::LocalExecutorBuilder; use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker}; use aquatic_common::cpu_pinning::WorkerIndex; use rand::prelude::*; -use rand_distr::Pareto; +use rand_distr::Gamma; mod common; mod config; @@ -47,12 +47,16 @@ fn run(config: Config) -> ::anyhow::Result<()> { info_hashes.push(InfoHash(rng.gen())); } - let pareto = Pareto::new(1.0, config.torrents.torrent_selection_pareto_shape).unwrap(); + let gamma = Gamma::new( + config.torrents.torrent_gamma_shape, + config.torrents.torrent_gamma_scale, + ) + .unwrap(); let state = LoadTestState { info_hashes: Arc::new(info_hashes), statistics: Arc::new(Statistics::default()), - pareto: Arc::new(pareto), + gamma: Arc::new(gamma), }; let tls_config = create_tls_config().unwrap(); diff --git a/aquatic_http_load_test/src/utils.rs b/aquatic_http_load_test/src/utils.rs index 313657a..c3dab09 100644 --- a/aquatic_http_load_test/src/utils.rs +++ b/aquatic_http_load_test/src/utils.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use rand::distributions::WeightedIndex; use rand::prelude::*; -use rand_distr::Pareto; +use rand_distr::Gamma; use crate::common::*; use crate::config::*; @@ -69,12 +69,12 @@ fn create_scrape_request(config: &Config, state: &LoadTestState, rng: &mut impl #[inline] fn select_info_hash_index(config: &Config, state: &LoadTestState, rng: &mut impl Rng) -> usize { - pareto_usize(rng, &state.pareto, config.torrents.number_of_torrents - 1) + gamma_usize(rng, &state.gamma, config.torrents.number_of_torrents - 1) } #[inline] -fn pareto_usize(rng: &mut impl Rng, pareto: &Arc>, max: usize) -> usize { - let p: f64 = pareto.sample(rng); +fn gamma_usize(rng: &mut impl Rng, gamma: &Arc>, max: usize) -> usize { + let p: f64 = gamma.sample(rng); let p = (p.min(101.0f64) - 1.0) / 100.0; (p * max as f64) as usize diff --git a/aquatic_udp/src/lib.rs b/aquatic_udp/src/lib.rs index 2a735b1..a6b22f8 100644 --- a/aquatic_udp/src/lib.rs +++ b/aquatic_udp/src/lib.rs @@ -21,6 +21,7 @@ use common::{ }; use config::Config; use workers::socket::validator::ConnectionValidator; +use workers::socket::SocketWorker; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -121,11 +122,10 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::SocketWorker(i), ); - workers::socket::run_socket_worker( + SocketWorker::run( sentinel, state, config, - i, connection_validator, server_start_instant, request_sender, diff --git a/aquatic_udp/src/workers/socket/mod.rs b/aquatic_udp/src/workers/socket/mod.rs index 6daeeb2..f5a4efb 100644 --- a/aquatic_udp/src/workers/socket/mod.rs +++ b/aquatic_udp/src/workers/socket/mod.rs @@ -1,11 +1,12 @@ -mod requests; -mod responses; mod storage; pub mod validator; +use std::io::{Cursor, ErrorKind}; +use std::sync::atomic::Ordering; use std::time::{Duration, Instant}; use anyhow::Context; +use aquatic_common::access_list::AccessListCache; use aquatic_common::ServerStartInstant; use crossbeam_channel::Receiver; use mio::net::UdpSocket; @@ -21,105 +22,406 @@ use aquatic_udp_protocol::*; use crate::common::*; use crate::config::Config; -use requests::read_requests; -use responses::send_responses; use storage::PendingScrapeResponseSlab; use validator::ConnectionValidator; -pub fn run_socket_worker( - _sentinel: PanicSentinel, - state: State, +pub struct SocketWorker { config: Config, - token_num: usize, - mut connection_validator: ConnectionValidator, - server_start_instant: ServerStartInstant, + shared_state: State, request_sender: ConnectedRequestSender, response_receiver: Receiver<(ConnectedResponse, CanonicalSocketAddr)>, - priv_dropper: PrivilegeDropper, -) { - let mut buffer = [0u8; BUFFER_SIZE]; + access_list_cache: AccessListCache, + validator: ConnectionValidator, + server_start_instant: ServerStartInstant, + pending_scrape_responses: PendingScrapeResponseSlab, + socket: UdpSocket, + buffer: [u8; BUFFER_SIZE], +} - let mut socket = - UdpSocket::from_std(create_socket(&config, priv_dropper).expect("create socket")); - let mut poll = Poll::new().expect("create poll"); +impl SocketWorker { + pub fn run( + _sentinel: PanicSentinel, + shared_state: State, + config: Config, + validator: ConnectionValidator, + server_start_instant: ServerStartInstant, + request_sender: ConnectedRequestSender, + response_receiver: Receiver<(ConnectedResponse, CanonicalSocketAddr)>, + priv_dropper: PrivilegeDropper, + ) { + let socket = + UdpSocket::from_std(create_socket(&config, priv_dropper).expect("create socket")); + let access_list_cache = create_access_list_cache(&shared_state.access_list); - let interests = Interest::READABLE; + let mut worker = Self { + config, + shared_state, + validator, + server_start_instant, + request_sender, + response_receiver, + access_list_cache, + pending_scrape_responses: Default::default(), + socket, + buffer: [0; BUFFER_SIZE], + }; - poll.registry() - .register(&mut socket, Token(token_num), interests) - .unwrap(); + worker.run_inner(); + } - let mut events = Events::with_capacity(config.network.poll_event_capacity); - let mut pending_scrape_responses = PendingScrapeResponseSlab::default(); - let mut access_list_cache = create_access_list_cache(&state.access_list); + pub fn run_inner(&mut self) { + let mut local_responses = Vec::new(); + let mut opt_resend_buffer = + (self.config.network.resend_buffer_max_len > 0).then_some(Vec::new()); - let mut local_responses: Vec<(Response, CanonicalSocketAddr)> = Vec::new(); - let mut opt_resend_buffer = (config.network.resend_buffer_max_len > 0).then_some(Vec::new()); + let mut events = Events::with_capacity(self.config.network.poll_event_capacity); + let mut poll = Poll::new().expect("create poll"); - let poll_timeout = Duration::from_millis(config.network.poll_timeout_ms); + poll.registry() + .register(&mut self.socket, Token(0), Interest::READABLE) + .expect("register poll"); - let pending_scrape_cleaning_duration = - Duration::from_secs(config.cleaning.pending_scrape_cleaning_interval); + let poll_timeout = Duration::from_millis(self.config.network.poll_timeout_ms); - let mut pending_scrape_valid_until = - ValidUntil::new(server_start_instant, config.cleaning.max_pending_scrape_age); - let mut last_pending_scrape_cleaning = Instant::now(); + let pending_scrape_cleaning_duration = + Duration::from_secs(self.config.cleaning.pending_scrape_cleaning_interval); - let mut iter_counter = 0usize; + let mut pending_scrape_valid_until = ValidUntil::new( + self.server_start_instant, + self.config.cleaning.max_pending_scrape_age, + ); + let mut last_pending_scrape_cleaning = Instant::now(); - loop { - poll.poll(&mut events, Some(poll_timeout)) - .expect("failed polling"); + let mut iter_counter = 0usize; - for event in events.iter() { - let token = event.token(); + loop { + poll.poll(&mut events, Some(poll_timeout)) + .expect("failed polling"); - if (token.0 == token_num) & event.is_readable() { - read_requests( - &config, - &state, - &mut connection_validator, - &mut pending_scrape_responses, - &mut access_list_cache, - &mut socket, - &mut buffer, - &request_sender, - &mut local_responses, - pending_scrape_valid_until, + for event in events.iter() { + if event.is_readable() { + self.read_and_handle_requests(&mut local_responses, pending_scrape_valid_until); + } + } + + // If resend buffer is enabled, send any responses in it + if let Some(resend_buffer) = opt_resend_buffer.as_mut() { + for (response, addr) in resend_buffer.drain(..) { + Self::send_response( + &self.config, + &self.shared_state, + &mut self.socket, + &mut self.buffer, + &mut None, + response, + addr, + ); + } + } + + // Send any connect and error responses generated by this socket worker + for (response, addr) in local_responses.drain(..) { + Self::send_response( + &self.config, + &self.shared_state, + &mut self.socket, + &mut self.buffer, + &mut opt_resend_buffer, + response, + addr, ); } + + // Check channel for any responses generated by swarm workers + for (response, addr) in self.response_receiver.try_iter() { + let opt_response = match response { + ConnectedResponse::Scrape(r) => self + .pending_scrape_responses + .add_and_get_finished(r) + .map(Response::Scrape), + ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)), + ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)), + }; + + if let Some(response) = opt_response { + Self::send_response( + &self.config, + &self.shared_state, + &mut self.socket, + &mut self.buffer, + &mut opt_resend_buffer, + response, + addr, + ); + } + } + + // Run periodic ValidUntil updates and state cleaning + if iter_counter % 256 == 0 { + let seconds_since_start = self.server_start_instant.seconds_elapsed(); + + pending_scrape_valid_until = ValidUntil::new_with_now( + seconds_since_start, + self.config.cleaning.max_pending_scrape_age, + ); + + let now = Instant::now(); + + if now > last_pending_scrape_cleaning + pending_scrape_cleaning_duration { + self.pending_scrape_responses.clean(seconds_since_start); + + last_pending_scrape_cleaning = now; + } + } + + iter_counter = iter_counter.wrapping_add(1); } + } - send_responses( - &state, - &config, - &mut socket, - &mut buffer, - &response_receiver, - &mut pending_scrape_responses, - local_responses.drain(..), - &mut opt_resend_buffer, - ); + fn read_and_handle_requests( + &mut self, + local_responses: &mut Vec<(Response, CanonicalSocketAddr)>, + pending_scrape_valid_until: ValidUntil, + ) { + let mut requests_received_ipv4: usize = 0; + let mut requests_received_ipv6: usize = 0; + let mut bytes_received_ipv4: usize = 0; + let mut bytes_received_ipv6 = 0; - // Run periodic ValidUntil updates and state cleaning - if iter_counter % 256 == 0 { - let seconds_since_start = server_start_instant.seconds_elapsed(); + loop { + match self.socket.recv_from(&mut self.buffer[..]) { + Ok((bytes_read, src)) => { + if src.port() == 0 { + ::log::info!("Ignored request from {} because source port is zero", src); - pending_scrape_valid_until = ValidUntil::new_with_now( - seconds_since_start, - config.cleaning.max_pending_scrape_age, - ); + continue; + } - let now = Instant::now(); + let src = CanonicalSocketAddr::new(src); - if now > last_pending_scrape_cleaning + pending_scrape_cleaning_duration { - pending_scrape_responses.clean(seconds_since_start); + let request_parsable = match Request::from_bytes( + &self.buffer[..bytes_read], + self.config.protocol.max_scrape_torrents, + ) { + Ok(request) => { + self.handle_request( + local_responses, + pending_scrape_valid_until, + request, + src, + ); - last_pending_scrape_cleaning = now; + true + } + Err(err) => { + ::log::debug!("Request::from_bytes error: {:?}", err); + + if let RequestParseError::Sendable { + connection_id, + transaction_id, + err, + } = err + { + if self.validator.connection_id_valid(src, connection_id) { + let response = ErrorResponse { + transaction_id, + message: err.right_or("Parse error").into(), + }; + + local_responses.push((response.into(), src)); + } + } + + false + } + }; + + // Update statistics for converted address + if src.is_ipv4() { + if request_parsable { + requests_received_ipv4 += 1; + } + bytes_received_ipv4 += bytes_read; + } else { + if request_parsable { + requests_received_ipv6 += 1; + } + bytes_received_ipv6 += bytes_read; + } + } + Err(err) if err.kind() == ErrorKind::WouldBlock => { + break; + } + Err(err) => { + ::log::warn!("recv_from error: {:#}", err); + } } } - iter_counter = iter_counter.wrapping_add(1); + if self.config.statistics.active() { + self.shared_state + .statistics_ipv4 + .requests_received + .fetch_add(requests_received_ipv4, Ordering::Relaxed); + self.shared_state + .statistics_ipv6 + .requests_received + .fetch_add(requests_received_ipv6, Ordering::Relaxed); + self.shared_state + .statistics_ipv4 + .bytes_received + .fetch_add(bytes_received_ipv4, Ordering::Relaxed); + self.shared_state + .statistics_ipv6 + .bytes_received + .fetch_add(bytes_received_ipv6, Ordering::Relaxed); + } + } + + fn handle_request( + &mut self, + local_responses: &mut Vec<(Response, CanonicalSocketAddr)>, + pending_scrape_valid_until: ValidUntil, + request: Request, + src: CanonicalSocketAddr, + ) { + let access_list_mode = self.config.access_list.mode; + + match request { + Request::Connect(request) => { + let connection_id = self.validator.create_connection_id(src); + + let response = Response::Connect(ConnectResponse { + connection_id, + transaction_id: request.transaction_id, + }); + + local_responses.push((response, src)) + } + Request::Announce(request) => { + if self + .validator + .connection_id_valid(src, request.connection_id) + { + if self + .access_list_cache + .load() + .allows(access_list_mode, &request.info_hash.0) + { + let worker_index = + SwarmWorkerIndex::from_info_hash(&self.config, request.info_hash); + + self.request_sender.try_send_to( + worker_index, + ConnectedRequest::Announce(request), + src, + ); + } else { + let response = Response::Error(ErrorResponse { + transaction_id: request.transaction_id, + message: "Info hash not allowed".into(), + }); + + local_responses.push((response, src)) + } + } + } + Request::Scrape(request) => { + if self + .validator + .connection_id_valid(src, request.connection_id) + { + let split_requests = self.pending_scrape_responses.prepare_split_requests( + &self.config, + request, + pending_scrape_valid_until, + ); + + for (swarm_worker_index, request) in split_requests { + self.request_sender.try_send_to( + swarm_worker_index, + ConnectedRequest::Scrape(request), + src, + ); + } + } + } + } + } + + fn send_response( + config: &Config, + shared_state: &State, + socket: &mut UdpSocket, + buffer: &mut [u8], + opt_resend_buffer: &mut Option>, + response: Response, + canonical_addr: CanonicalSocketAddr, + ) { + let mut cursor = Cursor::new(buffer); + + if let Err(err) = response.write(&mut cursor) { + ::log::error!("Converting response to bytes failed: {:#}", err); + + return; + } + + let bytes_written = cursor.position() as usize; + + let addr = if config.network.address.is_ipv4() { + canonical_addr + .get_ipv4() + .expect("found peer ipv6 address while running bound to ipv4 address") + } else { + canonical_addr.get_ipv6_mapped() + }; + + match socket.send_to(&cursor.get_ref()[..bytes_written], addr) { + Ok(amt) if config.statistics.active() => { + let stats = if canonical_addr.is_ipv4() { + &shared_state.statistics_ipv4 + } else { + &shared_state.statistics_ipv6 + }; + + stats.bytes_sent.fetch_add(amt, Ordering::Relaxed); + + match response { + Response::Connect(_) => { + stats.responses_sent_connect.fetch_add(1, Ordering::Relaxed); + } + Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => { + stats + .responses_sent_announce + .fetch_add(1, Ordering::Relaxed); + } + Response::Scrape(_) => { + stats.responses_sent_scrape.fetch_add(1, Ordering::Relaxed); + } + Response::Error(_) => { + stats.responses_sent_error.fetch_add(1, Ordering::Relaxed); + } + } + } + Ok(_) => (), + Err(err) => match opt_resend_buffer.as_mut() { + Some(resend_buffer) + if (err.raw_os_error() == Some(libc::ENOBUFS)) + || (err.kind() == ErrorKind::WouldBlock) => + { + if resend_buffer.len() < config.network.resend_buffer_max_len { + ::log::info!("Adding response to resend queue, since sending it to {} failed with: {:#}", addr, err); + + resend_buffer.push((response, canonical_addr)); + } else { + ::log::warn!("Response resend buffer full, dropping response"); + } + } + _ => { + ::log::warn!("Sending response to {} failed: {:#}", addr, err); + } + }, + } } } diff --git a/aquatic_udp/src/workers/socket/requests.rs b/aquatic_udp/src/workers/socket/requests.rs deleted file mode 100644 index eb995bd..0000000 --- a/aquatic_udp/src/workers/socket/requests.rs +++ /dev/null @@ -1,184 +0,0 @@ -use std::io::ErrorKind; -use std::sync::atomic::Ordering; - -use mio::net::UdpSocket; - -use aquatic_common::{access_list::AccessListCache, CanonicalSocketAddr, ValidUntil}; -use aquatic_udp_protocol::*; - -use crate::common::*; -use crate::config::Config; - -use super::storage::PendingScrapeResponseSlab; -use super::validator::ConnectionValidator; - -pub fn read_requests( - config: &Config, - state: &State, - connection_validator: &mut ConnectionValidator, - pending_scrape_responses: &mut PendingScrapeResponseSlab, - access_list_cache: &mut AccessListCache, - socket: &mut UdpSocket, - buffer: &mut [u8], - request_sender: &ConnectedRequestSender, - local_responses: &mut Vec<(Response, CanonicalSocketAddr)>, - pending_scrape_valid_until: ValidUntil, -) { - let mut requests_received_ipv4: usize = 0; - let mut requests_received_ipv6: usize = 0; - let mut bytes_received_ipv4: usize = 0; - let mut bytes_received_ipv6 = 0; - - loop { - match socket.recv_from(&mut buffer[..]) { - Ok((bytes_read, src)) => { - if src.port() == 0 { - ::log::info!("Ignored request from {} because source port is zero", src); - - continue; - } - - let res_request = - Request::from_bytes(&buffer[..bytes_read], config.protocol.max_scrape_torrents); - - let src = CanonicalSocketAddr::new(src); - - // Update statistics for converted address - if src.is_ipv4() { - if res_request.is_ok() { - requests_received_ipv4 += 1; - } - bytes_received_ipv4 += bytes_read; - } else { - if res_request.is_ok() { - requests_received_ipv6 += 1; - } - bytes_received_ipv6 += bytes_read; - } - - handle_request( - config, - connection_validator, - pending_scrape_responses, - access_list_cache, - request_sender, - local_responses, - pending_scrape_valid_until, - res_request, - src, - ); - } - Err(err) if err.kind() == ErrorKind::WouldBlock => { - break; - } - Err(err) => { - ::log::warn!("recv_from error: {:#}", err); - } - } - } - - if config.statistics.active() { - state - .statistics_ipv4 - .requests_received - .fetch_add(requests_received_ipv4, Ordering::Release); - state - .statistics_ipv6 - .requests_received - .fetch_add(requests_received_ipv6, Ordering::Release); - state - .statistics_ipv4 - .bytes_received - .fetch_add(bytes_received_ipv4, Ordering::Release); - state - .statistics_ipv6 - .bytes_received - .fetch_add(bytes_received_ipv6, Ordering::Release); - } -} - -fn handle_request( - config: &Config, - connection_validator: &mut ConnectionValidator, - pending_scrape_responses: &mut PendingScrapeResponseSlab, - access_list_cache: &mut AccessListCache, - request_sender: &ConnectedRequestSender, - local_responses: &mut Vec<(Response, CanonicalSocketAddr)>, - pending_scrape_valid_until: ValidUntil, - res_request: Result, - src: CanonicalSocketAddr, -) { - let access_list_mode = config.access_list.mode; - - match res_request { - Ok(Request::Connect(request)) => { - let connection_id = connection_validator.create_connection_id(src); - - let response = Response::Connect(ConnectResponse { - connection_id, - transaction_id: request.transaction_id, - }); - - local_responses.push((response, src)) - } - Ok(Request::Announce(request)) => { - if connection_validator.connection_id_valid(src, request.connection_id) { - if access_list_cache - .load() - .allows(access_list_mode, &request.info_hash.0) - { - let worker_index = SwarmWorkerIndex::from_info_hash(config, request.info_hash); - - request_sender.try_send_to( - worker_index, - ConnectedRequest::Announce(request), - src, - ); - } else { - let response = Response::Error(ErrorResponse { - transaction_id: request.transaction_id, - message: "Info hash not allowed".into(), - }); - - local_responses.push((response, src)) - } - } - } - Ok(Request::Scrape(request)) => { - if connection_validator.connection_id_valid(src, request.connection_id) { - let split_requests = pending_scrape_responses.prepare_split_requests( - config, - request, - pending_scrape_valid_until, - ); - - for (swarm_worker_index, request) in split_requests { - request_sender.try_send_to( - swarm_worker_index, - ConnectedRequest::Scrape(request), - src, - ); - } - } - } - Err(err) => { - ::log::debug!("Request::from_bytes error: {:?}", err); - - if let RequestParseError::Sendable { - connection_id, - transaction_id, - err, - } = err - { - if connection_validator.connection_id_valid(src, connection_id) { - let response = ErrorResponse { - transaction_id, - message: err.right_or("Parse error").into(), - }; - - local_responses.push((response.into(), src)); - } - } - } - } -} diff --git a/aquatic_udp/src/workers/socket/responses.rs b/aquatic_udp/src/workers/socket/responses.rs deleted file mode 100644 index 6ed78d1..0000000 --- a/aquatic_udp/src/workers/socket/responses.rs +++ /dev/null @@ -1,143 +0,0 @@ -use std::io::{Cursor, ErrorKind}; -use std::sync::atomic::Ordering; -use std::vec::Drain; - -use crossbeam_channel::Receiver; -use libc::ENOBUFS; -use mio::net::UdpSocket; - -use aquatic_common::CanonicalSocketAddr; -use aquatic_udp_protocol::*; - -use crate::common::*; -use crate::config::Config; - -use super::storage::PendingScrapeResponseSlab; - -pub fn send_responses( - state: &State, - config: &Config, - socket: &mut UdpSocket, - buffer: &mut [u8], - response_receiver: &Receiver<(ConnectedResponse, CanonicalSocketAddr)>, - pending_scrape_responses: &mut PendingScrapeResponseSlab, - local_responses: Drain<(Response, CanonicalSocketAddr)>, - opt_resend_buffer: &mut Option>, -) { - if let Some(resend_buffer) = opt_resend_buffer { - for (response, addr) in resend_buffer.drain(..) { - send_response(state, config, socket, buffer, response, addr, &mut None); - } - } - - for (response, addr) in local_responses { - send_response( - state, - config, - socket, - buffer, - response, - addr, - opt_resend_buffer, - ); - } - - for (response, addr) in response_receiver.try_iter() { - let opt_response = match response { - ConnectedResponse::Scrape(r) => pending_scrape_responses - .add_and_get_finished(r) - .map(Response::Scrape), - ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)), - ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)), - }; - - if let Some(response) = opt_response { - send_response( - state, - config, - socket, - buffer, - response, - addr, - opt_resend_buffer, - ); - } - } -} - -fn send_response( - state: &State, - config: &Config, - socket: &mut UdpSocket, - buffer: &mut [u8], - response: Response, - canonical_addr: CanonicalSocketAddr, - resend_buffer: &mut Option>, -) { - let mut cursor = Cursor::new(buffer); - - if let Err(err) = response.write(&mut cursor) { - ::log::error!("Converting response to bytes failed: {:#}", err); - - return; - } - - let bytes_written = cursor.position() as usize; - - let addr = if config.network.address.is_ipv4() { - canonical_addr - .get_ipv4() - .expect("found peer ipv6 address while running bound to ipv4 address") - } else { - canonical_addr.get_ipv6_mapped() - }; - - match socket.send_to(&cursor.get_ref()[..bytes_written], addr) { - Ok(amt) if config.statistics.active() => { - let stats = if canonical_addr.is_ipv4() { - &state.statistics_ipv4 - } else { - &state.statistics_ipv6 - }; - - stats.bytes_sent.fetch_add(amt, Ordering::Relaxed); - - match response { - Response::Connect(_) => { - stats.responses_sent_connect.fetch_add(1, Ordering::Relaxed); - } - Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => { - stats - .responses_sent_announce - .fetch_add(1, Ordering::Relaxed); - } - Response::Scrape(_) => { - stats.responses_sent_scrape.fetch_add(1, Ordering::Relaxed); - } - Response::Error(_) => { - stats.responses_sent_error.fetch_add(1, Ordering::Relaxed); - } - } - } - Ok(_) => (), - Err(err) => { - match resend_buffer { - Some(resend_buffer) - if (err.raw_os_error() == Some(ENOBUFS)) - || (err.kind() == ErrorKind::WouldBlock) => - { - if resend_buffer.len() < config.network.resend_buffer_max_len { - ::log::info!("Adding response to resend queue, since sending it to {} failed with: {:#}", addr, err); - - resend_buffer.push((response, canonical_addr)); - } else { - ::log::warn!("Response resend buffer full, dropping response"); - } - } - _ => { - ::log::warn!("Sending response to {} failed: {:#}", addr, err); - } - } - } - } -} diff --git a/aquatic_udp/src/workers/swarm/storage.rs b/aquatic_udp/src/workers/swarm/storage.rs index 9122373..6db4ae3 100644 --- a/aquatic_udp/src/workers/swarm/storage.rs +++ b/aquatic_udp/src/workers/swarm/storage.rs @@ -41,7 +41,6 @@ type PeerMap = IndexMap>; pub struct TorrentData { peers: PeerMap, num_seeders: usize, - num_leechers: usize, } impl TorrentData { @@ -62,8 +61,6 @@ impl TorrentData { valid_until, }; - self.num_leechers += 1; - self.peers.insert(peer_id, peer) } PeerStatus::Seeding => { @@ -81,14 +78,11 @@ impl TorrentData { PeerStatus::Stopped => self.peers.remove(&peer_id), }; - match opt_removed_peer.map(|peer| peer.is_seeder) { - Some(true) => { - self.num_seeders -= 1; - } - Some(false) => { - self.num_leechers -= 1; - } - None => {} + if let Some(Peer { + is_seeder: true, .. + }) = opt_removed_peer + { + self.num_seeders -= 1; } } @@ -108,7 +102,7 @@ impl TorrentData { } pub fn num_leechers(&self) -> usize { - self.num_leechers + self.peers.len() - self.num_seeders } pub fn num_seeders(&self) -> usize { @@ -118,24 +112,20 @@ impl TorrentData { pub fn scrape_statistics(&self) -> TorrentScrapeStatistics { create_torrent_scrape_statistics( self.num_seeders.try_into().unwrap_or(i32::MAX), - self.num_leechers.try_into().unwrap_or(i32::MAX), + self.num_leechers().try_into().unwrap_or(i32::MAX), ) } /// Remove inactive peers and reclaim space fn clean(&mut self, now: SecondsSinceServerStart) { self.peers.retain(|_, peer| { - if peer.valid_until.valid(now) { - true - } else { - if peer.is_seeder { - self.num_seeders -= 1; - } else { - self.num_leechers -= 1; - } + let keep = peer.valid_until.valid(now); - false + if (!keep) & peer.is_seeder { + self.num_seeders -= 1; } + + keep }); if !self.peers.is_empty() { @@ -149,7 +139,6 @@ impl Default for TorrentData { Self { peers: Default::default(), num_seeders: 0, - num_leechers: 0, } } } diff --git a/aquatic_udp_bench/src/announce.rs b/aquatic_udp_bench/src/announce.rs index e048bf4..9302764 100644 --- a/aquatic_udp_bench/src/announce.rs +++ b/aquatic_udp_bench/src/announce.rs @@ -5,7 +5,7 @@ use aquatic_common::CanonicalSocketAddr; use crossbeam_channel::{Receiver, Sender}; use indicatif::ProgressIterator; use rand::Rng; -use rand_distr::Pareto; +use rand_distr::Gamma; use aquatic_udp::common::*; use aquatic_udp_protocol::*; @@ -81,14 +81,14 @@ pub fn create_requests( info_hashes: &[InfoHash], number: usize, ) -> Vec<(AnnounceRequest, CanonicalSocketAddr)> { - let pareto = Pareto::new(1., PARETO_SHAPE).unwrap(); + let gamma = Gamma::new(GAMMA_SHAPE, GAMMA_SCALE).unwrap(); let max_index = info_hashes.len() - 1; let mut requests = Vec::new(); for _ in 0..number { - let info_hash_index = pareto_usize(rng, pareto, max_index); + let info_hash_index = gamma_usize(rng, gamma, max_index); let request = AnnounceRequest { connection_id: ConnectionId(0), diff --git a/aquatic_udp_bench/src/common.rs b/aquatic_udp_bench/src/common.rs index f444166..e3885d0 100644 --- a/aquatic_udp_bench/src/common.rs +++ b/aquatic_udp_bench/src/common.rs @@ -1,8 +1,10 @@ use indicatif::{ProgressBar, ProgressStyle}; use rand::Rng; -use rand_distr::Pareto; +use rand_distr::Gamma; + +pub const GAMMA_SHAPE: f64 = 0.2; +pub const GAMMA_SCALE: f64 = 100.0; -pub const PARETO_SHAPE: f64 = 0.1; pub const NUM_INFO_HASHES: usize = 10_000; pub fn create_progress_bar(name: &str, iterations: u64) -> ProgressBar { @@ -12,8 +14,8 @@ pub fn create_progress_bar(name: &str, iterations: u64) -> ProgressBar { ProgressBar::new(iterations).with_style(style) } -pub fn pareto_usize(rng: &mut impl Rng, pareto: Pareto, max: usize) -> usize { - let p: f64 = rng.sample(pareto); +pub fn gamma_usize(rng: &mut impl Rng, gamma: Gamma, max: usize) -> usize { + let p: f64 = rng.sample(gamma); let p = (p.min(101.0f64) - 1.0) / 100.0; (p * max as f64) as usize diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index e233779..0a1229f 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -5,7 +5,7 @@ use aquatic_common::CanonicalSocketAddr; use crossbeam_channel::{Receiver, Sender}; use indicatif::ProgressIterator; use rand::Rng; -use rand_distr::Pareto; +use rand_distr::Gamma; use aquatic_udp::common::*; use aquatic_udp_protocol::*; @@ -93,7 +93,7 @@ pub fn create_requests( number: usize, hashes_per_request: usize, ) -> Vec<(ScrapeRequest, CanonicalSocketAddr)> { - let pareto = Pareto::new(1., PARETO_SHAPE).unwrap(); + let gamma = Gamma::new(GAMMA_SHAPE, GAMMA_SCALE).unwrap(); let max_index = info_hashes.len() - 1; @@ -103,7 +103,7 @@ pub fn create_requests( let mut request_info_hashes = Vec::new(); for _ in 0..hashes_per_request { - let info_hash_index = pareto_usize(rng, pareto, max_index); + let info_hash_index = gamma_usize(rng, gamma, max_index); request_info_hashes.push(info_hashes[info_hash_index]) } diff --git a/aquatic_udp_load_test/src/config.rs b/aquatic_udp_load_test/src/config.rs index b2566df..963c80f 100644 --- a/aquatic_udp_load_test/src/config.rs +++ b/aquatic_udp_load_test/src/config.rs @@ -101,10 +101,10 @@ pub struct RequestConfig { /// Probability that a generated request is a scrape request, as part /// of sum of the various weight arguments. pub weight_scrape: usize, - /// Pareto shape - /// - /// Fake peers choose torrents according to Pareto distribution. - pub torrent_selection_pareto_shape: f64, + /// Peers choose torrents according to this Gamma distribution shape + pub torrent_gamma_shape: f64, + /// Peers choose torrents according to this Gamma distribution scale + pub torrent_gamma_scale: f64, /// Probability that a generated peer is a seeder pub peer_seeder_probability: f64, /// Probability that an additional connect request will be sent for each @@ -116,12 +116,13 @@ impl Default for RequestConfig { fn default() -> Self { Self { number_of_torrents: 10_000, - peer_seeder_probability: 0.25, scrape_max_torrents: 50, weight_connect: 0, - weight_announce: 5, + weight_announce: 100, weight_scrape: 1, - torrent_selection_pareto_shape: 2.0, + torrent_gamma_shape: 0.2, + torrent_gamma_scale: 100.0, + peer_seeder_probability: 0.25, additional_request_probability: 0.5, } } diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index 7066b5f..48af77b 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -6,7 +6,7 @@ use std::time::{Duration, Instant}; #[cfg(feature = "cpu-pinning")] use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; -use rand_distr::Pareto; +use rand_distr::Gamma; mod common; mod config; @@ -58,7 +58,11 @@ fn run(config: Config) -> ::anyhow::Result<()> { statistics: Arc::new(Statistics::default()), }; - let pareto = Pareto::new(1.0, config.requests.torrent_selection_pareto_shape).unwrap(); + let gamma = Gamma::new( + config.requests.torrent_gamma_shape, + config.requests.torrent_gamma_scale, + ) + .unwrap(); // Start workers @@ -88,7 +92,7 @@ fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::SocketWorker(i as usize), ); - run_worker_thread(state, pareto, &config, addr) + run_worker_thread(state, gamma, &config, addr) })?; } diff --git a/aquatic_udp_load_test/src/utils.rs b/aquatic_udp_load_test/src/utils.rs index bae1391..fd32443 100644 --- a/aquatic_udp_load_test/src/utils.rs +++ b/aquatic_udp_load_test/src/utils.rs @@ -1,10 +1,10 @@ use rand::prelude::*; -use rand_distr::Pareto; +use rand_distr::Gamma; use aquatic_udp_protocol::*; -pub fn pareto_usize(rng: &mut impl Rng, pareto: Pareto, max: usize) -> usize { - let p: f64 = rng.sample(pareto); +pub fn gamma_usize(rng: &mut impl Rng, gamma: Gamma, max: usize) -> usize { + let p: f64 = rng.sample(gamma); let p = (p.min(101.0f64) - 1.0) / 100.0; (p * max as f64) as usize diff --git a/aquatic_udp_load_test/src/worker/mod.rs b/aquatic_udp_load_test/src/worker/mod.rs index 40b0a62..b863660 100644 --- a/aquatic_udp_load_test/src/worker/mod.rs +++ b/aquatic_udp_load_test/src/worker/mod.rs @@ -8,7 +8,7 @@ use std::time::Duration; use mio::{net::UdpSocket, Events, Interest, Poll, Token}; use rand::Rng; use rand::{prelude::SmallRng, thread_rng, SeedableRng}; -use rand_distr::Pareto; +use rand_distr::Gamma; use socket2::{Domain, Protocol, Socket, Type}; use aquatic_udp_protocol::*; @@ -21,7 +21,7 @@ const MAX_PACKET_SIZE: usize = 8192; pub fn run_worker_thread( state: LoadTestState, - pareto: Pareto, + gamma: Gamma, config: &Config, addr: SocketAddr, ) { @@ -80,7 +80,7 @@ pub fn run_worker_thread( let opt_request = process_response( &mut rng, - pareto, + gamma, &state.info_hashes, &config, &mut torrent_peers, diff --git a/aquatic_udp_load_test/src/worker/request_gen.rs b/aquatic_udp_load_test/src/worker/request_gen.rs index ce99889..8f504fd 100644 --- a/aquatic_udp_load_test/src/worker/request_gen.rs +++ b/aquatic_udp_load_test/src/worker/request_gen.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use rand::distributions::WeightedIndex; use rand::prelude::*; -use rand_distr::Pareto; +use rand_distr::Gamma; use aquatic_udp_protocol::*; @@ -12,7 +12,7 @@ use crate::utils::*; pub fn process_response( rng: &mut impl Rng, - pareto: Pareto, + gamma: Gamma, info_hashes: &Arc>, config: &Config, torrent_peers: &mut TorrentPeerMap, @@ -32,7 +32,7 @@ pub fn process_response( torrent_peer }) .unwrap_or_else(|| { - create_torrent_peer(config, rng, pareto, info_hashes, r.connection_id) + create_torrent_peer(config, rng, gamma, info_hashes, r.connection_id) }); let new_transaction_id = generate_transaction_id(rng); @@ -190,7 +190,7 @@ fn create_scrape_request( fn create_torrent_peer( config: &Config, rng: &mut impl Rng, - pareto: Pareto, + gamma: Gamma, info_hashes: &Arc>, connection_id: ConnectionId, ) -> TorrentPeer { @@ -199,10 +199,10 @@ fn create_torrent_peer( let mut scrape_hash_indeces = Vec::new(); for _ in 0..num_scape_hashes { - scrape_hash_indeces.push(select_info_hash_index(config, rng, pareto)) + scrape_hash_indeces.push(select_info_hash_index(config, rng, gamma)) } - let info_hash_index = select_info_hash_index(config, rng, pareto); + let info_hash_index = select_info_hash_index(config, rng, gamma); TorrentPeer { info_hash: info_hashes[info_hash_index], @@ -213,6 +213,6 @@ fn create_torrent_peer( } } -fn select_info_hash_index(config: &Config, rng: &mut impl Rng, pareto: Pareto) -> usize { - pareto_usize(rng, pareto, config.requests.number_of_torrents - 1) +fn select_info_hash_index(config: &Config, rng: &mut impl Rng, gamma: Gamma) -> usize { + gamma_usize(rng, gamma, config.requests.number_of_torrents - 1) } diff --git a/aquatic_ws_load_test/src/common.rs b/aquatic_ws_load_test/src/common.rs index 200d04f..288f484 100644 --- a/aquatic_ws_load_test/src/common.rs +++ b/aquatic_ws_load_test/src/common.rs @@ -1,6 +1,6 @@ use std::sync::{atomic::AtomicUsize, Arc}; -use rand_distr::Pareto; +use rand_distr::Gamma; pub use aquatic_ws_protocol::*; @@ -19,7 +19,7 @@ pub struct Statistics { pub struct LoadTestState { pub info_hashes: Arc>, pub statistics: Arc, - pub pareto: Arc>, + pub gamma: Arc>, } #[derive(PartialEq, Eq, Clone, Copy)] diff --git a/aquatic_ws_load_test/src/config.rs b/aquatic_ws_load_test/src/config.rs index 370769d..eff0ae5 100644 --- a/aquatic_ws_load_test/src/config.rs +++ b/aquatic_ws_load_test/src/config.rs @@ -45,10 +45,6 @@ impl Default for Config { pub struct TorrentConfig { pub offers_per_request: usize, pub number_of_torrents: usize, - /// Pareto shape - /// - /// Fake peers choose torrents according to Pareto distribution. - pub torrent_selection_pareto_shape: f64, /// Probability that a generated peer is a seeder pub peer_seeder_probability: f64, /// Probability that a generated request is a announce request, as part @@ -57,6 +53,10 @@ pub struct TorrentConfig { /// Probability that a generated request is a scrape request, as part /// of sum of the various weight arguments. pub weight_scrape: usize, + /// Peers choose torrents according to this Gamma distribution shape + pub torrent_gamma_shape: f64, + /// Peers choose torrents according to this Gamma distribution scale + pub torrent_gamma_scale: f64, } impl Default for TorrentConfig { @@ -65,9 +65,10 @@ impl Default for TorrentConfig { offers_per_request: 10, number_of_torrents: 10_000, peer_seeder_probability: 0.25, - torrent_selection_pareto_shape: 2.0, weight_announce: 5, weight_scrape: 0, + torrent_gamma_shape: 0.2, + torrent_gamma_scale: 100.0, } } } diff --git a/aquatic_ws_load_test/src/main.rs b/aquatic_ws_load_test/src/main.rs index c660267..67a4a00 100644 --- a/aquatic_ws_load_test/src/main.rs +++ b/aquatic_ws_load_test/src/main.rs @@ -6,7 +6,7 @@ use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_fo use aquatic_common::cpu_pinning::WorkerIndex; use glommio::LocalExecutorBuilder; use rand::prelude::*; -use rand_distr::Pareto; +use rand_distr::Gamma; mod common; mod config; @@ -44,12 +44,16 @@ fn run(config: Config) -> ::anyhow::Result<()> { info_hashes.push(InfoHash(rng.gen())); } - let pareto = Pareto::new(1.0, config.torrents.torrent_selection_pareto_shape).unwrap(); + let gamma = Gamma::new( + config.torrents.torrent_gamma_shape, + config.torrents.torrent_gamma_scale, + ) + .unwrap(); let state = LoadTestState { info_hashes: Arc::new(info_hashes), statistics: Arc::new(Statistics::default()), - pareto: Arc::new(pareto), + gamma: Arc::new(gamma), }; let tls_config = create_tls_config().unwrap(); diff --git a/aquatic_ws_load_test/src/utils.rs b/aquatic_ws_load_test/src/utils.rs index a7e568a..74646b1 100644 --- a/aquatic_ws_load_test/src/utils.rs +++ b/aquatic_ws_load_test/src/utils.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use rand::distributions::WeightedIndex; use rand::prelude::*; -use rand_distr::Pareto; +use rand_distr::Gamma; use crate::common::*; use crate::config::*; @@ -88,12 +88,12 @@ fn create_scrape_request(config: &Config, state: &LoadTestState, rng: &mut impl #[inline] fn select_info_hash_index(config: &Config, state: &LoadTestState, rng: &mut impl Rng) -> usize { - pareto_usize(rng, &state.pareto, config.torrents.number_of_torrents - 1) + gamma_usize(rng, &state.gamma, config.torrents.number_of_torrents - 1) } #[inline] -fn pareto_usize(rng: &mut impl Rng, pareto: &Arc>, max: usize) -> usize { - let p: f64 = pareto.sample(rng); +fn gamma_usize(rng: &mut impl Rng, gamma: &Arc>, max: usize) -> usize { + let p: f64 = gamma.sample(rng); let p = (p.min(101.0f64) - 1.0) / 100.0; (p * max as f64) as usize diff --git a/scripts/bench-udp-handlers.sh b/scripts/bench-udp-handlers.sh index fc3dfc4..0142c20 100755 --- a/scripts/bench-udp-handlers.sh +++ b/scripts/bench-udp-handlers.sh @@ -2,4 +2,4 @@ . ./scripts/env-native-cpu-without-avx-512 -cargo run --release -p aquatic_udp_bench -- $@ +cargo run --profile "release-debug" -p aquatic_udp_bench -- $@