diff --git a/Cargo.lock b/Cargo.lock index b358509..f1dcab0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,7 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +version = 3 + [[package]] name = "addr2line" version = "0.16.0" @@ -23,9 +25,9 @@ checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217" [[package]] name = "ahash" -version = "0.7.4" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43bb833f0bf979d8475d38fbf09ed3b8a55e1885fe93ad3f93239fc6a4f17b98" +checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" dependencies = [ "getrandom", "once_cell", @@ -43,9 +45,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.43" +version = "1.0.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28ae2b3dec75a406790005a200b1bd89785afc02517a00ca99ecfe093ee9e6cf" +checksum = "61604a8f862e1d5c3229fdd78f8b02c68dcf73a4c4b05fd636d12240aaa242c1" [[package]] name = "aquatic" @@ -72,8 +74,13 @@ dependencies = [ name = "aquatic_common" version = "0.1.0" dependencies = [ + "anyhow", + "arc-swap", + "hashbrown 0.11.2", + "hex", "indexmap", "rand", + "serde", ] [[package]] @@ -275,6 +282,12 @@ dependencies = [ "tungstenite", ] +[[package]] +name = "arc-swap" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6df5aef5c5830360ce5218cecb8f018af3438af5686ae945094affc86fdec63" + [[package]] name = "arrayvec" version = "0.4.12" @@ -350,9 +363,9 @@ dependencies = [ [[package]] name = "bstr" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90682c8d613ad3373e66de8c6411e0ae2ab2571e879d2efbf73558cc66f21279" +checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223" dependencies = [ "lazy_static", "memchr", @@ -362,9 +375,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.7.0" +version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c59e7af012c713f529e7a3ee57ce9b31ddd858d4b512923602f74608b009631" +checksum = "d9df67f7bf9ef8498769f994239c45613ef0c5899415fb58e9add412d2c1a538" [[package]] name = "byteorder" @@ -389,9 +402,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.70" +version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d26a6ce4b6a484fa3edb70f7efa6fc430fd2b87285fe8b84304fd0936faa0dc0" +checksum = "79c2681d6594606957bbb8631c4b90a7fcaaa72cdb714743a437b156d6a7eedd" [[package]] name = "cfg-if" @@ -425,22 +438,22 @@ dependencies = [ [[package]] name = "console" -version = "0.14.1" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3993e6445baa160675931ec041a5e03ca84b9c6e32a056150d3aa2bdda0a1f45" +checksum = "a28b32d32ca44b70c3e4acd7db1babf555fa026e385fb95f18028f88848b3c31" dependencies = [ "encode_unicode", - "lazy_static", "libc", + "once_cell", "terminal_size", "winapi", ] [[package]] name = "core-foundation" -version = "0.9.1" +version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a89e2ae426ea83155dccf10c0fa6b1463ef6d5fcb44cee0b224a408fa640a62" +checksum = "6888e10551bb93e424d8df1d07f1a8b4fceb0001a3a4b048bfc47554946f47b3" dependencies = [ "core-foundation-sys", "libc", @@ -448,9 +461,9 @@ dependencies = [ [[package]] name = "core-foundation-sys" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea221b5284a47e40033bf9b66f35f984ec0ea2931eb03505246cd27a963f981b" +checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" [[package]] name = "cpufeatures" @@ -685,9 +698,9 @@ checksum = "f0a01e0497841a3b2db4f8afa483cce65f7e96a3498bd6c541734792aeac8fe7" [[package]] name = "half" -version = "1.7.1" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62aca2aba2d62b4a7f5b33f3712cb1b0692779a56fb510499d5c0aa594daeaf3" +checksum = "ac5956d4e63858efaec57e0d6c1c2f6a41e1487f830314a324ccd7e2223a7ca0" [[package]] name = "halfbrown" @@ -715,7 +728,7 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" dependencies = [ - "ahash 0.7.4", + "ahash 0.7.6", "serde", ] @@ -742,9 +755,9 @@ checksum = "12cb882ccb290b8646e554b157ab0b71e64e8d5bef775cd66b6531e52d302669" [[package]] name = "http" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11" +checksum = "1323096b05d41827dadeaee54c9981958c0f94e670bc94ed80037d1a7b8b186b" dependencies = [ "bytes", "fnv", @@ -792,9 +805,9 @@ dependencies = [ [[package]] name = "instant" -version = "0.1.10" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bee0328b1209d157ef001c94dd85b4f8f64139adb0eac2659f4b08382b2f474d" +checksum = "716d3d89f35ac6a34fd0eed635395f4c3b76fa889338a4632e5231a8684216bd" dependencies = [ "cfg-if", ] @@ -816,9 +829,9 @@ checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" [[package]] name = "js-sys" -version = "0.3.53" +version = "0.3.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4bf49d50e2961077d9c99f4b7997d770a1114f087c3c2e0069b36c13fc2979d" +checksum = "7cc9ffccd38c451a86bf13657df244e9c3f37493cce8e5e21e940963777acc84" dependencies = [ "wasm-bindgen", ] @@ -831,9 +844,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.101" +version = "0.2.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cb00336871be5ed2c8ed44b60ae9959dc5b9f08539422ed43f09e34ecaeba21" +checksum = "dd8f7255a17a627354f321ef0055d63b898c6fb27eff628af4d1b66b7331edf6" [[package]] name = "libm" @@ -950,14 +963,15 @@ dependencies = [ [[package]] name = "nix" -version = "0.19.1" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2ccba0cfe4fdf15982d1674c69b1fd80bad427d293849982668dfe454bd61f2" +checksum = "f305c2c2e4c39a82f7bf0bf65fb557f9070ce06781d4f2454295cc34b1c43188" dependencies = [ "bitflags", "cc", "cfg-if", "libc", + "memoffset", ] [[package]] @@ -1070,9 +1084,9 @@ checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a" [[package]] name = "openssl-sys" -version = "0.9.66" +version = "0.9.67" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1996d2d305e561b70d1ee0c53f1542833f4e1ac6ce9a6708b6ff2738ca67dc82" +checksum = "69df2d8dfc6ce3aaf44b40dec6f487d5a886516cf6879c49e98e0710f310a058" dependencies = [ "autocfg", "cc", @@ -1114,9 +1128,9 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" [[package]] name = "pkg-config" -version = "0.3.19" +version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c" +checksum = "7c9b1041b4387893b91ee6746cddfc28516aff326a3519fb2adf820932c5e6cb" [[package]] name = "plotters" @@ -1148,15 +1162,15 @@ dependencies = [ [[package]] name = "ppv-lite86" -version = "0.2.10" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" +checksum = "c3ca011bd0129ff4ae15cd04c4eef202cadf6c51c21e47aba319b4e0501db741" [[package]] name = "privdrop" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebd4c2739642e70439d1c0d9545beec45c1e54128739b3cda29bf2c366028c87" +checksum = "4c02cf257b10e4b807bccadb19630d5dea7e0369c3c5e84673ee8e58dc8da6a5" dependencies = [ "libc", "nix", @@ -1164,9 +1178,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.29" +version = "1.0.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9f5105d4fdaab20335ca9565e106a5d9b82b6219b5ba735731124ac6711d23d" +checksum = "edc3358ebc67bc8b7fa0c007f945b0b18226f78437d61bec735a9eb96b61ee70" dependencies = [ "unicode-xid", ] @@ -1195,9 +1209,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3d0b9745dc2debf507c8422de05d7226cc1f0644216dfdfead988f9b1ab32a7" +checksum = "38bc8cc6a5f2e3655e0899c1b848643b2562f853f114bfec7be120678e3ace05" dependencies = [ "proc-macro2", ] @@ -1235,9 +1249,9 @@ dependencies = [ [[package]] name = "rand_distr" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "051b398806e42b9cd04ad9ec8f81e355d0a382c543ac6672c62f5a5b452ef142" +checksum = "964d548f8e7d12e102ef183a0de7e98180c9f8729f555897a857b96e48122d2f" dependencies = [ "num-traits", "rand", @@ -1444,9 +1458,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.67" +version = "1.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7f9e390c27c3c0ce8bc5d725f6e4d30a29d26659494aa4b17535f7522c5c950" +checksum = "0f690853975602e1bfe1ccbf50504d67174e3bcf340f23b5ea9992e0587a52d8" dependencies = [ "itoa", "ryu", @@ -1468,9 +1482,9 @@ dependencies = [ [[package]] name = "simd-json" -version = "0.4.7" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "529edb21cdc2629d7214de58672ec9fe5678b623e8fffb03327f77d7291d8865" +checksum = "e67eb096671712144fb1357787c4312720c99444f52900ca2a20bee57a02cc64" dependencies = [ "halfbrown", "serde", @@ -1487,9 +1501,9 @@ checksum = "c970da16e7c682fa90a261cf0724dee241c9f7831635ecc4e988ae8f3b505559" [[package]] name = "simplelog" -version = "0.10.0" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59d0fe306a0ced1c88a58042dc22fc2ddd000982c26d75f6aa09a394547c41e0" +checksum = "85d04ae642154220ef00ee82c36fb07853c10a4f2a0ca6719f9991211d2eb959" dependencies = [ "chrono", "log", @@ -1498,15 +1512,15 @@ dependencies = [ [[package]] name = "slab" -version = "0.4.4" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c307a32c1c5c437f38c7fd45d753050587732ba8628319fbdf12a7e289ccc590" +checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5" [[package]] name = "smallvec" -version = "1.6.1" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" +checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309" [[package]] name = "smartstring" @@ -1519,9 +1533,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "765f090f0e423d2b55843402a07915add955e7d60657db13707a159727326cad" +checksum = "5dc90fe6c7be1a323296982db1836d1ea9e47b6839496dde9a541bc496df3516" dependencies = [ "libc", "winapi", @@ -1535,9 +1549,9 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "syn" -version = "1.0.76" +version = "1.0.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6f107db402c2c2055242dbf4d2af0e69197202e9faacbef9571bbe47f5a1b84" +checksum = "d010a1623fbd906d51d650a9916aaefc05ffa0e4053ff7fe601167f3e715d194" dependencies = [ "proc-macro2", "quote", @@ -1546,9 +1560,9 @@ dependencies = [ [[package]] name = "synstructure" -version = "0.12.5" +version = "0.12.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "474aaa926faa1603c40b7885a9eaea29b444d1cb2850cb7c0e37bb1a4182f4fa" +checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f" dependencies = [ "proc-macro2", "quote", @@ -1600,18 +1614,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.29" +version = "1.0.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "602eca064b2d83369e2b2f34b09c70b605402801927c65c11071ac911d299b88" +checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.29" +version = "1.0.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bad553cc2c78e8de258400763a647e80e6d1b31ee237275d756f6836d204494c" +checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b" dependencies = [ "proc-macro2", "quote", @@ -1640,9 +1654,9 @@ dependencies = [ [[package]] name = "tinyvec" -version = "1.3.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "848a1e1181b9f6753b5e96a092749e29b11d19ede67dfbbd6c7dc7e0f49b5338" +checksum = "f83b2a3d4d9091d0abd7eba4dc2710b1718583bd4d8992e2190720ea38f391f7" dependencies = [ "tinyvec_macros", ] @@ -1689,9 +1703,9 @@ checksum = "b63708a265f51345575b27fe43f9500ad611579e764c79edbc2037b1121959ec" [[package]] name = "unicode-bidi" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "246f4c42e67e7a4e3c6106ff716a5d067d4132a642840b242e357e468a2a0085" +checksum = "1a01404663e3db436ed2746d9fefef640d868edae3cceb81c3b8d5732fda678f" [[package]] name = "unicode-normalization" @@ -1704,9 +1718,9 @@ dependencies = [ [[package]] name = "unicode-width" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3" +checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973" [[package]] name = "unicode-xid" @@ -1781,9 +1795,9 @@ checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" [[package]] name = "wasm-bindgen" -version = "0.2.76" +version = "0.2.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce9b1b516211d33767048e5d47fa2a381ed8b76fc48d2ce4aa39877f9f183e0" +checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -1791,9 +1805,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.76" +version = "0.2.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfe8dc78e2326ba5f845f4b5bf548401604fa20b1dd1d365fb73b6c1d6364041" +checksum = "a317bf8f9fba2476b4b2c85ef4c4af8ff39c3c7f0cdfeed4f82c34a880aa837b" dependencies = [ "bumpalo", "lazy_static", @@ -1806,9 +1820,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.76" +version = "0.2.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44468aa53335841d9d6b6c023eaab07c0cd4bddbcfdee3e2bb1e8d2cb8069fef" +checksum = "d56146e7c495528bf6587663bea13a8eb588d39b36b679d83972e1a2dbbdacf9" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -1816,9 +1830,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.76" +version = "0.2.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0195807922713af1e67dc66132c7328206ed9766af3858164fb583eedc25fbad" +checksum = "7803e0eea25835f8abdc585cd3021b3deb11543c6fe226dcd30b228857c5c5ab" dependencies = [ "proc-macro2", "quote", @@ -1829,15 +1843,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.76" +version = "0.2.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acdb075a845574a1fa5f09fd77e43f7747599301ea3417a9fbffdeedfc1f4a29" +checksum = "0237232789cf037d5480773fe568aac745bfe2afbc11a863e97901780a6b47cc" [[package]] name = "web-sys" -version = "0.3.53" +version = "0.3.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "224b2f6b67919060055ef1a67807367c2066ed520c3862cc013d26cf893a783c" +checksum = "38eb105f1c59d9eaa6b5cdc92b859d85b926e82cb2e0945cd0c9259faa6fe9fb" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/README.md b/README.md index b6f1100..1538cdc 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,15 @@ The configuration file values you will most likely want to adjust are `address` under the `network` section (listening address). This goes for all three protocols. -Some documentation of the various options might be available under +Access control by info hash is supported for all protocols. Relevant part of configuration: + +```toml +[access_list] +mode = 'off' # Change to 'black' (blacklist) or 'white' (whitelist) +path = '' # Path to text file with newline-delimited hex-encoded info hashes +``` + +Some more documentation of configuration file values might be available under `src/lib/config.rs` in crates `aquatic_udp`, `aquatic_http`, `aquatic_ws`. ## Details on implementations diff --git a/TODO.md b/TODO.md index fd226d8..45e45d5 100644 --- a/TODO.md +++ b/TODO.md @@ -1,13 +1,23 @@ # TODO -* Consider turning on safety and override flags in mimalloc (mostly for - simd-json) +* access lists: + * use arc-swap Cache + * test functionality + * aquatic_udp + * aquatic_http + * aquatic_ws, including sending back new error responses + +* aquatic_ws: should it send back error on message parse error, or does that + just indicate that not enough data has been received yet? + +* Consider turning on safety and override flags in mimalloc, mostly for + simd-json. It might be faster to just stop using simd-json if I consider + it insecure, which it maybe isn't. ## General * extract response peers: extract "one extra" to compensate for removal, of sender if present in selection? maybe make criterion benchmark, - optimize. consider rerunning udp benchmark, last change (filtering out - sender) might have had an adverse impact on performance. + optimize ## aquatic_http_load_test * how handle large number of peers for "popular" torrents in keepalive mode? @@ -24,7 +34,6 @@ ## aquatic_http * test torrent transfer with real clients - * test tls * scrape: does it work (serialization etc), and with multiple hashes? * 'left' optional in magnet requests? Probably not. Transmission sends huge positive number. @@ -40,12 +49,6 @@ scrape requests I suppose. ## aquatic_ws -* panic when unwrapping peer_address after peer closes connection: - -``` -thread 'socket-01' panicked at 'called `Result::unwrap()` on an `Err` value: Os { code: 22, kind: InvalidInput, message: "Invalid argument" }', aquatic_ws/src/lib/network/connection.rs:28:59 -``` - * websocket_max_frame_size should be at least something like 64 * 1024, maybe put it and message size at 128k just to be sure * test transfer, specifically ipv6/ipv4 mapping diff --git a/aquatic_common/Cargo.toml b/aquatic_common/Cargo.toml index b74988f..02fbe6e 100644 --- a/aquatic_common/Cargo.toml +++ b/aquatic_common/Cargo.toml @@ -11,5 +11,10 @@ repository = "https://github.com/greatest-ape/aquatic" name = "aquatic_common" [dependencies] +anyhow = "1" +arc-swap = "1" +hashbrown = "0.11.2" +hex = "0.4" indexmap = "1" rand = { version = "0.8", features = ["small_rng"] } +serde = { version = "1", features = ["derive"] } diff --git a/aquatic_common/src/access_list.rs b/aquatic_common/src/access_list.rs new file mode 100644 index 0000000..93b8373 --- /dev/null +++ b/aquatic_common/src/access_list.rs @@ -0,0 +1,93 @@ +use std::fs::File; +use std::io::{BufRead, BufReader}; +use std::path::PathBuf; +use std::sync::Arc; + +use arc_swap::ArcSwap; +use hashbrown::HashSet; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum AccessListMode { + /// Only serve torrents with info hash present in file + White, + /// Do not serve torrents if info hash present in file + Black, + /// Turn off access list functionality + Off, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct AccessListConfig { + pub mode: AccessListMode, + /// Path to access list file consisting of newline-separated hex-encoded info hashes. + /// + /// If using chroot mode, path must be relative to new root. + pub path: PathBuf, +} + +impl Default for AccessListConfig { + fn default() -> Self { + Self { + path: "".into(), + mode: AccessListMode::Off, + } + } +} + +pub struct AccessList(ArcSwap>); + +impl Default for AccessList { + fn default() -> Self { + Self(ArcSwap::from(Arc::new(HashSet::default()))) + } +} + +impl AccessList { + fn parse_info_hash(line: String) -> anyhow::Result<[u8; 20]> { + let mut bytes = [0u8; 20]; + + hex::decode_to_slice(line, &mut bytes)?; + + Ok(bytes) + } + + pub fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()> { + let file = File::open(path)?; + let reader = BufReader::new(file); + + let mut new_list = HashSet::new(); + + for line in reader.lines() { + new_list.insert(Self::parse_info_hash(line?)?); + } + + self.0.store(Arc::new(new_list)); + + Ok(()) + } + + pub fn allows(&self, list_mode: AccessListMode, info_hash_bytes: &[u8; 20]) -> bool { + match list_mode { + AccessListMode::White => self.0.load().contains(info_hash_bytes), + AccessListMode::Black => !self.0.load().contains(info_hash_bytes), + AccessListMode::Off => true, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_info_hash() { + let f = AccessList::parse_info_hash; + + assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeee".into()).is_ok()); + assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeeef".into()).is_err()); + assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeee".into()).is_err()); + assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeeö".into()).is_err()); + } +} diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs index 7ba33c1..3e90bbc 100644 --- a/aquatic_common/src/lib.rs +++ b/aquatic_common/src/lib.rs @@ -4,6 +4,8 @@ use std::time::{Duration, Instant}; use indexmap::IndexMap; use rand::Rng; +pub mod access_list; + /// Peer or connection valid until this instant /// /// Used instead of "last seen" or similar to hopefully prevent arithmetic diff --git a/aquatic_http/src/lib/common.rs b/aquatic_http/src/lib/common.rs index 5804342..e36e390 100644 --- a/aquatic_http/src/lib/common.rs +++ b/aquatic_http/src/lib/common.rs @@ -1,6 +1,8 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::Arc; +use std::time::Instant; +use aquatic_common::access_list::AccessList; use crossbeam_channel::{Receiver, Sender}; use either::Either; use hashbrown::HashMap; @@ -16,6 +18,8 @@ use aquatic_http_protocol::common::*; use aquatic_http_protocol::request::Request; use aquatic_http_protocol::response::{Response, ResponsePeer}; +use crate::config::Config; + pub const LISTENER_TOKEN: Token = Token(0); pub const CHANNEL_TOKEN: Token = Token(1); @@ -113,14 +117,62 @@ pub struct TorrentMaps { pub ipv6: TorrentMap, } +impl TorrentMaps { + pub fn clean(&mut self, config: &Config, access_list: &Arc) { + Self::clean_torrent_map(config, access_list, &mut self.ipv4); + Self::clean_torrent_map(config, access_list, &mut self.ipv6); + } + + fn clean_torrent_map( + config: &Config, + access_list: &Arc, + torrent_map: &mut TorrentMap, + ) { + let now = Instant::now(); + + torrent_map.retain(|info_hash, torrent_data| { + if !access_list.allows(config.access_list.mode, &info_hash.0) { + return false; + } + + let num_seeders = &mut torrent_data.num_seeders; + let num_leechers = &mut torrent_data.num_leechers; + + torrent_data.peers.retain(|_, peer| { + let keep = peer.valid_until.0 >= now; + + if !keep { + match peer.status { + PeerStatus::Seeding => { + *num_seeders -= 1; + } + PeerStatus::Leeching => { + *num_leechers -= 1; + } + _ => (), + }; + } + + keep + }); + + !torrent_data.peers.is_empty() + }); + + torrent_map.shrink_to_fit(); + } +} + #[derive(Clone)] pub struct State { + pub access_list: Arc, pub torrent_maps: Arc>, } impl Default for State { fn default() -> Self { Self { + access_list: Arc::new(Default::default()), torrent_maps: Arc::new(Mutex::new(TorrentMaps::default())), } } diff --git a/aquatic_http/src/lib/config.rs b/aquatic_http/src/lib/config.rs index 9b40fd0..fe75932 100644 --- a/aquatic_http/src/lib/config.rs +++ b/aquatic_http/src/lib/config.rs @@ -1,5 +1,6 @@ use std::net::SocketAddr; +use aquatic_common::access_list::AccessListConfig; use serde::{Deserialize, Serialize}; use aquatic_cli_helpers::LogLevel; @@ -21,6 +22,7 @@ pub struct Config { pub cleaning: CleaningConfig, pub statistics: StatisticsConfig, pub privileges: PrivilegeConfig, + pub access_list: AccessListConfig, } impl aquatic_cli_helpers::Config for Config { @@ -111,6 +113,7 @@ impl Default for Config { cleaning: CleaningConfig::default(), statistics: StatisticsConfig::default(), privileges: PrivilegeConfig::default(), + access_list: AccessListConfig::default(), } } } diff --git a/aquatic_http/src/lib/lib.rs b/aquatic_http/src/lib/lib.rs index f96fbb7..2662b2b 100644 --- a/aquatic_http/src/lib/lib.rs +++ b/aquatic_http/src/lib/lib.rs @@ -22,12 +22,16 @@ pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker"; pub fn run(config: Config) -> anyhow::Result<()> { let state = State::default(); + tasks::update_access_list(&config, &state); + start_workers(config.clone(), state.clone())?; loop { ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); - tasks::clean_torrents(&state); + tasks::update_access_list(&config, &state); + + state.torrent_maps.lock().clean(&config, &state.access_list); } } @@ -51,6 +55,7 @@ pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> { for i in 0..config.socket_workers { let config = config.clone(); + let state = state.clone(); let socket_worker_statuses = socket_worker_statuses.clone(); let request_channel_sender = request_channel_sender.clone(); let opt_tls_acceptor = opt_tls_acceptor.clone(); @@ -67,6 +72,7 @@ pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> { .spawn(move || { network::run_socket_worker( config, + state, i, socket_worker_statuses, request_channel_sender, diff --git a/aquatic_http/src/lib/network/mod.rs b/aquatic_http/src/lib/network/mod.rs index 56fba90..c14ed92 100644 --- a/aquatic_http/src/lib/network/mod.rs +++ b/aquatic_http/src/lib/network/mod.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use std::vec::Drain; +use aquatic_http_protocol::request::Request; use hashbrown::HashMap; use log::{debug, error, info}; use mio::net::TcpListener; @@ -25,6 +26,7 @@ const CONNECTION_CLEAN_INTERVAL: usize = 2 ^ 22; pub fn run_socket_worker( config: Config, + state: State, socket_worker_index: usize, socket_worker_statuses: SocketWorkerStatuses, request_channel_sender: RequestChannelSender, @@ -38,6 +40,7 @@ pub fn run_socket_worker( run_poll_loop( config, + &state, socket_worker_index, request_channel_sender, response_channel_receiver, @@ -55,6 +58,7 @@ pub fn run_socket_worker( pub fn run_poll_loop( config: Config, + state: &State, socket_worker_index: usize, request_channel_sender: RequestChannelSender, response_channel_receiver: ResponseChannelReceiver, @@ -100,6 +104,7 @@ pub fn run_poll_loop( } else if token != CHANNEL_TOKEN { handle_connection_read_event( &config, + &state, socket_worker_index, &mut poll, &request_channel_sender, @@ -179,6 +184,7 @@ fn accept_new_streams( /// then read requests and pass on through channel. pub fn handle_connection_read_event( config: &Config, + state: &State, socket_worker_index: usize, poll: &mut Poll, request_channel_sender: &RequestChannelSender, @@ -187,6 +193,7 @@ pub fn handle_connection_read_event( poll_token: Token, ) { let valid_until = ValidUntil::new(config.cleaning.max_connection_age); + let access_list_mode = config.access_list.mode; loop { // Get connection, updating valid_until @@ -203,6 +210,22 @@ pub fn handle_connection_read_event( if let Some(established) = connection.get_established() { match established.read_request() { + Ok(Request::Announce(ref r)) + if !state.access_list.allows(access_list_mode, &r.info_hash.0) => + { + let meta = ConnectionMeta { + worker_index: socket_worker_index, + poll_token, + peer_addr: established.peer_addr, + }; + let response = FailureResponse::new("Info hash not allowed"); + + debug!("read disallowed request, sending back error response"); + + local_responses.push((meta, Response::Failure(response))); + + break; + } Ok(request) => { let meta = ConnectionMeta { worker_index: socket_worker_index, @@ -210,7 +233,7 @@ pub fn handle_connection_read_event( peer_addr: established.peer_addr, }; - debug!("read request, sending to handler"); + debug!("read allowed request, sending on to channel"); if let Err(err) = request_channel_sender.send((meta, request)) { error!("RequestChannelSender: couldn't send message: {:?}", err); @@ -233,9 +256,7 @@ pub fn handle_connection_read_event( peer_addr: established.peer_addr, }; - let response = FailureResponse { - failure_reason: "invalid request".to_string(), - }; + let response = FailureResponse::new("Invalid request"); local_responses.push((meta, Response::Failure(response))); diff --git a/aquatic_http/src/lib/tasks.rs b/aquatic_http/src/lib/tasks.rs index 0ec2275..3273ebd 100644 --- a/aquatic_http/src/lib/tasks.rs +++ b/aquatic_http/src/lib/tasks.rs @@ -1,45 +1,18 @@ -use std::time::Instant; - use histogram::Histogram; -use crate::common::*; +use aquatic_common::access_list::AccessListMode; -pub fn clean_torrents(state: &State) { - let mut torrent_maps = state.torrent_maps.lock(); +use crate::{common::*, config::Config}; - clean_torrent_map(&mut torrent_maps.ipv4); - clean_torrent_map(&mut torrent_maps.ipv6); -} - -fn clean_torrent_map(torrent_map: &mut TorrentMap) { - let now = Instant::now(); - - torrent_map.retain(|_, torrent_data| { - let num_seeders = &mut torrent_data.num_seeders; - let num_leechers = &mut torrent_data.num_leechers; - - torrent_data.peers.retain(|_, peer| { - let keep = peer.valid_until.0 >= now; - - if !keep { - match peer.status { - PeerStatus::Seeding => { - *num_seeders -= 1; - } - PeerStatus::Leeching => { - *num_leechers -= 1; - } - _ => (), - }; +pub fn update_access_list(config: &Config, state: &State) { + match config.access_list.mode { + AccessListMode::White | AccessListMode::Black => { + if let Err(err) = state.access_list.update_from_path(&config.access_list.path) { + ::log::error!("Couldn't update access list: {:?}", err); } - - keep - }); - - !torrent_data.peers.is_empty() - }); - - torrent_map.shrink_to_fit(); + } + AccessListMode::Off => {} + } } pub fn print_statistics(state: &State) { diff --git a/aquatic_http_protocol/src/response.rs b/aquatic_http_protocol/src/response.rs index b8e7598..e14e661 100644 --- a/aquatic_http_protocol/src/response.rs +++ b/aquatic_http_protocol/src/response.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::io::Write; use std::net::{Ipv4Addr, Ipv6Addr}; @@ -131,10 +132,16 @@ impl ScrapeResponse { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FailureResponse { #[serde(rename = "failure reason")] - pub failure_reason: String, + pub failure_reason: Cow<'static, str>, } impl FailureResponse { + pub fn new>>(reason: S) -> Self { + Self { + failure_reason: reason.into(), + } + } + fn write(&self, output: &mut W) -> ::std::io::Result { let mut bytes_written = 0usize; @@ -242,7 +249,7 @@ impl quickcheck::Arbitrary for ScrapeResponse { impl quickcheck::Arbitrary for FailureResponse { fn arbitrary(g: &mut quickcheck::Gen) -> Self { Self { - failure_reason: String::arbitrary(g), + failure_reason: String::arbitrary(g).into(), } } } diff --git a/aquatic_udp/src/lib/common.rs b/aquatic_udp/src/lib/common.rs index 83e3385..67f0f4b 100644 --- a/aquatic_udp/src/lib/common.rs +++ b/aquatic_udp/src/lib/common.rs @@ -1,14 +1,17 @@ use std::hash::Hash; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::{atomic::AtomicUsize, Arc}; +use std::time::Instant; use hashbrown::HashMap; use indexmap::IndexMap; use parking_lot::Mutex; -pub use aquatic_common::ValidUntil; +pub use aquatic_common::{access_list::AccessList, ValidUntil}; pub use aquatic_udp_protocol::*; +use crate::config::Config; + pub const MAX_PACKET_SIZE: usize = 4096; pub trait Ip: Hash + PartialEq + Eq + Clone + Copy { @@ -33,6 +36,15 @@ pub struct ConnectionKey { pub socket_addr: SocketAddr, } +impl ConnectionKey { + pub fn new(connection_id: ConnectionId, socket_addr: SocketAddr) -> Self { + Self { + connection_id, + socket_addr, + } + } +} + pub type ConnectionMap = HashMap; #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] @@ -108,6 +120,54 @@ pub struct TorrentMaps { pub ipv6: TorrentMap, } +impl TorrentMaps { + /// Remove disallowed and inactive torrents + pub fn clean(&mut self, config: &Config, access_list: &Arc) { + let now = Instant::now(); + + let access_list_mode = config.access_list.mode; + + self.ipv4.retain(|info_hash, torrent| { + access_list.allows(access_list_mode, &info_hash.0) + && Self::clean_torrent_and_peers(now, torrent) + }); + self.ipv4.shrink_to_fit(); + + self.ipv6.retain(|info_hash, torrent| { + access_list.allows(access_list_mode, &info_hash.0) + && Self::clean_torrent_and_peers(now, torrent) + }); + self.ipv6.shrink_to_fit(); + } + + /// Returns true if torrent is to be kept + #[inline] + fn clean_torrent_and_peers(now: Instant, torrent: &mut TorrentData) -> bool { + let num_seeders = &mut torrent.num_seeders; + let num_leechers = &mut torrent.num_leechers; + + torrent.peers.retain(|_, peer| { + let keep = peer.valid_until.0 > now; + + if !keep { + match peer.status { + PeerStatus::Seeding => { + *num_seeders -= 1; + } + PeerStatus::Leeching => { + *num_leechers -= 1; + } + _ => (), + }; + } + + keep + }); + + !torrent.peers.is_empty() + } +} + #[derive(Default)] pub struct Statistics { pub requests_received: AtomicUsize, @@ -119,6 +179,7 @@ pub struct Statistics { #[derive(Clone)] pub struct State { + pub access_list: Arc, pub connections: Arc>, pub torrents: Arc>, pub statistics: Arc, @@ -127,6 +188,7 @@ pub struct State { impl Default for State { fn default() -> Self { Self { + access_list: Arc::new(AccessList::default()), connections: Arc::new(Mutex::new(HashMap::new())), torrents: Arc::new(Mutex::new(TorrentMaps::default())), statistics: Arc::new(Statistics::default()), diff --git a/aquatic_udp/src/lib/config.rs b/aquatic_udp/src/lib/config.rs index cb75ea9..5ce9685 100644 --- a/aquatic_udp/src/lib/config.rs +++ b/aquatic_udp/src/lib/config.rs @@ -1,5 +1,6 @@ use std::net::SocketAddr; +use aquatic_common::access_list::AccessListConfig; use serde::{Deserialize, Serialize}; use aquatic_cli_helpers::LogLevel; @@ -21,6 +22,7 @@ pub struct Config { pub statistics: StatisticsConfig, pub cleaning: CleaningConfig, pub privileges: PrivilegeConfig, + pub access_list: AccessListConfig, } impl aquatic_cli_helpers::Config for Config { @@ -113,6 +115,7 @@ impl Default for Config { statistics: StatisticsConfig::default(), cleaning: CleaningConfig::default(), privileges: PrivilegeConfig::default(), + access_list: AccessListConfig::default(), } } } diff --git a/aquatic_udp/src/lib/handlers.rs b/aquatic_udp/src/lib/handlers.rs deleted file mode 100644 index 52f800b..0000000 --- a/aquatic_udp/src/lib/handlers.rs +++ /dev/null @@ -1,436 +0,0 @@ -use std::net::{IpAddr, SocketAddr}; -use std::time::Duration; -use std::vec::Drain; - -use crossbeam_channel::{Receiver, Sender}; -use parking_lot::MutexGuard; -use rand::{ - rngs::{SmallRng, StdRng}, - Rng, SeedableRng, -}; - -use aquatic_common::{convert_ipv4_mapped_ipv6, extract_response_peers}; -use aquatic_udp_protocol::*; - -use crate::common::*; -use crate::config::Config; - -pub fn run_request_worker( - state: State, - config: Config, - request_receiver: Receiver<(Request, SocketAddr)>, - response_sender: Sender<(Response, SocketAddr)>, -) { - let mut connect_requests: Vec<(ConnectRequest, SocketAddr)> = Vec::new(); - let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new(); - let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::new(); - - let mut responses: Vec<(Response, SocketAddr)> = Vec::new(); - - let mut std_rng = StdRng::from_entropy(); - let mut small_rng = SmallRng::from_rng(&mut std_rng).unwrap(); - - let timeout = Duration::from_micros(config.handlers.channel_recv_timeout_microseconds); - - loop { - let mut opt_connections = None; - - // Collect requests from channel, divide them by type - // - // Collect a maximum number of request. Stop collecting before that - // number is reached if having waited for too long for a request, but - // only if ConnectionMap mutex isn't locked. - for i in 0..config.handlers.max_requests_per_iter { - let (request, src): (Request, SocketAddr) = if i == 0 { - match request_receiver.recv() { - Ok(r) => r, - Err(_) => break, // Really shouldn't happen - } - } else { - match request_receiver.recv_timeout(timeout) { - Ok(r) => r, - Err(_) => { - if let Some(guard) = state.connections.try_lock() { - opt_connections = Some(guard); - - break; - } else { - continue; - } - } - } - }; - - match request { - Request::Connect(r) => connect_requests.push((r, src)), - Request::Announce(r) => announce_requests.push((r, src)), - Request::Scrape(r) => scrape_requests.push((r, src)), - } - } - - let mut connections: MutexGuard = - opt_connections.unwrap_or_else(|| state.connections.lock()); - - handle_connect_requests( - &config, - &mut connections, - &mut std_rng, - connect_requests.drain(..), - &mut responses, - ); - - announce_requests.retain(|(request, src)| { - let connection_key = ConnectionKey { - connection_id: request.connection_id, - socket_addr: *src, - }; - - if connections.contains_key(&connection_key) { - true - } else { - let response = ErrorResponse { - transaction_id: request.transaction_id, - message: "Connection invalid or expired".to_string(), - }; - - responses.push((response.into(), *src)); - - false - } - }); - - scrape_requests.retain(|(request, src)| { - let connection_key = ConnectionKey { - connection_id: request.connection_id, - socket_addr: *src, - }; - - if connections.contains_key(&connection_key) { - true - } else { - let response = ErrorResponse { - transaction_id: request.transaction_id, - message: "Connection invalid or expired".to_string(), - }; - - responses.push((response.into(), *src)); - - false - } - }); - - ::std::mem::drop(connections); - - if !(announce_requests.is_empty() && scrape_requests.is_empty()) { - let mut torrents = state.torrents.lock(); - - handle_announce_requests( - &config, - &mut torrents, - &mut small_rng, - announce_requests.drain(..), - &mut responses, - ); - handle_scrape_requests(&mut torrents, scrape_requests.drain(..), &mut responses); - } - - for r in responses.drain(..) { - if let Err(err) = response_sender.send(r) { - ::log::error!("error sending response to channel: {}", err); - } - } - } -} - -#[inline] -pub fn handle_connect_requests( - config: &Config, - connections: &mut MutexGuard, - rng: &mut StdRng, - requests: Drain<(ConnectRequest, SocketAddr)>, - responses: &mut Vec<(Response, SocketAddr)>, -) { - let valid_until = ValidUntil::new(config.cleaning.max_connection_age); - - responses.extend(requests.map(|(request, src)| { - let connection_id = ConnectionId(rng.gen()); - - let key = ConnectionKey { - connection_id, - socket_addr: src, - }; - - connections.insert(key, valid_until); - - let response = Response::Connect(ConnectResponse { - connection_id, - transaction_id: request.transaction_id, - }); - - (response, src) - })); -} - -#[inline] -pub fn handle_announce_requests( - config: &Config, - torrents: &mut MutexGuard, - rng: &mut SmallRng, - requests: Drain<(AnnounceRequest, SocketAddr)>, - responses: &mut Vec<(Response, SocketAddr)>, -) { - let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); - - responses.extend(requests.map(|(request, src)| { - let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); - - let response = match peer_ip { - IpAddr::V4(ip) => handle_announce_request( - config, - rng, - &mut torrents.ipv4, - request, - ip, - peer_valid_until, - ), - IpAddr::V6(ip) => handle_announce_request( - config, - rng, - &mut torrents.ipv6, - request, - ip, - peer_valid_until, - ), - }; - - (response.into(), src) - })); -} - -fn handle_announce_request( - config: &Config, - rng: &mut SmallRng, - torrents: &mut TorrentMap, - request: AnnounceRequest, - peer_ip: I, - peer_valid_until: ValidUntil, -) -> AnnounceResponse { - let peer_key = PeerMapKey { - ip: peer_ip, - peer_id: request.peer_id, - }; - - let peer_status = PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left); - - let peer = Peer { - ip_address: peer_ip, - port: request.port, - status: peer_status, - valid_until: peer_valid_until, - }; - - let torrent_data = torrents.entry(request.info_hash).or_default(); - - let opt_removed_peer = match peer_status { - PeerStatus::Leeching => { - torrent_data.num_leechers += 1; - - torrent_data.peers.insert(peer_key, peer) - } - PeerStatus::Seeding => { - torrent_data.num_seeders += 1; - - torrent_data.peers.insert(peer_key, peer) - } - PeerStatus::Stopped => torrent_data.peers.remove(&peer_key), - }; - - match opt_removed_peer.map(|peer| peer.status) { - Some(PeerStatus::Leeching) => { - torrent_data.num_leechers -= 1; - } - Some(PeerStatus::Seeding) => { - torrent_data.num_seeders -= 1; - } - _ => {} - } - - let max_num_peers_to_take = calc_max_num_peers_to_take(config, request.peers_wanted.0); - - let response_peers = extract_response_peers( - rng, - &torrent_data.peers, - max_num_peers_to_take, - peer_key, - Peer::to_response_peer, - ); - - AnnounceResponse { - transaction_id: request.transaction_id, - announce_interval: AnnounceInterval(config.protocol.peer_announce_interval), - leechers: NumberOfPeers(torrent_data.num_leechers as i32), - seeders: NumberOfPeers(torrent_data.num_seeders as i32), - peers: response_peers, - } -} - -#[inline] -pub fn handle_scrape_requests( - torrents: &mut MutexGuard, - requests: Drain<(ScrapeRequest, SocketAddr)>, - responses: &mut Vec<(Response, SocketAddr)>, -) { - let empty_stats = create_torrent_scrape_statistics(0, 0); - - responses.extend(requests.map(|(request, src)| { - let mut stats: Vec = Vec::with_capacity(request.info_hashes.len()); - - let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); - - if peer_ip.is_ipv4() { - for info_hash in request.info_hashes.iter() { - if let Some(torrent_data) = torrents.ipv4.get(info_hash) { - stats.push(create_torrent_scrape_statistics( - torrent_data.num_seeders as i32, - torrent_data.num_leechers as i32, - )); - } else { - stats.push(empty_stats); - } - } - } else { - for info_hash in request.info_hashes.iter() { - if let Some(torrent_data) = torrents.ipv6.get(info_hash) { - stats.push(create_torrent_scrape_statistics( - torrent_data.num_seeders as i32, - torrent_data.num_leechers as i32, - )); - } else { - stats.push(empty_stats); - } - } - } - - let response = Response::Scrape(ScrapeResponse { - transaction_id: request.transaction_id, - torrent_stats: stats, - }); - - (response, src) - })); -} - -#[inline] -fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize { - if peers_wanted <= 0 { - config.protocol.max_response_peers as usize - } else { - ::std::cmp::min( - config.protocol.max_response_peers as usize, - peers_wanted as usize, - ) - } -} - -#[inline(always)] -pub fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics { - TorrentScrapeStatistics { - seeders: NumberOfPeers(seeders), - completed: NumberOfDownloads(0), // No implementation planned - leechers: NumberOfPeers(leechers), - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashSet; - use std::net::Ipv4Addr; - - use indexmap::IndexMap; - use quickcheck::{quickcheck, TestResult}; - use rand::thread_rng; - - use super::*; - - fn gen_peer_map_key_and_value(i: u32) -> (PeerMapKey, Peer) { - let ip_address = Ipv4Addr::from(i.to_be_bytes()); - let peer_id = PeerId([0; 20]); - - let key = PeerMapKey { - ip: ip_address, - peer_id, - }; - let value = Peer { - ip_address, - port: Port(1), - status: PeerStatus::Leeching, - valid_until: ValidUntil::new(0), - }; - - (key, value) - } - - #[test] - fn test_extract_response_peers() { - fn prop(data: (u16, u16)) -> TestResult { - let gen_num_peers = data.0 as u32; - let req_num_peers = data.1 as usize; - - let mut peer_map: PeerMap = IndexMap::with_capacity(gen_num_peers as usize); - - let mut opt_sender_key = None; - let mut opt_sender_peer = None; - - for i in 0..gen_num_peers { - let (key, value) = gen_peer_map_key_and_value((i << 16) + i); - - if i == 0 { - opt_sender_key = Some(key); - opt_sender_peer = Some(value.to_response_peer()); - } - - peer_map.insert(key, value); - } - - let mut rng = thread_rng(); - - let peers = extract_response_peers( - &mut rng, - &peer_map, - req_num_peers, - opt_sender_key.unwrap_or_else(|| gen_peer_map_key_and_value(1).0), - Peer::to_response_peer, - ); - - // Check that number of returned peers is correct - - let mut success = peers.len() <= req_num_peers; - - if req_num_peers >= gen_num_peers as usize { - success &= peers.len() == gen_num_peers as usize - || peers.len() + 1 == gen_num_peers as usize; - } - - // Check that returned peers are unique (no overlap) and that sender - // isn't returned - - let mut ip_addresses = HashSet::with_capacity(peers.len()); - - for peer in peers { - if peer == opt_sender_peer.clone().unwrap() - || ip_addresses.contains(&peer.ip_address) - { - success = false; - - break; - } - - ip_addresses.insert(peer.ip_address); - } - - TestResult::from_bool(success) - } - - quickcheck(prop as fn((u16, u16)) -> TestResult); - } -} diff --git a/aquatic_udp/src/lib/handlers/announce.rs b/aquatic_udp/src/lib/handlers/announce.rs new file mode 100644 index 0000000..bda60d6 --- /dev/null +++ b/aquatic_udp/src/lib/handlers/announce.rs @@ -0,0 +1,220 @@ +use std::net::{IpAddr, SocketAddr}; +use std::vec::Drain; + +use parking_lot::MutexGuard; +use rand::rngs::SmallRng; + +use aquatic_common::{convert_ipv4_mapped_ipv6, extract_response_peers}; +use aquatic_udp_protocol::*; + +use crate::common::*; +use crate::config::Config; + +#[inline] +pub fn handle_announce_requests( + config: &Config, + torrents: &mut MutexGuard, + rng: &mut SmallRng, + requests: Drain<(AnnounceRequest, SocketAddr)>, + responses: &mut Vec<(Response, SocketAddr)>, +) { + let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); + + responses.extend(requests.map(|(request, src)| { + let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); + + let response = match peer_ip { + IpAddr::V4(ip) => handle_announce_request( + config, + rng, + &mut torrents.ipv4, + request, + ip, + peer_valid_until, + ), + IpAddr::V6(ip) => handle_announce_request( + config, + rng, + &mut torrents.ipv6, + request, + ip, + peer_valid_until, + ), + }; + + (Response::Announce(response), src) + })); +} + +fn handle_announce_request( + config: &Config, + rng: &mut SmallRng, + torrents: &mut TorrentMap, + request: AnnounceRequest, + peer_ip: I, + peer_valid_until: ValidUntil, +) -> AnnounceResponse { + let peer_key = PeerMapKey { + ip: peer_ip, + peer_id: request.peer_id, + }; + + let peer_status = PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left); + + let peer = Peer { + ip_address: peer_ip, + port: request.port, + status: peer_status, + valid_until: peer_valid_until, + }; + + let torrent_data = torrents.entry(request.info_hash).or_default(); + + let opt_removed_peer = match peer_status { + PeerStatus::Leeching => { + torrent_data.num_leechers += 1; + + torrent_data.peers.insert(peer_key, peer) + } + PeerStatus::Seeding => { + torrent_data.num_seeders += 1; + + torrent_data.peers.insert(peer_key, peer) + } + PeerStatus::Stopped => torrent_data.peers.remove(&peer_key), + }; + + match opt_removed_peer.map(|peer| peer.status) { + Some(PeerStatus::Leeching) => { + torrent_data.num_leechers -= 1; + } + Some(PeerStatus::Seeding) => { + torrent_data.num_seeders -= 1; + } + _ => {} + } + + let max_num_peers_to_take = calc_max_num_peers_to_take(config, request.peers_wanted.0); + + let response_peers = extract_response_peers( + rng, + &torrent_data.peers, + max_num_peers_to_take, + peer_key, + Peer::to_response_peer, + ); + + AnnounceResponse { + transaction_id: request.transaction_id, + announce_interval: AnnounceInterval(config.protocol.peer_announce_interval), + leechers: NumberOfPeers(torrent_data.num_leechers as i32), + seeders: NumberOfPeers(torrent_data.num_seeders as i32), + peers: response_peers, + } +} + +#[inline] +fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize { + if peers_wanted <= 0 { + config.protocol.max_response_peers as usize + } else { + ::std::cmp::min( + config.protocol.max_response_peers as usize, + peers_wanted as usize, + ) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + use std::net::Ipv4Addr; + + use indexmap::IndexMap; + use quickcheck::{quickcheck, TestResult}; + use rand::thread_rng; + + use super::*; + + fn gen_peer_map_key_and_value(i: u32) -> (PeerMapKey, Peer) { + let ip_address = Ipv4Addr::from(i.to_be_bytes()); + let peer_id = PeerId([0; 20]); + + let key = PeerMapKey { + ip: ip_address, + peer_id, + }; + let value = Peer { + ip_address, + port: Port(1), + status: PeerStatus::Leeching, + valid_until: ValidUntil::new(0), + }; + + (key, value) + } + + #[test] + fn test_extract_response_peers() { + fn prop(data: (u16, u16)) -> TestResult { + let gen_num_peers = data.0 as u32; + let req_num_peers = data.1 as usize; + + let mut peer_map: PeerMap = IndexMap::with_capacity(gen_num_peers as usize); + + let mut opt_sender_key = None; + let mut opt_sender_peer = None; + + for i in 0..gen_num_peers { + let (key, value) = gen_peer_map_key_and_value((i << 16) + i); + + if i == 0 { + opt_sender_key = Some(key); + opt_sender_peer = Some(value.to_response_peer()); + } + + peer_map.insert(key, value); + } + + let mut rng = thread_rng(); + + let peers = extract_response_peers( + &mut rng, + &peer_map, + req_num_peers, + opt_sender_key.unwrap_or_else(|| gen_peer_map_key_and_value(1).0), + Peer::to_response_peer, + ); + + // Check that number of returned peers is correct + + let mut success = peers.len() <= req_num_peers; + + if req_num_peers >= gen_num_peers as usize { + success &= peers.len() == gen_num_peers as usize + || peers.len() + 1 == gen_num_peers as usize; + } + + // Check that returned peers are unique (no overlap) and that sender + // isn't returned + + let mut ip_addresses = HashSet::with_capacity(peers.len()); + + for peer in peers { + if peer == opt_sender_peer.clone().unwrap() + || ip_addresses.contains(&peer.ip_address) + { + success = false; + + break; + } + + ip_addresses.insert(peer.ip_address); + } + + TestResult::from_bool(success) + } + + quickcheck(prop as fn((u16, u16)) -> TestResult); + } +} diff --git a/aquatic_udp/src/lib/handlers/connect.rs b/aquatic_udp/src/lib/handlers/connect.rs new file mode 100644 index 0000000..e058241 --- /dev/null +++ b/aquatic_udp/src/lib/handlers/connect.rs @@ -0,0 +1,39 @@ +use std::net::SocketAddr; +use std::vec::Drain; + +use parking_lot::MutexGuard; +use rand::{rngs::StdRng, Rng}; + +use aquatic_udp_protocol::*; + +use crate::common::*; +use crate::config::Config; + +#[inline] +pub fn handle_connect_requests( + config: &Config, + connections: &mut MutexGuard, + rng: &mut StdRng, + requests: Drain<(ConnectRequest, SocketAddr)>, + responses: &mut Vec<(Response, SocketAddr)>, +) { + let valid_until = ValidUntil::new(config.cleaning.max_connection_age); + + responses.extend(requests.map(|(request, src)| { + let connection_id = ConnectionId(rng.gen()); + + let key = ConnectionKey { + connection_id, + socket_addr: src, + }; + + connections.insert(key, valid_until); + + let response = Response::Connect(ConnectResponse { + connection_id, + transaction_id: request.transaction_id, + }); + + (response, src) + })); +} diff --git a/aquatic_udp/src/lib/handlers/mod.rs b/aquatic_udp/src/lib/handlers/mod.rs new file mode 100644 index 0000000..a1906d9 --- /dev/null +++ b/aquatic_udp/src/lib/handlers/mod.rs @@ -0,0 +1,148 @@ +use std::net::SocketAddr; +use std::time::Duration; + +use crossbeam_channel::{Receiver, Sender}; +use parking_lot::MutexGuard; +use rand::{ + rngs::{SmallRng, StdRng}, + SeedableRng, +}; + +use aquatic_udp_protocol::*; + +use crate::common::*; +use crate::config::Config; + +mod announce; +mod connect; +mod scrape; + +use announce::handle_announce_requests; +use connect::handle_connect_requests; +use scrape::handle_scrape_requests; + +pub fn run_request_worker( + state: State, + config: Config, + request_receiver: Receiver<(Request, SocketAddr)>, + response_sender: Sender<(Response, SocketAddr)>, +) { + let mut connect_requests: Vec<(ConnectRequest, SocketAddr)> = Vec::new(); + let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new(); + let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::new(); + + let mut responses: Vec<(Response, SocketAddr)> = Vec::new(); + + let mut std_rng = StdRng::from_entropy(); + let mut small_rng = SmallRng::from_rng(&mut std_rng).unwrap(); + + let timeout = Duration::from_micros(config.handlers.channel_recv_timeout_microseconds); + + loop { + let mut opt_connections = None; + + // Collect requests from channel, divide them by type + // + // Collect a maximum number of request. Stop collecting before that + // number is reached if having waited for too long for a request, but + // only if ConnectionMap mutex isn't locked. + for i in 0..config.handlers.max_requests_per_iter { + let (request, src): (Request, SocketAddr) = if i == 0 { + match request_receiver.recv() { + Ok(r) => r, + Err(_) => break, // Really shouldn't happen + } + } else { + match request_receiver.recv_timeout(timeout) { + Ok(r) => r, + Err(_) => { + if let Some(guard) = state.connections.try_lock() { + opt_connections = Some(guard); + + break; + } else { + continue; + } + } + } + }; + + match request { + Request::Connect(r) => connect_requests.push((r, src)), + Request::Announce(r) => announce_requests.push((r, src)), + Request::Scrape(r) => scrape_requests.push((r, src)), + } + } + + let mut connections: MutexGuard = + opt_connections.unwrap_or_else(|| state.connections.lock()); + + handle_connect_requests( + &config, + &mut connections, + &mut std_rng, + connect_requests.drain(..), + &mut responses, + ); + + // Check announce and scrape requests for valid connections + + announce_requests.retain(|(request, src)| { + let connection_valid = + connections.contains_key(&ConnectionKey::new(request.connection_id, *src)); + + if !connection_valid { + responses.push(( + create_invalid_connection_response(request.transaction_id), + *src, + )); + } + + connection_valid + }); + + scrape_requests.retain(|(request, src)| { + let connection_valid = + connections.contains_key(&ConnectionKey::new(request.connection_id, *src)); + + if !connection_valid { + responses.push(( + create_invalid_connection_response(request.transaction_id), + *src, + )); + } + + connection_valid + }); + + ::std::mem::drop(connections); + + // Generate responses for announce and scrape requests + + if !(announce_requests.is_empty() && scrape_requests.is_empty()) { + let mut torrents = state.torrents.lock(); + + handle_announce_requests( + &config, + &mut torrents, + &mut small_rng, + announce_requests.drain(..), + &mut responses, + ); + handle_scrape_requests(&mut torrents, scrape_requests.drain(..), &mut responses); + } + + for r in responses.drain(..) { + if let Err(err) = response_sender.send(r) { + ::log::error!("error sending response to channel: {}", err); + } + } + } +} + +fn create_invalid_connection_response(transaction_id: TransactionId) -> Response { + Response::Error(ErrorResponse { + transaction_id, + message: "Connection invalid or expired".into(), + }) +} diff --git a/aquatic_udp/src/lib/handlers/scrape.rs b/aquatic_udp/src/lib/handlers/scrape.rs new file mode 100644 index 0000000..8198bd8 --- /dev/null +++ b/aquatic_udp/src/lib/handlers/scrape.rs @@ -0,0 +1,64 @@ +use std::net::SocketAddr; +use std::vec::Drain; + +use parking_lot::MutexGuard; + +use aquatic_common::convert_ipv4_mapped_ipv6; +use aquatic_udp_protocol::*; + +use crate::common::*; + +#[inline] +pub fn handle_scrape_requests( + torrents: &mut MutexGuard, + requests: Drain<(ScrapeRequest, SocketAddr)>, + responses: &mut Vec<(Response, SocketAddr)>, +) { + let empty_stats = create_torrent_scrape_statistics(0, 0); + + responses.extend(requests.map(|(request, src)| { + let mut stats: Vec = Vec::with_capacity(request.info_hashes.len()); + + let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); + + if peer_ip.is_ipv4() { + for info_hash in request.info_hashes.iter() { + if let Some(torrent_data) = torrents.ipv4.get(info_hash) { + stats.push(create_torrent_scrape_statistics( + torrent_data.num_seeders as i32, + torrent_data.num_leechers as i32, + )); + } else { + stats.push(empty_stats); + } + } + } else { + for info_hash in request.info_hashes.iter() { + if let Some(torrent_data) = torrents.ipv6.get(info_hash) { + stats.push(create_torrent_scrape_statistics( + torrent_data.num_seeders as i32, + torrent_data.num_leechers as i32, + )); + } else { + stats.push(empty_stats); + } + } + } + + let response = Response::Scrape(ScrapeResponse { + transaction_id: request.transaction_id, + torrent_stats: stats, + }); + + (response, src) + })); +} + +#[inline(always)] +fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics { + TorrentScrapeStatistics { + seeders: NumberOfPeers(seeders), + completed: NumberOfDownloads(0), // No implementation planned + leechers: NumberOfPeers(leechers), + } +} diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index bdc2159..5e32d83 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -23,29 +23,42 @@ pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub fn run(config: Config) -> ::anyhow::Result<()> { let state = State::default(); + tasks::update_access_list(&config, &state); + let num_bound_sockets = start_workers(config.clone(), state.clone())?; if config.privileges.drop_privileges { + let mut counter = 0usize; + loop { let sockets = num_bound_sockets.load(Ordering::SeqCst); if sockets == config.socket_workers { PrivDrop::default() - .chroot(config.privileges.chroot_path) - .user(config.privileges.user) + .chroot(config.privileges.chroot_path.clone()) + .user(config.privileges.user.clone()) .apply()?; break; } ::std::thread::sleep(Duration::from_millis(10)); + + counter += 1; + + if counter == 500 { + panic!("Sockets didn't bind in time for privilege drop."); + } } } loop { ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); - tasks::clean_connections_and_torrents(&state); + tasks::clean_connections(&state); + tasks::update_access_list(&config, &state); + + state.torrents.lock().clean(&config, &state.access_list); } } diff --git a/aquatic_udp/src/lib/network.rs b/aquatic_udp/src/lib/network.rs index 6f64885..b1baf65 100644 --- a/aquatic_udp/src/lib/network.rs +++ b/aquatic_udp/src/lib/network.rs @@ -131,6 +131,8 @@ fn read_requests( let mut requests_received: usize = 0; let mut bytes_received: usize = 0; + let access_list_mode = config.access_list.mode; + loop { match socket.recv_from(&mut buffer[..]) { Ok((amt, src)) => { @@ -144,6 +146,18 @@ fn read_requests( } match request { + Ok(Request::Announce(AnnounceRequest { + info_hash, + transaction_id, + .. + })) if !state.access_list.allows(access_list_mode, &info_hash.0) => { + let response = Response::Error(ErrorResponse { + transaction_id, + message: "Info hash not allowed".into(), + }); + + local_responses.push((response, src)) + } Ok(request) => { requests.push((request, src)); } @@ -152,9 +166,9 @@ fn read_requests( if let Some(transaction_id) = err.transaction_id { let opt_message = if err.error.is_some() { - Some("Parse error".to_string()) + Some("Parse error".into()) } else if let Some(message) = err.message { - Some(message) + Some(message.into()) } else { None }; diff --git a/aquatic_udp/src/lib/tasks.rs b/aquatic_udp/src/lib/tasks.rs index 8a1a153..83acd3b 100644 --- a/aquatic_udp/src/lib/tasks.rs +++ b/aquatic_udp/src/lib/tasks.rs @@ -3,53 +3,29 @@ use std::time::Instant; use histogram::Histogram; +use aquatic_common::access_list::AccessListMode; + use crate::common::*; use crate::config::Config; -pub fn clean_connections_and_torrents(state: &State) { - let now = Instant::now(); - - { - let mut connections = state.connections.lock(); - - connections.retain(|_, v| v.0 > now); - connections.shrink_to_fit(); +pub fn update_access_list(config: &Config, state: &State) { + match config.access_list.mode { + AccessListMode::White | AccessListMode::Black => { + if let Err(err) = state.access_list.update_from_path(&config.access_list.path) { + ::log::error!("Update access list from path: {:?}", err); + } + } + AccessListMode::Off => {} } - - let mut torrents = state.torrents.lock(); - - clean_torrent_map(&mut torrents.ipv4, now); - clean_torrent_map(&mut torrents.ipv6, now); } -#[inline] -fn clean_torrent_map(torrents: &mut TorrentMap, now: Instant) { - torrents.retain(|_, torrent| { - let num_seeders = &mut torrent.num_seeders; - let num_leechers = &mut torrent.num_leechers; +pub fn clean_connections(state: &State) { + let now = Instant::now(); - torrent.peers.retain(|_, peer| { - let keep = peer.valid_until.0 > now; + let mut connections = state.connections.lock(); - if !keep { - match peer.status { - PeerStatus::Seeding => { - *num_seeders -= 1; - } - PeerStatus::Leeching => { - *num_leechers -= 1; - } - _ => (), - }; - } - - keep - }); - - !torrent.peers.is_empty() - }); - - torrents.shrink_to_fit(); + connections.retain(|_, v| v.0 > now); + connections.shrink_to_fit(); } pub fn gather_and_print_statistics(state: &State, config: &Config) { diff --git a/aquatic_udp_protocol/src/response.rs b/aquatic_udp_protocol/src/response.rs index abe76ff..b8a514a 100644 --- a/aquatic_udp_protocol/src/response.rs +++ b/aquatic_udp_protocol/src/response.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::convert::TryInto; use std::io::{self, Cursor, Write}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; @@ -37,7 +38,7 @@ pub struct ScrapeResponse { #[derive(PartialEq, Eq, Clone, Debug)] pub struct ErrorResponse { pub transaction_id: TransactionId, - pub message: String, + pub message: Cow<'static, str>, } #[derive(PartialEq, Eq, Clone, Debug)] @@ -224,7 +225,9 @@ impl Response { Ok((ErrorResponse { transaction_id: TransactionId(transaction_id), - message: String::from_utf8_lossy(&inner[position..]).into(), + message: String::from_utf8_lossy(&inner[position..]) + .into_owned() + .into(), }) .into()) } @@ -262,7 +265,7 @@ impl Response { } _ => Ok((ErrorResponse { transaction_id: TransactionId(transaction_id), - message: "Invalid action".to_string(), + message: "Invalid action".into(), }) .into()), } diff --git a/aquatic_ws/src/lib/common.rs b/aquatic_ws/src/lib/common.rs index b524390..4f30d8b 100644 --- a/aquatic_ws/src/lib/common.rs +++ b/aquatic_ws/src/lib/common.rs @@ -1,6 +1,8 @@ use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; +use std::time::Instant; +use aquatic_common::access_list::AccessList; use crossbeam_channel::{Receiver, Sender}; use hashbrown::HashMap; use indexmap::IndexMap; @@ -12,6 +14,8 @@ pub use aquatic_common::ValidUntil; use aquatic_ws_protocol::*; +use crate::config::Config; + pub const LISTENER_TOKEN: Token = Token(0); pub const CHANNEL_TOKEN: Token = Token(1); @@ -84,14 +88,62 @@ pub struct TorrentMaps { pub ipv6: TorrentMap, } +impl TorrentMaps { + pub fn clean(&mut self, config: &Config, access_list: &Arc) { + Self::clean_torrent_map(config, access_list, &mut self.ipv4); + Self::clean_torrent_map(config, access_list, &mut self.ipv6); + } + + fn clean_torrent_map( + config: &Config, + access_list: &Arc, + torrent_map: &mut TorrentMap, + ) { + let now = Instant::now(); + + torrent_map.retain(|info_hash, torrent_data| { + if !access_list.allows(config.access_list.mode, &info_hash.0) { + return false; + } + + let num_seeders = &mut torrent_data.num_seeders; + let num_leechers = &mut torrent_data.num_leechers; + + torrent_data.peers.retain(|_, peer| { + let keep = peer.valid_until.0 >= now; + + if !keep { + match peer.status { + PeerStatus::Seeding => { + *num_seeders -= 1; + } + PeerStatus::Leeching => { + *num_leechers -= 1; + } + _ => (), + }; + } + + keep + }); + + !torrent_data.peers.is_empty() + }); + + torrent_map.shrink_to_fit(); + } +} + #[derive(Clone)] pub struct State { + pub access_list: Arc, pub torrent_maps: Arc>, } impl Default for State { fn default() -> Self { Self { + access_list: Arc::new(Default::default()), torrent_maps: Arc::new(Mutex::new(TorrentMaps::default())), } } diff --git a/aquatic_ws/src/lib/config.rs b/aquatic_ws/src/lib/config.rs index 1be0c47..b7ef10c 100644 --- a/aquatic_ws/src/lib/config.rs +++ b/aquatic_ws/src/lib/config.rs @@ -1,5 +1,6 @@ use std::net::SocketAddr; +use aquatic_common::access_list::AccessListConfig; use serde::{Deserialize, Serialize}; use aquatic_cli_helpers::LogLevel; @@ -21,6 +22,7 @@ pub struct Config { pub cleaning: CleaningConfig, pub statistics: StatisticsConfig, pub privileges: PrivilegeConfig, + pub access_list: AccessListConfig, } impl aquatic_cli_helpers::Config for Config { @@ -105,6 +107,7 @@ impl Default for Config { cleaning: CleaningConfig::default(), statistics: StatisticsConfig::default(), privileges: PrivilegeConfig::default(), + access_list: AccessListConfig::default(), } } } diff --git a/aquatic_ws/src/lib/lib.rs b/aquatic_ws/src/lib/lib.rs index e27c21b..f6ee599 100644 --- a/aquatic_ws/src/lib/lib.rs +++ b/aquatic_ws/src/lib/lib.rs @@ -24,12 +24,16 @@ pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker"; pub fn run(config: Config) -> anyhow::Result<()> { let state = State::default(); + tasks::update_access_list(&config, &state); + start_workers(config.clone(), state.clone())?; loop { ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); - tasks::clean_torrents(&state); + tasks::update_access_list(&config, &state); + + state.torrent_maps.lock().clean(&config, &state.access_list); } } @@ -53,6 +57,7 @@ pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> { for i in 0..config.socket_workers { let config = config.clone(); + let state = state.clone(); let socket_worker_statuses = socket_worker_statuses.clone(); let in_message_sender = in_message_sender.clone(); let opt_tls_acceptor = opt_tls_acceptor.clone(); @@ -69,6 +74,7 @@ pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> { .spawn(move || { network::run_socket_worker( config, + state, i, socket_worker_statuses, poll, diff --git a/aquatic_ws/src/lib/network/connection.rs b/aquatic_ws/src/lib/network/connection.rs index fbaf299..4769dc6 100644 --- a/aquatic_ws/src/lib/network/connection.rs +++ b/aquatic_ws/src/lib/network/connection.rs @@ -21,10 +21,10 @@ pub enum Stream { impl Stream { #[inline] - pub fn get_peer_addr(&self) -> SocketAddr { + pub fn get_peer_addr(&self) -> ::std::io::Result { match self { - Self::TcpStream(stream) => stream.peer_addr().unwrap(), - Self::TlsStream(stream) => stream.get_ref().peer_addr().unwrap(), + Self::TcpStream(stream) => stream.peer_addr(), + Self::TlsStream(stream) => stream.get_ref().peer_addr(), } } @@ -145,7 +145,7 @@ impl HandshakeMachine { ); (Some(Either::Right(Self::TlsStream(stream))), false) - }, + } Err(native_tls::HandshakeError::WouldBlock(handshake)) => { (Some(Either::Right(Self::TlsMidHandshake(handshake))), true) } @@ -162,21 +162,26 @@ impl HandshakeMachine { result: Result, HandshakeError>>, ) -> (Option>, bool) { match result { - Ok(mut ws) => { - let peer_addr = ws.get_mut().get_peer_addr(); + Ok(mut ws) => match ws.get_mut().get_peer_addr() { + Ok(peer_addr) => { + ::log::trace!( + "established ws handshake with peer with addr: {:?}", + peer_addr + ); - ::log::trace!( - "established ws handshake with peer with addr: {:?}", - peer_addr - ); + let established_ws = EstablishedWs { ws, peer_addr }; - let established_ws = EstablishedWs { - ws, - peer_addr, - }; + (Some(Either::Left(established_ws)), false) + } + Err(err) => { + ::log::info!( + "get_peer_addr failed during handshake, removing connection: {:?}", + err + ); - (Some(Either::Left(established_ws)), false) - } + (None, false) + } + }, Err(HandshakeError::Interrupted(handshake)) => ( Some(Either::Right(HandshakeMachine::WsMidHandshake(handshake))), true, diff --git a/aquatic_ws/src/lib/network/mod.rs b/aquatic_ws/src/lib/network/mod.rs index 4d0e457..f12859d 100644 --- a/aquatic_ws/src/lib/network/mod.rs +++ b/aquatic_ws/src/lib/network/mod.rs @@ -1,5 +1,6 @@ use std::io::ErrorKind; use std::time::Duration; +use std::vec::Drain; use crossbeam_channel::Receiver; use hashbrown::HashMap; @@ -23,6 +24,7 @@ use utils::*; pub fn run_socket_worker( config: Config, + state: State, socket_worker_index: usize, socket_worker_statuses: SocketWorkerStatuses, poll: Poll, @@ -36,6 +38,7 @@ pub fn run_socket_worker( run_poll_loop( config, + &state, socket_worker_index, poll, in_message_sender, @@ -53,6 +56,7 @@ pub fn run_socket_worker( pub fn run_poll_loop( config: Config, + state: &State, socket_worker_index: usize, mut poll: Poll, in_message_sender: InMessageSender, @@ -76,6 +80,7 @@ pub fn run_poll_loop( .unwrap(); let mut connections: ConnectionMap = HashMap::new(); + let mut local_responses = Vec::new(); let mut poll_token_counter = Token(0usize); let mut iter_counter = 0usize; @@ -100,7 +105,10 @@ pub fn run_poll_loop( ); } else if token != CHANNEL_TOKEN { run_handshakes_and_read_messages( + &config, + state, socket_worker_index, + &mut local_responses, &in_message_sender, &opt_tls_acceptor, &mut poll, @@ -110,7 +118,12 @@ pub fn run_poll_loop( ); } - send_out_messages(&mut poll, &out_message_receiver, &mut connections); + send_out_messages( + &mut poll, + local_responses.drain(..), + &out_message_receiver, + &mut connections, + ); } // Remove inactive connections, but not every iteration @@ -165,7 +178,10 @@ fn accept_new_streams( /// On the stream given by poll_token, get TLS (if requested) and tungstenite /// up and running, then read messages and pass on through channel. pub fn run_handshakes_and_read_messages( + config: &Config, + state: &State, socket_worker_index: usize, + local_responses: &mut Vec<(ConnectionMeta, OutMessage)>, in_message_sender: &InMessageSender, opt_tls_acceptor: &Option, // If set, run TLS poll: &mut Poll, @@ -173,6 +189,8 @@ pub fn run_handshakes_and_read_messages( poll_token: Token, valid_until: ValidUntil, ) { + let access_list_mode = config.access_list.mode; + loop { if let Some(established_ws) = connections .get_mut(&poll_token) @@ -188,21 +206,50 @@ pub fn run_handshakes_and_read_messages( match established_ws.ws.read_message() { Ok(ws_message) => { - if let Ok(in_message) = InMessage::from_ws_message(ws_message) { - let naive_peer_addr = established_ws.peer_addr; - let converted_peer_ip = convert_ipv4_mapped_ipv6(naive_peer_addr.ip()); + let naive_peer_addr = established_ws.peer_addr; + let converted_peer_ip = convert_ipv4_mapped_ipv6(naive_peer_addr.ip()); - let meta = ConnectionMeta { - worker_index: socket_worker_index, - poll_token, - naive_peer_addr, - converted_peer_ip, - }; + let meta = ConnectionMeta { + worker_index: socket_worker_index, + poll_token, + naive_peer_addr, + converted_peer_ip, + }; - debug!("read message"); + debug!("read message"); - if let Err(err) = in_message_sender.send((meta, in_message)) { - error!("InMessageSender: couldn't send message: {:?}", err); + match InMessage::from_ws_message(ws_message) { + Ok(InMessage::AnnounceRequest(ref request)) + if !state + .access_list + .allows(access_list_mode, &request.info_hash.0) => + { + let out_message = OutMessage::ErrorResponse(ErrorResponse { + failure_reason: "Info hash not allowed".into(), + action: Some(ErrorResponseAction::Announce), + info_hash: Some(request.info_hash), + }); + + local_responses.push((meta, out_message)); + } + Ok(in_message) => { + if let Err(err) = in_message_sender.send((meta, in_message)) { + error!("InMessageSender: couldn't send message: {:?}", err); + } + } + Err(_) => { + // FIXME: maybe this condition just occurs when enough data hasn't been recevied? + /* + info!("error parsing message: {:?}", err); + + let out_message = OutMessage::ErrorResponse(ErrorResponse { + failure_reason: "Error parsing message".into(), + action: None, + info_hash: None, + }); + + local_responses.push((meta, out_message)); + */ } } } @@ -242,12 +289,13 @@ pub fn run_handshakes_and_read_messages( /// Read messages from channel, send to peers pub fn send_out_messages( poll: &mut Poll, + local_responses: Drain<(ConnectionMeta, OutMessage)>, out_message_receiver: &Receiver<(ConnectionMeta, OutMessage)>, connections: &mut ConnectionMap, ) { let len = out_message_receiver.len(); - for (meta, out_message) in out_message_receiver.try_iter().take(len) { + for (meta, out_message) in local_responses.chain(out_message_receiver.try_iter().take(len)) { let opt_established_ws = connections .get_mut(&meta.poll_token) .and_then(Connection::get_established_ws); diff --git a/aquatic_ws/src/lib/tasks.rs b/aquatic_ws/src/lib/tasks.rs index 64b5338..55cb62d 100644 --- a/aquatic_ws/src/lib/tasks.rs +++ b/aquatic_ws/src/lib/tasks.rs @@ -1,45 +1,18 @@ -use std::time::Instant; - +use aquatic_common::access_list::AccessListMode; use histogram::Histogram; use crate::common::*; +use crate::config::Config; -pub fn clean_torrents(state: &State) { - fn clean_torrent_map(torrent_map: &mut TorrentMap) { - let now = Instant::now(); - - torrent_map.retain(|_, torrent_data| { - let num_seeders = &mut torrent_data.num_seeders; - let num_leechers = &mut torrent_data.num_leechers; - - torrent_data.peers.retain(|_, peer| { - let keep = peer.valid_until.0 >= now; - - if !keep { - match peer.status { - PeerStatus::Seeding => { - *num_seeders -= 1; - } - PeerStatus::Leeching => { - *num_leechers -= 1; - } - _ => (), - }; - } - - keep - }); - - !torrent_data.peers.is_empty() - }); - - torrent_map.shrink_to_fit(); +pub fn update_access_list(config: &Config, state: &State) { + match config.access_list.mode { + AccessListMode::White | AccessListMode::Black => { + if let Err(err) = state.access_list.update_from_path(&config.access_list.path) { + ::log::error!("Couldn't update access list: {:?}", err); + } + } + AccessListMode::Off => {} } - - let mut torrent_maps = state.torrent_maps.lock(); - - clean_torrent_map(&mut torrent_maps.ipv4); - clean_torrent_map(&mut torrent_maps.ipv6); } pub fn print_statistics(state: &State) { diff --git a/aquatic_ws_load_test/src/common.rs b/aquatic_ws_load_test/src/common.rs index 3b36011..200d04f 100644 --- a/aquatic_ws_load_test/src/common.rs +++ b/aquatic_ws_load_test/src/common.rs @@ -12,6 +12,7 @@ pub struct Statistics { pub responses_offer: AtomicUsize, pub responses_answer: AtomicUsize, pub responses_scrape: AtomicUsize, + pub responses_error: AtomicUsize, } #[derive(Clone)] diff --git a/aquatic_ws_load_test/src/main.rs b/aquatic_ws_load_test/src/main.rs index 68b5c89..27c51a8 100644 --- a/aquatic_ws_load_test/src/main.rs +++ b/aquatic_ws_load_test/src/main.rs @@ -85,13 +85,16 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { statistics.responses_answer.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; let responses_scrape_per_second = statistics.responses_scrape.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + let responses_error_per_second = + statistics.responses_error.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; let responses_announce_per_second = responses_announce / interval_f64; let responses_per_second = responses_announce_per_second + responses_offer_per_second + responses_answer_per_second - + responses_scrape_per_second; + + responses_scrape_per_second + + responses_error_per_second; report_avg_response_vec.push(responses_per_second); @@ -105,6 +108,7 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { println!(" - Offer responses: {:.2}", responses_offer_per_second); println!(" - Answer responses: {:.2}", responses_answer_per_second); println!(" - Scrape responses: {:.2}", responses_scrape_per_second); + println!(" - Error responses: {:.2}", responses_error_per_second); let time_elapsed = start_time.elapsed(); let duration = Duration::from_secs(config.duration as u64); diff --git a/aquatic_ws_load_test/src/network.rs b/aquatic_ws_load_test/src/network.rs index 1611fb5..0a6cf8c 100644 --- a/aquatic_ws_load_test/src/network.rs +++ b/aquatic_ws_load_test/src/network.rs @@ -151,6 +151,16 @@ impl Connection { self.can_send = true; } + Ok(OutMessage::ErrorResponse(response)) => { + state + .statistics + .responses_error + .fetch_add(1, Ordering::SeqCst); + + eprintln!("received error response: {:?}", response.failure_reason); + + self.can_send = true; + } Err(err) => { eprintln!("error deserializing offer: {:?}", err); } diff --git a/aquatic_ws_protocol/src/serde_helpers.rs b/aquatic_ws_protocol/src/common.rs similarity index 66% rename from aquatic_ws_protocol/src/serde_helpers.rs rename to aquatic_ws_protocol/src/common.rs index a5848ae..93565af 100644 --- a/aquatic_ws_protocol/src/serde_helpers.rs +++ b/aquatic_ws_protocol/src/common.rs @@ -1,6 +1,81 @@ -use serde::{de::Visitor, Deserializer, Serializer}; +use serde::{de::Visitor, Deserialize, Deserializer, Serialize, Serializer}; -use super::{AnnounceAction, ScrapeAction}; +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize, Deserialize)] +#[serde(transparent)] +pub struct PeerId( + #[serde( + deserialize_with = "deserialize_20_bytes", + serialize_with = "serialize_20_bytes" + )] + pub [u8; 20], +); + +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize, Deserialize)] +#[serde(transparent)] +pub struct InfoHash( + #[serde( + deserialize_with = "deserialize_20_bytes", + serialize_with = "serialize_20_bytes" + )] + pub [u8; 20], +); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(transparent)] +pub struct OfferId( + #[serde( + deserialize_with = "deserialize_20_bytes", + serialize_with = "serialize_20_bytes" + )] + pub [u8; 20], +); + +/// Some kind of nested structure from https://www.npmjs.com/package/simple-peer +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(transparent)] +pub struct JsonValue(pub ::serde_json::Value); + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct AnnounceAction; + +impl Serialize for AnnounceAction { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str("announce") + } +} + +impl<'de> Deserialize<'de> for AnnounceAction { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + deserializer.deserialize_str(AnnounceActionVisitor) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct ScrapeAction; + +impl Serialize for ScrapeAction { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str("scrape") + } +} + +impl<'de> Deserialize<'de> for ScrapeAction { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + deserializer.deserialize_str(ScrapeActionVisitor) + } +} pub struct AnnounceActionVisitor; @@ -44,7 +119,7 @@ impl<'de> Visitor<'de> for ScrapeActionVisitor { } } -pub fn serialize_20_bytes(data: &[u8; 20], serializer: S) -> Result +fn serialize_20_bytes(data: &[u8; 20], serializer: S) -> Result where S: Serializer, { @@ -102,7 +177,7 @@ impl<'de> Visitor<'de> for TwentyByteVisitor { } #[inline] -pub fn deserialize_20_bytes<'de, D>(deserializer: D) -> Result<[u8; 20], D::Error> +fn deserialize_20_bytes<'de, D>(deserializer: D) -> Result<[u8; 20], D::Error> where D: Deserializer<'de>, { @@ -113,7 +188,7 @@ where mod tests { use quickcheck_macros::quickcheck; - use crate::InfoHash; + use crate::common::InfoHash; fn info_hash_from_bytes(bytes: &[u8]) -> InfoHash { let mut arr = [0u8; 20]; diff --git a/aquatic_ws_protocol/src/lib.rs b/aquatic_ws_protocol/src/lib.rs index 8af7c15..e2971f7 100644 --- a/aquatic_ws_protocol/src/lib.rs +++ b/aquatic_ws_protocol/src/lib.rs @@ -1,281 +1,22 @@ -use anyhow::Context; -use hashbrown::HashMap; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; +//! WebTorrent protocol implementation +//! +//! Typical announce workflow: +//! - Peer A sends announce request with info hash and offers +//! - Tracker sends on offers to other peers announcing with that info hash and +//! sends back announce response to peer A +//! - Tracker receives answers to those offers from other peers and send them +//! on to peer A +//! +//! Typical scrape workflow +//! - Peer sends scrape request and receives scrape response -mod serde_helpers; +pub mod common; +pub mod request; +pub mod response; -use serde_helpers::*; - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct AnnounceAction; - -impl Serialize for AnnounceAction { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - serializer.serialize_str("announce") - } -} - -impl<'de> Deserialize<'de> for AnnounceAction { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - deserializer.deserialize_str(AnnounceActionVisitor) - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct ScrapeAction; - -impl Serialize for ScrapeAction { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - serializer.serialize_str("scrape") - } -} - -impl<'de> Deserialize<'de> for ScrapeAction { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - deserializer.deserialize_str(ScrapeActionVisitor) - } -} - -#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize, Deserialize)] -#[serde(transparent)] -pub struct PeerId( - #[serde( - deserialize_with = "deserialize_20_bytes", - serialize_with = "serialize_20_bytes" - )] - pub [u8; 20], -); - -#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize, Deserialize)] -#[serde(transparent)] -pub struct InfoHash( - #[serde( - deserialize_with = "deserialize_20_bytes", - serialize_with = "serialize_20_bytes" - )] - pub [u8; 20], -); - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] -#[serde(transparent)] -pub struct OfferId( - #[serde( - deserialize_with = "deserialize_20_bytes", - serialize_with = "serialize_20_bytes" - )] - pub [u8; 20], -); - -/// Some kind of nested structure from https://www.npmjs.com/package/simple-peer -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(transparent)] -pub struct JsonValue(pub ::serde_json::Value); - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] -pub enum AnnounceEvent { - Started, - Stopped, - Completed, - Update, -} - -impl Default for AnnounceEvent { - fn default() -> Self { - Self::Update - } -} - -/// Apparently, these are sent to a number of peers when they are set -/// in an AnnounceRequest -/// action = "announce" -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct MiddlemanOfferToPeer { - pub action: AnnounceAction, - /// Peer id of peer sending offer - /// Note: if equal to client peer_id, client ignores offer - pub peer_id: PeerId, - pub info_hash: InfoHash, - /// Gets copied from AnnounceRequestOffer - pub offer: JsonValue, - /// Gets copied from AnnounceRequestOffer - pub offer_id: OfferId, -} - -/// If announce request has answer = true, send this to peer with -/// peer id == "to_peer_id" field -/// Action field should be 'announce' -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct MiddlemanAnswerToPeer { - pub action: AnnounceAction, - /// Note: if equal to client peer_id, client ignores answer - pub peer_id: PeerId, - pub info_hash: InfoHash, - pub answer: JsonValue, - pub offer_id: OfferId, -} - -/// Element of AnnounceRequest.offers -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct AnnounceRequestOffer { - pub offer: JsonValue, - pub offer_id: OfferId, -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct AnnounceRequest { - pub action: AnnounceAction, - pub info_hash: InfoHash, - pub peer_id: PeerId, - /// Just called "left" in protocol. Is set to None in some cases, such as - /// when opening a magnet link - #[serde(rename = "left")] - pub bytes_left: Option, - /// Can be empty. Then, default is "update" - #[serde(skip_serializing_if = "Option::is_none")] - pub event: Option, - - /// Only when this is an array offers are sent to random peers - /// Length of this is number of peers wanted? - /// Max length of this is 10 in reference client code - /// Not sent when announce event is stopped or completed - pub offers: Option>, - /// Seems to only get sent by client when sending offers, and is also same - /// as length of offers vector (or at least never less) - /// Max length of this is 10 in reference client code - /// Could probably be ignored, `offers.len()` should provide needed info - pub numwant: Option, - - /// If empty, send response before sending offers (or possibly "skip sending update back"?) - /// Else, send MiddlemanAnswerToPeer to peer with "to_peer_id" as peer_id. - /// I think using Option is good, it seems like this isn't always set - /// (same as `offers`) - pub answer: Option, - /// Likely undefined if !(answer == true) - pub to_peer_id: Option, - /// Sent if answer is set - pub offer_id: Option, -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct AnnounceResponse { - pub action: AnnounceAction, - pub info_hash: InfoHash, - /// Client checks if this is null, not clear why - pub complete: usize, - pub incomplete: usize, - #[serde(rename = "interval")] - pub announce_interval: usize, // Default 2 min probably -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(untagged)] -pub enum ScrapeRequestInfoHashes { - Single(InfoHash), - Multiple(Vec), -} - -impl ScrapeRequestInfoHashes { - pub fn as_vec(self) -> Vec { - match self { - Self::Single(info_hash) => vec![info_hash], - Self::Multiple(info_hashes) => info_hashes, - } - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct ScrapeRequest { - pub action: ScrapeAction, - // If omitted, scrape for all torrents, apparently - // There is some kind of parsing here too which accepts a single info hash - // and puts it into a vector - #[serde(rename = "info_hash")] - pub info_hashes: Option, -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct ScrapeStatistics { - pub complete: usize, - pub incomplete: usize, - pub downloaded: usize, -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct ScrapeResponse { - pub action: ScrapeAction, - pub files: HashMap, - // Looks like `flags` field is ignored in reference client - // pub flags: HashMap, -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(untagged)] -pub enum InMessage { - AnnounceRequest(AnnounceRequest), - ScrapeRequest(ScrapeRequest), -} - -impl InMessage { - #[inline] - pub fn to_ws_message(&self) -> ::tungstenite::Message { - ::tungstenite::Message::from(::serde_json::to_string(&self).unwrap()) - } - - #[inline] - pub fn from_ws_message(ws_message: tungstenite::Message) -> ::anyhow::Result { - use tungstenite::Message::Text; - - let mut text = if let Text(text) = ws_message { - text - } else { - return Err(anyhow::anyhow!("Message is not text")); - }; - - return ::simd_json::serde::from_str(&mut text).context("deserialize with serde"); - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(untagged)] -pub enum OutMessage { - Offer(MiddlemanOfferToPeer), - Answer(MiddlemanAnswerToPeer), - AnnounceResponse(AnnounceResponse), - ScrapeResponse(ScrapeResponse), -} - -impl OutMessage { - #[inline] - pub fn to_ws_message(&self) -> tungstenite::Message { - ::tungstenite::Message::from(::serde_json::to_string(&self).unwrap()) - } - - #[inline] - pub fn from_ws_message(message: ::tungstenite::Message) -> ::anyhow::Result { - use tungstenite::Message::{Binary, Text}; - - let mut text = match message { - Text(text) => text, - Binary(bytes) => String::from_utf8(bytes)?, - _ => return Err(anyhow::anyhow!("Message is neither text nor bytes")), - }; - - Ok(::simd_json::serde::from_str(&mut text)?) - } -} +pub use common::*; +pub use request::*; +pub use response::*; #[cfg(test)] mod tests { diff --git a/aquatic_ws_protocol/src/request/announce.rs b/aquatic_ws_protocol/src/request/announce.rs new file mode 100644 index 0000000..5634ca3 --- /dev/null +++ b/aquatic_ws_protocol/src/request/announce.rs @@ -0,0 +1,60 @@ +use serde::{Deserialize, Serialize}; + +use crate::common::*; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum AnnounceEvent { + Started, + Stopped, + Completed, + Update, +} + +impl Default for AnnounceEvent { + fn default() -> Self { + Self::Update + } +} + +/// Element of AnnounceRequest.offers +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct AnnounceRequestOffer { + pub offer: JsonValue, + pub offer_id: OfferId, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct AnnounceRequest { + pub action: AnnounceAction, + pub info_hash: InfoHash, + pub peer_id: PeerId, + /// Just called "left" in protocol. Is set to None in some cases, such as + /// when opening a magnet link + #[serde(rename = "left")] + pub bytes_left: Option, + /// Can be empty. Then, default is "update" + #[serde(skip_serializing_if = "Option::is_none")] + pub event: Option, + + /// Only when this is an array offers are sent to random peers + /// Length of this is number of peers wanted? + /// Max length of this is 10 in reference client code + /// Not sent when announce event is stopped or completed + pub offers: Option>, + /// Seems to only get sent by client when sending offers, and is also same + /// as length of offers vector (or at least never less) + /// Max length of this is 10 in reference client code + /// Could probably be ignored, `offers.len()` should provide needed info + pub numwant: Option, + + /// If empty, send response before sending offers (or possibly "skip sending update back"?) + /// Else, send MiddlemanAnswerToPeer to peer with "to_peer_id" as peer_id. + /// I think using Option is good, it seems like this isn't always set + /// (same as `offers`) + pub answer: Option, + /// Likely undefined if !(answer == true) + pub to_peer_id: Option, + /// Sent if answer is set + pub offer_id: Option, +} diff --git a/aquatic_ws_protocol/src/request/mod.rs b/aquatic_ws_protocol/src/request/mod.rs new file mode 100644 index 0000000..8f067c3 --- /dev/null +++ b/aquatic_ws_protocol/src/request/mod.rs @@ -0,0 +1,36 @@ +use anyhow::Context; +use serde::{Deserialize, Serialize}; + +pub mod announce; +pub mod scrape; + +pub use announce::*; +pub use scrape::*; + +/// Message received by tracker +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum InMessage { + AnnounceRequest(AnnounceRequest), + ScrapeRequest(ScrapeRequest), +} + +impl InMessage { + #[inline] + pub fn to_ws_message(&self) -> ::tungstenite::Message { + ::tungstenite::Message::from(::serde_json::to_string(&self).unwrap()) + } + + #[inline] + pub fn from_ws_message(ws_message: tungstenite::Message) -> ::anyhow::Result { + use tungstenite::Message::Text; + + let mut text = if let Text(text) = ws_message { + text + } else { + return Err(anyhow::anyhow!("Message is not text")); + }; + + return ::simd_json::serde::from_str(&mut text).context("deserialize with serde"); + } +} diff --git a/aquatic_ws_protocol/src/request/scrape.rs b/aquatic_ws_protocol/src/request/scrape.rs new file mode 100644 index 0000000..d016c82 --- /dev/null +++ b/aquatic_ws_protocol/src/request/scrape.rs @@ -0,0 +1,29 @@ +use serde::{Deserialize, Serialize}; + +use crate::common::*; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum ScrapeRequestInfoHashes { + Single(InfoHash), + Multiple(Vec), +} + +impl ScrapeRequestInfoHashes { + pub fn as_vec(self) -> Vec { + match self { + Self::Single(info_hash) => vec![info_hash], + Self::Multiple(info_hashes) => info_hashes, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ScrapeRequest { + pub action: ScrapeAction, + // If omitted, scrape for all torrents, apparently + // There is some kind of parsing here too which accepts a single info hash + // and puts it into a vector + #[serde(rename = "info_hash")] + pub info_hashes: Option, +} diff --git a/aquatic_ws_protocol/src/response/announce.rs b/aquatic_ws_protocol/src/response/announce.rs new file mode 100644 index 0000000..25d4b9e --- /dev/null +++ b/aquatic_ws_protocol/src/response/announce.rs @@ -0,0 +1,14 @@ +use serde::{Deserialize, Serialize}; + +use crate::common::*; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct AnnounceResponse { + pub action: AnnounceAction, + pub info_hash: InfoHash, + /// Client checks if this is null, not clear why + pub complete: usize, + pub incomplete: usize, + #[serde(rename = "interval")] + pub announce_interval: usize, // Default 2 min probably +} diff --git a/aquatic_ws_protocol/src/response/answer.rs b/aquatic_ws_protocol/src/response/answer.rs new file mode 100644 index 0000000..3846c7e --- /dev/null +++ b/aquatic_ws_protocol/src/response/answer.rs @@ -0,0 +1,16 @@ +use serde::{Deserialize, Serialize}; + +use crate::common::*; + +/// If announce request has answer = true, send this to peer with +/// peer id == "to_peer_id" field +/// Action field should be 'announce' +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct MiddlemanAnswerToPeer { + pub action: AnnounceAction, + /// Note: if equal to client peer_id, client ignores answer + pub peer_id: PeerId, + pub info_hash: InfoHash, + pub answer: JsonValue, + pub offer_id: OfferId, +} diff --git a/aquatic_ws_protocol/src/response/error.rs b/aquatic_ws_protocol/src/response/error.rs new file mode 100644 index 0000000..c5be603 --- /dev/null +++ b/aquatic_ws_protocol/src/response/error.rs @@ -0,0 +1,24 @@ +use std::borrow::Cow; + +use serde::{Deserialize, Serialize}; + +use crate::common::*; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum ErrorResponseAction { + Announce, + Scrape, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ErrorResponse { + #[serde(rename = "failure reason")] + pub failure_reason: Cow<'static, str>, + /// Action of original request + #[serde(skip_serializing_if = "Option::is_none")] + pub action: Option, + // Should not be renamed + #[serde(skip_serializing_if = "Option::is_none")] + pub info_hash: Option, +} diff --git a/aquatic_ws_protocol/src/response/mod.rs b/aquatic_ws_protocol/src/response/mod.rs new file mode 100644 index 0000000..363eeae --- /dev/null +++ b/aquatic_ws_protocol/src/response/mod.rs @@ -0,0 +1,44 @@ +use serde::{Deserialize, Serialize}; + +pub mod announce; +pub mod answer; +pub mod error; +pub mod offer; +pub mod scrape; + +pub use announce::*; +pub use answer::*; +pub use error::*; +pub use offer::*; +pub use scrape::*; + +/// Message sent by tracker +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum OutMessage { + Offer(MiddlemanOfferToPeer), + Answer(MiddlemanAnswerToPeer), + AnnounceResponse(AnnounceResponse), + ScrapeResponse(ScrapeResponse), + ErrorResponse(ErrorResponse), +} + +impl OutMessage { + #[inline] + pub fn to_ws_message(&self) -> tungstenite::Message { + ::tungstenite::Message::from(::serde_json::to_string(&self).unwrap()) + } + + #[inline] + pub fn from_ws_message(message: ::tungstenite::Message) -> ::anyhow::Result { + use tungstenite::Message::{Binary, Text}; + + let mut text = match message { + Text(text) => text, + Binary(bytes) => String::from_utf8(bytes)?, + _ => return Err(anyhow::anyhow!("Message is neither text nor bytes")), + }; + + Ok(::simd_json::serde::from_str(&mut text)?) + } +} diff --git a/aquatic_ws_protocol/src/response/offer.rs b/aquatic_ws_protocol/src/response/offer.rs new file mode 100644 index 0000000..2c47abc --- /dev/null +++ b/aquatic_ws_protocol/src/response/offer.rs @@ -0,0 +1,19 @@ +use serde::{Deserialize, Serialize}; + +use crate::common::*; + +/// Apparently, these are sent to a number of peers when they are set +/// in an AnnounceRequest +/// action = "announce" +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct MiddlemanOfferToPeer { + pub action: AnnounceAction, + /// Peer id of peer sending offer + /// Note: if equal to client peer_id, client ignores offer + pub peer_id: PeerId, + pub info_hash: InfoHash, + /// Gets copied from AnnounceRequestOffer + pub offer: JsonValue, + /// Gets copied from AnnounceRequestOffer + pub offer_id: OfferId, +} diff --git a/aquatic_ws_protocol/src/response/scrape.rs b/aquatic_ws_protocol/src/response/scrape.rs new file mode 100644 index 0000000..d7ace9f --- /dev/null +++ b/aquatic_ws_protocol/src/response/scrape.rs @@ -0,0 +1,19 @@ +use hashbrown::HashMap; +use serde::{Deserialize, Serialize}; + +use crate::common::*; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ScrapeResponse { + pub action: ScrapeAction, + pub files: HashMap, + // Looks like `flags` field is ignored in reference client + // pub flags: HashMap, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ScrapeStatistics { + pub complete: usize, + pub incomplete: usize, + pub downloaded: usize, +}