Merge pull request #13 from greatest-ape/http-glommio

aquatic_http: replace mio implementation with new glommio implementation
This commit is contained in:
Joakim Frostegård 2021-10-29 08:20:42 +02:00 committed by GitHub
commit ca644ba2df
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
35 changed files with 1773 additions and 1932 deletions

View file

@ -1,8 +1,8 @@
name: 'test-transfer'
description: 'test aquatic file transfer'
outputs:
http_ipv4:
description: 'HTTP IPv4 status'
# http_ipv4:
# description: 'HTTP IPv4 status'
http_tls_ipv4:
description: 'HTTP IPv4 over TLS status'
udp_ipv4:

View file

@ -50,6 +50,7 @@ $SUDO echo "127.0.0.1 example.com" >> /etc/hosts
openssl ecparam -genkey -name prime256v1 -out key.pem
openssl req -new -sha256 -key key.pem -out csr.csr -subj "/C=GB/ST=Test/L=Test/O=Test/OU=Test/CN=example.com"
openssl req -x509 -sha256 -nodes -days 365 -key key.pem -in csr.csr -out cert.crt
openssl pkcs8 -in key.pem -topk8 -nocrypt -out key.pk8 # rustls
$SUDO cp cert.crt /usr/local/share/ca-certificates/snakeoil.crt
$SUDO update-ca-certificates
@ -60,19 +61,19 @@ openssl pkcs12 -export -passout "pass:p" -out identity.pfx -inkey key.pem -in ce
cargo build --bin aquatic
echo "log_level = 'debug'
[network]
address = '127.0.0.1:3000'" > http.toml
./target/debug/aquatic http -c http.toml > "$HOME/http.log" 2>&1 &
# echo "log_level = 'debug'
#
# [network]
# address = '127.0.0.1:3000'" > http.toml
# ./target/debug/aquatic http -c http.toml > "$HOME/http.log" 2>&1 &
echo "log_level = 'debug'
[network]
address = '127.0.0.1:3001'
use_tls = true
tls_pkcs12_path = './identity.pfx'
tls_pkcs12_password = 'p'
tls_certificate_path = './cert.crt'
tls_private_key_path = './key.pk8'
" > tls.toml
./target/debug/aquatic http -c tls.toml > "$HOME/tls.log" 2>&1 &
@ -100,12 +101,12 @@ mkdir torrents
# Create torrents
echo "http-test-ipv4" > seed/http-test-ipv4
# echo "http-test-ipv4" > seed/http-test-ipv4
echo "tls-test-ipv4" > seed/tls-test-ipv4
echo "udp-test-ipv4" > seed/udp-test-ipv4
echo "wss-test-ipv4" > seed/wss-test-ipv4
mktorrent -p -o "torrents/http-ipv4.torrent" -a "http://127.0.0.1:3000/announce" "seed/http-test-ipv4"
# mktorrent -p -o "torrents/http-ipv4.torrent" -a "http://127.0.0.1:3000/announce" "seed/http-test-ipv4"
mktorrent -p -o "torrents/tls-ipv4.torrent" -a "https://example.com:3001/announce" "seed/tls-test-ipv4"
mktorrent -p -o "torrents/udp-ipv4.torrent" -a "udp://127.0.0.1:3000" "seed/udp-test-ipv4"
mktorrent -p -o "torrents/wss-ipv4.torrent" -a "wss://example.com:3002" "seed/wss-test-ipv4"
@ -148,7 +149,7 @@ cd ..
# Check for completion
HTTP_IPv4="Failed"
# HTTP_IPv4="Ok"
TLS_IPv4="Failed"
UDP_IPv4="Failed"
WSS_IPv4="Failed"
@ -159,14 +160,14 @@ echo "Watching for finished files.."
while [ $i -lt 60 ]
do
if test -f "leech/http-test-ipv4"; then
if grep -q "http-test-ipv4" "leech/http-test-ipv4"; then
if [ "$HTTP_IPv4" != "Ok" ]; then
HTTP_IPv4="Ok"
echo "HTTP_IPv4 is Ok"
fi
fi
fi
# if test -f "leech/http-test-ipv4"; then
# if grep -q "http-test-ipv4" "leech/http-test-ipv4"; then
# if [ "$HTTP_IPv4" != "Ok" ]; then
# HTTP_IPv4="Ok"
# echo "HTTP_IPv4 is Ok"
# fi
# fi
# fi
if test -f "leech/tls-test-ipv4"; then
if grep -q "tls-test-ipv4" "leech/tls-test-ipv4"; then
if [ "$TLS_IPv4" != "Ok" ]; then
@ -192,7 +193,8 @@ do
fi
fi
if [ "$HTTP_IPv4" = "Ok" ] && [ "$TLS_IPv4" = "Ok" ] && [ "$UDP_IPv4" = "Ok" ] && [ "$WSS_IPv4" = "Ok" ]; then
# if [ "$HTTP_IPv4" = "Ok" ] && [ "$TLS_IPv4" = "Ok" ] && [ "$UDP_IPv4" = "Ok" ] && [ "$WSS_IPv4" = "Ok" ]; then
if [ "$TLS_IPv4" = "Ok" ] && [ "$UDP_IPv4" = "Ok" ] && [ "$WSS_IPv4" = "Ok" ]; then
break
fi
@ -203,14 +205,14 @@ done
echo "Waited for $i seconds"
echo "::set-output name=http_ipv4::$HTTP_IPv4"
# echo "::set-output name=http_ipv4::$HTTP_IPv4"
echo "::set-output name=http_tls_ipv4::$TLS_IPv4"
echo "::set-output name=udp_ipv4::$UDP_IPv4"
echo "::set-output name=wss_ipv4::$WSS_IPv4"
echo ""
echo "# --- HTTP log --- #"
cat "http.log"
# echo ""
# echo "# --- HTTP log --- #"
# cat "http.log"
sleep 1
@ -246,11 +248,12 @@ sleep 1
echo ""
echo "# --- Test results --- #"
echo "HTTP (IPv4): $HTTP_IPv4"
# echo "HTTP (IPv4): $HTTP_IPv4"
echo "HTTP over TLS (IPv4): $TLS_IPv4"
echo "UDP (IPv4): $UDP_IPv4"
echo "WSS (IPv4): $WSS_IPv4"
if [ "$HTTP_IPv4" != "Ok" ] || [ "$TLS_IPv4" != "Ok" ] || [ "$UDP_IPv4" != "Ok" ] || [ "$WSS_IPv4" != "Ok" ]; then
# if [ "$HTTP_IPv4" != "Ok" ] || [ "$TLS_IPv4" != "Ok" ] || [ "$UDP_IPv4" != "Ok" ] || [ "$WSS_IPv4" != "Ok" ]; then
if [ "$TLS_IPv4" != "Ok" ] || [ "$UDP_IPv4" != "Ok" ] || [ "$WSS_IPv4" != "Ok" ]; then
exit 1
fi

View file

@ -1,4 +1,4 @@
name: "Test HTTP, UDP and WSS file transfer"
name: "Test UDP, TLS and WSS file transfer"
on:
push:
@ -9,7 +9,7 @@ on:
jobs:
test-transfer-http:
runs-on: ubuntu-latest
name: "Test BitTorrent file transfer over HTTP (with and without TLS), UDP and WSS"
name: "Test BitTorrent file transfer over UDP, TLS and WSS"
timeout-minutes: 20
container:
image: rust:1-bullseye

86
Cargo.lock generated
View file

@ -79,6 +79,7 @@ dependencies = [
"hashbrown 0.11.2",
"hex",
"indexmap",
"privdrop",
"rand",
"serde",
]
@ -91,25 +92,27 @@ dependencies = [
"aquatic_cli_helpers",
"aquatic_common",
"aquatic_http_protocol",
"crossbeam-channel",
"cfg-if",
"core_affinity",
"either",
"futures-lite",
"glommio",
"hashbrown 0.11.2",
"histogram",
"indexmap",
"itoa",
"log",
"memchr",
"mimalloc",
"mio",
"native-tls",
"parking_lot",
"privdrop",
"quickcheck",
"quickcheck_macros",
"rand",
"rustls",
"rustls-pemfile",
"serde",
"slab",
"smartstring",
"socket2 0.4.2",
]
[[package]]
@ -119,13 +122,15 @@ dependencies = [
"anyhow",
"aquatic_cli_helpers",
"aquatic_http_protocol",
"futures-lite",
"glommio",
"hashbrown 0.11.2",
"mimalloc",
"mio",
"quickcheck",
"quickcheck_macros",
"rand",
"rand_distr",
"rustls",
"serde",
]
@ -172,7 +177,6 @@ dependencies = [
"mimalloc",
"mio",
"parking_lot",
"privdrop",
"quickcheck",
"quickcheck_macros",
"rand",
@ -1542,6 +1546,21 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "ring"
version = "0.16.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc"
dependencies = [
"cc",
"libc",
"once_cell",
"spin",
"untrusted",
"web-sys",
"winapi 0.3.9",
]
[[package]]
name = "rlimit"
version = "0.6.2"
@ -1566,6 +1585,27 @@ dependencies = [
"semver",
]
[[package]]
name = "rustls"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b5ac6078ca424dc1d3ae2328526a76787fecc7f8011f520e3276730e711fc95"
dependencies = [
"log",
"ring",
"sct",
"webpki",
]
[[package]]
name = "rustls-pemfile"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9"
dependencies = [
"base64",
]
[[package]]
name = "ryu"
version = "1.0.5"
@ -1603,6 +1643,16 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "sct"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "security-framework"
version = "2.4.2"
@ -1777,6 +1827,12 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "spin"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "static_assertions"
version = "1.1.0"
@ -1996,6 +2052,12 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
[[package]]
name = "untrusted"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "url"
version = "2.2.2"
@ -2131,6 +2193,16 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "webpki"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "winapi"
version = "0.2.8"

152
README.md
View file

@ -2,70 +2,77 @@
[![CargoBuildAndTest](https://github.com/greatest-ape/aquatic/actions/workflows/cargo-build-and-test.yml/badge.svg)](https://github.com/greatest-ape/aquatic/actions/workflows/cargo-build-and-test.yml) [![Test HTTP, UDP and WSS file transfer](https://github.com/greatest-ape/aquatic/actions/workflows/test-transfer.yml/badge.svg)](https://github.com/greatest-ape/aquatic/actions/workflows/test-transfer.yml)
Blazingly fast, multi-threaded BitTorrent tracker written in Rust.
Blazingly fast, multi-threaded BitTorrent tracker written in Rust, consisting
of sub-implementations for different protocols:
Consists of three sub-implementations for different protocols:
* `aquatic_udp`: BitTorrent over UDP. Implementation achieves 45% higher throughput
than opentracker (see benchmarks below)
* `aquatic_http`: BitTorrent over HTTP/TLS (slightly experimental)
* `aquatic_ws`: WebTorrent (experimental)
[BitTorrent over UDP]: https://libtorrent.org/udp_tracker_protocol.html
[BitTorrent over HTTP]: https://wiki.theory.org/index.php/BitTorrentSpecification#Tracker_HTTP.2FHTTPS_Protocol
[WebTorrent]: https://github.com/webtorrent
[rustls]: https://github.com/rustls/rustls
[native-tls]: https://github.com/sfackler/rust-native-tls
[mio]: https://github.com/tokio-rs/mio
[glommio]: https://github.com/DataDog/glommio
## Copyright and license
| Name | Protocol | OS requirements |
|--------------|------------------------------------------------|-----------------------------------------------------------------|
| aquatic_udp | [BitTorrent over UDP] | Cross-platform with [mio] (default) / Linux 5.8+ with [glommio] |
| aquatic_http | [BitTorrent over HTTP] with TLS ([rustls]) | Linux 5.8+ |
| aquatic_ws | [WebTorrent], plain or with TLS ([native-tls]) | Cross-platform |
Copyright (c) 2020-2021 Joakim Frostegård
## Usage
Distributed under Apache 2.0 license (details in `LICENSE` file.)
## Installation prerequisites
### Prerequisites
- Install Rust with [rustup](https://rustup.rs/) (stable is recommended)
- Install cmake with your package manager (e.g., `apt-get install cmake`)
- On GNU/Linux, also install the OpenSSL components necessary for dynamic
linking (e.g., `apt-get install libssl-dev`)
- Clone the git repository and refer to the next section.
- If you want to run aquatic_ws and are on Linux or BSD, install OpenSSL
components necessary for dynamic linking (e.g., `apt-get install libssl-dev`)
- Clone this git repository and enter it
## Compile and run
### Compiling
To compile the master executable for all protocols, run:
Compile the implementations that you are interested in:
```sh
./scripts/build-aquatic.sh
cargo build --release -p aquatic_udp
cargo build --release -p aquatic_udp --features "with-glommio" --no-default-features
cargo build --release -p aquatic_http
cargo build --release -p aquatic_ws
```
To start the tracker for a protocol with default settings, run:
### Running
Begin by generating configuration files. They differ between protocols.
```sh
./target/release/aquatic udp
./target/release/aquatic http
./target/release/aquatic ws
./target/release/aquatic_udp -p > "aquatic-udp-config.toml"
./target/release/aquatic_http -p > "aquatic-http-config.toml"
./target/release/aquatic_ws -p > "aquatic-ws-config.toml"
```
To print default settings to standard output, pass the "-p" flag to the binary:
Make adjustments to the files. The values you will most likely want to adjust
are `socket_workers` (number of threads reading from and writing to sockets)
and `address` under the `network` section (listening address). This goes for
all three protocols.
`aquatic_http` requires configuring a TLS certificate file and a private key file
to run. More information is available further down in this document.
Once done, run the tracker:
```sh
./target/release/aquatic udp -p
./target/release/aquatic http -p
./target/release/aquatic ws -p
./target/release/aquatic_udp -c "aquatic-udp-config.toml"
./target/release/aquatic_http -c "aquatic-http-config.toml"
./target/release/aquatic_ws -c "aquatic-ws-config.toml"
```
Note that the configuration files differ between protocols.
More documentation of configuration file values might be available under
`src/lib/config.rs` in crates `aquatic_udp`, `aquatic_http`, `aquatic_ws`.
To adjust the settings, save the output of the relevant previous command to a
file and make your changes. Then run `aquatic` with a "-c" argument pointing to
the file, e.g.:
#### General settings
```sh
./target/release/aquatic udp -c "/path/to/aquatic-udp-config.toml"
./target/release/aquatic http -c "/path/to/aquatic-http-config.toml"
./target/release/aquatic ws -c "/path/to/aquatic-ws-config.toml"
```
The configuration file values you will most likely want to adjust are
`socket_workers` (number of threads reading from and writing to sockets) and
`address` under the `network` section (listening address). This goes for all
three protocols.
Access control by info hash is supported for all protocols. Relevant part of configuration:
Access control by info hash is supported for all protocols. The relevant part
of configuration is:
```toml
[access_list]
@ -73,9 +80,6 @@ mode = 'off' # Change to 'black' (blacklist) or 'white' (whitelist)
path = '' # Path to text file with newline-delimited hex-encoded info hashes
```
Some more documentation of configuration file values might be available under
`src/lib/config.rs` in crates `aquatic_udp`, `aquatic_http`, `aquatic_ws`.
## Details on implementations
### aquatic_udp: UDP BitTorrent tracker
@ -121,20 +125,14 @@ There is an alternative implementation that utilizes [io_uring] by running on
[glommio]. It only runs on Linux and requires a recent kernel (version 5.8 or later).
In some cases, it performs even better than the cross-platform implementation.
To use it, pass the `with-glommio` feature when building, e.g.:
```sh
cargo build --release -p aquatic_udp --features "with-glommio" --no-default-features
./target/release/aquatic_udp
```
### aquatic_http: HTTP BitTorrent tracker
Aims for compatibility with the HTTP BitTorrent protocol, as described
[here](https://wiki.theory.org/index.php/BitTorrentSpecification#Tracker_HTTP.2FHTTPS_Protocol),
including TLS and scrape request support. There are some exceptions:
[HTTP BitTorrent protocol]: https://wiki.theory.org/index.php/BitTorrentSpecification#Tracker_HTTP.2FHTTPS_Protocol
* Doesn't keep track of the number of torrent downloads (0 is always sent).
Aims for compatibility with the [HTTP BitTorrent protocol], with some exceptions:
* Only runs over TLS
* Doesn't keep track of the number of torrent downloads (0 is always sent)
* Doesn't allow full scrapes, i.e. of all registered info hashes
`aquatic_http` has not been tested as much as `aquatic_udp` but likely works
@ -142,6 +140,28 @@ fine.
#### TLS
A TLS certificate file (DER-encoded X.509) and a corresponding private key file
(DER-encoded ASN.1 in either PKCS#8 or PKCS#1 format) are required. Set their
paths in the configuration file, e.g.:
```toml
[network]
address = '0.0.0.0:3000'
tls_certificate_path = './cert.crt'
tls_private_key_path = './key.pk8'
```
### aquatic_ws: WebTorrent tracker
Aims for compatibility with [WebTorrent](https://github.com/webtorrent)
clients, including `wss` protocol support (WebSockets over TLS), with some
exceptions:
* Doesn't keep track of the number of torrent downloads (0 is always sent).
* Doesn't allow full scrapes, i.e. of all registered info hashes
#### TLS
To run over TLS, a pkcs12 file (`.pkx`) is needed. It can be generated from
Let's Encrypt certificates as follows, assuming you are in the directory where
they are stored:
@ -154,24 +174,14 @@ Enter a password when prompted. Then move `identity.pfx` somewhere suitable,
and enter the path into the tracker configuration field `tls_pkcs12_path`. Set
the password in the field `tls_pkcs12_password` and set `use_tls` to true.
### aquatic_ws: WebTorrent tracker
Aims for compatibility with [WebTorrent](https://github.com/webtorrent)
clients, including `wss` protocol support (WebSockets over TLS), with some
exceptions:
* Doesn't keep track of the number of torrent downloads (0 is always sent).
* Doesn't allow full scrapes, i.e. of all registered info hashes
For information about running over TLS, please refer to the TLS subsection
of the `aquatic_http` section above.
#### Benchmarks
[wt-tracker]: https://github.com/Novage/wt-tracker
[bittorrent-tracker]: https://github.com/webtorrent/bittorrent-tracker
The following benchmark is not very realistic, as it simulates a small number of clients, each sending a large number of requests. Nonetheless, I think that it gives a useful indication of relative performance.
The following benchmark is not very realistic, as it simulates a small number
of clients, each sending a large number of requests. Nonetheless, I think that
it gives a useful indication of relative performance.
Server responses per second, best result in bold:
@ -220,6 +230,12 @@ This design means little waiting for locks on internal state occurs,
while network work can be efficiently distributed over multiple threads,
making use of SO_REUSEPORT setting.
## Copyright and license
Copyright (c) 2020-2021 Joakim Frostegård
Distributed under Apache 2.0 license (details in `LICENSE` file.)
## Trivia
The tracker is called aquatic because it thrives under a torrent of bits ;-)

17
TODO.md
View file

@ -1,5 +1,22 @@
# TODO
* aquatic_http glommio:
* optimize?
* get_peer_addr only once (takes 1.2% of runtime)
* queue response: allocating takes 2.8% of runtime
* clean out connections regularly
* timeout inside of task for "it took too long to receive request, send response"?
* handle panicked/cancelled tasks
* consider better error type for request parsing, so that better error
messages can be sent back (e.g., "full scrapes are not supported")
* Scrape: should stats with only zeroes be sent back for non-registered info hashes?
Relevant for mio implementation too.
* Don't return read request immediately. Set it as self.read_request
and continue looping to wait for any new input. Then check after
read_tls is finished. This might prevent issues when using plain HTTP
where only part of request is read, but that part is valid, and reading
is stopped, which might lead to various issues.
* aquatic_udp glommio
* Add to file transfer CI
* consider adding ConnectedScrapeRequest::Scrape(PendingScrapeRequest)

View file

@ -16,5 +16,6 @@ arc-swap = "1"
hashbrown = "0.11.2"
hex = "0.4"
indexmap = "1"
privdrop = "0.5"
rand = { version = "0.8", features = ["small_rng"] }
serde = { version = "1", features = ["derive"] }

View file

@ -0,0 +1,17 @@
use serde::{Deserialize, Serialize};
/// Settings controlling pinning of worker threads to CPU cores.
///
/// The manual `Default` impl duplicated exactly what `#[derive(Default)]`
/// produces (`false`, `0`), so the derive is used instead.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct CpuPinningConfig {
    /// Enable CPU pinning (off by default)
    pub active: bool,
    /// Core index offset applied when pinning — assumed to shift which
    /// cores workers are pinned to; TODO confirm semantics at use site
    pub offset: usize,
}

View file

@ -5,6 +5,8 @@ use indexmap::IndexMap;
use rand::Rng;
pub mod access_list;
pub mod cpu_pinning;
pub mod privileges;
/// Peer or connection valid until this instant
///

View file

@ -0,0 +1,64 @@
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use privdrop::PrivDrop;
use serde::{Deserialize, Serialize};
/// Configuration for dropping root privileges after sockets are bound.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct PrivilegeConfig {
    /// Chroot and switch user after binding to sockets
    pub drop_privileges: bool,
    /// Chroot to this path
    pub chroot_path: String,
    /// User to switch to after chrooting
    pub user: String,
}
impl Default for PrivilegeConfig {
fn default() -> Self {
Self {
drop_privileges: false,
chroot_path: ".".to_string(),
user: "nobody".to_string(),
}
}
}
/// Waits until `target_num` sockets have been bound (as reported through the
/// shared `num_bound_sockets` counter), then chroots and switches user
/// according to `config`. No-op when `config.drop_privileges` is false.
///
/// Polls every 10 ms and gives up after 500 attempts (~5 seconds). On
/// timeout it now returns an error instead of panicking, since the function
/// already returns `anyhow::Result` and callers should decide how to fail.
pub fn drop_privileges_after_socket_binding(
    config: &PrivilegeConfig,
    num_bound_sockets: Arc<AtomicUsize>,
    target_num: usize,
) -> anyhow::Result<()> {
    if !config.drop_privileges {
        return Ok(());
    }

    // Bounded busy-wait for all socket workers to report binding completion.
    for _ in 0..500 {
        if num_bound_sockets.load(Ordering::SeqCst) == target_num {
            PrivDrop::default()
                .chroot(config.chroot_path.clone())
                .user(config.user.clone())
                .apply()?;

            return Ok(());
        }

        ::std::thread::sleep(Duration::from_millis(10));
    }

    Err(anyhow::anyhow!(
        "Sockets didn't bind in time for privilege drop."
    ))
}

View file

@ -20,23 +20,25 @@ anyhow = "1"
aquatic_cli_helpers = "0.1.0"
aquatic_common = "0.1.0"
aquatic_http_protocol = "0.1.0"
crossbeam-channel = "0.5"
cfg-if = "1"
core_affinity = "0.5"
either = "1"
futures-lite = "1"
glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5" }
hashbrown = "0.11.2"
histogram = "0.6"
indexmap = "1"
itoa = "0.4"
log = "0.4"
mimalloc = { version = "0.1", default-features = false }
memchr = "2"
mio = { version = "0.7", features = ["tcp", "os-poll", "os-util"] }
native-tls = "0.2"
parking_lot = "0.11"
privdrop = "0.5"
rand = { version = "0.8", features = ["small_rng"] }
rustls = "0.20"
rustls-pemfile = "0.2"
serde = { version = "1", features = ["derive"] }
slab = "0.4"
smartstring = "0.2"
socket2 = { version = "0.4.1", features = ["all"] }
[dev-dependencies]
quickcheck = "1.0"

View file

@ -1,27 +1,124 @@
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::Arc;
use std::time::Instant;
use aquatic_common::access_list::{AccessList, AccessListArcSwap};
use crossbeam_channel::{Receiver, Sender};
use aquatic_common::access_list::AccessList;
use either::Either;
use hashbrown::HashMap;
use indexmap::IndexMap;
use log::error;
use mio::Token;
use parking_lot::Mutex;
use smartstring::{LazyCompact, SmartString};
pub use aquatic_common::{convert_ipv4_mapped_ipv6, ValidUntil};
use aquatic_http_protocol::common::*;
use aquatic_http_protocol::request::Request;
use aquatic_http_protocol::response::{Response, ResponsePeer};
use aquatic_http_protocol::response::ResponsePeer;
use crate::config::Config;
pub const LISTENER_TOKEN: Token = Token(0);
pub const CHANNEL_TOKEN: Token = Token(1);
use std::borrow::Borrow;
use std::cell::RefCell;
use std::rc::Rc;
use futures_lite::AsyncBufReadExt;
use glommio::io::{BufferedFile, StreamReaderBuilder};
use glommio::prelude::*;
use aquatic_http_protocol::{
request::{AnnounceRequest, ScrapeRequest},
response::{AnnounceResponse, ScrapeResponse},
};
/// Identifies the socket worker (response consumer) a response is routed to.
#[derive(Copy, Clone, Debug)]
pub struct ConsumerId(pub usize);

/// Identifies a connection; local to a single socket worker.
#[derive(Clone, Copy, Debug)]
pub struct ConnectionId(pub usize);
/// Request passed from a socket worker to a request worker, carrying the
/// routing information needed to send the response back to the right
/// connection on the right worker.
#[derive(Debug)]
pub enum ChannelRequest {
    Announce {
        request: AnnounceRequest,
        peer_addr: SocketAddr,
        connection_id: ConnectionId,
        response_consumer_id: ConsumerId,
    },
    Scrape {
        request: ScrapeRequest,
        peer_addr: SocketAddr,
        connection_id: ConnectionId,
        response_consumer_id: ConsumerId,
    },
}
/// Response passed from a request worker back to a socket worker, addressed
/// by peer address and worker-local connection id.
#[derive(Debug)]
pub enum ChannelResponse {
    Announce {
        response: AnnounceResponse,
        peer_addr: SocketAddr,
        connection_id: ConnectionId,
    },
    Scrape {
        response: ScrapeResponse,
        peer_addr: SocketAddr,
        connection_id: ConnectionId,
    },
}
impl ChannelResponse {
    /// Worker-local connection id this response should be routed back to.
    pub fn get_connection_id(&self) -> ConnectionId {
        match self {
            Self::Announce { connection_id, .. }
            | Self::Scrape { connection_id, .. } => *connection_id,
        }
    }

    /// Address of the peer this response is destined for.
    pub fn get_peer_addr(&self) -> SocketAddr {
        match self {
            Self::Announce { peer_addr, .. } | Self::Scrape { peer_addr, .. } => *peer_addr,
        }
    }
}
pub async fn update_access_list<C: Borrow<Config>>(
config: C,
access_list: Rc<RefCell<AccessList>>,
) {
if config.borrow().access_list.mode.is_on() {
match BufferedFile::open(&config.borrow().access_list.path).await {
Ok(file) => {
let mut reader = StreamReaderBuilder::new(file).build();
let mut new_access_list = AccessList::default();
loop {
let mut buf = String::with_capacity(42);
match reader.read_line(&mut buf).await {
Ok(_) => {
if let Err(err) = new_access_list.insert_from_line(&buf) {
::log::error!(
"Couln't parse access list line '{}': {:?}",
buf,
err
);
}
}
Err(err) => {
::log::error!("Couln't read access list line {:?}", err);
break;
}
}
yield_if_needed().await;
}
*access_list.borrow_mut() = new_access_list;
}
Err(err) => {
::log::error!("Couldn't open access list file: {:?}", err)
}
};
}
}
pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {}
@ -32,15 +129,16 @@ impl Ip for Ipv6Addr {}
pub struct ConnectionMeta {
/// Index of socket worker responsible for this connection. Required for
/// sending back response through correct channel to correct worker.
pub worker_index: usize,
pub response_consumer_id: ConsumerId,
pub peer_addr: SocketAddr,
pub poll_token: Token,
/// Connection id local to socket worker
pub connection_id: ConnectionId,
}
#[derive(Clone, Copy, Debug)]
pub struct PeerConnectionMeta<I: Ip> {
pub worker_index: usize,
pub poll_token: Token,
pub response_consumer_id: ConsumerId,
pub connection_id: ConnectionId,
pub peer_ip_address: I,
}
@ -118,14 +216,14 @@ pub struct TorrentMaps {
}
impl TorrentMaps {
pub fn clean(&mut self, config: &Config, access_list: &Arc<AccessList>) {
pub fn clean(&mut self, config: &Config, access_list: &AccessList) {
Self::clean_torrent_map(config, access_list, &mut self.ipv4);
Self::clean_torrent_map(config, access_list, &mut self.ipv6);
}
fn clean_torrent_map<I: Ip>(
config: &Config,
access_list: &Arc<AccessList>,
access_list: &AccessList,
torrent_map: &mut TorrentMap<I>,
) {
let now = Instant::now();
@ -163,42 +261,34 @@ impl TorrentMaps {
}
}
#[derive(Clone)]
pub struct State {
pub access_list: Arc<AccessListArcSwap>,
pub torrent_maps: Arc<Mutex<TorrentMaps>>,
}
pub fn num_digits_in_usize(mut number: usize) -> usize {
let mut num_digits = 1usize;
impl Default for State {
fn default() -> Self {
Self {
access_list: Arc::new(Default::default()),
torrent_maps: Arc::new(Mutex::new(TorrentMaps::default())),
}
}
}
while number >= 10 {
num_digits += 1;
pub type RequestChannelSender = Sender<(ConnectionMeta, Request)>;
pub type RequestChannelReceiver = Receiver<(ConnectionMeta, Request)>;
pub type ResponseChannelReceiver = Receiver<(ConnectionMeta, Response)>;
#[derive(Clone)]
pub struct ResponseChannelSender {
senders: Vec<Sender<(ConnectionMeta, Response)>>,
}
impl ResponseChannelSender {
pub fn new(senders: Vec<Sender<(ConnectionMeta, Response)>>) -> Self {
Self { senders }
number /= 10;
}
#[inline]
pub fn send(&self, meta: ConnectionMeta, message: Response) {
if let Err(err) = self.senders[meta.worker_index].send((meta, message)) {
error!("ResponseChannelSender: couldn't send message: {:?}", err);
}
}
num_digits
}
pub type SocketWorkerStatus = Option<Result<(), String>>;
pub type SocketWorkerStatuses = Arc<Mutex<Vec<SocketWorkerStatus>>>;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_num_digits_in_usize() {
let f = num_digits_in_usize;
assert_eq!(f(0), 1);
assert_eq!(f(1), 1);
assert_eq!(f(9), 1);
assert_eq!(f(10), 2);
assert_eq!(f(11), 2);
assert_eq!(f(99), 2);
assert_eq!(f(100), 3);
assert_eq!(f(101), 3);
assert_eq!(f(1000), 4);
}
}

View file

@ -1,6 +1,7 @@
use std::net::SocketAddr;
use std::{net::SocketAddr, path::PathBuf};
use aquatic_common::access_list::AccessListConfig;
use aquatic_common::cpu_pinning::CpuPinningConfig;
use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig};
use serde::{Deserialize, Serialize};
use aquatic_cli_helpers::LogLevel;
@ -18,11 +19,10 @@ pub struct Config {
pub log_level: LogLevel,
pub network: NetworkConfig,
pub protocol: ProtocolConfig,
pub handlers: HandlerConfig,
pub cleaning: CleaningConfig,
pub statistics: StatisticsConfig,
pub privileges: PrivilegeConfig,
pub access_list: AccessListConfig,
pub cpu_pinning: CpuPinningConfig,
}
impl aquatic_cli_helpers::Config for Config {
@ -31,25 +31,15 @@ impl aquatic_cli_helpers::Config for Config {
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct TlsConfig {
pub use_tls: bool,
pub tls_pkcs12_path: String,
pub tls_pkcs12_password: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct NetworkConfig {
/// Bind to this address
pub address: SocketAddr,
pub tls_certificate_path: PathBuf,
pub tls_private_key_path: PathBuf,
pub ipv6_only: bool,
#[serde(flatten)]
pub tls: TlsConfig,
pub keep_alive: bool,
pub poll_event_capacity: usize,
pub poll_timeout_microseconds: u64,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -63,15 +53,6 @@ pub struct ProtocolConfig {
pub peer_announce_interval: usize,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct HandlerConfig {
/// Maximum number of requests to receive from channel before locking
/// mutex and starting work
pub max_requests_per_iter: usize,
pub channel_recv_timeout_microseconds: u64,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct CleaningConfig {
@ -83,24 +64,6 @@ pub struct CleaningConfig {
pub max_connection_age: u64,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct StatisticsConfig {
/// Print statistics this often (seconds). Don't print when set to zero.
pub interval: u64,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct PrivilegeConfig {
/// Chroot and switch user after binding to sockets
pub drop_privileges: bool,
/// Chroot to this path
pub chroot_path: String,
/// User to switch to after chrooting
pub user: String,
}
impl Default for Config {
fn default() -> Self {
Self {
@ -109,11 +72,10 @@ impl Default for Config {
log_level: LogLevel::default(),
network: NetworkConfig::default(),
protocol: ProtocolConfig::default(),
handlers: HandlerConfig::default(),
cleaning: CleaningConfig::default(),
statistics: StatisticsConfig::default(),
privileges: PrivilegeConfig::default(),
access_list: AccessListConfig::default(),
cpu_pinning: Default::default(),
}
}
}
@ -122,11 +84,10 @@ impl Default for NetworkConfig {
fn default() -> Self {
Self {
address: SocketAddr::from(([0, 0, 0, 0], 3000)),
tls_certificate_path: "".into(),
tls_private_key_path: "".into(),
ipv6_only: false,
tls: TlsConfig::default(),
keep_alive: true,
poll_event_capacity: 4096,
poll_timeout_microseconds: 200_000,
}
}
}
@ -141,15 +102,6 @@ impl Default for ProtocolConfig {
}
}
impl Default for HandlerConfig {
fn default() -> Self {
Self {
max_requests_per_iter: 10_000,
channel_recv_timeout_microseconds: 200,
}
}
}
impl Default for CleaningConfig {
fn default() -> Self {
Self {
@ -159,29 +111,3 @@ impl Default for CleaningConfig {
}
}
}
impl Default for StatisticsConfig {
fn default() -> Self {
Self { interval: 0 }
}
}
impl Default for PrivilegeConfig {
fn default() -> Self {
Self {
drop_privileges: false,
chroot_path: ".".to_string(),
user: "nobody".to_string(),
}
}
}
impl Default for TlsConfig {
fn default() -> Self {
Self {
use_tls: false,
tls_pkcs12_path: "".into(),
tls_pkcs12_password: "".into(),
}
}
}

View file

@ -1,311 +0,0 @@
use std::collections::BTreeMap;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::sync::Arc;
use std::time::Duration;
use std::vec::Drain;
use either::Either;
use mio::Waker;
use parking_lot::MutexGuard;
use rand::{rngs::SmallRng, Rng, SeedableRng};
use aquatic_common::extract_response_peers;
use aquatic_http_protocol::request::*;
use aquatic_http_protocol::response::*;
use crate::common::*;
use crate::config::Config;
/// Main loop of a request worker thread (mio-based implementation).
///
/// Repeatedly collects announce and scrape requests from the shared channel,
/// processes them in batches while holding the torrent map mutex, and wakes
/// the socket workers that have responses queued for them.
pub fn run_request_worker(
    config: Config,
    state: State,
    request_channel_receiver: RequestChannelReceiver,
    response_channel_sender: ResponseChannelSender,
    wakers: Vec<Arc<Waker>>,
) {
    // One flag per socket worker; set when a response was queued for it.
    let mut wake_socket_workers: Vec<bool> = (0..config.socket_workers).map(|_| false).collect();

    let mut announce_requests = Vec::new();
    let mut scrape_requests = Vec::new();

    let mut rng = SmallRng::from_entropy();

    let timeout = Duration::from_micros(config.handlers.channel_recv_timeout_microseconds);

    loop {
        let mut opt_torrent_map_guard: Option<MutexGuard<TorrentMaps>> = None;

        // If torrent state mutex is locked, just keep collecting requests
        // and process them later. This can happen with either multiple
        // request workers or while cleaning is underway.
        for i in 0..config.handlers.max_requests_per_iter {
            let opt_in_message = if i == 0 {
                // First receive blocks so the worker sleeps while idle.
                request_channel_receiver.recv().ok()
            } else {
                request_channel_receiver.recv_timeout(timeout).ok()
            };

            match opt_in_message {
                Some((meta, Request::Announce(r))) => {
                    announce_requests.push((meta, r));
                }
                Some((meta, Request::Scrape(r))) => {
                    scrape_requests.push((meta, r));
                }
                None => {
                    // Channel empty (timeout): try to take the lock now; on
                    // success, stop collecting and process the batch.
                    if let Some(torrent_guard) = state.torrent_maps.try_lock() {
                        opt_torrent_map_guard = Some(torrent_guard);

                        break;
                    }
                }
            }
        }

        // Fall back to a blocking lock if try_lock never succeeded above.
        let mut torrent_map_guard =
            opt_torrent_map_guard.unwrap_or_else(|| state.torrent_maps.lock());

        handle_announce_requests(
            &config,
            &mut rng,
            &mut torrent_map_guard,
            &response_channel_sender,
            &mut wake_socket_workers,
            announce_requests.drain(..),
        );

        handle_scrape_requests(
            &config,
            &mut torrent_map_guard,
            &response_channel_sender,
            &mut wake_socket_workers,
            scrape_requests.drain(..),
        );

        // Wake each socket worker with pending responses, then reset its
        // flag for the next iteration.
        for (worker_index, wake) in wake_socket_workers.iter_mut().enumerate() {
            if *wake {
                if let Err(err) = wakers[worker_index].wake() {
                    ::log::error!("request handler couldn't wake poll: {:?}", err);
                }

                *wake = false;
            }
        }
    }
}
/// Process a batch of announce requests: upsert each peer into the torrent
/// maps and queue an announce response for the originating connection.
pub fn handle_announce_requests(
    config: &Config,
    rng: &mut impl Rng,
    torrent_maps: &mut TorrentMaps,
    response_channel_sender: &ResponseChannelSender,
    wake_socket_workers: &mut Vec<bool>,
    requests: Drain<(ConnectionMeta, AnnounceRequest)>,
) {
    // All peers touched in this batch share the same expiry deadline.
    let valid_until = ValidUntil::new(config.cleaning.max_peer_age);

    for (meta, request) in requests {
        // Normalize IPv4-mapped IPv6 addresses so those peers are stored in
        // the IPv4 map.
        let peer_ip = convert_ipv4_mapped_ipv6(meta.peer_addr.ip());

        ::log::debug!("peer ip: {:?}", peer_ip);

        // IPv4 and IPv6 peers live in separate maps; only the peer list
        // matching the requester's address family is populated.
        let response = match peer_ip {
            IpAddr::V4(peer_ip_address) => {
                let torrent_data: &mut TorrentData<Ipv4Addr> =
                    torrent_maps.ipv4.entry(request.info_hash).or_default();

                let peer_connection_meta = PeerConnectionMeta {
                    worker_index: meta.worker_index,
                    poll_token: meta.poll_token,
                    peer_ip_address,
                };

                let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers(
                    config,
                    rng,
                    peer_connection_meta,
                    torrent_data,
                    request,
                    valid_until,
                );

                let response = AnnounceResponse {
                    complete: seeders,
                    incomplete: leechers,
                    announce_interval: config.protocol.peer_announce_interval,
                    peers: ResponsePeerListV4(response_peers),
                    peers6: ResponsePeerListV6(vec![]),
                };

                Response::Announce(response)
            }
            IpAddr::V6(peer_ip_address) => {
                let torrent_data: &mut TorrentData<Ipv6Addr> =
                    torrent_maps.ipv6.entry(request.info_hash).or_default();

                let peer_connection_meta = PeerConnectionMeta {
                    worker_index: meta.worker_index,
                    poll_token: meta.poll_token,
                    peer_ip_address,
                };

                let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers(
                    config,
                    rng,
                    peer_connection_meta,
                    torrent_data,
                    request,
                    valid_until,
                );

                let response = AnnounceResponse {
                    complete: seeders,
                    incomplete: leechers,
                    announce_interval: config.protocol.peer_announce_interval,
                    peers: ResponsePeerListV4(vec![]),
                    peers6: ResponsePeerListV6(response_peers),
                };

                Response::Announce(response)
            }
        };

        response_channel_sender.send(meta, response);
        // Mark the originating socket worker so the main loop wakes its poll.
        wake_socket_workers[meta.worker_index] = true;
    }
}
/// Insert/update peer. Return num_seeders, num_leechers and response peers
///
/// Counter bookkeeping: the seeder/leecher counter is incremented for the
/// peer being inserted and decremented for whatever entry (if any) was
/// replaced or removed, so repeat announces do not double-count.
fn upsert_peer_and_get_response_peers<I: Ip>(
    config: &Config,
    rng: &mut impl Rng,
    request_sender_meta: PeerConnectionMeta<I>,
    torrent_data: &mut TorrentData<I>,
    request: AnnounceRequest,
    valid_until: ValidUntil,
) -> (usize, usize, Vec<ResponsePeer<I>>) {
    // Insert/update/remove peer who sent this request
    let peer_status =
        PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left));

    let peer = Peer {
        connection_meta: request_sender_meta,
        port: request.port,
        status: peer_status,
        valid_until,
    };

    ::log::debug!("peer: {:?}", peer);

    // Peers are keyed on peer id plus either the announce key they sent or,
    // lacking one, their IP address.
    let ip_or_key = request
        .key
        .map(Either::Right)
        .unwrap_or_else(|| Either::Left(request_sender_meta.peer_ip_address));

    let peer_map_key = PeerMapKey {
        peer_id: request.peer_id,
        ip_or_key,
    };

    ::log::debug!("peer map key: {:?}", peer_map_key);

    // A "stopped" status removes the peer; otherwise insert/replace it and
    // bump the matching counter. `insert` returns any displaced entry.
    let opt_removed_peer = match peer_status {
        PeerStatus::Leeching => {
            torrent_data.num_leechers += 1;

            torrent_data.peers.insert(peer_map_key.clone(), peer)
        }
        PeerStatus::Seeding => {
            torrent_data.num_seeders += 1;

            torrent_data.peers.insert(peer_map_key.clone(), peer)
        }
        PeerStatus::Stopped => torrent_data.peers.remove(&peer_map_key),
    };

    ::log::debug!("opt_removed_peer: {:?}", opt_removed_peer);

    // Compensate counters for whatever entry was displaced or removed above.
    match opt_removed_peer.map(|peer| peer.status) {
        Some(PeerStatus::Leeching) => {
            torrent_data.num_leechers -= 1;
        }
        Some(PeerStatus::Seeding) => {
            torrent_data.num_seeders -= 1;
        }
        _ => {}
    }

    ::log::debug!("peer request numwant: {:?}", request.numwant);

    // numwant of 0 or absent means "use the configured maximum"; otherwise
    // cap the requested amount at the configured maximum.
    let max_num_peers_to_take = match request.numwant {
        Some(0) | None => config.protocol.max_peers,
        Some(numwant) => numwant.min(config.protocol.max_peers),
    };

    let response_peers: Vec<ResponsePeer<I>> = extract_response_peers(
        rng,
        &torrent_data.peers,
        max_num_peers_to_take,
        peer_map_key,
        Peer::to_response_peer,
    );

    (
        torrent_data.num_seeders,
        torrent_data.num_leechers,
        response_peers,
    )
}
/// Process a batch of scrape requests and queue one scrape response per
/// request on the response channel.
pub fn handle_scrape_requests(
    config: &Config,
    torrent_maps: &mut TorrentMaps,
    response_channel_sender: &ResponseChannelSender,
    wake_socket_workers: &mut Vec<bool>,
    requests: Drain<(ConnectionMeta, ScrapeRequest)>,
) {
    for (meta, request) in requests {
        // Cap the number of info hashes answered per request.
        let num_to_take = request
            .info_hashes
            .len()
            .min(config.protocol.max_scrape_torrents);

        let mut response = ScrapeResponse {
            files: BTreeMap::new(),
        };

        let peer_ip = convert_ipv4_mapped_ipv6(meta.peer_addr.ip());

        // If request.info_hashes is empty, don't return scrape for all
        // torrents, even though reference server does it. It is too expensive.
        // Look up statistics in the map matching the peer's address family;
        // unknown info hashes are simply omitted from the response.
        if peer_ip.is_ipv4() {
            for info_hash in request.info_hashes.into_iter().take(num_to_take) {
                if let Some(torrent_data) = torrent_maps.ipv4.get(&info_hash) {
                    let stats = ScrapeStatistics {
                        complete: torrent_data.num_seeders,
                        downloaded: 0, // No implementation planned
                        incomplete: torrent_data.num_leechers,
                    };

                    response.files.insert(info_hash, stats);
                }
            }
        } else {
            for info_hash in request.info_hashes.into_iter().take(num_to_take) {
                if let Some(torrent_data) = torrent_maps.ipv6.get(&info_hash) {
                    let stats = ScrapeStatistics {
                        complete: torrent_data.num_seeders,
                        downloaded: 0, // No implementation planned
                        incomplete: torrent_data.num_leechers,
                    };

                    response.files.insert(info_hash, stats);
                }
            }
        };

        response_channel_sender.send(meta, Response::Scrape(response));
        // Mark the originating socket worker so its poll gets woken.
        wake_socket_workers[meta.worker_index] = true;
    }
}

View file

@ -0,0 +1,360 @@
use std::collections::BTreeMap;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use either::Either;
use rand::Rng;
use aquatic_common::extract_response_peers;
use aquatic_http_protocol::request::*;
use aquatic_http_protocol::response::*;
use std::cell::RefCell;
use std::rc::Rc;
use std::time::Duration;
use aquatic_common::access_list::AccessList;
use futures_lite::{Stream, StreamExt};
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::timer::TimerActionRepeat;
use glommio::{enclose, prelude::*};
use rand::prelude::SmallRng;
use rand::SeedableRng;
use crate::common::*;
use crate::config::Config;
/// Request worker (glommio implementation): receives requests from all
/// socket workers over the channel mesh, updates this executor's torrent
/// maps, and sends responses back over the response mesh.
pub async fn run_request_worker(
    config: Config,
    request_mesh_builder: MeshBuilder<ChannelRequest, Partial>,
    response_mesh_builder: MeshBuilder<ChannelResponse, Partial>,
    access_list: AccessList,
) {
    // This worker consumes requests and produces responses on the meshes.
    let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap();
    let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap();

    let response_senders = Rc::new(response_senders);

    // State is local to this executor, so Rc<RefCell> (not Arc<Mutex>)
    // is sufficient for sharing between tasks.
    let torrents = Rc::new(RefCell::new(TorrentMaps::default()));
    let access_list = Rc::new(RefCell::new(access_list));

    // Periodically clean torrents and update access list
    TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || {
        enclose!((config, torrents, access_list) move || async move {
            update_access_list(&config, access_list.clone()).await;

            torrents.borrow_mut().clean(&config, &*access_list.borrow());

            Some(Duration::from_secs(config.cleaning.interval))
        })()
    }));

    // Spawn one handler task per incoming mesh stream (one per producer).
    let mut handles = Vec::new();

    for (_, receiver) in request_receivers.streams() {
        let handle = spawn_local(handle_request_stream(
            config.clone(),
            torrents.clone(),
            response_senders.clone(),
            receiver,
        ))
        .detach();

        handles.push(handle);
    }

    for handle in handles {
        handle.await;
    }
}
/// Handle all requests arriving on one mesh stream, sending each response
/// back to the socket worker (consumer) that owns the originating connection.
async fn handle_request_stream<S>(
    config: Config,
    torrents: Rc<RefCell<TorrentMaps>>,
    response_senders: Rc<Senders<ChannelResponse>>,
    mut stream: S,
) where
    S: Stream<Item = ChannelRequest> + ::std::marker::Unpin,
{
    let mut rng = SmallRng::from_entropy();

    let max_peer_age = config.cleaning.max_peer_age;
    // Shared expiry instant, refreshed once per second by the timer below so
    // each request doesn't have to compute its own.
    let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_peer_age)));

    TimerActionRepeat::repeat(enclose!((peer_valid_until) move || {
        enclose!((peer_valid_until) move || async move {
            *peer_valid_until.borrow_mut() = ValidUntil::new(max_peer_age);

            Some(Duration::from_secs(1))
        })()
    }));

    while let Some(channel_request) = stream.next().await {
        // Handle the request and pair the response with the routing data
        // (consumer id + connection id) needed to reach the right socket
        // worker and connection.
        let (response, consumer_id) = match channel_request {
            ChannelRequest::Announce {
                request,
                peer_addr,
                response_consumer_id,
                connection_id,
            } => {
                let meta = ConnectionMeta {
                    response_consumer_id,
                    connection_id,
                    peer_addr,
                };

                let response = handle_announce_request(
                    &config,
                    &mut rng,
                    &mut torrents.borrow_mut(),
                    peer_valid_until.borrow().to_owned(),
                    meta,
                    request,
                );

                let response = ChannelResponse::Announce {
                    response,
                    peer_addr,
                    connection_id,
                };

                (response, response_consumer_id)
            }
            ChannelRequest::Scrape {
                request,
                peer_addr,
                response_consumer_id,
                connection_id,
            } => {
                let meta = ConnectionMeta {
                    response_consumer_id,
                    connection_id,
                    peer_addr,
                };

                let response =
                    handle_scrape_request(&config, &mut torrents.borrow_mut(), meta, request);

                let response = ChannelResponse::Scrape {
                    response,
                    peer_addr,
                    connection_id,
                };

                (response, response_consumer_id)
            }
        };

        ::log::debug!("preparing to send response to channel: {:?}", response);

        // try_send: log and drop rather than block if the target worker's
        // channel is full or closed.
        if let Err(err) = response_senders.try_send_to(consumer_id.0, response) {
            ::log::warn!("response_sender.try_send: {:?}", err);
        }

        yield_if_needed().await;
    }
}
/// Build the announce response for a single request, registering the peer in
/// the torrent map matching its address family.
pub fn handle_announce_request(
    config: &Config,
    rng: &mut impl Rng,
    torrent_maps: &mut TorrentMaps,
    valid_until: ValidUntil,
    meta: ConnectionMeta,
    request: AnnounceRequest,
) -> AnnounceResponse {
    // IPv4-mapped IPv6 addresses are normalized to plain IPv4.
    let peer_ip = convert_ipv4_mapped_ipv6(meta.peer_addr.ip());

    ::log::debug!("peer ip: {:?}", peer_ip);

    match peer_ip {
        IpAddr::V4(peer_ip_address) => {
            let torrent_data: &mut TorrentData<Ipv4Addr> =
                torrent_maps.ipv4.entry(request.info_hash).or_default();

            let sender_meta = PeerConnectionMeta {
                response_consumer_id: meta.response_consumer_id,
                connection_id: meta.connection_id,
                peer_ip_address,
            };

            let (complete, incomplete, peers) = upsert_peer_and_get_response_peers(
                config,
                rng,
                sender_meta,
                torrent_data,
                request,
                valid_until,
            );

            // Only the peer list matching the address family is filled.
            AnnounceResponse {
                complete,
                incomplete,
                announce_interval: config.protocol.peer_announce_interval,
                peers: ResponsePeerListV4(peers),
                peers6: ResponsePeerListV6(Vec::new()),
            }
        }
        IpAddr::V6(peer_ip_address) => {
            let torrent_data: &mut TorrentData<Ipv6Addr> =
                torrent_maps.ipv6.entry(request.info_hash).or_default();

            let sender_meta = PeerConnectionMeta {
                response_consumer_id: meta.response_consumer_id,
                connection_id: meta.connection_id,
                peer_ip_address,
            };

            let (complete, incomplete, peers) = upsert_peer_and_get_response_peers(
                config,
                rng,
                sender_meta,
                torrent_data,
                request,
                valid_until,
            );

            AnnounceResponse {
                complete,
                incomplete,
                announce_interval: config.protocol.peer_announce_interval,
                peers: ResponsePeerListV4(Vec::new()),
                peers6: ResponsePeerListV6(peers),
            }
        }
    }
}
/// Insert/update peer. Return num_seeders, num_leechers and response peers
///
/// Counter bookkeeping: the seeder/leecher counter is incremented for the
/// peer being inserted and decremented for whatever entry (if any) was
/// replaced or removed, so repeat announces do not double-count.
pub fn upsert_peer_and_get_response_peers<I: Ip>(
    config: &Config,
    rng: &mut impl Rng,
    request_sender_meta: PeerConnectionMeta<I>,
    torrent_data: &mut TorrentData<I>,
    request: AnnounceRequest,
    valid_until: ValidUntil,
) -> (usize, usize, Vec<ResponsePeer<I>>) {
    // Insert/update/remove peer who sent this request
    let peer_status =
        PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left));

    let peer = Peer {
        connection_meta: request_sender_meta,
        port: request.port,
        status: peer_status,
        valid_until,
    };

    ::log::debug!("peer: {:?}", peer);

    // Peers are keyed on peer id plus either the announce key they sent or,
    // lacking one, their IP address.
    let ip_or_key = request
        .key
        .map(Either::Right)
        .unwrap_or_else(|| Either::Left(request_sender_meta.peer_ip_address));

    let peer_map_key = PeerMapKey {
        peer_id: request.peer_id,
        ip_or_key,
    };

    ::log::debug!("peer map key: {:?}", peer_map_key);

    // A "stopped" status removes the peer; otherwise insert/replace it and
    // bump the matching counter. `insert` returns any displaced entry.
    let opt_removed_peer = match peer_status {
        PeerStatus::Leeching => {
            torrent_data.num_leechers += 1;

            torrent_data.peers.insert(peer_map_key.clone(), peer)
        }
        PeerStatus::Seeding => {
            torrent_data.num_seeders += 1;

            torrent_data.peers.insert(peer_map_key.clone(), peer)
        }
        PeerStatus::Stopped => torrent_data.peers.remove(&peer_map_key),
    };

    ::log::debug!("opt_removed_peer: {:?}", opt_removed_peer);

    // Compensate counters for whatever entry was displaced or removed above.
    match opt_removed_peer.map(|peer| peer.status) {
        Some(PeerStatus::Leeching) => {
            torrent_data.num_leechers -= 1;
        }
        Some(PeerStatus::Seeding) => {
            torrent_data.num_seeders -= 1;
        }
        _ => {}
    }

    ::log::debug!("peer request numwant: {:?}", request.numwant);

    // numwant of 0 or absent means "use the configured maximum"; otherwise
    // cap the requested amount at the configured maximum.
    let max_num_peers_to_take = match request.numwant {
        Some(0) | None => config.protocol.max_peers,
        Some(numwant) => numwant.min(config.protocol.max_peers),
    };

    let response_peers: Vec<ResponsePeer<I>> = extract_response_peers(
        rng,
        &torrent_data.peers,
        max_num_peers_to_take,
        peer_map_key,
        Peer::to_response_peer,
    );

    (
        torrent_data.num_seeders,
        torrent_data.num_leechers,
        response_peers,
    )
}
/// Build a scrape response by looking up statistics for each requested info
/// hash in the torrent map matching the peer's address family.
pub fn handle_scrape_request(
    config: &Config,
    torrent_maps: &mut TorrentMaps,
    meta: ConnectionMeta,
    request: ScrapeRequest,
) -> ScrapeResponse {
    // Cap the number of info hashes answered per request.
    let num_to_take = request
        .info_hashes
        .len()
        .min(config.protocol.max_scrape_torrents);

    let mut response = ScrapeResponse {
        files: BTreeMap::new(),
    };

    let peer_ip = convert_ipv4_mapped_ipv6(meta.peer_addr.ip());
    let use_ipv4_map = peer_ip.is_ipv4();

    // If request.info_hashes is empty, don't return scrape for all
    // torrents, even though reference server does it. It is too expensive.
    // Unknown info hashes are simply omitted from the response.
    for info_hash in request.info_hashes.into_iter().take(num_to_take) {
        let opt_stats = if use_ipv4_map {
            torrent_maps
                .ipv4
                .get(&info_hash)
                .map(|torrent_data| ScrapeStatistics {
                    complete: torrent_data.num_seeders,
                    downloaded: 0, // No implementation planned
                    incomplete: torrent_data.num_leechers,
                })
        } else {
            torrent_maps
                .ipv6
                .get(&info_hash)
                .map(|torrent_data| ScrapeStatistics {
                    complete: torrent_data.num_seeders,
                    downloaded: 0, // No implementation planned
                    incomplete: torrent_data.num_leechers,
                })
        };

        if let Some(stats) = opt_stats {
            response.files.insert(info_hash, stats);
        }
    }

    response
}

View file

@ -1,153 +1,143 @@
use std::sync::Arc;
use std::thread::Builder;
use std::time::Duration;
use std::{
fs::File,
io::BufReader,
sync::{atomic::AtomicUsize, Arc},
};
use anyhow::Context;
use mio::{Poll, Waker};
use parking_lot::Mutex;
use privdrop::PrivDrop;
use aquatic_common::{access_list::AccessList, privileges::drop_privileges_after_socket_binding};
use glommio::{channels::channel_mesh::MeshBuilder, prelude::*};
pub mod common;
use crate::config::Config;
mod common;
pub mod config;
pub mod handler;
pub mod network;
pub mod tasks;
use common::*;
use config::Config;
use network::utils::create_tls_acceptor;
mod handlers;
mod network;
pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker";
const SHARED_CHANNEL_SIZE: usize = 1024;
pub fn run(config: Config) -> anyhow::Result<()> {
let state = State::default();
tasks::update_access_list(&config, &state);
start_workers(config.clone(), state.clone())?;
loop {
::std::thread::sleep(Duration::from_secs(config.cleaning.interval));
tasks::update_access_list(&config, &state);
state
.torrent_maps
.lock()
.clean(&config, &state.access_list.load_full());
if config.cpu_pinning.active {
core_affinity::set_for_current(core_affinity::CoreId {
id: config.cpu_pinning.offset,
});
}
}
pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> {
let opt_tls_acceptor = create_tls_acceptor(&config.network.tls)?;
let (request_channel_sender, request_channel_receiver) = ::crossbeam_channel::unbounded();
let mut out_message_senders = Vec::new();
let mut wakers = Vec::new();
let socket_worker_statuses: SocketWorkerStatuses = {
let mut statuses = Vec::new();
for _ in 0..config.socket_workers {
statuses.push(None);
}
Arc::new(Mutex::new(statuses))
let access_list = if config.access_list.mode.is_on() {
AccessList::create_from_path(&config.access_list.path).expect("Load access list")
} else {
AccessList::default()
};
for i in 0..config.socket_workers {
let num_peers = config.socket_workers + config.request_workers;
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
let num_bound_sockets = Arc::new(AtomicUsize::new(0));
let tls_config = Arc::new(create_tls_config(&config).unwrap());
let mut executors = Vec::new();
for i in 0..(config.socket_workers) {
let config = config.clone();
let state = state.clone();
let socket_worker_statuses = socket_worker_statuses.clone();
let request_channel_sender = request_channel_sender.clone();
let opt_tls_acceptor = opt_tls_acceptor.clone();
let poll = Poll::new().expect("create poll");
let waker = Arc::new(Waker::new(poll.registry(), CHANNEL_TOKEN).expect("create waker"));
let tls_config = tls_config.clone();
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
let num_bound_sockets = num_bound_sockets.clone();
let access_list = access_list.clone();
let (response_channel_sender, response_channel_receiver) = ::crossbeam_channel::unbounded();
let mut builder = LocalExecutorBuilder::default();
out_message_senders.push(response_channel_sender);
wakers.push(waker);
Builder::new()
.name(format!("socket-{:02}", i + 1))
.spawn(move || {
network::run_socket_worker(
config,
state,
i,
socket_worker_statuses,
request_channel_sender,
response_channel_receiver,
opt_tls_acceptor,
poll,
);
})?;
}
// Wait for socket worker statuses. On error from any, quit program.
// On success from all, drop privileges if corresponding setting is set
// and continue program.
loop {
::std::thread::sleep(::std::time::Duration::from_millis(10));
if let Some(statuses) = socket_worker_statuses.try_lock() {
for opt_status in statuses.iter() {
if let Some(Err(err)) = opt_status {
return Err(::anyhow::anyhow!(err.to_owned()));
}
}
if statuses.iter().all(Option::is_some) {
if config.privileges.drop_privileges {
PrivDrop::default()
.chroot(config.privileges.chroot_path.clone())
.user(config.privileges.user.clone())
.apply()
.context("Couldn't drop root privileges")?;
}
break;
}
if config.cpu_pinning.active {
builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + i);
}
let executor = builder.spawn(|| async move {
network::run_socket_worker(
config,
tls_config,
request_mesh_builder,
response_mesh_builder,
num_bound_sockets,
access_list,
)
.await
});
executors.push(executor);
}
let response_channel_sender = ResponseChannelSender::new(out_message_senders);
for i in 0..config.request_workers {
for i in 0..(config.request_workers) {
let config = config.clone();
let state = state.clone();
let request_channel_receiver = request_channel_receiver.clone();
let response_channel_sender = response_channel_sender.clone();
let wakers = wakers.clone();
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
let access_list = access_list.clone();
Builder::new()
.name(format!("request-{:02}", i + 1))
.spawn(move || {
handler::run_request_worker(
config,
state,
request_channel_receiver,
response_channel_sender,
wakers,
);
})?;
let mut builder = LocalExecutorBuilder::default();
if config.cpu_pinning.active {
builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i);
}
let executor = builder.spawn(|| async move {
handlers::run_request_worker(
config,
request_mesh_builder,
response_mesh_builder,
access_list,
)
.await
});
executors.push(executor);
}
if config.statistics.interval != 0 {
let state = state.clone();
let config = config.clone();
drop_privileges_after_socket_binding(
&config.privileges,
num_bound_sockets,
config.socket_workers,
)
.unwrap();
Builder::new()
.name("statistics".to_string())
.spawn(move || loop {
::std::thread::sleep(Duration::from_secs(config.statistics.interval));
tasks::print_statistics(&state);
})
.expect("spawn statistics thread");
for executor in executors {
executor
.expect("failed to spawn local executor")
.join()
.unwrap();
}
Ok(())
}
/// Build a rustls server config from the PEM certificate chain and PKCS#8
/// private key files referenced by the network configuration.
///
/// Errors if either file can't be opened/parsed, if the key file contains no
/// PKCS#8 key, or if rustls rejects the certificate/key pair.
fn create_tls_config(config: &Config) -> anyhow::Result<rustls::ServerConfig> {
    // Load the full certificate chain from the PEM file.
    let certs = {
        let f = File::open(&config.network.tls_certificate_path)?;
        let mut f = BufReader::new(f);

        rustls_pemfile::certs(&mut f)?
            .into_iter()
            .map(rustls::Certificate)
            .collect()
    };

    // Use the first PKCS#8 key in the file. Taking ownership via
    // into_iter().next() avoids the clone that first() would require, and
    // ok_or_else avoids constructing the error eagerly.
    let private_key = {
        let f = File::open(&config.network.tls_private_key_path)?;
        let mut f = BufReader::new(f);

        rustls_pemfile::pkcs8_private_keys(&mut f)?
            .into_iter()
            .next()
            .map(rustls::PrivateKey)
            .ok_or_else(|| anyhow::anyhow!("No private keys in file"))?
    };

    let tls_config = rustls::ServerConfig::builder()
        .with_safe_defaults()
        .with_no_client_auth()
        .with_single_cert(certs, private_key)?;

    Ok(tls_config)
}

View file

@ -0,0 +1,496 @@
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::io::{Cursor, ErrorKind, Read, Write};
use std::net::SocketAddr;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use aquatic_common::access_list::AccessList;
use aquatic_http_protocol::common::InfoHash;
use aquatic_http_protocol::request::{Request, RequestParseError, ScrapeRequest};
use aquatic_http_protocol::response::{
FailureResponse, Response, ScrapeResponse, ScrapeStatistics,
};
use either::Either;
use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender};
use glommio::channels::shared_channel::ConnectedReceiver;
use glommio::net::{TcpListener, TcpStream};
use glommio::task::JoinHandle;
use glommio::timer::TimerActionRepeat;
use glommio::{enclose, prelude::*};
use rustls::ServerConnection;
use slab::Slab;
use crate::common::num_digits_in_usize;
use crate::config::Config;
use super::common::*;
/// Size of the scratch buffer used for socket reads and TLS plaintext reads.
const INTERMEDIATE_BUFFER_SIZE: usize = 1024;
/// Maximum number of request bytes buffered per connection; exceeding this
/// closes the connection with an error.
const MAX_REQUEST_SIZE: usize = 2048;
/// Tracks a scrape request that was split across multiple request workers.
struct PendingScrapeResponse {
    // How many worker responses are still outstanding.
    pending_worker_responses: usize,
    // Statistics merged from the partial responses received so far.
    stats: BTreeMap<InfoHash, ScrapeStatistics>,
}
/// Entry stored in the connection slab for each active connection.
struct ConnectionReference {
    // Sends responses into the connection's local channel.
    response_sender: LocalSender<ChannelResponse>,
    // Handle of the spawned task driving the connection.
    handle: JoinHandle<()>,
}
/// State for a single accepted TLS connection.
struct Connection {
    config: Rc<Config>,
    access_list: Rc<RefCell<AccessList>>,
    // Mesh senders used to forward requests to request workers.
    request_senders: Rc<Senders<ChannelRequest>>,
    // Local channel on which this connection's responses arrive.
    response_receiver: LocalReceiver<ChannelResponse>,
    // Identifies the owning socket worker as a mesh response consumer.
    response_consumer_id: ConsumerId,
    // rustls server-side TLS session state.
    tls: ServerConnection,
    stream: TcpStream,
    // Key of this connection in the worker's connection slab.
    connection_id: ConnectionId,
    // Accumulates decrypted request bytes until a full request parses.
    request_buffer: [u8; MAX_REQUEST_SIZE],
    // Number of valid bytes currently in `request_buffer`.
    request_buffer_position: usize,
}
/// Socket worker: accepts TLS connections, spawns one task per connection,
/// and routes requests/responses between connections and request workers
/// over the channel meshes.
pub async fn run_socket_worker(
    config: Config,
    tls_config: Arc<rustls::ServerConfig>,
    request_mesh_builder: MeshBuilder<ChannelRequest, Partial>,
    response_mesh_builder: MeshBuilder<ChannelResponse, Partial>,
    num_bound_sockets: Arc<AtomicUsize>,
    access_list: AccessList,
) {
    let config = Rc::new(config);
    let access_list = Rc::new(RefCell::new(access_list));

    let listener = TcpListener::bind(config.network.address).expect("bind socket");
    // Signal the main thread that this worker's socket is bound.
    num_bound_sockets.fetch_add(1, Ordering::SeqCst);

    // This worker produces requests and consumes responses on the meshes.
    let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap();
    let request_senders = Rc::new(request_senders);

    let (_, mut response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap();

    // Attached to outgoing requests so request workers know which consumer
    // to send responses back to.
    let response_consumer_id = ConsumerId(response_receivers.consumer_id().unwrap());

    // Slab keys double as connection ids.
    let connection_slab = Rc::new(RefCell::new(Slab::new()));
    let connections_to_remove = Rc::new(RefCell::new(Vec::new()));

    // Periodically update access list
    TimerActionRepeat::repeat(enclose!((config, access_list) move || {
        enclose!((config, access_list) move || async move {
            update_access_list(config.clone(), access_list.clone()).await;

            Some(Duration::from_secs(config.cleaning.interval))
        })()
    }));

    // Periodically remove closed connections
    TimerActionRepeat::repeat(
        enclose!((config, connection_slab, connections_to_remove) move || {
            enclose!((config, connection_slab, connections_to_remove) move || async move {
                // Take the pending list, leaving an empty one in its place.
                let connections_to_remove = connections_to_remove.replace(Vec::new());

                for connection_id in connections_to_remove {
                    if let Some(_) = connection_slab.borrow_mut().try_remove(connection_id) {
                        ::log::debug!("removed connection with id {}", connection_id);
                    } else {
                        ::log::error!(
                            "couldn't remove connection with id {}, it is not in connection slab",
                            connection_id
                        );
                    }
                }

                Some(Duration::from_secs(config.cleaning.interval))
            })()
        }),
    );

    // Spawn one forwarding task per response stream (one per request worker).
    for (_, response_receiver) in response_receivers.streams() {
        spawn_local(receive_responses(
            response_receiver,
            connection_slab.clone(),
        ))
        .detach();
    }

    let mut incoming = listener.incoming();

    while let Some(stream) = incoming.next().await {
        match stream {
            Ok(stream) => {
                // Local channel over which this connection receives its
                // responses from the forwarding tasks above.
                let (response_sender, response_receiver) = new_bounded(config.request_workers);

                let mut slab = connection_slab.borrow_mut();
                let entry = slab.vacant_entry();
                let key = entry.key();

                let mut conn = Connection {
                    config: config.clone(),
                    access_list: access_list.clone(),
                    request_senders: request_senders.clone(),
                    response_receiver,
                    response_consumer_id,
                    tls: ServerConnection::new(tls_config.clone()).unwrap(),
                    stream,
                    connection_id: ConnectionId(entry.key()),
                    request_buffer: [0u8; MAX_REQUEST_SIZE],
                    request_buffer_position: 0,
                };

                let connections_to_remove = connections_to_remove.clone();

                // Drive the connection; when it finishes (error or close),
                // queue its slab key for removal by the periodic task above.
                let handle = spawn_local(async move {
                    if let Err(err) = conn.handle_stream().await {
                        ::log::info!("conn.handle_stream() error: {:?}", err);
                    }

                    connections_to_remove.borrow_mut().push(key);
                })
                .detach();

                let connection_reference = ConnectionReference {
                    response_sender,
                    handle,
                };

                entry.insert(connection_reference);
            }
            Err(err) => {
                ::log::error!("accept connection: {:?}", err);
            }
        }
    }
}
/// Forward each response arriving from the mesh to the local channel of the
/// connection it belongs to; responses for already-removed connections are
/// dropped.
async fn receive_responses(
    mut response_receiver: ConnectedReceiver<ChannelResponse>,
    connection_references: Rc<RefCell<Slab<ConnectionReference>>>,
) {
    while let Some(channel_response) = response_receiver.next().await {
        let key = channel_response.get_connection_id().0;
        let references = connection_references.borrow();

        if let Some(reference) = references.get(key) {
            if let Err(err) = reference.response_sender.try_send(channel_response) {
                ::log::error!("Couldn't send response to local receiver: {:?}", err);
            }
        }
    }
}
impl Connection {
    /// Per-connection event loop: read TLS data, dispatch complete requests,
    /// queue responses into the TLS session, and flush TLS output to the
    /// socket.
    async fn handle_stream(&mut self) -> anyhow::Result<()> {
        let mut close_after_writing = false;

        loop {
            match self.read_tls().await? {
                Some(Either::Left(request)) => {
                    // Announce requests produce one worker response; scrape
                    // requests may have been split across workers and need
                    // their partial responses merged.
                    let response = match self.handle_request(request).await? {
                        Some(Either::Left(response)) => response,
                        Some(Either::Right(pending_scrape_response)) => {
                            self.wait_for_response(Some(pending_scrape_response))
                                .await?
                        }
                        None => self.wait_for_response(None).await?,
                    };

                    self.queue_response(&response)?;

                    if !self.config.network.keep_alive {
                        close_after_writing = true;
                    }
                }
                Some(Either::Right(response)) => {
                    // Invalid request: answer with a failure response and
                    // then close the connection.
                    self.queue_response(&Response::Failure(response))?;

                    close_after_writing = true;
                }
                None => {
                    // Still handshaking
                }
            }

            // Flush pending TLS output (handshake records or response data).
            self.write_tls().await?;

            if close_after_writing {
                let _ = self.stream.shutdown(std::net::Shutdown::Both).await;

                break;
            }
        }

        Ok(())
    }
    /// Read encrypted data from the socket into the rustls session, then try
    /// to parse the accumulated plaintext as a request.
    ///
    /// Returns:
    /// - `Some(Left(request))` when a complete request was parsed
    /// - `Some(Right(failure))` when the data was an invalid request
    /// - `None` when no request is ready but rustls has output to write
    ///   (e.g. handshake records)
    async fn read_tls(&mut self) -> anyhow::Result<Option<Either<Request, FailureResponse>>> {
        loop {
            ::log::debug!("read_tls");

            let mut buf = [0u8; INTERMEDIATE_BUFFER_SIZE];

            let bytes_read = self.stream.read(&mut buf).await?;

            if bytes_read == 0 {
                return Err(anyhow::anyhow!("Peer has closed connection"));
            }

            // Feed the ciphertext just read into the TLS session.
            let _ = self.tls.read_tls(&mut &buf[..bytes_read]).unwrap();

            let io_state = self.tls.process_new_packets()?;

            let mut added_plaintext = false;

            // Drain decrypted plaintext into the request buffer; `buf` is
            // reused as a scratch buffer here.
            if io_state.plaintext_bytes_to_read() != 0 {
                loop {
                    match self.tls.reader().read(&mut buf) {
                        Ok(0) => {
                            break;
                        }
                        Ok(amt) => {
                            let end = self.request_buffer_position + amt;

                            if end > self.request_buffer.len() {
                                return Err(anyhow::anyhow!("request too large"));
                            } else {
                                let request_buffer_slice =
                                    &mut self.request_buffer[self.request_buffer_position..end];

                                request_buffer_slice.copy_from_slice(&buf[..amt]);

                                self.request_buffer_position = end;

                                added_plaintext = true;
                            }
                        }
                        Err(err) if err.kind() == ErrorKind::WouldBlock => {
                            // No more buffered plaintext available.
                            break;
                        }
                        Err(err) => {
                            // Should never happen
                            ::log::error!("tls.reader().read error: {:?}", err);

                            break;
                        }
                    }
                }
            }

            // Only attempt a parse when new plaintext arrived.
            if added_plaintext {
                match Request::from_bytes(&self.request_buffer[..self.request_buffer_position]) {
                    Ok(request) => {
                        ::log::debug!("received request: {:?}", request);

                        // Reset the buffer for the next request on this
                        // connection.
                        self.request_buffer_position = 0;

                        return Ok(Some(Either::Left(request)));
                    }
                    Err(RequestParseError::NeedMoreData) => {
                        ::log::debug!(
                            "need more request data. current data: {:?}",
                            std::str::from_utf8(&self.request_buffer)
                        );
                    }
                    Err(RequestParseError::Invalid(err)) => {
                        ::log::debug!("invalid request: {:?}", err);

                        let response = FailureResponse {
                            failure_reason: "Invalid request".into(),
                        };

                        return Ok(Some(Either::Right(response)));
                    }
                }
            }

            // Let the caller flush pending TLS output (e.g. handshake
            // records) before reading more.
            if self.tls.wants_write() {
                break;
            }
        }

        Ok(None)
    }
async fn write_tls(&mut self) -> anyhow::Result<()> {
if !self.tls.wants_write() {
return Ok(());
}
::log::debug!("write_tls (wants write)");
let mut buf = Vec::new();
let mut buf = Cursor::new(&mut buf);
while self.tls.wants_write() {
self.tls.write_tls(&mut buf).unwrap();
}
self.stream.write_all(&buf.into_inner()).await?;
self.stream.flush().await?;
Ok(())
}
    /// Take a request and:
    /// - Return error response if request is not allowed
    /// - If it is an announce requests, pass it on to request workers and return None
    /// - If it is a scrape requests, split it up and pass on parts to
    ///   relevant request workers, and return PendingScrapeResponse struct.
    async fn handle_request(
        &self,
        request: Request,
    ) -> anyhow::Result<Option<Either<Response, PendingScrapeResponse>>> {
        let peer_addr = self.get_peer_addr()?;

        match request {
            Request::Announce(request) => {
                let info_hash = request.info_hash;

                // Reject announces for info hashes the access list forbids.
                if self
                    .access_list
                    .borrow()
                    .allows(self.config.access_list.mode, &info_hash.0)
                {
                    let request = ChannelRequest::Announce {
                        request,
                        connection_id: self.connection_id,
                        response_consumer_id: self.response_consumer_id,
                        peer_addr,
                    };

                    // Requests are sharded over request workers by info hash.
                    let consumer_index = calculate_request_consumer_index(&self.config, info_hash);

                    // Only fails when receiver is closed
                    self.request_senders
                        .send_to(consumer_index, request)
                        .await
                        .unwrap();

                    Ok(None)
                } else {
                    let response = Response::Failure(FailureResponse {
                        failure_reason: "Info hash not allowed".into(),
                    });

                    Ok(Some(Either::Left(response)))
                }
            }
            Request::Scrape(ScrapeRequest { info_hashes }) => {
                // Group info hashes by the request worker responsible for
                // them, then send one partial scrape request per worker.
                let mut info_hashes_by_worker: BTreeMap<usize, Vec<InfoHash>> = BTreeMap::new();

                for info_hash in info_hashes.into_iter() {
                    let info_hashes = info_hashes_by_worker
                        .entry(calculate_request_consumer_index(&self.config, info_hash))
                        .or_default();

                    info_hashes.push(info_hash);
                }

                let pending_worker_responses = info_hashes_by_worker.len();

                for (consumer_index, info_hashes) in info_hashes_by_worker {
                    let request = ChannelRequest::Scrape {
                        request: ScrapeRequest { info_hashes },
                        peer_addr,
                        response_consumer_id: self.response_consumer_id,
                        connection_id: self.connection_id,
                    };

                    // Only fails when receiver is closed
                    self.request_senders
                        .send_to(consumer_index, request)
                        .await
                        .unwrap();
                }

                // The caller waits until all partial responses have arrived.
                let pending_scrape_response = PendingScrapeResponse {
                    pending_worker_responses,
                    stats: Default::default(),
                };

                Ok(Some(Either::Right(pending_scrape_response)))
            }
        }
    }
/// Wait for announce response or partial scrape responses to arrive,
/// return full response.
///
/// For scrapes, partial responses from the individual request workers are
/// merged into `opt_pending_scrape_response` until all expected worker
/// responses have been received.
async fn wait_for_response(
    &self,
    mut opt_pending_scrape_response: Option<PendingScrapeResponse>,
) -> anyhow::Result<Response> {
    loop {
        if let Some(channel_response) = self.response_receiver.recv().await {
            // Guard against a response meant for a previous connection that
            // happened to reuse this channel slot
            if channel_response.get_peer_addr() != self.get_peer_addr()? {
                return Err(anyhow::anyhow!("peer addressess didn't match"));
            }

            match channel_response {
                ChannelResponse::Announce { response, .. } => {
                    break Ok(Response::Announce(response));
                }
                ChannelResponse::Scrape { response, .. } => {
                    if let Some(mut pending) = opt_pending_scrape_response.take() {
                        // Merge this worker's statistics into the accumulator
                        pending.stats.extend(response.files);
                        pending.pending_worker_responses -= 1;

                        if pending.pending_worker_responses == 0 {
                            // All workers have reported; assemble final response
                            let response = Response::Scrape(ScrapeResponse {
                                files: pending.stats,
                            });

                            break Ok(response);
                        } else {
                            // Put partially merged state back and keep waiting
                            opt_pending_scrape_response = Some(pending);
                        }
                    } else {
                        return Err(anyhow::anyhow!(
                            "received channel scrape response without pending scrape response"
                        ));
                    }
                }
            };
        } else {
            // TODO: this is a serious error condition and should maybe be handled differently
            return Err(anyhow::anyhow!(
                "response receiver can't receive - sender is closed"
            ));
        }
    }
}
/// Serialize `response`, frame it in a minimal HTTP/1.1 200 response and
/// queue it on the TLS writer for later transmission to the peer.
fn queue_response(&mut self, response: &Response) -> anyhow::Result<()> {
    // Serialize the bencoded response body (writing to a Vec can't fail)
    let mut body = Vec::new();
    response.write(&mut body).unwrap();

    let content_len = body.len() + 2; // 2 is for newlines at end
    let content_len_num_digits = num_digits_in_usize(content_len);

    // 39 bytes covers the status line, header name and CRLF framing
    let mut response_bytes = Vec::with_capacity(39 + content_len_num_digits + body.len());

    response_bytes.extend_from_slice(b"HTTP/1.1 200 OK\r\nContent-Length: ");
    ::itoa::write(&mut response_bytes, content_len)?;
    response_bytes.extend_from_slice(b"\r\n\r\n");
    response_bytes.append(&mut body);
    response_bytes.extend_from_slice(b"\r\n");

    // Use write_all: plain `write` is allowed to perform a partial write,
    // which would silently truncate the queued response. The target here
    // presumably buffers plaintext for the TLS session (TODO confirm), so
    // this does not block on the socket.
    self.tls.writer().write_all(&response_bytes[..])?;

    Ok(())
}
/// Peer address of the underlying stream, with the I/O error converted
/// into an anyhow error.
fn get_peer_addr(&self) -> anyhow::Result<SocketAddr> {
    match self.stream.peer_addr() {
        Ok(addr) => Ok(addr),
        Err(err) => Err(anyhow::anyhow!("Couldn't get peer addr: {:?}", err)),
    }
}
}
/// Pick the request worker responsible for an info hash: the first byte
/// of the hash, reduced modulo the number of request workers.
fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize {
    let first_byte = info_hash.0[0] as usize;

    first_byte % config.request_workers
}

View file

@ -1,291 +0,0 @@
use std::io::ErrorKind;
use std::io::{Read, Write};
use std::net::SocketAddr;
use std::sync::Arc;
use hashbrown::HashMap;
use mio::net::TcpStream;
use mio::{Poll, Token};
use native_tls::{MidHandshakeTlsStream, TlsAcceptor};
use aquatic_http_protocol::request::{Request, RequestParseError};
use crate::common::*;
use super::stream::Stream;
/// Outcome of a non-blocking request read attempt.
#[derive(Debug)]
pub enum RequestReadError {
    /// Request incomplete so far; retry on the next readable event
    NeedMoreData,
    /// Peer closed the connection (read returned 0 bytes)
    StreamEnded,
    /// Received bytes could not be parsed as a request
    Parse(anyhow::Error),
    /// Underlying stream I/O error (other than WouldBlock)
    Io(::std::io::Error),
}
/// A connection whose (optional) TLS handshake has completed and which is
/// ready to exchange requests and responses.
pub struct EstablishedConnection {
    stream: Stream,
    // Cached at construction time; see Stream::get_peer_addr
    pub peer_addr: SocketAddr,
    // Read buffer, grown in 1024-byte steps by read_request
    buf: Vec<u8>,
    // Number of valid bytes at the start of `buf`
    bytes_read: usize,
}
impl EstablishedConnection {
    /// Wrap a connected stream, caching its peer address.
    #[inline]
    fn new(stream: Stream) -> Self {
        let peer_addr = stream.get_peer_addr();

        Self {
            stream,
            peer_addr,
            buf: Vec::new(),
            bytes_read: 0,
        }
    }

    /// Try to read a complete request from the stream.
    ///
    /// Grows the read buffer in 1024-byte steps (capped around 4 KiB) and
    /// attempts to parse all accumulated bytes after each read. Buffered
    /// state is kept across `NeedMoreData` results so reading can resume on
    /// the next poll event; it is cleared on success and on fatal errors.
    pub fn read_request(&mut self) -> Result<Request, RequestReadError> {
        // Ensure there is free space to read into. Changed from bitwise `&`
        // to the idiomatic short-circuiting `&&` (identical result for
        // bools).
        if (self.buf.len() - self.bytes_read < 512) && (self.buf.len() <= 3072) {
            self.buf.extend_from_slice(&[0; 1024]);
        }

        match self.stream.read(&mut self.buf[self.bytes_read..]) {
            Ok(0) => {
                // Peer closed the connection
                self.clear_buffer();

                return Err(RequestReadError::StreamEnded);
            }
            Ok(bytes_read) => {
                self.bytes_read += bytes_read;

                ::log::debug!("read_request read {} bytes", bytes_read);
            }
            Err(err) if err.kind() == ErrorKind::WouldBlock => {
                // Keep buffered bytes; more data may arrive later
                return Err(RequestReadError::NeedMoreData);
            }
            Err(err) => {
                self.clear_buffer();

                return Err(RequestReadError::Io(err));
            }
        }

        match Request::from_bytes(&self.buf[..self.bytes_read]) {
            Ok(request) => {
                self.clear_buffer();

                Ok(request)
            }
            Err(RequestParseError::NeedMoreData) => Err(RequestReadError::NeedMoreData),
            Err(RequestParseError::Invalid(err)) => {
                self.clear_buffer();

                Err(RequestReadError::Parse(err))
            }
        }
    }

    /// Frame `body` in a minimal HTTP/1.1 200 response and send it.
    ///
    /// NOTE(review): a partial write is only logged, not retried, so the
    /// rest of the response is dropped. This appears to be deliberate
    /// best-effort behavior and is kept as-is.
    pub fn send_response(&mut self, body: &[u8]) -> ::std::io::Result<()> {
        let content_len = body.len() + 2; // 2 is for newlines at end
        let content_len_num_digits = Self::num_digits_in_usize(content_len);

        // 39 bytes covers the status line, header name and CRLF framing
        let mut response = Vec::with_capacity(39 + content_len_num_digits + body.len());

        response.extend_from_slice(b"HTTP/1.1 200 OK\r\nContent-Length: ");
        ::itoa::write(&mut response, content_len)?;
        response.extend_from_slice(b"\r\n\r\n");
        response.extend_from_slice(body);
        response.extend_from_slice(b"\r\n");

        let bytes_written = self.stream.write(&response)?;

        if bytes_written != response.len() {
            ::log::error!(
                "send_response: only {} out of {} bytes written",
                bytes_written,
                response.len()
            );
        }

        self.stream.flush()?;

        Ok(())
    }

    /// Number of decimal digits needed to render `number`.
    fn num_digits_in_usize(mut number: usize) -> usize {
        let mut num_digits = 1usize;

        while number >= 10 {
            num_digits += 1;
            number /= 10;
        }

        num_digits
    }

    /// Reset read state, releasing the buffer allocation.
    #[inline]
    pub fn clear_buffer(&mut self) {
        self.bytes_read = 0;
        self.buf = Vec::new();
    }
}
/// Non-success outcomes of a TLS handshake attempt.
pub enum TlsHandshakeMachineError {
    /// Handshake not finished yet; contains the machine for a later retry
    WouldBlock(TlsHandshakeMachine),
    /// Handshake failed permanently
    Failure(native_tls::Error),
}

/// Handshake progress: not yet started, or interrupted mid-handshake.
enum TlsHandshakeMachineInner {
    TcpStream(TcpStream),
    TlsMidHandshake(MidHandshakeTlsStream<TcpStream>),
}

/// Resumable TLS handshake over a non-blocking TCP stream.
pub struct TlsHandshakeMachine {
    tls_acceptor: Arc<TlsAcceptor>,
    inner: TlsHandshakeMachineInner,
}
// Removed the unused lifetime parameter `'a` that was declared on this impl
// block (clippy: extra_unused_lifetimes); nothing in the impl referenced it.
impl TlsHandshakeMachine {
    /// Start a handshake machine in its initial (pre-handshake) state.
    #[inline]
    fn new(tls_acceptor: Arc<TlsAcceptor>, tcp_stream: TcpStream) -> Self {
        Self {
            tls_acceptor,
            inner: TlsHandshakeMachineInner::TcpStream(tcp_stream),
        }
    }

    /// Attempt to establish a TLS connection. On a WouldBlock error, return
    /// the machine wrapped in an error for later attempts.
    pub fn establish_tls(self) -> Result<EstablishedConnection, TlsHandshakeMachineError> {
        // Start a fresh handshake or resume the interrupted one
        let handshake_result = match self.inner {
            TlsHandshakeMachineInner::TcpStream(stream) => self.tls_acceptor.accept(stream),
            TlsHandshakeMachineInner::TlsMidHandshake(handshake) => handshake.handshake(),
        };

        match handshake_result {
            Ok(stream) => {
                let established = EstablishedConnection::new(Stream::TlsStream(stream));

                ::log::debug!("established tls connection");

                Ok(established)
            }
            Err(native_tls::HandshakeError::WouldBlock(handshake)) => {
                // Keep the mid-handshake state so it can be resumed on the
                // next readable event
                let inner = TlsHandshakeMachineInner::TlsMidHandshake(handshake);

                let machine = Self {
                    tls_acceptor: self.tls_acceptor,
                    inner,
                };

                Err(TlsHandshakeMachineError::WouldBlock(machine))
            }
            Err(native_tls::HandshakeError::Failure(err)) => {
                Err(TlsHandshakeMachineError::Failure(err))
            }
        }
    }
}
/// Connection state: fully established, or TLS handshake still in progress.
enum ConnectionInner {
    Established(EstablishedConnection),
    InProgress(TlsHandshakeMachine),
}

/// A client connection together with its inactivity deadline.
pub struct Connection {
    // Connection is removed by periodic cleaning once this deadline passes
    pub valid_until: ValidUntil,
    inner: ConnectionInner,
}
impl Connection {
    /// Wrap a newly accepted TCP stream, starting a TLS handshake state
    /// machine when a TLS acceptor is configured.
    #[inline]
    pub fn new(
        opt_tls_acceptor: &Option<Arc<TlsAcceptor>>,
        valid_until: ValidUntil,
        tcp_stream: TcpStream,
    ) -> Self {
        // Setup handshake machine if TLS is requested
        let inner = if let Some(tls_acceptor) = opt_tls_acceptor {
            ConnectionInner::InProgress(TlsHandshakeMachine::new(tls_acceptor.clone(), tcp_stream))
        } else {
            ::log::debug!("established tcp connection");

            ConnectionInner::Established(EstablishedConnection::new(Stream::TcpStream(tcp_stream)))
        };

        Self { valid_until, inner }
    }

    /// Re-wrap an already-established connection with a new deadline.
    #[inline]
    pub fn from_established(valid_until: ValidUntil, established: EstablishedConnection) -> Self {
        Self {
            valid_until,
            inner: ConnectionInner::Established(established),
        }
    }

    /// Re-wrap an in-progress handshake machine with a new deadline.
    #[inline]
    pub fn from_in_progress(valid_until: ValidUntil, machine: TlsHandshakeMachine) -> Self {
        Self {
            valid_until,
            inner: ConnectionInner::InProgress(machine),
        }
    }

    /// Mutable access to the established connection, if handshaking is done.
    #[inline]
    pub fn get_established(&mut self) -> Option<&mut EstablishedConnection> {
        if let ConnectionInner::Established(ref mut established) = self.inner {
            Some(established)
        } else {
            None
        }
    }

    /// Takes ownership since TlsStream needs ownership of TcpStream
    #[inline]
    pub fn get_in_progress(self) -> Option<TlsHandshakeMachine> {
        if let ConnectionInner::InProgress(machine) = self.inner {
            Some(machine)
        } else {
            None
        }
    }

    /// Deregister the underlying TCP stream from the poll registry,
    /// whatever state the connection is in.
    pub fn deregister(&mut self, poll: &mut Poll) -> ::std::io::Result<()> {
        match &mut self.inner {
            ConnectionInner::Established(established) => match &mut established.stream {
                Stream::TcpStream(ref mut stream) => poll.registry().deregister(stream),
                Stream::TlsStream(ref mut stream) => poll.registry().deregister(stream.get_mut()),
            },
            ConnectionInner::InProgress(TlsHandshakeMachine { inner, .. }) => match inner {
                TlsHandshakeMachineInner::TcpStream(ref mut stream) => {
                    poll.registry().deregister(stream)
                }
                TlsHandshakeMachineInner::TlsMidHandshake(ref mut mid_handshake) => {
                    poll.registry().deregister(mid_handshake.get_mut())
                }
            },
        }
    }
}
/// All live connections of a socket worker, keyed by their poll token.
pub type ConnectionMap = HashMap<Token, Connection>;
#[cfg(test)]
mod tests {
    use super::*;

    /// Table-driven check of digit counting across boundaries (9/10, 99/100
    /// and so on).
    #[test]
    fn test_num_digits_in_usize() {
        let cases: [(usize, usize); 9] = [
            (0, 1),
            (1, 1),
            (9, 1),
            (10, 2),
            (11, 2),
            (99, 2),
            (100, 3),
            (101, 3),
            (1000, 4),
        ];

        for &(input, expected) in cases.iter() {
            assert_eq!(EstablishedConnection::num_digits_in_usize(input), expected);
        }
    }
}

View file

@ -1,387 +0,0 @@
use std::io::{Cursor, ErrorKind};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::vec::Drain;
use aquatic_common::access_list::AccessListQuery;
use aquatic_http_protocol::request::Request;
use hashbrown::HashMap;
use log::{debug, error, info};
use mio::net::TcpListener;
use mio::{Events, Interest, Poll, Token};
use native_tls::TlsAcceptor;
use aquatic_http_protocol::response::*;
use crate::common::*;
use crate::config::Config;
pub mod connection;
pub mod stream;
pub mod utils;
use connection::*;
use utils::*;
// How many poll-loop iterations between connection cleanups. The previous
// value `2 ^ 22` used `^`, which is bitwise XOR in Rust (2 ^ 22 == 20), not
// exponentiation — cleaning ran every 20 iterations instead of the intended
// every 2^22 iterations. `1 << 22` expresses 2 to the 22nd correctly.
const CONNECTION_CLEAN_INTERVAL: usize = 1 << 22;
/// Entry point for one socket worker thread.
///
/// Binds the listener, reports readiness (or the bind error) through the
/// shared per-worker status slot, then runs the poll loop until the process
/// exits.
pub fn run_socket_worker(
    config: Config,
    state: State,
    socket_worker_index: usize,
    socket_worker_statuses: SocketWorkerStatuses,
    request_channel_sender: RequestChannelSender,
    response_channel_receiver: ResponseChannelReceiver,
    opt_tls_acceptor: Option<TlsAcceptor>,
    poll: Poll,
) {
    match create_listener(config.network.address, config.network.ipv6_only) {
        Ok(listener) => {
            // Signal successful startup before entering the loop
            socket_worker_statuses.lock()[socket_worker_index] = Some(Ok(()));

            run_poll_loop(
                config,
                &state,
                socket_worker_index,
                request_channel_sender,
                response_channel_receiver,
                listener,
                opt_tls_acceptor,
                poll,
            );
        }
        Err(err) => {
            // Report the bind failure so the main thread can surface it
            socket_worker_statuses.lock()[socket_worker_index] =
                Some(Err(format!("Couldn't open socket: {:#}", err)));
        }
    }
}
/// Socket worker event loop: accept connections, read requests, and send
/// responses, driven by mio poll events.
pub fn run_poll_loop(
    config: Config,
    state: &State,
    socket_worker_index: usize,
    request_channel_sender: RequestChannelSender,
    response_channel_receiver: ResponseChannelReceiver,
    listener: ::std::net::TcpListener,
    opt_tls_acceptor: Option<TlsAcceptor>,
    mut poll: Poll,
) {
    let poll_timeout = Duration::from_micros(config.network.poll_timeout_microseconds);

    let mut listener = TcpListener::from_std(listener);
    let mut events = Events::with_capacity(config.network.poll_event_capacity);

    // Token(0) is reserved for the listener (compared as LISTENER_TOKEN below)
    poll.registry()
        .register(&mut listener, Token(0), Interest::READABLE)
        .unwrap();

    let mut connections: ConnectionMap = HashMap::new();

    // Shared across all connections of this worker
    let opt_tls_acceptor = opt_tls_acceptor.map(Arc::new);

    let mut poll_token_counter = Token(0usize);
    let mut iter_counter = 0usize;

    // Stack-allocated scratch buffer for serializing responses, wrapped in a
    // Cursor (the second binding deliberately shadows the array)
    let mut response_buffer = [0u8; 4096];
    let mut response_buffer = Cursor::new(&mut response_buffer[..]);

    // Responses generated locally (e.g. failure responses), as opposed to
    // those arriving from request workers over the channel
    let mut local_responses = Vec::new();

    loop {
        poll.poll(&mut events, Some(poll_timeout))
            .expect("failed polling");

        for event in events.iter() {
            let token = event.token();

            if token == LISTENER_TOKEN {
                accept_new_streams(
                    &config,
                    &mut listener,
                    &mut poll,
                    &mut connections,
                    &mut poll_token_counter,
                    &opt_tls_acceptor,
                );
            } else if token != CHANNEL_TOKEN {
                handle_connection_read_event(
                    &config,
                    &state,
                    socket_worker_index,
                    &mut poll,
                    &request_channel_sender,
                    &mut local_responses,
                    &mut connections,
                    token,
                );
            }

            // Send responses for each event. Channel token is not interesting
            // by itself, but is just for making sure responses are sent even
            // if no new connects / requests come in.
            send_responses(
                &config,
                &mut poll,
                &mut response_buffer,
                local_responses.drain(..),
                &response_channel_receiver,
                &mut connections,
            );
        }

        // Remove inactive connections, but not every iteration
        if iter_counter % CONNECTION_CLEAN_INTERVAL == 0 {
            remove_inactive_connections(&mut poll, &mut connections);
        }

        iter_counter = iter_counter.wrapping_add(1);
    }
}
/// Accept all pending TCP connections, registering each with the poller
/// under a fresh token.
fn accept_new_streams(
    config: &Config,
    listener: &mut TcpListener,
    poll: &mut Poll,
    connections: &mut ConnectionMap,
    poll_token_counter: &mut Token,
    opt_tls_acceptor: &Option<Arc<TlsAcceptor>>,
) {
    let valid_until = ValidUntil::new(config.cleaning.max_connection_age);

    loop {
        match listener.accept() {
            Ok((mut stream, _)) => {
                poll_token_counter.0 = poll_token_counter.0.wrapping_add(1);

                // Skip listener and channel tokens
                if poll_token_counter.0 < 2 {
                    poll_token_counter.0 = 2;
                }

                let token = *poll_token_counter;

                // Remove connection if it exists (which is unlikely)
                remove_connection(poll, connections, poll_token_counter);

                poll.registry()
                    .register(&mut stream, token, Interest::READABLE)
                    .unwrap();

                let connection = Connection::new(opt_tls_acceptor, valid_until, stream);

                connections.insert(token, connection);
            }
            Err(err) => {
                // No more pending connections
                if err.kind() == ErrorKind::WouldBlock {
                    break;
                }

                // NOTE(review): other errors are logged and the accept loop
                // continues; a persistent accept error could spin here —
                // confirm this is the intended behavior
                info!("error while accepting streams: {}", err);
            }
        }
    }
}
/// On the stream given by poll_token, get TLS up and running if requested,
/// then read requests and pass on through channel.
///
/// Disallowed or invalid requests produce failure responses pushed onto
/// `local_responses`; valid requests go to request workers via the channel.
pub fn handle_connection_read_event(
    config: &Config,
    state: &State,
    socket_worker_index: usize,
    poll: &mut Poll,
    request_channel_sender: &RequestChannelSender,
    local_responses: &mut Vec<(ConnectionMeta, Response)>,
    connections: &mut ConnectionMap,
    poll_token: Token,
) {
    let valid_until = ValidUntil::new(config.cleaning.max_connection_age);
    let access_list_mode = config.access_list.mode;

    loop {
        // Get connection, updating valid_until
        let connection = if let Some(c) = connections.get_mut(&poll_token) {
            c
        } else {
            // If there is no connection, there is no stream, so there
            // shouldn't be any (relevant) poll events. In other words, it's
            // safe to return here
            return;
        };

        // Activity on the stream extends the connection's lifetime
        connection.valid_until = valid_until;

        if let Some(established) = connection.get_established() {
            match established.read_request() {
                // Announce for a disallowed info hash: answer directly with
                // a failure instead of involving a request worker
                Ok(Request::Announce(ref r))
                    if !state.access_list.allows(access_list_mode, &r.info_hash.0) =>
                {
                    let meta = ConnectionMeta {
                        worker_index: socket_worker_index,
                        poll_token,
                        peer_addr: established.peer_addr,
                    };

                    let response = FailureResponse::new("Info hash not allowed");

                    debug!("read disallowed request, sending back error response");

                    local_responses.push((meta, Response::Failure(response)));

                    break;
                }
                Ok(request) => {
                    let meta = ConnectionMeta {
                        worker_index: socket_worker_index,
                        poll_token,
                        peer_addr: established.peer_addr,
                    };

                    debug!("read allowed request, sending on to channel");

                    if let Err(err) = request_channel_sender.send((meta, request)) {
                        error!("RequestChannelSender: couldn't send message: {:?}", err);
                    }

                    break;
                }
                Err(RequestReadError::NeedMoreData) => {
                    info!("need more data");

                    // Stop reading data (defer to later events)
                    break;
                }
                Err(RequestReadError::Parse(err)) => {
                    info!("error reading request (invalid): {:#?}", err);

                    let meta = ConnectionMeta {
                        worker_index: socket_worker_index,
                        poll_token,
                        peer_addr: established.peer_addr,
                    };

                    let response = FailureResponse::new("Invalid request");

                    local_responses.push((meta, Response::Failure(response)));

                    break;
                }
                Err(RequestReadError::StreamEnded) => {
                    ::log::debug!("stream ended");

                    remove_connection(poll, connections, &poll_token);

                    break;
                }
                Err(RequestReadError::Io(err)) => {
                    ::log::info!("error reading request (io): {}", err);

                    remove_connection(poll, connections, &poll_token);

                    break;
                }
            }
        } else if let Some(handshake_machine) = connections
            .remove(&poll_token)
            .and_then(Connection::get_in_progress)
        {
            // TLS handshake still in progress: try to advance it. The
            // connection was removed above because the handshake consumes
            // the machine; it is reinserted on success or WouldBlock.
            match handshake_machine.establish_tls() {
                Ok(established) => {
                    let connection = Connection::from_established(valid_until, established);

                    connections.insert(poll_token, connection);

                    // No break: loop again to read any request that already
                    // arrived on the established connection
                }
                Err(TlsHandshakeMachineError::WouldBlock(machine)) => {
                    let connection = Connection::from_in_progress(valid_until, machine);

                    connections.insert(poll_token, connection);

                    // Break and wait for more data
                    break;
                }
                Err(TlsHandshakeMachineError::Failure(err)) => {
                    info!("tls handshake error: {}", err);

                    // TLS negotiation failed
                    break;
                }
            }
        }
    }
}
/// Read responses from channel, send to peers.
///
/// Drains both the locally generated responses and those arriving from
/// request workers, serializing each into `buffer` before writing it to
/// the matching connection.
pub fn send_responses(
    config: &Config,
    poll: &mut Poll,
    buffer: &mut Cursor<&mut [u8]>,
    local_responses: Drain<(ConnectionMeta, Response)>,
    channel_responses: &ResponseChannelReceiver,
    connections: &mut ConnectionMap,
) {
    // Snapshot the current channel length so this call can't loop forever
    // while workers keep producing responses
    let channel_responses_len = channel_responses.len();
    let channel_responses_drain = channel_responses.try_iter().take(channel_responses_len);

    for (meta, response) in local_responses.chain(channel_responses_drain) {
        if let Some(established) = connections
            .get_mut(&meta.poll_token)
            .and_then(Connection::get_established)
        {
            // Poll tokens are reused; make sure this response was meant for
            // the peer currently holding the token
            if established.peer_addr != meta.peer_addr {
                info!("socket worker error: peer socket addrs didn't match");

                continue;
            }

            // Reuse the shared serialization buffer
            buffer.set_position(0);

            let bytes_written = response.write(buffer).unwrap();

            match established.send_response(&buffer.get_mut()[..bytes_written]) {
                Ok(()) => {
                    ::log::debug!(
                        "sent response: {:?} with response string {}",
                        response,
                        String::from_utf8_lossy(&buffer.get_ref()[..bytes_written])
                    );

                    if !config.network.keep_alive {
                        remove_connection(poll, connections, &meta.poll_token);
                    }
                }
                Err(err) if err.kind() == ErrorKind::WouldBlock => {
                    // NOTE(review): the response is dropped in this case,
                    // not retried later — confirm this is acceptable
                    debug!("send response: would block");
                }
                Err(err) => {
                    info!("error sending response: {}", err);

                    remove_connection(poll, connections, &meta.poll_token);
                }
            }
        }
    }
}
/// Close and remove all connections whose deadline has passed, then shrink
/// the map to release capacity.
pub fn remove_inactive_connections(poll: &mut Poll, connections: &mut ConnectionMap) {
    let now = Instant::now();

    connections.retain(|_, connection| {
        if connection.valid_until.0 >= now {
            true
        } else {
            // Expired: deregister from the poller before dropping
            if let Err(err) = connection.deregister(poll) {
                ::log::error!("deregister connection error: {}", err);
            }

            false
        }
    });

    connections.shrink_to_fit();
}
fn remove_connection(poll: &mut Poll, connections: &mut ConnectionMap, connection_token: &Token) {
if let Some(mut connection) = connections.remove(connection_token) {
if let Err(err) = connection.deregister(poll) {
::log::error!("deregister connection error: {}", err);
}
}
}

View file

@ -1,69 +0,0 @@
use std::io::{Read, Write};
use std::net::SocketAddr;
use mio::net::TcpStream;
use native_tls::TlsStream;
/// A plain TCP stream or a TLS-wrapped one, behind a single Read/Write type.
pub enum Stream {
    TcpStream(TcpStream),
    TlsStream(TlsStream<TcpStream>),
}
impl Stream {
    /// Peer address of the underlying TCP stream.
    ///
    /// NOTE(review): unwraps the result — panics if the address is
    /// unavailable (e.g. the socket already errored). Callers appear to
    /// only use this on freshly established connections; confirm.
    #[inline]
    pub fn get_peer_addr(&self) -> SocketAddr {
        match self {
            Self::TcpStream(stream) => stream.peer_addr().unwrap(),
            Self::TlsStream(stream) => stream.get_ref().peer_addr().unwrap(),
        }
    }
}
// Delegate Read to whichever transport variant is active.
impl Read for Stream {
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, ::std::io::Error> {
        match self {
            Self::TcpStream(stream) => stream.read(buf),
            Self::TlsStream(stream) => stream.read(buf),
        }
    }

    /// Not used but provided for completeness
    #[inline]
    fn read_vectored(
        &mut self,
        bufs: &mut [::std::io::IoSliceMut<'_>],
    ) -> ::std::io::Result<usize> {
        match self {
            Self::TcpStream(stream) => stream.read_vectored(bufs),
            Self::TlsStream(stream) => stream.read_vectored(bufs),
        }
    }
}
// Delegate Write to whichever transport variant is active.
impl Write for Stream {
    #[inline]
    fn write(&mut self, buf: &[u8]) -> ::std::io::Result<usize> {
        match self {
            Self::TcpStream(stream) => stream.write(buf),
            Self::TlsStream(stream) => stream.write(buf),
        }
    }

    /// Not used but provided for completeness
    #[inline]
    fn write_vectored(&mut self, bufs: &[::std::io::IoSlice<'_>]) -> ::std::io::Result<usize> {
        match self {
            Self::TcpStream(stream) => stream.write_vectored(bufs),
            Self::TlsStream(stream) => stream.write_vectored(bufs),
        }
    }

    #[inline]
    fn flush(&mut self) -> ::std::io::Result<()> {
        match self {
            Self::TcpStream(stream) => stream.flush(),
            Self::TlsStream(stream) => stream.flush(),
        }
    }
}

View file

@ -1,63 +0,0 @@
use std::fs::File;
use std::io::Read;
use std::net::SocketAddr;
use anyhow::Context;
use native_tls::{Identity, TlsAcceptor};
use socket2::{Domain, Protocol, Socket, Type};
use crate::config::TlsConfig;
/// Build a native-tls acceptor from the configured pkcs12 identity file.
///
/// Returns `Ok(None)` when TLS is disabled in the config; errors carry
/// context describing which step (open / read / parse / construct) failed.
pub fn create_tls_acceptor(config: &TlsConfig) -> anyhow::Result<Option<TlsAcceptor>> {
    if config.use_tls {
        let mut identity_bytes = Vec::new();
        let mut file =
            File::open(&config.tls_pkcs12_path).context("Couldn't open pkcs12 identity file")?;

        file.read_to_end(&mut identity_bytes)
            .context("Couldn't read pkcs12 identity file")?;

        // The pkcs12 bundle contains both certificate and private key
        let identity = Identity::from_pkcs12(&identity_bytes[..], &config.tls_pkcs12_password)
            .context("Couldn't parse pkcs12 identity file")?;

        let acceptor = TlsAcceptor::new(identity)
            .context("Couldn't create TlsAcceptor from pkcs12 identity")?;

        Ok(Some(acceptor))
    } else {
        Ok(None)
    }
}
/// Create a non-blocking TCP listener bound to `address`.
///
/// Built via socket2 so that reuse_port can be set (allowing several
/// socket workers to bind the same address) and so ipv6-only mode can be
/// applied before binding.
pub fn create_listener(
    address: SocketAddr,
    ipv6_only: bool,
) -> ::anyhow::Result<::std::net::TcpListener> {
    let builder = if address.is_ipv4() {
        Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))
    } else {
        Socket::new(Domain::IPV6, Type::STREAM, Some(Protocol::TCP))
    }
    .context("Couldn't create socket2::Socket")?;

    if ipv6_only {
        builder
            .set_only_v6(true)
            .context("Couldn't put socket in ipv6 only mode")?
    }

    builder
        .set_nonblocking(true)
        .context("Couldn't put socket in non-blocking mode")?;
    builder
        .set_reuse_port(true)
        .context("Couldn't put socket in reuse_port mode")?;
    builder
        .bind(&address.into())
        .with_context(|| format!("Couldn't bind socket to address {}", address))?;
    // 128 is the accept-queue (backlog) length
    builder
        .listen(128)
        .context("Couldn't listen for connections on socket")?;

    Ok(builder.into())
}

View file

@ -1,52 +0,0 @@
use histogram::Histogram;
use aquatic_common::access_list::{AccessListMode, AccessListQuery};
use crate::{common::*, config::Config};
/// Reload the access list from its configured path, unless access-list
/// filtering is turned off. Failures are logged and otherwise ignored.
pub fn update_access_list(config: &Config, state: &State) {
    let mode = config.access_list.mode;

    match mode {
        AccessListMode::Off => {}
        AccessListMode::White | AccessListMode::Black => {
            let result = state.access_list.update_from_path(&config.access_list.path);

            if let Err(err) = result {
                ::log::error!("Couldn't update access list: {:?}", err);
            }
        }
    }
}
/// Print a percentile summary of peers-per-torrent across both the IPv4
/// and IPv6 torrent maps. Prints nothing when there are no torrents.
pub fn print_statistics(state: &State) {
    let mut peers_per_torrent = Histogram::new();

    {
        let torrents = &mut state.torrent_maps.lock();

        // The ipv4 and ipv6 loops were identical except for the map they
        // iterate; the shared increment-and-log step is factored into one
        // closure
        let mut increment = |num_peers: u64| {
            if let Err(err) = peers_per_torrent.increment(num_peers) {
                eprintln!("error incrementing peers_per_torrent histogram: {}", err)
            }
        };

        for torrent in torrents.ipv4.values() {
            increment((torrent.num_seeders + torrent.num_leechers) as u64);
        }
        for torrent in torrents.ipv6.values() {
            increment((torrent.num_seeders + torrent.num_leechers) as u64);
        }
    }

    // Percentile queries fail on an empty histogram, so guard on entries
    if peers_per_torrent.entries() != 0 {
        println!(
            "peers per torrent: min: {}, p50: {}, p75: {}, p90: {}, p99: {}, p999: {}, max: {}",
            peers_per_torrent.minimum().unwrap(),
            peers_per_torrent.percentile(50.0).unwrap(),
            peers_per_torrent.percentile(75.0).unwrap(),
            peers_per_torrent.percentile(90.0).unwrap(),
            peers_per_torrent.percentile(99.0).unwrap(),
            peers_per_torrent.percentile(99.9).unwrap(),
            peers_per_torrent.maximum().unwrap(),
        );
    }
}

View file

@ -13,11 +13,13 @@ name = "aquatic_http_load_test"
anyhow = "1"
aquatic_cli_helpers = "0.1.0"
aquatic_http_protocol = "0.1.0"
futures-lite = "1"
hashbrown = "0.11.2"
glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5" }
mimalloc = { version = "0.1", default-features = false }
mio = { version = "0.7", features = ["udp", "os-poll", "os-util"] }
rand = { version = "0.8", features = ["small_rng"] }
rand_distr = "0.4"
rustls = { version = "0.20", features = ["dangerous_configuration"] }
serde = { version = "1", features = ["derive"] }
[dev-dependencies]

View file

@ -9,20 +9,11 @@ pub struct Config {
pub num_workers: u8,
pub num_connections: usize,
pub duration: usize,
pub network: NetworkConfig,
pub torrents: TorrentConfig,
}
impl aquatic_cli_helpers::Config for Config {}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct NetworkConfig {
pub connection_creation_interval: usize,
pub poll_timeout_microseconds: u64,
pub poll_event_capacity: usize,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct TorrentConfig {
@ -48,22 +39,11 @@ impl Default for Config {
num_workers: 1,
num_connections: 8,
duration: 0,
network: NetworkConfig::default(),
torrents: TorrentConfig::default(),
}
}
}
impl Default for NetworkConfig {
fn default() -> Self {
Self {
connection_creation_interval: 10,
poll_timeout_microseconds: 197,
poll_event_capacity: 64,
}
}
}
impl Default for TorrentConfig {
fn default() -> Self {
Self {

View file

@ -2,6 +2,7 @@ use std::sync::{atomic::Ordering, Arc};
use std::thread;
use std::time::{Duration, Instant};
use ::glommio::LocalExecutorBuilder;
use rand::prelude::*;
use rand_distr::Pareto;
@ -53,11 +54,18 @@ fn run(config: Config) -> ::anyhow::Result<()> {
// Start socket workers
let tls_config = create_tls_config().unwrap();
for _ in 0..config.num_workers {
let config = config.clone();
let tls_config = tls_config.clone();
let state = state.clone();
thread::spawn(move || run_socket_thread(&config, state, 1));
LocalExecutorBuilder::default()
.spawn(|| async move {
run_socket_thread(config, tls_config, state).await.unwrap();
})
.unwrap();
}
monitor_statistics(state, &config);
@ -147,3 +155,32 @@ fn monitor_statistics(state: LoadTestState, config: &Config) {
}
}
}
/// Certificate verifier that accepts any server certificate.
///
/// For load testing only: this disables all TLS server authentication so
/// the client can connect to the local self-signed test server.
struct FakeCertificateVerifier;

impl rustls::client::ServerCertVerifier for FakeCertificateVerifier {
    fn verify_server_cert(
        &self,
        _end_entity: &rustls::Certificate,
        _intermediates: &[rustls::Certificate],
        _server_name: &rustls::ServerName,
        _scts: &mut dyn Iterator<Item = &[u8]>,
        _ocsp_response: &[u8],
        _now: std::time::SystemTime,
    ) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
        // Unconditionally claim the certificate is valid
        Ok(rustls::client::ServerCertVerified::assertion())
    }
}
/// Build a shared rustls client config with certificate verification
/// replaced by [`FakeCertificateVerifier`] (accepts everything).
fn create_tls_config() -> anyhow::Result<Arc<rustls::ClientConfig>> {
    // The empty root store is irrelevant: the dangerous() verifier below
    // overrides certificate validation entirely
    let mut config = rustls::ClientConfig::builder()
        .with_safe_defaults()
        .with_root_certificates(rustls::RootCertStore::empty())
        .with_no_client_auth();

    config
        .dangerous()
        .set_certificate_verifier(Arc::new(FakeCertificateVerifier));

    Ok(Arc::new(config))
}

View file

@ -1,280 +1,282 @@
use std::io::{Cursor, ErrorKind, Read, Write};
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::{
cell::RefCell,
convert::TryInto,
io::{Cursor, ErrorKind, Read},
rc::Rc,
sync::{atomic::Ordering, Arc},
time::Duration,
};
use hashbrown::HashMap;
use mio::{net::TcpStream, Events, Interest, Poll, Token};
use rand::{prelude::*, rngs::SmallRng};
use aquatic_http_protocol::response::Response;
use futures_lite::{AsyncReadExt, AsyncWriteExt};
use glommio::net::TcpStream;
use glommio::{prelude::*, timer::TimerActionRepeat};
use rand::{prelude::SmallRng, SeedableRng};
use rustls::ClientConnection;
use crate::common::*;
use crate::config::*;
use crate::utils::create_random_request;
use crate::{common::LoadTestState, config::Config, utils::create_random_request};
pub struct Connection {
pub async fn run_socket_thread(
config: Config,
tls_config: Arc<rustls::ClientConfig>,
load_test_state: LoadTestState,
) -> anyhow::Result<()> {
let config = Rc::new(config);
let num_active_connections = Rc::new(RefCell::new(0usize));
TimerActionRepeat::repeat(move || {
periodically_open_connections(
config.clone(),
tls_config.clone(),
load_test_state.clone(),
num_active_connections.clone(),
)
});
futures_lite::future::pending::<bool>().await;
Ok(())
}
async fn periodically_open_connections(
config: Rc<Config>,
tls_config: Arc<rustls::ClientConfig>,
load_test_state: LoadTestState,
num_active_connections: Rc<RefCell<usize>>,
) -> Option<Duration> {
if *num_active_connections.borrow() < config.num_connections {
spawn_local(async move {
if let Err(err) =
Connection::run(config, tls_config, load_test_state, num_active_connections).await
{
eprintln!("connection creation error: {:?}", err);
}
})
.detach();
}
Some(Duration::from_secs(1))
}
struct Connection {
config: Rc<Config>,
load_test_state: LoadTestState,
rng: SmallRng,
stream: TcpStream,
read_buffer: [u8; 4096],
bytes_read: usize,
can_send: bool,
tls: ClientConnection,
response_buffer: [u8; 2048],
response_buffer_position: usize,
send_new_request: bool,
queued_responses: usize,
}
impl Connection {
pub fn create_and_register(
config: &Config,
connections: &mut ConnectionMap,
poll: &mut Poll,
token_counter: &mut usize,
async fn run(
config: Rc<Config>,
tls_config: Arc<rustls::ClientConfig>,
load_test_state: LoadTestState,
num_active_connections: Rc<RefCell<usize>>,
) -> anyhow::Result<()> {
let mut stream = TcpStream::connect(config.server_address)?;
let stream = TcpStream::connect(config.server_address)
.await
.map_err(|err| anyhow::anyhow!("connect: {:?}", err))?;
let tls = ClientConnection::new(tls_config, "example.com".try_into().unwrap()).unwrap();
let rng = SmallRng::from_entropy();
poll.registry()
.register(&mut stream, Token(*token_counter), Interest::READABLE)
.unwrap();
let connection = Connection {
let mut connection = Connection {
config,
load_test_state,
rng,
stream,
read_buffer: [0; 4096],
bytes_read: 0,
can_send: true,
tls,
response_buffer: [0; 2048],
response_buffer_position: 0,
send_new_request: true,
queued_responses: 0,
};
connections.insert(*token_counter, connection);
*num_active_connections.borrow_mut() += 1;
*token_counter = token_counter.wrapping_add(1);
println!("run connection");
if let Err(err) = connection.run_connection_loop().await {
eprintln!("connection error: {:?}", err);
}
*num_active_connections.borrow_mut() -= 1;
Ok(())
}
pub fn read_response(&mut self, state: &LoadTestState) -> bool {
// bool = remove connection
async fn run_connection_loop(&mut self) -> anyhow::Result<()> {
loop {
match self.stream.read(&mut self.read_buffer[self.bytes_read..]) {
Ok(0) => {
if self.bytes_read == self.read_buffer.len() {
eprintln!("read buffer is full");
if self.send_new_request {
let request =
create_random_request(&self.config, &self.load_test_state, &mut self.rng);
request.write(&mut self.tls.writer())?;
self.queued_responses += 1;
self.send_new_request = false;
}
self.write_tls().await?;
self.read_tls().await?;
}
}
async fn read_tls(&mut self) -> anyhow::Result<()> {
loop {
let mut buf = [0u8; 1024];
let bytes_read = self.stream.read(&mut buf).await?;
if bytes_read == 0 {
return Err(anyhow::anyhow!("Peer has closed connection"));
}
self.load_test_state
.statistics
.bytes_received
.fetch_add(bytes_read, Ordering::SeqCst);
let _ = self.tls.read_tls(&mut &buf[..bytes_read]).unwrap();
let io_state = self.tls.process_new_packets()?;
let mut added_plaintext = false;
if io_state.plaintext_bytes_to_read() != 0 {
loop {
match self.tls.reader().read(&mut buf) {
Ok(0) => {
break;
}
Ok(amt) => {
let end = self.response_buffer_position + amt;
if end > self.response_buffer.len() {
return Err(anyhow::anyhow!("response too large"));
} else {
let response_buffer_slice =
&mut self.response_buffer[self.response_buffer_position..end];
response_buffer_slice.copy_from_slice(&buf[..amt]);
self.response_buffer_position = end;
added_plaintext = true;
}
}
Err(err) if err.kind() == ErrorKind::WouldBlock => {
break;
}
Err(err) => {
panic!("tls.reader().read: {}", err);
}
}
break true;
}
Ok(bytes_read) => {
self.bytes_read += bytes_read;
}
let interesting_bytes = &self.read_buffer[..self.bytes_read];
if added_plaintext {
let interesting_bytes = &self.response_buffer[..self.response_buffer_position];
let mut opt_body_start_index = None;
let mut opt_body_start_index = None;
for (i, chunk) in interesting_bytes.windows(4).enumerate() {
if chunk == b"\r\n\r\n" {
opt_body_start_index = Some(i + 4);
for (i, chunk) in interesting_bytes.windows(4).enumerate() {
if chunk == b"\r\n\r\n" {
opt_body_start_index = Some(i + 4);
break;
}
}
if let Some(body_start_index) = opt_body_start_index {
let interesting_bytes = &interesting_bytes[body_start_index..];
match Response::from_bytes(interesting_bytes) {
Ok(response) => {
match response {
Response::Announce(_) => {
self.load_test_state
.statistics
.responses_announce
.fetch_add(1, Ordering::SeqCst);
}
Response::Scrape(_) => {
self.load_test_state
.statistics
.responses_scrape
.fetch_add(1, Ordering::SeqCst);
}
Response::Failure(response) => {
self.load_test_state
.statistics
.responses_failure
.fetch_add(1, Ordering::SeqCst);
println!(
"failure response: reason: {}",
response.failure_reason
);
}
}
self.response_buffer_position = 0;
self.send_new_request = true;
break;
}
}
if let Some(body_start_index) = opt_body_start_index {
let interesting_bytes = &interesting_bytes[body_start_index..];
match Response::from_bytes(interesting_bytes) {
Ok(response) => {
state
.statistics
.bytes_received
.fetch_add(self.bytes_read, Ordering::SeqCst);
match response {
Response::Announce(_) => {
state
.statistics
.responses_announce
.fetch_add(1, Ordering::SeqCst);
}
Response::Scrape(_) => {
state
.statistics
.responses_scrape
.fetch_add(1, Ordering::SeqCst);
}
Response::Failure(response) => {
state
.statistics
.responses_failure
.fetch_add(1, Ordering::SeqCst);
println!(
"failure response: reason: {}",
response.failure_reason
);
}
}
self.bytes_read = 0;
self.can_send = true;
}
Err(err) => {
eprintln!(
"deserialize response error with {} bytes read: {:?}, text: {}",
self.bytes_read,
err,
String::from_utf8_lossy(interesting_bytes)
);
}
Err(err) => {
eprintln!(
"deserialize response error with {} bytes read: {:?}, text: {}",
self.response_buffer_position,
err,
String::from_utf8_lossy(interesting_bytes)
);
}
}
}
Err(err) if err.kind() == ErrorKind::WouldBlock => {
break false;
}
Err(_) => {
self.bytes_read = 0;
}
break true;
}
if self.tls.wants_write() {
break;
}
}
}
/// Serialize a randomly generated request and attempt to send it over
/// this connection.
///
/// Returns `true` when the connection should be removed (the send
/// failed), `false` otherwise. Does nothing while a response to the
/// previous request is still outstanding.
pub fn send_request(
    &mut self,
    config: &Config,
    state: &LoadTestState,
    rng: &mut impl Rng,
    request_buffer: &mut Cursor<&mut [u8]>,
) -> bool {
    // Only one request in flight per connection: wait for the response
    // to the previous one before sending another.
    if !self.can_send {
        return false;
    }

    // Serialize a fresh random request into the shared, reusable buffer.
    let request = create_random_request(&config, &state, rng);

    request_buffer.set_position(0);
    request.write(request_buffer).unwrap();

    let written = request_buffer.position() as usize;

    if self
        .send_request_inner(state, &request_buffer.get_mut()[..written])
        .is_ok()
    {
        state.statistics.requests.fetch_add(1, Ordering::SeqCst);
        self.can_send = false;

        false
    } else {
        // Signal to the caller that this connection should be dropped.
        true
    }
}
/// Write a serialized request to the underlying stream and record the
/// number of bytes sent in the shared statistics.
///
/// Uses `write_all` instead of a single `write` call: `write` may
/// legally perform a partial write, which would corrupt the HTTP
/// request framing on the wire while still being reported as success
/// (and the statistics would count only the partial amount). With
/// `write_all`, any failure to deliver the full request surfaces as an
/// error, which the caller treats as "remove this connection".
fn send_request_inner(
    &mut self,
    state: &LoadTestState,
    request: &[u8],
) -> ::std::io::Result<()> {
    // Retries until the whole buffer is written or an error occurs.
    self.stream.write_all(request)?;
    self.stream.flush()?;

    // Only counted once the full request has been handed to the stream.
    state
        .statistics
        .bytes_sent
        .fetch_add(request.len(), Ordering::SeqCst);

    Ok(())
}
/// Remove this connection's stream from the mio poll registry so that
/// no further readiness events are delivered for it.
fn deregister(&mut self, poll: &mut Poll) -> ::std::io::Result<()> {
    poll.registry().deregister(&mut self.stream)
}
}
pub type ConnectionMap = HashMap<usize, Connection>;
/// Run one load-test socket worker: maintain up to
/// `config.num_connections` tracker connections in a mio poll loop,
/// reading responses and sending new requests. Never returns.
///
/// `num_initial_requests` connections are created up front; connections
/// that fail are dropped and gradually replaced.
pub fn run_socket_thread(config: &Config, state: LoadTestState, num_initial_requests: usize) {
    let timeout = Duration::from_micros(config.network.poll_timeout_microseconds);
    // NOTE(review): `^` is bitwise XOR in Rust, not exponentiation, so
    // this computes `2 XOR interval` — presumably `2.pow(interval)` or
    // `2 * interval` was intended. Also, if the configured interval is
    // exactly 2, this evaluates to 0 and the `%` below panics with a
    // division by zero. Confirm intent before changing, since load-test
    // pacing may have been tuned around the current value.
    let create_conn_interval = 2 ^ config.network.connection_creation_interval;
    let mut connections: ConnectionMap = HashMap::with_capacity(config.num_connections);
    let mut poll = Poll::new().expect("create poll");
    let mut events = Events::with_capacity(config.network.poll_event_capacity);
    let mut rng = SmallRng::from_entropy();
    // Reusable serialization buffer shared by all connections.
    let mut request_buffer = [0u8; 1024];
    let mut request_buffer = Cursor::new(&mut request_buffer[..]);
    // Monotonically increasing source of mio tokens / connection keys.
    let mut token_counter = 0usize;
    for _ in 0..num_initial_requests {
        Connection::create_and_register(config, &mut connections, &mut poll, &mut token_counter)
            .unwrap();
    }
    let mut iter_counter = 0usize;
    // Connections pending (re)creation, carried across loop iterations.
    let mut num_to_create = 0usize;
    let mut drop_connections = Vec::with_capacity(config.num_connections);
    loop {
        poll.poll(&mut events, Some(timeout))
            .expect("failed polling");
        for event in events.iter() {
            if event.is_readable() {
                let token = event.token();
                if let Some(connection) = connections.get_mut(&token.0) {
                    // Note that this does not indicate successfully reading
                    // response
                    if connection.read_response(&state) {
                        remove_connection(&mut poll, &mut connections, token.0);
                        num_to_create += 1;
                    }
                } else {
                    eprintln!("connection not found: {:?}", token);
                }
            }
        }
        // Send a new request on every connection that is ready for one;
        // collect the keys of connections whose send failed.
        for (k, connection) in connections.iter_mut() {
            let remove_connection =
                connection.send_request(config, &state, &mut rng, &mut request_buffer);
            if remove_connection {
                drop_connections.push(*k);
            }
        }
        for k in drop_connections.drain(..) {
            remove_connection(&mut poll, &mut connections, k);
            num_to_create += 1;
        }
        // Never exceed the configured connection count.
        let max_new = config.num_connections - connections.len();
        if iter_counter % create_conn_interval == 0 {
            num_to_create += 1;
        }
        num_to_create = num_to_create.min(max_new);
        // `0..num_to_create` is evaluated once, so decrementing
        // `num_to_create` inside the loop does not shorten it: exactly
        // that many creation attempts are made, and failed attempts
        // leave a remainder in `num_to_create` to be retried on the
        // next loop iteration.
        for _ in 0..num_to_create {
            let ok = Connection::create_and_register(
                config,
                &mut connections,
                &mut poll,
                &mut token_counter,
            )
            .is_ok();
            if ok {
                num_to_create -= 1;
            }
        }
        iter_counter = iter_counter.wrapping_add(1);
    }
}
fn remove_connection(poll: &mut Poll, connections: &mut ConnectionMap, connection_id: usize) {
if let Some(mut connection) = connections.remove(&connection_id) {
if let Err(err) = connection.deregister(poll) {
eprintln!("couldn't deregister connection: {}", err);
}
/// Flush any pending outgoing TLS data from the rustls session to the
/// TCP stream.
///
/// Drains the session's write buffer into a temporary byte vector,
/// writes it to the stream in one call, then updates the bytes-sent
/// statistic. Once the data is on the wire, the `queued_responses`
/// counter is folded into the `requests` statistic and reset.
async fn write_tls(&mut self) -> anyhow::Result<()> {
    if !self.tls.wants_write() {
        return Ok(());
    }

    // `Vec<u8>` implements `io::Write`, so it can receive the TLS
    // records directly; no `Cursor` wrapper is needed.
    let mut outgoing = Vec::new();

    while self.tls.wants_write() {
        self.tls.write_tls(&mut outgoing).unwrap();
    }

    self.stream.write_all(&outgoing).await?;
    self.stream.flush().await?;

    self.load_test_state
        .statistics
        .bytes_sent
        .fetch_add(outgoing.len(), Ordering::SeqCst);

    if self.queued_responses != 0 {
        self.load_test_state
            .statistics
            .requests
            .fetch_add(self.queued_responses, Ordering::SeqCst);

        self.queued_responses = 0;
    }

    Ok(())
}
}

View file

@ -238,6 +238,10 @@ impl Request {
Ok(Request::Announce(request))
} else {
if info_hashes.is_empty() {
return Err(anyhow::anyhow!("No info hashes sent"));
}
let request = ScrapeRequest { info_hashes };
Ok(Request::Scrape(request))

View file

@ -32,7 +32,6 @@ indexmap = "1"
log = "0.4"
mimalloc = { version = "0.1", default-features = false }
parking_lot = "0.11"
privdrop = "0.5"
rand = { version = "0.8", features = ["small_rng"] }
serde = { version = "1", features = ["derive"] }

View file

@ -1,6 +1,7 @@
use std::net::SocketAddr;
use aquatic_common::access_list::AccessListConfig;
use aquatic_common::cpu_pinning::CpuPinningConfig;
use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig};
use serde::{Deserialize, Serialize};
use aquatic_cli_helpers::LogLevel;
@ -25,7 +26,7 @@ pub struct Config {
pub cleaning: CleaningConfig,
pub privileges: PrivilegeConfig,
pub access_list: AccessListConfig,
pub core_affinity: CoreAffinityConfig,
pub cpu_pinning: CpuPinningConfig,
}
impl aquatic_cli_helpers::Config for Config {
@ -98,24 +99,6 @@ pub struct CleaningConfig {
pub max_connection_age: u64,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct PrivilegeConfig {
/// Chroot and switch user after binding to sockets
pub drop_privileges: bool,
/// Chroot to this path
pub chroot_path: String,
/// User to switch to after chrooting
pub user: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct CoreAffinityConfig {
pub set_affinities: bool,
pub offset: usize,
}
impl Default for Config {
fn default() -> Self {
Self {
@ -131,7 +114,7 @@ impl Default for Config {
cleaning: CleaningConfig::default(),
privileges: PrivilegeConfig::default(),
access_list: AccessListConfig::default(),
core_affinity: CoreAffinityConfig::default(),
cpu_pinning: CpuPinningConfig::default(),
}
}
}
@ -183,22 +166,3 @@ impl Default for CleaningConfig {
}
}
}
impl Default for PrivilegeConfig {
fn default() -> Self {
Self {
drop_privileges: false,
chroot_path: ".".to_string(),
user: "nobody".to_string(),
}
}
}
impl Default for CoreAffinityConfig {
fn default() -> Self {
Self {
set_affinities: false,
offset: 0,
}
}
}

View file

@ -1,3 +1,4 @@
use std::borrow::Borrow;
use std::cell::RefCell;
use std::rc::Rc;
@ -8,18 +9,22 @@ use glommio::prelude::*;
use crate::common::*;
use crate::config::Config;
pub async fn update_access_list(config: Config, access_list: Rc<RefCell<AccessList>>) {
if config.access_list.mode.is_on() {
match BufferedFile::open(config.access_list.path).await {
pub async fn update_access_list<C: Borrow<Config>>(
config: C,
access_list: Rc<RefCell<AccessList>>,
) {
if config.borrow().access_list.mode.is_on() {
match BufferedFile::open(&config.borrow().access_list.path).await {
Ok(file) => {
let mut reader = StreamReaderBuilder::new(file).build();
let mut new_access_list = AccessList::default();
loop {
let mut buf = String::with_capacity(42);
match reader.read_line(&mut buf).await {
Ok(_) => {
if let Err(err) = access_list.borrow_mut().insert_from_line(&buf) {
if let Err(err) = new_access_list.insert_from_line(&buf) {
::log::error!(
"Couln't parse access list line '{}': {:?}",
buf,
@ -36,6 +41,8 @@ pub async fn update_access_list(config: Config, access_list: Rc<RefCell<AccessLi
yield_if_needed().await;
}
*access_list.borrow_mut() = new_access_list;
}
Err(err) => {
::log::error!("Couldn't open access list file: {:?}", err)

View file

@ -1,11 +1,11 @@
use std::sync::{atomic::AtomicUsize, Arc};
use aquatic_common::access_list::AccessList;
use aquatic_common::privileges::drop_privileges_after_socket_binding;
use glommio::channels::channel_mesh::MeshBuilder;
use glommio::prelude::*;
use crate::config::Config;
use crate::drop_privileges_after_socket_binding;
mod common;
pub mod handlers;
@ -14,9 +14,9 @@ pub mod network;
pub const SHARED_CHANNEL_SIZE: usize = 4096;
pub fn run(config: Config) -> anyhow::Result<()> {
if config.core_affinity.set_affinities {
if config.cpu_pinning.active {
core_affinity::set_for_current(core_affinity::CoreId {
id: config.core_affinity.offset,
id: config.cpu_pinning.offset,
});
}
@ -44,8 +44,8 @@ pub fn run(config: Config) -> anyhow::Result<()> {
let mut builder = LocalExecutorBuilder::default();
if config.core_affinity.set_affinities {
builder = builder.pin_to_cpu(config.core_affinity.offset + 1 + i);
if config.cpu_pinning.active {
builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + i);
}
let executor = builder.spawn(|| async move {
@ -70,9 +70,8 @@ pub fn run(config: Config) -> anyhow::Result<()> {
let mut builder = LocalExecutorBuilder::default();
if config.core_affinity.set_affinities {
builder =
builder.pin_to_cpu(config.core_affinity.offset + 1 + config.socket_workers + i);
if config.cpu_pinning.active {
builder = builder.pin_to_cpu(config.cpu_pinning.offset + 1 + config.socket_workers + i);
}
let executor = builder.spawn(|| async move {
@ -88,7 +87,12 @@ pub fn run(config: Config) -> anyhow::Result<()> {
executors.push(executor);
}
drop_privileges_after_socket_binding(&config, num_bound_sockets).unwrap();
drop_privileges_after_socket_binding(
&config.privileges,
num_bound_sockets,
config.socket_workers,
)
.unwrap();
for executor in executors {
executor

View file

@ -1,11 +1,3 @@
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use cfg_if::cfg_if;
pub mod common;
@ -16,7 +8,6 @@ pub mod glommio;
pub mod mio;
use config::Config;
use privdrop::PrivDrop;
pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker";
@ -29,35 +20,3 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
}
}
}
fn drop_privileges_after_socket_binding(
config: &Config,
num_bound_sockets: Arc<AtomicUsize>,
) -> anyhow::Result<()> {
if config.privileges.drop_privileges {
let mut counter = 0usize;
loop {
let sockets = num_bound_sockets.load(Ordering::SeqCst);
if sockets == config.socket_workers {
PrivDrop::default()
.chroot(config.privileges.chroot_path.clone())
.user(config.privileges.user.clone())
.apply()?;
break;
}
::std::thread::sleep(Duration::from_millis(10));
counter += 1;
if counter == 500 {
panic!("Sockets didn't bind in time for privilege drop.");
}
}
}
Ok(())
}

View file

@ -6,6 +6,7 @@ use std::{
};
use anyhow::Context;
use aquatic_common::privileges::drop_privileges_after_socket_binding;
use crossbeam_channel::unbounded;
pub mod common;
@ -16,14 +17,13 @@ pub mod tasks;
use aquatic_common::access_list::{AccessListArcSwap, AccessListMode, AccessListQuery};
use crate::config::Config;
use crate::drop_privileges_after_socket_binding;
use common::State;
pub fn run(config: Config) -> ::anyhow::Result<()> {
if config.core_affinity.set_affinities {
if config.cpu_pinning.active {
core_affinity::set_for_current(core_affinity::CoreId {
id: config.core_affinity.offset,
id: config.cpu_pinning.offset,
});
}
@ -35,7 +35,12 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
start_workers(config.clone(), state.clone(), num_bound_sockets.clone())?;
drop_privileges_after_socket_binding(&config, num_bound_sockets).unwrap();
drop_privileges_after_socket_binding(
&config.privileges,
num_bound_sockets,
config.socket_workers,
)
.unwrap();
loop {
::std::thread::sleep(Duration::from_secs(config.cleaning.interval));
@ -66,9 +71,9 @@ pub fn start_workers(
Builder::new()
.name(format!("request-{:02}", i + 1))
.spawn(move || {
if config.core_affinity.set_affinities {
if config.cpu_pinning.active {
core_affinity::set_for_current(core_affinity::CoreId {
id: config.core_affinity.offset + 1 + i,
id: config.cpu_pinning.offset + 1 + i,
});
}
@ -87,9 +92,9 @@ pub fn start_workers(
Builder::new()
.name(format!("socket-{:02}", i + 1))
.spawn(move || {
if config.core_affinity.set_affinities {
if config.cpu_pinning.active {
core_affinity::set_for_current(core_affinity::CoreId {
id: config.core_affinity.offset + 1 + config.request_workers + i,
id: config.cpu_pinning.offset + 1 + config.request_workers + i,
});
}
@ -112,9 +117,9 @@ pub fn start_workers(
Builder::new()
.name("statistics-collector".to_string())
.spawn(move || {
if config.core_affinity.set_affinities {
if config.cpu_pinning.active {
core_affinity::set_for_current(core_affinity::CoreId {
id: config.core_affinity.offset,
id: config.cpu_pinning.offset,
});
}

View file

@ -1,6 +1,6 @@
use std::net::SocketAddr;
use aquatic_common::access_list::AccessListConfig;
use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig};
use serde::{Deserialize, Serialize};
use aquatic_cli_helpers::LogLevel;
@ -84,17 +84,6 @@ pub struct StatisticsConfig {
pub interval: u64,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct PrivilegeConfig {
/// Chroot and switch user after binding to sockets
pub drop_privileges: bool,
/// Chroot to this path
pub chroot_path: String,
/// User to switch to after chrooting
pub user: String,
}
impl Default for Config {
fn default() -> Self {
Self {
@ -162,13 +151,3 @@ impl Default for StatisticsConfig {
Self { interval: 0 }
}
}
impl Default for PrivilegeConfig {
fn default() -> Self {
Self {
drop_privileges: false,
chroot_path: ".".to_string(),
user: "nobody".to_string(),
}
}
}

16
scripts/gen-tls.sh Executable file
View file

@ -0,0 +1,16 @@
#!/bin/bash
# Fixed shebang: `#/bin/bash` was missing the `!` and was therefore an
# ordinary comment, leaving the interpreter unspecified.
#
# Generate a self-signed ECDSA certificate and an unencrypted PKCS#8
# private key for local TLS testing, and install the certificate into
# the system trust store (requires sudo).

set -e

mkdir -p tmp/tls
cd tmp/tls

# ECDSA key on the prime256v1 curve
openssl ecparam -genkey -name prime256v1 -out key.pem
# Certificate signing request for example.com with placeholder subject fields
openssl req -new -sha256 -key key.pem -out csr.csr -subj "/C=GB/ST=Test/L=Test/O=Test/OU=Test/CN=example.com"
# Self-signed certificate valid for one year
openssl req -x509 -sha256 -nodes -days 365 -key key.pem -in csr.csr -out cert.crt

# Trust the certificate system-wide
sudo cp cert.crt /usr/local/share/ca-certificates/snakeoil.crt
sudo update-ca-certificates

# Convert the key to unencrypted PKCS#8 (the format the accompanying
# rustls-based config expects for tls_private_key_path)
openssl pkcs8 -in key.pem -topk8 -nocrypt -out key.pk8