Merge branch 'master' into cpu-pinning-2022-03-30

This commit is contained in:
Joakim Frostegård 2022-04-04 22:49:25 +02:00
commit 908e18360c
63 changed files with 2513 additions and 446 deletions

1
.gitignore vendored
View file

@ -5,3 +5,4 @@
**/criterion/*/new
.DS_Store
.env

967
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -6,6 +6,7 @@ members = [
"aquatic_common",
"aquatic_http",
"aquatic_http_load_test",
"aquatic_http_private",
"aquatic_http_protocol",
"aquatic_toml_config",
"aquatic_toml_config_derive",
@ -19,30 +20,19 @@ members = [
]
[patch.crates-io]
aquatic = { path = "aquatic" }
aquatic_cli_helpers = { path = "aquatic_cli_helpers" }
aquatic_common = { path = "aquatic_common" }
aquatic_http_load_test = { path = "aquatic_http_load_test" }
aquatic_http = { path = "aquatic_http" }
aquatic_http_protocol = { path = "aquatic_http_protocol" }
aquatic_toml_config_derive = { path = "aquatic_toml_config_derive" }
aquatic_toml_config = { path = "aquatic_toml_config" }
aquatic_udp_bench = { path = "aquatic_udp_bench" }
aquatic_udp_load_test = { path = "aquatic_udp_load_test" }
aquatic_udp = { path = "aquatic_udp" }
aquatic_udp_protocol = { path = "aquatic_udp_protocol" }
aquatic_ws_load_test = { path = "aquatic_ws_load_test" }
aquatic_ws = { path = "aquatic_ws" }
aquatic_ws_protocol = { path = "aquatic_ws_protocol" }
membarrier = { git = "https://github.com/glommer/membarrier-rs.git", branch = "issue-22" }
[profile.release]
debug = true
lto = true
debug = false
lto = "thin"
opt-level = 3
[profile.test]
opt-level = 3
inherits = "release-debug"
[profile.bench]
inherits = "release-debug"
[profile.release-debug]
inherits = "release"
debug = true
opt-level = 3
lto = true

View file

@ -214,6 +214,12 @@ IPv4 and IPv6 peers are tracked separately.
`aquatic_ws` has not been tested as much as `aquatic_udp` but likely works
fine.
#### Performance
![WebTorrent tracker throughput comparison](./documents/aquatic-ws-load-test-illustration-2022-03-29.png)
More details are available [here](./documents/aquatic-ws-load-test-2022-03-29.pdf).
## Load testing
There are load test binaries for all protocols. They use a CLI structure

18
TODO.md
View file

@ -2,15 +2,21 @@
## High priority
* aquatic_http_private
* Consider not setting Content-type: text/plain for responses and send vec as default octet stream instead
* stored procedure
* test ip format
* site will likely want num_seeders and num_leechers for all torrents..
## Medium priority
* Use thin LTO?
* Add release-debug profile?
* rename request workers to swarm workers
* quit whole program if any thread panics
* config: fail on unrecognized keys?
* Run cargo-deny in CI
* aquatic_ws
* remove peer from all torrent maps when connection is closed
* RES memory still high after traffic stops, even if torrent maps and connection slabs go down to 0 len and capacity
* replacing indexmap_amortized / simd_json with equivalents doesn't help
* SinkExt::send maybe doesn't wake up properly?
@ -25,8 +31,6 @@
* add flag to print parsed config when starting
* aquatic_udp
* look at proper cpu pinning (check that one thread gets bound per core)
* then consider so_attach_reuseport_cbpf
* what poll event capacity is actually needed?
* stagger connection cleaning intervals?
* load test
@ -34,11 +38,9 @@
with probability 0.2
* aquatic_ws
* glommio
* proper cpu set pinning
* general
* large amount of temporary allocations in serialize_20_bytes, pretty many in deserialize_20_bytes
* large amount of temporary allocations in serialize_20_bytes, pretty many in deserialize_20_bytes
* so_attach_reuseport_cbpf
* extract response peers: extract "one extra" to compensate for removal,
of sender if present in selection?

View file

@ -13,8 +13,8 @@ readme = "../README.md"
name = "aquatic"
[dependencies]
aquatic_cli_helpers = "0.2.0"
aquatic_http = "0.2.0"
aquatic_udp = "0.2.0"
aquatic_ws = "0.2.0"
aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" }
aquatic_http = { version = "0.2.0", path = "../aquatic_http" }
aquatic_udp = { version = "0.2.0", path = "../aquatic_udp" }
aquatic_ws = { version = "0.2.0", path = "../aquatic_ws" }
mimalloc = { version = "0.1", default-features = false }

View file

@ -9,7 +9,7 @@ repository = "https://github.com/greatest-ape/aquatic"
readme = "../README.md"
[dependencies]
aquatic_toml_config = "0.2.0"
aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" }
anyhow = "1"
git-testament = "0.2"

View file

@ -14,9 +14,10 @@ name = "aquatic_common"
[features]
with-glommio = ["glommio"]
with-hwloc = ["hwloc"]
rustls-config = ["rustls", "rustls-pemfile"]
[dependencies]
aquatic_toml_config = "0.2.0"
aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" }
ahash = "0.7"
anyhow = "1"
@ -34,3 +35,7 @@ serde = { version = "1", features = ["derive"] }
# Optional
glommio = { version = "0.7", optional = true }
hwloc = { version = "0.5", optional = true }
# rustls-config
rustls = { version = "0.20", optional = true }
rustls-pemfile = { version = "0.3", optional = true }

View file

@ -7,6 +7,8 @@ use rand::Rng;
pub mod access_list;
pub mod cpu_pinning;
pub mod privileges;
#[cfg(feature = "rustls-config")]
pub mod rustls_config;
/// Amortized IndexMap using AHash hasher
pub type AmortizedIndexMap<K, V> = indexmap_amortized::IndexMap<K, V, RandomState>;

View file

@ -0,0 +1,35 @@
use std::{fs::File, io::BufReader, path::Path};
pub type RustlsConfig = rustls::ServerConfig;
/// Build a rustls server config from PEM-encoded certificate chain and
/// private key files.
///
/// Reads every certificate from `tls_certificate_path` and the first PKCS#8
/// private key from `tls_private_key_path`.
///
/// # Errors
///
/// Returns an error if either file cannot be opened or parsed, if the key
/// file contains no PKCS#8 private keys, or if rustls rejects the
/// certificate/key pair.
pub fn create_rustls_config(
    tls_certificate_path: &Path,
    tls_private_key_path: &Path,
) -> anyhow::Result<RustlsConfig> {
    let certs = {
        let f = File::open(tls_certificate_path)?;
        let mut f = BufReader::new(f);

        rustls_pemfile::certs(&mut f)?
            .into_iter()
            .map(rustls::Certificate)
            .collect()
    };

    let private_key = {
        let f = File::open(tls_private_key_path)?;
        let mut f = BufReader::new(f);

        // Take ownership of the first key instead of `.first()` + clone,
        // and build the error lazily (ok_or_else) instead of on every call
        rustls_pemfile::pkcs8_private_keys(&mut f)?
            .into_iter()
            .next()
            .map(rustls::PrivateKey)
            .ok_or_else(|| anyhow::anyhow!("No private keys in file"))?
    };

    let tls_config = rustls::ServerConfig::builder()
        .with_safe_defaults()
        .with_no_client_auth()
        .with_single_cert(certs, private_key)?;

    Ok(tls_config)
}

View file

@ -16,10 +16,10 @@ name = "aquatic_http"
name = "aquatic_http"
[dependencies]
aquatic_cli_helpers = "0.2.0"
aquatic_common = { version = "0.2.0", features = ["with-glommio"] }
aquatic_http_protocol = "0.2.0"
aquatic_toml_config = "0.2.0"
aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" }
aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["rustls-config", "with-glommio"] }
aquatic_http_protocol = { version = "0.2.0", path = "../aquatic_http_protocol" }
aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" }
anyhow = "1"
cfg-if = "1"

View file

@ -10,8 +10,6 @@ use aquatic_http_protocol::{
response::{AnnounceResponse, ScrapeResponse},
};
pub type TlsConfig = futures_rustls::rustls::ServerConfig;
#[derive(Copy, Clone, Debug)]
pub struct ConsumerId(pub usize);

View file

@ -5,15 +5,12 @@ use aquatic_common::{
WorkerIndex,
},
privileges::drop_privileges_after_socket_binding,
rustls_config::create_rustls_config,
};
use common::{State, TlsConfig};
use common::State;
use glommio::{channels::channel_mesh::MeshBuilder, prelude::*};
use signal_hook::{consts::SIGUSR1, iterator::Signals};
use std::{
fs::File,
io::BufReader,
sync::{atomic::AtomicUsize, Arc},
};
use std::sync::{atomic::AtomicUsize, Arc};
use crate::config::Config;
@ -68,7 +65,10 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
let num_bound_sockets = Arc::new(AtomicUsize::new(0));
let tls_config = Arc::new(create_tls_config(&config).unwrap());
let tls_config = Arc::new(create_rustls_config(
&config.network.tls_certificate_path,
&config.network.tls_private_key_path,
)?);
let mut executors = Vec::new();
@ -154,32 +154,3 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
Ok(())
}
fn create_tls_config(config: &Config) -> anyhow::Result<TlsConfig> {
let certs = {
let f = File::open(&config.network.tls_certificate_path)?;
let mut f = BufReader::new(f);
rustls_pemfile::certs(&mut f)?
.into_iter()
.map(|bytes| futures_rustls::rustls::Certificate(bytes))
.collect()
};
let private_key = {
let f = File::open(&config.network.tls_private_key_path)?;
let mut f = BufReader::new(f);
rustls_pemfile::pkcs8_private_keys(&mut f)?
.first()
.map(|bytes| futures_rustls::rustls::PrivateKey(bytes.clone()))
.ok_or(anyhow::anyhow!("No private keys in file"))?
};
let tls_config = futures_rustls::rustls::ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(certs, private_key)?;
Ok(tls_config)
}

View file

@ -337,6 +337,7 @@ pub fn handle_announce_request(
announce_interval: config.protocol.peer_announce_interval,
peers: ResponsePeerListV4(response_peers),
peers6: ResponsePeerListV6(vec![]),
warning_message: None,
};
response
@ -366,6 +367,7 @@ pub fn handle_announce_request(
announce_interval: config.protocol.peer_announce_interval,
peers: ResponsePeerListV4(vec![]),
peers6: ResponsePeerListV6(response_peers),
warning_message: None,
};
response

View file

@ -7,6 +7,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::rustls_config::RustlsConfig;
use aquatic_common::CanonicalSocketAddr;
use aquatic_http_protocol::common::InfoHash;
use aquatic_http_protocol::request::{Request, RequestParseError, ScrapeRequest};
@ -54,7 +55,7 @@ struct ConnectionReference {
pub async fn run_socket_worker(
config: Config,
state: State,
tls_config: Arc<TlsConfig>,
tls_config: Arc<RustlsConfig>,
request_mesh_builder: MeshBuilder<ChannelRequest, Partial>,
response_mesh_builder: MeshBuilder<ChannelResponse, Partial>,
num_bound_sockets: Arc<AtomicUsize>,
@ -195,7 +196,7 @@ impl Connection {
response_receiver: LocalReceiver<ChannelResponse>,
response_consumer_id: ConsumerId,
connection_id: ConnectionId,
tls_config: Arc<TlsConfig>,
tls_config: Arc<RustlsConfig>,
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
stream: TcpStream,
) -> anyhow::Result<()> {

View file

@ -13,10 +13,10 @@ readme = "../README.md"
name = "aquatic_http_load_test"
[dependencies]
aquatic_cli_helpers = "0.2.0"
aquatic_common = { version = "0.2.0", features = ["with-glommio"] }
aquatic_http_protocol = "0.2.0"
aquatic_toml_config = "0.2.0"
aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" }
aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["with-glommio"] }
aquatic_http_protocol = { version = "0.2.0", path = "../aquatic_http_protocol" }
aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" }
anyhow = "1"
futures-lite = "1"

View file

@ -19,6 +19,8 @@ pub struct Config {
/// opened as quickly as possible, which is useful when the tracker
/// does not keep connections alive.
pub connection_creation_interval_ms: u64,
/// Announce/scrape url suffix. Use `/my_token/` to get `/announce/my_token/`
pub url_suffix: String,
pub duration: usize,
pub torrents: TorrentConfig,
pub cpu_pinning: CpuPinningConfigDesc,
@ -56,6 +58,7 @@ impl Default for Config {
num_workers: 1,
num_connections: 128,
connection_creation_interval_ms: 10,
url_suffix: "".into(),
duration: 0,
torrents: TorrentConfig::default(),
cpu_pinning: Default::default(),

View file

@ -135,7 +135,7 @@ impl Connection {
let request =
create_random_request(&self.config, &self.load_test_state, &mut self.rng);
request.write(&mut self.tls.writer())?;
request.write(&mut self.tls.writer(), self.config.url_suffix.as_bytes())?;
self.queued_responses += 1;
self.send_new_request = false;
@ -213,9 +213,7 @@ impl Connection {
}
if let Some(body_start_index) = opt_body_start_index {
let interesting_bytes = &interesting_bytes[body_start_index..];
match Response::from_bytes(interesting_bytes) {
match Response::from_bytes(&interesting_bytes[body_start_index..]) {
Ok(response) => {
match response {
Response::Announce(_) => {

View file

@ -46,8 +46,9 @@ fn create_announce_request(config: &Config, state: &LoadTestState, rng: &mut imp
event,
key: None,
numwant: None,
compact: true,
port: rng.gen(),
bytes_uploaded: 0,
bytes_downloaded: 0,
})
}

View file

@ -0,0 +1,36 @@
[package]
name = "aquatic_http_private"
version = "0.2.0"
edition = "2021"
authors = ["Joakim Frostegård <joakim.frostegard@gmail.com>"]
license = "Apache-2.0"
repository = "https://github.com/greatest-ape/aquatic"
keywords = ["http", "benchmark", "peer-to-peer", "torrent", "bittorrent"]
[lib]
name = "aquatic_http_private"
[[bin]]
name = "aquatic_http_private"
[dependencies]
aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" }
aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["rustls-config"] }
aquatic_http_protocol = { version = "0.2.0", path = "../aquatic_http_protocol", features = ["with-axum"] }
aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" }
anyhow = "1"
axum = { version = "0.5", default-features = false, features = ["headers", "http1", "matched-path", "original-uri"] }
dotenv = "0.15"
futures-util = { version = "0.3", default-features = false }
hex = "0.4"
hyper = "0.14"
log = "0.4"
mimalloc = { version = "0.1", default-features = false }
rand = { version = "0.8", features = ["small_rng"] }
rustls = "0.20"
serde = { version = "1", features = ["derive"] }
socket2 = { version = "0.4", features = ["all"] }
sqlx = { version = "0.5", features = [ "runtime-tokio-rustls" , "mysql" ] }
tokio = { version = "1", features = ["full"] }
tokio-rustls = "0.23"

View file

@ -0,0 +1,96 @@
# aquatic_http_private
HTTP (over TLS) BitTorrent tracker that calls a MySQL stored procedure to
determine whether requests are allowed to proceed.
Work in progress.
## Usage
### Database setup
* Create database (you will typically skip this step and use your own database):
```sql
CREATE DATABASE aquatic_db;
```
* Create aquatic user (use a better password):
```sql
CREATE USER 'aquatic'@localhost IDENTIFIED BY 'aquatic_password';
```
* Create stored procedure `aquatic_announce_v1`:
```sql
-- Create stored procedure called by aquatic for each announce request.
--
-- Set output parameter p_announce_allowed to true to allow the announce.
CREATE OR REPLACE PROCEDURE aquatic_announce_v1 (
-- Canonical source ip address (IPv4/IPv6)
IN p_source_ip VARBINARY(16),
-- Source port (not port where peer says it will accept BitTorrent requests)
IN p_source_port SMALLINT UNSIGNED,
-- User agent (can be NULL)
IN p_user_agent TEXT,
-- User token extracted from announce url ('/announce/USER_TOKEN/')
IN p_user_token VARCHAR(255),
-- Hex-encoded info hash
IN p_info_hash CHAR(40),
-- Peer ID. BINARY since it can be any bytes according to spec.
IN p_peer_id BINARY(20),
-- Event (started/stopped/completed) (can be NULL)
IN p_event VARCHAR(9),
-- Bytes uploaded. Passed directly from request.
IN p_uploaded BIGINT UNSIGNED,
-- Bytes downloaded. Passed directly from request.
IN p_downloaded BIGINT UNSIGNED,
-- Bytes left
IN p_left BIGINT UNSIGNED,
-- Return true to send announce response. Defaults to false if not set.
OUT p_announce_allowed BOOLEAN,
-- Optional failure reason. Defaults to NULL if not set.
OUT p_failure_reason TEXT,
-- Optional warning message. Defaults to NULL if not set.
OUT p_warning_message TEXT
)
MODIFIES SQL DATA
BEGIN
-- Replace with your custom code
SELECT true INTO p_announce_allowed;
END
```
* Give aquatic user permission to call stored procedure:
```sql
GRANT EXECUTE ON PROCEDURE aquatic_db.aquatic_announce_v1 TO 'aquatic'@localhost;
FLUSH PRIVILEGES;
```
The `CREATE OR REPLACE PROCEDURE` command, which leaves privileges in place,
requires MariaDB 10.1.3 or later. If your database does not support it,
each time you want to replace the procedure, you need to drop it, then
create it using `CREATE PROCEDURE` and grant execution privileges again.
### Tracker setup
* Install rust compiler and cmake
* Create `.env` file with database credentials:
```sh
DATABASE_URL="mysql://aquatic:aquatic_password@localhost/aquatic_db"
```
* Build and run tracker:
```sh
# Build
cargo build --release -p aquatic_http_private
# Generate config file (remember to set paths to TLS cert and key)
./target/release/aquatic_http_private -p > http-private-config.toml
# Run tracker
./target/release/aquatic_http_private -c http-private-config.toml
```

View file

@ -0,0 +1,52 @@
use tokio::sync::{mpsc, oneshot};
use aquatic_common::CanonicalSocketAddr;
use aquatic_http_protocol::{common::InfoHash, response::Response};
use crate::{config::Config, workers::socket::db::ValidatedAnnounceRequest};
/// An announce request forwarded from a socket worker to a request worker,
/// together with a oneshot sender the request worker uses to return the
/// response.
#[derive(Debug)]
pub struct ChannelAnnounceRequest {
    /// Request that has already been validated against the database
    pub request: ValidatedAnnounceRequest,
    /// Canonical source address of the announcing peer
    pub source_addr: CanonicalSocketAddr,
    /// Send errors are ignored by the request worker (the connection may
    /// already be gone)
    pub response_sender: oneshot::Sender<Response>,
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct RequestWorkerIndex(pub usize);
impl RequestWorkerIndex {
    /// Pick a request worker for an info hash by taking its first byte
    /// modulo the number of configured request workers.
    pub fn from_info_hash(config: &Config, info_hash: InfoHash) -> Self {
        let first_byte = usize::from(info_hash.0[0]);

        Self(first_byte % config.request_workers)
    }
}
pub struct ChannelRequestSender(Vec<mpsc::Sender<ChannelAnnounceRequest>>);
impl ChannelRequestSender {
pub fn new(senders: Vec<mpsc::Sender<ChannelAnnounceRequest>>) -> Self {
Self(senders)
}
pub async fn send_to(
&self,
index: RequestWorkerIndex,
request: ValidatedAnnounceRequest,
source_addr: CanonicalSocketAddr,
) -> anyhow::Result<oneshot::Receiver<Response>> {
let (response_sender, response_receiver) = oneshot::channel();
let request = ChannelAnnounceRequest {
request,
source_addr,
response_sender,
};
match self.0[index.0].send(request).await {
Ok(()) => Ok(response_receiver),
Err(err) => {
Err(anyhow::Error::new(err).context("error sending ChannelAnnounceRequest"))
}
}
}
}

View file

@ -0,0 +1,118 @@
use std::{net::SocketAddr, path::PathBuf};
use aquatic_common::privileges::PrivilegeConfig;
use aquatic_toml_config::TomlConfig;
use serde::Deserialize;
use aquatic_cli_helpers::LogLevel;
/// aquatic_http_private configuration
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
pub struct Config {
    /// Socket workers receive requests from the socket, parse them and send
    /// them on to the request workers. They then receive responses from the
    /// request workers, encode them and send them back over the socket.
    pub socket_workers: usize,
    /// Request workers receive a number of requests from socket workers,
    /// generate responses and send them back to the socket workers.
    pub request_workers: usize,
    /// Capacity of each bounded socket-worker-to-request-worker channel
    pub worker_channel_size: usize,
    /// Maximum number of connections in each worker's MySQL pool
    pub db_connections_per_worker: u32,
    pub log_level: LogLevel,
    pub network: NetworkConfig,
    pub protocol: ProtocolConfig,
    pub cleaning: CleaningConfig,
    pub privileges: PrivilegeConfig,
}
impl Default for Config {
fn default() -> Self {
Self {
socket_workers: 1,
request_workers: 1,
worker_channel_size: 128,
db_connections_per_worker: 1,
log_level: LogLevel::default(),
network: NetworkConfig::default(),
protocol: ProtocolConfig::default(),
cleaning: CleaningConfig::default(),
privileges: PrivilegeConfig::default(),
}
}
}
impl aquatic_cli_helpers::Config for Config {
fn get_log_level(&self) -> Option<LogLevel> {
Some(self.log_level)
}
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
pub struct NetworkConfig {
/// Bind to this address
pub address: SocketAddr,
/// Path to TLS certificate (DER-encoded X.509)
pub tls_certificate_path: PathBuf,
/// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format)
pub tls_private_key_path: PathBuf,
pub keep_alive: bool,
}
impl Default for NetworkConfig {
fn default() -> Self {
Self {
address: SocketAddr::from(([0, 0, 0, 0], 3000)),
tls_certificate_path: "".into(),
tls_private_key_path: "".into(),
keep_alive: true,
}
}
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
pub struct ProtocolConfig {
/// Maximum number of torrents to accept in scrape request
pub max_scrape_torrents: usize,
/// Maximum number of requested peers to accept in announce request
pub max_peers: usize,
/// Ask peers to announce this often (seconds)
pub peer_announce_interval: usize,
}
impl Default for ProtocolConfig {
fn default() -> Self {
Self {
max_scrape_torrents: 100,
max_peers: 50,
peer_announce_interval: 300,
}
}
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
pub struct CleaningConfig {
/// Clean peers this often (seconds)
pub torrent_cleaning_interval: u64,
/// Remove peers that have not announced for this long (seconds)
pub max_peer_age: u64,
}
impl Default for CleaningConfig {
fn default() -> Self {
Self {
torrent_cleaning_interval: 30,
max_peer_age: 360,
}
}
}
#[cfg(test)]
mod tests {
use super::Config;
::aquatic_toml_config::gen_serialize_deserialize_test!(Config);
}

View file

@ -0,0 +1,69 @@
mod common;
pub mod config;
mod workers;
use std::{collections::VecDeque, sync::Arc};
use aquatic_common::rustls_config::create_rustls_config;
use common::ChannelRequestSender;
use dotenv::dotenv;
use tokio::sync::mpsc::channel;
use config::Config;
pub const APP_NAME: &str = "aquatic_http_private: private HTTP/TLS BitTorrent tracker";
pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Load `.env`, build the shared TLS config and spawn socket and request
/// worker threads, then block until the worker threads finish.
///
/// Every socket worker holds a clone of every request-worker sender; each
/// request worker owns exactly one receiver.
pub fn run(config: Config) -> anyhow::Result<()> {
    // Load DATABASE_URL etc. from a .env file if one is present
    dotenv().ok();

    let tls_config = Arc::new(create_rustls_config(
        &config.network.tls_certificate_path,
        &config.network.tls_private_key_path,
    )?);

    // One bounded channel per request worker
    let mut request_senders = Vec::new();
    let mut request_receivers = VecDeque::new();

    for _ in 0..config.request_workers {
        let (request_sender, request_receiver) = channel(config.worker_channel_size);

        request_senders.push(request_sender);
        request_receivers.push_back(request_receiver);
    }

    let mut handles = Vec::new();

    for _ in 0..config.socket_workers {
        let config = config.clone();
        let tls_config = tls_config.clone();
        // Each socket worker can reach every request worker
        let request_sender = ChannelRequestSender::new(request_senders.clone());

        let handle = ::std::thread::Builder::new()
            .name("socket".into())
            .spawn(move || {
                workers::socket::run_socket_worker(config, tls_config, request_sender)
            })?;

        handles.push(handle);
    }

    for _ in 0..config.request_workers {
        let config = config.clone();
        let request_receiver = request_receivers.pop_front().unwrap();

        let handle = ::std::thread::Builder::new()
            .name("request".into())
            .spawn(move || workers::request::run_request_worker(config, request_receiver))?;

        handles.push(handle);
    }

    // Propagate panics (outer ?) and errors (inner ?) from worker threads
    for handle in handles {
        handle
            .join()
            .map_err(|err| anyhow::anyhow!("thread join error: {:?}", err))??;
    }

    Ok(())
}

View file

@ -0,0 +1,14 @@
use aquatic_cli_helpers::run_app_with_cli_and_config;
use aquatic_http_private::config::Config;
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
/// Parse CLI arguments and config file, then hand off to the library
/// `run` entry point.
fn main() {
    run_app_with_cli_and_config::<Config>(
        aquatic_http_private::APP_NAME,
        aquatic_http_private::APP_VERSION,
        aquatic_http_private::run,
        None,
    )
}

View file

@ -0,0 +1,2 @@
pub mod request;
pub mod socket;

View file

@ -0,0 +1,122 @@
use std::net::{Ipv4Addr, Ipv6Addr};
use std::time::Instant;
use aquatic_common::{AmortizedIndexMap, ValidUntil};
use aquatic_http_protocol::common::{AnnounceEvent, InfoHash, PeerId};
use aquatic_http_protocol::response::ResponsePeer;
pub trait Ip: ::std::fmt::Debug + Copy + Eq + ::std::hash::Hash {}
impl Ip for Ipv4Addr {}
impl Ip for Ipv6Addr {}
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum PeerStatus {
Seeding,
Leeching,
Stopped,
}
impl PeerStatus {
    /// Determine peer status from announce event and number of bytes left.
    ///
    /// Likely, the last branch will be taken most of the time.
    #[inline]
    pub fn from_event_and_bytes_left(event: AnnounceEvent, opt_bytes_left: Option<usize>) -> Self {
        match (event, opt_bytes_left) {
            (AnnounceEvent::Stopped, _) => Self::Stopped,
            (_, Some(0)) => Self::Seeding,
            _ => Self::Leeching,
        }
    }
}
#[derive(Debug, Clone, Copy)]
pub struct Peer<I: Ip> {
pub ip_address: I,
pub port: u16,
pub status: PeerStatus,
pub valid_until: ValidUntil,
}
impl<I: Ip> Peer<I> {
pub fn to_response_peer(&self) -> ResponsePeer<I> {
ResponsePeer {
ip_address: self.ip_address,
port: self.port,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PeerMapKey<I: Ip> {
pub peer_id: PeerId,
pub ip_address: I,
}
pub type PeerMap<I> = AmortizedIndexMap<PeerMapKey<I>, Peer<I>>;
pub struct TorrentData<I: Ip> {
pub peers: PeerMap<I>,
pub num_seeders: usize,
pub num_leechers: usize,
}
impl<I: Ip> Default for TorrentData<I> {
#[inline]
fn default() -> Self {
Self {
peers: Default::default(),
num_seeders: 0,
num_leechers: 0,
}
}
}
pub type TorrentMap<I> = AmortizedIndexMap<InfoHash, TorrentData<I>>;
#[derive(Default)]
pub struct TorrentMaps {
pub ipv4: TorrentMap<Ipv4Addr>,
pub ipv6: TorrentMap<Ipv6Addr>,
}
impl TorrentMaps {
    /// Remove expired peers from both address families and drop torrents
    /// that end up with no peers.
    pub fn clean(&mut self) {
        Self::clean_torrent_map(&mut self.ipv4);
        Self::clean_torrent_map(&mut self.ipv6);
    }

    fn clean_torrent_map<I: Ip>(torrent_map: &mut TorrentMap<I>) {
        let now = Instant::now();

        torrent_map.retain(|_, torrent_data| {
            let num_seeders = &mut torrent_data.num_seeders;
            let num_leechers = &mut torrent_data.num_leechers;

            torrent_data.peers.retain(|_, peer| {
                if peer.valid_until.0 >= now {
                    true
                } else {
                    // Peer expired: keep the seeder/leecher counters in
                    // sync with the peer map before dropping the entry
                    match peer.status {
                        PeerStatus::Seeding => {
                            *num_seeders -= 1;
                        }
                        PeerStatus::Leeching => {
                            *num_leechers -= 1;
                        }
                        _ => (),
                    };

                    false
                }
            });

            // Drop torrents with no remaining peers
            !torrent_data.peers.is_empty()
        });

        // Release excess capacity after mass removals
        torrent_map.shrink_to_fit();
    }
}

View file

@ -0,0 +1,210 @@
mod common;
use std::cell::RefCell;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::rc::Rc;
use aquatic_http_protocol::request::AnnounceRequest;
use rand::prelude::SmallRng;
use rand::SeedableRng;
use tokio::sync::mpsc::Receiver;
use tokio::task::LocalSet;
use tokio::time;
use aquatic_common::{extract_response_peers, CanonicalSocketAddr, ValidUntil};
use aquatic_http_protocol::response::{
AnnounceResponse, Response, ResponsePeer, ResponsePeerListV4, ResponsePeerListV6,
};
use crate::common::ChannelAnnounceRequest;
use crate::config::Config;
use common::*;
/// Thread entry point for a request worker: build a single-threaded tokio
/// runtime and process announce requests on it until the channel closes.
pub fn run_request_worker(
    config: Config,
    request_receiver: Receiver<ChannelAnnounceRequest>,
) -> anyhow::Result<()> {
    // Current-thread runtime: torrent state is kept task-local
    // (Rc<RefCell<..>>) and never crosses threads
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()?;

    runtime.block_on(run_inner(config, request_receiver))?;

    Ok(())
}
async fn run_inner(
config: Config,
mut request_receiver: Receiver<ChannelAnnounceRequest>,
) -> anyhow::Result<()> {
let torrents = Rc::new(RefCell::new(TorrentMaps::default()));
let mut rng = SmallRng::from_entropy();
LocalSet::new().spawn_local(periodically_clean_torrents(
config.clone(),
torrents.clone(),
));
loop {
let request = request_receiver
.recv()
.await
.ok_or_else(|| anyhow::anyhow!("request channel closed"))?;
let valid_until = ValidUntil::new(config.cleaning.max_peer_age);
let response = handle_announce_request(
&config,
&mut rng,
&mut torrents.borrow_mut(),
valid_until,
request.source_addr,
request.request.into(),
);
let _ = request.response_sender.send(Response::Announce(response));
}
}
/// Run forever, cleaning the torrent maps every
/// `cleaning.torrent_cleaning_interval` seconds.
///
/// NOTE(review): this future only makes progress while something polls it;
/// spawning it on a `LocalSet` that is never run means cleaning never
/// happens — verify the caller actually drives it.
async fn periodically_clean_torrents(config: Config, torrents: Rc<RefCell<TorrentMaps>>) {
    let mut interval = time::interval(time::Duration::from_secs(
        config.cleaning.torrent_cleaning_interval,
    ));

    loop {
        interval.tick().await;

        torrents.borrow_mut().clean();
    }
}
/// Update the torrent map with the announcing peer and build an announce
/// response containing peers from the matching address family.
///
/// IPv4 and IPv6 peers are tracked in separate maps; the peer list for the
/// non-matching address family is always empty in the response.
fn handle_announce_request(
    config: &Config,
    rng: &mut SmallRng,
    torrent_maps: &mut TorrentMaps,
    valid_until: ValidUntil,
    source_addr: CanonicalSocketAddr,
    request: AnnounceRequest,
) -> AnnounceResponse {
    match source_addr.get().ip() {
        IpAddr::V4(source_ip) => {
            // Creates the torrent entry if this is its first peer
            let torrent_data: &mut TorrentData<Ipv4Addr> =
                torrent_maps.ipv4.entry(request.info_hash).or_default();

            let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers(
                config,
                rng,
                torrent_data,
                source_ip,
                request,
                valid_until,
            );

            let response = AnnounceResponse {
                complete: seeders,
                incomplete: leechers,
                announce_interval: config.protocol.peer_announce_interval,
                peers: ResponsePeerListV4(response_peers),
                peers6: ResponsePeerListV6(vec![]),
                warning_message: None,
            };

            response
        }
        IpAddr::V6(source_ip) => {
            // Creates the torrent entry if this is its first peer
            let torrent_data: &mut TorrentData<Ipv6Addr> =
                torrent_maps.ipv6.entry(request.info_hash).or_default();

            let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers(
                config,
                rng,
                torrent_data,
                source_ip,
                request,
                valid_until,
            );

            let response = AnnounceResponse {
                complete: seeders,
                incomplete: leechers,
                announce_interval: config.protocol.peer_announce_interval,
                peers: ResponsePeerListV4(vec![]),
                peers6: ResponsePeerListV6(response_peers),
                warning_message: None,
            };

            response
        }
    }
}
/// Insert/update peer. Return num_seeders, num_leechers and response peers
pub fn upsert_peer_and_get_response_peers<I: Ip>(
    config: &Config,
    rng: &mut SmallRng,
    torrent_data: &mut TorrentData<I>,
    source_ip: I,
    request: AnnounceRequest,
    valid_until: ValidUntil,
) -> (usize, usize, Vec<ResponsePeer<I>>) {
    // Insert/update/remove peer who sent this request
    let peer_status =
        PeerStatus::from_event_and_bytes_left(request.event, Some(request.bytes_left));

    let peer = Peer {
        ip_address: source_ip,
        port: request.port,
        status: peer_status,
        valid_until,
    };

    // Peers are keyed on (peer_id, ip), so the same peer id announcing
    // from different addresses counts as distinct peers
    let peer_map_key = PeerMapKey {
        peer_id: request.peer_id,
        ip_address: source_ip,
    };

    // Count the new status first; any previous entry's status is
    // decremented below based on what insert/remove returned
    let opt_removed_peer = match peer_status {
        PeerStatus::Leeching => {
            torrent_data.num_leechers += 1;

            torrent_data.peers.insert(peer_map_key.clone(), peer)
        }
        PeerStatus::Seeding => {
            torrent_data.num_seeders += 1;

            torrent_data.peers.insert(peer_map_key.clone(), peer)
        }
        PeerStatus::Stopped => torrent_data.peers.remove(&peer_map_key),
    };

    // Undo the count for whatever status the replaced/removed entry had
    match opt_removed_peer.map(|peer| peer.status) {
        Some(PeerStatus::Leeching) => {
            torrent_data.num_leechers -= 1;
        }
        Some(PeerStatus::Seeding) => {
            torrent_data.num_seeders -= 1;
        }
        _ => {}
    }

    // numwant == 0 or absent means "use the configured maximum"
    let max_num_peers_to_take = match request.numwant {
        Some(0) | None => config.protocol.max_peers,
        Some(numwant) => numwant.min(config.protocol.max_peers),
    };

    let response_peers: Vec<ResponsePeer<I>> = extract_response_peers(
        rng,
        &torrent_data.peers,
        max_num_peers_to_take,
        peer_map_key,
        Peer::to_response_peer,
    );

    (
        torrent_data.num_seeders,
        torrent_data.num_leechers,
        response_peers,
    )
}

View file

@ -0,0 +1,119 @@
use std::net::IpAddr;
use aquatic_common::CanonicalSocketAddr;
use aquatic_http_protocol::{request::AnnounceRequest, response::FailureResponse};
use sqlx::{Executor, MySql, Pool};
#[derive(Debug)]
pub struct ValidatedAnnounceRequest(AnnounceRequest);
// Implement `From` instead of `Into` (clippy::from_over_into): the standard
// blanket impl provides `Into<AnnounceRequest> for ValidatedAnnounceRequest`
// automatically, so existing `.into()` call sites keep working.
impl From<ValidatedAnnounceRequest> for AnnounceRequest {
    /// Unwrap the validated request back into the plain announce request.
    fn from(request: ValidatedAnnounceRequest) -> Self {
        request.0
    }
}
#[derive(Debug, sqlx::FromRow)]
struct AnnounceProcedureResults {
announce_allowed: bool,
failure_reason: Option<String>,
warning_message: Option<String>,
}
pub async fn validate_announce_request(
pool: &Pool<MySql>,
source_addr: CanonicalSocketAddr,
user_agent: Option<String>,
user_token: String,
request: AnnounceRequest,
) -> Result<(ValidatedAnnounceRequest, Option<String>), FailureResponse> {
match call_announce_procedure(pool, source_addr, user_agent, user_token, &request).await {
Ok(results) => {
if results.announce_allowed {
Ok((ValidatedAnnounceRequest(request), results.warning_message))
} else {
Err(FailureResponse::new(
results
.failure_reason
.unwrap_or_else(|| "Not allowed".into()),
))
}
}
Err(err) => {
::log::error!("announce procedure error: {:#}", err);
Err(FailureResponse::new("Internal error"))
}
}
}
/// Call stored procedure `aquatic_announce_v1` with data from an announce
/// request and read back its three output parameters.
///
/// NOTE(review): the explicit transaction appears to serve to pin all three
/// statements to one pooled connection, since the `@p_*` session variables
/// are per-connection — confirm before restructuring.
async fn call_announce_procedure(
    pool: &Pool<MySql>,
    source_addr: CanonicalSocketAddr,
    user_agent: Option<String>,
    user_token: String, // FIXME: length
    request: &AnnounceRequest,
) -> anyhow::Result<AnnounceProcedureResults> {
    let mut t = pool.begin().await?;

    // Reset the output variables so a procedure that doesn't set them
    // yields "announce not allowed" with no failure reason / warning
    t.execute(
        "
        SET
            @p_announce_allowed = false,
            @p_failure_reason = NULL,
            @p_warning_message = NULL;
        ",
    )
    .await?;

    let q = sqlx::query(
        "
        CALL aquatic_announce_v1(
            ?,
            ?,
            ?,
            ?,
            ?,
            ?,
            ?,
            ?,
            ?,
            ?,
            @p_announce_allowed,
            @p_failure_reason,
            @p_warning_message
        );
        ",
    )
    // Canonical source IP as raw bytes (4 bytes for IPv4, 16 for IPv6)
    .bind(match source_addr.get().ip() {
        IpAddr::V4(ip) => Vec::from(ip.octets()),
        IpAddr::V6(ip) => Vec::from(ip.octets()),
    })
    .bind(source_addr.get().port())
    .bind(user_agent)
    .bind(user_token)
    // Info hash is passed hex-encoded to match the CHAR(40) parameter
    .bind(hex::encode(request.info_hash.0))
    .bind(&request.peer_id.0[..])
    .bind(request.event.as_str())
    .bind(request.bytes_uploaded as u64)
    .bind(request.bytes_downloaded as u64)
    .bind(request.bytes_left as u64);

    t.execute(q).await?;

    // Read back the output parameters set by the procedure
    let response = sqlx::query_as::<_, AnnounceProcedureResults>(
        "
        SELECT
            @p_announce_allowed as announce_allowed,
            @p_failure_reason as failure_reason,
            @p_warning_message as warning_message;
        ",
    )
    .fetch_one(&mut t)
    .await?;

    t.commit().await?;

    Ok(response)
}

View file

@ -0,0 +1,97 @@
pub mod db;
mod routes;
mod tls;
use std::{
net::{SocketAddr, TcpListener},
sync::Arc,
};
use anyhow::Context;
use aquatic_common::rustls_config::RustlsConfig;
use axum::{extract::connect_info::Connected, routing::get, Extension, Router};
use hyper::server::conn::AddrIncoming;
use sqlx::mysql::MySqlPoolOptions;
use self::tls::{TlsAcceptor, TlsStream};
use crate::{common::ChannelRequestSender, config::Config};
// Lets axum's `into_make_service_with_connect_info::<SocketAddr>` extract
// the peer address from our custom TLS stream type.
impl<'a> Connected<&'a tls::TlsStream> for SocketAddr {
    fn connect_info(target: &'a TlsStream) -> Self {
        target.get_remote_addr()
    }
}
/// Entry point for a socket worker thread: bind the listening socket,
/// then serve HTTP(S) on a single-threaded tokio runtime until the
/// server terminates.
pub fn run_socket_worker(
    config: Config,
    tls_config: Arc<RustlsConfig>,
    request_sender: ChannelRequestSender,
) -> anyhow::Result<()> {
    let listener = create_tcp_listener(config.network.address)?;

    // One current-thread runtime per worker; workers share the port via
    // SO_REUSEPORT (set in create_tcp_listener)
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()?
        .block_on(run_app(config, tls_config, listener, request_sender))
}
/// Set up the database pool, TLS acceptor and axum router, then serve
/// requests until the server terminates.
async fn run_app(
    config: Config,
    tls_config: Arc<RustlsConfig>,
    tcp_listener: TcpListener,
    request_sender: ChannelRequestSender,
) -> anyhow::Result<()> {
    // The database DSN comes from the environment, not from the config file
    let db_url =
        ::std::env::var("DATABASE_URL").with_context(|| "Retrieve env var DATABASE_URL")?;

    // Wrap the already-bound, non-blocking std listener for tokio/hyper,
    // then layer TLS acceptance on top
    let tls_acceptor = TlsAcceptor::new(
        tls_config,
        AddrIncoming::from_listener(tokio::net::TcpListener::from_std(tcp_listener)?)?,
    );

    let pool = MySqlPoolOptions::new()
        .max_connections(config.db_connections_per_worker)
        .connect(&db_url)
        .await?;

    // Shared state reaches the handler through Extension layers
    let app = Router::new()
        .route("/announce/:user_token/", get(routes::announce))
        .layer(Extension(Arc::new(config.clone())))
        .layer(Extension(pool))
        .layer(Extension(Arc::new(request_sender)));

    // into_make_service_with_connect_info enables extracting the peer
    // SocketAddr in handlers (via the Connected impl in this module)
    axum::Server::builder(tls_acceptor)
        .http1_keepalive(config.network.keep_alive)
        .serve(app.into_make_service_with_connect_info::<SocketAddr>())
        .await?;

    Ok(())
}
/// Create a non-blocking TCP listener on `addr` with SO_REUSEPORT set,
/// so that several socket workers can bind the same address and the
/// kernel load-balances incoming connections between them.
fn create_tcp_listener(addr: SocketAddr) -> anyhow::Result<TcpListener> {
    let domain = match addr {
        SocketAddr::V4(_) => socket2::Domain::IPV4,
        SocketAddr::V6(_) => socket2::Domain::IPV6,
    };

    let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?;

    // Options must be set before bind; each failure is annotated so the
    // anyhow error names the step that went wrong
    socket
        .set_reuse_port(true)
        .with_context(|| "set_reuse_port")?;
    socket
        .set_nonblocking(true)
        .with_context(|| "set_nonblocking")?;
    socket
        .bind(&addr.into())
        .with_context(|| format!("bind to {}", addr))?;
    socket
        .listen(1024)
        .with_context(|| format!("listen on {}", addr))?;

    Ok(socket.into())
}

View file

@ -0,0 +1,65 @@
use aquatic_common::CanonicalSocketAddr;
use axum::{
extract::{ConnectInfo, Path, RawQuery},
headers::UserAgent,
Extension, TypedHeader,
};
use sqlx::mysql::MySqlPool;
use std::{net::SocketAddr, sync::Arc};
use aquatic_http_protocol::{
request::AnnounceRequest,
response::{FailureResponse, Response},
};
use crate::{
common::{ChannelRequestSender, RequestWorkerIndex},
config::Config,
};
use super::db;
/// Axum handler for `GET /announce/:user_token/`.
///
/// Parses the announce from the raw query string, validates it against
/// the database, forwards it to the responsible request worker and
/// returns that worker's response, with any database warning message
/// copied into a successful announce response.
pub async fn announce(
    Extension(config): Extension<Arc<Config>>,
    Extension(pool): Extension<MySqlPool>,
    Extension(request_sender): Extension<Arc<ChannelRequestSender>>,
    ConnectInfo(source_addr): ConnectInfo<SocketAddr>,
    opt_user_agent: Option<TypedHeader<UserAgent>>,
    Path(user_token): Path<String>,
    RawQuery(query): RawQuery,
) -> Result<Response, FailureResponse> {
    let query = query.ok_or_else(|| FailureResponse::new("Empty query string"))?;

    // The announce parameters live in the query string, not the path
    let request = AnnounceRequest::from_query_string(&query)
        .map_err(|_| FailureResponse::new("Malformed request"))?;

    // Requests are sharded over request workers by info hash
    let request_worker_index = RequestWorkerIndex::from_info_hash(&config, request.info_hash);

    let opt_user_agent = opt_user_agent.map(|header| header.as_str().to_owned());

    let source_addr = CanonicalSocketAddr::new(source_addr);

    // Rejected announces short-circuit here with the database's reason
    let (validated_request, opt_warning_message) =
        db::validate_announce_request(&pool, source_addr, opt_user_agent, user_token, request)
            .await?;

    let response_receiver = request_sender
        .send_to(request_worker_index, validated_request, source_addr)
        .await
        .map_err(|err| internal_error(format!("Sending request over channel failed: {:#}", err)))?;

    let mut response = response_receiver.await.map_err(|err| {
        internal_error(format!("Receiving response over channel failed: {:#}", err))
    })?;

    // Relay the database warning (if any) to the client
    if let Response::Announce(ref mut r) = response {
        r.warning_message = opt_warning_message;
    }

    Ok(response)
}
fn internal_error(error: String) -> FailureResponse {
::log::error!("{}", error);
FailureResponse::new("Internal error")
}

View file

@ -0,0 +1,151 @@
//! hyper/rustls integration
//!
//! hyper will automatically use HTTP/2 if a client starts talking HTTP/2,
//! otherwise HTTP/1.1 will be used.
//!
//! Based on https://github.com/rustls/hyper-rustls/blob/9b7b1220f74de9b249ce2b8f8b922fd00074c53b/examples/server.rs
// ISC License (ISC)
// Copyright (c) 2016, Joseph Birr-Pixton <jpixton@gmail.com>
//
// Permission to use, copy, modify, and/or distribute this software for
// any purpose with or without fee is hereby granted, provided that the
// above copyright notice and this permission notice appear in all copies.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
// WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
// AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL
// DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR
// PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
// ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF
// THIS SOFTWARE.
use core::task::{Context, Poll};
use futures_util::ready;
use hyper::server::accept::Accept;
use hyper::server::conn::{AddrIncoming, AddrStream};
use std::future::Future;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_rustls::rustls::ServerConfig;
/// Connection state: first driving the TLS handshake to completion, then
/// streaming decrypted data. The `SocketAddr` is kept next to the pending
/// handshake so the remote address is available before the handshake
/// finishes.
enum State {
    Handshaking(tokio_rustls::Accept<AddrStream>, SocketAddr),
    Streaming(tokio_rustls::server::TlsStream<AddrStream>),
}
// tokio_rustls::server::TlsStream doesn't expose constructor methods, so
// we have to go through TlsAcceptor::accept and the handshake to obtain
// one. This wrapper implements AsyncRead/AsyncWrite by first driving the
// pending tokio_rustls::Accept handshake, then delegating to the
// established stream.
pub struct TlsStream {
    state: State,
}
impl TlsStream {
    /// Wrap a freshly accepted TCP connection; the TLS handshake is only
    /// driven later, by the first read or write on the stream.
    fn new(stream: AddrStream, config: Arc<ServerConfig>) -> TlsStream {
        let remote_addr = stream.remote_addr();

        TlsStream {
            state: State::Handshaking(
                tokio_rustls::TlsAcceptor::from(config).accept(stream),
                remote_addr,
            ),
        }
    }

    /// Remote peer address, available in both connection states.
    pub fn get_remote_addr(&self) -> SocketAddr {
        match &self.state {
            State::Handshaking(_, addr) => *addr,
            State::Streaming(tls_stream) => tls_stream.get_ref().0.remote_addr(),
        }
    }
}
impl AsyncRead for TlsStream {
    /// Read decrypted data. If the TLS handshake has not completed yet,
    /// first drive it to completion (returning Pending until it is done),
    /// then transition to the streaming state and perform the read.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut ReadBuf,
    ) -> Poll<io::Result<()>> {
        let pin = self.get_mut();
        match pin.state {
            State::Handshaking(ref mut accept, ref mut socket_addr) => {
                match ready!(Pin::new(accept).poll(cx)) {
                    Ok(mut stream) => {
                        // Refresh the stored address from the live stream
                        *socket_addr = stream.get_ref().0.remote_addr();
                        let result = Pin::new(&mut stream).poll_read(cx, buf);
                        // Handshake done: all further I/O goes straight to
                        // the established TLS stream
                        pin.state = State::Streaming(stream);
                        result
                    }
                    Err(err) => Poll::Ready(Err(err)),
                }
            }
            State::Streaming(ref mut stream) => Pin::new(stream).poll_read(cx, buf),
        }
    }
}
impl AsyncWrite for TlsStream {
    /// Write data to be encrypted. Like poll_read, a write during the
    /// handshake first drives the handshake to completion, then switches
    /// to the streaming state and performs the write.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let pin = self.get_mut();
        match pin.state {
            State::Handshaking(ref mut accept, _) => match ready!(Pin::new(accept).poll(cx)) {
                Ok(mut stream) => {
                    let result = Pin::new(&mut stream).poll_write(cx, buf);
                    pin.state = State::Streaming(stream);
                    result
                }
                Err(err) => Poll::Ready(Err(err)),
            },
            State::Streaming(ref mut stream) => Pin::new(stream).poll_write(cx, buf),
        }
    }

    // During the handshake nothing has been written yet, so flush is a
    // successful no-op
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        match self.state {
            State::Handshaking(_, _) => Poll::Ready(Ok(())),
            State::Streaming(ref mut stream) => Pin::new(stream).poll_flush(cx),
        }
    }

    // Shutdown before the handshake completed: nothing to close at the
    // TLS layer, report success
    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        match self.state {
            State::Handshaking(_, _) => Poll::Ready(Ok(())),
            State::Streaming(ref mut stream) => Pin::new(stream).poll_shutdown(cx),
        }
    }
}
/// hyper `Accept` implementation that wraps every accepted TCP connection
/// in a lazily-handshaking `TlsStream`.
pub struct TlsAcceptor {
    config: Arc<ServerConfig>,
    incoming: AddrIncoming,
}

impl TlsAcceptor {
    /// Build an acceptor from a rustls server config and a hyper TCP
    /// connection source.
    pub fn new(config: Arc<ServerConfig>, incoming: AddrIncoming) -> TlsAcceptor {
        Self { config, incoming }
    }
}
impl Accept for TlsAcceptor {
    type Conn = TlsStream;
    type Error = io::Error;

    /// Accept the next TCP connection and wrap it in a TlsStream. The TLS
    /// handshake is not driven here — it happens lazily on the stream's
    /// first read or write.
    fn poll_accept(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
        let pin = self.get_mut();
        match ready!(Pin::new(&mut pin.incoming).poll_accept(cx)) {
            Some(Ok(sock)) => Poll::Ready(Some(Ok(TlsStream::new(sock, pin.config.clone())))),
            Some(Err(e)) => Poll::Ready(Some(Err(e))),
            // Underlying listener is exhausted
            None => Poll::Ready(None),
        }
    }
}

View file

@ -22,8 +22,12 @@ name = "bench_announce_response_to_bytes"
path = "benches/bench_announce_response_to_bytes.rs"
harness = false
[features]
with-axum = ["axum"]
[dependencies]
anyhow = "1"
axum = { version = "0.5", optional = true, default-features = false }
hex = { version = "0.4", default-features = false }
httparse = "1"
itoa = "1"

View file

@ -21,6 +21,7 @@ pub fn bench(c: &mut Criterion) {
incomplete: 500,
peers: ResponsePeerListV4(peers),
peers6: ResponsePeerListV6(Vec::new()),
warning_message: None,
};
let response = Response::Announce(announce_response);

View file

@ -24,7 +24,7 @@ pub struct InfoHash(
pub [u8; 20],
);
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AnnounceEvent {
Started,
Stopped,
@ -52,6 +52,17 @@ impl FromStr for AnnounceEvent {
}
}
impl AnnounceEvent {
    /// Wire representation of the event as used in announce query strings
    /// (and bound into the database procedure). Returns `None` for
    /// `Empty`, which has no wire representation.
    pub fn as_str(&self) -> Option<&str> {
        match self {
            Self::Started => Some("started"),
            Self::Stopped => Some("stopped"),
            Self::Completed => Some("completed"),
            Self::Empty => None,
        }
    }
}
#[cfg(test)]
impl quickcheck::Arbitrary for InfoHash {
fn arbitrary(g: &mut quickcheck::Gen) -> Self {

View file

@ -11,17 +11,20 @@ pub struct AnnounceRequest {
pub info_hash: InfoHash,
pub peer_id: PeerId,
pub port: u16,
pub bytes_uploaded: usize,
pub bytes_downloaded: usize,
pub bytes_left: usize,
pub event: AnnounceEvent,
pub compact: bool,
/// Number of response peers wanted
pub numwant: Option<usize>,
pub key: Option<SmartString<LazyCompact>>,
}
impl AnnounceRequest {
fn write<W: Write>(&self, output: &mut W) -> ::std::io::Result<()> {
output.write_all(b"GET /announce?info_hash=")?;
fn write<W: Write>(&self, output: &mut W, url_suffix: &[u8]) -> ::std::io::Result<()> {
output.write_all(b"GET /announce")?;
output.write_all(url_suffix)?;
output.write_all(b"?info_hash=")?;
urlencode_20_bytes(self.info_hash.0, output)?;
output.write_all(b"&peer_id=")?;
@ -30,7 +33,13 @@ impl AnnounceRequest {
output.write_all(b"&port=")?;
output.write_all(itoa::Buffer::new().format(self.port).as_bytes())?;
output.write_all(b"&uploaded=0&downloaded=0&left=")?;
output.write_all(b"&uploaded=")?;
output.write_all(itoa::Buffer::new().format(self.bytes_uploaded).as_bytes())?;
output.write_all(b"&downloaded=")?;
output.write_all(itoa::Buffer::new().format(self.bytes_downloaded).as_bytes())?;
output.write_all(b"&left=")?;
output.write_all(itoa::Buffer::new().format(self.bytes_left).as_bytes())?;
match self.event {
@ -40,9 +49,6 @@ impl AnnounceRequest {
AnnounceEvent::Empty => (),
};
output.write_all(b"&compact=")?;
output.write_all(itoa::Buffer::new().format(self.compact as u8).as_bytes())?;
if let Some(numwant) = self.numwant {
output.write_all(b"&numwant=")?;
output.write_all(itoa::Buffer::new().format(numwant).as_bytes())?;
@ -57,6 +63,105 @@ impl AnnounceRequest {
Ok(())
}
/// Parse an announce request from a URL query string (the part after '?').
///
/// Key-value pairs are located by scanning for '=' and '&' with memchr
/// rather than allocating per-pair. Unrecognized keys are ignored (with a
/// debug log). Errors if a mandatory field (info_hash, peer_id, port,
/// uploaded, downloaded, left) is missing or fails to parse, if `compact`
/// is present with a value other than "1", or if `key` is over 100 bytes.
pub fn from_query_string(query_string: &str) -> anyhow::Result<Self> {
    // -- Parse key-value pairs
    let mut opt_info_hash = None;
    let mut opt_peer_id = None;
    let mut opt_port = None;
    let mut opt_bytes_left = None;
    let mut opt_bytes_uploaded = None;
    let mut opt_bytes_downloaded = None;
    let mut event = AnnounceEvent::default();
    let mut opt_numwant = None;
    let mut opt_key = None;

    let query_string_bytes = query_string.as_bytes();

    let mut ampersand_iter = ::memchr::memchr_iter(b'&', query_string_bytes);
    let mut position = 0usize;

    // Each '=' delimits one pair; the pair's value runs to the next '&'
    // or to the end of the string
    for equal_sign_index in ::memchr::memchr_iter(b'=', query_string_bytes) {
        let segment_end = ampersand_iter.next().unwrap_or_else(|| query_string.len());

        let key = query_string
            .get(position..equal_sign_index)
            .with_context(|| format!("no key at {}..{}", position, equal_sign_index))?;
        let value = query_string
            .get(equal_sign_index + 1..segment_end)
            .with_context(|| {
                format!("no value at {}..{}", equal_sign_index + 1, segment_end)
            })?;

        match key {
            "info_hash" => {
                let value = urldecode_20_bytes(value)?;

                opt_info_hash = Some(InfoHash(value));
            }
            "peer_id" => {
                let value = urldecode_20_bytes(value)?;

                opt_peer_id = Some(PeerId(value));
            }
            "port" => {
                opt_port = Some(value.parse::<u16>().with_context(|| "parse port")?);
            }
            "left" => {
                opt_bytes_left = Some(value.parse::<usize>().with_context(|| "parse left")?);
            }
            "uploaded" => {
                opt_bytes_uploaded =
                    Some(value.parse::<usize>().with_context(|| "parse uploaded")?);
            }
            "downloaded" => {
                opt_bytes_downloaded =
                    Some(value.parse::<usize>().with_context(|| "parse downloaded")?);
            }
            "event" => {
                event = value
                    .parse::<AnnounceEvent>()
                    .map_err(|err| anyhow::anyhow!("invalid event: {}", err))?;
            }
            "compact" => {
                // Only compact responses are supported
                if value != "1" {
                    return Err(anyhow::anyhow!("compact set, but not to 1"));
                }
            }
            "numwant" => {
                opt_numwant = Some(value.parse::<usize>().with_context(|| "parse numwant")?);
            }
            "key" => {
                // Bound the stored key size before urldecoding
                if value.len() > 100 {
                    return Err(anyhow::anyhow!("'key' is too long"));
                }

                opt_key = Some(::urlencoding::decode(value)?.into());
            }
            k => {
                ::log::debug!("ignored unrecognized key: {}", k)
            }
        }

        if segment_end == query_string.len() {
            break;
        } else {
            // Continue after the '&' that ended this pair
            position = segment_end + 1;
        }
    }

    Ok(AnnounceRequest {
        info_hash: opt_info_hash.with_context(|| "no info_hash")?,
        peer_id: opt_peer_id.with_context(|| "no peer_id")?,
        port: opt_port.with_context(|| "no port")?,
        bytes_uploaded: opt_bytes_uploaded.with_context(|| "no uploaded")?,
        bytes_downloaded: opt_bytes_downloaded.with_context(|| "no downloaded")?,
        bytes_left: opt_bytes_left.with_context(|| "no left")?,
        event,
        numwant: opt_numwant,
        key: opt_key,
    })
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
@ -65,8 +170,10 @@ pub struct ScrapeRequest {
}
impl ScrapeRequest {
fn write<W: Write>(&self, output: &mut W) -> ::std::io::Result<()> {
output.write_all(b"GET /scrape?")?;
fn write<W: Write>(&self, output: &mut W, url_suffix: &[u8]) -> ::std::io::Result<()> {
output.write_all(b"GET /scrape")?;
output.write_all(url_suffix)?;
output.write_all(b"?")?;
let mut first = true;
@ -85,6 +192,53 @@ impl ScrapeRequest {
Ok(())
}
/// Parse a scrape request from a URL query string (the part after '?').
///
/// Collects every `info_hash` pair (a scrape may carry several);
/// unrecognized keys are ignored with a debug log. Errors if no info
/// hashes are present or one fails to urldecode to 20 bytes.
pub fn from_query_string(query_string: &str) -> anyhow::Result<Self> {
    // -- Parse key-value pairs
    let mut info_hashes = Vec::new();

    let query_string_bytes = query_string.as_bytes();

    let mut ampersand_iter = ::memchr::memchr_iter(b'&', query_string_bytes);
    let mut position = 0usize;

    // Each '=' delimits one pair; the pair's value runs to the next '&'
    // or to the end of the string
    for equal_sign_index in ::memchr::memchr_iter(b'=', query_string_bytes) {
        let segment_end = ampersand_iter.next().unwrap_or_else(|| query_string.len());

        let key = query_string
            .get(position..equal_sign_index)
            .with_context(|| format!("no key at {}..{}", position, equal_sign_index))?;
        let value = query_string
            .get(equal_sign_index + 1..segment_end)
            .with_context(|| {
                format!("no value at {}..{}", equal_sign_index + 1, segment_end)
            })?;

        match key {
            "info_hash" => {
                let value = urldecode_20_bytes(value)?;

                info_hashes.push(InfoHash(value));
            }
            k => {
                ::log::debug!("ignored unrecognized key: {}", k)
            }
        }

        if segment_end == query_string.len() {
            break;
        } else {
            // Continue after the '&' that ended this pair
            position = segment_end + 1;
        }
    }

    if info_hashes.is_empty() {
        return Err(anyhow::anyhow!("No info hashes sent"));
    }

    Ok(ScrapeRequest { info_hashes })
}
}
#[derive(Debug)]
@ -147,111 +301,21 @@ impl Request {
let location = split_parts.next().with_context(|| "no location")?;
let query_string = split_parts.next().with_context(|| "no query string")?;
// -- Parse key-value pairs
let mut info_hashes = Vec::new();
let mut opt_peer_id = None;
let mut opt_port = None;
let mut opt_bytes_left = None;
let mut event = AnnounceEvent::default();
let mut opt_numwant = None;
let mut opt_key = None;
let query_string_bytes = query_string.as_bytes();
let mut ampersand_iter = ::memchr::memchr_iter(b'&', query_string_bytes);
let mut position = 0usize;
for equal_sign_index in ::memchr::memchr_iter(b'=', query_string_bytes) {
let segment_end = ampersand_iter.next().unwrap_or_else(|| query_string.len());
let key = query_string
.get(position..equal_sign_index)
.with_context(|| format!("no key at {}..{}", position, equal_sign_index))?;
let value = query_string
.get(equal_sign_index + 1..segment_end)
.with_context(|| {
format!("no value at {}..{}", equal_sign_index + 1, segment_end)
})?;
match key {
"info_hash" => {
let value = urldecode_20_bytes(value)?;
info_hashes.push(InfoHash(value));
}
"peer_id" => {
let value = urldecode_20_bytes(value)?;
opt_peer_id = Some(PeerId(value));
}
"port" => {
opt_port = Some(value.parse::<u16>().with_context(|| "parse port")?);
}
"left" => {
opt_bytes_left = Some(value.parse::<usize>().with_context(|| "parse left")?);
}
"event" => {
event = value
.parse::<AnnounceEvent>()
.map_err(|err| anyhow::anyhow!("invalid event: {}", err))?;
}
"compact" => {
if value != "1" {
return Err(anyhow::anyhow!("compact set, but not to 1"));
}
}
"numwant" => {
opt_numwant = Some(value.parse::<usize>().with_context(|| "parse numwant")?);
}
"key" => {
if value.len() > 100 {
return Err(anyhow::anyhow!("'key' is too long"));
}
opt_key = Some(::urlencoding::decode(value)?.into());
}
k => {
::log::debug!("ignored unrecognized key: {}", k)
}
}
if segment_end == query_string.len() {
break;
} else {
position = segment_end + 1;
}
}
// -- Put together request
if location == "/announce" {
let request = AnnounceRequest {
info_hash: info_hashes.pop().with_context(|| "no info_hash")?,
peer_id: opt_peer_id.with_context(|| "no peer_id")?,
port: opt_port.with_context(|| "no port")?,
bytes_left: opt_bytes_left.with_context(|| "no left")?,
event,
compact: true,
numwant: opt_numwant,
key: opt_key,
};
Ok(Request::Announce(request))
Ok(Request::Announce(AnnounceRequest::from_query_string(
query_string,
)?))
} else {
if info_hashes.is_empty() {
return Err(anyhow::anyhow!("No info hashes sent"));
}
let request = ScrapeRequest { info_hashes };
Ok(Request::Scrape(request))
Ok(Request::Scrape(ScrapeRequest::from_query_string(
query_string,
)?))
}
}
pub fn write<W: Write>(&self, output: &mut W) -> ::std::io::Result<()> {
pub fn write<W: Write>(&self, output: &mut W, url_suffix: &[u8]) -> ::std::io::Result<()> {
match self {
Self::Announce(r) => r.write(output),
Self::Scrape(r) => r.write(output),
Self::Announce(r) => r.write(output, url_suffix),
Self::Scrape(r) => r.write(output, url_suffix),
}
}
}
@ -262,7 +326,7 @@ mod tests {
use super::*;
static ANNOUNCE_REQUEST_PATH: &str = "/announce?info_hash=%04%0bkV%3f%5cr%14%a6%b7%98%adC%c3%c9.%40%24%00%b9&peer_id=-ABC940-5ert69muw5t8&port=12345&uploaded=0&downloaded=0&left=1&numwant=0&key=4ab4b877&compact=1&supportcrypto=1&event=started";
static ANNOUNCE_REQUEST_PATH: &str = "/announce?info_hash=%04%0bkV%3f%5cr%14%a6%b7%98%adC%c3%c9.%40%24%00%b9&peer_id=-ABC940-5ert69muw5t8&port=12345&uploaded=1&downloaded=2&left=3&numwant=0&key=4ab4b877&compact=1&supportcrypto=1&event=started";
static SCRAPE_REQUEST_PATH: &str =
"/scrape?info_hash=%04%0bkV%3f%5cr%14%a6%b7%98%adC%c3%c9.%40%24%00%b9";
static REFERENCE_INFO_HASH: [u8; 20] = [
@ -279,9 +343,10 @@ mod tests {
info_hash: InfoHash(REFERENCE_INFO_HASH),
peer_id: PeerId(REFERENCE_PEER_ID),
port: 12345,
bytes_left: 1,
bytes_uploaded: 1,
bytes_downloaded: 2,
bytes_left: 3,
event: AnnounceEvent::Started,
compact: true,
numwant: Some(0),
key: Some("4ab4b877".into()),
})
@ -325,9 +390,10 @@ mod tests {
info_hash: Arbitrary::arbitrary(g),
peer_id: Arbitrary::arbitrary(g),
port: Arbitrary::arbitrary(g),
bytes_uploaded: Arbitrary::arbitrary(g),
bytes_downloaded: Arbitrary::arbitrary(g),
bytes_left: Arbitrary::arbitrary(g),
event: Arbitrary::arbitrary(g),
compact: true,
numwant: Arbitrary::arbitrary(g),
key: key.map(|key| key.into()),
}
@ -373,7 +439,7 @@ mod tests {
let mut bytes = Vec::new();
request.write(&mut bytes).unwrap();
request.write(&mut bytes, &[]).unwrap();
let parsed_request = Request::from_bytes(&bytes[..]).unwrap();

View file

@ -51,10 +51,17 @@ pub struct AnnounceResponse {
pub peers: ResponsePeerListV4,
#[serde(default)]
pub peers6: ResponsePeerListV6,
// Serialize as string if Some, otherwise skip
#[serde(
rename = "warning message",
skip_serializing_if = "Option::is_none",
serialize_with = "serialize_optional_string"
)]
pub warning_message: Option<String>,
}
impl AnnounceResponse {
fn write<W: Write>(&self, output: &mut W) -> ::std::io::Result<usize> {
pub fn write<W: Write>(&self, output: &mut W) -> ::std::io::Result<usize> {
let mut bytes_written = 0usize;
bytes_written += output.write(b"d8:completei")?;
@ -93,12 +100,34 @@ impl AnnounceResponse {
bytes_written += output.write(&u128::from(peer.ip_address).to_be_bytes())?;
bytes_written += output.write(&peer.port.to_be_bytes())?;
}
if let Some(ref warning_message) = self.warning_message {
let message_bytes = warning_message.as_bytes();
bytes_written += output.write(b"15:warning message")?;
bytes_written +=
output.write(itoa::Buffer::new().format(message_bytes.len()).as_bytes())?;
bytes_written += output.write(b":")?;
bytes_written += output.write(message_bytes)?;
}
bytes_written += output.write(b"e")?;
Ok(bytes_written)
}
}
#[cfg(feature = "with-axum")]
impl axum::response::IntoResponse for AnnounceResponse {
fn into_response(self) -> axum::response::Response {
let mut response_bytes = Vec::with_capacity(128);
self.write(&mut response_bytes).unwrap();
([("Content-type", "text/plain")], response_bytes).into_response()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScrapeResponse {
/// BTreeMap instead of HashMap since keys need to be serialized in order
@ -106,7 +135,7 @@ pub struct ScrapeResponse {
}
impl ScrapeResponse {
fn write<W: Write>(&self, output: &mut W) -> ::std::io::Result<usize> {
pub fn write<W: Write>(&self, output: &mut W) -> ::std::io::Result<usize> {
let mut bytes_written = 0usize;
bytes_written += output.write(b"d5:filesd")?;
@ -129,6 +158,17 @@ impl ScrapeResponse {
}
}
#[cfg(feature = "with-axum")]
impl axum::response::IntoResponse for ScrapeResponse {
fn into_response(self) -> axum::response::Response {
let mut response_bytes = Vec::with_capacity(128);
self.write(&mut response_bytes).unwrap();
([("Content-type", "text/plain")], response_bytes).into_response()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailureResponse {
#[serde(rename = "failure reason")]
@ -142,7 +182,7 @@ impl FailureResponse {
}
}
fn write<W: Write>(&self, output: &mut W) -> ::std::io::Result<usize> {
pub fn write<W: Write>(&self, output: &mut W) -> ::std::io::Result<usize> {
let mut bytes_written = 0usize;
let reason_bytes = self.failure_reason.as_bytes();
@ -157,6 +197,17 @@ impl FailureResponse {
}
}
#[cfg(feature = "with-axum")]
impl axum::response::IntoResponse for FailureResponse {
fn into_response(self) -> axum::response::Response {
let mut response_bytes = Vec::with_capacity(64);
self.write(&mut response_bytes).unwrap();
([("Content-type", "text/plain")], response_bytes).into_response()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Response {
@ -178,6 +229,17 @@ impl Response {
}
}
#[cfg(feature = "with-axum")]
impl axum::response::IntoResponse for Response {
fn into_response(self) -> axum::response::Response {
match self {
Self::Announce(r) => r.into_response(),
Self::Scrape(r) => r.into_response(),
Self::Failure(r) => r.into_response(),
}
}
}
#[cfg(test)]
impl quickcheck::Arbitrary for ResponsePeer<Ipv4Addr> {
fn arbitrary(g: &mut quickcheck::Gen) -> Self {
@ -232,6 +294,7 @@ impl quickcheck::Arbitrary for AnnounceResponse {
incomplete: usize::arbitrary(g),
peers: ResponsePeerListV4::arbitrary(g),
peers6: ResponsePeerListV6::arbitrary(g),
warning_message: quickcheck::Arbitrary::arbitrary(g),
}
}
}
@ -264,11 +327,18 @@ mod tests {
fn test_announce_response_to_bytes(response: AnnounceResponse) -> bool {
let reference = bendy::serde::to_bytes(&Response::Announce(response.clone())).unwrap();
let mut output = Vec::new();
let mut hand_written = Vec::new();
response.write(&mut output).unwrap();
response.write(&mut hand_written).unwrap();
output == reference
let success = hand_written == reference;
if !success {
println!("reference: {}", String::from_utf8_lossy(&reference));
println!("hand_written: {}", String::from_utf8_lossy(&hand_written));
}
success
}
#[quickcheck]

View file

@ -57,6 +57,17 @@ pub fn urldecode_20_bytes(value: &str) -> anyhow::Result<[u8; 20]> {
Ok(out_arr)
}
#[inline]
pub fn serialize_optional_string<S>(v: &Option<String>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match v {
Some(s) => serializer.serialize_str(s.as_str()),
None => Err(serde::ser::Error::custom("use skip_serializing_if")),
}
}
#[inline]
pub fn serialize_20_bytes<S>(bytes: &[u8; 20], serializer: S) -> Result<S::Ok, S::Error>
where

View file

@ -19,10 +19,10 @@ name = "aquatic_udp"
cpu-pinning = ["aquatic_common/with-hwloc"]
[dependencies]
aquatic_cli_helpers = "0.2.0"
aquatic_common = "0.2.0"
aquatic_toml_config = "0.2.0"
aquatic_udp_protocol = "0.2.0"
aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" }
aquatic_common = { version = "0.2.0", path = "../aquatic_common" }
aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" }
aquatic_udp_protocol = { version = "0.2.0", path = "../aquatic_udp_protocol" }
anyhow = "1"
cfg-if = "1"

View file

@ -11,11 +11,11 @@ readme = "../README.md"
name = "aquatic_udp_bench"
[dependencies]
aquatic_cli_helpers = "0.2.0"
aquatic_common = "0.2.0"
aquatic_toml_config = "0.2.0"
aquatic_udp = "0.2.0"
aquatic_udp_protocol = "0.2.0"
aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" }
aquatic_common = { version = "0.2.0", path = "../aquatic_common" }
aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" }
aquatic_udp = { version = "0.2.0", path = "../aquatic_udp" }
aquatic_udp_protocol = { version = "0.2.0", path = "../aquatic_udp_protocol" }
anyhow = "1"
crossbeam-channel = "0.5"

View file

@ -16,10 +16,10 @@ cpu-pinning = ["aquatic_common/with-hwloc"]
name = "aquatic_udp_load_test"
[dependencies]
aquatic_cli_helpers = "0.2.0"
aquatic_common = "0.2.0"
aquatic_toml_config = "0.2.0"
aquatic_udp_protocol = "0.2.0"
aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" }
aquatic_common = { version = "0.2.0", path = "../aquatic_common" }
aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" }
aquatic_udp_protocol = { version = "0.2.0", path = "../aquatic_udp_protocol" }
anyhow = "1"
hashbrown = "0.12"

View file

@ -16,10 +16,10 @@ name = "aquatic_ws"
name = "aquatic_ws"
[dependencies]
aquatic_cli_helpers = "0.2.0"
aquatic_common = { version = "0.2.0", features = ["with-glommio"] }
aquatic_toml_config = "0.2.0"
aquatic_ws_protocol = "0.2.0"
aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" }
aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["rustls-config", "with-glommio"] }
aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" }
aquatic_ws_protocol = { version = "0.2.0", path = "../aquatic_ws_protocol" }
anyhow = "1"
async-tungstenite = "0.17"

View file

@ -5,8 +5,6 @@ use aquatic_common::CanonicalSocketAddr;
pub use aquatic_common::ValidUntil;
pub type TlsConfig = futures_rustls::rustls::ServerConfig;
#[derive(Default, Clone)]
pub struct State {
pub access_list: Arc<AccessListArcSwap>,

View file

@ -2,12 +2,11 @@ pub mod common;
pub mod config;
pub mod workers;
use std::fs::File;
use std::io::BufReader;
use std::sync::{atomic::AtomicUsize, Arc};
use aquatic_common::cpu_pinning::glommio::{get_worker_placement, set_affinity_for_util_worker};
use aquatic_common::cpu_pinning::WorkerIndex;
use aquatic_common::rustls_config::create_rustls_config;
use glommio::{channels::channel_mesh::MeshBuilder, prelude::*};
use signal_hook::{consts::SIGUSR1, iterator::Signals};
@ -64,7 +63,10 @@ fn run_workers(config: Config, state: State) -> anyhow::Result<()> {
let num_bound_sockets = Arc::new(AtomicUsize::new(0));
let tls_config = Arc::new(create_tls_config(&config).unwrap());
let tls_config = Arc::new(create_rustls_config(
&config.network.tls_certificate_path,
&config.network.tls_private_key_path,
)?);
let mut executors = Vec::new();
@ -150,32 +152,3 @@ fn run_workers(config: Config, state: State) -> anyhow::Result<()> {
Ok(())
}
fn create_tls_config(config: &Config) -> anyhow::Result<rustls::ServerConfig> {
let certs = {
let f = File::open(&config.network.tls_certificate_path)?;
let mut f = BufReader::new(f);
rustls_pemfile::certs(&mut f)?
.into_iter()
.map(|bytes| rustls::Certificate(bytes))
.collect()
};
let private_key = {
let f = File::open(&config.network.tls_private_key_path)?;
let mut f = BufReader::new(f);
rustls_pemfile::pkcs8_private_keys(&mut f)?
.first()
.map(|bytes| rustls::PrivateKey(bytes.clone()))
.ok_or(anyhow::anyhow!("No private keys in file"))?
};
let tls_config = rustls::ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(certs, private_key)?;
Ok(tls_config)
}

View file

@ -8,6 +8,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::rustls_config::RustlsConfig;
use aquatic_common::CanonicalSocketAddr;
use aquatic_ws_protocol::*;
use async_tungstenite::WebSocketStream;
@ -49,7 +50,7 @@ struct ConnectionReference {
pub async fn run_socket_worker(
config: Config,
state: State,
tls_config: Arc<TlsConfig>,
tls_config: Arc<RustlsConfig>,
in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>,
out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>,
num_bound_sockets: Arc<AtomicUsize>,
@ -214,7 +215,7 @@ async fn run_connection(
out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>,
out_message_consumer_id: ConsumerId,
connection_id: ConnectionId,
tls_config: Arc<TlsConfig>,
tls_config: Arc<RustlsConfig>,
stream: TcpStream,
) -> anyhow::Result<()> {
let peer_addr = stream

View file

@ -13,13 +13,13 @@ readme = "../README.md"
name = "aquatic_ws_load_test"
[dependencies]
async-tungstenite = "0.17"
aquatic_cli_helpers = "0.2.0"
aquatic_common = { version = "0.2.0", features = ["with-glommio"] }
aquatic_toml_config = "0.2.0"
aquatic_ws_protocol = "0.2.0"
aquatic_cli_helpers = { version = "0.2.0", path = "../aquatic_cli_helpers" }
aquatic_common = { version = "0.2.0", path = "../aquatic_common", features = ["with-glommio"] }
aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" }
aquatic_ws_protocol = { version = "0.2.0", path = "../aquatic_ws_protocol" }
anyhow = "1"
async-tungstenite = "0.17"
futures = "0.3"
futures-rustls = "0.22"
glommio = "0.7"

Binary file not shown.

Before

Width:  |  Height:  |  Size: 41 KiB

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 51 KiB

View file

@ -1,129 +0,0 @@
# Working procedure for testing file transfer with aquatic_ws
- On VPS, create identity (using real certificate), run tracker with TLS
- On VPS, create torrent with external url as announce. Edit file and put
external url not only as announce, but in announce list too.
- On VPS, disallow traffic to other trackers by adding them to /etc/hosts
or maybe with firewall, since webtorrent-hybrid adds its own trackers
willy-nilly. To get a list of the tracker urls which are actually used,
the node application under heading "Seed application" can be used as a
starting point.
- I opened the listening port in the VPS firewall too (this might not be
necessary if running both clients on the VPS, see below)
- On VPS, run webtorrent-hybrid download --keep-seeding ./abc.torrent
- On desktop/non-VPS computer, fetch the torrent file and run
`webtorrent-hybrid download ./abc.torrent`.
(I actually got this to work when running this client on the VPS as well.)
## Seed application
```js
// Start webtorrent seeder from data file, create torrent, write it to file,
// output info
var WebTorrent = require('webtorrent-hybrid')
var fs = require('fs')
// WebTorrent seems to use same peer id for different
// clients in some cases (I don't know how)
peerId = "ae61b6f4a5be4ada48333891512db5e90347d736"
announceUrl = 'ws://127.0.0.1:3000'
dataFile = './files-seed/ws-ipv4'
torrentFile = './torrents/ws-ipv4.torrent'
// Spin up a WebTorrent client that seeds `dataFile`, restricts announcing
// to our own tracker (`announceUrl`), writes the generated .torrent to
// `torrentFile`, and logs every notable torrent event for debugging.
function createSeeder() {
    console.log('creating seeder..')

    // dht/webSeeds disabled so only tracker-based peer discovery is exercised.
    var client = new WebTorrent({ dht: false, webSeeds: false, peerId: peerId })
    client.on('error', function (err) {
        console.log('seeder error: ' + err)
    })

    // Mark the torrent private and pin both announce fields to our tracker
    // so clients should not announce anywhere else.
    var seedOpts = {
        announceList: [[announceUrl]],
        announce: [announceUrl],
        private: true
    }

    client.seed(dataFile, seedOpts, function (torrent) {
        console.log("seeding")

        // Log torrent info, including actually used trackers
        console.log(torrent)

        // Persist the torrent metadata so a leecher can pick it up.
        fs.writeFile(torrentFile, torrent.torrentFile, function (err) {
            if (err) {
                console.log(err)
            }
        })

        torrent.on('warning', function (err) {
            console.log(err)
        });
        torrent.on('error', function (err) {
            console.log(err)
        });
        torrent.on('download', function (bytes) {
            console.log('downloaded bytes: ' + bytes)
        });
        torrent.on('upload', function (bytes) {
            console.log('uploaded bytes: ' + bytes)
        });
        torrent.on('wire', function (wire, addr) {
            console.log('connected to peer with addr: ' + addr)
        });
        torrent.on('noPeers', function (announceType) {
            console.log('no peers received with announce type: ' + announceType)
        })
        torrent.on('done', function () {
            console.log('done')
        });
    })
}
createSeeder()
```
## Simplifications to procedure that might work
- using fake certificate and routing certificate url to localhost in
/etc/hosts, meaning all of this could maybe be run locally/in Docker (but I
think sdp negotiations tend to fail in that case..)
## Issues with Docker implementation
- webtorrent-hybrid adds its own trackers when opening torrents, even if they
have been removed from file! The really robust way to get around this would
be to block all outgoing traffic with e.g. iptables before starting tests,
but I couldn't get it to work
## Notes on testing locally
Official tracker does not successfully handle file transfer on localhost
on my machine between two instances of the official client (webtorrent-hybrid),
probably due to sdp negotiation issues. This was with plain `ws` protocol.
## Possibly useful collection of commands
```sh
npm install -g webtorrent-hybrid
npm install -g bittorrent-tracker # Reference tracker
bittorrent-tracker --ws -p 3000 & # Reference tracker
mkdir files-seed files-leech torrents
webtorrent create files-seed/ws-ipv4 --announce "wss://127.0.0.1:3000" > torrents/ws-ipv4.torrent
cd files-seed
webtorrent-hybrid seed ../torrents/ws-ipv4.torrent --keep-seeding -q &
cd ../files-leech
webtorrent-hybrid download ../torrents/ws-ipv4.torrent -q &
```

View file

@ -1,18 +1,17 @@
#!/bin/bash
# Generate self-signed TLS cert and private key for local testing
set -e
mkdir -p tmp/tls
TLS_DIR="./tmp/tls"
cd tmp/tls
mkdir -p "$TLS_DIR"
cd "$TLS_DIR"
openssl ecparam -genkey -name prime256v1 -out key.pem
openssl req -new -sha256 -key key.pem -out csr.csr -subj "/C=GB/ST=Test/L=Test/O=Test/OU=Test/CN=example.com"
openssl req -x509 -sha256 -nodes -days 365 -key key.pem -in csr.csr -out cert.crt
sudo cp cert.crt /usr/local/share/ca-certificates/snakeoil.crt
sudo update-ca-certificates
openssl pkcs8 -in key.pem -topk8 -nocrypt -out key.pk8
# openssl pkcs12 -export -passout "pass:p" -out identity.pfx -inkey key.pem -in cert.crt
echo "tls_certificate_path = \"$TLS_DIR/cert.crt\""
echo "tls_private_key_path = \"$TLS_DIR/key.pk8\""

View file

@ -0,0 +1,5 @@
#!/bin/sh
# Build and run aquatic_http_private with native-CPU codegen flags
# (AVX-512 disabled) in the release-debug profile.
. ./scripts/env-native-cpu-without-avx-512
# "$@" (quoted) forwards each caller argument intact; unquoted $@ would
# re-split arguments containing spaces and apply pathname expansion.
cargo run --profile "release-debug" --bin aquatic_http_private -- "$@"

View file

@ -2,4 +2,4 @@
. ./scripts/env-native-cpu-without-avx-512
cargo run --release --bin aquatic_http -- $@
cargo run --profile "release-debug" --bin aquatic_http -- $@

View file

@ -2,4 +2,4 @@
. ./scripts/env-native-cpu-without-avx-512
cargo run --release --bin aquatic_udp -- $@
cargo run --profile "release-debug" --bin aquatic_udp -- $@

View file

@ -2,4 +2,4 @@
. ./scripts/env-native-cpu-without-avx-512
cargo run --release --bin aquatic_ws -- $@
cargo run --profile "release-debug" --bin aquatic_ws -- $@

View file

@ -2,4 +2,4 @@
. ./scripts/env-native-cpu-without-avx-512
cargo run --release --bin aquatic -- $@
cargo run --profile "release-debug" --bin aquatic -- $@

View file

@ -2,4 +2,4 @@
. ./scripts/env-native-cpu-without-avx-512
cargo run --release --bin aquatic_http_load_test -- $@
cargo run --profile "release-debug" --bin aquatic_http_load_test -- $@

View file

@ -2,4 +2,4 @@
. ./scripts/env-native-cpu-without-avx-512
cargo run --release --bin aquatic_udp_load_test -- $@
cargo run --profile "release-debug" --bin aquatic_udp_load_test -- $@

View file

@ -2,4 +2,4 @@
. ./scripts/env-native-cpu-without-avx-512
cargo run --release --bin aquatic_ws_load_test -- $@
cargo run --profile "release-debug" --bin aquatic_ws_load_test -- $@