diff --git a/.github/actions/test-transfer/action.yml b/.github/actions/test-transfer/action.yml index aa5afef..c04e4e8 100644 --- a/.github/actions/test-transfer/action.yml +++ b/.github/actions/test-transfer/action.yml @@ -1,8 +1,8 @@ name: 'test-transfer' description: 'test aquatic file transfer' outputs: - http_ipv4: - description: 'HTTP IPv4 status' + # http_ipv4: + # description: 'HTTP IPv4 status' http_tls_ipv4: description: 'HTTP IPv4 over TLS status' udp_ipv4: diff --git a/.github/actions/test-transfer/entrypoint.sh b/.github/actions/test-transfer/entrypoint.sh index 1cdc7b0..d6a7eea 100755 --- a/.github/actions/test-transfer/entrypoint.sh +++ b/.github/actions/test-transfer/entrypoint.sh @@ -50,6 +50,7 @@ $SUDO echo "127.0.0.1 example.com" >> /etc/hosts openssl ecparam -genkey -name prime256v1 -out key.pem openssl req -new -sha256 -key key.pem -out csr.csr -subj "/C=GB/ST=Test/L=Test/O=Test/OU=Test/CN=example.com" openssl req -x509 -sha256 -nodes -days 365 -key key.pem -in csr.csr -out cert.crt +openssl pkcs8 -in key.pem -topk8 -nocrypt -out key.pk8 # rustls $SUDO cp cert.crt /usr/local/share/ca-certificates/snakeoil.crt $SUDO update-ca-certificates @@ -60,19 +61,19 @@ openssl pkcs12 -export -passout "pass:p" -out identity.pfx -inkey key.pem -in ce cargo build --bin aquatic -echo "log_level = 'debug' - -[network] -address = '127.0.0.1:3000'" > http.toml -./target/debug/aquatic http -c http.toml > "$HOME/http.log" 2>&1 & +# echo "log_level = 'debug' +# +# [network] +# address = '127.0.0.1:3000'" > http.toml +# ./target/debug/aquatic http -c http.toml > "$HOME/http.log" 2>&1 & echo "log_level = 'debug' [network] address = '127.0.0.1:3001' use_tls = true -tls_pkcs12_path = './identity.pfx' -tls_pkcs12_password = 'p' +tls_certificate_path = './cert.crt' +tls_private_key_path = './key.pk8' " > tls.toml ./target/debug/aquatic http -c tls.toml > "$HOME/tls.log" 2>&1 & @@ -100,12 +101,12 @@ mkdir torrents # Create torrents -echo "http-test-ipv4" > seed/http-test-ipv4 +# echo "http-test-ipv4" > seed/http-test-ipv4 echo "tls-test-ipv4" > seed/tls-test-ipv4 echo "udp-test-ipv4" > seed/udp-test-ipv4 echo "wss-test-ipv4" > seed/wss-test-ipv4 -mktorrent -p -o "torrents/http-ipv4.torrent" -a "http://127.0.0.1:3000/announce" "seed/http-test-ipv4" +# mktorrent -p -o "torrents/http-ipv4.torrent" -a "http://127.0.0.1:3000/announce" "seed/http-test-ipv4" mktorrent -p -o "torrents/tls-ipv4.torrent" -a "https://example.com:3001/announce" "seed/tls-test-ipv4" mktorrent -p -o "torrents/udp-ipv4.torrent" -a "udp://127.0.0.1:3000" "seed/udp-test-ipv4" mktorrent -p -o "torrents/wss-ipv4.torrent" -a "wss://example.com:3002" "seed/wss-test-ipv4" @@ -148,7 +149,7 @@ cd .. # Check for completion -HTTP_IPv4="Failed" +# HTTP_IPv4="Ok" TLS_IPv4="Failed" UDP_IPv4="Failed" WSS_IPv4="Failed" @@ -159,14 +160,14 @@ echo "Watching for finished files.." while [ $i -lt 60 ] do - if test -f "leech/http-test-ipv4"; then - if grep -q "http-test-ipv4" "leech/http-test-ipv4"; then - if [ "$HTTP_IPv4" != "Ok" ]; then - HTTP_IPv4="Ok" - echo "HTTP_IPv4 is Ok" - fi - fi - fi + # if test -f "leech/http-test-ipv4"; then + # if grep -q "http-test-ipv4" "leech/http-test-ipv4"; then + # if [ "$HTTP_IPv4" != "Ok" ]; then + # HTTP_IPv4="Ok" + # echo "HTTP_IPv4 is Ok" + # fi + # fi + # fi if test -f "leech/tls-test-ipv4"; then if grep -q "tls-test-ipv4" "leech/tls-test-ipv4"; then if [ "$TLS_IPv4" != "Ok" ]; then @@ -192,7 +193,8 @@ do fi fi - if [ "$HTTP_IPv4" = "Ok" ] && [ "$TLS_IPv4" = "Ok" ] && [ "$UDP_IPv4" = "Ok" ] && [ "$WSS_IPv4" = "Ok" ]; then + # if [ "$HTTP_IPv4" = "Ok" ] && [ "$TLS_IPv4" = "Ok" ] && [ "$UDP_IPv4" = "Ok" ] && [ "$WSS_IPv4" = "Ok" ]; then + if [ "$TLS_IPv4" = "Ok" ] && [ "$UDP_IPv4" = "Ok" ] && [ "$WSS_IPv4" = "Ok" ]; then break fi @@ -203,14 +205,14 @@ done echo "Waited for $i seconds" -echo "::set-output name=http_ipv4::$HTTP_IPv4" +# echo "::set-output name=http_ipv4::$HTTP_IPv4" echo "::set-output name=http_tls_ipv4::$TLS_IPv4" echo "::set-output name=udp_ipv4::$UDP_IPv4" echo "::set-output name=wss_ipv4::$WSS_IPv4" -echo "" -echo "# --- HTTP log --- #" -cat "http.log" +# echo "" +# echo "# --- HTTP log --- #" +# cat "http.log" sleep 1 @@ -246,11 +248,12 @@ sleep 1 echo "" echo "# --- Test results --- #" -echo "HTTP (IPv4): $HTTP_IPv4" +# echo "HTTP (IPv4): $HTTP_IPv4" echo "HTTP over TLS (IPv4): $TLS_IPv4" echo "UDP (IPv4): $UDP_IPv4" echo "WSS (IPv4): $WSS_IPv4" -if [ "$HTTP_IPv4" != "Ok" ] || [ "$TLS_IPv4" != "Ok" ] || [ "$UDP_IPv4" != "Ok" ] || [ "$WSS_IPv4" != "Ok" ]; then +# if [ "$HTTP_IPv4" != "Ok" ] || [ "$TLS_IPv4" != "Ok" ] || [ "$UDP_IPv4" != "Ok" ] || [ "$WSS_IPv4" != "Ok" ]; then +if [ "$TLS_IPv4" != "Ok" ] || [ "$UDP_IPv4" != "Ok" ] || [ "$WSS_IPv4" != "Ok" ]; then exit 1 fi \ No newline at end of file diff --git a/.github/workflows/test-transfer.yml b/.github/workflows/test-transfer.yml index 2679356..c1ae76b 100644 --- a/.github/workflows/test-transfer.yml +++ b/.github/workflows/test-transfer.yml @@ -1,4 +1,4 @@ -name: "Test HTTP, UDP and WSS file transfer" +name: "Test UDP, TLS and WSS file transfer" on: push: @@ -9,7 +9,7 @@ on: jobs: test-transfer-http: runs-on: ubuntu-latest - name: "Test BitTorrent file transfer over HTTP (with and without TLS), UDP and WSS" + name: "Test BitTorrent file transfer over UDP, TLS and WSS" timeout-minutes: 20 container: image: rust:1-bullseye diff --git a/Cargo.lock b/Cargo.lock index c83d439..826d684 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,6 +79,7 @@ dependencies = [ "hashbrown 0.11.2", "hex", "indexmap", + "privdrop", "rand", "serde", ] @@ -91,25 +92,27 @@ dependencies = [ "aquatic_cli_helpers", "aquatic_common", "aquatic_http_protocol", - "crossbeam-channel", + "cfg-if", + "core_affinity", "either", + "futures-lite", + "glommio", "hashbrown 0.11.2", - "histogram", "indexmap", "itoa", "log", "memchr", "mimalloc", - "mio", - "native-tls", "parking_lot", "privdrop", "quickcheck", "quickcheck_macros", "rand", + "rustls", + "rustls-pemfile", "serde", + "slab", "smartstring", - "socket2 0.4.2", ] [[package]] @@ -119,13 +122,15 @@ dependencies = [ "anyhow", "aquatic_cli_helpers", "aquatic_http_protocol", + "futures-lite", + "glommio", "hashbrown 0.11.2", "mimalloc", - "mio", "quickcheck", "quickcheck_macros", "rand", "rand_distr", + "rustls", "serde", ] @@ -172,7 +177,6 @@ dependencies = [ "mimalloc", "mio", "parking_lot", - "privdrop", "quickcheck", "quickcheck_macros", "rand", @@ -1542,6 +1546,21 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi 0.3.9", +] + [[package]] name = "rlimit" version = "0.6.2" @@ -1566,6 +1585,27 @@ dependencies = [ "semver", ] +[[package]] +name = "rustls" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b5ac6078ca424dc1d3ae2328526a76787fecc7f8011f520e3276730e711fc95" +dependencies = [ + "log", + "ring", + "sct", + "webpki", +] + +[[package]] +name = "rustls-pemfile" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9" +dependencies = [ + "base64", +] + [[package]] name = "ryu" version = "1.0.5" @@ -1603,6 +1643,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "security-framework" version = "2.4.2" @@ -1777,6 +1827,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "static_assertions" version = "1.1.0" @@ -1996,6 +2052,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "url" version = "2.2.2" @@ -2131,6 +2193,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "winapi" version = "0.2.8" diff --git a/README.md b/README.md index 895292e..9bc7e20 100644 --- a/README.md +++ b/README.md @@ -2,70 +2,77 @@ [![CargoBuildAndTest](https://github.com/greatest-ape/aquatic/actions/workflows/cargo-build-and-test.yml/badge.svg)](https://github.com/greatest-ape/aquatic/actions/workflows/cargo-build-and-test.yml) [![Test HTTP, UDP and WSS file transfer](https://github.com/greatest-ape/aquatic/actions/workflows/test-transfer.yml/badge.svg)](https://github.com/greatest-ape/aquatic/actions/workflows/test-transfer.yml) -Blazingly fast, multi-threaded BitTorrent tracker written in Rust. +Blazingly fast, multi-threaded BitTorrent tracker written in Rust, consisting +of sub-implementations for different protocols: -Consists of three sub-implementations for different protocols: - * `aquatic_udp`: BitTorrent over UDP. Implementation achieves 45% higher throughput - than opentracker (see benchmarks below) - * `aquatic_http`: BitTorrent over HTTP/TLS (slightly experimental) - * `aquatic_ws`: WebTorrent (experimental) +[BitTorrent over UDP]: https://libtorrent.org/udp_tracker_protocol.html +[BitTorrent over HTTP]: https://wiki.theory.org/index.php/BitTorrentSpecification#Tracker_HTTP.2FHTTPS_Protocol +[WebTorrent]: https://github.com/webtorrent +[rustls]: https://github.com/rustls/rustls +[native-tls]: https://github.com/sfackler/rust-native-tls +[mio]: https://github.com/tokio-rs/mio +[glommio]: https://github.com/DataDog/glommio -## Copyright and license +| Name | Protocol | OS requirements | +|--------------|------------------------------------------------|-----------------------------------------------------------------| +| aquatic_udp | [BitTorrent over UDP] | Cross-platform with [mio] (default) / Linux 5.8+ with [glommio] | +| aquatic_http | [BitTorrent over HTTP] with TLS ([rustls]) | Linux 5.8+ | +| aquatic_ws | [WebTorrent], plain or with TLS ([native-tls]) | Cross-platform | -Copyright (c) 2020-2021 Joakim FrostegÄrd +## Usage -Distributed under Apache 2.0 license (details in `LICENSE` file.) - -## Installation prerequisites +### Prerequisites - Install Rust with [rustup](https://rustup.rs/) (stable is recommended) - Install cmake with your package manager (e.g., `apt-get install cmake`) -- On GNU/Linux, also install the OpenSSL components necessary for dynamic - linking (e.g., `apt-get install libssl-dev`) -- Clone the git repository and refer to the next section. +- If you want to run aquatic_ws and are on Linux or BSD, install OpenSSL + components necessary for dynamic linking (e.g., `apt-get install libssl-dev`) +- Clone this git repository and enter it -## Compile and run +### Compiling -To compile the master executable for all protocols, run: +Compile the implementations that you are interested in: ```sh -./scripts/build-aquatic.sh +cargo build --release -p aquatic_udp +cargo build --release -p aquatic_udp --features "with-glommio" --no-default-features +cargo build --release -p aquatic_http +cargo build --release -p aquatic_ws ``` -To start the tracker for a protocol with default settings, run: +### Running + +Begin by generating configuration files. They differ between protocols. ```sh -./target/release/aquatic udp -./target/release/aquatic http -./target/release/aquatic ws +./target/release/aquatic_udp -p > "aquatic-udp-config.toml" +./target/release/aquatic_http -p > "aquatic-http-config.toml" +./target/release/aquatic_ws -p > "aquatic-ws-config.toml" ``` -To print default settings to standard output, pass the "-p" flag to the binary: +Make adjustments to the files. The values you will most likely want to adjust +are `socket_workers` (number of threads reading from and writing to sockets) +and `address` under the `network` section (listening address). This goes for +all three protocols. + +`aquatic_http` requires configuring a TLS certificate file and a private key file +to run. More information is available futher down in this document. + +Once done, run the tracker: ```sh -./target/release/aquatic udp -p -./target/release/aquatic http -p -./target/release/aquatic ws -p +./target/release/aquatic_udp -c "aquatic-udp-config.toml" +./target/release/aquatic_http -c "aquatic-http-config.toml" +./target/release/aquatic_ws -c "aquatic-ws-config.toml" ``` -Note that the configuration files differ between protocols. +More documentation of configuration file values might be available under +`src/lib/config.rs` in crates `aquatic_udp`, `aquatic_http`, `aquatic_ws`. -To adjust the settings, save the output of the relevant previous command to a -file and make your changes. Then run `aquatic` with a "-c" argument pointing to -the file, e.g.: +#### General settings -```sh -./target/release/aquatic udp -c "/path/to/aquatic-udp-config.toml" -./target/release/aquatic http -c "/path/to/aquatic-http-config.toml" -./target/release/aquatic ws -c "/path/to/aquatic-ws-config.toml" -``` - -The configuration file values you will most likely want to adjust are -`socket_workers` (number of threads reading from and writing to sockets) and -`address` under the `network` section (listening address). This goes for all -three protocols. - -Access control by info hash is supported for all protocols. Relevant part of configuration: +Access control by info hash is supported for all protocols. The relevant part +of configuration is: ```toml [access_list] @@ -73,9 +80,6 @@ mode = 'off' # Change to 'black' (blacklist) or 'white' (whitelist) path = '' # Path to text file with newline-delimited hex-encoded info hashes ``` -Some more documentation of configuration file values might be available under -`src/lib/config.rs` in crates `aquatic_udp`, `aquatic_http`, `aquatic_ws`. - ## Details on implementations ### aquatic_udp: UDP BitTorrent tracker @@ -121,20 +125,14 @@ There is an alternative implementation that utilizes [io_uring] by running on [glommio]. It only runs on Linux and requires a recent kernel (version 5.8 or later). In some cases, it performs even better than the cross-platform implementation. -To use it, pass the `with-glommio` feature when building, e.g.: - -```sh -cargo build --release -p aquatic_udp --features "with-glommio" --no-default-features -./target/release/aquatic_udp -``` - ### aquatic_http: HTTP BitTorrent tracker -Aims for compatibility with the HTTP BitTorrent protocol, as described -[here](https://wiki.theory.org/index.php/BitTorrentSpecification#Tracker_HTTP.2FHTTPS_Protocol), -including TLS and scrape request support. There are some exceptions: +[HTTP BitTorrent protocol]: https://wiki.theory.org/index.php/BitTorrentSpecification#Tracker_HTTP.2FHTTPS_Protocol - * Doesn't track of the number of torrent downloads (0 is always sent). +Aims for compatibility with the [HTTP BitTorrent protocol], with some exceptions: + + * Only runs over TLS + * Doesn't track of the number of torrent downloads (0 is always sent) * Doesn't allow full scrapes, i.e. of all registered info hashes `aquatic_http` has not been tested as much as `aquatic_udp` but likely works @@ -142,6 +140,28 @@ fine. #### TLS +A TLS certificate file (DER-encoded X.509) and a corresponding private key file +(DER-encoded ASN.1 in either PKCS#8 or PKCS#1 format) are required. Set their +paths in the configuration file, e.g.: + +```toml +[network] +address = '0.0.0.0:3000' +tls_certificate_path = './cert.crt' +tls_private_key_path = './key.pk8' +``` + +### aquatic_ws: WebTorrent tracker + +Aims for compatibility with [WebTorrent](https://github.com/webtorrent) +clients, including `wss` protocol support (WebSockets over TLS), with some +exceptions: + + * Doesn't track of the number of torrent downloads (0 is always sent). + * Doesn't allow full scrapes, i.e. of all registered info hashes + +#### TLS + To run over TLS, a pkcs12 file (`.pkx`) is needed. It can be generated from Let's Encrypt certificates as follows, assuming you are in the directory where they are stored: @@ -154,24 +174,14 @@ Enter a password when prompted. Then move `identity.pfx` somewhere suitable, and enter the path into the tracker configuration field `tls_pkcs12_path`. Set the password in the field `tls_pkcs12_password` and set `use_tls` to true. -### aquatic_ws: WebTorrent tracker - -Aims for compatibility with [WebTorrent](https://github.com/webtorrent) -clients, including `wss` protocol support (WebSockets over TLS), with some -exceptions: - - * Doesn't track of the number of torrent downloads (0 is always sent). - * Doesn't allow full scrapes, i.e. of all registered info hashes - -For information about running over TLS, please refer to the TLS subsection -of the `aquatic_http` section above. - #### Benchmarks [wt-tracker]: https://github.com/Novage/wt-tracker [bittorrent-tracker]: https://github.com/webtorrent/bittorrent-tracker -The following benchmark is not very realistic, as it simulates a small number of clients, each sending a large number of requests. Nonetheless, I think that it gives a useful indication of relative performance. +The following benchmark is not very realistic, as it simulates a small number +of clients, each sending a large number of requests. Nonetheless, I think that +it gives a useful indication of relative performance. Server responses per second, best result in bold: @@ -220,6 +230,12 @@ This design means little waiting for locks on internal state occurs, while network work can be efficiently distributed over multiple threads, making use of SO_REUSEPORT setting. +## Copyright and license + +Copyright (c) 2020-2021 Joakim FrostegÄrd + +Distributed under Apache 2.0 license (details in `LICENSE` file.) + ## Trivia The tracker is called aquatic because it thrives under a torrent of bits ;-) diff --git a/TODO.md b/TODO.md index d7ceb85..1a1d3ce 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,22 @@ # TODO +* aquatic_http glommio: + * optimize? + * get_peer_addr only once (takes 1.2% of runtime) + * queue response: allocating takes 2.8% of runtime + * clean out connections regularly + * timeout inside of task for "it took to long to receive request, send response"? + * handle panicked/cancelled tasks + * consider better error type for request parsing, so that better error + messages can be sent back (e.g., "full scrapes are not supported") + * Scrape: should stats with only zeroes be sent back for non-registered info hashes? + Relevant for mio implementation too. + * Don't return read request immediately. Set it as self.read_request + and continue looping to wait for any new input. Then check after + read_tls is finished. This might prevent issues when using plain HTTP + where only part of request is read, but that part is valid, and reading + is stopped, which might lead to various issues. + * aquatic_udp glommio * Add to file transfer CI * consider adding ConnectedScrapeRequest::Scrape(PendingScrapeRequest) diff --git a/aquatic_common/Cargo.toml b/aquatic_common/Cargo.toml index 02fbe6e..e86e0d0 100644 --- a/aquatic_common/Cargo.toml +++ b/aquatic_common/Cargo.toml @@ -16,5 +16,6 @@ arc-swap = "1" hashbrown = "0.11.2" hex = "0.4" indexmap = "1" +privdrop = "0.5" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } diff --git a/aquatic_common/src/cpu_pinning.rs b/aquatic_common/src/cpu_pinning.rs new file mode 100644 index 0000000..a13fc73 --- /dev/null +++ b/aquatic_common/src/cpu_pinning.rs @@ -0,0 +1,17 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct CpuPinningConfig { + pub active: bool, + pub offset: usize, +} + +impl Default for CpuPinningConfig { + fn default() -> Self { + Self { + active: false, + offset: 0, + } + } +} diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs index 3e90bbc..8866e45 100644 --- a/aquatic_common/src/lib.rs +++ b/aquatic_common/src/lib.rs @@ -5,6 +5,8 @@ use indexmap::IndexMap; use rand::Rng; pub mod access_list; +pub mod cpu_pinning; +pub mod privileges; /// Peer or connection valid until this instant /// diff --git a/aquatic_common/src/privileges.rs b/aquatic_common/src/privileges.rs new file mode 100644 index 0000000..a898969 --- /dev/null +++ b/aquatic_common/src/privileges.rs @@ -0,0 +1,64 @@ +use std::{ + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; + +use privdrop::PrivDrop; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct PrivilegeConfig { + /// Chroot and switch user after binding to sockets + pub drop_privileges: bool, + /// Chroot to this path + pub chroot_path: String, + /// User to switch to after chrooting + pub user: String, +} + +impl Default for PrivilegeConfig { + fn default() -> Self { + Self { + drop_privileges: false, + chroot_path: ".".to_string(), + user: "nobody".to_string(), + } + } +} + +pub fn drop_privileges_after_socket_binding( + config: &PrivilegeConfig, + num_bound_sockets: Arc, + target_num: usize, +) -> anyhow::Result<()> { + if config.drop_privileges { + let mut counter = 0usize; + + loop { + let num_bound = num_bound_sockets.load(Ordering::SeqCst); + + if num_bound == target_num { + PrivDrop::default() + .chroot(config.chroot_path.clone()) + .user(config.user.clone()) + .apply()?; + + break; + } + + ::std::thread::sleep(Duration::from_millis(10)); + + counter += 1; + + if counter == 500 { + panic!("Sockets didn't bind in time for privilege drop."); + } + } + } + + Ok(()) +} diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index f604413..e969d83 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -20,23 +20,25 @@ anyhow = "1" aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" aquatic_http_protocol = "0.1.0" -crossbeam-channel = "0.5" +cfg-if = "1" +core_affinity = "0.5" either = "1" +futures-lite = "1" +glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5" } hashbrown = "0.11.2" -histogram = "0.6" indexmap = "1" itoa = "0.4" log = "0.4" mimalloc = { version = "0.1", default-features = false } memchr = "2" -mio = { version = "0.7", features = ["tcp", "os-poll", "os-util"] } -native-tls = "0.2" parking_lot = "0.11" privdrop = "0.5" rand = { version = "0.8", features = ["small_rng"] } +rustls = "0.20" +rustls-pemfile = "0.2" serde = { version = "1", features = ["derive"] } +slab = "0.4" smartstring = "0.2" -socket2 = { version = "0.4.1", features = ["all"] } [dev-dependencies] quickcheck = "1.0" diff --git a/aquatic_http/src/lib/common.rs b/aquatic_http/src/lib/common.rs index e4a9832..e814c36 100644 --- a/aquatic_http/src/lib/common.rs +++ b/aquatic_http/src/lib/common.rs @@ -1,27 +1,124 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; -use std::sync::Arc; use std::time::Instant; -use aquatic_common::access_list::{AccessList, AccessListArcSwap}; -use crossbeam_channel::{Receiver, Sender}; +use aquatic_common::access_list::AccessList; use either::Either; use hashbrown::HashMap; use indexmap::IndexMap; -use log::error; -use mio::Token; -use parking_lot::Mutex; use smartstring::{LazyCompact, SmartString}; pub use aquatic_common::{convert_ipv4_mapped_ipv6, ValidUntil}; use aquatic_http_protocol::common::*; -use aquatic_http_protocol::request::Request; -use aquatic_http_protocol::response::{Response, ResponsePeer}; +use aquatic_http_protocol::response::ResponsePeer; use crate::config::Config; -pub const LISTENER_TOKEN: Token = Token(0); -pub const CHANNEL_TOKEN: Token = Token(1); +use std::borrow::Borrow; +use std::cell::RefCell; +use std::rc::Rc; + +use futures_lite::AsyncBufReadExt; +use glommio::io::{BufferedFile, StreamReaderBuilder}; +use glommio::prelude::*; + +use aquatic_http_protocol::{ + request::{AnnounceRequest, ScrapeRequest}, + response::{AnnounceResponse, ScrapeResponse}, +}; + +#[derive(Copy, Clone, Debug)] +pub struct ConsumerId(pub usize); + +#[derive(Clone, Copy, Debug)] +pub struct ConnectionId(pub usize); + +#[derive(Debug)] +pub enum ChannelRequest { + Announce { + request: AnnounceRequest, + peer_addr: SocketAddr, + connection_id: ConnectionId, + response_consumer_id: ConsumerId, + }, + Scrape { + request: ScrapeRequest, + peer_addr: SocketAddr, + connection_id: ConnectionId, + response_consumer_id: ConsumerId, + }, +} + +#[derive(Debug)] +pub enum ChannelResponse { + Announce { + response: AnnounceResponse, + peer_addr: SocketAddr, + connection_id: ConnectionId, + }, + Scrape { + response: ScrapeResponse, + peer_addr: SocketAddr, + connection_id: ConnectionId, + }, +} + +impl ChannelResponse { + pub fn get_connection_id(&self) -> ConnectionId { + match self { + Self::Announce { connection_id, .. } => *connection_id, + Self::Scrape { connection_id, .. } => *connection_id, + } + } + pub fn get_peer_addr(&self) -> SocketAddr { + match self { + Self::Announce { peer_addr, .. } => *peer_addr, + Self::Scrape { peer_addr, .. } => *peer_addr, + } + } +} + +pub async fn update_access_list>( + config: C, + access_list: Rc>, +) { + if config.borrow().access_list.mode.is_on() { + match BufferedFile::open(&config.borrow().access_list.path).await { + Ok(file) => { + let mut reader = StreamReaderBuilder::new(file).build(); + let mut new_access_list = AccessList::default(); + + loop { + let mut buf = String::with_capacity(42); + + match reader.read_line(&mut buf).await { + Ok(_) => { + if let Err(err) = new_access_list.insert_from_line(&buf) { + ::log::error!( + "Couln't parse access list line '{}': {:?}", + buf, + err + ); + } + } + Err(err) => { + ::log::error!("Couln't read access list line {:?}", err); + + break; + } + } + + yield_if_needed().await; + } + + *access_list.borrow_mut() = new_access_list; + } + Err(err) => { + ::log::error!("Couldn't open access list file: {:?}", err) + } + }; + } +} pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {} @@ -32,15 +129,16 @@ impl Ip for Ipv6Addr {} pub struct ConnectionMeta { /// Index of socket worker responsible for this connection. Required for /// sending back response through correct channel to correct worker. - pub worker_index: usize, + pub response_consumer_id: ConsumerId, pub peer_addr: SocketAddr, - pub poll_token: Token, + /// Connection id local to socket worker + pub connection_id: ConnectionId, } #[derive(Clone, Copy, Debug)] pub struct PeerConnectionMeta { - pub worker_index: usize, - pub poll_token: Token, + pub response_consumer_id: ConsumerId, + pub connection_id: ConnectionId, pub peer_ip_address: I, } @@ -118,14 +216,14 @@ pub struct TorrentMaps { } impl TorrentMaps { - pub fn clean(&mut self, config: &Config, access_list: &Arc) { + pub fn clean(&mut self, config: &Config, access_list: &AccessList) { Self::clean_torrent_map(config, access_list, &mut self.ipv4); Self::clean_torrent_map(config, access_list, &mut self.ipv6); } fn clean_torrent_map( config: &Config, - access_list: &Arc, + access_list: &AccessList, torrent_map: &mut TorrentMap, ) { let now = Instant::now(); @@ -163,42 +261,34 @@ impl TorrentMaps { } } -#[derive(Clone)] -pub struct State { - pub access_list: Arc, - pub torrent_maps: Arc>, -} +pub fn num_digits_in_usize(mut number: usize) -> usize { + let mut num_digits = 1usize; -impl Default for State { - fn default() -> Self { - Self { - access_list: Arc::new(Default::default()), - torrent_maps: Arc::new(Mutex::new(TorrentMaps::default())), - } - } -} + while number >= 10 { + num_digits += 1; -pub type RequestChannelSender = Sender<(ConnectionMeta, Request)>; -pub type RequestChannelReceiver = Receiver<(ConnectionMeta, Request)>; -pub type ResponseChannelReceiver = Receiver<(ConnectionMeta, Response)>; - -#[derive(Clone)] -pub struct ResponseChannelSender { - senders: Vec>, -} - -impl ResponseChannelSender { - pub fn new(senders: Vec>) -> Self { - Self { senders } + number /= 10; } - #[inline] - pub fn send(&self, meta: ConnectionMeta, message: Response) { - if let Err(err) = self.senders[meta.worker_index].send((meta, message)) { - error!("ResponseChannelSender: couldn't send message: {:?}", err); - } - } + num_digits } -pub type SocketWorkerStatus = Option>; -pub type SocketWorkerStatuses = Arc>>; +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_num_digits_in_usize() { + let f = num_digits_in_usize; + + assert_eq!(f(0), 1); + assert_eq!(f(1), 1); + assert_eq!(f(9), 1); + assert_eq!(f(10), 2); + assert_eq!(f(11), 2); + assert_eq!(f(99), 2); + assert_eq!(f(100), 3); + assert_eq!(f(101), 3); + assert_eq!(f(1000), 4); + } +} diff --git a/aquatic_http/src/lib/config.rs b/aquatic_http/src/lib/config.rs index fe75932..e2e3131 100644 --- a/aquatic_http/src/lib/config.rs +++ b/aquatic_http/src/lib/config.rs @@ -1,6 +1,7 @@ -use std::net::SocketAddr; +use std::{net::SocketAddr, path::PathBuf}; -use aquatic_common::access_list::AccessListConfig; +use aquatic_common::cpu_pinning::CpuPinningConfig; +use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig}; use serde::{Deserialize, Serialize}; use aquatic_cli_helpers::LogLevel; @@ -18,11 +19,10 @@ pub struct Config { pub log_level: LogLevel, pub network: NetworkConfig, pub protocol: ProtocolConfig, - pub handlers: HandlerConfig, pub cleaning: CleaningConfig, - pub statistics: StatisticsConfig, pub privileges: PrivilegeConfig, pub access_list: AccessListConfig, + pub cpu_pinning: CpuPinningConfig, } impl aquatic_cli_helpers::Config for Config { @@ -31,25 +31,15 @@ impl aquatic_cli_helpers::Config for Config { } } -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(default)] -pub struct TlsConfig { - pub use_tls: bool, - pub tls_pkcs12_path: String, - pub tls_pkcs12_password: String, -} - #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct NetworkConfig { /// Bind to this address pub address: SocketAddr, + pub tls_certificate_path: PathBuf, + pub tls_private_key_path: PathBuf, pub ipv6_only: bool, - #[serde(flatten)] - pub tls: TlsConfig, pub keep_alive: bool, - pub poll_event_capacity: usize, - pub poll_timeout_microseconds: u64, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -63,15 +53,6 @@ pub struct ProtocolConfig { pub peer_announce_interval: usize, } -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(default)] -pub struct HandlerConfig { - /// Maximum number of requests to receive from channel before locking - /// mutex and starting work - pub max_requests_per_iter: usize, - pub channel_recv_timeout_microseconds: u64, -} - #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct CleaningConfig { @@ -83,24 +64,6 @@ pub struct CleaningConfig { pub max_connection_age: u64, } -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(default)] -pub struct StatisticsConfig { - /// Print statistics this often (seconds). Don't print when set to zero. - pub interval: u64, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(default)] -pub struct PrivilegeConfig { - /// Chroot and switch user after binding to sockets - pub drop_privileges: bool, - /// Chroot to this path - pub chroot_path: String, - /// User to switch to after chrooting - pub user: String, -} - impl Default for Config { fn default() -> Self { Self { @@ -109,11 +72,10 @@ impl Default for Config { log_level: LogLevel::default(), network: NetworkConfig::default(), protocol: ProtocolConfig::default(), - handlers: HandlerConfig::default(), cleaning: CleaningConfig::default(), - statistics: StatisticsConfig::default(), privileges: PrivilegeConfig::default(), access_list: AccessListConfig::default(), + cpu_pinning: Default::default(), } } } @@ -122,11 +84,10 @@ impl Default for NetworkConfig { fn default() -> Self { Self { address: SocketAddr::from(([0, 0, 0, 0], 3000)), + tls_certificate_path: "".into(), + tls_private_key_path: "".into(), ipv6_only: false, - tls: TlsConfig::default(), keep_alive: true, - poll_event_capacity: 4096, - poll_timeout_microseconds: 200_000, } } } @@ -141,15 +102,6 @@ impl Default for ProtocolConfig { } } -impl Default for HandlerConfig { - fn default() -> Self { - Self { - max_requests_per_iter: 10_000, - channel_recv_timeout_microseconds: 200, - } - } -} - impl Default for CleaningConfig { fn default() -> Self { Self { @@ -159,29 +111,3 @@ impl Default for CleaningConfig { } } } - -impl Default for StatisticsConfig { - fn default() -> Self { - Self { interval: 0 } - } -} - -impl Default for PrivilegeConfig { - fn default() -> Self { - Self { - drop_privileges: false, - chroot_path: ".".to_string(), - user: "nobody".to_string(), - } - } -} - -impl Default for TlsConfig { - fn default() -> Self { - Self { - use_tls: false, - tls_pkcs12_path: "".into(), - tls_pkcs12_password: "".into(), - } - } -} diff --git a/aquatic_http/src/lib/handler.rs b/aquatic_http/src/lib/handler.rs deleted file mode 100644 index cd823d8..0000000 --- a/aquatic_http/src/lib/handler.rs +++ /dev/null @@ -1,311 +0,0 @@ -use std::collections::BTreeMap; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; -use std::sync::Arc; -use std::time::Duration; -use std::vec::Drain; - -use either::Either; -use mio::Waker; -use parking_lot::MutexGuard; -use rand::{rngs::SmallRng, Rng, SeedableRng}; - -use aquatic_common::extract_response_peers; -use aquatic_http_protocol::request::*; -use aquatic_http_protocol::response::*; - -use crate::common::*; -use crate::config::Config; - -pub fn run_request_worker( - config: Config, - state: State, - request_channel_receiver: RequestChannelReceiver, - response_channel_sender: ResponseChannelSender, - wakers: Vec>, -) { - let mut wake_socket_workers: Vec = (0..config.socket_workers).map(|_| false).collect(); - - let mut announce_requests = Vec::new(); - let mut scrape_requests = Vec::new(); - - let mut rng = SmallRng::from_entropy(); - - let timeout = Duration::from_micros(config.handlers.channel_recv_timeout_microseconds); - - loop { - let mut opt_torrent_map_guard: Option> = None; - - // If torrent state mutex is locked, just keep collecting requests - // and process them later. This can happen with either multiple - // request workers or while cleaning is underway. - for i in 0..config.handlers.max_requests_per_iter { - let opt_in_message = if i == 0 { - request_channel_receiver.recv().ok() - } else { - request_channel_receiver.recv_timeout(timeout).ok() - }; - - match opt_in_message { - Some((meta, Request::Announce(r))) => { - announce_requests.push((meta, r)); - } - Some((meta, Request::Scrape(r))) => { - scrape_requests.push((meta, r)); - } - None => { - if let Some(torrent_guard) = state.torrent_maps.try_lock() { - opt_torrent_map_guard = Some(torrent_guard); - - break; - } - } - } - } - - let mut torrent_map_guard = - opt_torrent_map_guard.unwrap_or_else(|| state.torrent_maps.lock()); - - handle_announce_requests( - &config, - &mut rng, - &mut torrent_map_guard, - &response_channel_sender, - &mut wake_socket_workers, - announce_requests.drain(..), - ); - - handle_scrape_requests( - &config, - &mut torrent_map_guard, - &response_channel_sender, - &mut wake_socket_workers, - scrape_requests.drain(..), - ); - - for (worker_index, wake) in wake_socket_workers.iter_mut().enumerate() { - if *wake { - if let Err(err) = wakers[worker_index].wake() { - ::log::error!("request handler couldn't wake poll: {:?}", err); - } - - *wake = false; - } - } - } -} - -pub fn handle_announce_requests( - config: &Config, - rng: &mut impl Rng, - torrent_maps: &mut TorrentMaps, - response_channel_sender: &ResponseChannelSender, - wake_socket_workers: &mut Vec, - requests: Drain<(ConnectionMeta, AnnounceRequest)>, -) { - let valid_until = ValidUntil::new(config.cleaning.max_peer_age); - - for (meta, request) in requests { - let peer_ip = convert_ipv4_mapped_ipv6(meta.peer_addr.ip()); - - ::log::debug!("peer ip: {:?}", peer_ip); - - let response = match peer_ip { - IpAddr::V4(peer_ip_address) => { - let torrent_data: &mut TorrentData = - torrent_maps.ipv4.entry(request.info_hash).or_default(); - - let peer_connection_meta = PeerConnectionMeta { - worker_index: meta.worker_index, - poll_token: meta.poll_token, - peer_ip_address, - }; - - let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers( - config, - rng, - peer_connection_meta, - torrent_data, - request, - valid_until, - ); - - let response = AnnounceResponse { - complete: seeders, - incomplete: leechers, - announce_interval: config.protocol.peer_announce_interval, - peers: ResponsePeerListV4(response_peers), - peers6: ResponsePeerListV6(vec![]), - }; - - Response::Announce(response) - } - IpAddr::V6(peer_ip_address) => { - let torrent_data: &mut TorrentData = - torrent_maps.ipv6.entry(request.info_hash).or_default(); - - let peer_connection_meta = PeerConnectionMeta { - worker_index: meta.worker_index, - poll_token: meta.poll_token, - peer_ip_address, - }; - - let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers( - config, - rng, - peer_connection_meta, - torrent_data, - request, - valid_until, - ); - - let response = AnnounceResponse { - complete: seeders, - incomplete: leechers, - announce_interval: config.protocol.peer_announce_interval, - peers: ResponsePeerListV4(vec![]), - peers6: ResponsePeerListV6(response_peers), - }; - - Response::Announce(response) - } - }; - - response_channel_sender.send(meta, response); - wake_socket_workers[meta.worker_index] = true; - } -} - -/// Insert/update peer. Return num_seeders, num_leechers and response peers -fn upsert_peer_and_get_response_peers( - config: &Config, - rng: &mut impl Rng, - request_sender_meta: PeerConnectionMeta, - torrent_data: &mut TorrentData, - request: AnnounceRequest, - valid_until: ValidUntil, -) -> (usize, usize, Vec>) { - // Insert/update/remove peer who sent this request - - let peer_status = - PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left)); - - let peer = Peer { - connection_meta: request_sender_meta, - port: request.port, - status: peer_status, - valid_until, - }; - - ::log::debug!("peer: {:?}", peer); - - let ip_or_key = request - .key - .map(Either::Right) - .unwrap_or_else(|| Either::Left(request_sender_meta.peer_ip_address)); - - let peer_map_key = PeerMapKey { - peer_id: request.peer_id, - ip_or_key, - }; - - ::log::debug!("peer map key: {:?}", peer_map_key); - - let opt_removed_peer = match peer_status { - PeerStatus::Leeching => { - torrent_data.num_leechers += 1; - - torrent_data.peers.insert(peer_map_key.clone(), peer) - } - PeerStatus::Seeding => { - torrent_data.num_seeders += 1; - - torrent_data.peers.insert(peer_map_key.clone(), peer) - } - PeerStatus::Stopped => torrent_data.peers.remove(&peer_map_key), - }; - - ::log::debug!("opt_removed_peer: {:?}", opt_removed_peer); - - match opt_removed_peer.map(|peer| peer.status) { - Some(PeerStatus::Leeching) => { - torrent_data.num_leechers -= 1; - } - Some(PeerStatus::Seeding) => { - torrent_data.num_seeders -= 1; - } - _ => {} - } - - ::log::debug!("peer request numwant: {:?}", request.numwant); - - let max_num_peers_to_take = match request.numwant { - Some(0) | None => config.protocol.max_peers, - Some(numwant) => numwant.min(config.protocol.max_peers), - }; - - let response_peers: Vec> = extract_response_peers( - rng, - &torrent_data.peers, - max_num_peers_to_take, - peer_map_key, - Peer::to_response_peer, - ); - - ( - torrent_data.num_seeders, - torrent_data.num_leechers, - response_peers, - ) -} - -pub fn handle_scrape_requests( - config: &Config, - torrent_maps: &mut TorrentMaps, - response_channel_sender: &ResponseChannelSender, - wake_socket_workers: &mut Vec, - requests: Drain<(ConnectionMeta, ScrapeRequest)>, -) { - for (meta, request) in requests { - let num_to_take = request - .info_hashes - .len() - .min(config.protocol.max_scrape_torrents); - - let mut response = ScrapeResponse { - files: BTreeMap::new(), - }; - - let peer_ip = convert_ipv4_mapped_ipv6(meta.peer_addr.ip()); - - // If request.info_hashes is empty, don't return scrape for all - // torrents, even though reference server does it. It is too expensive. - if peer_ip.is_ipv4() { - for info_hash in request.info_hashes.into_iter().take(num_to_take) { - if let Some(torrent_data) = torrent_maps.ipv4.get(&info_hash) { - let stats = ScrapeStatistics { - complete: torrent_data.num_seeders, - downloaded: 0, // No implementation planned - incomplete: torrent_data.num_leechers, - }; - - response.files.insert(info_hash, stats); - } - } - } else { - for info_hash in request.info_hashes.into_iter().take(num_to_take) { - if let Some(torrent_data) = torrent_maps.ipv6.get(&info_hash) { - let stats = ScrapeStatistics { - complete: torrent_data.num_seeders, - downloaded: 0, // No implementation planned - incomplete: torrent_data.num_leechers, - }; - - response.files.insert(info_hash, stats); - } - } - }; - - response_channel_sender.send(meta, Response::Scrape(response)); - wake_socket_workers[meta.worker_index] = true; - } -} diff --git a/aquatic_http/src/lib/handlers.rs b/aquatic_http/src/lib/handlers.rs new file mode 100644 index 0000000..d30fe64 --- /dev/null +++ b/aquatic_http/src/lib/handlers.rs @@ -0,0 +1,360 @@ +use std::collections::BTreeMap; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; + +use either::Either; +use rand::Rng; + +use aquatic_common::extract_response_peers; +use aquatic_http_protocol::request::*; +use aquatic_http_protocol::response::*; + +use std::cell::RefCell; +use std::rc::Rc; +use std::time::Duration; + +use aquatic_common::access_list::AccessList; +use futures_lite::{Stream, StreamExt}; +use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; +use glommio::timer::TimerActionRepeat; +use glommio::{enclose, prelude::*}; +use rand::prelude::SmallRng; +use rand::SeedableRng; + +use crate::common::*; +use crate::config::Config; + +pub async fn run_request_worker( + config: Config, + request_mesh_builder: MeshBuilder, + response_mesh_builder: MeshBuilder, + access_list: AccessList, +) { + let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); + let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap(); + + let response_senders = Rc::new(response_senders); + + let torrents = Rc::new(RefCell::new(TorrentMaps::default())); + let access_list = Rc::new(RefCell::new(access_list)); + + // Periodically clean torrents and update access list + TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || { + enclose!((config, torrents, access_list) move || async move { + update_access_list(&config, access_list.clone()).await; + + torrents.borrow_mut().clean(&config, &*access_list.borrow()); + + Some(Duration::from_secs(config.cleaning.interval)) + })() + })); + + let mut handles = Vec::new(); + + for (_, receiver) in request_receivers.streams() { + let handle = spawn_local(handle_request_stream( + config.clone(), + torrents.clone(), + response_senders.clone(), + receiver, + )) + .detach(); + + handles.push(handle); + } + + for handle in handles { + handle.await; + } +} + +async fn handle_request_stream( + config: Config, + torrents: Rc>, + response_senders: Rc>, + mut stream: S, +) where + S: Stream + ::std::marker::Unpin, +{ + let mut rng = SmallRng::from_entropy(); + + let max_peer_age = config.cleaning.max_peer_age; + let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_peer_age))); + + TimerActionRepeat::repeat(enclose!((peer_valid_until) move || { + enclose!((peer_valid_until) move || async move { + *peer_valid_until.borrow_mut() = ValidUntil::new(max_peer_age); + + Some(Duration::from_secs(1)) + })() + })); + + while let Some(channel_request) = stream.next().await { + let (response, consumer_id) = match channel_request { + ChannelRequest::Announce { + request, + peer_addr, + response_consumer_id, + connection_id, + } => { + let meta = ConnectionMeta { + response_consumer_id, + connection_id, + peer_addr, + }; + + let response = handle_announce_request( + &config, + &mut rng, + &mut torrents.borrow_mut(), + peer_valid_until.borrow().to_owned(), + meta, + request, + ); + + let response = ChannelResponse::Announce { + response, + peer_addr, + connection_id, + }; + + (response, response_consumer_id) + } + ChannelRequest::Scrape { + request, + peer_addr, + response_consumer_id, + connection_id, + } => { + let meta = ConnectionMeta { + response_consumer_id, + connection_id, + peer_addr, + }; + + let response = + handle_scrape_request(&config, &mut torrents.borrow_mut(), meta, request); + + let response = ChannelResponse::Scrape { + response, + peer_addr, + connection_id, + }; + + (response, response_consumer_id) + } + }; + + ::log::debug!("preparing to send response to channel: {:?}", response); + + if let Err(err) = response_senders.try_send_to(consumer_id.0, response) { + ::log::warn!("response_sender.try_send: {:?}", err); + } + + yield_if_needed().await; + } +} + +pub fn handle_announce_request( + config: &Config, + rng: &mut impl Rng, + torrent_maps: &mut TorrentMaps, + valid_until: ValidUntil, + meta: ConnectionMeta, + request: AnnounceRequest, +) -> AnnounceResponse { + let peer_ip = convert_ipv4_mapped_ipv6(meta.peer_addr.ip()); + + ::log::debug!("peer ip: {:?}", peer_ip); + + match peer_ip { + IpAddr::V4(peer_ip_address) => { + let torrent_data: &mut TorrentData = + torrent_maps.ipv4.entry(request.info_hash).or_default(); + + let peer_connection_meta = PeerConnectionMeta { + response_consumer_id: meta.response_consumer_id, + connection_id: meta.connection_id, + peer_ip_address, + }; + + let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers( + config, + rng, + peer_connection_meta, + torrent_data, + request, + valid_until, + ); + + let response = AnnounceResponse { + complete: seeders, + incomplete: leechers, + announce_interval: config.protocol.peer_announce_interval, + peers: ResponsePeerListV4(response_peers), + peers6: ResponsePeerListV6(vec![]), + }; + + response + } + IpAddr::V6(peer_ip_address) => { + let torrent_data: &mut TorrentData = + torrent_maps.ipv6.entry(request.info_hash).or_default(); + + let peer_connection_meta = PeerConnectionMeta { + response_consumer_id: meta.response_consumer_id, + connection_id: meta.connection_id, + peer_ip_address, + }; + + let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers( + config, + rng, + peer_connection_meta, + torrent_data, + request, + valid_until, + ); + + let response = AnnounceResponse { + complete: seeders, + incomplete: leechers, + announce_interval: config.protocol.peer_announce_interval, + peers: ResponsePeerListV4(vec![]), + peers6: ResponsePeerListV6(response_peers), + }; + + response + } + } +} + +/// Insert/update peer. Return num_seeders, num_leechers and response peers +pub fn upsert_peer_and_get_response_peers( + config: &Config, + rng: &mut impl Rng, + request_sender_meta: PeerConnectionMeta, + torrent_data: &mut TorrentData, + request: AnnounceRequest, + valid_until: ValidUntil, +) -> (usize, usize, Vec>) { + // Insert/update/remove peer who sent this request + + let peer_status = + PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left)); + + let peer = Peer { + connection_meta: request_sender_meta, + port: request.port, + status: peer_status, + valid_until, + }; + + ::log::debug!("peer: {:?}", peer); + + let ip_or_key = request + .key + .map(Either::Right) + .unwrap_or_else(|| Either::Left(request_sender_meta.peer_ip_address)); + + let peer_map_key = PeerMapKey { + peer_id: request.peer_id, + ip_or_key, + }; + + ::log::debug!("peer map key: {:?}", peer_map_key); + + let opt_removed_peer = match peer_status { + PeerStatus::Leeching => { + torrent_data.num_leechers += 1; + + torrent_data.peers.insert(peer_map_key.clone(), peer) + } + PeerStatus::Seeding => { + torrent_data.num_seeders += 1; + + torrent_data.peers.insert(peer_map_key.clone(), peer) + } + PeerStatus::Stopped => torrent_data.peers.remove(&peer_map_key), + }; + + ::log::debug!("opt_removed_peer: {:?}", opt_removed_peer); + + match opt_removed_peer.map(|peer| peer.status) { + Some(PeerStatus::Leeching) => { + torrent_data.num_leechers -= 1; + } + Some(PeerStatus::Seeding) => { + torrent_data.num_seeders -= 1; + } + _ => {} + } + + ::log::debug!("peer request numwant: {:?}", request.numwant); + + let max_num_peers_to_take = match request.numwant { + Some(0) | None => config.protocol.max_peers, + Some(numwant) => numwant.min(config.protocol.max_peers), + }; + + let response_peers: Vec> = extract_response_peers( + rng, + &torrent_data.peers, + max_num_peers_to_take, + peer_map_key, + Peer::to_response_peer, + ); + + ( + torrent_data.num_seeders, + torrent_data.num_leechers, + response_peers, + ) +} + +pub fn handle_scrape_request( + config: &Config, + torrent_maps: &mut TorrentMaps, + meta: ConnectionMeta, + request: ScrapeRequest, +) -> ScrapeResponse { + let num_to_take = request + .info_hashes + .len() + .min(config.protocol.max_scrape_torrents); + + let mut response = ScrapeResponse { + files: BTreeMap::new(), + }; + + let peer_ip = convert_ipv4_mapped_ipv6(meta.peer_addr.ip()); + + // If request.info_hashes is empty, don't return scrape for all + // torrents, even though reference server does it. It is too expensive. + if peer_ip.is_ipv4() { + for info_hash in request.info_hashes.into_iter().take(num_to_take) { + if let Some(torrent_data) = torrent_maps.ipv4.get(&info_hash) { + let stats = ScrapeStatistics { + complete: torrent_data.num_seeders, + downloaded: 0, // No implementation planned + incomplete: torrent_data.num_leechers, + }; + + response.files.insert(info_hash, stats); + } + } + } else { + for info_hash in request.info_hashes.into_iter().take(num_to_take) { + if let Some(torrent_data) = torrent_maps.ipv6.get(&info_hash) { + let stats = ScrapeStatistics { + complete: torrent_data.num_seeders, + downloaded: 0, // No implementation planned + incomplete: torrent_data.num_leechers, + }; + + response.files.insert(info_hash, stats); + } + } + }; + + response +} diff --git a/aquatic_http/src/lib/lib.rs b/aquatic_http/src/lib/lib.rs index 3f96ef2..5a1d900 100644 --- a/aquatic_http/src/lib/lib.rs +++ b/aquatic_http/src/lib/lib.rs @@ -1,153 +1,143 @@ -use std::sync::Arc; -use std::thread::Builder; -use std::time::Duration; +use std::{ + fs::File, + io::BufReader, + sync::{atomic::AtomicUsize, Arc}, +}; -use anyhow::Context; -use mio::{Poll, Waker}; -use parking_lot::Mutex; -use privdrop::PrivDrop; +use aquatic_common::{access_list::AccessList, privileges::drop_privileges_after_socket_binding}; +use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; -pub mod common; +use crate::config::Config; + +mod common; pub mod config; -pub mod handler; -pub mod network; -pub mod tasks; - -use common::*; -use config::Config; -use network::utils::create_tls_acceptor; +mod handlers; +mod network; pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker"; +const SHARED_CHANNEL_SIZE: usize = 1024; + pub fn run(config: Config) -> anyhow::Result<()> { - let state = State::default(); - - tasks::update_access_list(&config, &state); - - start_workers(config.clone(), state.clone())?; - - loop { - ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); - - tasks::update_access_list(&config, &state); - - state - .torrent_maps - .lock() - .clean(&config, &state.access_list.load_full()); + if config.cpu_pinning.active { + core_affinity::set_for_current(core_affinity::CoreId { + id: config.cpu_pinning.offset, + }); } -} -pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> { - let opt_tls_acceptor = create_tls_acceptor(&config.network.tls)?; - - let (request_channel_sender, request_channel_receiver) = ::crossbeam_channel::unbounded(); - - let mut out_message_senders = Vec::new(); - let mut wakers = Vec::new(); - - let socket_worker_statuses: SocketWorkerStatuses = { - let mut statuses = Vec::new(); - - for _ in 0..config.socket_workers { - statuses.push(None); - } - - Arc::new(Mutex::new(statuses)) + let access_list = if config.access_list.mode.is_on() { + AccessList::create_from_path(&config.access_list.path).expect("Load access list") + } else { + AccessList::default() }; - for i in 0..config.socket_workers { + let num_peers = config.socket_workers + config.request_workers; + + let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); + let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); + + let num_bound_sockets = Arc::new(AtomicUsize::new(0)); + + let tls_config = Arc::new(create_tls_config(&config).unwrap()); + + let mut executors = Vec::new(); + + for i in 0..(config.socket_workers) { let config = config.clone(); - let state = state.clone(); - let socket_worker_statuses = socket_worker_statuses.clone(); - let request_channel_sender = request_channel_sender.clone(); - let opt_tls_acceptor = opt_tls_acceptor.clone(); - let poll = Poll::new().expect("create poll"); - let waker = Arc::new(Waker::new(poll.registry(), CHANNEL_TOKEN).expect("create waker")); + let tls_config = tls_config.clone(); + let request_mesh_builder = request_mesh_builder.clone(); + let response_mesh_builder = response_mesh_builder.clone(); + let num_bound_sockets = num_bound_sockets.clone(); + let access_list = access_list.clone(); - let (response_channel_sender, response_channel_receiver) = ::crossbeam_channel::unbounded(); + let mut builder = LocalExecutorBuilder::default(); - out_message_senders.push(response_channel_sender); - wakers.push(waker); - - Builder::new() - .name(format!("socket-{:02}", i + 1)) - .spawn(move || { - network::run_socket_worker( - config, - state, - i, - socket_worker_statuses, - request_channel_sender, - response_channel_receiver, - opt_tls_acceptor, - poll, - ); - })?; - } - - // Wait for socket worker statuses. On error from any, quit program. - // On success from all, drop privileges if corresponding setting is set - // and continue program. - loop { - ::std::thread::sleep(::std::time::Duration::from_millis(10)); - - if let Some(statuses) = socket_worker_statuses.try_lock() { - for opt_status in statuses.iter() { - if let Some(Err(err)) = opt_status { - return Err(::anyhow::anyhow!(err.to_owned())); - } - } - - if statuses.iter().all(Option::is_some) { - if config.privileges.drop_privileges { - PrivDrop::default() - .chroot(config.privileges.chroot_path.clone()) - .user(config.privileges.user.clone()) - .apply() - .context("Couldn't drop root privileges")?; - } - - break; - } + if config.cpu_pinning.active { + builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + i); } + + let executor = builder.spawn(|| async move { + network::run_socket_worker( + config, + tls_config, + request_mesh_builder, + response_mesh_builder, + num_bound_sockets, + access_list, + ) + .await + }); + + executors.push(executor); } - let response_channel_sender = ResponseChannelSender::new(out_message_senders); - - for i in 0..config.request_workers { + for i in 0..(config.request_workers) { let config = config.clone(); - let state = state.clone(); - let request_channel_receiver = request_channel_receiver.clone(); - let response_channel_sender = response_channel_sender.clone(); - let wakers = wakers.clone(); + let request_mesh_builder = request_mesh_builder.clone(); + let response_mesh_builder = response_mesh_builder.clone(); + let access_list = access_list.clone(); - Builder::new() - .name(format!("request-{:02}", i + 1)) - .spawn(move || { - handler::run_request_worker( - config, - state, - request_channel_receiver, - response_channel_sender, - wakers, - ); - })?; + let mut builder = LocalExecutorBuilder::default(); + + if config.cpu_pinning.active { + builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i); + } + + let executor = builder.spawn(|| async move { + handlers::run_request_worker( + config, + request_mesh_builder, + response_mesh_builder, + access_list, + ) + .await + }); + + executors.push(executor); } - if config.statistics.interval != 0 { - let state = state.clone(); - let config = config.clone(); + drop_privileges_after_socket_binding( + &config.privileges, + num_bound_sockets, + config.socket_workers, + ) + .unwrap(); - Builder::new() - .name("statistics".to_string()) - .spawn(move || loop { - ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); - - tasks::print_statistics(&state); - }) - .expect("spawn statistics thread"); + for executor in executors { + executor + .expect("failed to spawn local executor") + .join() + .unwrap(); } Ok(()) } + +fn create_tls_config(config: &Config) -> anyhow::Result { + let certs = { + let f = File::open(&config.network.tls_certificate_path)?; + let mut f = BufReader::new(f); + + rustls_pemfile::certs(&mut f)? + .into_iter() + .map(|bytes| rustls::Certificate(bytes)) + .collect() + }; + + let private_key = { + let f = File::open(&config.network.tls_private_key_path)?; + let mut f = BufReader::new(f); + + rustls_pemfile::pkcs8_private_keys(&mut f)? + .first() + .map(|bytes| rustls::PrivateKey(bytes.clone())) + .ok_or(anyhow::anyhow!("No private keys in file"))? + }; + + let tls_config = rustls::ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth() + .with_single_cert(certs, private_key)?; + + Ok(tls_config) +} diff --git a/aquatic_http/src/lib/network.rs b/aquatic_http/src/lib/network.rs new file mode 100644 index 0000000..4efbab7 --- /dev/null +++ b/aquatic_http/src/lib/network.rs @@ -0,0 +1,496 @@ +use std::cell::RefCell; +use std::collections::BTreeMap; +use std::io::{Cursor, ErrorKind, Read, Write}; +use std::net::SocketAddr; +use std::rc::Rc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use aquatic_common::access_list::AccessList; +use aquatic_http_protocol::common::InfoHash; +use aquatic_http_protocol::request::{Request, RequestParseError, ScrapeRequest}; +use aquatic_http_protocol::response::{ + FailureResponse, Response, ScrapeResponse, ScrapeStatistics, +}; +use either::Either; +use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; +use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; +use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; +use glommio::channels::shared_channel::ConnectedReceiver; +use glommio::net::{TcpListener, TcpStream}; +use glommio::task::JoinHandle; +use glommio::timer::TimerActionRepeat; +use glommio::{enclose, prelude::*}; +use rustls::ServerConnection; +use slab::Slab; + +use crate::common::num_digits_in_usize; +use crate::config::Config; + +use super::common::*; + +const INTERMEDIATE_BUFFER_SIZE: usize = 1024; +const MAX_REQUEST_SIZE: usize = 2048; + +struct PendingScrapeResponse { + pending_worker_responses: usize, + stats: BTreeMap, +} + +struct ConnectionReference { + response_sender: LocalSender, + handle: JoinHandle<()>, +} + +struct Connection { + config: Rc, + access_list: Rc>, + request_senders: Rc>, + response_receiver: LocalReceiver, + response_consumer_id: ConsumerId, + tls: ServerConnection, + stream: TcpStream, + connection_id: ConnectionId, + request_buffer: [u8; MAX_REQUEST_SIZE], + request_buffer_position: usize, +} + +pub async fn run_socket_worker( + config: Config, + tls_config: Arc, + request_mesh_builder: MeshBuilder, + response_mesh_builder: MeshBuilder, + num_bound_sockets: Arc, + access_list: AccessList, +) { + let config = Rc::new(config); + let access_list = Rc::new(RefCell::new(access_list)); + + let listener = TcpListener::bind(config.network.address).expect("bind socket"); + num_bound_sockets.fetch_add(1, Ordering::SeqCst); + + let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); + let request_senders = Rc::new(request_senders); + + let (_, mut response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap(); + let response_consumer_id = ConsumerId(response_receivers.consumer_id().unwrap()); + + let connection_slab = Rc::new(RefCell::new(Slab::new())); + let connections_to_remove = Rc::new(RefCell::new(Vec::new())); + + // Periodically update access list + TimerActionRepeat::repeat(enclose!((config, access_list) move || { + enclose!((config, access_list) move || async move { + update_access_list(config.clone(), access_list.clone()).await; + + Some(Duration::from_secs(config.cleaning.interval)) + })() + })); + + // Periodically remove closed connections + TimerActionRepeat::repeat( + enclose!((config, connection_slab, connections_to_remove) move || { + enclose!((config, connection_slab, connections_to_remove) move || async move { + let connections_to_remove = connections_to_remove.replace(Vec::new()); + + for connection_id in connections_to_remove { + if let Some(_) = connection_slab.borrow_mut().try_remove(connection_id) { + ::log::debug!("removed connection with id {}", connection_id); + } else { + ::log::error!( + "couldn't remove connection with id {}, it is not in connection slab", + connection_id + ); + } + } + + Some(Duration::from_secs(config.cleaning.interval)) + })() + }), + ); + + for (_, response_receiver) in response_receivers.streams() { + spawn_local(receive_responses( + response_receiver, + connection_slab.clone(), + )) + .detach(); + } + + let mut incoming = listener.incoming(); + + while let Some(stream) = incoming.next().await { + match stream { + Ok(stream) => { + let (response_sender, response_receiver) = new_bounded(config.request_workers); + + let mut slab = connection_slab.borrow_mut(); + let entry = slab.vacant_entry(); + let key = entry.key(); + + let mut conn = Connection { + config: config.clone(), + access_list: access_list.clone(), + request_senders: request_senders.clone(), + response_receiver, + response_consumer_id, + tls: ServerConnection::new(tls_config.clone()).unwrap(), + stream, + connection_id: ConnectionId(entry.key()), + request_buffer: [0u8; MAX_REQUEST_SIZE], + request_buffer_position: 0, + }; + + let connections_to_remove = connections_to_remove.clone(); + + let handle = spawn_local(async move { + if let Err(err) = conn.handle_stream().await { + ::log::info!("conn.handle_stream() error: {:?}", err); + } + + connections_to_remove.borrow_mut().push(key); + }) + .detach(); + + let connection_reference = ConnectionReference { + response_sender, + handle, + }; + + entry.insert(connection_reference); + } + Err(err) => { + ::log::error!("accept connection: {:?}", err); + } + } + } +} + +async fn receive_responses( + mut response_receiver: ConnectedReceiver, + connection_references: Rc>>, +) { + while let Some(channel_response) = response_receiver.next().await { + if let Some(reference) = connection_references + .borrow() + .get(channel_response.get_connection_id().0) + { + if let Err(err) = reference.response_sender.try_send(channel_response) { + ::log::error!("Couldn't send response to local receiver: {:?}", err); + } + } + } +} + +impl Connection { + async fn handle_stream(&mut self) -> anyhow::Result<()> { + let mut close_after_writing = false; + + loop { + match self.read_tls().await? { + Some(Either::Left(request)) => { + let response = match self.handle_request(request).await? { + Some(Either::Left(response)) => response, + Some(Either::Right(pending_scrape_response)) => { + self.wait_for_response(Some(pending_scrape_response)) + .await? + } + None => self.wait_for_response(None).await?, + }; + + self.queue_response(&response)?; + + if !self.config.network.keep_alive { + close_after_writing = true; + } + } + Some(Either::Right(response)) => { + self.queue_response(&Response::Failure(response))?; + + close_after_writing = true; + } + None => { + // Still handshaking + } + } + + self.write_tls().await?; + + if close_after_writing { + let _ = self.stream.shutdown(std::net::Shutdown::Both).await; + + break; + } + } + + Ok(()) + } + + async fn read_tls(&mut self) -> anyhow::Result>> { + loop { + ::log::debug!("read_tls"); + + let mut buf = [0u8; INTERMEDIATE_BUFFER_SIZE]; + + let bytes_read = self.stream.read(&mut buf).await?; + + if bytes_read == 0 { + return Err(anyhow::anyhow!("Peer has closed connection")); + } + + let _ = self.tls.read_tls(&mut &buf[..bytes_read]).unwrap(); + + let io_state = self.tls.process_new_packets()?; + + let mut added_plaintext = false; + + if io_state.plaintext_bytes_to_read() != 0 { + loop { + match self.tls.reader().read(&mut buf) { + Ok(0) => { + break; + } + Ok(amt) => { + let end = self.request_buffer_position + amt; + + if end > self.request_buffer.len() { + return Err(anyhow::anyhow!("request too large")); + } else { + let request_buffer_slice = + &mut self.request_buffer[self.request_buffer_position..end]; + + request_buffer_slice.copy_from_slice(&buf[..amt]); + + self.request_buffer_position = end; + + added_plaintext = true; + } + } + Err(err) if err.kind() == ErrorKind::WouldBlock => { + break; + } + Err(err) => { + // Should never happen + ::log::error!("tls.reader().read error: {:?}", err); + + break; + } + } + } + } + + if added_plaintext { + match Request::from_bytes(&self.request_buffer[..self.request_buffer_position]) { + Ok(request) => { + ::log::debug!("received request: {:?}", request); + + self.request_buffer_position = 0; + + return Ok(Some(Either::Left(request))); + } + Err(RequestParseError::NeedMoreData) => { + ::log::debug!( + "need more request data. current data: {:?}", + std::str::from_utf8(&self.request_buffer) + ); + } + Err(RequestParseError::Invalid(err)) => { + ::log::debug!("invalid request: {:?}", err); + + let response = FailureResponse { + failure_reason: "Invalid request".into(), + }; + + return Ok(Some(Either::Right(response))); + } + } + } + + if self.tls.wants_write() { + break; + } + } + + Ok(None) + } + + async fn write_tls(&mut self) -> anyhow::Result<()> { + if !self.tls.wants_write() { + return Ok(()); + } + + ::log::debug!("write_tls (wants write)"); + + let mut buf = Vec::new(); + let mut buf = Cursor::new(&mut buf); + + while self.tls.wants_write() { + self.tls.write_tls(&mut buf).unwrap(); + } + + self.stream.write_all(&buf.into_inner()).await?; + self.stream.flush().await?; + + Ok(()) + } + + /// Take a request and: + /// - Return error response if request is not allowed + /// - If it is an announce requests, pass it on to request workers and return None + /// - If it is a scrape requests, split it up and pass on parts to + /// relevant request workers, and return PendingScrapeResponse struct. + async fn handle_request( + &self, + request: Request, + ) -> anyhow::Result>> { + let peer_addr = self.get_peer_addr()?; + + match request { + Request::Announce(request) => { + let info_hash = request.info_hash; + + if self + .access_list + .borrow() + .allows(self.config.access_list.mode, &info_hash.0) + { + let request = ChannelRequest::Announce { + request, + connection_id: self.connection_id, + response_consumer_id: self.response_consumer_id, + peer_addr, + }; + + let consumer_index = calculate_request_consumer_index(&self.config, info_hash); + + // Only fails when receiver is closed + self.request_senders + .send_to(consumer_index, request) + .await + .unwrap(); + + Ok(None) + } else { + let response = Response::Failure(FailureResponse { + failure_reason: "Info hash not allowed".into(), + }); + + Ok(Some(Either::Left(response))) + } + } + Request::Scrape(ScrapeRequest { info_hashes }) => { + let mut info_hashes_by_worker: BTreeMap> = BTreeMap::new(); + + for info_hash in info_hashes.into_iter() { + let info_hashes = info_hashes_by_worker + .entry(calculate_request_consumer_index(&self.config, info_hash)) + .or_default(); + + info_hashes.push(info_hash); + } + + let pending_worker_responses = info_hashes_by_worker.len(); + + for (consumer_index, info_hashes) in info_hashes_by_worker { + let request = ChannelRequest::Scrape { + request: ScrapeRequest { info_hashes }, + peer_addr, + response_consumer_id: self.response_consumer_id, + connection_id: self.connection_id, + }; + + // Only fails when receiver is closed + self.request_senders + .send_to(consumer_index, request) + .await + .unwrap(); + } + + let pending_scrape_response = PendingScrapeResponse { + pending_worker_responses, + stats: Default::default(), + }; + + Ok(Some(Either::Right(pending_scrape_response))) + } + } + } + + /// Wait for announce response or partial scrape responses to arrive, + /// return full response + async fn wait_for_response( + &self, + mut opt_pending_scrape_response: Option, + ) -> anyhow::Result { + loop { + if let Some(channel_response) = self.response_receiver.recv().await { + if channel_response.get_peer_addr() != self.get_peer_addr()? { + return Err(anyhow::anyhow!("peer addressess didn't match")); + } + + match channel_response { + ChannelResponse::Announce { response, .. } => { + break Ok(Response::Announce(response)); + } + ChannelResponse::Scrape { response, .. } => { + if let Some(mut pending) = opt_pending_scrape_response.take() { + pending.stats.extend(response.files); + pending.pending_worker_responses -= 1; + + if pending.pending_worker_responses == 0 { + let response = Response::Scrape(ScrapeResponse { + files: pending.stats, + }); + + break Ok(response); + } else { + opt_pending_scrape_response = Some(pending); + } + } else { + return Err(anyhow::anyhow!( + "received channel scrape response without pending scrape response" + )); + } + } + }; + } else { + // TODO: this is a serious error condition and should maybe be handled differently + return Err(anyhow::anyhow!( + "response receiver can't receive - sender is closed" + )); + } + } + } + + fn queue_response(&mut self, response: &Response) -> anyhow::Result<()> { + let mut body = Vec::new(); + + response.write(&mut body).unwrap(); + + let content_len = body.len() + 2; // 2 is for newlines at end + let content_len_num_digits = num_digits_in_usize(content_len); + + let mut response_bytes = Vec::with_capacity(39 + content_len_num_digits + body.len()); + + response_bytes.extend_from_slice(b"HTTP/1.1 200 OK\r\nContent-Length: "); + ::itoa::write(&mut response_bytes, content_len)?; + response_bytes.extend_from_slice(b"\r\n\r\n"); + response_bytes.append(&mut body); + response_bytes.extend_from_slice(b"\r\n"); + + self.tls.writer().write(&response_bytes[..])?; + + Ok(()) + } + + fn get_peer_addr(&self) -> anyhow::Result { + self.stream + .peer_addr() + .map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err)) + } +} + +fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize { + (info_hash.0[0] as usize) % config.request_workers +} diff --git a/aquatic_http/src/lib/network/connection.rs b/aquatic_http/src/lib/network/connection.rs deleted file mode 100644 index 4324dfc..0000000 --- a/aquatic_http/src/lib/network/connection.rs +++ /dev/null @@ -1,291 +0,0 @@ -use std::io::ErrorKind; -use std::io::{Read, Write}; -use std::net::SocketAddr; -use std::sync::Arc; - -use hashbrown::HashMap; -use mio::net::TcpStream; -use mio::{Poll, Token}; -use native_tls::{MidHandshakeTlsStream, TlsAcceptor}; - -use aquatic_http_protocol::request::{Request, RequestParseError}; - -use crate::common::*; - -use super::stream::Stream; - -#[derive(Debug)] -pub enum RequestReadError { - NeedMoreData, - StreamEnded, - Parse(anyhow::Error), - Io(::std::io::Error), -} - -pub struct EstablishedConnection { - stream: Stream, - pub peer_addr: SocketAddr, - buf: Vec, - bytes_read: usize, -} - -impl EstablishedConnection { - #[inline] - fn new(stream: Stream) -> Self { - let peer_addr = stream.get_peer_addr(); - - Self { - stream, - peer_addr, - buf: Vec::new(), - bytes_read: 0, - } - } - - pub fn read_request(&mut self) -> Result { - if (self.buf.len() - self.bytes_read < 512) & (self.buf.len() <= 3072) { - self.buf.extend_from_slice(&[0; 1024]); - } - - match self.stream.read(&mut self.buf[self.bytes_read..]) { - Ok(0) => { - self.clear_buffer(); - - return Err(RequestReadError::StreamEnded); - } - Ok(bytes_read) => { - self.bytes_read += bytes_read; - - ::log::debug!("read_request read {} bytes", bytes_read); - } - Err(err) if err.kind() == ErrorKind::WouldBlock => { - return Err(RequestReadError::NeedMoreData); - } - Err(err) => { - self.clear_buffer(); - - return Err(RequestReadError::Io(err)); - } - } - - match Request::from_bytes(&self.buf[..self.bytes_read]) { - Ok(request) => { - self.clear_buffer(); - - Ok(request) - } - Err(RequestParseError::NeedMoreData) => Err(RequestReadError::NeedMoreData), - Err(RequestParseError::Invalid(err)) => { - self.clear_buffer(); - - Err(RequestReadError::Parse(err)) - } - } - } - - pub fn send_response(&mut self, body: &[u8]) -> ::std::io::Result<()> { - let content_len = body.len() + 2; // 2 is for newlines at end - let content_len_num_digits = Self::num_digits_in_usize(content_len); - - let mut response = Vec::with_capacity(39 + content_len_num_digits + body.len()); - - response.extend_from_slice(b"HTTP/1.1 200 OK\r\nContent-Length: "); - ::itoa::write(&mut response, content_len)?; - response.extend_from_slice(b"\r\n\r\n"); - response.extend_from_slice(body); - response.extend_from_slice(b"\r\n"); - - let bytes_written = self.stream.write(&response)?; - - if bytes_written != response.len() { - ::log::error!( - "send_response: only {} out of {} bytes written", - bytes_written, - response.len() - ); - } - - self.stream.flush()?; - - Ok(()) - } - - fn num_digits_in_usize(mut number: usize) -> usize { - let mut num_digits = 1usize; - - while number >= 10 { - num_digits += 1; - - number /= 10; - } - - num_digits - } - - #[inline] - pub fn clear_buffer(&mut self) { - self.bytes_read = 0; - self.buf = Vec::new(); - } -} - -pub enum TlsHandshakeMachineError { - WouldBlock(TlsHandshakeMachine), - Failure(native_tls::Error), -} - -enum TlsHandshakeMachineInner { - TcpStream(TcpStream), - TlsMidHandshake(MidHandshakeTlsStream), -} - -pub struct TlsHandshakeMachine { - tls_acceptor: Arc, - inner: TlsHandshakeMachineInner, -} - -impl<'a> TlsHandshakeMachine { - #[inline] - fn new(tls_acceptor: Arc, tcp_stream: TcpStream) -> Self { - Self { - tls_acceptor, - inner: TlsHandshakeMachineInner::TcpStream(tcp_stream), - } - } - - /// Attempt to establish a TLS connection. On a WouldBlock error, return - /// the machine wrapped in an error for later attempts. - pub fn establish_tls(self) -> Result { - let handshake_result = match self.inner { - TlsHandshakeMachineInner::TcpStream(stream) => self.tls_acceptor.accept(stream), - TlsHandshakeMachineInner::TlsMidHandshake(handshake) => handshake.handshake(), - }; - - match handshake_result { - Ok(stream) => { - let established = EstablishedConnection::new(Stream::TlsStream(stream)); - - ::log::debug!("established tls connection"); - - Ok(established) - } - Err(native_tls::HandshakeError::WouldBlock(handshake)) => { - let inner = TlsHandshakeMachineInner::TlsMidHandshake(handshake); - - let machine = Self { - tls_acceptor: self.tls_acceptor, - inner, - }; - - Err(TlsHandshakeMachineError::WouldBlock(machine)) - } - Err(native_tls::HandshakeError::Failure(err)) => { - Err(TlsHandshakeMachineError::Failure(err)) - } - } - } -} - -enum ConnectionInner { - Established(EstablishedConnection), - InProgress(TlsHandshakeMachine), -} - -pub struct Connection { - pub valid_until: ValidUntil, - inner: ConnectionInner, -} - -impl Connection { - #[inline] - pub fn new( - opt_tls_acceptor: &Option>, - valid_until: ValidUntil, - tcp_stream: TcpStream, - ) -> Self { - // Setup handshake machine if TLS is requested - let inner = if let Some(tls_acceptor) = opt_tls_acceptor { - ConnectionInner::InProgress(TlsHandshakeMachine::new(tls_acceptor.clone(), tcp_stream)) - } else { - ::log::debug!("established tcp connection"); - - ConnectionInner::Established(EstablishedConnection::new(Stream::TcpStream(tcp_stream))) - }; - - Self { valid_until, inner } - } - - #[inline] - pub fn from_established(valid_until: ValidUntil, established: EstablishedConnection) -> Self { - Self { - valid_until, - inner: ConnectionInner::Established(established), - } - } - - #[inline] - pub fn from_in_progress(valid_until: ValidUntil, machine: TlsHandshakeMachine) -> Self { - Self { - valid_until, - inner: ConnectionInner::InProgress(machine), - } - } - - #[inline] - pub fn get_established(&mut self) -> Option<&mut EstablishedConnection> { - if let ConnectionInner::Established(ref mut established) = self.inner { - Some(established) - } else { - None - } - } - - /// Takes ownership since TlsStream needs ownership of TcpStream - #[inline] - pub fn get_in_progress(self) -> Option { - if let ConnectionInner::InProgress(machine) = self.inner { - Some(machine) - } else { - None - } - } - - pub fn deregister(&mut self, poll: &mut Poll) -> ::std::io::Result<()> { - match &mut self.inner { - ConnectionInner::Established(established) => match &mut established.stream { - Stream::TcpStream(ref mut stream) => poll.registry().deregister(stream), - Stream::TlsStream(ref mut stream) => poll.registry().deregister(stream.get_mut()), - }, - ConnectionInner::InProgress(TlsHandshakeMachine { inner, .. }) => match inner { - TlsHandshakeMachineInner::TcpStream(ref mut stream) => { - poll.registry().deregister(stream) - } - TlsHandshakeMachineInner::TlsMidHandshake(ref mut mid_handshake) => { - poll.registry().deregister(mid_handshake.get_mut()) - } - }, - } - } -} - -pub type ConnectionMap = HashMap; - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_num_digits_in_usize() { - let f = EstablishedConnection::num_digits_in_usize; - - assert_eq!(f(0), 1); - assert_eq!(f(1), 1); - assert_eq!(f(9), 1); - assert_eq!(f(10), 2); - assert_eq!(f(11), 2); - assert_eq!(f(99), 2); - assert_eq!(f(100), 3); - assert_eq!(f(101), 3); - assert_eq!(f(1000), 4); - } -} diff --git a/aquatic_http/src/lib/network/mod.rs b/aquatic_http/src/lib/network/mod.rs deleted file mode 100644 index dd7d889..0000000 --- a/aquatic_http/src/lib/network/mod.rs +++ /dev/null @@ -1,387 +0,0 @@ -use std::io::{Cursor, ErrorKind}; -use std::sync::Arc; -use std::time::{Duration, Instant}; -use std::vec::Drain; - -use aquatic_common::access_list::AccessListQuery; -use aquatic_http_protocol::request::Request; -use hashbrown::HashMap; -use log::{debug, error, info}; -use mio::net::TcpListener; -use mio::{Events, Interest, Poll, Token}; -use native_tls::TlsAcceptor; - -use aquatic_http_protocol::response::*; - -use crate::common::*; -use crate::config::Config; - -pub mod connection; -pub mod stream; -pub mod utils; - -use connection::*; -use utils::*; - -const CONNECTION_CLEAN_INTERVAL: usize = 2 ^ 22; - -pub fn run_socket_worker( - config: Config, - state: State, - socket_worker_index: usize, - socket_worker_statuses: SocketWorkerStatuses, - request_channel_sender: RequestChannelSender, - response_channel_receiver: ResponseChannelReceiver, - opt_tls_acceptor: Option, - poll: Poll, -) { - match create_listener(config.network.address, config.network.ipv6_only) { - Ok(listener) => { - socket_worker_statuses.lock()[socket_worker_index] = Some(Ok(())); - - run_poll_loop( - config, - &state, - socket_worker_index, - request_channel_sender, - response_channel_receiver, - listener, - opt_tls_acceptor, - poll, - ); - } - Err(err) => { - socket_worker_statuses.lock()[socket_worker_index] = - Some(Err(format!("Couldn't open socket: {:#}", err))); - } - } -} - -pub fn run_poll_loop( - config: Config, - state: &State, - socket_worker_index: usize, - request_channel_sender: RequestChannelSender, - response_channel_receiver: ResponseChannelReceiver, - listener: ::std::net::TcpListener, - opt_tls_acceptor: Option, - mut poll: Poll, -) { - let poll_timeout = Duration::from_micros(config.network.poll_timeout_microseconds); - - let mut listener = TcpListener::from_std(listener); - let mut events = Events::with_capacity(config.network.poll_event_capacity); - - poll.registry() - .register(&mut listener, Token(0), Interest::READABLE) - .unwrap(); - - let mut connections: ConnectionMap = HashMap::new(); - let opt_tls_acceptor = opt_tls_acceptor.map(Arc::new); - - let mut poll_token_counter = Token(0usize); - let mut iter_counter = 0usize; - - let mut response_buffer = [0u8; 4096]; - let mut response_buffer = Cursor::new(&mut response_buffer[..]); - let mut local_responses = Vec::new(); - - loop { - poll.poll(&mut events, Some(poll_timeout)) - .expect("failed polling"); - - for event in events.iter() { - let token = event.token(); - - if token == LISTENER_TOKEN { - accept_new_streams( - &config, - &mut listener, - &mut poll, - &mut connections, - &mut poll_token_counter, - &opt_tls_acceptor, - ); - } else if token != CHANNEL_TOKEN { - handle_connection_read_event( - &config, - &state, - socket_worker_index, - &mut poll, - &request_channel_sender, - &mut local_responses, - &mut connections, - token, - ); - } - - // Send responses for each event. Channel token is not interesting - // by itself, but is just for making sure responses are sent even - // if no new connects / requests come in. - send_responses( - &config, - &mut poll, - &mut response_buffer, - local_responses.drain(..), - &response_channel_receiver, - &mut connections, - ); - } - - // Remove inactive connections, but not every iteration - if iter_counter % CONNECTION_CLEAN_INTERVAL == 0 { - remove_inactive_connections(&mut poll, &mut connections); - } - - iter_counter = iter_counter.wrapping_add(1); - } -} - -fn accept_new_streams( - config: &Config, - listener: &mut TcpListener, - poll: &mut Poll, - connections: &mut ConnectionMap, - poll_token_counter: &mut Token, - opt_tls_acceptor: &Option>, -) { - let valid_until = ValidUntil::new(config.cleaning.max_connection_age); - - loop { - match listener.accept() { - Ok((mut stream, _)) => { - poll_token_counter.0 = poll_token_counter.0.wrapping_add(1); - - // Skip listener and channel tokens - if poll_token_counter.0 < 2 { - poll_token_counter.0 = 2; - } - - let token = *poll_token_counter; - - // Remove connection if it exists (which is unlikely) - remove_connection(poll, connections, poll_token_counter); - - poll.registry() - .register(&mut stream, token, Interest::READABLE) - .unwrap(); - - let connection = Connection::new(opt_tls_acceptor, valid_until, stream); - - connections.insert(token, connection); - } - Err(err) => { - if err.kind() == ErrorKind::WouldBlock { - break; - } - - info!("error while accepting streams: {}", err); - } - } - } -} - -/// On the stream given by poll_token, get TLS up and running if requested, -/// then read requests and pass on through channel. -pub fn handle_connection_read_event( - config: &Config, - state: &State, - socket_worker_index: usize, - poll: &mut Poll, - request_channel_sender: &RequestChannelSender, - local_responses: &mut Vec<(ConnectionMeta, Response)>, - connections: &mut ConnectionMap, - poll_token: Token, -) { - let valid_until = ValidUntil::new(config.cleaning.max_connection_age); - let access_list_mode = config.access_list.mode; - - loop { - // Get connection, updating valid_until - let connection = if let Some(c) = connections.get_mut(&poll_token) { - c - } else { - // If there is no connection, there is no stream, so there - // shouldn't be any (relevant) poll events. In other words, it's - // safe to return here - return; - }; - - connection.valid_until = valid_until; - - if let Some(established) = connection.get_established() { - match established.read_request() { - Ok(Request::Announce(ref r)) - if !state.access_list.allows(access_list_mode, &r.info_hash.0) => - { - let meta = ConnectionMeta { - worker_index: socket_worker_index, - poll_token, - peer_addr: established.peer_addr, - }; - let response = FailureResponse::new("Info hash not allowed"); - - debug!("read disallowed request, sending back error response"); - - local_responses.push((meta, Response::Failure(response))); - - break; - } - Ok(request) => { - let meta = ConnectionMeta { - worker_index: socket_worker_index, - poll_token, - peer_addr: established.peer_addr, - }; - - debug!("read allowed request, sending on to channel"); - - if let Err(err) = request_channel_sender.send((meta, request)) { - error!("RequestChannelSender: couldn't send message: {:?}", err); - } - - break; - } - Err(RequestReadError::NeedMoreData) => { - info!("need more data"); - - // Stop reading data (defer to later events) - break; - } - Err(RequestReadError::Parse(err)) => { - info!("error reading request (invalid): {:#?}", err); - - let meta = ConnectionMeta { - worker_index: socket_worker_index, - poll_token, - peer_addr: established.peer_addr, - }; - - let response = FailureResponse::new("Invalid request"); - - local_responses.push((meta, Response::Failure(response))); - - break; - } - Err(RequestReadError::StreamEnded) => { - ::log::debug!("stream ended"); - - remove_connection(poll, connections, &poll_token); - - break; - } - Err(RequestReadError::Io(err)) => { - ::log::info!("error reading request (io): {}", err); - - remove_connection(poll, connections, &poll_token); - - break; - } - } - } else if let Some(handshake_machine) = connections - .remove(&poll_token) - .and_then(Connection::get_in_progress) - { - match handshake_machine.establish_tls() { - Ok(established) => { - let connection = Connection::from_established(valid_until, established); - - connections.insert(poll_token, connection); - } - Err(TlsHandshakeMachineError::WouldBlock(machine)) => { - let connection = Connection::from_in_progress(valid_until, machine); - - connections.insert(poll_token, connection); - - // Break and wait for more data - break; - } - Err(TlsHandshakeMachineError::Failure(err)) => { - info!("tls handshake error: {}", err); - - // TLS negotiation failed - break; - } - } - } - } -} - -/// Read responses from channel, send to peers -pub fn send_responses( - config: &Config, - poll: &mut Poll, - buffer: &mut Cursor<&mut [u8]>, - local_responses: Drain<(ConnectionMeta, Response)>, - channel_responses: &ResponseChannelReceiver, - connections: &mut ConnectionMap, -) { - let channel_responses_len = channel_responses.len(); - let channel_responses_drain = channel_responses.try_iter().take(channel_responses_len); - - for (meta, response) in local_responses.chain(channel_responses_drain) { - if let Some(established) = connections - .get_mut(&meta.poll_token) - .and_then(Connection::get_established) - { - if established.peer_addr != meta.peer_addr { - info!("socket worker error: peer socket addrs didn't match"); - - continue; - } - - buffer.set_position(0); - - let bytes_written = response.write(buffer).unwrap(); - - match established.send_response(&buffer.get_mut()[..bytes_written]) { - Ok(()) => { - ::log::debug!( - "sent response: {:?} with response string {}", - response, - String::from_utf8_lossy(&buffer.get_ref()[..bytes_written]) - ); - - if !config.network.keep_alive { - remove_connection(poll, connections, &meta.poll_token); - } - } - Err(err) if err.kind() == ErrorKind::WouldBlock => { - debug!("send response: would block"); - } - Err(err) => { - info!("error sending response: {}", err); - - remove_connection(poll, connections, &meta.poll_token); - } - } - } - } -} - -// Close and remove inactive connections -pub fn remove_inactive_connections(poll: &mut Poll, connections: &mut ConnectionMap) { - let now = Instant::now(); - - connections.retain(|_, connection| { - let keep = connection.valid_until.0 >= now; - - if !keep { - if let Err(err) = connection.deregister(poll) { - ::log::error!("deregister connection error: {}", err); - } - } - - keep - }); - - connections.shrink_to_fit(); -} - -fn remove_connection(poll: &mut Poll, connections: &mut ConnectionMap, connection_token: &Token) { - if let Some(mut connection) = connections.remove(connection_token) { - if let Err(err) = connection.deregister(poll) { - ::log::error!("deregister connection error: {}", err); - } - } -} diff --git a/aquatic_http/src/lib/network/stream.rs b/aquatic_http/src/lib/network/stream.rs deleted file mode 100644 index 0104f5a..0000000 --- a/aquatic_http/src/lib/network/stream.rs +++ /dev/null @@ -1,69 +0,0 @@ -use std::io::{Read, Write}; -use std::net::SocketAddr; - -use mio::net::TcpStream; -use native_tls::TlsStream; - -pub enum Stream { - TcpStream(TcpStream), - TlsStream(TlsStream), -} - -impl Stream { - #[inline] - pub fn get_peer_addr(&self) -> SocketAddr { - match self { - Self::TcpStream(stream) => stream.peer_addr().unwrap(), - Self::TlsStream(stream) => stream.get_ref().peer_addr().unwrap(), - } - } -} - -impl Read for Stream { - #[inline] - fn read(&mut self, buf: &mut [u8]) -> Result { - match self { - Self::TcpStream(stream) => stream.read(buf), - Self::TlsStream(stream) => stream.read(buf), - } - } - - /// Not used but provided for completeness - #[inline] - fn read_vectored( - &mut self, - bufs: &mut [::std::io::IoSliceMut<'_>], - ) -> ::std::io::Result { - match self { - Self::TcpStream(stream) => stream.read_vectored(bufs), - Self::TlsStream(stream) => stream.read_vectored(bufs), - } - } -} - -impl Write for Stream { - #[inline] - fn write(&mut self, buf: &[u8]) -> ::std::io::Result { - match self { - Self::TcpStream(stream) => stream.write(buf), - Self::TlsStream(stream) => stream.write(buf), - } - } - - /// Not used but provided for completeness - #[inline] - fn write_vectored(&mut self, bufs: &[::std::io::IoSlice<'_>]) -> ::std::io::Result { - match self { - Self::TcpStream(stream) => stream.write_vectored(bufs), - Self::TlsStream(stream) => stream.write_vectored(bufs), - } - } - - #[inline] - fn flush(&mut self) -> ::std::io::Result<()> { - match self { - Self::TcpStream(stream) => stream.flush(), - Self::TlsStream(stream) => stream.flush(), - } - } -} diff --git a/aquatic_http/src/lib/network/utils.rs b/aquatic_http/src/lib/network/utils.rs deleted file mode 100644 index 9349f98..0000000 --- a/aquatic_http/src/lib/network/utils.rs +++ /dev/null @@ -1,63 +0,0 @@ -use std::fs::File; -use std::io::Read; -use std::net::SocketAddr; - -use anyhow::Context; -use native_tls::{Identity, TlsAcceptor}; -use socket2::{Domain, Protocol, Socket, Type}; - -use crate::config::TlsConfig; - -pub fn create_tls_acceptor(config: &TlsConfig) -> anyhow::Result> { - if config.use_tls { - let mut identity_bytes = Vec::new(); - let mut file = - File::open(&config.tls_pkcs12_path).context("Couldn't open pkcs12 identity file")?; - - file.read_to_end(&mut identity_bytes) - .context("Couldn't read pkcs12 identity file")?; - - let identity = Identity::from_pkcs12(&identity_bytes[..], &config.tls_pkcs12_password) - .context("Couldn't parse pkcs12 identity file")?; - - let acceptor = TlsAcceptor::new(identity) - .context("Couldn't create TlsAcceptor from pkcs12 identity")?; - - Ok(Some(acceptor)) - } else { - Ok(None) - } -} - -pub fn create_listener( - address: SocketAddr, - ipv6_only: bool, -) -> ::anyhow::Result<::std::net::TcpListener> { - let builder = if address.is_ipv4() { - Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP)) - } else { - Socket::new(Domain::IPV6, Type::STREAM, Some(Protocol::TCP)) - } - .context("Couldn't create socket2::Socket")?; - - if ipv6_only { - builder - .set_only_v6(true) - .context("Couldn't put socket in ipv6 only mode")? - } - - builder - .set_nonblocking(true) - .context("Couldn't put socket in non-blocking mode")?; - builder - .set_reuse_port(true) - .context("Couldn't put socket in reuse_port mode")?; - builder - .bind(&address.into()) - .with_context(|| format!("Couldn't bind socket to address {}", address))?; - builder - .listen(128) - .context("Couldn't listen for connections on socket")?; - - Ok(builder.into()) -} diff --git a/aquatic_http/src/lib/tasks.rs b/aquatic_http/src/lib/tasks.rs deleted file mode 100644 index 341a434..0000000 --- a/aquatic_http/src/lib/tasks.rs +++ /dev/null @@ -1,52 +0,0 @@ -use histogram::Histogram; - -use aquatic_common::access_list::{AccessListMode, AccessListQuery}; - -use crate::{common::*, config::Config}; - -pub fn update_access_list(config: &Config, state: &State) { - match config.access_list.mode { - AccessListMode::White | AccessListMode::Black => { - if let Err(err) = state.access_list.update_from_path(&config.access_list.path) { - ::log::error!("Couldn't update access list: {:?}", err); - } - } - AccessListMode::Off => {} - } -} - -pub fn print_statistics(state: &State) { - let mut peers_per_torrent = Histogram::new(); - - { - let torrents = &mut state.torrent_maps.lock(); - - for torrent in torrents.ipv4.values() { - let num_peers = (torrent.num_seeders + torrent.num_leechers) as u64; - - if let Err(err) = peers_per_torrent.increment(num_peers) { - eprintln!("error incrementing peers_per_torrent histogram: {}", err) - } - } - for torrent in torrents.ipv6.values() { - let num_peers = (torrent.num_seeders + torrent.num_leechers) as u64; - - if let Err(err) = peers_per_torrent.increment(num_peers) { - eprintln!("error incrementing peers_per_torrent histogram: {}", err) - } - } - } - - if peers_per_torrent.entries() != 0 { - println!( - "peers per torrent: min: {}, p50: {}, p75: {}, p90: {}, p99: {}, p999: {}, max: {}", - peers_per_torrent.minimum().unwrap(), - peers_per_torrent.percentile(50.0).unwrap(), - peers_per_torrent.percentile(75.0).unwrap(), - peers_per_torrent.percentile(90.0).unwrap(), - peers_per_torrent.percentile(99.0).unwrap(), - peers_per_torrent.percentile(99.9).unwrap(), - peers_per_torrent.maximum().unwrap(), - ); - } -} diff --git a/aquatic_http_load_test/Cargo.toml b/aquatic_http_load_test/Cargo.toml index 7d29e52..25baab6 100644 --- a/aquatic_http_load_test/Cargo.toml +++ b/aquatic_http_load_test/Cargo.toml @@ -13,11 +13,13 @@ name = "aquatic_http_load_test" anyhow = "1" aquatic_cli_helpers = "0.1.0" aquatic_http_protocol = "0.1.0" +futures-lite = "1" hashbrown = "0.11.2" +glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5" } mimalloc = { version = "0.1", default-features = false } -mio = { version = "0.7", features = ["udp", "os-poll", "os-util"] } rand = { version = "0.8", features = ["small_rng"] } rand_distr = "0.4" +rustls = { version = "0.20", features = ["dangerous_configuration"] } serde = { version = "1", features = ["derive"] } [dev-dependencies] diff --git a/aquatic_http_load_test/src/config.rs b/aquatic_http_load_test/src/config.rs index b05655c..1c8456a 100644 --- a/aquatic_http_load_test/src/config.rs +++ b/aquatic_http_load_test/src/config.rs @@ -9,20 +9,11 @@ pub struct Config { pub num_workers: u8, pub num_connections: usize, pub duration: usize, - pub network: NetworkConfig, pub torrents: TorrentConfig, } impl aquatic_cli_helpers::Config for Config {} -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(default)] -pub struct NetworkConfig { - pub connection_creation_interval: usize, - pub poll_timeout_microseconds: u64, - pub poll_event_capacity: usize, -} - #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct TorrentConfig { @@ -48,22 +39,11 @@ impl Default for Config { num_workers: 1, num_connections: 8, duration: 0, - network: NetworkConfig::default(), torrents: TorrentConfig::default(), } } } -impl Default for NetworkConfig { - fn default() -> Self { - Self { - connection_creation_interval: 10, - poll_timeout_microseconds: 197, - poll_event_capacity: 64, - } - } -} - impl Default for TorrentConfig { fn default() -> Self { Self { diff --git a/aquatic_http_load_test/src/main.rs b/aquatic_http_load_test/src/main.rs index a23be10..e719f77 100644 --- a/aquatic_http_load_test/src/main.rs +++ b/aquatic_http_load_test/src/main.rs @@ -2,6 +2,7 @@ use std::sync::{atomic::Ordering, Arc}; use std::thread; use std::time::{Duration, Instant}; +use ::glommio::LocalExecutorBuilder; use rand::prelude::*; use rand_distr::Pareto; @@ -53,11 +54,18 @@ fn run(config: Config) -> ::anyhow::Result<()> { // Start socket workers + let tls_config = create_tls_config().unwrap(); + for _ in 0..config.num_workers { let config = config.clone(); + let tls_config = tls_config.clone(); let state = state.clone(); - thread::spawn(move || run_socket_thread(&config, state, 1)); + LocalExecutorBuilder::default() + .spawn(|| async move { + run_socket_thread(config, tls_config, state).await.unwrap(); + }) + .unwrap(); } monitor_statistics(state, &config); @@ -147,3 +155,32 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { } } } + +struct FakeCertificateVerifier; + +impl rustls::client::ServerCertVerifier for FakeCertificateVerifier { + fn verify_server_cert( + &self, + _end_entity: &rustls::Certificate, + _intermediates: &[rustls::Certificate], + _server_name: &rustls::ServerName, + _scts: &mut dyn Iterator, + _ocsp_response: &[u8], + _now: std::time::SystemTime, + ) -> Result { + Ok(rustls::client::ServerCertVerified::assertion()) + } +} + +fn create_tls_config() -> anyhow::Result> { + let mut config = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(rustls::RootCertStore::empty()) + .with_no_client_auth(); + + config + .dangerous() + .set_certificate_verifier(Arc::new(FakeCertificateVerifier)); + + Ok(Arc::new(config)) +} diff --git a/aquatic_http_load_test/src/network.rs b/aquatic_http_load_test/src/network.rs index 6ddc19a..aaa15c0 100644 --- a/aquatic_http_load_test/src/network.rs +++ b/aquatic_http_load_test/src/network.rs @@ -1,280 +1,282 @@ -use std::io::{Cursor, ErrorKind, Read, Write}; -use std::sync::atomic::Ordering; -use std::time::Duration; +use std::{ + cell::RefCell, + convert::TryInto, + io::{Cursor, ErrorKind, Read}, + rc::Rc, + sync::{atomic::Ordering, Arc}, + time::Duration, +}; -use hashbrown::HashMap; -use mio::{net::TcpStream, Events, Interest, Poll, Token}; -use rand::{prelude::*, rngs::SmallRng}; +use aquatic_http_protocol::response::Response; +use futures_lite::{AsyncReadExt, AsyncWriteExt}; +use glommio::net::TcpStream; +use glommio::{prelude::*, timer::TimerActionRepeat}; +use rand::{prelude::SmallRng, SeedableRng}; +use rustls::ClientConnection; -use crate::common::*; -use crate::config::*; -use crate::utils::create_random_request; +use crate::{common::LoadTestState, config::Config, utils::create_random_request}; -pub struct Connection { +pub async fn run_socket_thread( + config: Config, + tls_config: Arc, + load_test_state: LoadTestState, +) -> anyhow::Result<()> { + let config = Rc::new(config); + let num_active_connections = Rc::new(RefCell::new(0usize)); + + TimerActionRepeat::repeat(move || { + periodically_open_connections( + config.clone(), + tls_config.clone(), + load_test_state.clone(), + num_active_connections.clone(), + ) + }); + + futures_lite::future::pending::().await; + + Ok(()) +} + +async fn periodically_open_connections( + config: Rc, + tls_config: Arc, + load_test_state: LoadTestState, + num_active_connections: Rc>, +) -> Option { + if *num_active_connections.borrow() < config.num_connections { + spawn_local(async move { + if let Err(err) = + Connection::run(config, tls_config, load_test_state, num_active_connections).await + { + eprintln!("connection creation error: {:?}", err); + } + }) + .detach(); + } + + Some(Duration::from_secs(1)) +} + +struct Connection { + config: Rc, + load_test_state: LoadTestState, + rng: SmallRng, stream: TcpStream, - read_buffer: [u8; 4096], - bytes_read: usize, - can_send: bool, + tls: ClientConnection, + response_buffer: [u8; 2048], + response_buffer_position: usize, + send_new_request: bool, + queued_responses: usize, } impl Connection { - pub fn create_and_register( - config: &Config, - connections: &mut ConnectionMap, - poll: &mut Poll, - token_counter: &mut usize, + async fn run( + config: Rc, + tls_config: Arc, + load_test_state: LoadTestState, + num_active_connections: Rc>, ) -> anyhow::Result<()> { - let mut stream = TcpStream::connect(config.server_address)?; + let stream = TcpStream::connect(config.server_address) + .await + .map_err(|err| anyhow::anyhow!("connect: {:?}", err))?; + let tls = ClientConnection::new(tls_config, "example.com".try_into().unwrap()).unwrap(); + let rng = SmallRng::from_entropy(); - poll.registry() - .register(&mut stream, Token(*token_counter), Interest::READABLE) - .unwrap(); - - let connection = Connection { + let mut connection = Connection { + config, + load_test_state, + rng, stream, - read_buffer: [0; 4096], - bytes_read: 0, - can_send: true, + tls, + response_buffer: [0; 2048], + response_buffer_position: 0, + send_new_request: true, + queued_responses: 0, }; - connections.insert(*token_counter, connection); + *num_active_connections.borrow_mut() += 1; - *token_counter = token_counter.wrapping_add(1); + println!("run connection"); + + if let Err(err) = connection.run_connection_loop().await { + eprintln!("connection error: {:?}", err); + } + + *num_active_connections.borrow_mut() -= 1; Ok(()) } - pub fn read_response(&mut self, state: &LoadTestState) -> bool { - // bool = remove connection + async fn run_connection_loop(&mut self) -> anyhow::Result<()> { loop { - match self.stream.read(&mut self.read_buffer[self.bytes_read..]) { - Ok(0) => { - if self.bytes_read == self.read_buffer.len() { - eprintln!("read buffer is full"); + if self.send_new_request { + let request = + create_random_request(&self.config, &self.load_test_state, &mut self.rng); + + request.write(&mut self.tls.writer())?; + self.queued_responses += 1; + + self.send_new_request = false; + } + + self.write_tls().await?; + self.read_tls().await?; + } + } + + async fn read_tls(&mut self) -> anyhow::Result<()> { + loop { + let mut buf = [0u8; 1024]; + + let bytes_read = self.stream.read(&mut buf).await?; + + if bytes_read == 0 { + return Err(anyhow::anyhow!("Peer has closed connection")); + } + + self.load_test_state + .statistics + .bytes_received + .fetch_add(bytes_read, Ordering::SeqCst); + + let _ = self.tls.read_tls(&mut &buf[..bytes_read]).unwrap(); + + let io_state = self.tls.process_new_packets()?; + + let mut added_plaintext = false; + + if io_state.plaintext_bytes_to_read() != 0 { + loop { + match self.tls.reader().read(&mut buf) { + Ok(0) => { + break; + } + Ok(amt) => { + let end = self.response_buffer_position + amt; + + if end > self.response_buffer.len() { + return Err(anyhow::anyhow!("response too large")); + } else { + let response_buffer_slice = + &mut self.response_buffer[self.response_buffer_position..end]; + + response_buffer_slice.copy_from_slice(&buf[..amt]); + + self.response_buffer_position = end; + + added_plaintext = true; + } + } + Err(err) if err.kind() == ErrorKind::WouldBlock => { + break; + } + Err(err) => { + panic!("tls.reader().read: {}", err); + } } - - break true; } - Ok(bytes_read) => { - self.bytes_read += bytes_read; + } - let interesting_bytes = &self.read_buffer[..self.bytes_read]; + if added_plaintext { + let interesting_bytes = &self.response_buffer[..self.response_buffer_position]; - let mut opt_body_start_index = None; + let mut opt_body_start_index = None; - for (i, chunk) in interesting_bytes.windows(4).enumerate() { - if chunk == b"\r\n\r\n" { - opt_body_start_index = Some(i + 4); + for (i, chunk) in interesting_bytes.windows(4).enumerate() { + if chunk == b"\r\n\r\n" { + opt_body_start_index = Some(i + 4); + + break; + } + } + + if let Some(body_start_index) = opt_body_start_index { + let interesting_bytes = &interesting_bytes[body_start_index..]; + + match Response::from_bytes(interesting_bytes) { + Ok(response) => { + match response { + Response::Announce(_) => { + self.load_test_state + .statistics + .responses_announce + .fetch_add(1, Ordering::SeqCst); + } + Response::Scrape(_) => { + self.load_test_state + .statistics + .responses_scrape + .fetch_add(1, Ordering::SeqCst); + } + Response::Failure(response) => { + self.load_test_state + .statistics + .responses_failure + .fetch_add(1, Ordering::SeqCst); + println!( + "failure response: reason: {}", + response.failure_reason + ); + } + } + + self.response_buffer_position = 0; + self.send_new_request = true; break; } - } - - if let Some(body_start_index) = opt_body_start_index { - let interesting_bytes = &interesting_bytes[body_start_index..]; - - match Response::from_bytes(interesting_bytes) { - Ok(response) => { - state - .statistics - .bytes_received - .fetch_add(self.bytes_read, Ordering::SeqCst); - - match response { - Response::Announce(_) => { - state - .statistics - .responses_announce - .fetch_add(1, Ordering::SeqCst); - } - Response::Scrape(_) => { - state - .statistics - .responses_scrape - .fetch_add(1, Ordering::SeqCst); - } - Response::Failure(response) => { - state - .statistics - .responses_failure - .fetch_add(1, Ordering::SeqCst); - println!( - "failure response: reason: {}", - response.failure_reason - ); - } - } - - self.bytes_read = 0; - self.can_send = true; - } - Err(err) => { - eprintln!( - "deserialize response error with {} bytes read: {:?}, text: {}", - self.bytes_read, - err, - String::from_utf8_lossy(interesting_bytes) - ); - } + Err(err) => { + eprintln!( + "deserialize response error with {} bytes read: {:?}, text: {}", + self.response_buffer_position, + err, + String::from_utf8_lossy(interesting_bytes) + ); } } } - Err(err) if err.kind() == ErrorKind::WouldBlock => { - break false; - } - Err(_) => { - self.bytes_read = 0; + } - break true; - } + if self.tls.wants_write() { + break; } } - } - - pub fn send_request( - &mut self, - config: &Config, - state: &LoadTestState, - rng: &mut impl Rng, - request_buffer: &mut Cursor<&mut [u8]>, - ) -> bool { - // bool = remove connection - if !self.can_send { - return false; - } - - let request = create_random_request(&config, &state, rng); - - request_buffer.set_position(0); - request.write(request_buffer).unwrap(); - let position = request_buffer.position() as usize; - - match self.send_request_inner(state, &request_buffer.get_mut()[..position]) { - Ok(()) => { - state.statistics.requests.fetch_add(1, Ordering::SeqCst); - - self.can_send = false; - - false - } - Err(_) => true, - } - } - - fn send_request_inner( - &mut self, - state: &LoadTestState, - request: &[u8], - ) -> ::std::io::Result<()> { - let bytes_sent = self.stream.write(request)?; - - state - .statistics - .bytes_sent - .fetch_add(bytes_sent, Ordering::SeqCst); - - self.stream.flush()?; Ok(()) } - fn deregister(&mut self, poll: &mut Poll) -> ::std::io::Result<()> { - poll.registry().deregister(&mut self.stream) - } -} - -pub type ConnectionMap = HashMap; - -pub fn run_socket_thread(config: &Config, state: LoadTestState, num_initial_requests: usize) { - let timeout = Duration::from_micros(config.network.poll_timeout_microseconds); - let create_conn_interval = 2 ^ config.network.connection_creation_interval; - - let mut connections: ConnectionMap = HashMap::with_capacity(config.num_connections); - let mut poll = Poll::new().expect("create poll"); - let mut events = Events::with_capacity(config.network.poll_event_capacity); - let mut rng = SmallRng::from_entropy(); - let mut request_buffer = [0u8; 1024]; - let mut request_buffer = Cursor::new(&mut request_buffer[..]); - - let mut token_counter = 0usize; - - for _ in 0..num_initial_requests { - Connection::create_and_register(config, &mut connections, &mut poll, &mut token_counter) - .unwrap(); - } - - let mut iter_counter = 0usize; - let mut num_to_create = 0usize; - - let mut drop_connections = Vec::with_capacity(config.num_connections); - - loop { - poll.poll(&mut events, Some(timeout)) - .expect("failed polling"); - - for event in events.iter() { - if event.is_readable() { - let token = event.token(); - - if let Some(connection) = connections.get_mut(&token.0) { - // Note that this does not indicate successfully reading - // response - if connection.read_response(&state) { - remove_connection(&mut poll, &mut connections, token.0); - - num_to_create += 1; - } - } else { - eprintln!("connection not found: {:?}", token); - } - } - } - - for (k, connection) in connections.iter_mut() { - let remove_connection = - connection.send_request(config, &state, &mut rng, &mut request_buffer); - - if remove_connection { - drop_connections.push(*k); - } - } - - for k in drop_connections.drain(..) { - remove_connection(&mut poll, &mut connections, k); - - num_to_create += 1; - } - - let max_new = config.num_connections - connections.len(); - - if iter_counter % create_conn_interval == 0 { - num_to_create += 1; - } - - num_to_create = num_to_create.min(max_new); - - for _ in 0..num_to_create { - let ok = Connection::create_and_register( - config, - &mut connections, - &mut poll, - &mut token_counter, - ) - .is_ok(); - - if ok { - num_to_create -= 1; - } - } - - iter_counter = iter_counter.wrapping_add(1); - } -} - -fn remove_connection(poll: &mut Poll, connections: &mut ConnectionMap, connection_id: usize) { - if let Some(mut connection) = connections.remove(&connection_id) { - if let Err(err) = connection.deregister(poll) { - eprintln!("couldn't deregister connection: {}", err); - } + async fn write_tls(&mut self) -> anyhow::Result<()> { + if !self.tls.wants_write() { + return Ok(()); + } + + let mut buf = Vec::new(); + let mut buf = Cursor::new(&mut buf); + + while self.tls.wants_write() { + self.tls.write_tls(&mut buf).unwrap(); + } + + let len = buf.get_ref().len(); + + self.stream.write_all(&buf.into_inner()).await?; + self.stream.flush().await?; + + self.load_test_state + .statistics + .bytes_sent + .fetch_add(len, Ordering::SeqCst); + + if self.queued_responses != 0 { + self.load_test_state + .statistics + .requests + .fetch_add(self.queued_responses, Ordering::SeqCst); + + self.queued_responses = 0; + } + + Ok(()) } } diff --git a/aquatic_http_protocol/src/request.rs b/aquatic_http_protocol/src/request.rs index 95f0fdd..9b8e2ee 100644 --- a/aquatic_http_protocol/src/request.rs +++ b/aquatic_http_protocol/src/request.rs @@ -238,6 +238,10 @@ impl Request { Ok(Request::Announce(request)) } else { + if info_hashes.is_empty() { + return Err(anyhow::anyhow!("No info hashes sent")); + } + let request = ScrapeRequest { info_hashes }; Ok(Request::Scrape(request)) diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 4dfed44..a7bf538 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -32,7 +32,6 @@ indexmap = "1" log = "0.4" mimalloc = { version = "0.1", default-features = false } parking_lot = "0.11" -privdrop = "0.5" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } diff --git a/aquatic_udp/src/lib/config.rs b/aquatic_udp/src/lib/config.rs index d05b338..dbda176 100644 --- a/aquatic_udp/src/lib/config.rs +++ b/aquatic_udp/src/lib/config.rs @@ -1,6 +1,7 @@ use std::net::SocketAddr; -use aquatic_common::access_list::AccessListConfig; +use aquatic_common::cpu_pinning::CpuPinningConfig; +use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig}; use serde::{Deserialize, Serialize}; use aquatic_cli_helpers::LogLevel; @@ -25,7 +26,7 @@ pub struct Config { pub cleaning: CleaningConfig, pub privileges: PrivilegeConfig, pub access_list: AccessListConfig, - pub core_affinity: CoreAffinityConfig, + pub cpu_pinning: CpuPinningConfig, } impl aquatic_cli_helpers::Config for Config { @@ -98,24 +99,6 @@ pub struct CleaningConfig { pub max_connection_age: u64, } -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(default)] -pub struct PrivilegeConfig { - /// Chroot and switch user after binding to sockets - pub drop_privileges: bool, - /// Chroot to this path - pub chroot_path: String, - /// User to switch to after chrooting - pub user: String, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(default)] -pub struct CoreAffinityConfig { - pub set_affinities: bool, - pub offset: usize, -} - impl Default for Config { fn default() -> Self { Self { @@ -131,7 +114,7 @@ impl Default for Config { cleaning: CleaningConfig::default(), privileges: PrivilegeConfig::default(), access_list: AccessListConfig::default(), - core_affinity: CoreAffinityConfig::default(), + cpu_pinning: CpuPinningConfig::default(), } } } @@ -183,22 +166,3 @@ impl Default for CleaningConfig { } } } - -impl Default for PrivilegeConfig { - fn default() -> Self { - Self { - drop_privileges: false, - chroot_path: ".".to_string(), - user: "nobody".to_string(), - } - } -} - -impl Default for CoreAffinityConfig { - fn default() -> Self { - Self { - set_affinities: false, - offset: 0, - } - } -} diff --git a/aquatic_udp/src/lib/glommio/common.rs b/aquatic_udp/src/lib/glommio/common.rs index d1d398c..4b3d80d 100644 --- a/aquatic_udp/src/lib/glommio/common.rs +++ b/aquatic_udp/src/lib/glommio/common.rs @@ -1,3 +1,4 @@ +use std::borrow::Borrow; use std::cell::RefCell; use std::rc::Rc; @@ -8,18 +9,22 @@ use glommio::prelude::*; use crate::common::*; use crate::config::Config; -pub async fn update_access_list(config: Config, access_list: Rc>) { - if config.access_list.mode.is_on() { - match BufferedFile::open(config.access_list.path).await { +pub async fn update_access_list>( + config: C, + access_list: Rc>, +) { + if config.borrow().access_list.mode.is_on() { + match BufferedFile::open(&config.borrow().access_list.path).await { Ok(file) => { let mut reader = StreamReaderBuilder::new(file).build(); + let mut new_access_list = AccessList::default(); loop { let mut buf = String::with_capacity(42); match reader.read_line(&mut buf).await { Ok(_) => { - if let Err(err) = access_list.borrow_mut().insert_from_line(&buf) { + if let Err(err) = new_access_list.insert_from_line(&buf) { ::log::error!( "Couln't parse access list line '{}': {:?}", buf, @@ -36,6 +41,8 @@ pub async fn update_access_list(config: Config, access_list: Rc { ::log::error!("Couldn't open access list file: {:?}", err) diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index d95121f..72a8eb8 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -1,11 +1,11 @@ use std::sync::{atomic::AtomicUsize, Arc}; use aquatic_common::access_list::AccessList; +use aquatic_common::privileges::drop_privileges_after_socket_binding; use glommio::channels::channel_mesh::MeshBuilder; use glommio::prelude::*; use crate::config::Config; -use crate::drop_privileges_after_socket_binding; mod common; pub mod handlers; @@ -14,9 +14,9 @@ pub mod network; pub const SHARED_CHANNEL_SIZE: usize = 4096; pub fn run(config: Config) -> anyhow::Result<()> { - if config.core_affinity.set_affinities { + if config.cpu_pinning.active { core_affinity::set_for_current(core_affinity::CoreId { - id: config.core_affinity.offset, + id: config.cpu_pinning.offset, }); } @@ -44,8 +44,8 @@ pub fn run(config: Config) -> anyhow::Result<()> { let mut builder = LocalExecutorBuilder::default(); - if config.core_affinity.set_affinities { - builder = builder.pin_to_cpu(config.core_affinity.offset + 1 + i); + if config.cpu_pinning.active { + builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + i); } let executor = builder.spawn(|| async move { @@ -70,9 +70,8 @@ pub fn run(config: Config) -> anyhow::Result<()> { let mut builder = LocalExecutorBuilder::default(); - if config.core_affinity.set_affinities { - builder = - builder.pin_to_cpu(config.core_affinity.offset + 1 + config.socket_workers + i); + if config.cpu_pinning.active { + builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i); } let executor = builder.spawn(|| async move { @@ -88,7 +87,12 @@ pub fn run(config: Config) -> anyhow::Result<()> { executors.push(executor); } - drop_privileges_after_socket_binding(&config, num_bound_sockets).unwrap(); + drop_privileges_after_socket_binding( + &config.privileges, + num_bound_sockets, + config.socket_workers, + ) + .unwrap(); for executor in executors { executor diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index e3eea68..34e25a8 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -1,11 +1,3 @@ -use std::{ - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - time::Duration, -}; - use cfg_if::cfg_if; pub mod common; @@ -16,7 +8,6 @@ pub mod glommio; pub mod mio; use config::Config; -use privdrop::PrivDrop; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; @@ -29,35 +20,3 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } } } - -fn drop_privileges_after_socket_binding( - config: &Config, - num_bound_sockets: Arc, -) -> anyhow::Result<()> { - if config.privileges.drop_privileges { - let mut counter = 0usize; - - loop { - let sockets = num_bound_sockets.load(Ordering::SeqCst); - - if sockets == config.socket_workers { - PrivDrop::default() - .chroot(config.privileges.chroot_path.clone()) - .user(config.privileges.user.clone()) - .apply()?; - - break; - } - - ::std::thread::sleep(Duration::from_millis(10)); - - counter += 1; - - if counter == 500 { - panic!("Sockets didn't bind in time for privilege drop."); - } - } - } - - Ok(()) -} diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index 5c5f649..1b8e656 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -6,6 +6,7 @@ use std::{ }; use anyhow::Context; +use aquatic_common::privileges::drop_privileges_after_socket_binding; use crossbeam_channel::unbounded; pub mod common; @@ -16,14 +17,13 @@ pub mod tasks; use aquatic_common::access_list::{AccessListArcSwap, AccessListMode, AccessListQuery}; use crate::config::Config; -use crate::drop_privileges_after_socket_binding; use common::State; pub fn run(config: Config) -> ::anyhow::Result<()> { - if config.core_affinity.set_affinities { + if config.cpu_pinning.active { core_affinity::set_for_current(core_affinity::CoreId { - id: config.core_affinity.offset, + id: config.cpu_pinning.offset, }); } @@ -35,7 +35,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { start_workers(config.clone(), state.clone(), num_bound_sockets.clone())?; - drop_privileges_after_socket_binding(&config, num_bound_sockets).unwrap(); + drop_privileges_after_socket_binding( + &config.privileges, + num_bound_sockets, + config.socket_workers, + ) + .unwrap(); loop { ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); @@ -66,9 +71,9 @@ pub fn start_workers( Builder::new() .name(format!("request-{:02}", i + 1)) .spawn(move || { - if config.core_affinity.set_affinities { + if config.cpu_pinning.active { core_affinity::set_for_current(core_affinity::CoreId { - id: config.core_affinity.offset + 1 + i, + id: config.cpu_pinning.offset + 1 + i, }); } @@ -87,9 +92,9 @@ pub fn start_workers( Builder::new() .name(format!("socket-{:02}", i + 1)) .spawn(move || { - if config.core_affinity.set_affinities { + if config.cpu_pinning.active { core_affinity::set_for_current(core_affinity::CoreId { - id: config.core_affinity.offset + 1 + config.request_workers + i, + id: config.cpu_pinning.offset + 1 + config.request_workers + i, }); } @@ -112,9 +117,9 @@ pub fn start_workers( Builder::new() .name("statistics-collector".to_string()) .spawn(move || { - if config.core_affinity.set_affinities { + if config.cpu_pinning.active { core_affinity::set_for_current(core_affinity::CoreId { - id: config.core_affinity.offset, + id: config.cpu_pinning.offset, }); } diff --git a/aquatic_ws/src/lib/config.rs b/aquatic_ws/src/lib/config.rs index b7ef10c..c0cd032 100644 --- a/aquatic_ws/src/lib/config.rs +++ b/aquatic_ws/src/lib/config.rs @@ -1,6 +1,6 @@ use std::net::SocketAddr; -use aquatic_common::access_list::AccessListConfig; +use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig}; use serde::{Deserialize, Serialize}; use aquatic_cli_helpers::LogLevel; @@ -84,17 +84,6 @@ pub struct StatisticsConfig { pub interval: u64, } -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(default)] -pub struct PrivilegeConfig { - /// Chroot and switch user after binding to sockets - pub drop_privileges: bool, - /// Chroot to this path - pub chroot_path: String, - /// User to switch to after chrooting - pub user: String, -} - impl Default for Config { fn default() -> Self { Self { @@ -162,13 +151,3 @@ impl Default for StatisticsConfig { Self { interval: 0 } } } - -impl Default for PrivilegeConfig { - fn default() -> Self { - Self { - drop_privileges: false, - chroot_path: ".".to_string(), - user: "nobody".to_string(), - } - } -} diff --git a/scripts/gen-tls.sh b/scripts/gen-tls.sh new file mode 100755 index 0000000..5fd4512 --- /dev/null +++ b/scripts/gen-tls.sh @@ -0,0 +1,16 @@ +#/bin/bash + +set -e + +mkdir -p tmp/tls + +cd tmp/tls + +openssl ecparam -genkey -name prime256v1 -out key.pem +openssl req -new -sha256 -key key.pem -out csr.csr -subj "/C=GB/ST=Test/L=Test/O=Test/OU=Test/CN=example.com" +openssl req -x509 -sha256 -nodes -days 365 -key key.pem -in csr.csr -out cert.crt + +sudo cp cert.crt /usr/local/share/ca-certificates/snakeoil.crt +sudo update-ca-certificates + +openssl pkcs8 -in key.pem -topk8 -nocrypt -out key.pk8