Merge pull request #10 from greatest-ape/glommio2

add experimental aquatic_udp glommio implementation; improve AccessList code
This commit is contained in:
Joakim Frostegård 2021-10-21 16:11:05 +02:00 committed by GitHub
commit 2f07ba9898
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
30 changed files with 1073 additions and 293 deletions

313
Cargo.lock generated
View file

@ -109,7 +109,7 @@ dependencies = [
"rand",
"serde",
"smartstring",
"socket2",
"socket2 0.4.2",
]
[[package]]
@ -159,8 +159,12 @@ dependencies = [
"aquatic_cli_helpers",
"aquatic_common",
"aquatic_udp_protocol",
"cfg-if",
"crossbeam-channel",
"futures-lite",
"glommio",
"hashbrown 0.11.2",
"hex",
"histogram",
"indexmap",
"log",
@ -172,7 +176,7 @@ dependencies = [
"quickcheck_macros",
"rand",
"serde",
"socket2",
"socket2 0.4.2",
]
[[package]]
@ -208,7 +212,7 @@ dependencies = [
"rand",
"rand_distr",
"serde",
"socket2",
"socket2 0.4.2",
]
[[package]]
@ -244,7 +248,7 @@ dependencies = [
"quickcheck_macros",
"rand",
"serde",
"socket2",
"socket2 0.4.2",
"tungstenite",
]
@ -306,7 +310,7 @@ checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi",
"libc",
"winapi",
"winapi 0.3.9",
]
[[package]]
@ -353,6 +357,12 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitmaps"
version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "303cec55cd9c5fde944b061b902f142b52a8bb5438cc822481ea1e3ebc96bbcb"
[[package]]
name = "block-buffer"
version = "0.9.0"
@ -374,6 +384,12 @@ dependencies = [
"serde",
]
[[package]]
name = "buddy-alloc"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ff9f338986406db85e2b5deb40a9255b796ca03a194c7457403d215173f3fd5"
[[package]]
name = "bumpalo"
version = "3.7.1"
@ -392,6 +408,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]]
name = "cache-padded"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba"
[[package]]
name = "cast"
version = "0.2.7"
@ -423,7 +445,7 @@ dependencies = [
"num-integer",
"num-traits",
"time",
"winapi",
"winapi 0.3.9",
]
[[package]]
@ -437,6 +459,15 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "concurrent-queue"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3"
dependencies = [
"cache-padded",
]
[[package]]
name = "console"
version = "0.15.0"
@ -447,7 +478,7 @@ dependencies = [
"libc",
"once_cell",
"terminal_size",
"winapi",
"winapi 0.3.9",
]
[[package]]
@ -511,6 +542,20 @@ dependencies = [
"itertools",
]
[[package]]
name = "crossbeam"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ae5588f6b3c3cb05239e90bd110f257254aecd01e4635400391aeae07497845"
dependencies = [
"cfg-if",
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.1"
@ -541,10 +586,20 @@ dependencies = [
"cfg-if",
"crossbeam-utils",
"lazy_static",
"memoffset",
"memoffset 0.6.4",
"scopeguard",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b10ddc024425c88c2ad148c1b0fd53f4c6d38db9697c9f1588381212fa657c9"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.5"
@ -592,6 +647,12 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
name = "enclose"
version = "1.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1056f553da426e9c025a662efa48b52e62e0a3a7648aa2d15aeaaf7f0d329357"
[[package]]
name = "encode_unicode"
version = "0.3.6"
@ -630,6 +691,15 @@ dependencies = [
"synstructure",
]
[[package]]
name = "fastrand"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b394ed3d285a429378d3b384b9eb1285267e7df4b166df24b7a6939a04dc392e"
dependencies = [
"instant",
]
[[package]]
name = "float-cmp"
version = "0.8.0"
@ -670,6 +740,33 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures-core"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88d1c26957f23603395cd326b0ffe64124b818f4449552f960d815cfba83a53d"
[[package]]
name = "futures-io"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "522de2a0fe3e380f1bc577ba0474108faf3f6b18321dbf60b3b9c39a75073377"
[[package]]
name = "futures-lite"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"memchr",
"parking",
"pin-project-lite",
"waker-fn",
]
[[package]]
name = "generic-array"
version = "0.14.4"
@ -697,6 +794,37 @@ version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0a01e0497841a3b2db4f8afa483cce65f7e96a3498bd6c541734792aeac8fe7"
[[package]]
name = "glommio"
version = "0.6.0"
source = "git+https://github.com/DataDog/glommio.git?rev=4e6b14772da2f4325271fbcf12d24cf91ed466e5#4e6b14772da2f4325271fbcf12d24cf91ed466e5"
dependencies = [
"ahash 0.7.6",
"bitflags",
"bitmaps",
"buddy-alloc",
"cc",
"concurrent-queue",
"crossbeam",
"enclose",
"futures-lite",
"intrusive-collections",
"lazy_static",
"libc",
"lockfree",
"log",
"membarrier",
"nix",
"pin-project-lite",
"rlimit",
"scoped-tls",
"scopeguard",
"smallvec",
"socket2 0.3.19",
"tracing",
"typenum",
]
[[package]]
name = "half"
version = "1.8.0"
@ -813,6 +941,15 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "intrusive-collections"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb4ed164b4cf1c6bd6e18c097490331a0e58fbb0f39e8f6b5ac7f168006511cd"
dependencies = [
"memoffset 0.5.6",
]
[[package]]
name = "itertools"
version = "0.10.1"
@ -837,6 +974,16 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "kernel32-sys"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
dependencies = [
"winapi 0.2.8",
"winapi-build",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
@ -873,6 +1020,15 @@ dependencies = [
"scopeguard",
]
[[package]]
name = "lockfree"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74ee94b5ad113c7cb98c5a040f783d0952ee4fe100993881d1673c2cb002dd23"
dependencies = [
"owned-alloc",
]
[[package]]
name = "log"
version = "0.4.14"
@ -888,12 +1044,33 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
name = "membarrier"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "925b0811d7e5fb2b666b5906c5047b7ec23aab78edc4d51b7b0f82dc5c955b1c"
dependencies = [
"cfg-if",
"kernel32-sys",
"lazy_static",
"libc",
]
[[package]]
name = "memchr"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"
[[package]]
name = "memoffset"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "043175f069eda7b85febe4a74abbaeff828d9f8b448515d3151a14a3542811aa"
dependencies = [
"autocfg",
]
[[package]]
name = "memoffset"
version = "0.6.4"
@ -932,7 +1109,7 @@ dependencies = [
"log",
"miow",
"ntapi",
"winapi",
"winapi 0.3.9",
]
[[package]]
@ -941,7 +1118,7 @@ version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21"
dependencies = [
"winapi",
"winapi 0.3.9",
]
[[package]]
@ -972,7 +1149,7 @@ dependencies = [
"cc",
"cfg-if",
"libc",
"memoffset",
"memoffset 0.6.4",
]
[[package]]
@ -987,7 +1164,7 @@ version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44"
dependencies = [
"winapi",
"winapi 0.3.9",
]
[[package]]
@ -1096,6 +1273,18 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "owned-alloc"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30fceb411f9a12ff9222c5f824026be368ff15dc2f13468d850c7d3f502205d6"
[[package]]
name = "parking"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
[[package]]
name = "parking_lot"
version = "0.11.2"
@ -1118,7 +1307,7 @@ dependencies = [
"libc",
"redox_syscall",
"smallvec",
"winapi",
"winapi 0.3.9",
]
[[package]]
@ -1127,6 +1316,12 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pin-project-lite"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d31d11c69a6b52a174b42bdc0c30e5e11670f90788b2c471c31c1d17d449443"
[[package]]
name = "pkg-config"
version = "0.3.20"
@ -1330,7 +1525,16 @@ version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7"
dependencies = [
"winapi",
"winapi 0.3.9",
]
[[package]]
name = "rlimit"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc0bf25554376fd362f54332b8410a625c71f15445bca32ffdfdf4ec9ac91726"
dependencies = [
"libc",
]
[[package]]
@ -1370,9 +1574,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75"
dependencies = [
"lazy_static",
"winapi",
"winapi 0.3.9",
]
[[package]]
name = "scoped-tls"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2"
[[package]]
name = "scopeguard"
version = "1.1.0"
@ -1532,6 +1742,17 @@ dependencies = [
"static_assertions",
]
[[package]]
name = "socket2"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e"
dependencies = [
"cfg-if",
"libc",
"winapi 0.3.9",
]
[[package]]
name = "socket2"
version = "0.4.2"
@ -1539,7 +1760,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5dc90fe6c7be1a323296982db1836d1ea9e47b6839496dde9a541bc496df3516"
dependencies = [
"libc",
"winapi",
"winapi 0.3.9",
]
[[package]]
@ -1582,7 +1803,7 @@ dependencies = [
"rand",
"redox_syscall",
"remove_dir_all",
"winapi",
"winapi 0.3.9",
]
[[package]]
@ -1601,7 +1822,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "633c1a546cee861a1a6d0dc69ebeca693bf4296661ba7852b9d21d159e0506df"
dependencies = [
"libc",
"winapi",
"winapi 0.3.9",
]
[[package]]
@ -1640,7 +1861,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438"
dependencies = [
"libc",
"winapi",
"winapi 0.3.9",
]
[[package]]
@ -1677,6 +1898,38 @@ dependencies = [
"serde",
]
[[package]]
name = "tracing"
version = "0.1.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105"
dependencies = [
"cfg-if",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4f480b8f81512e825f337ad51e94c1eb5d3bbdf2b363dcd01e2b19a9ffe3f8e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing-core"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4"
dependencies = [
"lazy_static",
]
[[package]]
name = "tungstenite"
version = "0.15.0"
@ -1777,6 +2030,12 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe"
[[package]]
name = "waker-fn"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
[[package]]
name = "walkdir"
version = "2.3.2"
@ -1784,7 +2043,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
dependencies = [
"same-file",
"winapi",
"winapi 0.3.9",
"winapi-util",
]
@ -1858,6 +2117,12 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "winapi"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
[[package]]
name = "winapi"
version = "0.3.9"
@ -1868,6 +2133,12 @@ dependencies = [
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-build"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
@ -1880,7 +2151,7 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
"winapi 0.3.9",
]
[[package]]

View file

@ -1,5 +1,11 @@
# TODO
* aquatic_udp glommio
* update access lists
* clean connections
* update peer valid until
* privdrop
* access lists:
* use arc-swap Cache
* add CI tests

View file

@ -18,6 +18,12 @@ pub enum AccessListMode {
Off,
}
impl AccessListMode {
pub fn is_on(&self) -> bool {
!matches!(self, Self::Off)
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AccessListConfig {
pub mode: AccessListMode,
@ -36,54 +42,71 @@ impl Default for AccessListConfig {
}
}
pub struct AccessList(ArcSwap<HashSet<[u8; 20]>>);
#[derive(Default)]
pub struct AccessList(HashSet<[u8; 20]>);
impl Default for AccessList {
fn default() -> Self {
Self(ArcSwap::from(Arc::new(HashSet::default())))
impl AccessList {
pub fn insert_from_line(&mut self, line: &str) -> anyhow::Result<()> {
self.0.insert(parse_info_hash(line)?);
Ok(())
}
pub fn allows(&self, mode: AccessListMode, info_hash: &[u8; 20]) -> bool {
match mode {
AccessListMode::White => self.0.contains(info_hash),
AccessListMode::Black => !self.0.contains(info_hash),
AccessListMode::Off => true,
}
}
}
impl AccessList {
fn parse_info_hash(line: String) -> anyhow::Result<[u8; 20]> {
let mut bytes = [0u8; 20];
pub trait AccessListQuery {
fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()>;
fn allows(&self, list_mode: AccessListMode, info_hash_bytes: &[u8; 20]) -> bool;
}
hex::decode_to_slice(line, &mut bytes)?;
pub type AccessListArcSwap = ArcSwap<AccessList>;
Ok(bytes)
}
pub fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()> {
impl AccessListQuery for AccessListArcSwap {
fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()> {
let file = File::open(path)?;
let reader = BufReader::new(file);
let mut new_list = HashSet::new();
for line in reader.lines() {
new_list.insert(Self::parse_info_hash(line?)?);
new_list.insert(parse_info_hash(&line?)?);
}
self.0.store(Arc::new(new_list));
self.store(Arc::new(AccessList(new_list)));
Ok(())
}
pub fn allows(&self, list_mode: AccessListMode, info_hash_bytes: &[u8; 20]) -> bool {
match list_mode {
AccessListMode::White => self.0.load().contains(info_hash_bytes),
AccessListMode::Black => !self.0.load().contains(info_hash_bytes),
fn allows(&self, mode: AccessListMode, info_hash_bytes: &[u8; 20]) -> bool {
match mode {
AccessListMode::White => self.load().0.contains(info_hash_bytes),
AccessListMode::Black => !self.load().0.contains(info_hash_bytes),
AccessListMode::Off => true,
}
}
}
fn parse_info_hash(line: &str) -> anyhow::Result<[u8; 20]> {
let mut bytes = [0u8; 20];
hex::decode_to_slice(line, &mut bytes)?;
Ok(bytes)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_info_hash() {
let f = AccessList::parse_info_hash;
let f = parse_info_hash;
assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeee".into()).is_ok());
assert!(f("aaaabbbbccccddddeeeeaaaabbbbccccddddeeeef".into()).is_err());

View file

@ -2,7 +2,7 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::Arc;
use std::time::Instant;
use aquatic_common::access_list::AccessList;
use aquatic_common::access_list::{AccessList, AccessListArcSwap};
use crossbeam_channel::{Receiver, Sender};
use either::Either;
use hashbrown::HashMap;
@ -165,7 +165,7 @@ impl TorrentMaps {
#[derive(Clone)]
pub struct State {
pub access_list: Arc<AccessList>,
pub access_list: Arc<AccessListArcSwap>,
pub torrent_maps: Arc<Mutex<TorrentMaps>>,
}

View file

@ -31,7 +31,10 @@ pub fn run(config: Config) -> anyhow::Result<()> {
tasks::update_access_list(&config, &state);
state.torrent_maps.lock().clean(&config, &state.access_list);
state
.torrent_maps
.lock()
.clean(&config, &state.access_list.load_full());
}
}

View file

@ -3,6 +3,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use std::vec::Drain;
use aquatic_common::access_list::AccessListQuery;
use aquatic_http_protocol::request::Request;
use hashbrown::HashMap;
use log::{debug, error, info};

View file

@ -1,6 +1,6 @@
use histogram::Histogram;
use aquatic_common::access_list::AccessListMode;
use aquatic_common::access_list::{AccessListMode, AccessListQuery};
use crate::{common::*, config::Config};

View file

@ -14,13 +14,18 @@ path = "src/lib/lib.rs"
[[bin]]
name = "aquatic_udp"
[features]
with-glommio = ["glommio", "futures-lite"]
[dependencies]
anyhow = "1"
aquatic_cli_helpers = "0.1.0"
aquatic_common = "0.1.0"
aquatic_udp_protocol = "0.1.0"
cfg-if = "1"
crossbeam-channel = "0.5"
hashbrown = "0.11.2"
hex = "0.4"
histogram = "0.6"
indexmap = "1"
log = "0.4"
@ -32,6 +37,9 @@ rand = { version = "0.8", features = ["small_rng"] }
serde = { version = "1", features = ["derive"] }
socket2 = { version = "0.4.1", features = ["all"] }
glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5", optional = true }
futures-lite = { version = "1", optional = true }
[dev-dependencies]
quickcheck = "1.0"
quickcheck_macros = "1.0"

View file

@ -1,52 +1,10 @@
use std::net::{IpAddr, SocketAddr};
use std::vec::Drain;
use parking_lot::MutexGuard;
use rand::rngs::SmallRng;
use aquatic_common::{convert_ipv4_mapped_ipv6, extract_response_peers};
use aquatic_udp_protocol::*;
use aquatic_common::extract_response_peers;
use crate::common::*;
use crate::config::Config;
#[inline]
pub fn handle_announce_requests(
config: &Config,
torrents: &mut MutexGuard<TorrentMaps>,
rng: &mut SmallRng,
requests: Drain<(AnnounceRequest, SocketAddr)>,
responses: &mut Vec<(ConnectedResponse, SocketAddr)>,
) {
let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age);
responses.extend(requests.map(|(request, src)| {
let peer_ip = convert_ipv4_mapped_ipv6(src.ip());
let response = match peer_ip {
IpAddr::V4(ip) => handle_announce_request(
config,
rng,
&mut torrents.ipv4,
request,
ip,
peer_valid_until,
),
IpAddr::V6(ip) => handle_announce_request(
config,
rng,
&mut torrents.ipv6,
request,
ip,
peer_valid_until,
),
};
(ConnectedResponse::Announce(response), src)
}));
}
fn handle_announce_request<I: Ip>(
pub fn handle_announce_request<I: Ip>(
config: &Config,
rng: &mut SmallRng,
torrents: &mut TorrentMap<I>,

View file

@ -1,17 +1,19 @@
use std::borrow::Borrow;
use std::hash::Hash;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::sync::{atomic::AtomicUsize, Arc};
use std::time::Instant;
use hashbrown::HashMap;
use indexmap::IndexMap;
use parking_lot::Mutex;
pub use aquatic_common::{access_list::AccessList, ValidUntil};
pub use aquatic_udp_protocol::*;
use crate::config::Config;
pub mod announce;
pub mod network;
pub const MAX_PACKET_SIZE: usize = 4096;
pub trait Ip: Hash + PartialEq + Eq + Clone + Copy {
@ -30,25 +32,6 @@ impl Ip for Ipv6Addr {
}
}
pub enum ConnectedRequest {
Announce(AnnounceRequest),
Scrape(ScrapeRequest),
}
pub enum ConnectedResponse {
Announce(AnnounceResponse),
Scrape(ScrapeResponse),
}
impl Into<Response> for ConnectedResponse {
fn into(self) -> Response {
match self {
Self::Announce(response) => Response::Announce(response),
Self::Scrape(response) => Response::Scrape(response),
}
}
}
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub enum PeerStatus {
Seeding,
@ -124,19 +107,19 @@ pub struct TorrentMaps {
impl TorrentMaps {
/// Remove disallowed and inactive torrents
pub fn clean(&mut self, config: &Config, access_list: &Arc<AccessList>) {
pub fn clean<T: Borrow<AccessList>>(&mut self, config: &Config, access_list: T) {
let now = Instant::now();
let access_list_mode = config.access_list.mode;
self.ipv4.retain(|info_hash, torrent| {
access_list.allows(access_list_mode, &info_hash.0)
access_list.borrow().allows(access_list_mode, &info_hash.0)
&& Self::clean_torrent_and_peers(now, torrent)
});
self.ipv4.shrink_to_fit();
self.ipv6.retain(|info_hash, torrent| {
access_list.allows(access_list_mode, &info_hash.0)
access_list.borrow().allows(access_list_mode, &info_hash.0)
&& Self::clean_torrent_and_peers(now, torrent)
});
self.ipv6.shrink_to_fit();
@ -170,31 +153,6 @@ impl TorrentMaps {
}
}
#[derive(Default)]
pub struct Statistics {
pub requests_received: AtomicUsize,
pub responses_sent: AtomicUsize,
pub bytes_received: AtomicUsize,
pub bytes_sent: AtomicUsize,
}
#[derive(Clone)]
pub struct State {
pub access_list: Arc<AccessList>,
pub torrents: Arc<Mutex<TorrentMaps>>,
pub statistics: Arc<Statistics>,
}
impl Default for State {
fn default() -> Self {
Self {
access_list: Arc::new(AccessList::default()),
torrents: Arc::new(Mutex::new(TorrentMaps::default())),
statistics: Arc::new(Statistics::default()),
}
}
}
#[cfg(test)]
mod tests {
#[test]

View file

@ -0,0 +1,30 @@
use std::{net::SocketAddr, time::Instant};
pub use aquatic_common::{access_list::AccessList, ValidUntil};
pub use aquatic_udp_protocol::*;
use hashbrown::HashMap;
#[derive(Default)]
pub struct ConnectionMap(HashMap<(ConnectionId, SocketAddr), ValidUntil>);
impl ConnectionMap {
pub fn insert(
&mut self,
connection_id: ConnectionId,
socket_addr: SocketAddr,
valid_until: ValidUntil,
) {
self.0.insert((connection_id, socket_addr), valid_until);
}
pub fn contains(&mut self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool {
self.0.contains_key(&(connection_id, socket_addr))
}
pub fn clean(&mut self) {
let now = Instant::now();
self.0.retain(|_, v| v.0 > now);
self.0.shrink_to_fit();
}
}

View file

@ -0,0 +1,32 @@
use std::cell::RefCell;
use std::rc::Rc;
use futures_lite::AsyncBufReadExt;
use glommio::io::{BufferedFile, StreamReaderBuilder};
use glommio::prelude::*;
use crate::common::*;
use crate::config::Config;
pub async fn update_access_list(config: Config, access_list: Rc<RefCell<AccessList>>) {
if config.access_list.mode.is_on() {
let access_list_file = BufferedFile::open(config.access_list.path).await.unwrap();
let mut reader = StreamReaderBuilder::new(access_list_file).build();
loop {
let mut buf = String::with_capacity(42);
match reader.read_line(&mut buf).await {
Ok(_) => {
access_list.borrow_mut().insert_from_line(&buf);
}
Err(err) => {
break;
}
}
yield_if_needed().await;
}
}
}

View file

@ -0,0 +1,102 @@
use std::cell::RefCell;
use std::net::{IpAddr, SocketAddr};
use std::rc::Rc;
use std::time::Duration;
use futures_lite::{Stream, StreamExt};
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::timer::TimerActionRepeat;
use glommio::{enclose, prelude::*};
use rand::prelude::SmallRng;
use rand::SeedableRng;
use crate::common::announce::handle_announce_request;
use crate::common::*;
use crate::config::Config;
use crate::glommio::common::update_access_list;
pub async fn run_request_worker(
config: Config,
request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>,
response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>,
) {
let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap();
let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap();
let response_senders = Rc::new(response_senders);
let torrents = Rc::new(RefCell::new(TorrentMaps::default()));
let access_list = Rc::new(RefCell::new(AccessList::default()));
// Periodically clean torrents and update access list
TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || {
enclose!((config, torrents, access_list) move || async move {
update_access_list(config.clone(), access_list.clone()).await;
torrents.borrow_mut().clean(&config, &*access_list.borrow());
Some(Duration::from_secs(config.cleaning.interval))
})()
}));
let mut handles = Vec::new();
for (_, receiver) in request_receivers.streams() {
let handle = spawn_local(handle_request_stream(
config.clone(),
torrents.clone(),
response_senders.clone(),
receiver,
))
.detach();
handles.push(handle);
}
for handle in handles {
handle.await;
}
}
async fn handle_request_stream<S>(
config: Config,
torrents: Rc<RefCell<TorrentMaps>>,
response_senders: Rc<Senders<(AnnounceResponse, SocketAddr)>>,
mut stream: S,
) where
S: Stream<Item = (usize, AnnounceRequest, SocketAddr)> + ::std::marker::Unpin,
{
let mut rng = SmallRng::from_entropy();
// Needs to be updated periodically: use timer?
let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age);
while let Some((producer_index, request, addr)) = stream.next().await {
let response = match addr.ip() {
IpAddr::V4(ip) => handle_announce_request(
&config,
&mut rng,
&mut torrents.borrow_mut().ipv4,
request,
ip,
peer_valid_until,
),
IpAddr::V6(ip) => handle_announce_request(
&config,
&mut rng,
&mut torrents.borrow_mut().ipv6,
request,
ip,
peer_valid_until,
),
};
::log::debug!("preparing to send response to channel: {:?}", response);
if let Err(err) = response_senders.try_send_to(producer_index, (response, addr)) {
::log::warn!("response_sender.try_send: {:?}", err);
}
yield_if_needed().await;
}
}

View file

@ -0,0 +1,68 @@
//! Work-in-progress glommio (io_uring) implementation
//!
//! * Doesn't support scrape requests
//! * Currently not faster than mio implementation
use std::sync::{atomic::AtomicUsize, Arc};
use glommio::channels::channel_mesh::MeshBuilder;
use glommio::prelude::*;
use crate::config::Config;
mod common;
pub mod handlers;
pub mod network;
pub const SHARED_CHANNEL_SIZE: usize = 4096;
pub fn run(config: Config) -> anyhow::Result<()> {
let num_peers = config.socket_workers + config.request_workers;
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
let num_bound_sockets = Arc::new(AtomicUsize::new(0));
let mut executors = Vec::new();
for _ in 0..(config.socket_workers) {
let config = config.clone();
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
let num_bound_sockets = num_bound_sockets.clone();
let executor = LocalExecutorBuilder::default().spawn(|| async move {
network::run_socket_worker(
config,
request_mesh_builder,
response_mesh_builder,
num_bound_sockets,
)
.await
});
executors.push(executor);
}
for _ in 0..(config.request_workers) {
let config = config.clone();
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
let executor = LocalExecutorBuilder::default().spawn(|| async move {
handlers::run_request_worker(config, request_mesh_builder, response_mesh_builder).await
});
executors.push(executor);
}
for executor in executors {
executor
.expect("failed to spawn local executor")
.join()
.unwrap();
}
Ok(())
}

View file

@ -0,0 +1,207 @@
use std::io::Cursor;
use std::net::{IpAddr, SocketAddr};
use std::rc::Rc;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use futures_lite::{Stream, StreamExt};
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::channels::local_channel::{new_unbounded, LocalSender};
use glommio::net::UdpSocket;
use glommio::prelude::*;
use rand::prelude::{Rng, SeedableRng, StdRng};
use aquatic_udp_protocol::{IpVersion, Request, Response};
use crate::common::network::ConnectionMap;
use crate::common::*;
use crate::config::Config;
pub async fn run_socket_worker(
config: Config,
request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>,
response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>,
num_bound_sockets: Arc<AtomicUsize>,
) {
let (local_sender, local_receiver) = new_unbounded();
let mut socket = UdpSocket::bind(config.network.address).unwrap();
let recv_buffer_size = config.network.socket_recv_buffer_size;
if recv_buffer_size != 0 {
socket.set_buffer_size(recv_buffer_size);
}
let socket = Rc::new(socket);
num_bound_sockets.fetch_add(1, Ordering::SeqCst);
let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap();
let (_, mut response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap();
let response_consumer_index = response_receivers.consumer_id().unwrap();
spawn_local(read_requests(
config.clone(),
request_senders,
response_consumer_index,
local_sender,
socket.clone(),
))
.detach();
for (_, receiver) in response_receivers.streams().into_iter() {
spawn_local(send_responses(
socket.clone(),
receiver.map(|(response, addr)| (response.into(), addr)),
))
.detach();
}
send_responses(socket, local_receiver.stream()).await;
}
async fn read_requests(
config: Config,
request_senders: Senders<(usize, AnnounceRequest, SocketAddr)>,
response_consumer_index: usize,
local_sender: LocalSender<(Response, SocketAddr)>,
socket: Rc<UdpSocket>,
) {
let mut rng = StdRng::from_entropy();
let access_list_mode = config.access_list.mode;
// Needs to be updated periodically: use timer?
let valid_until = ValidUntil::new(config.cleaning.max_connection_age);
// Needs to be updated periodically: use timer?
let access_list = AccessList::default();
// Needs to be cleaned periodically: use timer?
let mut connections = ConnectionMap::default();
let mut buf = [0u8; 2048];
loop {
match socket.recv_from(&mut buf).await {
Ok((amt, src)) => {
let request = Request::from_bytes(&buf[..amt], config.protocol.max_scrape_torrents);
::log::debug!("read request: {:?}", request);
match request {
Ok(Request::Connect(request)) => {
let connection_id = ConnectionId(rng.gen());
connections.insert(connection_id, src, valid_until);
let response = Response::Connect(ConnectResponse {
connection_id,
transaction_id: request.transaction_id,
});
local_sender.try_send((response, src)).unwrap();
}
Ok(Request::Announce(request)) => {
if connections.contains(request.connection_id, src) {
if access_list.allows(access_list_mode, &request.info_hash.0) {
let request_consumer_index =
(request.info_hash.0[0] as usize) % config.request_workers;
if let Err(err) = request_senders.try_send_to(
request_consumer_index,
(response_consumer_index, request, src),
) {
::log::warn!("request_sender.try_send failed: {:?}", err)
}
} else {
let response = Response::Error(ErrorResponse {
transaction_id: request.transaction_id,
message: "Info hash not allowed".into(),
});
local_sender.try_send((response, src)).unwrap();
}
}
}
Ok(Request::Scrape(request)) => {
if connections.contains(request.connection_id, src) {
let response = Response::Error(ErrorResponse {
transaction_id: request.transaction_id,
message: "Scrape requests not supported".into(),
});
local_sender.try_send((response, src)).unwrap();
}
}
Err(err) => {
::log::debug!("Request::from_bytes error: {:?}", err);
if let RequestParseError::Sendable {
connection_id,
transaction_id,
err,
} = err
{
if connections.contains(connection_id, src) {
let response = ErrorResponse {
transaction_id,
message: err.right_or("Parse error").into(),
};
local_sender.try_send((response.into(), src)).unwrap();
}
}
}
}
}
Err(err) => {
::log::error!("recv_from: {:?}", err);
}
}
yield_if_needed().await;
}
}
async fn send_responses<S>(socket: Rc<UdpSocket>, mut stream: S)
where
S: Stream<Item = (Response, SocketAddr)> + ::std::marker::Unpin,
{
let mut buf = [0u8; MAX_PACKET_SIZE];
let mut buf = Cursor::new(&mut buf[..]);
while let Some((response, src)) = stream.next().await {
buf.set_position(0);
::log::debug!("preparing to send response: {:?}", response.clone());
response
.write(&mut buf, ip_version_from_ip(src.ip()))
.expect("write response");
let position = buf.position() as usize;
if let Err(err) = socket.send_to(&buf.get_ref()[..position], src).await {
::log::info!("send_to failed: {:?}", err);
}
yield_if_needed().await;
}
}
fn ip_version_from_ip(ip: IpAddr) -> IpVersion {
match ip {
IpAddr::V4(_) => IpVersion::IPv4,
IpAddr::V6(ip) => {
if let [0, 0, 0, 0, 0, 0xffff, ..] = ip.segments() {
IpVersion::IPv4
} else {
IpVersion::IPv6
}
}
}
}

View file

@ -1,121 +1,21 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::thread::Builder;
use std::time::Duration;
use anyhow::Context;
use crossbeam_channel::unbounded;
use privdrop::PrivDrop;
use cfg_if::cfg_if;
pub mod common;
pub mod config;
pub mod handlers;
pub mod network;
pub mod tasks;
#[cfg(all(feature = "with-glommio", target_os = "linux"))]
pub mod glommio;
pub mod mio;
use common::State;
use config::Config;
pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker";
pub fn run(config: Config) -> ::anyhow::Result<()> {
let state = State::default();
tasks::update_access_list(&config, &state.access_list);
let num_bound_sockets = start_workers(config.clone(), state.clone())?;
if config.privileges.drop_privileges {
let mut counter = 0usize;
loop {
let sockets = num_bound_sockets.load(Ordering::SeqCst);
if sockets == config.socket_workers {
PrivDrop::default()
.chroot(config.privileges.chroot_path.clone())
.user(config.privileges.user.clone())
.apply()?;
break;
}
::std::thread::sleep(Duration::from_millis(10));
counter += 1;
if counter == 500 {
panic!("Sockets didn't bind in time for privilege drop.");
}
cfg_if! {
if #[cfg(all(feature = "with-glommio", target_os = "linux"))] {
glommio::run(config)
} else {
mio::run(config)
}
}
loop {
::std::thread::sleep(Duration::from_secs(config.cleaning.interval));
tasks::update_access_list(&config, &state.access_list);
state.torrents.lock().clean(&config, &state.access_list);
}
}
pub fn start_workers(config: Config, state: State) -> ::anyhow::Result<Arc<AtomicUsize>> {
let (request_sender, request_receiver) = unbounded();
let (response_sender, response_receiver) = unbounded();
for i in 0..config.request_workers {
let state = state.clone();
let config = config.clone();
let request_receiver = request_receiver.clone();
let response_sender = response_sender.clone();
Builder::new()
.name(format!("request-{:02}", i + 1))
.spawn(move || {
handlers::run_request_worker(state, config, request_receiver, response_sender)
})
.with_context(|| "spawn request worker")?;
}
let num_bound_sockets = Arc::new(AtomicUsize::new(0));
for i in 0..config.socket_workers {
let state = state.clone();
let config = config.clone();
let request_sender = request_sender.clone();
let response_receiver = response_receiver.clone();
let num_bound_sockets = num_bound_sockets.clone();
Builder::new()
.name(format!("socket-{:02}", i + 1))
.spawn(move || {
network::run_socket_worker(
state,
config,
i,
request_sender,
response_receiver,
num_bound_sockets,
)
})
.with_context(|| "spawn socket worker")?;
}
if config.statistics.interval != 0 {
let state = state.clone();
let config = config.clone();
Builder::new()
.name("statistics-collector".to_string())
.spawn(move || loop {
::std::thread::sleep(Duration::from_secs(config.statistics.interval));
tasks::gather_and_print_statistics(&state, &config);
})
.with_context(|| "spawn statistics worker")?;
}
Ok(num_bound_sockets)
}

View file

@ -0,0 +1,49 @@
use aquatic_common::access_list::AccessListArcSwap;
use parking_lot::Mutex;
use std::sync::{atomic::AtomicUsize, Arc};
use crate::common::*;
pub enum ConnectedRequest {
Announce(AnnounceRequest),
Scrape(ScrapeRequest),
}
pub enum ConnectedResponse {
Announce(AnnounceResponse),
Scrape(ScrapeResponse),
}
impl Into<Response> for ConnectedResponse {
fn into(self) -> Response {
match self {
Self::Announce(response) => Response::Announce(response),
Self::Scrape(response) => Response::Scrape(response),
}
}
}
#[derive(Default)]
pub struct Statistics {
pub requests_received: AtomicUsize,
pub responses_sent: AtomicUsize,
pub bytes_received: AtomicUsize,
pub bytes_sent: AtomicUsize,
}
#[derive(Clone)]
pub struct State {
pub access_list: Arc<AccessListArcSwap>,
pub torrents: Arc<Mutex<TorrentMaps>>,
pub statistics: Arc<Statistics>,
}
impl Default for State {
fn default() -> Self {
Self {
access_list: Arc::new(AccessListArcSwap::default()),
torrents: Arc::new(Mutex::new(TorrentMaps::default())),
statistics: Arc::new(Statistics::default()),
}
}
}

View file

@ -0,0 +1,49 @@
use std::net::{IpAddr, SocketAddr};
use std::vec::Drain;
use parking_lot::MutexGuard;
use rand::rngs::SmallRng;
use aquatic_common::convert_ipv4_mapped_ipv6;
use aquatic_udp_protocol::*;
use crate::common::announce::handle_announce_request;
use crate::common::*;
use crate::config::Config;
use crate::mio::common::*;
#[inline]
pub fn handle_announce_requests(
config: &Config,
torrents: &mut MutexGuard<TorrentMaps>,
rng: &mut SmallRng,
requests: Drain<(AnnounceRequest, SocketAddr)>,
responses: &mut Vec<(ConnectedResponse, SocketAddr)>,
) {
let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age);
responses.extend(requests.map(|(request, src)| {
let peer_ip = convert_ipv4_mapped_ipv6(src.ip());
let response = match peer_ip {
IpAddr::V4(ip) => handle_announce_request(
config,
rng,
&mut torrents.ipv4,
request,
ip,
peer_valid_until,
),
IpAddr::V6(ip) => handle_announce_request(
config,
rng,
&mut torrents.ipv6,
request,
ip,
peer_valid_until,
),
};
(ConnectedResponse::Announce(response), src)
}));
}

View file

@ -6,8 +6,8 @@ use rand::{rngs::SmallRng, SeedableRng};
use aquatic_udp_protocol::*;
use crate::common::*;
use crate::config::Config;
use crate::mio::common::*;
mod announce;
mod scrape;

View file

@ -6,6 +6,8 @@ use parking_lot::MutexGuard;
use aquatic_common::convert_ipv4_mapped_ipv6;
use aquatic_udp_protocol::*;
use crate::mio::common::*;
use crate::common::*;
#[inline]

View file

@ -0,0 +1,142 @@
use std::thread::Builder;
use std::time::Duration;
use std::{
ops::Deref,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use anyhow::Context;
use crossbeam_channel::unbounded;
use privdrop::PrivDrop;
pub mod common;
pub mod handlers;
pub mod network;
pub mod tasks;
use aquatic_common::access_list::{AccessListArcSwap, AccessListMode, AccessListQuery};
use crate::config::Config;
use common::State;
pub fn run(config: Config) -> ::anyhow::Result<()> {
let state = State::default();
update_access_list(&config, &state.access_list);
let num_bound_sockets = Arc::new(AtomicUsize::new(0));
start_workers(config.clone(), state.clone(), num_bound_sockets.clone())?;
if config.privileges.drop_privileges {
let mut counter = 0usize;
loop {
let sockets = num_bound_sockets.load(Ordering::SeqCst);
if sockets == config.socket_workers {
PrivDrop::default()
.chroot(config.privileges.chroot_path.clone())
.user(config.privileges.user.clone())
.apply()?;
break;
}
::std::thread::sleep(Duration::from_millis(10));
counter += 1;
if counter == 500 {
panic!("Sockets didn't bind in time for privilege drop.");
}
}
}
loop {
::std::thread::sleep(Duration::from_secs(config.cleaning.interval));
update_access_list(&config, &state.access_list);
state
.torrents
.lock()
.clean(&config, state.access_list.load_full().deref());
}
}
pub fn start_workers(
config: Config,
state: State,
num_bound_sockets: Arc<AtomicUsize>,
) -> ::anyhow::Result<()> {
let (request_sender, request_receiver) = unbounded();
let (response_sender, response_receiver) = unbounded();
for i in 0..config.request_workers {
let state = state.clone();
let config = config.clone();
let request_receiver = request_receiver.clone();
let response_sender = response_sender.clone();
Builder::new()
.name(format!("request-{:02}", i + 1))
.spawn(move || {
handlers::run_request_worker(state, config, request_receiver, response_sender)
})
.with_context(|| "spawn request worker")?;
}
for i in 0..config.socket_workers {
let state = state.clone();
let config = config.clone();
let request_sender = request_sender.clone();
let response_receiver = response_receiver.clone();
let num_bound_sockets = num_bound_sockets.clone();
Builder::new()
.name(format!("socket-{:02}", i + 1))
.spawn(move || {
network::run_socket_worker(
state,
config,
i,
request_sender,
response_receiver,
num_bound_sockets,
)
})
.with_context(|| "spawn socket worker")?;
}
if config.statistics.interval != 0 {
let state = state.clone();
let config = config.clone();
Builder::new()
.name("statistics-collector".to_string())
.spawn(move || loop {
::std::thread::sleep(Duration::from_secs(config.statistics.interval));
tasks::gather_and_print_statistics(&state, &config);
})
.with_context(|| "spawn statistics worker")?;
}
Ok(())
}
pub fn update_access_list(config: &Config, access_list: &Arc<AccessListArcSwap>) {
match config.access_list.mode {
AccessListMode::White | AccessListMode::Black => {
if let Err(err) = access_list.update_from_path(&config.access_list.path) {
::log::error!("Update access list from path: {:?}", err);
}
}
AccessListMode::Off => {}
}
}

View file

@ -4,11 +4,11 @@ use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::time::{Duration, Instant};
use std::time::Duration;
use std::vec::Drain;
use aquatic_common::access_list::AccessListQuery;
use crossbeam_channel::{Receiver, Sender};
use hashbrown::HashMap;
use mio::net::UdpSocket;
use mio::{Events, Interest, Poll, Token};
use rand::prelude::{Rng, SeedableRng, StdRng};
@ -16,33 +16,11 @@ use socket2::{Domain, Protocol, Socket, Type};
use aquatic_udp_protocol::{IpVersion, Request, Response};
use crate::common::network::ConnectionMap;
use crate::common::*;
use crate::config::Config;
#[derive(Default)]
struct ConnectionMap(HashMap<(ConnectionId, SocketAddr), ValidUntil>);
impl ConnectionMap {
fn insert(
&mut self,
connection_id: ConnectionId,
socket_addr: SocketAddr,
valid_until: ValidUntil,
) {
self.0.insert((connection_id, socket_addr), valid_until);
}
fn contains(&mut self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool {
self.0.contains_key(&(connection_id, socket_addr))
}
fn clean(&mut self) {
let now = Instant::now();
self.0.retain(|_, v| v.0 > now);
self.0.shrink_to_fit();
}
}
use super::common::*;
pub fn run_socket_worker(
state: State,

View file

@ -1,24 +1,10 @@
use std::sync::atomic::Ordering;
use std::sync::Arc;
use histogram::Histogram;
use aquatic_common::access_list::AccessListMode;
use crate::common::*;
use super::common::*;
use crate::config::Config;
pub fn update_access_list(config: &Config, access_list: &Arc<AccessList>) {
match config.access_list.mode {
AccessListMode::White | AccessListMode::Black => {
if let Err(err) = access_list.update_from_path(&config.access_list.path) {
::log::error!("Update access list from path: {:?}", err);
}
}
AccessListMode::Off => {}
}
}
pub fn gather_and_print_statistics(state: &State, config: &Config) {
let interval = config.statistics.interval;

View file

@ -8,6 +8,7 @@ use rand_distr::Pareto;
use aquatic_udp::common::*;
use aquatic_udp::config::Config;
use aquatic_udp::mio::common::*;
use crate::common::*;
use crate::config::BenchConfig;

View file

@ -15,7 +15,8 @@ use std::time::Duration;
use aquatic_cli_helpers::run_app_with_cli_and_config;
use aquatic_udp::common::*;
use aquatic_udp::config::Config;
use aquatic_udp::handlers;
use aquatic_udp::mio::common::*;
use aquatic_udp::mio::handlers;
use config::BenchConfig;

View file

@ -8,6 +8,7 @@ use rand_distr::Pareto;
use aquatic_udp::common::*;
use aquatic_udp::config::Config;
use aquatic_udp::mio::common::*;
use crate::common::*;
use crate::config::BenchConfig;

View file

@ -2,7 +2,7 @@ use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::time::Instant;
use aquatic_common::access_list::AccessList;
use aquatic_common::access_list::{AccessList, AccessListArcSwap};
use crossbeam_channel::{Receiver, Sender};
use hashbrown::HashMap;
use indexmap::IndexMap;
@ -136,7 +136,7 @@ impl TorrentMaps {
#[derive(Clone)]
pub struct State {
pub access_list: Arc<AccessList>,
pub access_list: Arc<AccessListArcSwap>,
pub torrent_maps: Arc<Mutex<TorrentMaps>>,
}

View file

@ -33,7 +33,10 @@ pub fn run(config: Config) -> anyhow::Result<()> {
tasks::update_access_list(&config, &state);
state.torrent_maps.lock().clean(&config, &state.access_list);
state
.torrent_maps
.lock()
.clean(&config, &state.access_list.load_full());
}
}

View file

@ -2,6 +2,7 @@ use std::io::ErrorKind;
use std::time::Duration;
use std::vec::Drain;
use aquatic_common::access_list::AccessListQuery;
use crossbeam_channel::Receiver;
use hashbrown::HashMap;
use log::{debug, error, info};

View file

@ -1,4 +1,4 @@
use aquatic_common::access_list::AccessListMode;
use aquatic_common::access_list::{AccessListMode, AccessListQuery};
use histogram::Histogram;
use crate::common::*;