Merge pull request #73 from greatest-ape/2022-04-13

udp: use blake3 hmac for connection IDs, other improvements; run cargo update
This commit is contained in:
Joakim Frostegård 2022-04-16 10:35:15 +02:00 committed by GitHub
commit 99792eefc3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 1398 additions and 1187 deletions

126
Cargo.lock generated
View file

@ -209,8 +209,12 @@ dependencies = [
"aquatic_common", "aquatic_common",
"aquatic_toml_config", "aquatic_toml_config",
"aquatic_udp_protocol", "aquatic_udp_protocol",
"blake3",
"cfg-if", "cfg-if",
"constant_time_eq 0.2.1",
"crossbeam-channel", "crossbeam-channel",
"getrandom",
"hashbrown 0.12.0",
"hex", "hex",
"log", "log",
"mimalloc", "mimalloc",
@ -350,6 +354,12 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f" checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f"
[[package]]
name = "arrayref"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4c527152e37cf757a3f78aae5a06fbeefdb07ccc535c980a3208ee3060dd544"
[[package]] [[package]]
name = "arrayvec" name = "arrayvec"
version = "0.4.12" version = "0.4.12"
@ -359,6 +369,12 @@ dependencies = [
"nodrop", "nodrop",
] ]
[[package]]
name = "arrayvec"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.53" version = "0.1.53"
@ -426,9 +442,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]] [[package]]
name = "axum" name = "axum"
version = "0.5.0" version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5611d4977882c5af1c0f7a34d51b5d87f784f86912bb543986b014ea4995ef93" checksum = "47594e438a243791dba58124b6669561f5baa14cb12046641d8008bf035e5a25"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"axum-core", "axum-core",
@ -456,9 +472,9 @@ dependencies = [
[[package]] [[package]]
name = "axum-core" name = "axum-core"
version = "0.2.0" version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95cd109b3e93c9541dcce5b0219dcf89169dcc58c1bebed65082808324258afb" checksum = "9a671c9ae99531afdd5d3ee8340b8da547779430689947144c140fc74a740244"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"bytes", "bytes",
@ -524,6 +540,20 @@ version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "303cec55cd9c5fde944b061b902f142b52a8bb5438cc822481ea1e3ebc96bbcb" checksum = "303cec55cd9c5fde944b061b902f142b52a8bb5438cc822481ea1e3ebc96bbcb"
[[package]]
name = "blake3"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a08e53fc5a564bb15bfe6fae56bd71522205f1f91893f9c0116edad6496c183f"
dependencies = [
"arrayref",
"arrayvec 0.7.2",
"cc",
"cfg-if",
"constant_time_eq 0.1.5",
"digest 0.10.3",
]
[[package]] [[package]]
name = "block-buffer" name = "block-buffer"
version = "0.9.0" version = "0.9.0"
@ -655,6 +685,18 @@ version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d6f2aa4d0537bcc1c74df8755072bd31c1ef1a3a1b85a68e8404a8c353b7b8b" checksum = "9d6f2aa4d0537bcc1c74df8755072bd31c1ef1a3a1b85a68e8404a8c353b7b8b"
[[package]]
name = "constant_time_eq"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
[[package]]
name = "constant_time_eq"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df04a53a7e91248c27eb6bfc1db165e8f47453e98478e4609f9cce020bb3c65a"
[[package]] [[package]]
name = "cpufeatures" name = "cpufeatures"
version = "0.2.2" version = "0.2.2"
@ -854,6 +896,7 @@ checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506"
dependencies = [ dependencies = [
"block-buffer 0.10.2", "block-buffer 0.10.2",
"crypto-common", "crypto-common",
"subtle",
] ]
[[package]] [[package]]
@ -1348,9 +1391,9 @@ checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29"
[[package]] [[package]]
name = "httparse" name = "httparse"
version = "1.6.0" version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9100414882e15fb7feccb4897e5f0ff0ff1ca7d1a86a23208ada4d7a18e6c6c4" checksum = "6330e8a36bd8c859f3fa6d9382911fbb7147ec39807f63b923933a247240b9ba"
[[package]] [[package]]
name = "httpdate" name = "httpdate"
@ -1481,9 +1524,9 @@ checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
[[package]] [[package]]
name = "js-sys" name = "js-sys"
version = "0.3.56" version = "0.3.57"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a38fc24e30fd564ce974c02bf1d337caddff65be6cc4735a1f7eab22a7440f04" checksum = "671a26f820db17c2a2750743f1dd03bafd15b98c9f30c7c2628c024c05d73397"
dependencies = [ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
@ -1509,9 +1552,9 @@ dependencies = [
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.121" version = "0.2.123"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f" checksum = "cb691a747a7ab48abc15c5b42066eaafde10dc427e3b6ee2a1cf43db04c763bd"
[[package]] [[package]]
name = "libm" name = "libm"
@ -1756,7 +1799,7 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bafe4179722c2894288ee77a9f044f02811c86af699344c498b0840c698a2465" checksum = "bafe4179722c2894288ee77a9f044f02811c86af699344c498b0840c698a2465"
dependencies = [ dependencies = [
"arrayvec", "arrayvec 0.4.12",
"itoa 0.4.8", "itoa 0.4.8",
] ]
@ -2056,9 +2099,9 @@ dependencies = [
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.36" version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7342d5883fbccae1cc37a2353b09c87c9b0f3afd73f5fb9bba687a1f733b029" checksum = "ec757218438d5fda206afc041538b2f6d889286160d649a86a24d37e1235afd1"
dependencies = [ dependencies = [
"unicode-xid", "unicode-xid",
] ]
@ -2087,9 +2130,9 @@ dependencies = [
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.17" version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "632d02bff7f874a36f33ea8bb416cd484b90cc66c1194b1a1110d067a7013f58" checksum = "a1feb54ed693b93a84e14094943b84b7c4eae204c512b7ccb95ab0c66d278ad1"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
] ]
@ -2136,9 +2179,9 @@ dependencies = [
[[package]] [[package]]
name = "rayon" name = "rayon"
version = "1.5.1" version = "1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90" checksum = "fd249e82c21598a9a426a4e00dd7adc1d640b22445ec8545feef801d1a74c221"
dependencies = [ dependencies = [
"autocfg 1.1.0", "autocfg 1.1.0",
"crossbeam-deque", "crossbeam-deque",
@ -2148,14 +2191,13 @@ dependencies = [
[[package]] [[package]]
name = "rayon-core" name = "rayon-core"
version = "1.9.1" version = "1.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e" checksum = "9f51245e1e62e1f1629cbfec37b5793bbabcaeb90f30e94d2ba03564687353e4"
dependencies = [ dependencies = [
"crossbeam-channel", "crossbeam-channel",
"crossbeam-deque", "crossbeam-deque",
"crossbeam-utils", "crossbeam-utils",
"lazy_static",
"num_cpus", "num_cpus",
] ]
@ -2468,9 +2510,9 @@ dependencies = [
[[package]] [[package]]
name = "simdutf8" name = "simdutf8"
version = "0.1.3" version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c970da16e7c682fa90a261cf0724dee241c9f7831635ecc4e988ae8f3b505559" checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a"
[[package]] [[package]]
name = "simple_logger" name = "simple_logger"
@ -2493,9 +2535,9 @@ checksum = "76a77a8fd93886010f05e7ea0720e569d6d16c65329dbe3ec033bbbccccb017b"
[[package]] [[package]]
name = "slab" name = "slab"
version = "0.4.5" version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5" checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32"
[[package]] [[package]]
name = "smallvec" name = "smallvec"
@ -2681,9 +2723,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.90" version = "1.0.91"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "704df27628939572cd88d33f171cd6f896f4eaca85252c6e0a72d8d8287ee86f" checksum = "b683b2b825c8eef438b77c36a06dc262294da3d5a5813fac20da149241dcd44d"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -2926,9 +2968,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]] [[package]]
name = "tracing" name = "tracing"
version = "0.1.32" version = "0.1.33"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f" checksum = "80b9fa4360528139bc96100c160b7ae879f5567f49f1782b0b02035b0358ebf3"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"log", "log",
@ -2950,9 +2992,9 @@ dependencies = [
[[package]] [[package]]
name = "tracing-core" name = "tracing-core"
version = "0.1.23" version = "0.1.25"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa31669fa42c09c34d94d8165dd2012e8ff3c66aca50f3bb226b68f216f2706c" checksum = "6dfce9f3241b150f36e8e54bb561a742d5daa1a47b5dd9a5ce369fd4a4db2210"
dependencies = [ dependencies = [
"lazy_static", "lazy_static",
] ]
@ -3116,9 +3158,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]] [[package]]
name = "wasm-bindgen" name = "wasm-bindgen"
version = "0.2.79" version = "0.2.80"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25f1af7423d8588a3d840681122e72e6a24ddbcb3f0ec385cac0d12d24256c06" checksum = "27370197c907c55e3f1a9fbe26f44e937fe6451368324e009cba39e139dc08ad"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"wasm-bindgen-macro", "wasm-bindgen-macro",
@ -3126,9 +3168,9 @@ dependencies = [
[[package]] [[package]]
name = "wasm-bindgen-backend" name = "wasm-bindgen-backend"
version = "0.2.79" version = "0.2.80"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b21c0df030f5a177f3cba22e9bc4322695ec43e7257d865302900290bcdedca" checksum = "53e04185bfa3a779273da532f5025e33398409573f348985af9a1cbf3774d3f4"
dependencies = [ dependencies = [
"bumpalo", "bumpalo",
"lazy_static", "lazy_static",
@ -3141,9 +3183,9 @@ dependencies = [
[[package]] [[package]]
name = "wasm-bindgen-macro" name = "wasm-bindgen-macro"
version = "0.2.79" version = "0.2.80"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f4203d69e40a52ee523b2529a773d5ffc1dc0071801c87b3d270b471b80ed01" checksum = "17cae7ff784d7e83a2fe7611cfe766ecf034111b49deb850a3dc7699c08251f5"
dependencies = [ dependencies = [
"quote", "quote",
"wasm-bindgen-macro-support", "wasm-bindgen-macro-support",
@ -3151,9 +3193,9 @@ dependencies = [
[[package]] [[package]]
name = "wasm-bindgen-macro-support" name = "wasm-bindgen-macro-support"
version = "0.2.79" version = "0.2.80"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa8a30d46208db204854cadbb5d4baf5fcf8071ba5bf48190c3e59937962ebc" checksum = "99ec0dc7a4756fffc231aab1b9f2f578d23cd391390ab27f952ae0c9b3ece20b"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -3164,15 +3206,15 @@ dependencies = [
[[package]] [[package]]
name = "wasm-bindgen-shared" name = "wasm-bindgen-shared"
version = "0.2.79" version = "0.2.80"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d958d035c4438e28c70e4321a2911302f10135ce78a9c7834c0cab4123d06a2" checksum = "d554b7f530dee5964d9a9468d95c1f8b8acae4f282807e7d27d4b03099a46744"
[[package]] [[package]]
name = "web-sys" name = "web-sys"
version = "0.3.56" version = "0.3.57"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c060b319f29dd25724f09a2ba1418f142f539b2be99fbf4d2d5a8f7330afb8eb" checksum = "7b17e741662c70c8bd24ac5c5b18de314a2c26c32bf8346ee1e6f53de919c283"
dependencies = [ dependencies = [
"js-sys", "js-sys",
"wasm-bindgen", "wasm-bindgen",

18
TODO.md
View file

@ -2,16 +2,13 @@
## High priority ## High priority
* aquatic_http_private
* Consider not setting Content-type: text/plain for responses and send vec as default octet stream instead
* stored procedure
* test ip format
* check user token length
* site will likely want num_seeders and num_leechers for all torrents..
## Medium priority ## Medium priority
* rename request workers to swarm workers * rename request workers to swarm workers
* save space by making ValidUntil just contain u32 with seconds, measured
from some Instant created at application start
* quit whole program if any thread panics * quit whole program if any thread panics
* But it would be nice not to panic in workers, but to return errors instead. * But it would be nice not to panic in workers, but to return errors instead.
Once JoinHandle::is_finished is available in stable Rust (#90470), an Once JoinHandle::is_finished is available in stable Rust (#90470), an
@ -29,6 +26,13 @@
* SinkExt::send maybe doesn't wake up properly? * SinkExt::send maybe doesn't wake up properly?
* related to https://github.com/sdroege/async-tungstenite/blob/master/src/compat.rs#L18 ? * related to https://github.com/sdroege/async-tungstenite/blob/master/src/compat.rs#L18 ?
* aquatic_http_private
* Consider not setting Content-type: text/plain for responses and send vec as default octet stream instead
* stored procedure
* test ip format
* check user token length
* site will likely want num_seeders and num_leechers for all torrents..
* extract_response_peers * extract_response_peers
* don't assume requesting peer is in list? * don't assume requesting peer is in list?

View file

@ -24,8 +24,12 @@ aquatic_toml_config = { version = "0.2.0", path = "../aquatic_toml_config" }
aquatic_udp_protocol = { version = "0.2.0", path = "../aquatic_udp_protocol" } aquatic_udp_protocol = { version = "0.2.0", path = "../aquatic_udp_protocol" }
anyhow = "1" anyhow = "1"
blake3 = "1"
cfg-if = "1" cfg-if = "1"
constant_time_eq = "0.2"
crossbeam-channel = "0.5" crossbeam-channel = "0.5"
getrandom = "0.2"
hashbrown = { version = "0.12", default-features = false }
hex = "0.4" hex = "0.4"
log = "0.4" log = "0.4"
mimalloc = { version = "0.1", default-features = false } mimalloc = { version = "0.1", default-features = false }

View file

@ -1,18 +1,107 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::hash::Hash; use std::hash::Hash;
use std::net::{Ipv4Addr, Ipv6Addr}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use aquatic_common::CanonicalSocketAddr; use anyhow::Context;
use constant_time_eq::constant_time_eq;
use crossbeam_channel::{Sender, TrySendError}; use crossbeam_channel::{Sender, TrySendError};
use getrandom::getrandom;
use aquatic_common::access_list::AccessListArcSwap; use aquatic_common::access_list::AccessListArcSwap;
use aquatic_common::CanonicalSocketAddr;
use aquatic_udp_protocol::*; use aquatic_udp_protocol::*;
use crate::config::Config; use crate::config::Config;
pub const MAX_PACKET_SIZE: usize = 8192; pub const BUFFER_SIZE: usize = 8192;
/// HMAC (BLAKE3) based ConnectionID creator and validator
///
/// Structure of created ConnectionID (bytes making up inner i64):
/// - &[0..4]: connection expiration time as number of seconds after
/// ConnectionValidator instance was created, encoded as u32 bytes.
/// Value fits around 136 years.
/// - &[4..8]: truncated keyed BLAKE3 hash of above 4 bytes and octets of
/// client IP address
///
/// The purpose of using ConnectionIDs is to prevent IP spoofing, mainly to
/// prevent the tracker from being used as an amplification vector for DDoS
/// attacks. By including 32 bits of BLAKE3 keyed hash output in its contents,
/// such abuse should be rendered impractical.
#[derive(Clone)]
pub struct ConnectionValidator {
    // Reference point for connection expiration times; set once in `new`
    start_time: Instant,
    // How long issued connection IDs remain valid (seconds)
    max_connection_age: u32,
    // BLAKE3 hasher pre-keyed with 32 random bytes; clones share the key,
    // so any clone can validate IDs created by any other clone
    keyed_hasher: blake3::Hasher,
}
impl ConnectionValidator {
    /// Create new instance. Must be created once and cloned if used in several
    /// threads.
    ///
    /// # Errors
    ///
    /// Returns an error if the OS random number generator can't produce the
    /// 32 bytes used as the BLAKE3 key.
    pub fn new(config: &Config) -> anyhow::Result<Self> {
        let mut key = [0; 32];

        getrandom(&mut key)
            .with_context(|| "Couldn't get random bytes for ConnectionValidator key")?;

        let keyed_hasher = blake3::Hasher::new_keyed(&key);

        Ok(Self {
            keyed_hasher,
            start_time: Instant::now(),
            max_connection_age: config.cleaning.max_connection_age,
        })
    }

    /// Create a connection ID tied to `source_addr`, expiring
    /// `max_connection_age` seconds from now.
    pub fn create_connection_id(&mut self, source_addr: CanonicalSocketAddr) -> ConnectionId {
        // Saturate instead of doing a plain addition: with an extreme
        // `max_connection_age` (or after ~136 years of uptime) `+` would
        // panic in debug builds or wrap to an already-expired timestamp in
        // release builds. Saturating keeps such IDs valid instead.
        let valid_until = (self.start_time.elapsed().as_secs() as u32)
            .saturating_add(self.max_connection_age)
            .to_ne_bytes();

        let hash = self.hash(valid_until, source_addr.get().ip());

        let mut connection_id_bytes = [0u8; 8];

        connection_id_bytes[..4].copy_from_slice(&valid_until);
        connection_id_bytes[4..].copy_from_slice(&hash);

        ConnectionId(i64::from_ne_bytes(connection_id_bytes))
    }

    /// Returns true if `connection_id` was created for `source_addr` by this
    /// validator (or a clone sharing its key) and has not yet expired.
    pub fn connection_id_valid(
        &mut self,
        source_addr: CanonicalSocketAddr,
        connection_id: ConnectionId,
    ) -> bool {
        let bytes = connection_id.0.to_ne_bytes();
        let (valid_until, hash) = bytes.split_at(4);
        let valid_until: [u8; 4] = valid_until.try_into().unwrap();

        // Compare the MAC in constant time so forged IDs can't be probed
        // byte by byte through timing differences
        if !constant_time_eq(hash, &self.hash(valid_until, source_addr.get().ip())) {
            return false;
        }

        u32::from_ne_bytes(valid_until) > self.start_time.elapsed().as_secs() as u32
    }

    /// Keyed BLAKE3 hash of expiration time bytes and client IP octets,
    /// truncated to 32 bits. Resets the hasher for reuse before returning.
    fn hash(&mut self, valid_until: [u8; 4], ip_addr: IpAddr) -> [u8; 4] {
        self.keyed_hasher.update(&valid_until);

        match ip_addr {
            IpAddr::V4(ip) => self.keyed_hasher.update(&ip.octets()),
            IpAddr::V6(ip) => self.keyed_hasher.update(&ip.octets()),
        };

        let mut hash = [0u8; 4];

        self.keyed_hasher.finalize_xof().fill(&mut hash);
        self.keyed_hasher.reset();

        hash
    }
}
#[derive(Debug)] #[derive(Debug)]
pub struct PendingScrapeRequest { pub struct PendingScrapeRequest {
@ -185,9 +274,13 @@ impl State {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::net::Ipv6Addr; use std::net::{Ipv6Addr, SocketAddr};
use crate::{common::MAX_PACKET_SIZE, config::Config}; use quickcheck_macros::quickcheck;
use crate::config::Config;
use super::*;
#[test] #[test]
fn test_peer_status_from_event_and_bytes_left() { fn test_peer_status_from_event_and_bytes_left() {
@ -213,7 +306,7 @@ mod tests {
// Assumes that announce response with maximum amount of ipv6 peers will // Assumes that announce response with maximum amount of ipv6 peers will
// be the longest // be the longest
#[test] #[test]
fn test_max_package_size() { fn test_buffer_size() {
use aquatic_udp_protocol::*; use aquatic_udp_protocol::*;
let config = Config::default(); let config = Config::default();
@ -239,6 +332,44 @@ mod tests {
println!("Buffer len: {}", buf.len()); println!("Buffer len: {}", buf.len());
assert!(buf.len() <= MAX_PACKET_SIZE); assert!(buf.len() <= BUFFER_SIZE);
}
#[quickcheck]
fn test_connection_validator(
original_addr: IpAddr,
different_addr: IpAddr,
max_connection_age: u32,
) -> quickcheck::TestResult {
let original_addr = CanonicalSocketAddr::new(SocketAddr::new(original_addr, 0));
let different_addr = CanonicalSocketAddr::new(SocketAddr::new(different_addr, 0));
if original_addr == different_addr {
return quickcheck::TestResult::discard();
}
let mut validator = {
let mut config = Config::default();
config.cleaning.max_connection_age = max_connection_age;
ConnectionValidator::new(&config).unwrap()
};
let connection_id = validator.create_connection_id(original_addr);
let original_valid = validator.connection_id_valid(original_addr, connection_id);
let different_valid = validator.connection_id_valid(different_addr, connection_id);
if different_valid {
return quickcheck::TestResult::failed();
}
if max_connection_age == 0 {
quickcheck::TestResult::from_bool(!original_valid)
} else {
// Note: depends on that running this test takes less than a second
quickcheck::TestResult::from_bool(original_valid)
}
} }
} }

View file

@ -163,8 +163,6 @@ impl Default for StatisticsConfig {
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)] #[serde(default)]
pub struct CleaningConfig { pub struct CleaningConfig {
/// Clean connections this often (seconds)
pub connection_cleaning_interval: u64,
/// Clean torrents this often (seconds) /// Clean torrents this often (seconds)
pub torrent_cleaning_interval: u64, pub torrent_cleaning_interval: u64,
/// Clean pending scrape responses this often (seconds) /// Clean pending scrape responses this often (seconds)
@ -173,8 +171,8 @@ pub struct CleaningConfig {
/// lingering for a long time. However, the cleaning also returns unused /// lingering for a long time. However, the cleaning also returns unused
/// allocated memory to the OS, so the interval can be configured here. /// allocated memory to the OS, so the interval can be configured here.
pub pending_scrape_cleaning_interval: u64, pub pending_scrape_cleaning_interval: u64,
/// Remove connections that are older than this (seconds) /// Allow clients to use a connection token for this long (seconds)
pub max_connection_age: u64, pub max_connection_age: u32,
/// Remove peers who have not announced for this long (seconds) /// Remove peers who have not announced for this long (seconds)
pub max_peer_age: u64, pub max_peer_age: u64,
/// Remove pending scrape responses that have not been returned from request /// Remove pending scrape responses that have not been returned from request
@ -185,7 +183,6 @@ pub struct CleaningConfig {
impl Default for CleaningConfig { impl Default for CleaningConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
connection_cleaning_interval: 60,
torrent_cleaning_interval: 60 * 2, torrent_cleaning_interval: 60 * 2,
pending_scrape_cleaning_interval: 60 * 10, pending_scrape_cleaning_interval: 60 * 10,
max_connection_age: 60 * 2, max_connection_age: 60 * 2,

View file

@ -2,39 +2,39 @@ pub mod common;
pub mod config; pub mod config;
pub mod workers; pub mod workers;
use aquatic_common::PanicSentinelWatcher;
use config::Config;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::thread::Builder; use std::thread::Builder;
use anyhow::Context; use anyhow::Context;
#[cfg(feature = "cpu-pinning")]
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
use aquatic_common::privileges::PrivilegeDropper;
use crossbeam_channel::{bounded, unbounded}; use crossbeam_channel::{bounded, unbounded};
use aquatic_common::access_list::update_access_list;
use signal_hook::consts::{SIGTERM, SIGUSR1}; use signal_hook::consts::{SIGTERM, SIGUSR1};
use signal_hook::iterator::Signals; use signal_hook::iterator::Signals;
use common::{ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State}; use aquatic_common::access_list::update_access_list;
#[cfg(feature = "cpu-pinning")]
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
use aquatic_common::privileges::PrivilegeDropper;
use aquatic_common::PanicSentinelWatcher;
use crate::common::RequestWorkerIndex; use common::{
ConnectedRequestSender, ConnectedResponseSender, ConnectionValidator, RequestWorkerIndex,
SocketWorkerIndex, State,
};
use config::Config;
pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker";
pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
pub fn run(config: Config) -> ::anyhow::Result<()> { pub fn run(config: Config) -> ::anyhow::Result<()> {
let state = State::new(config.request_workers);
update_access_list(&config.access_list, &state.access_list)?;
let mut signals = Signals::new([SIGUSR1, SIGTERM])?; let mut signals = Signals::new([SIGUSR1, SIGTERM])?;
let state = State::new(config.request_workers);
let connection_validator = ConnectionValidator::new(&config)?;
let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel();
let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
update_access_list(&config.access_list, &state.access_list)?;
let mut request_senders = Vec::new(); let mut request_senders = Vec::new();
let mut request_receivers = BTreeMap::new(); let mut request_receivers = BTreeMap::new();
@ -97,6 +97,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let sentinel = sentinel.clone(); let sentinel = sentinel.clone();
let state = state.clone(); let state = state.clone();
let config = config.clone(); let config = config.clone();
let connection_validator = connection_validator.clone();
let request_sender = let request_sender =
ConnectedRequestSender::new(SocketWorkerIndex(i), request_senders.clone()); ConnectedRequestSender::new(SocketWorkerIndex(i), request_senders.clone());
let response_receiver = response_receivers.remove(&i).unwrap(); let response_receiver = response_receivers.remove(&i).unwrap();
@ -118,6 +119,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
state, state,
config, config,
i, i,
connection_validator,
request_sender, request_sender,
response_receiver, response_receiver,
priv_dropper, priv_dropper,

View file

@ -1,445 +0,0 @@
use std::collections::BTreeMap;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::Ipv6Addr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use aquatic_common::access_list::create_access_list_cache;
use aquatic_common::access_list::AccessListArcSwap;
use aquatic_common::AmortizedIndexMap;
use aquatic_common::CanonicalSocketAddr;
use aquatic_common::PanicSentinel;
use aquatic_common::ValidUntil;
use crossbeam_channel::Receiver;
use rand::{rngs::SmallRng, SeedableRng};
use aquatic_common::extract_response_peers;
use aquatic_udp_protocol::*;
use crate::common::*;
use crate::config::Config;
/// Per-peer swarm state, stored per torrent keyed by peer ID.
#[derive(Clone, Debug)]
struct Peer<I: Ip> {
    pub ip_address: I,
    pub port: Port,
    pub status: PeerStatus,
    pub valid_until: ValidUntil,
}

impl<I: Ip> Peer<I> {
    /// Project this peer down to the (address, port) pair that is sent back
    /// in announce responses.
    fn to_response_peer(&self) -> ResponsePeer<I> {
        let Peer {
            ip_address, port, ..
        } = *self;

        ResponsePeer { ip_address, port }
    }
}

// Map from peer ID to peer state for a single torrent and address family
type PeerMap<I> = AmortizedIndexMap<PeerId, Peer<I>>;
struct TorrentData<I: Ip> {
pub peers: PeerMap<I>,
pub num_seeders: usize,
pub num_leechers: usize,
}
impl<I: Ip> Default for TorrentData<I> {
fn default() -> Self {
Self {
peers: Default::default(),
num_seeders: 0,
num_leechers: 0,
}
}
}
type TorrentMap<I> = AmortizedIndexMap<InfoHash, TorrentData<I>>;
// IPv4 and IPv6 peers are tracked in separate maps, since announce
// responses can only carry peers of the requester's address family
#[derive(Default)]
struct TorrentMaps {
    pub ipv4: TorrentMap<Ipv4Addr>,
    pub ipv6: TorrentMap<Ipv6Addr>,
}

impl TorrentMaps {
    /// Remove disallowed and inactive torrents
    ///
    /// Drops torrents no longer allowed by the access list (in its configured
    /// mode) and torrents whose peer maps end up empty after expired peers
    /// are removed. Shrinks both maps afterwards to hand unused capacity
    /// back to the allocator.
    pub fn clean(&mut self, config: &Config, access_list: &Arc<AccessListArcSwap>) {
        let now = Instant::now();
        let access_list_mode = config.access_list.mode;

        // Cache the access list load so it isn't re-fetched per torrent
        let mut access_list_cache = create_access_list_cache(access_list);

        self.ipv4.retain(|info_hash, torrent| {
            access_list_cache
                .load()
                .allows(access_list_mode, &info_hash.0)
                && Self::clean_torrent_and_peers(now, torrent)
        });
        self.ipv4.shrink_to_fit();

        self.ipv6.retain(|info_hash, torrent| {
            access_list_cache
                .load()
                .allows(access_list_mode, &info_hash.0)
                && Self::clean_torrent_and_peers(now, torrent)
        });
        self.ipv6.shrink_to_fit();
    }

    /// Returns true if torrent is to be kept
    ///
    /// Removes peers whose validity deadline has passed, decrementing the
    /// torrent's cached seeder/leecher counters to match each removal.
    #[inline]
    fn clean_torrent_and_peers<I: Ip>(now: Instant, torrent: &mut TorrentData<I>) -> bool {
        // Borrow the counters separately so the retain closure can mutate
        // them while `torrent.peers` is mutably borrowed
        let num_seeders = &mut torrent.num_seeders;
        let num_leechers = &mut torrent.num_leechers;

        torrent.peers.retain(|_, peer| {
            let keep = peer.valid_until.0 > now;

            if !keep {
                match peer.status {
                    PeerStatus::Seeding => {
                        *num_seeders -= 1;
                    }
                    PeerStatus::Leeching => {
                        *num_leechers -= 1;
                    }
                    _ => (),
                };
            }

            keep
        });

        torrent.peers.shrink_to_fit();

        !torrent.peers.is_empty()
    }
}
/// Request worker loop: receives connected announce/scrape requests from
/// socket workers, updates this worker's torrent maps, and sends responses
/// back to the originating socket worker.
///
/// Also performs periodic housekeeping (torrent cleaning and statistics
/// updates), checked only every 128 iterations to keep the hot path cheap.
/// Never returns.
pub fn run_request_worker(
    _sentinel: PanicSentinel,
    config: Config,
    state: State,
    request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>,
    response_sender: ConnectedResponseSender,
    worker_index: RequestWorkerIndex,
) {
    let mut torrents = TorrentMaps::default();
    let mut small_rng = SmallRng::from_entropy();

    // Bounded recv timeout so housekeeping below still runs when the
    // request channel is idle
    let timeout = Duration::from_millis(config.request_channel_recv_timeout_ms);
    let mut peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age);

    let cleaning_interval = Duration::from_secs(config.cleaning.torrent_cleaning_interval);
    let statistics_update_interval = Duration::from_secs(config.statistics.interval);

    let mut last_cleaning = Instant::now();
    let mut last_statistics_update = Instant::now();

    let mut iter_counter = 0usize;

    loop {
        if let Ok((sender_index, request, src)) = request_receiver.recv_timeout(timeout) {
            let response = match request {
                ConnectedRequest::Announce(request) => handle_announce_request(
                    &config,
                    &mut small_rng,
                    &mut torrents,
                    request,
                    src,
                    peer_valid_until,
                ),
                ConnectedRequest::Scrape(request) => {
                    ConnectedResponse::Scrape(handle_scrape_request(&mut torrents, src, request))
                }
            };

            // Route the response back to the socket worker that owns the
            // client connection; try_send, so a full channel drops it
            response_sender.try_send_to(sender_index, response, src);
        }

        // Housekeeping: amortized over iterations to avoid checking clocks
        // on every request
        if iter_counter % 128 == 0 {
            let now = Instant::now();

            peer_valid_until = ValidUntil::new_with_now(now, config.cleaning.max_peer_age);

            if now > last_cleaning + cleaning_interval {
                torrents.clean(&config, &state.access_list);

                if config.statistics.active() {
                    // Peer counts are cheapest to gather right after
                    // cleaning, while the maps are fully pruned
                    let peers_ipv4 = torrents.ipv4.values().map(|t| t.peers.len()).sum();
                    let peers_ipv6 = torrents.ipv6.values().map(|t| t.peers.len()).sum();

                    state.statistics_ipv4.peers[worker_index.0]
                        .store(peers_ipv4, Ordering::Release);
                    state.statistics_ipv6.peers[worker_index.0]
                        .store(peers_ipv6, Ordering::Release);
                }

                last_cleaning = now;
            }
            if config.statistics.active()
                && now > last_statistics_update + statistics_update_interval
            {
                state.statistics_ipv4.torrents[worker_index.0]
                    .store(torrents.ipv4.len(), Ordering::Release);
                state.statistics_ipv6.torrents[worker_index.0]
                    .store(torrents.ipv6.len(), Ordering::Release);

                last_statistics_update = now;
            }
        }

        iter_counter = iter_counter.wrapping_add(1);
    }
}
/// Dispatch an announce request to the IPv4 or IPv6 torrent map depending
/// on the canonical source address of the announcing peer.
fn handle_announce_request(
    config: &Config,
    rng: &mut SmallRng,
    torrents: &mut TorrentMaps,
    request: AnnounceRequest,
    src: CanonicalSocketAddr,
    peer_valid_until: ValidUntil,
) -> ConnectedResponse {
    match src.get().ip() {
        IpAddr::V4(ip) => {
            let response = handle_announce_request_inner(
                config,
                rng,
                &mut torrents.ipv4,
                request,
                ip,
                peer_valid_until,
            );

            ConnectedResponse::AnnounceIpv4(response)
        }
        IpAddr::V6(ip) => {
            let response = handle_announce_request_inner(
                config,
                rng,
                &mut torrents.ipv6,
                request,
                ip,
                peer_valid_until,
            );

            ConnectedResponse::AnnounceIpv6(response)
        }
    }
}
/// Record the announcing peer in the torrent's peer map (or remove it on
/// `Stopped`), keep the seeder/leecher counters consistent, and build the
/// announce response with a capped selection of other peers.
fn handle_announce_request_inner<I: Ip>(
    config: &Config,
    rng: &mut SmallRng,
    torrents: &mut TorrentMap<I>,
    request: AnnounceRequest,
    peer_ip: I,
    peer_valid_until: ValidUntil,
) -> AnnounceResponse<I> {
    let status = PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left);

    let torrent_data = torrents.entry(request.info_hash).or_default();

    // Insert or remove the peer, bumping the relevant counter up front
    let previous_peer = match status {
        PeerStatus::Stopped => torrent_data.peers.remove(&request.peer_id),
        PeerStatus::Leeching | PeerStatus::Seeding => {
            if let PeerStatus::Leeching = status {
                torrent_data.num_leechers += 1;
            } else {
                torrent_data.num_seeders += 1;
            }

            let peer = Peer {
                ip_address: peer_ip,
                port: request.port,
                status,
                valid_until: peer_valid_until,
            };

            torrent_data.peers.insert(request.peer_id, peer)
        }
    };

    // Undo the counter contribution of any replaced/removed previous entry
    match previous_peer.map(|peer| peer.status) {
        Some(PeerStatus::Leeching) => torrent_data.num_leechers -= 1,
        Some(PeerStatus::Seeding) => torrent_data.num_seeders -= 1,
        _ => (),
    }

    let num_peers_to_take = calc_max_num_peers_to_take(config, request.peers_wanted.0);

    let peers = extract_response_peers(
        rng,
        &torrent_data.peers,
        num_peers_to_take,
        request.peer_id,
        Peer::to_response_peer,
    );

    AnnounceResponse {
        transaction_id: request.transaction_id,
        announce_interval: AnnounceInterval(config.protocol.peer_announce_interval),
        leechers: NumberOfPeers(torrent_data.num_leechers as i32),
        seeders: NumberOfPeers(torrent_data.num_seeders as i32),
        peers,
    }
}
/// Cap the number of response peers at the configured maximum. A
/// non-positive requested amount means "use the configured maximum".
#[inline]
fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize {
    let configured_max = config.protocol.max_response_peers as usize;

    if peers_wanted > 0 {
        configured_max.min(peers_wanted as usize)
    } else {
        configured_max
    }
}
fn handle_scrape_request(
torrents: &mut TorrentMaps,
src: CanonicalSocketAddr,
request: PendingScrapeRequest,
) -> PendingScrapeResponse {
const EMPTY_STATS: TorrentScrapeStatistics = create_torrent_scrape_statistics(0, 0);
let mut torrent_stats: BTreeMap<usize, TorrentScrapeStatistics> = BTreeMap::new();
if src.is_ipv4() {
torrent_stats.extend(request.info_hashes.into_iter().map(|(i, info_hash)| {
let s = if let Some(torrent_data) = torrents.ipv4.get(&info_hash) {
create_torrent_scrape_statistics(
torrent_data.num_seeders as i32,
torrent_data.num_leechers as i32,
)
} else {
EMPTY_STATS
};
(i, s)
}));
} else {
torrent_stats.extend(request.info_hashes.into_iter().map(|(i, info_hash)| {
let s = if let Some(torrent_data) = torrents.ipv6.get(&info_hash) {
create_torrent_scrape_statistics(
torrent_data.num_seeders as i32,
torrent_data.num_leechers as i32,
)
} else {
EMPTY_STATS
};
(i, s)
}));
}
PendingScrapeResponse {
slab_key: request.slab_key,
torrent_stats,
}
}
/// Build a `TorrentScrapeStatistics` from the given counts. The completed
/// (downloads) counter is always reported as zero.
#[inline(always)]
const fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics {
    TorrentScrapeStatistics {
        leechers: NumberOfPeers(leechers),
        seeders: NumberOfPeers(seeders),
        completed: NumberOfDownloads(0), // No implementation planned
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::net::Ipv4Addr;

    use quickcheck::{quickcheck, TestResult};
    use rand::thread_rng;

    use super::*;

    // Create a PeerId whose first four bytes encode `i`, so ids are
    // distinct for distinct `i`
    fn gen_peer_id(i: u32) -> PeerId {
        let mut peer_id = PeerId([0; 20]);

        peer_id.0[0..4].copy_from_slice(&i.to_ne_bytes());

        peer_id
    }

    // Create a leeching peer whose IPv4 address encodes `i`
    fn gen_peer(i: u32) -> Peer<Ipv4Addr> {
        Peer {
            ip_address: Ipv4Addr::from(i.to_be_bytes()),
            port: Port(1),
            status: PeerStatus::Leeching,
            valid_until: ValidUntil::new(0),
        }
    }

    #[test]
    fn test_extract_response_peers() {
        // Property: the extracted peer list is bounded by the requested
        // count, contains no duplicate addresses, and never contains the
        // requesting (sender) peer itself
        fn prop(data: (u16, u16)) -> TestResult {
            let gen_num_peers = data.0 as u32;
            let req_num_peers = data.1 as usize;

            let mut peer_map: PeerMap<Ipv4Addr> = Default::default();

            let mut opt_sender_key = None;
            let mut opt_sender_peer = None;

            for i in 0..gen_num_peers {
                let key = gen_peer_id(i);
                // (i << 16) + i keeps generated ip addresses distinct
                let peer = gen_peer((i << 16) + i);

                if i == 0 {
                    // Use the first generated peer as the "sender"
                    opt_sender_key = Some(key);
                    opt_sender_peer = Some(peer.to_response_peer());
                }

                peer_map.insert(key, peer);
            }

            let mut rng = thread_rng();

            let peers = extract_response_peers(
                &mut rng,
                &peer_map,
                req_num_peers,
                opt_sender_key.unwrap_or_else(|| gen_peer_id(1)),
                Peer::to_response_peer,
            );

            // Check that number of returned peers is correct
            let mut success = peers.len() <= req_num_peers;

            if req_num_peers >= gen_num_peers as usize {
                // All peers should come back, except possibly the sender
                success &= peers.len() == gen_num_peers as usize
                    || peers.len() + 1 == gen_num_peers as usize;
            }

            // Check that returned peers are unique (no overlap) and that sender
            // isn't returned
            let mut ip_addresses = HashSet::with_capacity(peers.len());

            for peer in peers {
                if peer == opt_sender_peer.clone().unwrap()
                    || ip_addresses.contains(&peer.ip_address)
                {
                    success = false;

                    break;
                }

                ip_addresses.insert(peer.ip_address);
            }

            TestResult::from_bool(success)
        }

        quickcheck(prop as fn((u16, u16)) -> TestResult);
    }
}

View file

@ -0,0 +1,185 @@
mod storage;
use std::net::IpAddr;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use crossbeam_channel::Receiver;
use rand::{rngs::SmallRng, SeedableRng};
use aquatic_common::{CanonicalSocketAddr, PanicSentinel, ValidUntil};
use aquatic_udp_protocol::*;
use crate::common::*;
use crate::config::Config;
use storage::{Peer, TorrentMap, TorrentMaps};
/// Main loop of a request worker: receives announce/scrape requests from
/// socket workers, updates/reads the in-memory torrent state, and sends
/// responses back to the originating socket worker. Also periodically
/// cleans stale torrents/peers and publishes statistics. Never returns.
pub fn run_request_worker(
    _sentinel: PanicSentinel,
    config: Config,
    state: State,
    request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>,
    response_sender: ConnectedResponseSender,
    worker_index: RequestWorkerIndex,
) {
    let mut torrents = TorrentMaps::default();
    let mut rng = SmallRng::from_entropy();

    // Bounded wait so periodic tasks below still run when the channel is idle
    let timeout = Duration::from_millis(config.request_channel_recv_timeout_ms);

    let mut peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age);

    let cleaning_interval = Duration::from_secs(config.cleaning.torrent_cleaning_interval);
    let statistics_update_interval = Duration::from_secs(config.statistics.interval);

    let mut last_cleaning = Instant::now();
    let mut last_statistics_update = Instant::now();

    let mut iter_counter = 0usize;

    loop {
        if let Ok((sender_index, request, src)) = request_receiver.recv_timeout(timeout) {
            // Route by request kind and by the canonical address family
            let response = match (request, src.get().ip()) {
                (ConnectedRequest::Announce(request), IpAddr::V4(ip)) => {
                    let response = handle_announce_request(
                        &config,
                        &mut rng,
                        &mut torrents.ipv4,
                        request,
                        ip,
                        peer_valid_until,
                    );

                    ConnectedResponse::AnnounceIpv4(response)
                }
                (ConnectedRequest::Announce(request), IpAddr::V6(ip)) => {
                    let response = handle_announce_request(
                        &config,
                        &mut rng,
                        &mut torrents.ipv6,
                        request,
                        ip,
                        peer_valid_until,
                    );

                    ConnectedResponse::AnnounceIpv6(response)
                }
                (ConnectedRequest::Scrape(request), IpAddr::V4(_)) => {
                    ConnectedResponse::Scrape(handle_scrape_request(&mut torrents.ipv4, request))
                }
                (ConnectedRequest::Scrape(request), IpAddr::V6(_)) => {
                    ConnectedResponse::Scrape(handle_scrape_request(&mut torrents.ipv6, request))
                }
            };

            // Best-effort send back to the socket worker that forwarded the request
            response_sender.try_send_to(sender_index, response, src);
        }

        // Run periodic tasks
        if iter_counter % 128 == 0 {
            let now = Instant::now();

            peer_valid_until = ValidUntil::new_with_now(now, config.cleaning.max_peer_age);

            if now > last_cleaning + cleaning_interval {
                // Cleaning also reports remaining peer counts per family
                let (ipv4, ipv6) = torrents.clean_and_get_num_peers(&config, &state.access_list);

                if config.statistics.active() {
                    state.statistics_ipv4.peers[worker_index.0].store(ipv4, Ordering::Release);
                    state.statistics_ipv6.peers[worker_index.0].store(ipv6, Ordering::Release);
                }

                last_cleaning = now;
            }
            if config.statistics.active()
                && now > last_statistics_update + statistics_update_interval
            {
                state.statistics_ipv4.torrents[worker_index.0]
                    .store(torrents.ipv4.num_torrents(), Ordering::Release);
                state.statistics_ipv6.torrents[worker_index.0]
                    .store(torrents.ipv6.num_torrents(), Ordering::Release);

                last_statistics_update = now;
            }
        }

        iter_counter = iter_counter.wrapping_add(1);
    }
}
/// Record the announcing peer in the torrent map and answer with up to the
/// requested (capped) number of other peers for the same info hash.
fn handle_announce_request<I: Ip>(
    config: &Config,
    rng: &mut SmallRng,
    torrents: &mut TorrentMap<I>,
    request: AnnounceRequest,
    peer_ip: I,
    peer_valid_until: ValidUntil,
) -> AnnounceResponse<I> {
    let configured_max = config.protocol.max_response_peers as usize;

    // A non-positive "peers wanted" means the client defers to our maximum
    let max_num_peers_to_take = if request.peers_wanted.0 > 0 {
        let wanted: usize = request.peers_wanted.0.try_into().unwrap();

        ::std::cmp::min(configured_max, wanted)
    } else {
        configured_max
    };

    let peer = Peer {
        ip_address: peer_ip,
        port: request.port,
        status: PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left),
        valid_until: peer_valid_until,
    };

    let torrent_data = torrents.0.entry(request.info_hash).or_default();

    torrent_data.update_peer(request.peer_id, peer);

    let response_peers =
        torrent_data.extract_response_peers(rng, request.peer_id, max_num_peers_to_take);

    AnnounceResponse {
        transaction_id: request.transaction_id,
        announce_interval: AnnounceInterval(config.protocol.peer_announce_interval),
        leechers: NumberOfPeers(torrent_data.num_leechers() as i32),
        seeders: NumberOfPeers(torrent_data.num_seeders() as i32),
        peers: response_peers,
    }
}
fn handle_scrape_request<I: Ip>(
torrents: &mut TorrentMap<I>,
request: PendingScrapeRequest,
) -> PendingScrapeResponse {
const EMPTY_STATS: TorrentScrapeStatistics = create_torrent_scrape_statistics(0, 0);
let torrent_stats = request
.info_hashes
.into_iter()
.map(|(i, info_hash)| {
let stats = torrents
.0
.get(&info_hash)
.map(|torrent_data| torrent_data.scrape_statistics())
.unwrap_or(EMPTY_STATS);
(i, stats)
})
.collect();
PendingScrapeResponse {
slab_key: request.slab_key,
torrent_stats,
}
}
/// Build a `TorrentScrapeStatistics` from the given counts. The completed
/// (downloads) counter is always reported as zero.
#[inline(always)]
const fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics {
    TorrentScrapeStatistics {
        leechers: NumberOfPeers(leechers),
        seeders: NumberOfPeers(seeders),
        completed: NumberOfDownloads(0), // No implementation planned
    }
}

View file

@ -0,0 +1,297 @@
use std::net::Ipv4Addr;
use std::net::Ipv6Addr;
use std::sync::Arc;
use std::time::Instant;
use aquatic_common::{
access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache, AccessListMode},
extract_response_peers, AmortizedIndexMap, ValidUntil,
};
use aquatic_udp_protocol::*;
use rand::prelude::SmallRng;
use crate::common::*;
use crate::config::Config;
use super::create_torrent_scrape_statistics;
/// A single peer as stored in a torrent's peer map, generic over the IP
/// address version (`Ipv4Addr` or `Ipv6Addr`).
#[derive(Clone, Debug)]
pub struct Peer<I: Ip> {
    pub ip_address: I,
    pub port: Port,
    pub status: PeerStatus,
    // Deadline after which periodic cleaning removes the peer
    pub valid_until: ValidUntil,
}
impl<I: Ip> Peer<I> {
    /// Copy this peer into the wire-format representation used in announce
    /// responses (address and port only).
    pub fn to_response_peer(&self) -> ResponsePeer<I> {
        let ip_address = self.ip_address;
        let port = self.port;

        ResponsePeer { ip_address, port }
    }
}
/// Peers of a single torrent, keyed by peer id
pub type PeerMap<I> = AmortizedIndexMap<PeerId, Peer<I>>;
/// Per-torrent state: the peer map plus cached seeder/leecher counts,
/// which the impl keeps consistent with the map contents.
pub struct TorrentData<I: Ip> {
    peers: PeerMap<I>,
    num_seeders: usize,
    num_leechers: usize,
}
impl<I: Ip> TorrentData<I> {
    /// Insert, replace or (on `Stopped`) remove a peer, keeping the cached
    /// seeder/leecher counters consistent with the peer map.
    pub fn update_peer(&mut self, peer_id: PeerId, peer: Peer<I>) {
        let opt_removed_peer = match peer.status {
            PeerStatus::Leeching => {
                self.num_leechers += 1;

                self.peers.insert(peer_id, peer)
            }
            PeerStatus::Seeding => {
                self.num_seeders += 1;

                self.peers.insert(peer_id, peer)
            }
            PeerStatus::Stopped => self.peers.remove(&peer_id),
        };

        // If a previous entry was replaced or removed, undo its counter
        // contribution so counts keep matching the map contents
        match opt_removed_peer.map(|peer| peer.status) {
            Some(PeerStatus::Leeching) => {
                self.num_leechers -= 1;
            }
            Some(PeerStatus::Seeding) => {
                self.num_seeders -= 1;
            }
            _ => {}
        }
    }

    /// Randomly select up to `max_num_peers_to_take` response peers,
    /// excluding the requesting peer itself.
    pub fn extract_response_peers(
        &self,
        rng: &mut SmallRng,
        peer_id: PeerId,
        max_num_peers_to_take: usize,
    ) -> Vec<ResponsePeer<I>> {
        extract_response_peers(
            rng,
            &self.peers,
            max_num_peers_to_take,
            peer_id,
            Peer::to_response_peer,
        )
    }

    /// Number of peers currently counted as leechers
    pub fn num_leechers(&self) -> usize {
        self.num_leechers
    }

    /// Number of peers currently counted as seeders
    pub fn num_seeders(&self) -> usize {
        self.num_seeders
    }

    /// Scrape statistics from the cached counters, saturating at
    /// `i32::MAX` since the wire format uses 32-bit signed counts
    pub fn scrape_statistics(&self) -> TorrentScrapeStatistics {
        create_torrent_scrape_statistics(
            self.num_seeders.try_into().unwrap_or(i32::MAX),
            self.num_leechers.try_into().unwrap_or(i32::MAX),
        )
    }

    /// Remove inactive peers and reclaim space
    fn clean(&mut self, now: Instant) {
        // Borrow the counter fields separately so the retain closure does
        // not need to capture `self` while `self.peers` is mutably
        // borrowed. Without this split, the code relies on Rust 2021's
        // disjoint closure captures and fails to compile on edition 2018.
        let num_seeders = &mut self.num_seeders;
        let num_leechers = &mut self.num_leechers;

        self.peers.retain(|_, peer| {
            if peer.valid_until.0 > now {
                true
            } else {
                // Expired peer: undo its counter contribution
                match peer.status {
                    PeerStatus::Seeding => {
                        *num_seeders -= 1;
                    }
                    PeerStatus::Leeching => {
                        *num_leechers -= 1;
                    }
                    _ => (),
                };

                false
            }
        });

        if !self.peers.is_empty() {
            self.peers.shrink_to_fit();
        }
    }
}
impl<I: Ip> Default for TorrentData<I> {
fn default() -> Self {
Self {
peers: Default::default(),
num_seeders: 0,
num_leechers: 0,
}
}
}
/// All torrents of one address family, keyed by info hash
#[derive(Default)]
pub struct TorrentMap<I: Ip>(pub AmortizedIndexMap<InfoHash, TorrentData<I>>);
impl<I: Ip> TorrentMap<I> {
    /// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers
    fn clean_and_get_num_peers(
        &mut self,
        access_list_cache: &mut AccessListCache,
        access_list_mode: AccessListMode,
        now: Instant,
    ) -> usize {
        let mut num_peers = 0;

        self.0.retain(|info_hash, torrent| {
            let allowed = access_list_cache
                .load()
                .allows(access_list_mode, &info_hash.0);

            if !allowed {
                // Torrent no longer permitted by the access list: drop it
                return false;
            }

            // Expire stale peers, then count and keep only non-empty torrents
            torrent.clean(now);

            num_peers += torrent.peers.len();

            !torrent.peers.is_empty()
        });

        self.0.shrink_to_fit();

        num_peers
    }

    /// Number of torrents currently tracked
    pub fn num_torrents(&self) -> usize {
        self.0.len()
    }
}
/// Torrent state split by address family: IPv4 and IPv6 peers are tracked
/// in separate maps and never mixed in responses
pub struct TorrentMaps {
    pub ipv4: TorrentMap<Ipv4Addr>,
    pub ipv6: TorrentMap<Ipv6Addr>,
}
impl Default for TorrentMaps {
fn default() -> Self {
Self {
ipv4: TorrentMap(Default::default()),
ipv6: TorrentMap(Default::default()),
}
}
}
impl TorrentMaps {
    /// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers
    pub fn clean_and_get_num_peers(
        &mut self,
        config: &Config,
        access_list: &Arc<AccessListArcSwap>,
    ) -> (usize, usize) {
        // Share one cache, mode and timestamp across both families
        let now = Instant::now();
        let mode = config.access_list.mode;
        let mut cache = create_access_list_cache(access_list);

        (
            self.ipv4.clean_and_get_num_peers(&mut cache, mode, now),
            self.ipv6.clean_and_get_num_peers(&mut cache, mode, now),
        )
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::net::Ipv4Addr;

    use quickcheck::{quickcheck, TestResult};
    use rand::thread_rng;

    use super::*;

    // Create a PeerId whose first four bytes encode `i`, so ids are
    // distinct for distinct `i`
    fn gen_peer_id(i: u32) -> PeerId {
        let mut peer_id = PeerId([0; 20]);

        peer_id.0[0..4].copy_from_slice(&i.to_ne_bytes());

        peer_id
    }

    // Create a leeching peer whose IPv4 address encodes `i`
    fn gen_peer(i: u32) -> Peer<Ipv4Addr> {
        Peer {
            ip_address: Ipv4Addr::from(i.to_be_bytes()),
            port: Port(1),
            status: PeerStatus::Leeching,
            valid_until: ValidUntil::new(0),
        }
    }

    #[test]
    fn test_extract_response_peers() {
        // Property: the extracted peer list is bounded by the requested
        // count, contains no duplicate addresses, and never contains the
        // requesting (sender) peer itself
        fn prop(data: (u16, u16)) -> TestResult {
            let gen_num_peers = data.0 as u32;
            let req_num_peers = data.1 as usize;

            let mut peer_map: PeerMap<Ipv4Addr> = Default::default();

            let mut opt_sender_key = None;
            let mut opt_sender_peer = None;

            for i in 0..gen_num_peers {
                let key = gen_peer_id(i);
                // (i << 16) + i keeps generated ip addresses distinct
                let peer = gen_peer((i << 16) + i);

                if i == 0 {
                    // Use the first generated peer as the "sender"
                    opt_sender_key = Some(key);
                    opt_sender_peer = Some(peer.to_response_peer());
                }

                peer_map.insert(key, peer);
            }

            let mut rng = thread_rng();

            let peers = extract_response_peers(
                &mut rng,
                &peer_map,
                req_num_peers,
                opt_sender_key.unwrap_or_else(|| gen_peer_id(1)),
                Peer::to_response_peer,
            );

            // Check that number of returned peers is correct
            let mut success = peers.len() <= req_num_peers;

            if req_num_peers >= gen_num_peers as usize {
                // All peers should come back, except possibly the sender
                success &= peers.len() == gen_num_peers as usize
                    || peers.len() + 1 == gen_num_peers as usize;
            }

            // Check that returned peers are unique (no overlap) and that sender
            // isn't returned
            let mut ip_addresses = HashSet::with_capacity(peers.len());

            for peer in peers {
                if peer == opt_sender_peer.clone().unwrap()
                    || ip_addresses.contains(&peer.ip_address)
                {
                    success = false;

                    break;
                }

                ip_addresses.insert(peer.ip_address);
            }

            TestResult::from_bool(success)
        }

        quickcheck(prop as fn((u16, u16)) -> TestResult);
    }
}

View file

@ -1,667 +0,0 @@
use std::collections::BTreeMap;
use std::io::{Cursor, ErrorKind};
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};
use std::vec::Drain;
use anyhow::Context;
use aquatic_common::privileges::PrivilegeDropper;
use crossbeam_channel::Receiver;
use mio::net::UdpSocket;
use mio::{Events, Interest, Poll, Token};
use rand::prelude::{Rng, SeedableRng, StdRng};
use slab::Slab;
use aquatic_common::access_list::create_access_list_cache;
use aquatic_common::access_list::AccessListCache;
use aquatic_common::{AmortizedIndexMap, CanonicalSocketAddr};
use aquatic_common::{PanicSentinel, ValidUntil};
use aquatic_udp_protocol::*;
use socket2::{Domain, Protocol, Socket, Type};
use crate::common::*;
use crate::config::Config;
/// Maps (connection id, peer address) pairs to their expiry time. Used to
/// verify that announce/scrape requests carry a connection id that was
/// previously handed out to the same address.
#[derive(Default)]
pub struct ConnectionMap(AmortizedIndexMap<(ConnectionId, CanonicalSocketAddr), ValidUntil>);
impl ConnectionMap {
    /// Remember that `connection_id` was handed out to `socket_addr`,
    /// valid until the given deadline.
    pub fn insert(
        &mut self,
        connection_id: ConnectionId,
        socket_addr: CanonicalSocketAddr,
        valid_until: ValidUntil,
    ) {
        let key = (connection_id, socket_addr);

        self.0.insert(key, valid_until);
    }

    /// Check whether this (connection id, address) pair was handed out.
    pub fn contains(&self, connection_id: ConnectionId, socket_addr: CanonicalSocketAddr) -> bool {
        let key = (connection_id, socket_addr);

        self.0.contains_key(&key)
    }

    /// Drop expired connections and reclaim space.
    pub fn clean(&mut self) {
        let now = Instant::now();

        self.0.retain(|_, valid_until| valid_until.0 > now);
        self.0.shrink_to_fit();
    }
}
/// Accumulator for one scrape request that was split across request
/// workers: collects partial statistics until all parts have arrived.
#[derive(Debug)]
pub struct PendingScrapeResponseSlabEntry {
    // How many partial responses are still outstanding
    num_pending: usize,
    valid_until: ValidUntil,
    // Statistics received so far, keyed by original info hash position
    torrent_stats: BTreeMap<usize, TorrentScrapeStatistics>,
    transaction_id: TransactionId,
}
/// Slab of in-flight split scrape requests; the slab key links partial
/// responses back to their entry
#[derive(Default)]
pub struct PendingScrapeResponseSlab(Slab<PendingScrapeResponseSlabEntry>);
impl PendingScrapeResponseSlab {
    /// Split a scrape request into one partial request per responsible
    /// request worker and reserve a slab entry that collects the partial
    /// responses until all of them have arrived.
    ///
    /// If the request contains no info hashes, nothing is reserved and an
    /// empty collection is returned.
    pub fn prepare_split_requests(
        &mut self,
        config: &Config,
        request: ScrapeRequest,
        valid_until: ValidUntil,
    ) -> impl IntoIterator<Item = (RequestWorkerIndex, PendingScrapeRequest)> {
        let mut split_requests: AmortizedIndexMap<RequestWorkerIndex, PendingScrapeRequest> =
            Default::default();

        if request.info_hashes.is_empty() {
            ::log::warn!(
                "Attempted to prepare PendingScrapeResponseSlab entry with zero info hashes"
            );

            return split_requests;
        }

        // Reserve the slab key first so every partial request can carry it
        let vacant_entry = self.0.vacant_entry();
        let slab_key = vacant_entry.key();

        // Group info hashes by responsible worker; `i` keeps the original
        // position so the final response preserves request order
        for (i, info_hash) in request.info_hashes.into_iter().enumerate() {
            let split_request = split_requests
                .entry(RequestWorkerIndex::from_info_hash(&config, info_hash))
                .or_insert_with(|| PendingScrapeRequest {
                    slab_key,
                    info_hashes: BTreeMap::new(),
                });

            split_request.info_hashes.insert(i, info_hash);
        }

        vacant_entry.insert(PendingScrapeResponseSlabEntry {
            num_pending: split_requests.len(),
            valid_until,
            torrent_stats: Default::default(),
            transaction_id: request.transaction_id,
        });

        split_requests
    }

    /// Merge a partial response into its slab entry. When the last pending
    /// part arrives, remove the entry and return the complete scrape
    /// response; otherwise return `None`.
    pub fn add_and_get_finished(&mut self, response: PendingScrapeResponse) -> Option<Response> {
        let finished = if let Some(entry) = self.0.get_mut(response.slab_key) {
            entry.num_pending -= 1;

            entry
                .torrent_stats
                .extend(response.torrent_stats.into_iter());

            entry.num_pending == 0
        } else {
            // Entry may have been dropped by clean() before this part arrived
            ::log::warn!(
                "PendingScrapeResponseSlab.add didn't find entry for key {:?}",
                response.slab_key
            );

            false
        };

        if finished {
            let entry = self.0.remove(response.slab_key);

            Some(Response::Scrape(ScrapeResponse {
                transaction_id: entry.transaction_id,
                torrent_stats: entry.torrent_stats.into_values().collect(),
            }))
        } else {
            None
        }
    }

    /// Drop expired entries (split requests that never completed) and
    /// reclaim space. Entries normally complete quickly, so removals here
    /// are logged as warnings.
    pub fn clean(&mut self) {
        let now = Instant::now();

        self.0.retain(|k, v| {
            let keep = v.valid_until.0 > now;

            if !keep {
                ::log::warn!(
                    "Removing PendingScrapeResponseSlab entry while cleaning. {:?}: {:?}",
                    k,
                    v
                );
            }

            keep
        });

        self.0.shrink_to_fit();
    }
}
/// Main loop of a socket worker: polls the shared UDP socket for incoming
/// datagrams, parses and dispatches requests (answering connect requests
/// locally, forwarding announce/scrape to request workers), and sends back
/// responses. Periodically cleans connection and pending-scrape state.
/// Never returns.
pub fn run_socket_worker(
    _sentinel: PanicSentinel,
    state: State,
    config: Config,
    token_num: usize,
    request_sender: ConnectedRequestSender,
    response_receiver: Receiver<(ConnectedResponse, CanonicalSocketAddr)>,
    priv_dropper: PrivilegeDropper,
) {
    let mut rng = StdRng::from_entropy();
    let mut buffer = [0u8; MAX_PACKET_SIZE];

    let mut socket =
        UdpSocket::from_std(create_socket(&config, priv_dropper).expect("create socket"));
    let mut poll = Poll::new().expect("create poll");

    // Only readiness-to-read is polled; sends happen opportunistically
    let interests = Interest::READABLE;

    poll.registry()
        .register(&mut socket, Token(token_num), interests)
        .unwrap();

    let mut events = Events::with_capacity(config.network.poll_event_capacity);
    let mut connections = ConnectionMap::default();
    let mut pending_scrape_responses = PendingScrapeResponseSlab::default();
    let mut access_list_cache = create_access_list_cache(&state.access_list);

    // Responses answered by this worker itself (connect, errors)
    let mut local_responses: Vec<(Response, CanonicalSocketAddr)> = Vec::new();

    let poll_timeout = Duration::from_millis(config.network.poll_timeout_ms);

    let connection_cleaning_duration =
        Duration::from_secs(config.cleaning.connection_cleaning_interval);
    let pending_scrape_cleaning_duration =
        Duration::from_secs(config.cleaning.pending_scrape_cleaning_interval);

    let mut connection_valid_until = ValidUntil::new(config.cleaning.max_connection_age);
    let mut pending_scrape_valid_until = ValidUntil::new(config.cleaning.max_pending_scrape_age);

    let mut last_connection_cleaning = Instant::now();
    let mut last_pending_scrape_cleaning = Instant::now();

    let mut iter_counter = 0usize;

    loop {
        poll.poll(&mut events, Some(poll_timeout))
            .expect("failed polling");

        for event in events.iter() {
            let token = event.token();

            if (token.0 == token_num) & event.is_readable() {
                read_requests(
                    &config,
                    &state,
                    &mut connections,
                    &mut pending_scrape_responses,
                    &mut access_list_cache,
                    &mut rng,
                    &mut socket,
                    &mut buffer,
                    &request_sender,
                    &mut local_responses,
                    connection_valid_until,
                    pending_scrape_valid_until,
                );
            }
        }

        send_responses(
            &state,
            &config,
            &mut socket,
            &mut buffer,
            &response_receiver,
            &mut pending_scrape_responses,
            local_responses.drain(..),
        );

        // Run periodic ValidUntil updates and state cleaning
        if iter_counter % 128 == 0 {
            let now = Instant::now();

            connection_valid_until =
                ValidUntil::new_with_now(now, config.cleaning.max_connection_age);
            pending_scrape_valid_until =
                ValidUntil::new_with_now(now, config.cleaning.max_pending_scrape_age);

            if now > last_connection_cleaning + connection_cleaning_duration {
                connections.clean();

                last_connection_cleaning = now;
            }
            if now > last_pending_scrape_cleaning + pending_scrape_cleaning_duration {
                pending_scrape_responses.clean();

                last_pending_scrape_cleaning = now;
            }
        }

        iter_counter = iter_counter.wrapping_add(1);
    }
}
/// Drain the socket of pending datagrams, parse each into a request and
/// dispatch it via `handle_request`; loops until `recv_from` would block.
/// Per-family request/byte counters are aggregated locally and flushed to
/// the shared statistics once at the end.
#[inline]
fn read_requests(
    config: &Config,
    state: &State,
    connections: &mut ConnectionMap,
    pending_scrape_responses: &mut PendingScrapeResponseSlab,
    access_list_cache: &mut AccessListCache,
    rng: &mut StdRng,
    socket: &mut UdpSocket,
    buffer: &mut [u8],
    request_sender: &ConnectedRequestSender,
    local_responses: &mut Vec<(Response, CanonicalSocketAddr)>,
    connection_valid_until: ValidUntil,
    pending_scrape_valid_until: ValidUntil,
) {
    let mut requests_received_ipv4: usize = 0;
    let mut requests_received_ipv6: usize = 0;
    let mut bytes_received_ipv4: usize = 0;
    let mut bytes_received_ipv6 = 0;

    loop {
        match socket.recv_from(&mut buffer[..]) {
            Ok((amt, src)) => {
                let res_request =
                    Request::from_bytes(&buffer[..amt], config.protocol.max_scrape_torrents);

                let src = CanonicalSocketAddr::new(src);

                // Update statistics for converted address
                if src.is_ipv4() {
                    // Bytes are counted for all datagrams, requests only
                    // for ones that parsed successfully
                    if res_request.is_ok() {
                        requests_received_ipv4 += 1;
                    }
                    bytes_received_ipv4 += amt;
                } else {
                    if res_request.is_ok() {
                        requests_received_ipv6 += 1;
                    }
                    bytes_received_ipv6 += amt;
                }

                handle_request(
                    config,
                    connections,
                    pending_scrape_responses,
                    access_list_cache,
                    rng,
                    request_sender,
                    local_responses,
                    connection_valid_until,
                    pending_scrape_valid_until,
                    res_request,
                    src,
                );
            }
            Err(err) => {
                // WouldBlock means the socket is drained; other errors are
                // logged and reading continues
                if err.kind() == ErrorKind::WouldBlock {
                    break;
                }

                ::log::info!("recv_from error: {}", err);
            }
        }
    }

    if config.statistics.active() {
        state
            .statistics_ipv4
            .requests_received
            .fetch_add(requests_received_ipv4, Ordering::Release);
        state
            .statistics_ipv6
            .requests_received
            .fetch_add(requests_received_ipv6, Ordering::Release);
        state
            .statistics_ipv4
            .bytes_received
            .fetch_add(bytes_received_ipv4, Ordering::Release);
        state
            .statistics_ipv6
            .bytes_received
            .fetch_add(bytes_received_ipv6, Ordering::Release);
    }
}
/// Handle a single parsed (or unparseable) request:
///
/// - Connect: issue a random connection id, remember it for this address,
///   and queue a connect response for local sending.
/// - Announce: if the connection id is known for this address, forward to
///   the request worker responsible for the info hash, or queue an error
///   response if the access list forbids the info hash.
/// - Scrape: if the connection id is known, split the request across the
///   responsible request workers.
/// - Parse error: queue an error response if the error is sendable and the
///   connection id checks out; otherwise drop silently.
pub fn handle_request(
    config: &Config,
    connections: &mut ConnectionMap,
    pending_scrape_responses: &mut PendingScrapeResponseSlab,
    access_list_cache: &mut AccessListCache,
    rng: &mut StdRng,
    request_sender: &ConnectedRequestSender,
    local_responses: &mut Vec<(Response, CanonicalSocketAddr)>,
    connection_valid_until: ValidUntil,
    pending_scrape_valid_until: ValidUntil,
    res_request: Result<Request, RequestParseError>,
    src: CanonicalSocketAddr,
) {
    let access_list_mode = config.access_list.mode;

    match res_request {
        Ok(Request::Connect(request)) => {
            // NOTE(review): connection id is drawn from StdRng; this commit
            // series moves id generation to an HMAC-based validator elsewhere
            let connection_id = ConnectionId(rng.gen());

            connections.insert(connection_id, src, connection_valid_until);

            let response = Response::Connect(ConnectResponse {
                connection_id,
                transaction_id: request.transaction_id,
            });

            local_responses.push((response, src))
        }
        Ok(Request::Announce(request)) => {
            if connections.contains(request.connection_id, src) {
                if access_list_cache
                    .load()
                    .allows(access_list_mode, &request.info_hash.0)
                {
                    let worker_index =
                        RequestWorkerIndex::from_info_hash(config, request.info_hash);

                    request_sender.try_send_to(
                        worker_index,
                        ConnectedRequest::Announce(request),
                        src,
                    );
                } else {
                    let response = Response::Error(ErrorResponse {
                        transaction_id: request.transaction_id,
                        message: "Info hash not allowed".into(),
                    });

                    local_responses.push((response, src))
                }
            }
        }
        Ok(Request::Scrape(request)) => {
            if connections.contains(request.connection_id, src) {
                let split_requests = pending_scrape_responses.prepare_split_requests(
                    config,
                    request,
                    pending_scrape_valid_until,
                );

                for (request_worker_index, request) in split_requests {
                    request_sender.try_send_to(
                        request_worker_index,
                        ConnectedRequest::Scrape(request),
                        src,
                    );
                }
            }
        }
        Err(err) => {
            ::log::debug!("Request::from_bytes error: {:?}", err);

            // Only answer parse errors that carry enough context to build a
            // response, and only for established connections
            if let RequestParseError::Sendable {
                connection_id,
                transaction_id,
                err,
            } = err
            {
                if connections.contains(connection_id, src) {
                    let response = ErrorResponse {
                        transaction_id,
                        message: err.right_or("Parse error").into(),
                    };

                    local_responses.push((response.into(), src));
                }
            }
        }
    }
}
/// Send all locally-produced responses, then drain the channel of
/// responses from request workers. Partial scrape responses are merged in
/// the pending-scrape slab and only sent once complete.
#[inline]
fn send_responses(
    state: &State,
    config: &Config,
    socket: &mut UdpSocket,
    buffer: &mut [u8],
    response_receiver: &Receiver<(ConnectedResponse, CanonicalSocketAddr)>,
    pending_scrape_responses: &mut PendingScrapeResponseSlab,
    local_responses: Drain<(Response, CanonicalSocketAddr)>,
) {
    for (response, addr) in local_responses {
        send_response(state, config, socket, buffer, response, addr);
    }

    for (connected_response, addr) in response_receiver.try_iter() {
        let response = match connected_response {
            ConnectedResponse::AnnounceIpv4(r) => Response::AnnounceIpv4(r),
            ConnectedResponse::AnnounceIpv6(r) => Response::AnnounceIpv6(r),
            ConnectedResponse::Scrape(r) => {
                // Incomplete scrape: stash the partial stats and wait
                match pending_scrape_responses.add_and_get_finished(r) {
                    Some(response) => response,
                    None => continue,
                }
            }
        };

        send_response(state, config, socket, buffer, response, addr);
    }
}
/// Serialize a response into `buffer` and send it to `addr`, updating the
/// per-family statistics counters if statistics are enabled.
fn send_response(
    state: &State,
    config: &Config,
    socket: &mut UdpSocket,
    buffer: &mut [u8],
    response: Response,
    addr: CanonicalSocketAddr,
) {
    let mut cursor = Cursor::new(buffer);

    // Remember the canonical family before converting for the bound socket
    let canonical_addr_is_ipv4 = addr.is_ipv4();

    // Convert to the form matching the socket's bound address family
    // (sockets bound to IPv6 use IPv4-mapped addresses for IPv4 peers)
    let addr = if config.network.address.is_ipv4() {
        addr.get_ipv4()
            .expect("found peer ipv6 address while running bound to ipv4 address")
    } else {
        addr.get_ipv6_mapped()
    };

    match response.write(&mut cursor) {
        Ok(()) => {
            let amt = cursor.position() as usize;

            match socket.send_to(&cursor.get_ref()[..amt], addr) {
                Ok(amt) if config.statistics.active() => {
                    let stats = if canonical_addr_is_ipv4 {
                        &state.statistics_ipv4
                    } else {
                        &state.statistics_ipv6
                    };

                    // Count bytes actually reported sent by the OS
                    stats.bytes_sent.fetch_add(amt, Ordering::Relaxed);

                    match response {
                        Response::Connect(_) => {
                            stats.responses_sent_connect.fetch_add(1, Ordering::Relaxed);
                        }
                        Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => {
                            stats
                                .responses_sent_announce
                                .fetch_add(1, Ordering::Relaxed);
                        }
                        Response::Scrape(_) => {
                            stats.responses_sent_scrape.fetch_add(1, Ordering::Relaxed);
                        }
                        Response::Error(_) => {
                            stats.responses_sent_error.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                }
                Ok(_) => {}
                Err(err) => {
                    ::log::info!("send_to error: {}", err);
                }
            }
        }
        Err(err) => {
            ::log::error!("Response::write error: {:?}", err);
        }
    }
}
/// Create and configure the tracker's UDP socket: pick the address family
/// from config, optionally set IPV6_V6ONLY, enable SO_REUSEPORT (so
/// multiple socket workers can bind the same address), set non-blocking
/// mode and the receive buffer size, bind, then drop privileges.
///
/// # Errors
///
/// Returns an error if socket creation, a required option, binding or
/// privilege dropping fails. Failure to set the receive buffer size is
/// only logged.
pub fn create_socket(
    config: &Config,
    priv_dropper: PrivilegeDropper,
) -> anyhow::Result<::std::net::UdpSocket> {
    let socket = if config.network.address.is_ipv4() {
        Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?
    } else {
        Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?
    };

    if config.network.only_ipv6 {
        socket
            .set_only_v6(true)
            .with_context(|| "socket: set only ipv6")?;
    }

    socket
        .set_reuse_port(true)
        .with_context(|| "socket: set reuse port")?;

    socket
        .set_nonblocking(true)
        .with_context(|| "socket: set nonblocking")?;

    let recv_buffer_size = config.network.socket_recv_buffer_size;

    // 0 means "keep the OS default"; failures here are non-fatal
    if recv_buffer_size != 0 {
        if let Err(err) = socket.set_recv_buffer_size(recv_buffer_size) {
            ::log::error!(
                "socket: failed setting recv buffer to {}: {:?}",
                recv_buffer_size,
                err
            );
        }
    }

    socket
        .bind(&config.network.address.into())
        .with_context(|| format!("socket: bind to {}", config.network.address))?;

    priv_dropper.after_socket_creation()?;

    Ok(socket.into())
}
#[cfg(test)]
mod tests {
    use quickcheck::TestResult;
    use quickcheck_macros::quickcheck;

    use super::*;

    // Property: splitting arbitrary scrape requests across workers and
    // feeding back one partial response per split yields exactly one
    // complete response per original request, leaving the slab empty
    #[quickcheck]
    fn test_pending_scrape_response_map(
        request_data: Vec<(i32, i64, u8)>,
        request_workers: u8,
    ) -> TestResult {
        if request_workers == 0 {
            return TestResult::discard();
        }

        let mut config = Config::default();

        config.request_workers = request_workers as usize;

        let valid_until = ValidUntil::new(1);

        let mut map = PendingScrapeResponseSlab::default();

        let mut requests = Vec::new();

        // Build one scrape request per tuple: t = transaction id,
        // c = connection id, b = number of info hashes (must be > 0)
        for (t, c, b) in request_data {
            if b == 0 {
                return TestResult::discard();
            }

            let mut info_hashes = Vec::new();

            for i in 0..b {
                let info_hash = InfoHash([i; 20]);

                info_hashes.push(info_hash);
            }

            let request = ScrapeRequest {
                transaction_id: TransactionId(t),
                connection_id: ConnectionId(c),
                info_hashes,
            };

            requests.push(request);
        }

        let mut all_split_requests = Vec::new();

        for request in requests.iter() {
            let split_requests =
                map.prepare_split_requests(&config, request.to_owned(), valid_until);

            all_split_requests.push(
                split_requests
                    .into_iter()
                    .collect::<Vec<(RequestWorkerIndex, PendingScrapeRequest)>>(),
            );
        }

        // One slab entry per prepared request
        assert_eq!(map.0.len(), requests.len());

        let mut responses = Vec::new();

        for split_requests in all_split_requests {
            for (worker_index, split_request) in split_requests {
                assert!(worker_index.0 < request_workers as usize);

                // Fabricate statistics for each info hash in this part
                let torrent_stats = split_request
                    .info_hashes
                    .into_iter()
                    .map(|(i, info_hash)| {
                        (
                            i,
                            TorrentScrapeStatistics {
                                seeders: NumberOfPeers((info_hash.0[0]) as i32),
                                leechers: NumberOfPeers(0),
                                completed: NumberOfDownloads(0),
                            },
                        )
                    })
                    .collect();

                let response = PendingScrapeResponse {
                    slab_key: split_request.slab_key,
                    torrent_stats,
                };

                if let Some(response) = map.add_and_get_finished(response) {
                    responses.push(response);
                }
            }
        }

        // Every entry completed and was removed
        assert!(map.0.is_empty());
        assert_eq!(responses.len(), requests.len());

        TestResult::from_bool(true)
    }
}

View file

@ -0,0 +1,158 @@
mod requests;
mod responses;
mod storage;
use std::time::{Duration, Instant};
use anyhow::Context;
use crossbeam_channel::Receiver;
use mio::net::UdpSocket;
use mio::{Events, Interest, Poll, Token};
use socket2::{Domain, Protocol, Socket, Type};
use aquatic_common::{
access_list::create_access_list_cache, privileges::PrivilegeDropper, CanonicalSocketAddr,
PanicSentinel, ValidUntil,
};
use aquatic_udp_protocol::*;
use crate::common::*;
use crate::config::Config;
use requests::read_requests;
use responses::send_responses;
use storage::PendingScrapeResponseSlab;
/// Run a socket worker: drain readable events from its UDP socket, forward
/// announce/scrape requests to the request workers, and send back locally
/// generated responses as well as responses received from the request
/// workers over `response_receiver`.
///
/// Never returns. Panics on unrecoverable setup errors (socket creation,
/// poll creation/registration).
pub fn run_socket_worker(
    _sentinel: PanicSentinel,
    state: State,
    config: Config,
    token_num: usize,
    mut connection_validator: ConnectionValidator,
    request_sender: ConnectedRequestSender,
    response_receiver: Receiver<(ConnectedResponse, CanonicalSocketAddr)>,
    priv_dropper: PrivilegeDropper,
) {
    let mut buffer = [0u8; BUFFER_SIZE];
    let mut socket =
        UdpSocket::from_std(create_socket(&config, priv_dropper).expect("create socket"));
    let mut poll = Poll::new().expect("create poll");
    let interests = Interest::READABLE;
    poll.registry()
        .register(&mut socket, Token(token_num), interests)
        .unwrap();
    let mut events = Events::with_capacity(config.network.poll_event_capacity);
    let mut pending_scrape_responses = PendingScrapeResponseSlab::default();
    let mut access_list_cache = create_access_list_cache(&state.access_list);
    // Connect/error responses produced by this worker itself, flushed each
    // iteration by send_responses
    let mut local_responses: Vec<(Response, CanonicalSocketAddr)> = Vec::new();
    let poll_timeout = Duration::from_millis(config.network.poll_timeout_ms);
    let pending_scrape_cleaning_duration =
        Duration::from_secs(config.cleaning.pending_scrape_cleaning_interval);
    let mut pending_scrape_valid_until = ValidUntil::new(config.cleaning.max_pending_scrape_age);
    let mut last_pending_scrape_cleaning = Instant::now();
    let mut iter_counter = 0usize;
    loop {
        poll.poll(&mut events, Some(poll_timeout))
            .expect("failed polling");
        for event in events.iter() {
            let token = event.token();
            // `&&` (short-circuiting) instead of bitwise `&`: both work on
            // bools, but `&&` is the idiomatic form
            if (token.0 == token_num) && event.is_readable() {
                read_requests(
                    &config,
                    &state,
                    &mut connection_validator,
                    &mut pending_scrape_responses,
                    &mut access_list_cache,
                    &mut socket,
                    &mut buffer,
                    &request_sender,
                    &mut local_responses,
                    pending_scrape_valid_until,
                );
            }
        }
        send_responses(
            &state,
            &config,
            &mut socket,
            &mut buffer,
            &response_receiver,
            &mut pending_scrape_responses,
            local_responses.drain(..),
        );
        // Run periodic ValidUntil updates and state cleaning
        if iter_counter % 256 == 0 {
            let now = Instant::now();
            pending_scrape_valid_until =
                ValidUntil::new_with_now(now, config.cleaning.max_pending_scrape_age);
            if now > last_pending_scrape_cleaning + pending_scrape_cleaning_duration {
                pending_scrape_responses.clean();
                last_pending_scrape_cleaning = now;
            }
        }
        iter_counter = iter_counter.wrapping_add(1);
    }
}
/// Create, configure and bind the worker's UDP socket, then drop root
/// privileges (if configured) via `priv_dropper`.
///
/// Configuration applied: optional IPv6-only mode, SO_REUSEPORT (so
/// multiple socket workers can bind the same address), non-blocking mode,
/// and an optional receive buffer size (failure to set it is logged, not
/// fatal).
///
/// # Errors
///
/// Returns an error if socket creation, any required socket option, the
/// bind, or the privilege drop fails.
fn create_socket(
    config: &Config,
    priv_dropper: PrivilegeDropper,
) -> anyhow::Result<::std::net::UdpSocket> {
    let socket = if config.network.address.is_ipv4() {
        Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?
    } else {
        Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?
    };
    if config.network.only_ipv6 {
        socket
            .set_only_v6(true)
            // `.context(...)` instead of `.with_context(|| ...)`: the
            // closure form is only needed for lazily built messages
            .context("socket: set only ipv6")?;
    }
    socket
        .set_reuse_port(true)
        .context("socket: set reuse port")?;
    socket
        .set_nonblocking(true)
        .context("socket: set nonblocking")?;
    let recv_buffer_size = config.network.socket_recv_buffer_size;
    if recv_buffer_size != 0 {
        if let Err(err) = socket.set_recv_buffer_size(recv_buffer_size) {
            ::log::error!(
                "socket: failed setting recv buffer to {}: {:?}",
                recv_buffer_size,
                err
            );
        }
    }
    socket
        .bind(&config.network.address.into())
        // Lazy form kept here: the message requires a format! allocation
        .with_context(|| format!("socket: bind to {}", config.network.address))?;
    priv_dropper.after_socket_creation()?;
    Ok(socket.into())
}

View file

@ -0,0 +1,178 @@
use std::io::ErrorKind;
use std::sync::atomic::Ordering;
use mio::net::UdpSocket;
use aquatic_common::{access_list::AccessListCache, CanonicalSocketAddr, ValidUntil};
use aquatic_udp_protocol::*;
use crate::common::*;
use crate::config::Config;
use super::storage::PendingScrapeResponseSlab;
/// Drain the socket of incoming datagrams, parse them, and dispatch each
/// via `handle_request`. Per-address-family request/byte counters are
/// accumulated locally and flushed to the shared statistics once the
/// socket would block (and statistics are enabled).
pub fn read_requests(
    config: &Config,
    state: &State,
    connection_validator: &mut ConnectionValidator,
    pending_scrape_responses: &mut PendingScrapeResponseSlab,
    access_list_cache: &mut AccessListCache,
    socket: &mut UdpSocket,
    buffer: &mut [u8],
    request_sender: &ConnectedRequestSender,
    local_responses: &mut Vec<(Response, CanonicalSocketAddr)>,
    pending_scrape_valid_until: ValidUntil,
) {
    // Local counters, flushed to shared statistics after the drain loop
    let mut ipv4_requests: usize = 0;
    let mut ipv6_requests: usize = 0;
    let mut ipv4_bytes: usize = 0;
    let mut ipv6_bytes: usize = 0;
    // Keep receiving until the non-blocking socket reports WouldBlock
    loop {
        let (amt, src) = match socket.recv_from(&mut buffer[..]) {
            Ok(received) => received,
            Err(err) if err.kind() == ErrorKind::WouldBlock => break,
            Err(err) => {
                ::log::warn!("recv_from error: {:#}", err);
                continue;
            }
        };
        let res_request =
            Request::from_bytes(&buffer[..amt], config.protocol.max_scrape_torrents);
        let src = CanonicalSocketAddr::new(src);
        // Update statistics for converted address; only successfully
        // parsed datagrams count as requests, all bytes are counted
        let parsed = res_request.is_ok() as usize;
        if src.is_ipv4() {
            ipv4_requests += parsed;
            ipv4_bytes += amt;
        } else {
            ipv6_requests += parsed;
            ipv6_bytes += amt;
        }
        handle_request(
            config,
            connection_validator,
            pending_scrape_responses,
            access_list_cache,
            request_sender,
            local_responses,
            pending_scrape_valid_until,
            res_request,
            src,
        );
    }
    if config.statistics.active() {
        state
            .statistics_ipv4
            .requests_received
            .fetch_add(ipv4_requests, Ordering::Release);
        state
            .statistics_ipv6
            .requests_received
            .fetch_add(ipv6_requests, Ordering::Release);
        state
            .statistics_ipv4
            .bytes_received
            .fetch_add(ipv4_bytes, Ordering::Release);
        state
            .statistics_ipv6
            .bytes_received
            .fetch_add(ipv6_bytes, Ordering::Release);
    }
}
fn handle_request(
config: &Config,
connection_validator: &mut ConnectionValidator,
pending_scrape_responses: &mut PendingScrapeResponseSlab,
access_list_cache: &mut AccessListCache,
request_sender: &ConnectedRequestSender,
local_responses: &mut Vec<(Response, CanonicalSocketAddr)>,
pending_scrape_valid_until: ValidUntil,
res_request: Result<Request, RequestParseError>,
src: CanonicalSocketAddr,
) {
let access_list_mode = config.access_list.mode;
match res_request {
Ok(Request::Connect(request)) => {
let connection_id = connection_validator.create_connection_id(src);
let response = Response::Connect(ConnectResponse {
connection_id,
transaction_id: request.transaction_id,
});
local_responses.push((response, src))
}
Ok(Request::Announce(request)) => {
if connection_validator.connection_id_valid(src, request.connection_id) {
if access_list_cache
.load()
.allows(access_list_mode, &request.info_hash.0)
{
let worker_index =
RequestWorkerIndex::from_info_hash(config, request.info_hash);
request_sender.try_send_to(
worker_index,
ConnectedRequest::Announce(request),
src,
);
} else {
let response = Response::Error(ErrorResponse {
transaction_id: request.transaction_id,
message: "Info hash not allowed".into(),
});
local_responses.push((response, src))
}
}
}
Ok(Request::Scrape(request)) => {
if connection_validator.connection_id_valid(src, request.connection_id) {
let split_requests = pending_scrape_responses.prepare_split_requests(
config,
request,
pending_scrape_valid_until,
);
for (request_worker_index, request) in split_requests {
request_sender.try_send_to(
request_worker_index,
ConnectedRequest::Scrape(request),
src,
);
}
}
}
Err(err) => {
::log::debug!("Request::from_bytes error: {:?}", err);
if let RequestParseError::Sendable {
connection_id,
transaction_id,
err,
} = err
{
if connection_validator.connection_id_valid(src, connection_id) {
let response = ErrorResponse {
transaction_id,
message: err.right_or("Parse error").into(),
};
local_responses.push((response.into(), src));
}
}
}
}
}

View file

@ -0,0 +1,104 @@
use std::io::Cursor;
use std::sync::atomic::Ordering;
use std::vec::Drain;
use crossbeam_channel::Receiver;
use mio::net::UdpSocket;
use aquatic_common::CanonicalSocketAddr;
use aquatic_udp_protocol::*;
use crate::common::*;
use crate::config::Config;
use super::storage::PendingScrapeResponseSlab;
/// Send out all queued responses: first those generated locally by the
/// socket worker, then those received from the request workers. Scrape
/// responses are merged through `pending_scrape_responses` and only sent
/// once all their parts have arrived.
pub fn send_responses(
    state: &State,
    config: &Config,
    socket: &mut UdpSocket,
    buffer: &mut [u8],
    response_receiver: &Receiver<(ConnectedResponse, CanonicalSocketAddr)>,
    pending_scrape_responses: &mut PendingScrapeResponseSlab,
    local_responses: Drain<(Response, CanonicalSocketAddr)>,
) {
    for (response, addr) in local_responses {
        send_response(state, config, socket, buffer, response, addr);
    }
    for (response, addr) in response_receiver.try_iter() {
        let response = match response {
            ConnectedResponse::AnnounceIpv4(r) => Response::AnnounceIpv4(r),
            ConnectedResponse::AnnounceIpv6(r) => Response::AnnounceIpv6(r),
            ConnectedResponse::Scrape(r) => {
                match pending_scrape_responses.add_and_get_finished(r) {
                    Some(finished) => Response::Scrape(finished),
                    // Not all parts have arrived yet
                    None => continue,
                }
            }
        };
        send_response(state, config, socket, buffer, response, addr);
    }
}
fn send_response(
state: &State,
config: &Config,
socket: &mut UdpSocket,
buffer: &mut [u8],
response: Response,
addr: CanonicalSocketAddr,
) {
let mut cursor = Cursor::new(buffer);
let canonical_addr_is_ipv4 = addr.is_ipv4();
let addr = if config.network.address.is_ipv4() {
addr.get_ipv4()
.expect("found peer ipv6 address while running bound to ipv4 address")
} else {
addr.get_ipv6_mapped()
};
match response.write(&mut cursor) {
Ok(()) => {
let amt = cursor.position() as usize;
match socket.send_to(&cursor.get_ref()[..amt], addr) {
Ok(amt) if config.statistics.active() => {
let stats = if canonical_addr_is_ipv4 {
&state.statistics_ipv4
} else {
&state.statistics_ipv6
};
stats.bytes_sent.fetch_add(amt, Ordering::Relaxed);
match response {
Response::Connect(_) => {
stats.responses_sent_connect.fetch_add(1, Ordering::Relaxed);
}
Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => {
stats
.responses_sent_announce
.fetch_add(1, Ordering::Relaxed);
}
Response::Scrape(_) => {
stats.responses_sent_scrape.fetch_add(1, Ordering::Relaxed);
}
Response::Error(_) => {
stats.responses_sent_error.fetch_add(1, Ordering::Relaxed);
}
}
}
Ok(_) => {}
Err(err) => {
::log::warn!("send_to error: {:#}", err);
}
}
}
Err(err) => {
::log::error!("Response::write error: {:?}", err);
}
}
}

View file

@ -0,0 +1,221 @@
use std::collections::BTreeMap;
use std::time::Instant;
use hashbrown::HashMap;
use slab::Slab;
use aquatic_common::ValidUntil;
use aquatic_udp_protocol::*;
use crate::common::*;
use crate::config::Config;
/// Bookkeeping for one scrape request that was split across multiple
/// request workers and is awaiting their partial responses.
#[derive(Debug)]
pub struct PendingScrapeResponseSlabEntry {
    // Number of per-worker partial responses still outstanding
    num_pending: usize,
    // After this deadline the entry may be dropped by `clean`
    valid_until: ValidUntil,
    // Statistics gathered so far, keyed by the info hash's position in the
    // original request (BTreeMap preserves original ordering on assembly)
    torrent_stats: BTreeMap<usize, TorrentScrapeStatistics>,
    // Transaction id of the originating scrape request, echoed in the
    // final response
    transaction_id: TransactionId,
}
/// Slab of in-flight split scrape requests. Entries are created by
/// `prepare_split_requests`, completed (and removed) by
/// `add_and_get_finished`, and expired entries are dropped by `clean`.
/// The slab key is carried inside each `PendingScrapeRequest` so partial
/// responses can find their entry again.
#[derive(Default)]
pub struct PendingScrapeResponseSlab(Slab<PendingScrapeResponseSlabEntry>);
impl PendingScrapeResponseSlab {
    /// Split a scrape request into at most one part per request worker and
    /// register a slab entry tracking how many partial responses are
    /// outstanding.
    ///
    /// Returns the split requests paired with the worker each should be
    /// sent to. If the request contains no info hashes, no entry is
    /// created and the returned collection is empty (a warning is logged).
    pub fn prepare_split_requests(
        &mut self,
        config: &Config,
        request: ScrapeRequest,
        valid_until: ValidUntil,
    ) -> impl IntoIterator<Item = (RequestWorkerIndex, PendingScrapeRequest)> {
        // At most one split request per worker, never more than the number
        // of info hashes
        let capacity = config.request_workers.min(request.info_hashes.len());
        let mut split_requests: HashMap<RequestWorkerIndex, PendingScrapeRequest> =
            HashMap::with_capacity(capacity);
        if request.info_hashes.is_empty() {
            ::log::warn!(
                "Attempted to prepare PendingScrapeResponseSlab entry with zero info hashes"
            );
            return split_requests;
        }
        let vacant_entry = self.0.vacant_entry();
        let slab_key = vacant_entry.key();
        for (i, info_hash) in request.info_hashes.into_iter().enumerate() {
            let split_request = split_requests
                // `config` is already `&Config`; no extra `&` needed
                .entry(RequestWorkerIndex::from_info_hash(config, info_hash))
                .or_insert_with(|| PendingScrapeRequest {
                    slab_key,
                    info_hashes: BTreeMap::new(),
                });
            // Remember each hash's position so the final response can be
            // assembled in request order
            split_request.info_hashes.insert(i, info_hash);
        }
        vacant_entry.insert(PendingScrapeResponseSlabEntry {
            num_pending: split_requests.len(),
            valid_until,
            torrent_stats: Default::default(),
            transaction_id: request.transaction_id,
        });
        split_requests
    }
    /// Record one partial response from a request worker.
    ///
    /// Returns the assembled `ScrapeResponse` (removing the slab entry)
    /// once all expected partial responses have arrived, `None` otherwise.
    /// An unknown slab key is logged and ignored.
    pub fn add_and_get_finished(
        &mut self,
        response: PendingScrapeResponse,
    ) -> Option<ScrapeResponse> {
        let finished = if let Some(entry) = self.0.get_mut(response.slab_key) {
            entry.num_pending -= 1;
            // `extend` takes any IntoIterator; no explicit `.into_iter()`
            // call needed
            entry.torrent_stats.extend(response.torrent_stats);
            entry.num_pending == 0
        } else {
            ::log::warn!(
                "PendingScrapeResponseSlab.add didn't find entry for key {:?}",
                response.slab_key
            );
            false
        };
        if finished {
            let entry = self.0.remove(response.slab_key);
            Some(ScrapeResponse {
                transaction_id: entry.transaction_id,
                torrent_stats: entry.torrent_stats.into_values().collect(),
            })
        } else {
            None
        }
    }
    /// Drop entries whose deadline has passed. Expired entries mean some
    /// request worker never delivered its partial response, so each one is
    /// logged as a warning.
    pub fn clean(&mut self) {
        let now = Instant::now();
        self.0.retain(|k, v| {
            if v.valid_until.0 > now {
                true
            } else {
                ::log::warn!(
                    "Unconsumed PendingScrapeResponseSlab entry. {:?}: {:?}",
                    k,
                    v
                );
                false
            }
        });
        self.0.shrink_to_fit();
    }
}
#[cfg(test)]
mod tests {
    use quickcheck::TestResult;
    use quickcheck_macros::quickcheck;
    use super::*;
    // Property: for any set of scrape requests (each with at least one
    // info hash) and any non-zero number of request workers, splitting
    // every request and then feeding back exactly one partial response per
    // split part must yield exactly one finished response per request and
    // leave the slab empty.
    #[quickcheck]
    fn test_pending_scrape_response_slab(
        request_data: Vec<(i32, i64, u8)>,
        request_workers: u8,
    ) -> TestResult {
        // Zero workers is not a valid configuration
        if request_workers == 0 {
            return TestResult::discard();
        }
        let mut config = Config::default();
        config.request_workers = request_workers as usize;
        let valid_until = ValidUntil::new(1);
        let mut map = PendingScrapeResponseSlab::default();
        // Build one ScrapeRequest per (transaction id, connection id,
        // info hash count) triple
        let mut requests = Vec::new();
        for (t, c, b) in request_data {
            // prepare_split_requests creates no entry for zero info
            // hashes, so such inputs would break the counts below
            if b == 0 {
                return TestResult::discard();
            }
            let mut info_hashes = Vec::new();
            for i in 0..b {
                // Encode the index into the hash so the response stats can
                // be derived from it later
                let info_hash = InfoHash([i; 20]);
                info_hashes.push(info_hash);
            }
            let request = ScrapeRequest {
                transaction_id: TransactionId(t),
                connection_id: ConnectionId(c),
                info_hashes,
            };
            requests.push(request);
        }
        // Split all requests up front, materializing each iterator
        let mut all_split_requests = Vec::new();
        for request in requests.iter() {
            let split_requests =
                map.prepare_split_requests(&config, request.to_owned(), valid_until);
            all_split_requests.push(
                split_requests
                    .into_iter()
                    .collect::<Vec<(RequestWorkerIndex, PendingScrapeRequest)>>(),
            );
        }
        // One slab entry per original request
        assert_eq!(map.0.len(), requests.len());
        let mut responses = Vec::new();
        for split_requests in all_split_requests {
            for (worker_index, split_request) in split_requests {
                // Worker assignment must stay within the configured range
                assert!(worker_index.0 < request_workers as usize);
                // Fabricate a partial response answering every info hash
                // in this split part
                let torrent_stats = split_request
                    .info_hashes
                    .into_iter()
                    .map(|(i, info_hash)| {
                        (
                            i,
                            TorrentScrapeStatistics {
                                seeders: NumberOfPeers((info_hash.0[0]) as i32),
                                leechers: NumberOfPeers(0),
                                completed: NumberOfDownloads(0),
                            },
                        )
                    })
                    .collect();
                let response = PendingScrapeResponse {
                    slab_key: split_request.slab_key,
                    torrent_stats,
                };
                if let Some(response) = map.add_and_get_finished(response) {
                    responses.push(response);
                }
            }
        }
        // Every entry consumed, every request answered exactly once
        assert!(map.0.is_empty());
        assert_eq!(responses.len(), requests.len());
        TestResult::from_bool(true)
    }
}