Merge pull request #189 from greatest-ape/udp-overhaul-workers

udp: rewrite to use shared state instead of socket/swarm workers
Joakim Frostegård 2024-02-11 13:57:18 +01:00, committed by GitHub
commit 69ead985af
25 changed files with 777 additions and 1326 deletions

@ -18,28 +18,28 @@
#### Changed
* Switch from socket worker/swarm worker division to a single type of worker,
for performance reasons. Several config file keys were removed since they
are no longer needed.
* Index peers by packet source IP and provided port, instead of by peer_id.
This prevents users from impersonating others and is likely also slightly
faster for IPv4 peers.
* Remove support for unbounded worker channels
* Add backpressure in socket workers. They will postpone reading from the
socket if sending a request to a swarm worker fails
* Avoid a heap allocation for torrents with two or fewer peers. This can save
a lot of memory if many torrents are tracked (see the sketch below, after the Fixed list)
* Improve announce performance by avoiding having to filter response peers
* In announce response statistics, don't include the announcing peer
* Distribute announce responses from swarm workers over socket workers to
decrease performance loss due to underutilized threads
* Harden ConnectionValidator to make IP spoofing even more costly
* Remove config key `network.poll_event_capacity` (always use 1)
* Speed up parsing and serialization of requests and responses by using
[zerocopy](https://crates.io/crates/zerocopy)
* Report socket-worker-related Prometheus stats per worker
* Remove CPU pinning support
#### Fixed
* Quit the whole application if any worker thread quits
* Disallow announce requests with port value of 0
* Fix io_uring UB issues
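A minimal sketch of the "two or fewer peers" optimization referenced above. `PeerKey` and `Peer` are simplified stand-ins for illustration only; the real code (see the swarm.rs hunk further down) keys peers by packet source IP and provided port and uses an ArrayVec in the same way:

```rust
use arrayvec::ArrayVec;
use std::collections::HashMap;
use std::net::SocketAddr;

const SMALL_PEER_MAP_CAPACITY: usize = 2;

// Simplified stand-ins: the real map keys peers by source address and port
// and stores richer per-peer data.
type PeerKey = SocketAddr;
type Peer = u64;

enum PeerMap {
    // Inline storage: no heap allocation for the common one- or two-peer torrent
    Small(ArrayVec<(PeerKey, Peer), SMALL_PEER_MAP_CAPACITY>),
    // Promoted to a heap-allocated map once a third peer announces
    Large(HashMap<PeerKey, Peer>),
}

impl PeerMap {
    fn insert(&mut self, key: PeerKey, peer: Peer) {
        match self {
            Self::Small(vec) if vec.len() < SMALL_PEER_MAP_CAPACITY => {
                vec.push((key, peer)); // ignores duplicate keys for brevity
            }
            Self::Small(vec) => {
                // Capacity exceeded: move the inline entries into a heap map
                let mut map: HashMap<_, _> = vec.drain(..).collect();
                map.insert(key, peer);
                *self = Self::Large(map);
            }
            Self::Large(map) => {
                map.insert(key, peer);
            }
        }
    }
}
```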
### aquatic_http

Cargo.lock (generated)

@ -315,6 +315,7 @@ dependencies = [
"mimalloc",
"mio",
"num-format",
"parking_lot",
"quickcheck",
"quickcheck_macros",
"rand",
@ -2001,6 +2002,29 @@ version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae"
[[package]]
name = "parking_lot"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-targets 0.48.5",
]
[[package]]
name = "percent-encoding"
version = "2.3.1"
@ -2269,6 +2293,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "redox_syscall"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa"
dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "ref-cast"
version = "1.0.22"

@ -31,19 +31,15 @@ Known users:
## Performance of the UDP implementation
![UDP BitTorrent tracker throughput comparison](./documents/aquatic-udp-load-test-illustration-2023-01-11.png)
![UDP BitTorrent tracker throughput](./documents/aquatic-udp-load-test-2024-02-10.png)
More benchmark details are available [here](./documents/aquatic-udp-load-test-2023-01-11.pdf).
More benchmark details are available [here](./documents/aquatic-udp-load-test-2024-02-10.md).
## Usage
Please refer to the README pages for the respective implementations listed in
the table above.
## Architectural overview
![Architectural overview of aquatic](./documents/aquatic-architecture-2024.svg)
## Copyright and license
Copyright (c) Joakim Frostegård

@ -58,6 +58,12 @@ impl UdpCommand {
indexmap::indexmap! {
1 => SetConfig {
implementations: indexmap! {
UdpTracker::Aquatic => vec![
AquaticUdpRunner::with_mio(1, Priority::High),
],
UdpTracker::AquaticIoUring => vec![
AquaticUdpRunner::with_io_uring(1, Priority::High),
],
UdpTracker::OpenTracker => vec![
OpenTrackerUdpRunner::new(0, Priority::Medium), // Handle requests within event loop
OpenTrackerUdpRunner::new(1, Priority::High),
@ -74,16 +80,13 @@ impl UdpCommand {
2 => SetConfig {
implementations: indexmap! {
UdpTracker::Aquatic => vec![
AquaticUdpRunner::with_mio(1, 1, Priority::Medium),
AquaticUdpRunner::with_mio(2, 1, Priority::High),
AquaticUdpRunner::with_mio(2, Priority::High),
],
UdpTracker::AquaticIoUring => vec![
AquaticUdpRunner::with_io_uring(1, 1, Priority::Medium),
AquaticUdpRunner::with_io_uring(2, 1, Priority::High),
AquaticUdpRunner::with_io_uring(2, Priority::High),
],
UdpTracker::OpenTracker => vec![
OpenTrackerUdpRunner::new(2, Priority::High),
OpenTrackerUdpRunner::new(4, Priority::Medium),
],
UdpTracker::Chihaya => vec![
ChihayaUdpRunner::new(),
@ -97,12 +100,10 @@ impl UdpCommand {
4 => SetConfig {
implementations: indexmap! {
UdpTracker::Aquatic => vec![
AquaticUdpRunner::with_mio(3, 1, Priority::High),
AquaticUdpRunner::with_mio(4, 1, Priority::Medium),
AquaticUdpRunner::with_mio(4, Priority::High),
],
UdpTracker::AquaticIoUring => vec![
AquaticUdpRunner::with_io_uring(3, 1, Priority::High),
AquaticUdpRunner::with_io_uring(4, 1, Priority::Medium),
AquaticUdpRunner::with_io_uring(4, Priority::High),
],
UdpTracker::OpenTracker => vec![
OpenTrackerUdpRunner::new(4, Priority::High),
@ -119,10 +120,10 @@ impl UdpCommand {
6 => SetConfig {
implementations: indexmap! {
UdpTracker::Aquatic => vec![
AquaticUdpRunner::with_mio(5, 1, Priority::High),
AquaticUdpRunner::with_mio(6, Priority::High),
],
UdpTracker::AquaticIoUring => vec![
AquaticUdpRunner::with_io_uring(5, 1, Priority::High),
AquaticUdpRunner::with_io_uring(6, Priority::High),
],
UdpTracker::OpenTracker => vec![
OpenTrackerUdpRunner::new(6, Priority::High),
@ -139,10 +140,10 @@ impl UdpCommand {
8 => SetConfig {
implementations: indexmap! {
UdpTracker::Aquatic => vec![
AquaticUdpRunner::with_mio(7, 1, Priority::High),
AquaticUdpRunner::with_mio(8, Priority::High),
],
UdpTracker::AquaticIoUring => vec![
AquaticUdpRunner::with_io_uring(7, 1, Priority::High),
AquaticUdpRunner::with_io_uring(8, Priority::High),
],
UdpTracker::OpenTracker => vec![
OpenTrackerUdpRunner::new(8, Priority::High),
@ -159,12 +160,10 @@ impl UdpCommand {
12 => SetConfig {
implementations: indexmap! {
UdpTracker::Aquatic => vec![
AquaticUdpRunner::with_mio(10, 2, Priority::High),
AquaticUdpRunner::with_mio(9, 3, Priority::Medium),
AquaticUdpRunner::with_mio(12, Priority::High),
],
UdpTracker::AquaticIoUring => vec![
AquaticUdpRunner::with_io_uring(10, 2, Priority::High),
AquaticUdpRunner::with_io_uring(9, 3, Priority::Medium),
AquaticUdpRunner::with_io_uring(12, Priority::High),
],
UdpTracker::OpenTracker => vec![
OpenTrackerUdpRunner::new(12, Priority::High),
@ -181,10 +180,10 @@ impl UdpCommand {
16 => SetConfig {
implementations: indexmap! {
UdpTracker::Aquatic => vec![
AquaticUdpRunner::with_mio(13, 3, Priority::High),
AquaticUdpRunner::with_mio(16, Priority::High),
],
UdpTracker::AquaticIoUring => vec![
AquaticUdpRunner::with_io_uring(13, 3, Priority::High),
AquaticUdpRunner::with_io_uring(16, Priority::High),
],
UdpTracker::OpenTracker => vec![
OpenTrackerUdpRunner::new(16, Priority::High),
@ -211,7 +210,6 @@ impl UdpCommand {
#[derive(Debug, Clone)]
struct AquaticUdpRunner {
socket_workers: usize,
swarm_workers: usize,
use_io_uring: bool,
priority: Priority,
}
@ -219,24 +217,20 @@ struct AquaticUdpRunner {
impl AquaticUdpRunner {
fn with_mio(
socket_workers: usize,
swarm_workers: usize,
priority: Priority,
) -> Rc<dyn ProcessRunner<Command = UdpCommand>> {
Rc::new(Self {
socket_workers,
swarm_workers,
use_io_uring: false,
priority,
})
}
fn with_io_uring(
socket_workers: usize,
swarm_workers: usize,
priority: Priority,
) -> Rc<dyn ProcessRunner<Command = UdpCommand>> {
Rc::new(Self {
socket_workers,
swarm_workers,
use_io_uring: true,
priority,
})
@ -256,7 +250,6 @@ impl ProcessRunner for AquaticUdpRunner {
let mut c = aquatic_udp::config::Config::default();
c.socket_workers = self.socket_workers;
c.swarm_workers = self.swarm_workers;
c.network.address = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3000));
c.network.use_io_uring = self.use_io_uring;
c.protocol.max_response_peers = 30;
@ -283,7 +276,6 @@ impl ProcessRunner for AquaticUdpRunner {
fn keys(&self) -> IndexMap<String, String> {
indexmap! {
"socket workers".to_string() => self.socket_workers.to_string(),
"swarm workers".to_string() => self.swarm_workers.to_string(),
}
}
}
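With swarm workers gone, each load-test entry above varies only the socket worker count. A hedged sketch of the config the runner now builds, limited to the fields visible in this hunk; `make_config` is an illustrative helper, not part of the codebase:

```rust
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};

// Illustrative helper: everything not set here stays at its default value.
fn make_config(socket_workers: usize) -> aquatic_udp::config::Config {
    let mut c = aquatic_udp::config::Config::default();
    c.socket_workers = socket_workers; // previously a (socket_workers, swarm_workers) pair
    c.network.address = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3000));
    c.protocol.max_response_peers = 30;
    c
}
```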

@ -163,6 +163,7 @@ pub enum WorkerType {
Socket(usize),
Statistics,
Signals,
Cleaning,
#[cfg(feature = "prometheus")]
Prometheus,
}
@ -174,6 +175,7 @@ impl Display for WorkerType {
Self::Socket(index) => f.write_fmt(format_args!("Socket worker {}", index + 1)),
Self::Statistics => f.write_str("Statistics worker"),
Self::Signals => f.write_str("Signals worker"),
Self::Cleaning => f.write_str("Cleaning worker"),
#[cfg(feature = "prometheus")]
Self::Prometheus => f.write_str("Prometheus worker"),
}

@ -109,6 +109,10 @@ Implements:
`aquatic_http` has not been tested as much as `aquatic_udp`, but likely works
fine in production.
## Architectural overview
![Architectural overview of aquatic](../../documents/aquatic-architecture-2024.svg)
## Copyright and license
Copyright (c) Joakim Frostegård

@ -48,6 +48,7 @@ log = "0.4"
mimalloc = { version = "0.1", default-features = false }
mio = { version = "0.8", features = ["net", "os-poll"] }
num-format = "0.4"
parking_lot = "0.12"
rand = { version = "0.8", features = ["small_rng"] }
serde = { version = "1", features = ["derive"] }
signal-hook = { version = "0.3" }
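The new `parking_lot` dependency backs the sharded torrent maps introduced in swarm.rs below: the announce path takes an upgradable read lock on a shard and upgrades to a write lock only when a torrent entry has to be created. A minimal sketch of that locking pattern, with `Arc<String>` standing in for the real per-torrent data:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::{RwLock, RwLockUpgradableReadGuard};

type InfoHash = [u8; 20]; // simplified stand-in

fn get_or_create(
    shard: &RwLock<HashMap<InfoHash, Arc<String>>>,
    info_hash: InfoHash,
) -> Arc<String> {
    let guard = shard.upgradable_read();

    if let Some(torrent_data) = guard.get(&info_hash) {
        // Common case: the torrent already exists, so no write lock is taken
        torrent_data.clone()
    } else {
        // Upgrade to a write lock and insert the entry if it is still missing
        RwLockUpgradableReadGuard::upgrade(guard)
            .entry(info_hash)
            .or_default()
            .clone()
    }
}
```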

@ -21,9 +21,9 @@ This is the most mature implementation in the aquatic family. I consider it full
## Performance
![UDP BitTorrent tracker throughput comparison](../../documents/aquatic-udp-load-test-illustration-2023-01-11.png)
![UDP BitTorrent tracker throughput](../../documents/aquatic-udp-load-test-2024-02-10.png)
More benchmark details are available [here](../../documents/aquatic-udp-load-test-2023-01-11.pdf).
More benchmark details are available [here](../../documents/aquatic-udp-load-test-2024-02-10.md).
## Usage

@ -1,18 +1,15 @@
use std::collections::BTreeMap;
use std::hash::Hash;
use std::iter::repeat_with;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use crossbeam_channel::{Receiver, SendError, Sender, TrySendError};
use aquatic_common::access_list::AccessListArcSwap;
use aquatic_common::{CanonicalSocketAddr, ServerStartInstant};
use aquatic_common::ServerStartInstant;
use aquatic_udp_protocol::*;
use crossbeam_utils::CachePadded;
use hdrhistogram::Histogram;
use crate::config::Config;
use crate::swarm::TorrentMaps;
pub const BUFFER_SIZE: usize = 8192;
@ -32,145 +29,10 @@ impl IpVersion {
}
}
#[derive(Clone, Copy, Debug)]
pub struct SocketWorkerIndex(pub usize);
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct SwarmWorkerIndex(pub usize);
impl SwarmWorkerIndex {
pub fn from_info_hash(config: &Config, info_hash: InfoHash) -> Self {
Self(info_hash.0[0] as usize % config.swarm_workers)
}
}
#[derive(Debug)]
pub struct PendingScrapeRequest {
pub slab_key: usize,
pub info_hashes: BTreeMap<usize, InfoHash>,
}
#[derive(Debug)]
pub struct PendingScrapeResponse {
pub slab_key: usize,
pub torrent_stats: BTreeMap<usize, TorrentScrapeStatistics>,
}
#[derive(Debug)]
pub enum ConnectedRequest {
Announce(AnnounceRequest),
Scrape(PendingScrapeRequest),
}
#[derive(Debug)]
pub enum ConnectedResponse {
AnnounceIpv4(AnnounceResponse<Ipv4AddrBytes>),
AnnounceIpv6(AnnounceResponse<Ipv6AddrBytes>),
Scrape(PendingScrapeResponse),
}
pub struct ConnectedRequestSender {
index: SocketWorkerIndex,
senders: Vec<Sender<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>>,
}
impl ConnectedRequestSender {
pub fn new(
index: SocketWorkerIndex,
senders: Vec<Sender<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>>,
) -> Self {
Self { index, senders }
}
pub fn try_send_to(
&self,
index: SwarmWorkerIndex,
request: ConnectedRequest,
addr: CanonicalSocketAddr,
) -> Result<(), (SwarmWorkerIndex, ConnectedRequest, CanonicalSocketAddr)> {
match self.senders[index.0].try_send((self.index, request, addr)) {
Ok(()) => Ok(()),
Err(TrySendError::Full(r)) => Err((index, r.1, r.2)),
Err(TrySendError::Disconnected(_)) => {
panic!("Request channel {} is disconnected", index.0);
}
}
}
}
pub struct ConnectedResponseSender {
senders: Vec<Sender<(CanonicalSocketAddr, ConnectedResponse)>>,
to_any_last_index_picked: usize,
}
impl ConnectedResponseSender {
pub fn new(senders: Vec<Sender<(CanonicalSocketAddr, ConnectedResponse)>>) -> Self {
Self {
senders,
to_any_last_index_picked: 0,
}
}
pub fn try_send_to(
&self,
index: SocketWorkerIndex,
addr: CanonicalSocketAddr,
response: ConnectedResponse,
) -> Result<(), TrySendError<(CanonicalSocketAddr, ConnectedResponse)>> {
self.senders[index.0].try_send((addr, response))
}
pub fn send_to(
&self,
index: SocketWorkerIndex,
addr: CanonicalSocketAddr,
response: ConnectedResponse,
) -> Result<(), SendError<(CanonicalSocketAddr, ConnectedResponse)>> {
self.senders[index.0].send((addr, response))
}
pub fn send_to_any(
&mut self,
addr: CanonicalSocketAddr,
response: ConnectedResponse,
) -> Result<(), SendError<(CanonicalSocketAddr, ConnectedResponse)>> {
let start = self.to_any_last_index_picked + 1;
let mut message = Some((addr, response));
for i in (start..start + self.senders.len()).map(|i| i % self.senders.len()) {
match self.senders[i].try_send(message.take().unwrap()) {
Ok(()) => {
self.to_any_last_index_picked = i;
return Ok(());
}
Err(TrySendError::Full(msg)) => {
message = Some(msg);
}
Err(TrySendError::Disconnected(_)) => {
panic!("ConnectedResponseReceiver disconnected");
}
}
}
let (addr, response) = message.unwrap();
self.to_any_last_index_picked = start % self.senders.len();
self.send_to(
SocketWorkerIndex(self.to_any_last_index_picked),
addr,
response,
)
}
}
pub type ConnectedResponseReceiver = Receiver<(CanonicalSocketAddr, ConnectedResponse)>;
#[derive(Clone)]
pub struct Statistics {
pub socket: Vec<CachePaddedArc<IpVersionStatistics<SocketWorkerStatistics>>>,
pub swarm: Vec<CachePaddedArc<IpVersionStatistics<SwarmWorkerStatistics>>>,
pub swarm: CachePaddedArc<IpVersionStatistics<SwarmWorkerStatistics>>,
}
impl Statistics {
@ -179,9 +41,7 @@ impl Statistics {
socket: repeat_with(Default::default)
.take(config.socket_workers)
.collect(),
swarm: repeat_with(Default::default)
.take(config.swarm_workers)
.collect(),
swarm: Default::default(),
}
}
}
@ -230,6 +90,7 @@ pub enum StatisticsMessage {
#[derive(Clone)]
pub struct State {
pub access_list: Arc<AccessListArcSwap>,
pub torrent_maps: TorrentMaps,
pub server_start_instant: ServerStartInstant,
}
@ -237,6 +98,7 @@ impl Default for State {
fn default() -> Self {
Self {
access_list: Arc::new(AccessListArcSwap::default()),
torrent_maps: TorrentMaps::default(),
server_start_instant: ServerStartInstant::new(),
}
}
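The net effect of this file's changes is that the request/response channels are gone and `State` now gives every worker a shared handle to the torrent maps. A rough, self-contained illustration of the pattern; `SharedMaps` is a hypothetical stand-in for `TorrentMaps`, which is really an `Arc` of locked shards (see swarm.rs below):

```rust
use std::sync::Arc;

use parking_lot::RwLock;

// Stand-in for TorrentMaps: cloning shares the same underlying data.
#[derive(Clone, Default)]
struct SharedMaps(Arc<RwLock<Vec<u8>>>);

#[derive(Clone, Default)]
struct State {
    torrent_maps: SharedMaps,
}

fn main() {
    let state = State::default();

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let state = state.clone();
            // Each "socket worker" reads and writes the shared maps directly,
            // instead of sending requests to a swarm worker over a channel.
            std::thread::spawn(move || state.torrent_maps.0.write().push(1))
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    assert_eq!(state.torrent_maps.0.read().len(), 4);
}
```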

@ -11,36 +11,16 @@ use aquatic_toml_config::TomlConfig;
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct Config {
/// Number of socket worker. One per physical core is recommended.
/// Number of socket workers
///
/// Socket workers receive requests from clients and parse them.
/// Responses to connect requests are sent back immediately. Announce and
/// scrape requests are passed on to swarm workers, which generate
/// responses and send them back to the socket worker, which sends them
/// to the client.
/// 0 = automatically set to number of available virtual CPUs
pub socket_workers: usize,
/// Number of swarm workers. One is enough in almost all cases
///
/// Swarm workers receive parsed announce and scrape requests from socket
/// workers, generate responses and send them back to the socket workers.
pub swarm_workers: usize,
pub log_level: LogLevel,
/// Maximum number of items in each channel passing requests/responses
/// between workers. A value of zero is no longer allowed.
pub worker_channel_size: usize,
/// How long to block waiting for requests in swarm workers.
///
/// Higher values means that with zero traffic, the worker will not
/// unnecessarily cause the CPU to wake up as often. However, high values
/// (something like larger than 1000) combined with very low traffic can
/// cause delays in torrent cleaning.
pub request_channel_recv_timeout_ms: u64,
pub network: NetworkConfig,
pub protocol: ProtocolConfig,
pub statistics: StatisticsConfig,
pub cleaning: CleaningConfig,
pub privileges: PrivilegeConfig,
/// Access list configuration
///
/// The file is read on start and when the program receives `SIGUSR1`. If
@ -48,26 +28,19 @@ pub struct Config {
/// emitting of an error-level log message, while successful updates of the
/// access list result in emitting of an info-level log message.
pub access_list: AccessListConfig,
#[cfg(feature = "cpu-pinning")]
pub cpu_pinning: aquatic_common::cpu_pinning::asc::CpuPinningConfigAsc,
}
impl Default for Config {
fn default() -> Self {
Self {
socket_workers: 1,
swarm_workers: 1,
log_level: LogLevel::Error,
worker_channel_size: 1_024,
request_channel_recv_timeout_ms: 100,
network: NetworkConfig::default(),
protocol: ProtocolConfig::default(),
statistics: StatisticsConfig::default(),
cleaning: CleaningConfig::default(),
privileges: PrivilegeConfig::default(),
access_list: AccessListConfig::default(),
#[cfg(feature = "cpu-pinning")]
cpu_pinning: Default::default(),
}
}
}
@ -100,13 +73,6 @@ pub struct NetworkConfig {
pub socket_recv_buffer_size: usize,
/// Poll timeout in milliseconds (mio backend only)
pub poll_timeout_ms: u64,
#[cfg(feature = "io-uring")]
pub use_io_uring: bool,
/// Number of ring entries (io_uring backend only)
///
/// Will be rounded to next power of two if not already one.
#[cfg(feature = "io-uring")]
pub ring_size: u16,
/// Store this many responses at most for retrying (once) on send failure
/// (mio backend only)
///
@ -114,6 +80,13 @@ pub struct NetworkConfig {
/// such as FreeBSD. Setting the value to zero disables resending
/// functionality.
pub resend_buffer_max_len: usize,
#[cfg(feature = "io-uring")]
pub use_io_uring: bool,
/// Number of ring entries (io_uring backend only)
///
/// Will be rounded to next power of two if not already one.
#[cfg(feature = "io-uring")]
pub ring_size: u16,
}
impl NetworkConfig {
@ -132,11 +105,11 @@ impl Default for NetworkConfig {
only_ipv6: false,
socket_recv_buffer_size: 8_000_000,
poll_timeout_ms: 50,
resend_buffer_max_len: 0,
#[cfg(feature = "io-uring")]
use_io_uring: true,
#[cfg(feature = "io-uring")]
ring_size: 128,
resend_buffer_max_len: 0,
}
}
}
@ -239,28 +212,18 @@ impl Default for StatisticsConfig {
pub struct CleaningConfig {
/// Clean torrents this often (seconds)
pub torrent_cleaning_interval: u64,
/// Clean pending scrape responses this often (seconds)
///
/// In regular operation, there should be no pending scrape responses
/// lingering for long enough to have to be cleaned up this way.
pub pending_scrape_cleaning_interval: u64,
/// Allow clients to use a connection token for this long (seconds)
pub max_connection_age: u32,
/// Remove peers who have not announced for this long (seconds)
pub max_peer_age: u32,
/// Remove pending scrape responses that have not been returned from swarm
/// workers for this long (seconds)
pub max_pending_scrape_age: u32,
}
impl Default for CleaningConfig {
fn default() -> Self {
Self {
torrent_cleaning_interval: 60 * 2,
pending_scrape_cleaning_interval: 60 * 10,
max_connection_age: 60 * 2,
max_peer_age: 60 * 20,
max_pending_scrape_age: 60,
}
}
}
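The `socket_workers` doc comment above ("0 = automatically set to number of available virtual CPUs") is resolved at startup; a condensed sketch of that step, mirroring the lines added to `run()` in the lib.rs hunk below (`resolve_socket_workers` is an illustrative helper name):

```rust
use std::thread::available_parallelism;

fn resolve_socket_workers(configured: usize) -> usize {
    if configured == 0 {
        // Fall back to a single worker if the parallelism query fails
        available_parallelism().map(Into::into).unwrap_or(1)
    } else {
        configured
    }
}
```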

@ -1,130 +1,62 @@
pub mod common;
pub mod config;
pub mod swarm;
pub mod workers;
use std::collections::BTreeMap;
use std::thread::{sleep, Builder, JoinHandle};
use std::thread::{available_parallelism, sleep, Builder, JoinHandle};
use std::time::Duration;
use anyhow::Context;
use aquatic_common::WorkerType;
use crossbeam_channel::{bounded, unbounded};
use crossbeam_channel::unbounded;
use signal_hook::consts::SIGUSR1;
use signal_hook::iterator::Signals;
use aquatic_common::access_list::update_access_list;
#[cfg(feature = "cpu-pinning")]
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
use aquatic_common::privileges::PrivilegeDropper;
use common::{
ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State, Statistics,
SwarmWorkerIndex,
};
use common::{State, Statistics};
use config::Config;
use workers::socket::ConnectionValidator;
use workers::swarm::SwarmWorker;
pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker";
pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
pub fn run(config: Config) -> ::anyhow::Result<()> {
pub fn run(mut config: Config) -> ::anyhow::Result<()> {
let mut signals = Signals::new([SIGUSR1])?;
if config.socket_workers == 0 {
config.socket_workers = available_parallelism().map(Into::into).unwrap_or(1);
};
let state = State::default();
let statistics = Statistics::new(&config);
let connection_validator = ConnectionValidator::new(&config)?;
let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
let mut join_handles = Vec::new();
let (statistics_sender, statistics_receiver) = unbounded();
update_access_list(&config.access_list, &state.access_list)?;
let mut request_senders = Vec::new();
let mut request_receivers = BTreeMap::new();
let mut response_senders = Vec::new();
let mut response_receivers = BTreeMap::new();
let (statistics_sender, statistics_receiver) = unbounded();
for i in 0..config.swarm_workers {
let (request_sender, request_receiver) = bounded(config.worker_channel_size);
request_senders.push(request_sender);
request_receivers.insert(i, request_receiver);
}
for i in 0..config.socket_workers {
let (response_sender, response_receiver) = bounded(config.worker_channel_size);
response_senders.push(response_sender);
response_receivers.insert(i, response_receiver);
}
for i in 0..config.swarm_workers {
let config = config.clone();
let state = state.clone();
let request_receiver = request_receivers.remove(&i).unwrap().clone();
let response_sender = ConnectedResponseSender::new(response_senders.clone());
let statistics_sender = statistics_sender.clone();
let statistics = statistics.swarm[i].clone();
let handle = Builder::new()
.name(format!("swarm-{:02}", i + 1))
.spawn(move || {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
config.swarm_workers,
WorkerIndex::SwarmWorker(i),
);
let mut worker = SwarmWorker {
config,
state,
statistics,
request_receiver,
response_sender,
statistics_sender,
worker_index: SwarmWorkerIndex(i),
};
worker.run()
})
.with_context(|| "spawn swarm worker")?;
join_handles.push((WorkerType::Swarm(i), handle));
}
let mut join_handles = Vec::new();
// Spawn socket worker threads
for i in 0..config.socket_workers {
let state = state.clone();
let config = config.clone();
let connection_validator = connection_validator.clone();
let request_sender =
ConnectedRequestSender::new(SocketWorkerIndex(i), request_senders.clone());
let response_receiver = response_receivers.remove(&i).unwrap();
let priv_dropper = priv_dropper.clone();
let statistics = statistics.socket[i].clone();
let statistics_sender = statistics_sender.clone();
let handle = Builder::new()
.name(format!("socket-{:02}", i + 1))
.spawn(move || {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
config.swarm_workers,
WorkerIndex::SocketWorker(i),
);
workers::socket::run_socket_worker(
config,
state,
statistics,
statistics_sender,
connection_validator,
request_sender,
response_receiver,
priv_dropper,
)
})
@ -133,6 +65,31 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
join_handles.push((WorkerType::Socket(i), handle));
}
// Spawn cleaning thread
{
let state = state.clone();
let config = config.clone();
let statistics = statistics.swarm.clone();
let statistics_sender = statistics_sender.clone();
let handle = Builder::new().name("cleaning".into()).spawn(move || loop {
sleep(Duration::from_secs(
config.cleaning.torrent_cleaning_interval,
));
state.torrent_maps.clean_and_update_statistics(
&config,
&statistics,
&statistics_sender,
&state.access_list,
state.server_start_instant,
);
})?;
join_handles.push((WorkerType::Cleaning, handle));
}
// Spawn statistics thread
if config.statistics.active() {
let state = state.clone();
let config = config.clone();
@ -140,14 +97,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let handle = Builder::new()
.name("statistics".into())
.spawn(move || {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
config.swarm_workers,
WorkerIndex::Util,
);
workers::statistics::run_statistics_worker(
config,
state,
@ -160,6 +109,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
join_handles.push((WorkerType::Statistics, handle));
}
// Spawn prometheus endpoint thread
#[cfg(feature = "prometheus")]
if config.statistics.active() && config.statistics.run_prometheus_endpoint {
let handle = aquatic_common::spawn_prometheus_endpoint(
@ -180,14 +130,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let handle: JoinHandle<anyhow::Result<()>> = Builder::new()
.name("signals".into())
.spawn(move || {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
config.swarm_workers,
WorkerIndex::Util,
);
for signal in &mut signals {
match signal {
SIGUSR1 => {
@ -204,14 +146,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
join_handles.push((WorkerType::Signals, handle));
}
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
config.swarm_workers,
WorkerIndex::Util,
);
// Quit application if any worker returns or panics
loop {
for (i, (_, handle)) in join_handles.iter().enumerate() {
if handle.is_finished() {
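This hunk is cut off in the middle of the supervision loop that implements the changelog's "Quit whole application if any worker thread quits". A hedged sketch of the shape of that loop, assuming a finished worker thread is simply reported as an error (`supervise` is an illustrative helper name, not the real code):

```rust
use std::thread::{sleep, JoinHandle};
use std::time::Duration;

use aquatic_common::WorkerType;

fn supervise(join_handles: Vec<(WorkerType, JoinHandle<anyhow::Result<()>>)>) -> anyhow::Result<()> {
    loop {
        for (worker_type, handle) in join_handles.iter() {
            if handle.is_finished() {
                // Worker threads are expected to run forever; treat any exit as fatal
                return Err(anyhow::anyhow!("{} exited unexpectedly", worker_type));
            }
        }

        sleep(Duration::from_secs(5));
    }
}
```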

@ -1,17 +1,24 @@
use std::iter::repeat_with;
use std::net::IpAddr;
use std::ops::DerefMut;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use aquatic_common::IndexMap;
use aquatic_common::SecondsSinceServerStart;
use aquatic_common::ServerStartInstant;
use aquatic_common::{
access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache, AccessListMode},
ValidUntil,
};
use aquatic_common::{CanonicalSocketAddr, IndexMap};
use aquatic_udp_protocol::*;
use arrayvec::ArrayVec;
use crossbeam_channel::Sender;
use hashbrown::HashMap;
use hdrhistogram::Histogram;
use parking_lot::RwLockUpgradableReadGuard;
use rand::prelude::SmallRng;
use rand::Rng;
@ -20,51 +27,108 @@ use crate::config::Config;
const SMALL_PEER_MAP_CAPACITY: usize = 2;
use aquatic_udp_protocol::InfoHash;
use parking_lot::RwLock;
#[derive(Clone)]
pub struct TorrentMaps {
pub ipv4: TorrentMap<Ipv4AddrBytes>,
pub ipv6: TorrentMap<Ipv6AddrBytes>,
ipv4: TorrentMapShards<Ipv4AddrBytes>,
ipv6: TorrentMapShards<Ipv6AddrBytes>,
}
impl Default for TorrentMaps {
fn default() -> Self {
const NUM_SHARDS: usize = 16;
Self {
ipv4: TorrentMap(Default::default()),
ipv6: TorrentMap(Default::default()),
ipv4: TorrentMapShards::new(NUM_SHARDS),
ipv6: TorrentMapShards::new(NUM_SHARDS),
}
}
}
impl TorrentMaps {
pub fn announce(
&self,
config: &Config,
statistics_sender: &Sender<StatisticsMessage>,
rng: &mut SmallRng,
request: &AnnounceRequest,
src: CanonicalSocketAddr,
valid_until: ValidUntil,
) -> Response {
match src.get().ip() {
IpAddr::V4(ip_address) => Response::AnnounceIpv4(self.ipv4.announce(
config,
statistics_sender,
rng,
request,
ip_address.into(),
valid_until,
)),
IpAddr::V6(ip_address) => Response::AnnounceIpv6(self.ipv6.announce(
config,
statistics_sender,
rng,
request,
ip_address.into(),
valid_until,
)),
}
}
pub fn scrape(&self, request: ScrapeRequest, src: CanonicalSocketAddr) -> ScrapeResponse {
if src.is_ipv4() {
self.ipv4.scrape(request)
} else {
self.ipv6.scrape(request)
}
}
/// Remove forbidden or inactive torrents, reclaim space and update statistics
pub fn clean_and_update_statistics(
&mut self,
&self,
config: &Config,
state: &State,
statistics: &CachePaddedArc<IpVersionStatistics<SwarmWorkerStatistics>>,
statistics_sender: &Sender<StatisticsMessage>,
access_list: &Arc<AccessListArcSwap>,
server_start_instant: ServerStartInstant,
) {
let mut cache = create_access_list_cache(access_list);
let mode = config.access_list.mode;
let now = state.server_start_instant.seconds_elapsed();
let now = server_start_instant.seconds_elapsed();
let ipv4 =
self.ipv4
.clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now);
let ipv6 =
self.ipv6
.clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now);
let mut statistics_messages = Vec::new();
let ipv4 = self.ipv4.clean_and_get_statistics(
config,
&mut statistics_messages,
&mut cache,
mode,
now,
);
let ipv6 = self.ipv6.clean_and_get_statistics(
config,
&mut statistics_messages,
&mut cache,
mode,
now,
);
if config.statistics.active() {
statistics.ipv4.peers.store(ipv4.0, Ordering::Relaxed);
statistics.ipv6.peers.store(ipv6.0, Ordering::Relaxed);
statistics.ipv4.torrents.store(ipv4.0, Ordering::Relaxed);
statistics.ipv6.torrents.store(ipv6.0, Ordering::Relaxed);
statistics.ipv4.peers.store(ipv4.1, Ordering::Relaxed);
statistics.ipv6.peers.store(ipv6.1, Ordering::Relaxed);
if let Some(message) = ipv4.1.map(StatisticsMessage::Ipv4PeerHistogram) {
if let Err(err) = statistics_sender.try_send(message) {
::log::error!("couldn't send statistics message: {:#}", err);
}
if let Some(message) = ipv4.2 {
statistics_messages.push(StatisticsMessage::Ipv4PeerHistogram(message));
}
if let Some(message) = ipv6.1.map(StatisticsMessage::Ipv6PeerHistogram) {
if let Some(message) = ipv6.2 {
statistics_messages.push(StatisticsMessage::Ipv6PeerHistogram(message));
}
for message in statistics_messages {
if let Err(err) = statistics_sender.try_send(message) {
::log::error!("couldn't send statistics message: {:#}", err);
}
@ -73,116 +137,200 @@ impl TorrentMaps {
}
}
#[derive(Default)]
pub struct TorrentMap<I: Ip>(pub IndexMap<InfoHash, TorrentData<I>>);
#[derive(Clone)]
pub struct TorrentMapShards<I: Ip>(Arc<[RwLock<TorrentMapShard<I>>]>);
impl<I: Ip> TorrentMap<I> {
pub fn scrape(&mut self, request: PendingScrapeRequest) -> PendingScrapeResponse {
let torrent_stats = request
.info_hashes
.into_iter()
.map(|(i, info_hash)| {
let stats = self
.0
.get(&info_hash)
.map(|torrent_data| torrent_data.scrape_statistics())
.unwrap_or_else(|| TorrentScrapeStatistics {
seeders: NumberOfPeers::new(0),
leechers: NumberOfPeers::new(0),
completed: NumberOfDownloads::new(0),
});
(i, stats)
})
.collect();
PendingScrapeResponse {
slab_key: request.slab_key,
torrent_stats,
}
impl<I: Ip> TorrentMapShards<I> {
fn new(num_shards: usize) -> Self {
Self(
repeat_with(Default::default)
.take(num_shards)
.collect::<Vec<_>>()
.into_boxed_slice()
.into(),
)
}
/// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers
fn clean_and_get_statistics(
&mut self,
fn announce(
&self,
config: &Config,
statistics_sender: &Sender<StatisticsMessage>,
access_list_cache: &mut AccessListCache,
access_list_mode: AccessListMode,
now: SecondsSinceServerStart,
) -> (usize, Option<Histogram<u64>>) {
let mut total_num_peers = 0;
rng: &mut SmallRng,
request: &AnnounceRequest,
ip_address: I,
valid_until: ValidUntil,
) -> AnnounceResponse<I> {
let torrent_data = {
let torrent_map_shard = self.get_shard(&request.info_hash).upgradable_read();
let mut opt_histogram: Option<Histogram<u64>> = if config.statistics.torrent_peer_histograms
{
match Histogram::new(3) {
Ok(histogram) => Some(histogram),
Err(err) => {
::log::error!("Couldn't create peer histogram: {:#}", err);
None
}
// Clone Arc here to avoid keeping lock on whole shard
if let Some(torrent_data) = torrent_map_shard.get(&request.info_hash) {
torrent_data.clone()
} else {
// Don't overwrite entry if created in the meantime
RwLockUpgradableReadGuard::upgrade(torrent_map_shard)
.entry(request.info_hash)
.or_default()
.clone()
}
} else {
None
};
self.0.retain(|info_hash, torrent| {
if !access_list_cache
.load()
.allows(access_list_mode, &info_hash.0)
{
return false;
}
let mut peer_map = torrent_data.peer_map.write();
let num_peers = match torrent {
TorrentData::Small(peer_map) => {
peer_map.clean_and_get_num_peers(config, statistics_sender, now)
}
TorrentData::Large(peer_map) => {
let num_peers =
peer_map.clean_and_get_num_peers(config, statistics_sender, now);
peer_map.announce(
config,
statistics_sender,
rng,
request,
ip_address,
valid_until,
)
}
if let Some(peer_map) = peer_map.try_shrink() {
*torrent = TorrentData::Small(peer_map);
}
fn scrape(&self, request: ScrapeRequest) -> ScrapeResponse {
let mut response = ScrapeResponse {
transaction_id: request.transaction_id,
torrent_stats: Vec::with_capacity(request.info_hashes.len()),
};
num_peers
for info_hash in request.info_hashes {
let torrent_map_shard = self.get_shard(&info_hash);
let statistics = if let Some(torrent_data) = torrent_map_shard.read().get(&info_hash) {
torrent_data.peer_map.read().scrape_statistics()
} else {
TorrentScrapeStatistics {
seeders: NumberOfPeers::new(0),
leechers: NumberOfPeers::new(0),
completed: NumberOfDownloads::new(0),
}
};
total_num_peers += num_peers;
response.torrent_stats.push(statistics);
}
match opt_histogram {
Some(ref mut histogram) if num_peers > 0 => {
let n = num_peers.try_into().expect("Couldn't fit usize into u64");
if let Err(err) = histogram.record(n) {
::log::error!("Couldn't record {} to histogram: {:#}", n, err);
}
}
_ => (),
}
num_peers > 0
});
self.0.shrink_to_fit();
(total_num_peers, opt_histogram)
response
}
pub fn num_torrents(&self) -> usize {
self.0.len()
fn clean_and_get_statistics(
&self,
config: &Config,
statistics_messages: &mut Vec<StatisticsMessage>,
access_list_cache: &mut AccessListCache,
access_list_mode: AccessListMode,
now: SecondsSinceServerStart,
) -> (usize, usize, Option<Histogram<u64>>) {
let mut total_num_torrents = 0;
let mut total_num_peers = 0;
let mut opt_histogram: Option<Histogram<u64>> = config
.statistics
.torrent_peer_histograms
.then(|| Histogram::new(3).expect("create peer histogram"));
for torrent_map_shard in self.0.iter() {
for torrent_data in torrent_map_shard.read().values() {
let mut peer_map = torrent_data.peer_map.write();
let num_peers = match peer_map.deref_mut() {
PeerMap::Small(small_peer_map) => {
small_peer_map.clean_and_get_num_peers(config, statistics_messages, now)
}
PeerMap::Large(large_peer_map) => {
let num_peers = large_peer_map.clean_and_get_num_peers(
config,
statistics_messages,
now,
);
if let Some(small_peer_map) = large_peer_map.try_shrink() {
*peer_map = PeerMap::Small(small_peer_map);
}
num_peers
}
};
drop(peer_map);
match opt_histogram.as_mut() {
Some(histogram) if num_peers > 0 => {
if let Err(err) = histogram.record(num_peers as u64) {
::log::error!("Couldn't record {} to histogram: {:#}", num_peers, err);
}
}
_ => (),
}
total_num_peers += num_peers;
torrent_data
.pending_removal
.store(num_peers == 0, Ordering::Release);
}
let mut torrent_map_shard = torrent_map_shard.write();
torrent_map_shard.retain(|info_hash, torrent_data| {
if !access_list_cache
.load()
.allows(access_list_mode, &info_hash.0)
{
return false;
}
// Check pending_removal flag set in previous cleaning step. This
// prevents us from removing TorrentData entries that were just
// added but do not yet contain any peers. Also double-check that
// no peers have been added since we last checked.
if torrent_data
.pending_removal
.fetch_and(false, Ordering::Acquire)
&& torrent_data.peer_map.read().is_empty()
{
return false;
}
true
});
torrent_map_shard.shrink_to_fit();
total_num_torrents += torrent_map_shard.len();
}
(total_num_torrents, total_num_peers, opt_histogram)
}
fn get_shard(&self, info_hash: &InfoHash) -> &RwLock<TorrentMapShard<I>> {
self.0.get(info_hash.0[0] as usize % self.0.len()).unwrap()
}
}
pub enum TorrentData<I: Ip> {
/// Use HashMap instead of IndexMap for better lookup performance
type TorrentMapShard<T> = HashMap<InfoHash, Arc<TorrentData<T>>>;
pub struct TorrentData<T: Ip> {
peer_map: RwLock<PeerMap<T>>,
pending_removal: AtomicBool,
}
impl<I: Ip> Default for TorrentData<I> {
fn default() -> Self {
Self {
peer_map: Default::default(),
pending_removal: Default::default(),
}
}
}
pub enum PeerMap<I: Ip> {
Small(SmallPeerMap<I>),
Large(LargePeerMap<I>),
}
impl<I: Ip> TorrentData<I> {
pub fn announce(
impl<I: Ip> PeerMap<I> {
fn announce(
&mut self,
config: &Config,
statistics_sender: &Sender<StatisticsMessage>,
@ -298,7 +446,7 @@ impl<I: Ip> TorrentData<I> {
response
}
pub fn scrape_statistics(&self) -> TorrentScrapeStatistics {
fn scrape_statistics(&self) -> TorrentScrapeStatistics {
let (seeders, leechers) = match self {
Self::Small(peer_map) => peer_map.num_seeders_leechers(),
Self::Large(peer_map) => peer_map.num_seeders_leechers(),
@ -310,9 +458,16 @@ impl<I: Ip> TorrentData<I> {
completed: NumberOfDownloads::new(0),
}
}
fn is_empty(&self) -> bool {
match self {
Self::Small(peer_map) => peer_map.0.is_empty(),
Self::Large(peer_map) => peer_map.peers.is_empty(),
}
}
}
impl<I: Ip> Default for TorrentData<I> {
impl<I: Ip> Default for PeerMap<I> {
fn default() -> Self {
Self::Small(SmallPeerMap(ArrayVec::default()))
}
@ -357,20 +512,14 @@ impl<I: Ip> SmallPeerMap<I> {
fn clean_and_get_num_peers(
&mut self,
config: &Config,
statistics_sender: &Sender<StatisticsMessage>,
statistics_messages: &mut Vec<StatisticsMessage>,
now: SecondsSinceServerStart,
) -> usize {
self.0.retain(|(_, peer)| {
let keep = peer.valid_until.valid(now);
if !keep
&& config.statistics.peer_clients
&& statistics_sender
.try_send(StatisticsMessage::PeerRemoved(peer.peer_id))
.is_err()
{
// Should never happen in practice
::log::error!("Couldn't send StatisticsMessage::PeerRemoved");
if !keep && config.statistics.peer_clients {
statistics_messages.push(StatisticsMessage::PeerRemoved(peer.peer_id));
}
keep
@ -421,12 +570,11 @@ impl<I: Ip> LargePeerMap<I> {
/// Extract response peers
///
/// If there are more peers in map than `max_num_peers_to_take`, do a random
/// selection of peers from first and second halves of map in order to avoid
/// returning too homogeneous peers.
///
/// Does NOT filter out announcing peer.
pub fn extract_response_peers(
/// If there are more peers in map than `max_num_peers_to_take`, do a
/// random selection of peers from first and second halves of map in
/// order to avoid returning too homogeneous peers. This is a lot more
/// cache-friendly than doing a fully random selection.
fn extract_response_peers(
&self,
rng: &mut impl Rng,
max_num_peers_to_take: usize,
@ -456,10 +604,10 @@ impl<I: Ip> LargePeerMap<I> {
let mut peers = Vec::with_capacity(max_num_peers_to_take);
if let Some(slice) = self.peers.get_range(offset_half_one..end_half_one) {
peers.extend(slice.keys());
peers.extend(slice.keys().copied());
}
if let Some(slice) = self.peers.get_range(offset_half_two..end_half_two) {
peers.extend(slice.keys());
peers.extend(slice.keys().copied());
}
peers
@ -469,7 +617,7 @@ impl<I: Ip> LargePeerMap<I> {
fn clean_and_get_num_peers(
&mut self,
config: &Config,
statistics_sender: &Sender<StatisticsMessage>,
statistics_messages: &mut Vec<StatisticsMessage>,
now: SecondsSinceServerStart,
) -> usize {
self.peers.retain(|_, peer| {
@ -479,13 +627,8 @@ impl<I: Ip> LargePeerMap<I> {
if peer.is_seeder {
self.num_seeders -= 1;
}
if config.statistics.peer_clients
&& statistics_sender
.try_send(StatisticsMessage::PeerRemoved(peer.peer_id))
.is_err()
{
// Should never happen in practice
::log::error!("Couldn't send StatisticsMessage::PeerRemoved");
if config.statistics.peer_clients {
statistics_messages.push(StatisticsMessage::PeerRemoved(peer.peer_id));
}
}
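The `extract_response_peers` doc comment above explains why response peers are taken from two contiguous halves of the map instead of by fully random sampling. A hedged sketch of that strategy, assuming a plain `indexmap::IndexMap`; the real offset handling differs in its details:

```rust
use indexmap::IndexMap;
use rand::Rng;

// Take up to `max` keys by copying one random contiguous window from each half
// of the map, which touches far fewer cache lines than `max` independent lookups.
fn extract_response_peers<K: Copy, V>(
    map: &IndexMap<K, V>,
    rng: &mut impl Rng,
    max: usize,
) -> Vec<K> {
    if map.len() <= max {
        return map.keys().copied().collect();
    }

    let half_len = map.len() / 2;
    let take_per_half = max / 2;

    let offset_one = rng.gen_range(0..=(half_len - take_per_half));
    let offset_two = half_len + rng.gen_range(0..=(map.len() - half_len - take_per_half));

    let mut peers = Vec::with_capacity(max);

    if let Some(slice) = map.get_range(offset_one..offset_one + take_per_half) {
        peers.extend(slice.keys().copied());
    }
    if let Some(slice) = map.get_range(offset_two..offset_two + take_per_half) {
        peers.extend(slice.keys().copied());
    }

    peers
}
```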

@ -1,3 +1,2 @@
pub mod socket;
pub mod statistics;
pub mod swarm;

@ -1,9 +1,10 @@
use std::io::{Cursor, ErrorKind};
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};
use std::time::Duration;
use anyhow::Context;
use aquatic_common::access_list::AccessListCache;
use crossbeam_channel::Sender;
use mio::net::UdpSocket;
use mio::{Events, Interest, Poll, Token};
@ -12,40 +13,26 @@ use aquatic_common::{
ValidUntil,
};
use aquatic_udp_protocol::*;
use rand::rngs::SmallRng;
use rand::SeedableRng;
use crate::common::*;
use crate::config::Config;
use super::storage::PendingScrapeResponseSlab;
use super::validator::ConnectionValidator;
use super::{create_socket, EXTRA_PACKET_SIZE_IPV4, EXTRA_PACKET_SIZE_IPV6};
enum HandleRequestError {
RequestChannelFull(Vec<(SwarmWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>),
}
#[derive(Clone, Copy, Debug)]
enum PollMode {
Regular,
SkipPolling,
SkipReceiving,
}
pub struct SocketWorker {
config: Config,
shared_state: State,
statistics: CachePaddedArc<IpVersionStatistics<SocketWorkerStatistics>>,
request_sender: ConnectedRequestSender,
response_receiver: ConnectedResponseReceiver,
statistics_sender: Sender<StatisticsMessage>,
access_list_cache: AccessListCache,
validator: ConnectionValidator,
pending_scrape_responses: PendingScrapeResponseSlab,
socket: UdpSocket,
opt_resend_buffer: Option<Vec<(CanonicalSocketAddr, Response)>>,
buffer: [u8; BUFFER_SIZE],
polling_mode: PollMode,
/// Storage for requests that couldn't be sent to swarm worker because channel was full
pending_requests: Vec<(SwarmWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>,
rng: SmallRng,
peer_valid_until: ValidUntil,
}
impl SocketWorker {
@ -53,35 +40,36 @@ impl SocketWorker {
config: Config,
shared_state: State,
statistics: CachePaddedArc<IpVersionStatistics<SocketWorkerStatistics>>,
statistics_sender: Sender<StatisticsMessage>,
validator: ConnectionValidator,
request_sender: ConnectedRequestSender,
response_receiver: ConnectedResponseReceiver,
priv_dropper: PrivilegeDropper,
) -> anyhow::Result<()> {
let socket = UdpSocket::from_std(create_socket(&config, priv_dropper)?);
let access_list_cache = create_access_list_cache(&shared_state.access_list);
let opt_resend_buffer = (config.network.resend_buffer_max_len > 0).then_some(Vec::new());
let peer_valid_until = ValidUntil::new(
shared_state.server_start_instant,
config.cleaning.max_peer_age,
);
let mut worker = Self {
config,
shared_state,
statistics,
statistics_sender,
validator,
request_sender,
response_receiver,
access_list_cache,
pending_scrape_responses: Default::default(),
socket,
opt_resend_buffer,
buffer: [0; BUFFER_SIZE],
polling_mode: PollMode::Regular,
pending_requests: Default::default(),
rng: SmallRng::from_entropy(),
peer_valid_until,
};
worker.run_inner()
}
pub fn run_inner(&mut self) -> anyhow::Result<()> {
let mut opt_resend_buffer =
(self.config.network.resend_buffer_max_len > 0).then_some(Vec::new());
let mut events = Events::with_capacity(1);
let mut poll = Poll::new().context("create poll")?;
@ -91,94 +79,41 @@ impl SocketWorker {
let poll_timeout = Duration::from_millis(self.config.network.poll_timeout_ms);
let pending_scrape_cleaning_duration =
Duration::from_secs(self.config.cleaning.pending_scrape_cleaning_interval);
let mut pending_scrape_valid_until = ValidUntil::new(
self.shared_state.server_start_instant,
self.config.cleaning.max_pending_scrape_age,
);
let mut last_pending_scrape_cleaning = Instant::now();
let mut iter_counter = 0usize;
let mut iter_counter = 0u64;
loop {
match self.polling_mode {
PollMode::Regular => {
poll.poll(&mut events, Some(poll_timeout)).context("poll")?;
poll.poll(&mut events, Some(poll_timeout)).context("poll")?;
for event in events.iter() {
if event.is_readable() {
self.read_and_handle_requests(pending_scrape_valid_until);
}
}
}
PollMode::SkipPolling => {
self.polling_mode = PollMode::Regular;
// Continue reading from socket without polling, since
// reading was previously cancelled
self.read_and_handle_requests(pending_scrape_valid_until);
}
PollMode::SkipReceiving => {
::log::debug!("Postponing receiving requests because swarm worker channel is full. This means that the OS will be relied on to buffer incoming packets. To prevent this, raise config.worker_channel_size.");
self.polling_mode = PollMode::SkipPolling;
for event in events.iter() {
if event.is_readable() {
self.read_and_handle_requests(&mut opt_resend_buffer);
}
}
// If resend buffer is enabled, send any responses in it
if let Some(resend_buffer) = self.opt_resend_buffer.as_mut() {
if let Some(resend_buffer) = opt_resend_buffer.as_mut() {
for (addr, response) in resend_buffer.drain(..) {
send_response(
&self.config,
&self.statistics,
&mut self.socket,
&mut self.buffer,
&mut None,
response,
addr,
);
self.send_response(&mut None, addr, response);
}
}
// Check channel for any responses generated by swarm workers
self.handle_swarm_worker_responses();
// Try sending pending requests
while let Some((index, request, addr)) = self.pending_requests.pop() {
if let Err(r) = self.request_sender.try_send_to(index, request, addr) {
self.pending_requests.push(r);
self.polling_mode = PollMode::SkipReceiving;
break;
}
}
// Run periodic ValidUntil updates and state cleaning
if iter_counter % 256 == 0 {
let seconds_since_start = self.shared_state.server_start_instant.seconds_elapsed();
self.validator.update_elapsed();
pending_scrape_valid_until = ValidUntil::new_with_now(
seconds_since_start,
self.config.cleaning.max_pending_scrape_age,
self.peer_valid_until = ValidUntil::new(
self.shared_state.server_start_instant,
self.config.cleaning.max_peer_age,
);
let now = Instant::now();
if now > last_pending_scrape_cleaning + pending_scrape_cleaning_duration {
self.pending_scrape_responses.clean(seconds_since_start);
last_pending_scrape_cleaning = now;
}
}
iter_counter = iter_counter.wrapping_add(1);
}
}
fn read_and_handle_requests(&mut self, pending_scrape_valid_until: ValidUntil) {
fn read_and_handle_requests(
&mut self,
opt_resend_buffer: &mut Option<Vec<(CanonicalSocketAddr, Response)>>,
) {
let max_scrape_torrents = self.config.protocol.max_scrape_torrents;
loop {
@ -222,13 +157,8 @@ impl SocketWorker {
statistics.requests.fetch_add(1, Ordering::Relaxed);
}
if let Err(HandleRequestError::RequestChannelFull(failed_requests)) =
self.handle_request(pending_scrape_valid_until, request, src)
{
self.pending_requests.extend(failed_requests);
self.polling_mode = PollMode::SkipReceiving;
break;
if let Some(response) = self.handle_request(request, src) {
self.send_response(opt_resend_buffer, src, response);
}
}
Err(RequestParseError::Sendable {
@ -241,15 +171,7 @@ impl SocketWorker {
message: err.into(),
};
send_response(
&self.config,
&self.statistics,
&mut self.socket,
&mut self.buffer,
&mut self.opt_resend_buffer,
Response::Error(response),
src,
);
self.send_response(opt_resend_buffer, src, Response::Error(response));
::log::debug!("request parse error (sent error response): {:?}", err);
}
@ -271,34 +193,15 @@ impl SocketWorker {
}
}
fn handle_request(
&mut self,
pending_scrape_valid_until: ValidUntil,
request: Request,
src: CanonicalSocketAddr,
) -> Result<(), HandleRequestError> {
fn handle_request(&mut self, request: Request, src: CanonicalSocketAddr) -> Option<Response> {
let access_list_mode = self.config.access_list.mode;
match request {
Request::Connect(request) => {
let connection_id = self.validator.create_connection_id(src);
let response = ConnectResponse {
connection_id,
return Some(Response::Connect(ConnectResponse {
connection_id: self.validator.create_connection_id(src),
transaction_id: request.transaction_id,
};
send_response(
&self.config,
&self.statistics,
&mut self.socket,
&mut self.buffer,
&mut self.opt_resend_buffer,
Response::Connect(response),
src,
);
Ok(())
}));
}
Request::Announce(request) => {
if self
@ -310,34 +213,22 @@ impl SocketWorker {
.load()
.allows(access_list_mode, &request.info_hash.0)
{
let worker_index =
SwarmWorkerIndex::from_info_hash(&self.config, request.info_hash);
self.request_sender
.try_send_to(worker_index, ConnectedRequest::Announce(request), src)
.map_err(|request| {
HandleRequestError::RequestChannelFull(vec![request])
})
} else {
let response = ErrorResponse {
transaction_id: request.transaction_id,
message: "Info hash not allowed".into(),
};
send_response(
let response = self.shared_state.torrent_maps.announce(
&self.config,
&self.statistics,
&mut self.socket,
&mut self.buffer,
&mut self.opt_resend_buffer,
Response::Error(response),
&self.statistics_sender,
&mut self.rng,
&request,
src,
self.peer_valid_until,
);
Ok(())
return Some(response);
} else {
return Some(Response::Error(ErrorResponse {
transaction_id: request.transaction_id,
message: "Info hash not allowed".into(),
}));
}
} else {
Ok(())
}
}
Request::Scrape(request) => {
@ -345,136 +236,85 @@ impl SocketWorker {
.validator
.connection_id_valid(src, request.connection_id)
{
let split_requests = self.pending_scrape_responses.prepare_split_requests(
&self.config,
request,
pending_scrape_valid_until,
);
return Some(Response::Scrape(
self.shared_state.torrent_maps.scrape(request, src),
));
}
}
}
let mut failed = Vec::new();
None
}
for (swarm_worker_index, request) in split_requests {
if let Err(request) = self.request_sender.try_send_to(
swarm_worker_index,
ConnectedRequest::Scrape(request),
src,
) {
failed.push(request);
}
}
fn send_response(
&mut self,
opt_resend_buffer: &mut Option<Vec<(CanonicalSocketAddr, Response)>>,
canonical_addr: CanonicalSocketAddr,
response: Response,
) {
let mut buffer = Cursor::new(&mut self.buffer[..]);
if failed.is_empty() {
Ok(())
} else {
Err(HandleRequestError::RequestChannelFull(failed))
}
if let Err(err) = response.write_bytes(&mut buffer) {
::log::error!("failed writing response to buffer: {:#}", err);
return;
}
let bytes_written = buffer.position() as usize;
let addr = if self.config.network.address.is_ipv4() {
canonical_addr
.get_ipv4()
.expect("found peer ipv6 address while running bound to ipv4 address")
} else {
canonical_addr.get_ipv6_mapped()
};
match self
.socket
.send_to(&buffer.into_inner()[..bytes_written], addr)
{
Ok(bytes_sent) if self.config.statistics.active() => {
let stats = if canonical_addr.is_ipv4() {
let stats = &self.statistics.ipv4;
stats
.bytes_sent
.fetch_add(bytes_sent + EXTRA_PACKET_SIZE_IPV4, Ordering::Relaxed);
stats
} else {
Ok(())
}
}
}
}
let stats = &self.statistics.ipv6;
fn handle_swarm_worker_responses(&mut self) {
for (addr, response) in self.response_receiver.try_iter() {
let response = match response {
ConnectedResponse::Scrape(response) => {
if let Some(r) = self
.pending_scrape_responses
.add_and_get_finished(&response)
{
Response::Scrape(r)
} else {
continue;
stats
.bytes_sent
.fetch_add(bytes_sent + EXTRA_PACKET_SIZE_IPV6, Ordering::Relaxed);
stats
};
match response {
Response::Connect(_) => {
stats.responses_connect.fetch_add(1, Ordering::Relaxed);
}
Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => {
stats.responses_announce.fetch_add(1, Ordering::Relaxed);
}
Response::Scrape(_) => {
stats.responses_scrape.fetch_add(1, Ordering::Relaxed);
}
Response::Error(_) => {
stats.responses_error.fetch_add(1, Ordering::Relaxed);
}
}
ConnectedResponse::AnnounceIpv4(r) => Response::AnnounceIpv4(r),
ConnectedResponse::AnnounceIpv6(r) => Response::AnnounceIpv6(r),
};
send_response(
&self.config,
&self.statistics,
&mut self.socket,
&mut self.buffer,
&mut self.opt_resend_buffer,
response,
addr,
);
}
}
}
fn send_response(
config: &Config,
statistics: &CachePaddedArc<IpVersionStatistics<SocketWorkerStatistics>>,
socket: &mut UdpSocket,
buffer: &mut [u8],
opt_resend_buffer: &mut Option<Vec<(CanonicalSocketAddr, Response)>>,
response: Response,
canonical_addr: CanonicalSocketAddr,
) {
let mut buffer = Cursor::new(&mut buffer[..]);
if let Err(err) = response.write_bytes(&mut buffer) {
::log::error!("failed writing response to buffer: {:#}", err);
return;
}
let bytes_written = buffer.position() as usize;
let addr = if config.network.address.is_ipv4() {
canonical_addr
.get_ipv4()
.expect("found peer ipv6 address while running bound to ipv4 address")
} else {
canonical_addr.get_ipv6_mapped()
};
match socket.send_to(&buffer.into_inner()[..bytes_written], addr) {
Ok(amt) if config.statistics.active() => {
let stats = if canonical_addr.is_ipv4() {
let stats = &statistics.ipv4;
stats
.bytes_sent
.fetch_add(amt + EXTRA_PACKET_SIZE_IPV4, Ordering::Relaxed);
stats
} else {
let stats = &statistics.ipv6;
stats
.bytes_sent
.fetch_add(amt + EXTRA_PACKET_SIZE_IPV6, Ordering::Relaxed);
stats
};
match response {
Response::Connect(_) => {
stats.responses_connect.fetch_add(1, Ordering::Relaxed);
}
Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => {
stats.responses_announce.fetch_add(1, Ordering::Relaxed);
}
Response::Scrape(_) => {
stats.responses_scrape.fetch_add(1, Ordering::Relaxed);
}
Response::Error(_) => {
stats.responses_error.fetch_add(1, Ordering::Relaxed);
}
}
}
Ok(_) => (),
Err(err) => {
match opt_resend_buffer.as_mut() {
Ok(_) => (),
Err(err) => match opt_resend_buffer.as_mut() {
Some(resend_buffer)
if (err.raw_os_error() == Some(libc::ENOBUFS))
|| (err.kind() == ErrorKind::WouldBlock) =>
{
if resend_buffer.len() < config.network.resend_buffer_max_len {
if resend_buffer.len() < self.config.network.resend_buffer_max_len {
::log::debug!("Adding response to resend queue, since sending it to {} failed with: {:#}", addr, err);
resend_buffer.push((canonical_addr, response));
@ -485,7 +325,9 @@ fn send_response(
_ => {
::log::warn!("Sending response to {} failed: {:#}", addr, err);
}
}
},
}
::log::debug!("send response fn finished");
}
}

@ -1,17 +1,16 @@
mod mio;
mod storage;
#[cfg(all(target_os = "linux", feature = "io-uring"))]
mod uring;
mod validator;
use anyhow::Context;
use aquatic_common::privileges::PrivilegeDropper;
use crossbeam_channel::Sender;
use socket2::{Domain, Protocol, Socket, Type};
use crate::{
common::{
CachePaddedArc, ConnectedRequestSender, ConnectedResponseReceiver, IpVersionStatistics,
SocketWorkerStatistics, State,
CachePaddedArc, IpVersionStatistics, SocketWorkerStatistics, State, StatisticsMessage,
},
config::Config,
};
@ -43,9 +42,8 @@ pub fn run_socket_worker(
config: Config,
shared_state: State,
statistics: CachePaddedArc<IpVersionStatistics<SocketWorkerStatistics>>,
statistics_sender: Sender<StatisticsMessage>,
validator: ConnectionValidator,
request_sender: ConnectedRequestSender,
response_receiver: ConnectedResponseReceiver,
priv_dropper: PrivilegeDropper,
) -> anyhow::Result<()> {
#[cfg(all(target_os = "linux", feature = "io-uring"))]
@ -56,9 +54,8 @@ pub fn run_socket_worker(
config,
shared_state,
statistics,
statistics_sender,
validator,
request_sender,
response_receiver,
priv_dropper,
);
}
@ -67,9 +64,8 @@ pub fn run_socket_worker(
config,
shared_state,
statistics,
statistics_sender,
validator,
request_sender,
response_receiver,
priv_dropper,
)
}

@ -1,218 +0,0 @@
use std::collections::BTreeMap;
use hashbrown::HashMap;
use slab::Slab;
use aquatic_common::{SecondsSinceServerStart, ValidUntil};
use aquatic_udp_protocol::*;
use crate::common::*;
use crate::config::Config;
#[derive(Debug)]
pub struct PendingScrapeResponseSlabEntry {
num_pending: usize,
valid_until: ValidUntil,
torrent_stats: BTreeMap<usize, TorrentScrapeStatistics>,
transaction_id: TransactionId,
}
#[derive(Default)]
pub struct PendingScrapeResponseSlab(Slab<PendingScrapeResponseSlabEntry>);
impl PendingScrapeResponseSlab {
pub fn prepare_split_requests(
&mut self,
config: &Config,
request: ScrapeRequest,
valid_until: ValidUntil,
) -> impl IntoIterator<Item = (SwarmWorkerIndex, PendingScrapeRequest)> {
let capacity = config.swarm_workers.min(request.info_hashes.len());
let mut split_requests: HashMap<SwarmWorkerIndex, PendingScrapeRequest> =
HashMap::with_capacity(capacity);
if request.info_hashes.is_empty() {
::log::warn!(
"Attempted to prepare PendingScrapeResponseSlab entry with zero info hashes"
);
return split_requests;
}
let vacant_entry = self.0.vacant_entry();
let slab_key = vacant_entry.key();
for (i, info_hash) in request.info_hashes.into_iter().enumerate() {
let split_request = split_requests
.entry(SwarmWorkerIndex::from_info_hash(config, info_hash))
.or_insert_with(|| PendingScrapeRequest {
slab_key,
info_hashes: BTreeMap::new(),
});
split_request.info_hashes.insert(i, info_hash);
}
vacant_entry.insert(PendingScrapeResponseSlabEntry {
num_pending: split_requests.len(),
valid_until,
torrent_stats: Default::default(),
transaction_id: request.transaction_id,
});
split_requests
}
pub fn add_and_get_finished(
&mut self,
response: &PendingScrapeResponse,
) -> Option<ScrapeResponse> {
let finished = if let Some(entry) = self.0.get_mut(response.slab_key) {
entry.num_pending -= 1;
entry.torrent_stats.extend(response.torrent_stats.iter());
entry.num_pending == 0
} else {
::log::warn!(
"PendingScrapeResponseSlab.add didn't find entry for key {:?}",
response.slab_key
);
false
};
if finished {
let entry = self.0.remove(response.slab_key);
Some(ScrapeResponse {
transaction_id: entry.transaction_id,
torrent_stats: entry.torrent_stats.into_values().collect(),
})
} else {
None
}
}
pub fn clean(&mut self, now: SecondsSinceServerStart) {
self.0.retain(|k, v| {
if v.valid_until.valid(now) {
true
} else {
::log::warn!(
"Unconsumed PendingScrapeResponseSlab entry. {:?}: {:?}",
k,
v
);
false
}
});
self.0.shrink_to_fit();
}
}
#[cfg(test)]
mod tests {
use aquatic_common::ServerStartInstant;
use quickcheck::TestResult;
use quickcheck_macros::quickcheck;
use super::*;
#[quickcheck]
fn test_pending_scrape_response_slab(
request_data: Vec<(i32, i64, u8)>,
swarm_workers: u8,
) -> TestResult {
if swarm_workers == 0 {
return TestResult::discard();
}
let config = Config {
swarm_workers: swarm_workers as usize,
..Default::default()
};
let valid_until = ValidUntil::new(ServerStartInstant::new(), 1);
let mut map = PendingScrapeResponseSlab::default();
let mut requests = Vec::new();
for (t, c, b) in request_data {
if b == 0 {
return TestResult::discard();
}
let mut info_hashes = Vec::new();
for i in 0..b {
let info_hash = InfoHash([i; 20]);
info_hashes.push(info_hash);
}
let request = ScrapeRequest {
transaction_id: TransactionId::new(t),
connection_id: ConnectionId::new(c),
info_hashes,
};
requests.push(request);
}
let mut all_split_requests = Vec::new();
for request in requests.iter() {
let split_requests =
map.prepare_split_requests(&config, request.to_owned(), valid_until);
all_split_requests.push(
split_requests
.into_iter()
.collect::<Vec<(SwarmWorkerIndex, PendingScrapeRequest)>>(),
);
}
assert_eq!(map.0.len(), requests.len());
let mut responses = Vec::new();
for split_requests in all_split_requests {
for (worker_index, split_request) in split_requests {
assert!(worker_index.0 < swarm_workers as usize);
let torrent_stats = split_request
.info_hashes
.into_iter()
.map(|(i, info_hash)| {
(
i,
TorrentScrapeStatistics {
seeders: NumberOfPeers::new((info_hash.0[0]) as i32),
leechers: NumberOfPeers::new(0),
completed: NumberOfDownloads::new(0),
},
)
})
.collect();
let response = PendingScrapeResponse {
slab_key: split_request.slab_key,
torrent_stats,
};
if let Some(response) = map.add_and_get_finished(&response) {
responses.push(response);
}
}
}
assert!(map.0.is_empty());
assert_eq!(responses.len(), requests.len());
TestResult::from_bool(true)
}
}

View file

@ -11,6 +11,7 @@ use std::sync::atomic::Ordering;
use anyhow::Context;
use aquatic_common::access_list::AccessListCache;
use crossbeam_channel::Sender;
use io_uring::opcode::Timeout;
use io_uring::types::{Fixed, Timespec};
use io_uring::{IoUring, Probe};
@ -20,6 +21,8 @@ use aquatic_common::{
ValidUntil,
};
use aquatic_udp_protocol::*;
use rand::rngs::SmallRng;
use rand::SeedableRng;
use crate::common::*;
use crate::config::Config;
@ -28,7 +31,6 @@ use self::buf_ring::BufRing;
use self::recv_helper::RecvHelper;
use self::send_buffers::{ResponseType, SendBuffers};
use super::storage::PendingScrapeResponseSlab;
use super::validator::ConnectionValidator;
use super::{create_socket, EXTRA_PACKET_SIZE_IPV4, EXTRA_PACKET_SIZE_IPV6};
@ -48,7 +50,6 @@ const RESPONSE_BUF_LEN: usize = 2048;
const USER_DATA_RECV: u64 = u64::MAX;
const USER_DATA_PULSE_TIMEOUT: u64 = u64::MAX - 1;
const USER_DATA_CLEANING_TIMEOUT: u64 = u64::MAX - 2;
const SOCKET_IDENTIFIER: Fixed = Fixed(0);
@ -76,22 +77,20 @@ pub struct SocketWorker {
config: Config,
shared_state: State,
statistics: CachePaddedArc<IpVersionStatistics<SocketWorkerStatistics>>,
request_sender: ConnectedRequestSender,
response_receiver: ConnectedResponseReceiver,
statistics_sender: Sender<StatisticsMessage>,
access_list_cache: AccessListCache,
validator: ConnectionValidator,
#[allow(dead_code)]
socket: UdpSocket,
pending_scrape_responses: PendingScrapeResponseSlab,
buf_ring: BufRing,
send_buffers: SendBuffers,
recv_helper: RecvHelper,
local_responses: VecDeque<(Response, CanonicalSocketAddr)>,
local_responses: VecDeque<(CanonicalSocketAddr, Response)>,
resubmittable_sqe_buf: Vec<io_uring::squeue::Entry>,
recv_sqe: io_uring::squeue::Entry,
pulse_timeout_sqe: io_uring::squeue::Entry,
cleaning_timeout_sqe: io_uring::squeue::Entry,
pending_scrape_valid_until: ValidUntil,
peer_valid_until: ValidUntil,
rng: SmallRng,
}
impl SocketWorker {
@ -99,9 +98,8 @@ impl SocketWorker {
config: Config,
shared_state: State,
statistics: CachePaddedArc<IpVersionStatistics<SocketWorkerStatistics>>,
statistics_sender: Sender<StatisticsMessage>,
validator: ConnectionValidator,
request_sender: ConnectedRequestSender,
response_receiver: ConnectedResponseReceiver,
priv_dropper: PrivilegeDropper,
) -> anyhow::Result<()> {
let ring_entries = config.network.ring_size.next_power_of_two();
@ -136,57 +134,40 @@ impl SocketWorker {
let recv_sqe = recv_helper.create_entry(buf_ring.bgid());
// This timeout enables regular updates of pending_scrape_valid_until
// and wakes the main loop to send any pending responses in the case
// of no incoming requests
// This timeout enables regular updates of ConnectionValidator and
// peer_valid_until
let pulse_timeout_sqe = {
let timespec_ptr = Box::into_raw(Box::new(Timespec::new().sec(1))) as *const _;
let timespec_ptr = Box::into_raw(Box::new(Timespec::new().sec(5))) as *const _;
Timeout::new(timespec_ptr)
.build()
.user_data(USER_DATA_PULSE_TIMEOUT)
};
let cleaning_timeout_sqe = {
let timespec_ptr = Box::into_raw(Box::new(
Timespec::new().sec(config.cleaning.pending_scrape_cleaning_interval),
)) as *const _;
let resubmittable_sqe_buf = vec![recv_sqe.clone(), pulse_timeout_sqe.clone()];
Timeout::new(timespec_ptr)
.build()
.user_data(USER_DATA_CLEANING_TIMEOUT)
};
let resubmittable_sqe_buf = vec![
recv_sqe.clone(),
pulse_timeout_sqe.clone(),
cleaning_timeout_sqe.clone(),
];
let pending_scrape_valid_until = ValidUntil::new(
let peer_valid_until = ValidUntil::new(
shared_state.server_start_instant,
config.cleaning.max_pending_scrape_age,
config.cleaning.max_peer_age,
);
let mut worker = Self {
config,
shared_state,
statistics,
statistics_sender,
validator,
request_sender,
response_receiver,
access_list_cache,
pending_scrape_responses: Default::default(),
send_buffers,
recv_helper,
local_responses: Default::default(),
buf_ring,
recv_sqe,
pulse_timeout_sqe,
cleaning_timeout_sqe,
resubmittable_sqe_buf,
socket,
pending_scrape_valid_until,
peer_valid_until,
rng: SmallRng::from_entropy(),
};
CurrentRing::with(|ring| worker.run_inner(ring));
@ -210,7 +191,7 @@ impl SocketWorker {
// Enqueue local responses
for _ in 0..sq_space {
if let Some((response, addr)) = self.local_responses.pop_front() {
if let Some((addr, response)) = self.local_responses.pop_front() {
match self.send_buffers.prepare_entry(response, addr) {
Ok(entry) => {
unsafe { ring.submission().push(&entry).unwrap() };
@ -218,7 +199,7 @@ impl SocketWorker {
num_send_added += 1;
}
Err(send_buffers::Error::NoBuffers(response)) => {
self.local_responses.push_front((response, addr));
self.local_responses.push_front((addr, response));
break;
}
@ -231,43 +212,6 @@ impl SocketWorker {
}
}
// Enqueue swarm worker responses
for _ in 0..(sq_space - num_send_added) {
let (addr, response) = if let Ok(r) = self.response_receiver.try_recv() {
r
} else {
break;
};
let response = match response {
ConnectedResponse::AnnounceIpv4(r) => Response::AnnounceIpv4(r),
ConnectedResponse::AnnounceIpv6(r) => Response::AnnounceIpv6(r),
ConnectedResponse::Scrape(r) => {
if let Some(r) = self.pending_scrape_responses.add_and_get_finished(&r) {
Response::Scrape(r)
} else {
continue;
}
}
};
match self.send_buffers.prepare_entry(response, addr) {
Ok(entry) => {
unsafe { ring.submission().push(&entry).unwrap() };
num_send_added += 1;
}
Err(send_buffers::Error::NoBuffers(response)) => {
self.local_responses.push_back((response, addr));
break;
}
Err(send_buffers::Error::SerializationFailed(err)) => {
::log::error!("Failed serializing response: {:#}", err);
}
}
}
// Wait for all sendmsg entries to complete. If none were added,
// wait for at least one recvmsg or timeout in order to avoid
// busy-polling if there is no incoming data.
@ -286,28 +230,25 @@ impl SocketWorker {
fn handle_cqe(&mut self, cqe: io_uring::cqueue::Entry) {
match cqe.user_data() {
USER_DATA_RECV => {
self.handle_recv_cqe(&cqe);
if let Some((addr, response)) = self.handle_recv_cqe(&cqe) {
self.local_responses.push_back((addr, response));
}
if !io_uring::cqueue::more(cqe.flags()) {
self.resubmittable_sqe_buf.push(self.recv_sqe.clone());
}
}
USER_DATA_PULSE_TIMEOUT => {
self.pending_scrape_valid_until = ValidUntil::new(
self.validator.update_elapsed();
self.peer_valid_until = ValidUntil::new(
self.shared_state.server_start_instant,
self.config.cleaning.max_pending_scrape_age,
self.config.cleaning.max_peer_age,
);
self.resubmittable_sqe_buf
.push(self.pulse_timeout_sqe.clone());
}
USER_DATA_CLEANING_TIMEOUT => {
self.pending_scrape_responses
.clean(self.shared_state.server_start_instant.seconds_elapsed());
self.resubmittable_sqe_buf
.push(self.cleaning_timeout_sqe.clone());
}
send_buffer_index => {
let result = cqe.result();
@ -352,12 +293,15 @@ impl SocketWorker {
}
}
fn handle_recv_cqe(&mut self, cqe: &io_uring::cqueue::Entry) {
fn handle_recv_cqe(
&mut self,
cqe: &io_uring::cqueue::Entry,
) -> Option<(CanonicalSocketAddr, Response)> {
let result = cqe.result();
if result < 0 {
if -result == libc::ENOBUFS {
::log::info!("recv failed due to lack of buffers. If increasing ring size doesn't help, get faster hardware");
::log::info!("recv failed due to lack of buffers, try increasing ring size");
} else {
::log::warn!(
"recv failed: {:#}",
@ -365,7 +309,7 @@ impl SocketWorker {
);
}
return;
return None;
}
let buffer = unsafe {
@ -374,23 +318,48 @@ impl SocketWorker {
Ok(None) => {
::log::error!("Couldn't get recv buffer");
return;
return None;
}
Err(err) => {
::log::error!("Couldn't get recv buffer: {:#}", err);
return;
return None;
}
}
};
let addr = match self.recv_helper.parse(buffer.as_slice()) {
match self.recv_helper.parse(buffer.as_slice()) {
Ok((request, addr)) => {
self.handle_request(request, addr);
if self.config.statistics.active() {
let (statistics, extra_bytes) = if addr.is_ipv4() {
(&self.statistics.ipv4, EXTRA_PACKET_SIZE_IPV4)
} else {
(&self.statistics.ipv6, EXTRA_PACKET_SIZE_IPV6)
};
addr
statistics
.bytes_received
.fetch_add(buffer.len() + extra_bytes, Ordering::Relaxed);
statistics.requests.fetch_add(1, Ordering::Relaxed);
}
return self.handle_request(request, addr);
}
Err(self::recv_helper::Error::RequestParseError(err, addr)) => {
if self.config.statistics.active() {
if addr.is_ipv4() {
self.statistics
.ipv4
.bytes_received
.fetch_add(buffer.len() + EXTRA_PACKET_SIZE_IPV4, Ordering::Relaxed);
} else {
self.statistics
.ipv6
.bytes_received
.fetch_add(buffer.len() + EXTRA_PACKET_SIZE_IPV6, Ordering::Relaxed);
}
}
match err {
RequestParseError::Sendable {
connection_id,
@ -405,60 +374,43 @@ impl SocketWorker {
message: err.into(),
};
self.local_responses.push_back((response.into(), addr));
return Some((addr, Response::Error(response)));
}
}
RequestParseError::Unsendable { err } => {
::log::debug!("Couldn't parse request from {:?}: {}", addr, err);
}
}
addr
}
Err(self::recv_helper::Error::InvalidSocketAddress) => {
::log::debug!("Ignored request claiming to be from port 0");
return;
}
Err(self::recv_helper::Error::RecvMsgParseError) => {
::log::error!("RecvMsgOut::parse failed");
return;
}
Err(self::recv_helper::Error::RecvMsgTruncated) => {
::log::warn!("RecvMsgOut::parse failed: sockaddr or payload truncated");
return;
}
};
if self.config.statistics.active() {
let (statistics, extra_bytes) = if addr.is_ipv4() {
(&self.statistics.ipv4, EXTRA_PACKET_SIZE_IPV4)
} else {
(&self.statistics.ipv6, EXTRA_PACKET_SIZE_IPV6)
};
statistics
.bytes_received
.fetch_add(buffer.len() + extra_bytes, Ordering::Relaxed);
statistics.requests.fetch_add(1, Ordering::Relaxed);
}
None
}
fn handle_request(&mut self, request: Request, src: CanonicalSocketAddr) {
fn handle_request(
&mut self,
request: Request,
src: CanonicalSocketAddr,
) -> Option<(CanonicalSocketAddr, Response)> {
let access_list_mode = self.config.access_list.mode;
match request {
Request::Connect(request) => {
let connection_id = self.validator.create_connection_id(src);
let response = Response::Connect(ConnectResponse {
connection_id,
connection_id: self.validator.create_connection_id(src),
transaction_id: request.transaction_id,
});
self.local_responses.push_back((response, src));
return Some((src, response));
}
Request::Announce(request) => {
if self
@ -470,23 +422,23 @@ impl SocketWorker {
.load()
.allows(access_list_mode, &request.info_hash.0)
{
let worker_index =
SwarmWorkerIndex::from_info_hash(&self.config, request.info_hash);
let response = self.shared_state.torrent_maps.announce(
&self.config,
&self.statistics_sender,
&mut self.rng,
&request,
src,
self.peer_valid_until,
);
if self
.request_sender
.try_send_to(worker_index, ConnectedRequest::Announce(request), src)
.is_err()
{
::log::warn!("request sender full, dropping request");
}
return Some((src, response));
} else {
let response = Response::Error(ErrorResponse {
transaction_id: request.transaction_id,
message: "Info hash not allowed".into(),
});
self.local_responses.push_back((response, src))
return Some((src, response));
}
}
}
@ -495,24 +447,15 @@ impl SocketWorker {
.validator
.connection_id_valid(src, request.connection_id)
{
let split_requests = self.pending_scrape_responses.prepare_split_requests(
&self.config,
request,
self.pending_scrape_valid_until,
);
let response =
Response::Scrape(self.shared_state.torrent_maps.scrape(request, src));
for (swarm_worker_index, request) in split_requests {
if self
.request_sender
.try_send_to(swarm_worker_index, ConnectedRequest::Scrape(request), src)
.is_err()
{
::log::warn!("request sender full, dropping request");
}
}
return Some((src, response));
}
}
}
None
}
}

View file

@ -12,6 +12,8 @@ use crate::config::Config;
/// HMAC (BLAKE3) based ConnectionId creator and validator
///
/// Method update_elapsed must be called at least once a minute.
///
/// The purpose of using ConnectionIds is to make IP spoofing costly, mainly to
/// prevent the tracker from being used as an amplification vector for DDoS
/// attacks. By including 32 bits of BLAKE3 keyed hash output in the Ids, an
@ -32,6 +34,7 @@ pub struct ConnectionValidator {
start_time: Instant,
max_connection_age: u64,
keyed_hasher: blake3::Hasher,
seconds_since_start: u32,
}
impl ConnectionValidator {
@ -49,11 +52,12 @@ impl ConnectionValidator {
keyed_hasher,
start_time: Instant::now(),
max_connection_age: config.cleaning.max_connection_age.into(),
seconds_since_start: 0,
})
}
pub fn create_connection_id(&mut self, source_addr: CanonicalSocketAddr) -> ConnectionId {
let elapsed = (self.start_time.elapsed().as_secs() as u32).to_ne_bytes();
let elapsed = (self.seconds_since_start).to_ne_bytes();
let hash = self.hash(elapsed, source_addr.get().ip());
@ -78,16 +82,23 @@ impl ConnectionValidator {
return false;
}
let tracker_elapsed = self.start_time.elapsed().as_secs();
let seconds_since_start = self.seconds_since_start as u64;
let client_elapsed = u64::from(u32::from_ne_bytes(elapsed));
let client_expiration_time = client_elapsed + self.max_connection_age;
// In addition to checking if the client connection is expired,
// disallow client_elapsed values that are in future and thus could not
// have been sent by the tracker. This prevents brute forcing with
// `u32::MAX` as 'elapsed' part of ConnectionId to find a hash that
// disallow client_elapsed values that are too far in the future and thus
// could not have been sent by the tracker. This prevents brute forcing
// with `u32::MAX` as 'elapsed' part of ConnectionId to find a hash that
// works until the tracker is restarted.
(client_expiration_time > tracker_elapsed) & (client_elapsed <= tracker_elapsed)
let client_not_expired = client_expiration_time > seconds_since_start;
let client_elapsed_not_in_far_future = client_elapsed <= (seconds_since_start + 60);
client_not_expired & client_elapsed_not_in_far_future
}
pub fn update_elapsed(&mut self) {
self.seconds_since_start = self.start_time.elapsed().as_secs() as u32;
}
fn hash(&mut self, elapsed: [u8; 4], ip_addr: IpAddr) -> [u8; 4] {
@ -148,7 +159,6 @@ mod tests {
if max_connection_age == 0 {
quickcheck::TestResult::from_bool(!original_valid)
} else {
// Note: depends on that running this test takes less than a second
quickcheck::TestResult::from_bool(original_valid)
}
}
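
For reference, a minimal standalone sketch of the connection id scheme described in the doc comment above (illustrative only, not the actual aquatic_udp code; the struct and method names here are made up): an id packs the tracker's cached seconds-since-start next to 32 bits of BLAKE3 keyed-hash output over that value and the source IP, and validation recomputes the hash, rejects expired ids, and rejects elapsed values more than a minute in the future to block the brute-force attack mentioned in the comment.

```rust
// Illustrative sketch of an HMAC-style connection id scheme (assumed names,
// not the real implementation).
use std::net::IpAddr;

struct ConnectionIdScheme {
    hasher: blake3::Hasher,   // e.g. created with blake3::Hasher::new_keyed(&secret_key)
    seconds_since_start: u32, // refreshed periodically, e.g. from a pulse timeout
    max_connection_age: u64,
}

impl ConnectionIdScheme {
    // 32 bits of keyed BLAKE3 output over (elapsed seconds, source IP)
    fn hash(&self, elapsed: [u8; 4], ip: IpAddr) -> [u8; 4] {
        let mut hasher = self.hasher.clone();
        hasher.update(&elapsed);
        match ip {
            IpAddr::V4(ip) => hasher.update(&ip.octets()),
            IpAddr::V6(ip) => hasher.update(&ip.octets()),
        };
        let mut out = [0u8; 4];
        hasher.finalize_xof().fill(&mut out);
        out
    }

    // Connection id = elapsed seconds (4 bytes) || keyed hash (4 bytes)
    fn create(&self, ip: IpAddr) -> [u8; 8] {
        let elapsed = self.seconds_since_start.to_ne_bytes();
        let hash = self.hash(elapsed, ip);
        let mut id = [0u8; 8];
        id[..4].copy_from_slice(&elapsed);
        id[4..].copy_from_slice(&hash);
        id
    }

    fn valid(&self, id: [u8; 8], ip: IpAddr) -> bool {
        let mut elapsed = [0u8; 4];
        elapsed.copy_from_slice(&id[..4]);
        // Forged ids fail here unless the sender knows the secret key
        if id[4..] != self.hash(elapsed, ip) {
            return false;
        }
        let client_elapsed = u64::from(u32::from_ne_bytes(elapsed));
        let now = u64::from(self.seconds_since_start);
        // Reject expired ids, and reject elapsed values too far in the future
        // (which could otherwise be brute-forced to stay valid indefinitely)
        let not_expired = client_elapsed + self.max_connection_age > now;
        let not_far_future = client_elapsed <= now + 60;
        not_expired && not_far_future
    }
}
```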

View file

@ -25,7 +25,6 @@ pub struct StatisticsCollector {
statistics: Statistics,
ip_version: IpVersion,
last_update: Instant,
pending_histograms: Vec<Histogram<u64>>,
last_complete_histogram: PeerHistogramStatistics,
}
@ -34,19 +33,13 @@ impl StatisticsCollector {
Self {
statistics,
last_update: Instant::now(),
pending_histograms: Vec::new(),
last_complete_histogram: Default::default(),
ip_version,
}
}
pub fn add_histogram(&mut self, config: &Config, histogram: Histogram<u64>) {
self.pending_histograms.push(histogram);
if self.pending_histograms.len() == config.swarm_workers {
self.last_complete_histogram =
PeerHistogramStatistics::new(self.pending_histograms.drain(..).sum());
}
pub fn add_histogram(&mut self, histogram: Histogram<u64>) {
self.last_complete_histogram = PeerHistogramStatistics::new(histogram);
}
pub fn collect_from_shared(
@ -60,8 +53,6 @@ impl StatisticsCollector {
let mut responses_error: usize = 0;
let mut bytes_received: usize = 0;
let mut bytes_sent: usize = 0;
let mut num_torrents: usize = 0;
let mut num_peers: usize = 0;
#[cfg(feature = "prometheus")]
let ip_version_prometheus_str = self.ip_version.prometheus_str();
@ -186,44 +177,37 @@ impl StatisticsCollector {
}
}
for (i, statistics) in self
.statistics
.swarm
.iter()
.map(|s| s.by_ip_version(self.ip_version))
.enumerate()
{
{
let n = statistics.torrents.load(Ordering::Relaxed);
let swarm_statistics = &self.statistics.swarm.by_ip_version(self.ip_version);
num_torrents += n;
let num_torrents = {
let num_torrents = swarm_statistics.torrents.load(Ordering::Relaxed);
#[cfg(feature = "prometheus")]
if config.statistics.run_prometheus_endpoint {
::metrics::gauge!(
"aquatic_torrents",
"ip_version" => ip_version_prometheus_str,
"worker_index" => i.to_string(),
)
.set(n as f64);
}
#[cfg(feature = "prometheus")]
if config.statistics.run_prometheus_endpoint {
::metrics::gauge!(
"aquatic_torrents",
"ip_version" => ip_version_prometheus_str,
)
.set(num_torrents as f64);
}
{
let n = statistics.peers.load(Ordering::Relaxed);
num_peers += n;
num_torrents
};
#[cfg(feature = "prometheus")]
if config.statistics.run_prometheus_endpoint {
::metrics::gauge!(
"aquatic_peers",
"ip_version" => ip_version_prometheus_str,
"worker_index" => i.to_string(),
)
.set(n as f64);
}
let num_peers = {
let num_peers = swarm_statistics.peers.load(Ordering::Relaxed);
#[cfg(feature = "prometheus")]
if config.statistics.run_prometheus_endpoint {
::metrics::gauge!(
"aquatic_peers",
"ip_version" => ip_version_prometheus_str,
)
.set(num_peers as f64);
}
}
num_peers
};
let elapsed = {
let now = Instant::now();

View file

@ -81,8 +81,8 @@ pub fn run_statistics_worker(
for message in statistics_receiver.try_iter() {
match message {
StatisticsMessage::Ipv4PeerHistogram(h) => ipv4_collector.add_histogram(&config, h),
StatisticsMessage::Ipv6PeerHistogram(h) => ipv6_collector.add_histogram(&config, h),
StatisticsMessage::Ipv4PeerHistogram(h) => ipv4_collector.add_histogram(h),
StatisticsMessage::Ipv6PeerHistogram(h) => ipv6_collector.add_histogram(h),
StatisticsMessage::PeerAdded(peer_id) => {
if process_peer_client_data {
peers
@ -249,7 +249,10 @@ fn print_to_stdout(config: &Config, statistics: &CollectedStatistics) {
" error: {:>10}",
statistics.responses_per_second_error
);
println!(" torrents: {:>10}", statistics.num_torrents);
println!(
" torrents: {:>10} (updated every {}s)",
statistics.num_torrents, config.cleaning.torrent_cleaning_interval
);
println!(
" peers: {:>10} (updated every {}s)",
statistics.num_peers, config.cleaning.torrent_cleaning_interval

View file

@ -1,149 +0,0 @@
mod storage;
use std::net::IpAddr;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use crossbeam_channel::Receiver;
use crossbeam_channel::Sender;
use rand::{rngs::SmallRng, SeedableRng};
use aquatic_common::{CanonicalSocketAddr, ValidUntil};
use crate::common::*;
use crate::config::Config;
use storage::TorrentMaps;
pub struct SwarmWorker {
pub config: Config,
pub state: State,
pub statistics: CachePaddedArc<IpVersionStatistics<SwarmWorkerStatistics>>,
pub request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>,
pub response_sender: ConnectedResponseSender,
pub statistics_sender: Sender<StatisticsMessage>,
pub worker_index: SwarmWorkerIndex,
}
impl SwarmWorker {
pub fn run(&mut self) -> anyhow::Result<()> {
let mut torrents = TorrentMaps::default();
let mut rng = SmallRng::from_entropy();
let timeout = Duration::from_millis(self.config.request_channel_recv_timeout_ms);
let mut peer_valid_until = ValidUntil::new(
self.state.server_start_instant,
self.config.cleaning.max_peer_age,
);
let cleaning_interval = Duration::from_secs(self.config.cleaning.torrent_cleaning_interval);
let statistics_update_interval = Duration::from_secs(self.config.statistics.interval);
let mut last_cleaning = Instant::now();
let mut last_statistics_update = Instant::now();
let mut iter_counter = 0usize;
loop {
if let Ok((sender_index, request, src)) = self.request_receiver.recv_timeout(timeout) {
// It is OK to block here as long as we don't also do blocking
// sends in socket workers (doing both could cause a deadlock)
match (request, src.get().ip()) {
(ConnectedRequest::Announce(request), IpAddr::V4(ip)) => {
let response = torrents
.ipv4
.0
.entry(request.info_hash)
.or_default()
.announce(
&self.config,
&self.statistics_sender,
&mut rng,
&request,
ip.into(),
peer_valid_until,
);
// It doesn't matter which socket worker receives announce responses
self.response_sender
.send_to_any(src, ConnectedResponse::AnnounceIpv4(response))
.expect("swarm response channel is closed");
}
(ConnectedRequest::Announce(request), IpAddr::V6(ip)) => {
let response = torrents
.ipv6
.0
.entry(request.info_hash)
.or_default()
.announce(
&self.config,
&self.statistics_sender,
&mut rng,
&request,
ip.into(),
peer_valid_until,
);
// It doesn't matter which socket worker receives announce responses
self.response_sender
.send_to_any(src, ConnectedResponse::AnnounceIpv6(response))
.expect("swarm response channel is closed");
}
(ConnectedRequest::Scrape(request), IpAddr::V4(_)) => {
let response = torrents.ipv4.scrape(request);
self.response_sender
.send_to(sender_index, src, ConnectedResponse::Scrape(response))
.expect("swarm response channel is closed");
}
(ConnectedRequest::Scrape(request), IpAddr::V6(_)) => {
let response = torrents.ipv6.scrape(request);
self.response_sender
.send_to(sender_index, src, ConnectedResponse::Scrape(response))
.expect("swarm response channel is closed");
}
};
}
// Run periodic tasks
if iter_counter % 128 == 0 {
let now = Instant::now();
peer_valid_until = ValidUntil::new(
self.state.server_start_instant,
self.config.cleaning.max_peer_age,
);
if now > last_cleaning + cleaning_interval {
torrents.clean_and_update_statistics(
&self.config,
&self.state,
&self.statistics,
&self.statistics_sender,
&self.state.access_list,
);
last_cleaning = now;
}
if self.config.statistics.active()
&& now > last_statistics_update + statistics_update_interval
{
self.statistics
.ipv4
.torrents
.store(torrents.ipv4.num_torrents(), Ordering::Relaxed);
self.statistics
.ipv6
.torrents
.store(torrents.ipv6.num_torrents(), Ordering::Relaxed);
last_statistics_update = now;
}
}
iter_counter = iter_counter.wrapping_add(1);
}
}
}

View file

@ -25,10 +25,10 @@
<h2>IPv4</h2>
<table>
<caption>* Peer count is updated every { peer_update_interval } seconds</caption>
<caption>* Torrent/peer count is updated every { peer_update_interval } seconds</caption>
<tr>
<th scope="row">Number of torrents</th>
<td>{ ipv4.num_torrents }</td>
<td>{ ipv4.num_torrents } *</td>
</tr>
<tr>
<th scope="row">Number of peers</th>
@ -141,10 +141,10 @@
<h2>IPv6</h2>
<table>
<caption>* Peer count is updated every { peer_update_interval } seconds</caption>
<caption>* Torrent/peer count is updated every { peer_update_interval } seconds</caption>
<tr>
<th scope="row">Number of torrents</th>
<td>{ ipv6.num_torrents }</td>
<td>{ ipv6.num_torrents } *</td>
</tr>
<tr>
<th scope="row">Number of peers</th>

View file

@ -105,6 +105,10 @@ clients. Notes:
`aquatic_ws` has not been tested as much as `aquatic_udp`, but likely works
fine in production.
## Architectural overview
![Architectural overview of aquatic](../../documents/aquatic-architecture-2024.svg)
## Copyright and license
Copyright (c) Joakim Frostegård

View file

@ -0,0 +1,106 @@
2024-02-10 Joakim Frostegård
# UDP BitTorrent tracker throughput comparison
This is a performance comparison of several UDP BitTorrent tracker implementations.
Benchmarks were run using [aquatic_bencher](../crates/bencher), with `--cpu-mode subsequent-one-per-pair`.
## Software and hardware
### Tracker implementations
| Name | Commit |
|---------------|---------|
| [aquatic_udp] | 21a5301 |
| [opentracker] | 110868e |
| [chihaya] | 2f79440 |
[aquatic_udp]: ../crates/udp
[opentracker]: http://erdgeist.org/arts/software/opentracker/
[chihaya]: https://github.com/chihaya/chihaya
### OS and compilers
| Name | Version |
|--------|---------|
| Debian | 12.4 |
| Linux | 6.5.10 |
| rustc | 1.76.0 |
| GCC | 12.2.0 |
| go | 1.19.8 |
### Hardware
Hetzner CCX63: 48 dedicated vCPUs (AMD Milan Epyc 7003)
## Results
![UDP BitTorrent tracker throughput](./aquatic-udp-load-test-2024-02-10.png)
<table>
<caption>
<strong>UDP BitTorrent tracker throughput</strong>
<p>Average responses per second, best result.</p>
</caption>
<thead>
<tr>
<th>CPU cores</th>
<th>aquatic_udp (mio)</th>
<th>aquatic_udp (io_uring)</th>
<th>opentracker</th>
<th>chihaya</th>
</tr>
</thead>
<tbody>
<tr>
<th>1</th>
<td><span title="socket workers: 1, avg cpu utilization: 95.3%">186,939</span></td>
<td><span title="socket workers: 1, avg cpu utilization: 95.3%">226,065</span></td>
<td><span title="workers: 1, avg cpu utilization: 95.3%">190,540</span></td>
<td><span title="avg cpu utilization: 95.3%">55,989</span></td>
</tr>
<tr>
<th>2</th>
<td><span title="socket workers: 2, avg cpu utilization: 190%">371,478</span></td>
<td><span title="socket workers: 2, avg cpu utilization: 190%">444,353</span></td>
<td><span title="workers: 2, avg cpu utilization: 190%">379,623</span></td>
<td><span title="avg cpu utilization: 186%">111,226</span></td>
</tr>
<tr>
<th>4</th>
<td><span title="socket workers: 4, avg cpu utilization: 381%">734,709</span></td>
<td><span title="socket workers: 4, avg cpu utilization: 381%">876,642</span></td>
<td><span title="workers: 4, avg cpu utilization: 381%">748,401</span></td>
<td><span title="avg cpu utilization: 300%">136,983</span></td>
</tr>
<tr>
<th>6</th>
<td><span title="socket workers: 6, avg cpu utilization: 565%">1,034,804</span></td>
<td><span title="socket workers: 6, avg cpu utilization: 572%">1,267,006</span></td>
<td><span title="workers: 6, avg cpu utilization: 567%">901,600</span></td>
<td><span title="avg cpu utilization: 414%">131,827</span></td>
</tr>
<tr>
<th>8</th>
<td><span title="socket workers: 8, avg cpu utilization: 731%">1,296,693</span></td>
<td><span title="socket workers: 8, avg cpu utilization: 731%">1,521,113</span></td>
<td><span title="workers: 8, avg cpu utilization: 756%">1,170,928</span></td>
<td><span title="avg cpu utilization: 462%">131,779</span></td>
</tr>
<tr>
<th>12</th>
<td><span title="socket workers: 12, avg cpu utilization: 1064%">1,871,353</span></td>
<td><span title="socket workers: 12, avg cpu utilization: 957%">1,837,223</span></td>
<td><span title="workers: 12, avg cpu utilization: 1127%">1,675,059</span></td>
<td><span title="avg cpu utilization: 509%">130,942</span></td>
</tr>
<tr>
<th>16</th>
<td><span title="socket workers: 16, avg cpu utilization: 1126%">2,037,713</span></td>
<td><span title="socket workers: 16, avg cpu utilization: 1109%">2,258,321</span></td>
<td><span title="workers: 16, avg cpu utilization: 1422%">1,645,828</span></td>
<td><span title="avg cpu utilization: 487%">127,256</span></td>
</tr>
</tbody>
</table>

Binary file not shown.
