Merge pull request #78 from greatest-ape/work-2022-07-03

udp: ignore requests with source port value of zero, improve resend buffer logic, bump MSRV to 1.62
This commit is contained in:
Joakim Frostegård 2022-07-04 09:15:08 +02:00 committed by GitHub
commit 38b3bc7217
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 282 additions and 298 deletions

View file

@ -12,7 +12,7 @@ env:
jobs:
build-test-linux:
runs-on: ubuntu-latest
timeout-minutes: 20
timeout-minutes: 25
steps:
- uses: actions/checkout@v2
- name: Install latest stable Rust

62
Cargo.lock generated
View file

@ -211,7 +211,7 @@ dependencies = [
"aquatic_udp_protocol",
"blake3",
"cfg-if",
"constant_time_eq 0.2.2",
"constant_time_eq 0.2.3",
"crossbeam-channel",
"getrandom",
"hashbrown 0.12.1",
@ -434,9 +434,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.5.9"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33d590cacd53140ff87cc2e192eb22fc3dc23c5b3f93b0d4f020677f98e8c629"
checksum = "c2cc6e8e8c993cb61a005fab8c1e5093a29199b7253b05a6883999312935c1ff"
dependencies = [
"async-trait",
"axum-core",
@ -676,9 +676,9 @@ checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
[[package]]
name = "constant_time_eq"
version = "0.2.2"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e31aa570361918e61453e3b5377976b23e4599e8bb5b840380ecd3a20e691d2"
checksum = "8b96535642698e10693d204abac955eb269a05ea99f8363ef70ab54cfe439337"
[[package]]
name = "cpufeatures"
@ -821,9 +821,9 @@ dependencies = [
[[package]]
name = "crypto-common"
version = "0.1.3"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57952ca27b5e3606ff4dd79b0020231aaf9d6aa76dc05fd30137538c50bd3ce8"
checksum = "5999502d32b9c48d492abe66392408144895020ec4709e549e840799f3bb74c0"
dependencies = [
"generic-array",
"typenum",
@ -891,9 +891,9 @@ dependencies = [
[[package]]
name = "either"
version = "1.6.1"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
checksum = "3f107b87b6afc2a64fd13cac55fe06d6c8859f12d4b14cbcdd2c67d0976781be"
[[package]]
name = "enclose"
@ -1927,18 +1927,18 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pin-project"
version = "1.0.10"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58ad3879ad3baf4e44784bc6a718a8698867bb991f8ce24d1bcbe2cfb4c3a75e"
checksum = "78203e83c48cffbe01e4a2d35d566ca4de445d79a85372fc64e378bfc812a260"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.10"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb"
checksum = "710faf75e1b33345361201d36d04e98ac1ed8909151a017ed384700836104c74"
dependencies = [
"proc-macro2",
"quote",
@ -2331,15 +2331,15 @@ dependencies = [
[[package]]
name = "semver"
version = "1.0.10"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a41d061efea015927ac527063765e73601444cdc344ba855bc7bd44578b25e1c"
checksum = "a2333e6df6d6598f2b1974829f853c2b4c5f4a6e503c10af918081aa6f8564e1"
[[package]]
name = "serde"
version = "1.0.137"
version = "1.0.138"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61ea8d54c77f8315140a05f4c7237403bf38b72704d031543aa1d16abbf517d1"
checksum = "1578c6245786b9d168c5447eeacfb96856573ca56c9d68fdcf394be134882a47"
dependencies = [
"serde_derive",
]
@ -2375,9 +2375,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.137"
version = "1.0.138"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f26faba0c3959972377d3b2d306ee9f71faee9714294e41bb777f83f88578be"
checksum = "023e9b1467aef8a10fb88f25611870ada9800ef7e22afce356bb0d2387b6f27c"
dependencies = [
"proc-macro2",
"quote",
@ -2386,9 +2386,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.81"
version = "1.0.82"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b7ce2b32a1aed03c558dc61a5cd328f15aff2dbc17daad8fb8af04d2100e15c"
checksum = "82c2c1fdcd807d1098552c5b9a36e425e42e9fbd7c6a37a8425f390f781f7fa7"
dependencies = [
"itoa 1.0.2",
"ryu",
@ -2438,9 +2438,9 @@ dependencies = [
[[package]]
name = "simd-json"
version = "0.5.0"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3868f37d8473eb1410c71a8b88a5411778f72ae82bd72340b2355d1c133f2b6a"
checksum = "d67c573ee0994adb422b5f9ea7c55693ee943c39f4eb20aa76475a5b7039bb87"
dependencies = [
"halfbrown",
"serde",
@ -2482,9 +2482,9 @@ checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32"
[[package]]
name = "smallvec"
version = "1.8.1"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc88c725d61fc6c3132893370cac4a0200e3fedf5da8331c570664b1987f5ca2"
checksum = "2fd0db749597d91ff862fd1d55ea87f7855a744a8425a64695b6fca237d1dad1"
[[package]]
name = "smartstring"
@ -2910,9 +2910,9 @@ dependencies = [
[[package]]
name = "tracing-attributes"
version = "0.1.21"
version = "0.1.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc6b8ad3567499f98a1db7a752b07a7c8c7c7c34c332ec00effb2b0027974b7c"
checksum = "11c75893af559bc8e10716548bdef5cb2b983f8e637db9d0e15126b61b484ee2"
dependencies = [
"proc-macro2",
"quote",
@ -2973,9 +2973,9 @@ checksum = "5bd2fe26506023ed7b5e1e315add59d6f584c621d037f9368fea9cfb988f368c"
[[package]]
name = "unicode-normalization"
version = "0.1.20"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81dee68f85cab8cf68dec42158baf3a79a1cdc065a8b103025965d6ccb7f6cbd"
checksum = "854cbdc4f7bc6ae19c820d44abdc3277ac3e1b2b93db20a636825d9322fb60e6"
dependencies = [
"tinyvec",
]
@ -3266,6 +3266,6 @@ checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"
[[package]]
name = "zeroize"
version = "1.5.5"
version = "1.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94693807d016b2f2d2e14420eb3bfcca689311ff775dcf113d74ea624b7cdf07"
checksum = "20b578acffd8516a6c3f2a1bdefc1ec37e547bb4e0fb8b6b01a4cafc886b4442"

View file

@ -8,6 +8,7 @@ description = "High-performance open UDP BitTorrent tracker"
repository = "https://github.com/greatest-ape/aquatic"
keywords = ["udp", "server", "peer-to-peer", "torrent", "bittorrent"]
readme = "../README.md"
rust-version = "1.62"
[lib]
name = "aquatic_udp"

View file

@ -1,14 +1,10 @@
use std::collections::BTreeMap;
use std::hash::Hash;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::net::{Ipv4Addr, Ipv6Addr};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Instant;
use anyhow::Context;
use constant_time_eq::constant_time_eq;
use crossbeam_channel::{Sender, TrySendError};
use getrandom::getrandom;
use aquatic_common::access_list::AccessListArcSwap;
use aquatic_common::CanonicalSocketAddr;
@ -18,91 +14,6 @@ use crate::config::Config;
pub const BUFFER_SIZE: usize = 8192;
/// HMAC (BLAKE3) based ConnectionID creator and validator
///
/// Structure of created ConnectionID (bytes making up inner i64):
/// - &[0..4]: connection expiration time as number of seconds after
/// ConnectionValidator instance was created, encoded as u32 bytes.
/// Value fits around 136 years.
/// - &[4..8]: truncated keyed BLAKE3 hash of above 4 bytes and octets of
/// client IP address
///
/// The purpose of using ConnectionIDs is to prevent IP spoofing, mainly to
/// prevent the tracker from being used as an amplification vector for DDoS
/// attacks. By including 32 bits of BLAKE3 keyed hash output in its contents,
/// such abuse should be rendered impractical.
#[derive(Clone)]
pub struct ConnectionValidator {
start_time: Instant,
max_connection_age: u32,
keyed_hasher: blake3::Hasher,
}
impl ConnectionValidator {
/// Create new instance. Must be created once and cloned if used in several
/// threads.
pub fn new(config: &Config) -> anyhow::Result<Self> {
let mut key = [0; 32];
getrandom(&mut key)
.with_context(|| "Couldn't get random bytes for ConnectionValidator key")?;
let keyed_hasher = blake3::Hasher::new_keyed(&key);
Ok(Self {
keyed_hasher,
start_time: Instant::now(),
max_connection_age: config.cleaning.max_connection_age,
})
}
pub fn create_connection_id(&mut self, source_addr: CanonicalSocketAddr) -> ConnectionId {
let valid_until =
(self.start_time.elapsed().as_secs() as u32 + self.max_connection_age).to_ne_bytes();
let hash = self.hash(valid_until, source_addr.get().ip());
let mut connection_id_bytes = [0u8; 8];
(&mut connection_id_bytes[..4]).copy_from_slice(&valid_until);
(&mut connection_id_bytes[4..]).copy_from_slice(&hash);
ConnectionId(i64::from_ne_bytes(connection_id_bytes))
}
pub fn connection_id_valid(
&mut self,
source_addr: CanonicalSocketAddr,
connection_id: ConnectionId,
) -> bool {
let bytes = connection_id.0.to_ne_bytes();
let (valid_until, hash) = bytes.split_at(4);
let valid_until: [u8; 4] = valid_until.try_into().unwrap();
if !constant_time_eq(hash, &self.hash(valid_until, source_addr.get().ip())) {
return false;
}
u32::from_ne_bytes(valid_until) > self.start_time.elapsed().as_secs() as u32
}
fn hash(&mut self, valid_until: [u8; 4], ip_addr: IpAddr) -> [u8; 4] {
self.keyed_hasher.update(&valid_until);
match ip_addr {
IpAddr::V4(ip) => self.keyed_hasher.update(&ip.octets()),
IpAddr::V6(ip) => self.keyed_hasher.update(&ip.octets()),
};
let mut hash = [0u8; 4];
self.keyed_hasher.finalize_xof().fill(&mut hash);
self.keyed_hasher.reset();
hash
}
}
#[derive(Debug)]
pub struct PendingScrapeRequest {
pub slab_key: usize,
@ -274,9 +185,7 @@ impl State {
#[cfg(test)]
mod tests {
use std::net::{Ipv6Addr, SocketAddr};
use quickcheck_macros::quickcheck;
use std::net::Ipv6Addr;
use crate::config::Config;
@ -334,42 +243,4 @@ mod tests {
assert!(buf.len() <= BUFFER_SIZE);
}
#[quickcheck]
fn test_connection_validator(
original_addr: IpAddr,
different_addr: IpAddr,
max_connection_age: u32,
) -> quickcheck::TestResult {
let original_addr = CanonicalSocketAddr::new(SocketAddr::new(original_addr, 0));
let different_addr = CanonicalSocketAddr::new(SocketAddr::new(different_addr, 0));
if original_addr == different_addr {
return quickcheck::TestResult::discard();
}
let mut validator = {
let mut config = Config::default();
config.cleaning.max_connection_age = max_connection_age;
ConnectionValidator::new(&config).unwrap()
};
let connection_id = validator.create_connection_id(original_addr);
let original_valid = validator.connection_id_valid(original_addr, connection_id);
let different_valid = validator.connection_id_valid(different_addr, connection_id);
if different_valid {
return quickcheck::TestResult::failed();
}
if max_connection_age == 0 {
quickcheck::TestResult::from_bool(!original_valid)
} else {
// Note: depends on that running this test takes less than a second
quickcheck::TestResult::from_bool(original_valid)
}
}
}

View file

@ -86,7 +86,7 @@ pub struct NetworkConfig {
pub socket_recv_buffer_size: usize,
pub poll_event_capacity: usize,
pub poll_timeout_ms: u64,
/// Store this many responses at most for retryin on send failure
/// Store this many responses at most for retrying (once) on send failure
///
/// Useful on operating systems that do not provide an udp send buffer,
/// such as FreeBSD. Setting the value to zero disables resending

View file

@ -17,10 +17,10 @@ use aquatic_common::privileges::PrivilegeDropper;
use aquatic_common::PanicSentinelWatcher;
use common::{
ConnectedRequestSender, ConnectedResponseSender, ConnectionValidator, RequestWorkerIndex,
SocketWorkerIndex, State,
ConnectedRequestSender, ConnectedResponseSender, RequestWorkerIndex, SocketWorkerIndex, State,
};
use config::Config;
use workers::socket::validator::ConnectionValidator;
pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker";
pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");

View file

@ -1,6 +1,7 @@
mod requests;
mod responses;
mod storage;
pub mod validator;
use std::time::{Duration, Instant};
@ -22,8 +23,7 @@ use crate::config::Config;
use requests::read_requests;
use responses::send_responses;
use storage::PendingScrapeResponseSlab;
use self::responses::send_responses_with_resends;
use validator::ConnectionValidator;
pub fn run_socket_worker(
_sentinel: PanicSentinel,
@ -52,7 +52,7 @@ pub fn run_socket_worker(
let mut access_list_cache = create_access_list_cache(&state.access_list);
let mut local_responses: Vec<(Response, CanonicalSocketAddr)> = Vec::new();
let mut resend_buffer: Vec<(Response, CanonicalSocketAddr)> = Vec::new();
let mut opt_resend_buffer = (config.network.resend_buffer_max_len > 0).then_some(Vec::new());
let poll_timeout = Duration::from_millis(config.network.poll_timeout_ms);
@ -63,7 +63,6 @@ pub fn run_socket_worker(
let mut last_pending_scrape_cleaning = Instant::now();
let mut iter_counter = 0usize;
let response_resending_active = config.network.resend_buffer_max_len > 0;
loop {
poll.poll(&mut events, Some(poll_timeout))
@ -88,28 +87,16 @@ pub fn run_socket_worker(
}
}
if response_resending_active {
send_responses_with_resends(
&state,
&config,
&mut socket,
&mut buffer,
&response_receiver,
&mut pending_scrape_responses,
local_responses.drain(..),
&mut resend_buffer,
);
} else {
send_responses(
&state,
&config,
&mut socket,
&mut buffer,
&response_receiver,
&mut pending_scrape_responses,
local_responses.drain(..),
);
}
send_responses(
&state,
&config,
&mut socket,
&mut buffer,
&response_receiver,
&mut pending_scrape_responses,
local_responses.drain(..),
&mut opt_resend_buffer,
);
// Run periodic ValidUntil updates and state cleaning
if iter_counter % 256 == 0 {

View file

@ -10,6 +10,7 @@ use crate::common::*;
use crate::config::Config;
use super::storage::PendingScrapeResponseSlab;
use super::validator::ConnectionValidator;
pub fn read_requests(
config: &Config,
@ -30,9 +31,15 @@ pub fn read_requests(
loop {
match socket.recv_from(&mut buffer[..]) {
Ok((amt, src)) => {
Ok((bytes_read, src)) => {
if src.port() == 0 {
::log::info!("Ignored request from {} because source port is zero", src);
continue;
}
let res_request =
Request::from_bytes(&buffer[..amt], config.protocol.max_scrape_torrents);
Request::from_bytes(&buffer[..bytes_read], config.protocol.max_scrape_torrents);
let src = CanonicalSocketAddr::new(src);
@ -41,12 +48,12 @@ pub fn read_requests(
if res_request.is_ok() {
requests_received_ipv4 += 1;
}
bytes_received_ipv4 += amt;
bytes_received_ipv4 += bytes_read;
} else {
if res_request.is_ok() {
requests_received_ipv6 += 1;
}
bytes_received_ipv6 += amt;
bytes_received_ipv6 += bytes_read;
}
handle_request(

View file

@ -22,9 +22,24 @@ pub fn send_responses(
response_receiver: &Receiver<(ConnectedResponse, CanonicalSocketAddr)>,
pending_scrape_responses: &mut PendingScrapeResponseSlab,
local_responses: Drain<(Response, CanonicalSocketAddr)>,
opt_resend_buffer: &mut Option<Vec<(Response, CanonicalSocketAddr)>>,
) {
if let Some(resend_buffer) = opt_resend_buffer {
for (response, addr) in resend_buffer.drain(..) {
send_response(state, config, socket, buffer, response, addr, &mut None);
}
}
for (response, addr) in local_responses {
let _ = send_response(state, config, socket, buffer, &response, addr);
send_response(
state,
config,
socket,
buffer,
response,
addr,
opt_resend_buffer,
);
}
for (response, addr) in response_receiver.try_iter() {
@ -37,132 +52,92 @@ pub fn send_responses(
};
if let Some(response) = opt_response {
let _ = send_response(state, config, socket, buffer, &response, addr);
send_response(
state,
config,
socket,
buffer,
response,
addr,
opt_resend_buffer,
);
}
}
}
pub fn send_responses_with_resends(
state: &State,
config: &Config,
socket: &mut UdpSocket,
buffer: &mut [u8],
response_receiver: &Receiver<(ConnectedResponse, CanonicalSocketAddr)>,
pending_scrape_responses: &mut PendingScrapeResponseSlab,
local_responses: Drain<(Response, CanonicalSocketAddr)>,
resend_buffer: &mut Vec<(Response, CanonicalSocketAddr)>,
) {
let resend_buffer_max_len = config.network.resend_buffer_max_len;
for (response, addr) in resend_buffer.drain(..) {
let _ = send_response(state, config, socket, buffer, &response, addr);
}
for (response, addr) in local_responses {
match send_response(state, config, socket, buffer, &response, addr) {
Err(err) if error_should_cause_resend(&err) => {
if resend_buffer.len() < resend_buffer_max_len {
resend_buffer.push((response, addr));
} else {
::log::warn!("response resend buffer full, dropping response");
}
}
_ => (),
}
}
for (response, addr) in response_receiver.try_iter() {
let opt_response = match response {
ConnectedResponse::Scrape(r) => pending_scrape_responses
.add_and_get_finished(r)
.map(Response::Scrape),
ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)),
ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)),
};
if let Some(response) = opt_response {
match send_response(state, config, socket, buffer, &response, addr) {
Err(err) if error_should_cause_resend(&err) => {
if resend_buffer.len() < resend_buffer_max_len {
resend_buffer.push((response, addr));
} else {
::log::warn!("response resend buffer full, dropping response");
}
}
_ => (),
}
}
}
}
fn error_should_cause_resend(err: &::std::io::Error) -> bool {
(err.raw_os_error() == Some(ENOBUFS)) | (err.kind() == ErrorKind::WouldBlock)
}
fn send_response(
state: &State,
config: &Config,
socket: &mut UdpSocket,
buffer: &mut [u8],
response: &Response,
addr: CanonicalSocketAddr,
) -> std::io::Result<()> {
response: Response,
canonical_addr: CanonicalSocketAddr,
resend_buffer: &mut Option<Vec<(Response, CanonicalSocketAddr)>>,
) {
let mut cursor = Cursor::new(buffer);
let canonical_addr_is_ipv4 = addr.is_ipv4();
if let Err(err) = response.write(&mut cursor) {
::log::error!("Converting response to bytes failed: {:#}", err);
return;
}
let bytes_written = cursor.position() as usize;
let addr = if config.network.address.is_ipv4() {
addr.get_ipv4()
canonical_addr
.get_ipv4()
.expect("found peer ipv6 address while running bound to ipv4 address")
} else {
addr.get_ipv6_mapped()
canonical_addr.get_ipv6_mapped()
};
match response.write(&mut cursor) {
Ok(()) => {
let amt = cursor.position() as usize;
match socket.send_to(&cursor.get_ref()[..bytes_written], addr) {
Ok(amt) if config.statistics.active() => {
let stats = if canonical_addr.is_ipv4() {
&state.statistics_ipv4
} else {
&state.statistics_ipv6
};
match socket.send_to(&cursor.get_ref()[..amt], addr) {
Ok(amt) if config.statistics.active() => {
let stats = if canonical_addr_is_ipv4 {
&state.statistics_ipv4
} else {
&state.statistics_ipv6
};
stats.bytes_sent.fetch_add(amt, Ordering::Relaxed);
stats.bytes_sent.fetch_add(amt, Ordering::Relaxed);
match response {
Response::Connect(_) => {
stats.responses_sent_connect.fetch_add(1, Ordering::Relaxed);
}
Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => {
stats
.responses_sent_announce
.fetch_add(1, Ordering::Relaxed);
}
Response::Scrape(_) => {
stats.responses_sent_scrape.fetch_add(1, Ordering::Relaxed);
}
Response::Error(_) => {
stats.responses_sent_error.fetch_add(1, Ordering::Relaxed);
}
}
Ok(())
match response {
Response::Connect(_) => {
stats.responses_sent_connect.fetch_add(1, Ordering::Relaxed);
}
Ok(_) => Ok(()),
Err(err) => {
::log::warn!("Sending response to {} failed: {:#}", addr, err);
Err(err)
Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => {
stats
.responses_sent_announce
.fetch_add(1, Ordering::Relaxed);
}
Response::Scrape(_) => {
stats.responses_sent_scrape.fetch_add(1, Ordering::Relaxed);
}
Response::Error(_) => {
stats.responses_sent_error.fetch_add(1, Ordering::Relaxed);
}
}
}
Ok(_) => (),
Err(err) => {
::log::error!("Converting response to bytes failed: {:#}", err);
match resend_buffer {
Some(resend_buffer)
if (err.raw_os_error() == Some(ENOBUFS))
|| (err.kind() == ErrorKind::WouldBlock) =>
{
if resend_buffer.len() < config.network.resend_buffer_max_len {
::log::info!("Adding response to resend queue, since sending it to {} failed with: {:#}", addr, err);
Err(err)
resend_buffer.push((response, canonical_addr));
} else {
::log::warn!("Response resend buffer full, dropping response");
}
}
_ => {
::log::warn!("Sending response to {} failed: {:#}", addr, err);
}
}
}
}
}

View file

@ -0,0 +1,143 @@
use std::net::IpAddr;
use std::time::Instant;
use anyhow::Context;
use constant_time_eq::constant_time_eq;
use getrandom::getrandom;
use aquatic_common::CanonicalSocketAddr;
use aquatic_udp_protocol::ConnectionId;
use crate::config::Config;
/// HMAC (BLAKE3) based ConnectionID creator and validator
///
/// Structure of created ConnectionID (bytes making up inner i64):
/// - &[0..4]: connection expiration time as number of seconds after
/// ConnectionValidator instance was created, encoded as u32 bytes.
/// Value fits around 136 years.
/// - &[4..8]: truncated keyed BLAKE3 hash of above 4 bytes and octets of
/// client IP address
///
/// The purpose of using ConnectionIDs is to prevent IP spoofing, mainly to
/// prevent the tracker from being used as an amplification vector for DDoS
/// attacks. By including 32 bits of BLAKE3 keyed hash output in its contents,
/// such abuse should be rendered impractical.
#[derive(Clone)]
pub struct ConnectionValidator {
start_time: Instant,
max_connection_age: u32,
keyed_hasher: blake3::Hasher,
}
impl ConnectionValidator {
/// Create new instance. Must be created once and cloned if used in several
/// threads.
pub fn new(config: &Config) -> anyhow::Result<Self> {
let mut key = [0; 32];
getrandom(&mut key)
.with_context(|| "Couldn't get random bytes for ConnectionValidator key")?;
let keyed_hasher = blake3::Hasher::new_keyed(&key);
Ok(Self {
keyed_hasher,
start_time: Instant::now(),
max_connection_age: config.cleaning.max_connection_age,
})
}
pub fn create_connection_id(&mut self, source_addr: CanonicalSocketAddr) -> ConnectionId {
let valid_until =
(self.start_time.elapsed().as_secs() as u32 + self.max_connection_age).to_ne_bytes();
let hash = self.hash(valid_until, source_addr.get().ip());
let mut connection_id_bytes = [0u8; 8];
(&mut connection_id_bytes[..4]).copy_from_slice(&valid_until);
(&mut connection_id_bytes[4..]).copy_from_slice(&hash);
ConnectionId(i64::from_ne_bytes(connection_id_bytes))
}
pub fn connection_id_valid(
&mut self,
source_addr: CanonicalSocketAddr,
connection_id: ConnectionId,
) -> bool {
let bytes = connection_id.0.to_ne_bytes();
let (valid_until, hash) = bytes.split_at(4);
let valid_until: [u8; 4] = valid_until.try_into().unwrap();
if !constant_time_eq(hash, &self.hash(valid_until, source_addr.get().ip())) {
return false;
}
u32::from_ne_bytes(valid_until) > self.start_time.elapsed().as_secs() as u32
}
fn hash(&mut self, valid_until: [u8; 4], ip_addr: IpAddr) -> [u8; 4] {
self.keyed_hasher.update(&valid_until);
match ip_addr {
IpAddr::V4(ip) => self.keyed_hasher.update(&ip.octets()),
IpAddr::V6(ip) => self.keyed_hasher.update(&ip.octets()),
};
let mut hash = [0u8; 4];
self.keyed_hasher.finalize_xof().fill(&mut hash);
self.keyed_hasher.reset();
hash
}
}
#[cfg(test)]
mod tests {
use std::net::SocketAddr;
use quickcheck_macros::quickcheck;
use super::*;
#[quickcheck]
fn test_connection_validator(
original_addr: IpAddr,
different_addr: IpAddr,
max_connection_age: u32,
) -> quickcheck::TestResult {
let original_addr = CanonicalSocketAddr::new(SocketAddr::new(original_addr, 0));
let different_addr = CanonicalSocketAddr::new(SocketAddr::new(different_addr, 0));
if original_addr == different_addr {
return quickcheck::TestResult::discard();
}
let mut validator = {
let mut config = Config::default();
config.cleaning.max_connection_age = max_connection_age;
ConnectionValidator::new(&config).unwrap()
};
let connection_id = validator.create_connection_id(original_addr);
let original_valid = validator.connection_id_valid(original_addr, connection_id);
let different_valid = validator.connection_id_valid(different_addr, connection_id);
if different_valid {
return quickcheck::TestResult::failed();
}
if max_connection_age == 0 {
quickcheck::TestResult::from_bool(!original_valid)
} else {
// Note: depends on that running this test takes less than a second
quickcheck::TestResult::from_bool(original_valid)
}
}
}