Merge pull request #12 from greatest-ape/glommio-fixes

aquatic_udp glommio implementation improvements
This commit is contained in:
Joakim Frostegård 2021-10-25 00:27:01 +02:00 committed by GitHub
commit d948843191
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
27 changed files with 666 additions and 346 deletions

View file

@ -1,8 +1,10 @@
# Container image that runs your code
# Not used by the GitHub action, but can be used to run the tests locally:
# 1. docker build -t aquatic ./path/to/Dockerfile
# 2. docker run aquatic
# 3. On failure, run `docker rmi aquatic -f` and go back to step 1
FROM rust:bullseye
# Copies your code file from your action repository to the filesystem path `/` of the container
COPY entrypoint.sh /entrypoint.sh
# Code file to execute when the docker container starts up (`entrypoint.sh`)
ENTRYPOINT ["/entrypoint.sh"]

View file

@ -10,5 +10,7 @@ outputs:
wss_ipv4:
description: 'WSS IPv4 status'
runs:
using: 'docker'
image: 'Dockerfile'
using: 'composite'
steps:
- run: $GITHUB_ACTION_PATH/entrypoint.sh
shell: bash

View file

@ -5,11 +5,6 @@
#
# IPv6 is unfortunately disabled by default in Docker
# (see sysctl net.ipv6.conf.lo.disable_ipv6)
#
# When testing locally, use:
# 1. docker build -t aquatic ./path/to/Dockerfile
# 2. docker run aquatic
# 3. On failure, run `docker rmi aquatic -f` and go back to step 1
set -e
@ -21,6 +16,8 @@ else
SUDO=""
fi
ulimit -a
$SUDO apt-get update
$SUDO apt-get install -y cmake libssl-dev screen rtorrent mktorrent ssl-cert ca-certificates curl golang
@ -43,6 +40,9 @@ else
cd "$GITHUB_WORKSPACE"
fi
echo "last aquatic commits:"
git log --oneline -3
# Setup bogus TLS certificate
$SUDO echo "127.0.0.1 example.com" >> /etc/hosts

View file

@ -11,9 +11,8 @@ env:
jobs:
build:
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- uses: actions/checkout@v2
- name: Build

View file

@ -10,9 +10,13 @@ jobs:
test-transfer-http:
runs-on: ubuntu-latest
name: "Test BitTorrent file transfer over HTTP (with and without TLS), UDP and WSS"
timeout-minutes: 20
container:
image: rust:1-bullseye
options: --ulimit memlock=524288:524288
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Test file transfers
uses: ./.github/actions/test-transfer
id: test_transfer
id: test_transfer

View file

@ -91,6 +91,8 @@ except that it:
Supports both IPv4 and IPv6. (Note, however, that IPv6 support in the
BitTorrent UDP protocol itself is limited.)
For optimal performance, enable setting of core affinities in configuration.
#### Benchmarks
[opentracker]: http://erdgeist.org/arts/software/opentracker/
@ -110,6 +112,22 @@ Server responses per second, best result in bold:
Please refer to `documents/aquatic-udp-load-test-2021-08-19.pdf` for more details.
#### Alternative implementation using io_uring
[io_uring]: https://en.wikipedia.org/wiki/Io_uring
[glommio]: https://github.com/DataDog/glommio
There is an alternative implementation that utilizes [io_uring] by running on
[glommio]. It only runs on Linux and requires a recent kernel (version 5.1 or later).
In some cases, it performs even better than the cross-platform implementation.
To use it, pass the `with-glommio` feature when building, e.g.:
```sh
cargo build -p aquatic_udp --features "with-glommio" --no-default-features
./target/release/aquatic_udp
```
### aquatic_http: HTTP BitTorrent tracker
Aims for compatibility with the HTTP BitTorrent protocol, as described

10
TODO.md
View file

@ -1,13 +1,13 @@
# TODO
* aquatic_udp glommio
* update access lists
* clean connections
* update peer valid until
* privdrop
* Add to file transfer CI
* consider adding ConnectedScrapeRequest::Scrape(PendingScrapeRequest)
containing TransactionId and BTreeMap<usize, InfoHash>, and same for
response
* access lists:
* use arc-swap Cache
* use arc-swap Cache?
* add CI tests
* aquatic_ws: should it send back error on message parse error, or does that

View file

@ -42,7 +42,7 @@ impl Default for AccessListConfig {
}
}
#[derive(Default)]
#[derive(Default, Clone)]
pub struct AccessList(HashSet<[u8; 20]>);
impl AccessList {
@ -51,6 +51,20 @@ impl AccessList {
Ok(())
}
pub fn create_from_path(path: &PathBuf) -> anyhow::Result<Self> {
let file = File::open(path)?;
let reader = BufReader::new(file);
let mut new_list = Self::default();
for line in reader.lines() {
new_list.insert_from_line(&line?)?;
}
Ok(new_list)
}
pub fn allows(&self, mode: AccessListMode, info_hash: &[u8; 20]) -> bool {
match mode {
AccessListMode::White => self.0.contains(info_hash),
@ -69,16 +83,7 @@ pub type AccessListArcSwap = ArcSwap<AccessList>;
impl AccessListQuery for AccessListArcSwap {
fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()> {
let file = File::open(path)?;
let reader = BufReader::new(file);
let mut new_list = HashSet::new();
for line in reader.lines() {
new_list.insert(parse_info_hash(&line?)?);
}
self.store(Arc::new(AccessList(new_list)));
self.store(Arc::new(AccessList::create_from_path(path)?));
Ok(())
}

View file

@ -15,7 +15,9 @@ path = "src/lib/lib.rs"
name = "aquatic_udp"
[features]
default = ["with-mio"]
with-glommio = ["glommio", "futures-lite"]
with-mio = ["crossbeam-channel", "histogram", "mio", "socket2"]
[dependencies]
anyhow = "1"
@ -24,20 +26,23 @@ aquatic_common = "0.1.0"
aquatic_udp_protocol = "0.1.0"
cfg-if = "1"
core_affinity = "0.5"
crossbeam-channel = "0.5"
hashbrown = "0.11.2"
hex = "0.4"
histogram = "0.6"
indexmap = "1"
log = "0.4"
mimalloc = { version = "0.1", default-features = false }
mio = { version = "0.7", features = ["udp", "os-poll", "os-util"] }
parking_lot = "0.11"
privdrop = "0.5"
rand = { version = "0.8", features = ["small_rng"] }
serde = { version = "1", features = ["derive"] }
socket2 = { version = "0.4.1", features = ["all"] }
# mio
crossbeam-channel = { version = "0.5", optional = true }
histogram = { version = "0.6", optional = true }
mio = { version = "0.7", features = ["udp", "os-poll", "os-util"], optional = true }
socket2 = { version = "0.4.1", features = ["all"], optional = true }
# glommio
glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5", optional = true }
futures-lite = { version = "1", optional = true }

View file

@ -1,10 +1,70 @@
use std::net::SocketAddr;
use rand::rngs::SmallRng;
use aquatic_common::convert_ipv4_mapped_ipv6;
use aquatic_common::extract_response_peers;
use crate::common::*;
pub fn handle_announce_request<I: Ip>(
/// A request from a client whose `ConnectionId` has already been validated
/// by a socket worker, ready to be forwarded to a request worker.
#[derive(Debug)]
pub enum ConnectedRequest {
    Announce(AnnounceRequest),
    Scrape {
        request: ScrapeRequest,
        /// Currently only used by glommio implementation: positions of
        /// `request.info_hashes` within the client's original scrape
        /// request, so split responses can be reassembled in order.
        original_indices: Vec<usize>,
    },
}
/// A response produced by a request worker, to be converted into a
/// wire-level `Response` by a socket worker before sending.
#[derive(Debug)]
pub enum ConnectedResponse {
    Announce(AnnounceResponse),
    Scrape {
        response: ScrapeResponse,
        /// Currently only used by glommio implementation: mirrors the
        /// `original_indices` of the corresponding `ConnectedRequest`.
        original_indices: Vec<usize>,
    },
}
impl Into<Response> for ConnectedResponse {
fn into(self) -> Response {
match self {
Self::Announce(response) => Response::Announce(response),
Self::Scrape { response, .. } => Response::Scrape(response),
}
}
}
/// Handle an announce request from a peer with a validated connection.
///
/// Normalizes IPv4-mapped IPv6 source addresses to plain IPv4, then
/// dispatches to the matching per-address-family torrent map — IPv4 and
/// IPv6 swarms are tracked separately.
///
/// `peer_valid_until` is stored as the peer entry's expiry deadline
/// (presumably enforced by the periodic cleaning task — confirm against
/// the cleaning code).
pub fn handle_announce_request(
    config: &Config,
    rng: &mut SmallRng,
    torrents: &mut TorrentMaps,
    request: AnnounceRequest,
    src: SocketAddr,
    peer_valid_until: ValidUntil,
) -> AnnounceResponse {
    match convert_ipv4_mapped_ipv6(src.ip()) {
        IpAddr::V4(ip) => handle_announce_request_inner(
            config,
            rng,
            &mut torrents.ipv4,
            request,
            ip,
            peer_valid_until,
        ),
        IpAddr::V6(ip) => handle_announce_request_inner(
            config,
            rng,
            &mut torrents.ipv6,
            request,
            ip,
            peer_valid_until,
        ),
    }
}
fn handle_announce_request_inner<I: Ip>(
config: &Config,
rng: &mut SmallRng,
torrents: &mut TorrentMap<I>,
@ -83,6 +143,57 @@ fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize {
}
}
#[inline]
pub fn handle_scrape_request(
torrents: &mut TorrentMaps,
src: SocketAddr,
request: ScrapeRequest,
) -> ScrapeResponse {
const EMPTY_STATS: TorrentScrapeStatistics = create_torrent_scrape_statistics(0, 0);
let mut stats: Vec<TorrentScrapeStatistics> = Vec::with_capacity(request.info_hashes.len());
let peer_ip = convert_ipv4_mapped_ipv6(src.ip());
if peer_ip.is_ipv4() {
for info_hash in request.info_hashes.iter() {
if let Some(torrent_data) = torrents.ipv4.get(info_hash) {
stats.push(create_torrent_scrape_statistics(
torrent_data.num_seeders as i32,
torrent_data.num_leechers as i32,
));
} else {
stats.push(EMPTY_STATS);
}
}
} else {
for info_hash in request.info_hashes.iter() {
if let Some(torrent_data) = torrents.ipv6.get(info_hash) {
stats.push(create_torrent_scrape_statistics(
torrent_data.num_seeders as i32,
torrent_data.num_leechers as i32,
));
} else {
stats.push(EMPTY_STATS);
}
}
}
ScrapeResponse {
transaction_id: request.transaction_id,
torrent_stats: stats,
}
}
/// Build a `TorrentScrapeStatistics` from raw seeder/leecher counts.
///
/// `completed` is always reported as zero; download-completion tracking
/// is explicitly not planned.
#[inline(always)]
const fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics {
    TorrentScrapeStatistics {
        seeders: NumberOfPeers(seeders),
        completed: NumberOfDownloads(0), // No implementation planned
        leechers: NumberOfPeers(leechers),
    }
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;

View file

@ -11,7 +11,7 @@ pub use aquatic_udp_protocol::*;
use crate::config::Config;
pub mod announce;
pub mod handlers;
pub mod network;
pub const MAX_PACKET_SIZE: usize = 4096;

View file

@ -17,7 +17,7 @@ impl ConnectionMap {
self.0.insert((connection_id, socket_addr), valid_until);
}
pub fn contains(&mut self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool {
pub fn contains(&self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool {
self.0.contains_key(&(connection_id, socket_addr))
}

View file

@ -18,7 +18,9 @@ pub struct Config {
pub log_level: LogLevel,
pub network: NetworkConfig,
pub protocol: ProtocolConfig,
#[cfg(feature = "with-mio")]
pub handlers: HandlerConfig,
#[cfg(feature = "with-mio")]
pub statistics: StatisticsConfig,
pub cleaning: CleaningConfig,
pub privileges: PrivilegeConfig,
@ -52,6 +54,7 @@ pub struct NetworkConfig {
/// $ sudo sysctl -w net.core.rmem_max=104857600
/// $ sudo sysctl -w net.core.rmem_default=104857600
pub socket_recv_buffer_size: usize,
#[cfg(feature = "with-mio")]
pub poll_event_capacity: usize,
}
@ -66,6 +69,7 @@ pub struct ProtocolConfig {
pub peer_announce_interval: i32,
}
#[cfg(feature = "with-mio")]
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct HandlerConfig {
@ -75,6 +79,7 @@ pub struct HandlerConfig {
pub channel_recv_timeout_microseconds: u64,
}
#[cfg(feature = "with-mio")]
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct StatisticsConfig {
@ -119,7 +124,9 @@ impl Default for Config {
log_level: LogLevel::Error,
network: NetworkConfig::default(),
protocol: ProtocolConfig::default(),
#[cfg(feature = "with-mio")]
handlers: HandlerConfig::default(),
#[cfg(feature = "with-mio")]
statistics: StatisticsConfig::default(),
cleaning: CleaningConfig::default(),
privileges: PrivilegeConfig::default(),
@ -133,8 +140,9 @@ impl Default for NetworkConfig {
fn default() -> Self {
Self {
address: SocketAddr::from(([0, 0, 0, 0], 3000)),
poll_event_capacity: 4096,
socket_recv_buffer_size: 4096 * 128,
#[cfg(feature = "with-mio")]
poll_event_capacity: 4096,
}
}
}
@ -149,6 +157,7 @@ impl Default for ProtocolConfig {
}
}
#[cfg(feature = "with-mio")]
impl Default for HandlerConfig {
fn default() -> Self {
Self {
@ -158,6 +167,7 @@ impl Default for HandlerConfig {
}
}
#[cfg(feature = "with-mio")]
impl Default for StatisticsConfig {
fn default() -> Self {
Self { interval: 0 }

View file

@ -10,23 +10,36 @@ use crate::config::Config;
pub async fn update_access_list(config: Config, access_list: Rc<RefCell<AccessList>>) {
if config.access_list.mode.is_on() {
let access_list_file = BufferedFile::open(config.access_list.path).await.unwrap();
match BufferedFile::open(config.access_list.path).await {
Ok(file) => {
let mut reader = StreamReaderBuilder::new(file).build();
let mut reader = StreamReaderBuilder::new(access_list_file).build();
loop {
let mut buf = String::with_capacity(42);
loop {
let mut buf = String::with_capacity(42);
match reader.read_line(&mut buf).await {
Ok(_) => {
if let Err(err) = access_list.borrow_mut().insert_from_line(&buf) {
::log::error!(
"Couln't parse access list line '{}': {:?}",
buf,
err
);
}
}
Err(err) => {
::log::error!("Couln't read access list line {:?}", err);
match reader.read_line(&mut buf).await {
Ok(_) => {
access_list.borrow_mut().insert_from_line(&buf);
}
Err(err) => {
break;
break;
}
}
yield_if_needed().await;
}
}
yield_if_needed().await;
}
Err(err) => {
::log::error!("Couldn't open access list file: {:?}", err)
}
};
}
}

View file

@ -1,5 +1,5 @@
use std::cell::RefCell;
use std::net::{IpAddr, SocketAddr};
use std::net::SocketAddr;
use std::rc::Rc;
use std::time::Duration;
@ -10,15 +10,17 @@ use glommio::{enclose, prelude::*};
use rand::prelude::SmallRng;
use rand::SeedableRng;
use crate::common::announce::handle_announce_request;
use crate::common::handlers::handle_announce_request;
use crate::common::handlers::*;
use crate::common::*;
use crate::config::Config;
use crate::glommio::common::update_access_list;
pub async fn run_request_worker(
config: Config,
request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>,
response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>,
request_mesh_builder: MeshBuilder<(usize, ConnectedRequest, SocketAddr), Partial>,
response_mesh_builder: MeshBuilder<(ConnectedResponse, SocketAddr), Partial>,
access_list: AccessList,
) {
let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap();
let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap();
@ -26,7 +28,7 @@ pub async fn run_request_worker(
let response_senders = Rc::new(response_senders);
let torrents = Rc::new(RefCell::new(TorrentMaps::default()));
let access_list = Rc::new(RefCell::new(AccessList::default()));
let access_list = Rc::new(RefCell::new(access_list));
// Periodically clean torrents and update access list
TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || {
@ -61,39 +63,52 @@ pub async fn run_request_worker(
async fn handle_request_stream<S>(
config: Config,
torrents: Rc<RefCell<TorrentMaps>>,
response_senders: Rc<Senders<(AnnounceResponse, SocketAddr)>>,
response_senders: Rc<Senders<(ConnectedResponse, SocketAddr)>>,
mut stream: S,
) where
S: Stream<Item = (usize, AnnounceRequest, SocketAddr)> + ::std::marker::Unpin,
S: Stream<Item = (usize, ConnectedRequest, SocketAddr)> + ::std::marker::Unpin,
{
let mut rng = SmallRng::from_entropy();
// Needs to be updated periodically: use timer?
let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age);
let max_peer_age = config.cleaning.max_peer_age;
let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_peer_age)));
while let Some((producer_index, request, addr)) = stream.next().await {
let response = match addr.ip() {
IpAddr::V4(ip) => handle_announce_request(
&config,
&mut rng,
&mut torrents.borrow_mut().ipv4,
TimerActionRepeat::repeat(enclose!((peer_valid_until) move || {
enclose!((peer_valid_until) move || async move {
*peer_valid_until.borrow_mut() = ValidUntil::new(max_peer_age);
Some(Duration::from_secs(1))
})()
}));
while let Some((producer_index, request, src)) = stream.next().await {
let response = match request {
ConnectedRequest::Announce(request) => {
ConnectedResponse::Announce(handle_announce_request(
&config,
&mut rng,
&mut torrents.borrow_mut(),
request,
src,
peer_valid_until.borrow().to_owned(),
))
}
ConnectedRequest::Scrape {
request,
ip,
peer_valid_until,
),
IpAddr::V6(ip) => handle_announce_request(
&config,
&mut rng,
&mut torrents.borrow_mut().ipv6,
request,
ip,
peer_valid_until,
),
original_indices,
} => {
let response = handle_scrape_request(&mut torrents.borrow_mut(), src, request);
ConnectedResponse::Scrape {
response,
original_indices,
}
}
};
::log::debug!("preparing to send response to channel: {:?}", response);
if let Err(err) = response_senders.try_send_to(producer_index, (response, addr)) {
if let Err(err) = response_senders.try_send_to(producer_index, (response, src)) {
::log::warn!("response_sender.try_send: {:?}", err);
}

View file

@ -1,14 +1,11 @@
//! Work-in-progress glommio (io_uring) implementation
//!
//! * Doesn't support scrape requests
//! * Currently not faster than mio implementation
use std::sync::{atomic::AtomicUsize, Arc};
use aquatic_common::access_list::AccessList;
use glommio::channels::channel_mesh::MeshBuilder;
use glommio::prelude::*;
use crate::config::Config;
use crate::drop_privileges_after_socket_binding;
mod common;
pub mod handlers;
@ -18,11 +15,17 @@ pub const SHARED_CHANNEL_SIZE: usize = 4096;
pub fn run(config: Config) -> anyhow::Result<()> {
if config.core_affinity.set_affinities {
core_affinity::set_for_current(
core_affinity::CoreId { id: config.core_affinity.offset }
);
core_affinity::set_for_current(core_affinity::CoreId {
id: config.core_affinity.offset,
});
}
let access_list = if config.access_list.mode.is_on() {
AccessList::create_from_path(&config.access_list.path).expect("Load access list")
} else {
AccessList::default()
};
let num_peers = config.socket_workers + config.request_workers;
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
@ -37,6 +40,7 @@ pub fn run(config: Config) -> anyhow::Result<()> {
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
let num_bound_sockets = num_bound_sockets.clone();
let access_list = access_list.clone();
let mut builder = LocalExecutorBuilder::default();
@ -50,6 +54,7 @@ pub fn run(config: Config) -> anyhow::Result<()> {
request_mesh_builder,
response_mesh_builder,
num_bound_sockets,
access_list,
)
.await
});
@ -61,20 +66,30 @@ pub fn run(config: Config) -> anyhow::Result<()> {
let config = config.clone();
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
let access_list = access_list.clone();
let mut builder = LocalExecutorBuilder::default();
if config.core_affinity.set_affinities {
builder = builder.pin_to_cpu(config.core_affinity.offset + 1 + config.socket_workers + i);
builder =
builder.pin_to_cpu(config.core_affinity.offset + 1 + config.socket_workers + i);
}
let executor = builder.spawn(|| async move {
handlers::run_request_worker(config, request_mesh_builder, response_mesh_builder).await
handlers::run_request_worker(
config,
request_mesh_builder,
response_mesh_builder,
access_list,
)
.await
});
executors.push(executor);
}
drop_privileges_after_socket_binding(&config, num_bound_sockets).unwrap();
for executor in executors {
executor
.expect("failed to spawn local executor")

View file

@ -1,3 +1,5 @@
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::io::Cursor;
use std::net::{IpAddr, SocketAddr};
use std::rc::Rc;
@ -5,25 +7,102 @@ use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::time::{Duration, Instant};
use futures_lite::{Stream, StreamExt};
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::channels::local_channel::{new_unbounded, LocalSender};
use glommio::enclose;
use glommio::net::UdpSocket;
use glommio::prelude::*;
use glommio::timer::TimerActionRepeat;
use hashbrown::HashMap;
use rand::prelude::{Rng, SeedableRng, StdRng};
use aquatic_udp_protocol::{IpVersion, Request, Response};
use super::common::update_access_list;
use crate::common::handlers::*;
use crate::common::network::ConnectionMap;
use crate::common::*;
use crate::config::Config;
/// Maximum number of seconds to wait for all request workers to answer a
/// split-up scrape request before its pending entry may be cleaned away.
const PENDING_SCRAPE_MAX_WAIT: u64 = 30;
/// A partially-assembled response to a scrape request that was split
/// across multiple request workers.
struct PendingScrapeResponse {
    // How many worker responses are still outstanding.
    pending_worker_responses: usize,
    // Entry may be dropped by `clean()` once this deadline passes.
    valid_until: ValidUntil,
    // Stats gathered so far, keyed by each info hash's index in the
    // client's original request; BTreeMap keeps them in request order.
    stats: BTreeMap<usize, TorrentScrapeStatistics>,
}
/// All in-flight split scrape requests for one socket worker, keyed by
/// transaction id.
#[derive(Default)]
struct PendingScrapeResponses(HashMap<TransactionId, PendingScrapeResponse>);
impl PendingScrapeResponses {
    /// Register a new split scrape request: `pending_worker_responses`
    /// partial responses are expected before the combined response can be
    /// returned by `add_and_get_finished`.
    fn prepare(
        &mut self,
        transaction_id: TransactionId,
        pending_worker_responses: usize,
        valid_until: ValidUntil,
    ) {
        let pending = PendingScrapeResponse {
            pending_worker_responses,
            valid_until,
            stats: BTreeMap::new(),
        };

        self.0.insert(transaction_id, pending);
    }

    /// Merge one worker's partial scrape response.
    ///
    /// `original_indices[i]` is the position, in the client's original
    /// request, of the info hash whose stats sit at
    /// `response.torrent_stats[i]`. Returns the finished response (stats
    /// restored to request order) once the last expected partial response
    /// has arrived; `None` otherwise.
    fn add_and_get_finished(
        &mut self,
        mut response: ScrapeResponse,
        mut original_indices: Vec<usize>,
    ) -> Option<ScrapeResponse> {
        let finished = if let Some(r) = self.0.get_mut(&response.transaction_id) {
            r.pending_worker_responses -= 1;

            r.stats.extend(
                original_indices
                    .drain(..)
                    .zip(response.torrent_stats.drain(..)),
            );

            r.pending_worker_responses == 0
        } else {
            // Entry may have been cleaned away (deadline passed) or was
            // never prepared — drop the stats and log.
            ::log::warn!("PendingScrapeResponses.add didn't find PendingScrapeResponse in map");

            false
        };

        if finished {
            let PendingScrapeResponse { stats, .. } =
                self.0.remove(&response.transaction_id).unwrap();

            Some(ScrapeResponse {
                transaction_id: response.transaction_id,
                // BTreeMap iterates in key order, i.e. the client's
                // original info hash order.
                torrent_stats: stats.into_values().collect(),
            })
        } else {
            None
        }
    }

    /// Drop entries whose deadline has passed (e.g. a worker never
    /// responded) and release excess map capacity.
    fn clean(&mut self) {
        let now = Instant::now();

        self.0.retain(|_, v| v.valid_until.0 > now);
        self.0.shrink_to_fit();
    }
}
pub async fn run_socket_worker(
config: Config,
request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>,
response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>,
request_mesh_builder: MeshBuilder<(usize, ConnectedRequest, SocketAddr), Partial>,
response_mesh_builder: MeshBuilder<(ConnectedResponse, SocketAddr), Partial>,
num_bound_sockets: Arc<AtomicUsize>,
access_list: AccessList,
) {
let (local_sender, local_receiver) = new_unbounded();
@ -40,50 +119,101 @@ pub async fn run_socket_worker(
num_bound_sockets.fetch_add(1, Ordering::SeqCst);
let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap();
let (_, mut response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap();
let response_consumer_index = response_receivers.consumer_id().unwrap();
spawn_local(read_requests(
let pending_scrape_responses = Rc::new(RefCell::new(PendingScrapeResponses::default()));
// Periodically clean pending_scrape_responses
TimerActionRepeat::repeat(enclose!((config, pending_scrape_responses) move || {
enclose!((config, pending_scrape_responses) move || async move {
pending_scrape_responses.borrow_mut().clean();
Some(Duration::from_secs(config.cleaning.interval))
})()
}));
spawn_local(enclose!((pending_scrape_responses) read_requests(
config.clone(),
request_senders,
response_consumer_index,
local_sender,
socket.clone(),
))
pending_scrape_responses,
access_list,
)))
.detach();
for (_, receiver) in response_receivers.streams().into_iter() {
spawn_local(send_responses(
spawn_local(enclose!((pending_scrape_responses) handle_shared_responses(
socket.clone(),
receiver.map(|(response, addr)| (response.into(), addr)),
))
pending_scrape_responses,
receiver,
)))
.detach();
}
send_responses(socket, local_receiver.stream()).await;
send_local_responses(socket, local_receiver.stream()).await;
}
async fn read_requests(
config: Config,
request_senders: Senders<(usize, AnnounceRequest, SocketAddr)>,
request_senders: Senders<(usize, ConnectedRequest, SocketAddr)>,
response_consumer_index: usize,
local_sender: LocalSender<(Response, SocketAddr)>,
socket: Rc<UdpSocket>,
pending_scrape_responses: Rc<RefCell<PendingScrapeResponses>>,
access_list: AccessList,
) {
let mut rng = StdRng::from_entropy();
let access_list_mode = config.access_list.mode;
// Needs to be updated periodically: use timer?
let valid_until = ValidUntil::new(config.cleaning.max_connection_age);
// Needs to be updated periodically: use timer?
let access_list = AccessList::default();
// Needs to be cleaned periodically: use timer?
let mut connections = ConnectionMap::default();
let max_connection_age = config.cleaning.max_connection_age;
let connection_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_connection_age)));
let pending_scrape_valid_until =
Rc::new(RefCell::new(ValidUntil::new(PENDING_SCRAPE_MAX_WAIT)));
let access_list = Rc::new(RefCell::new(access_list));
let connections = Rc::new(RefCell::new(ConnectionMap::default()));
let mut buf = [0u8; 2048];
// Periodically update connection_valid_until
TimerActionRepeat::repeat(enclose!((connection_valid_until) move || {
enclose!((connection_valid_until) move || async move {
*connection_valid_until.borrow_mut() = ValidUntil::new(max_connection_age);
Some(Duration::from_secs(1))
})()
}));
// Periodically update pending_scrape_valid_until
TimerActionRepeat::repeat(enclose!((pending_scrape_valid_until) move || {
enclose!((pending_scrape_valid_until) move || async move {
*pending_scrape_valid_until.borrow_mut() = ValidUntil::new(PENDING_SCRAPE_MAX_WAIT);
Some(Duration::from_secs(10))
})()
}));
// Periodically update access list
TimerActionRepeat::repeat(enclose!((config, access_list) move || {
enclose!((config, access_list) move || async move {
update_access_list(config.clone(), access_list.clone()).await;
Some(Duration::from_secs(config.cleaning.interval))
})()
}));
// Periodically clean connections
TimerActionRepeat::repeat(enclose!((config, connections) move || {
enclose!((config, connections) move || async move {
connections.borrow_mut().clean();
Some(Duration::from_secs(config.cleaning.interval))
})()
}));
let mut buf = [0u8; MAX_PACKET_SIZE];
loop {
match socket.recv_from(&mut buf).await {
@ -96,7 +226,11 @@ async fn read_requests(
Ok(Request::Connect(request)) => {
let connection_id = ConnectionId(rng.gen());
connections.insert(connection_id, src, valid_until);
connections.borrow_mut().insert(
connection_id,
src,
connection_valid_until.borrow().to_owned(),
);
let response = Response::Connect(ConnectResponse {
connection_id,
@ -106,14 +240,21 @@ async fn read_requests(
local_sender.try_send((response, src)).unwrap();
}
Ok(Request::Announce(request)) => {
if connections.contains(request.connection_id, src) {
if access_list.allows(access_list_mode, &request.info_hash.0) {
if connections.borrow().contains(request.connection_id, src) {
if access_list
.borrow()
.allows(access_list_mode, &request.info_hash.0)
{
let request_consumer_index =
(request.info_hash.0[0] as usize) % config.request_workers;
calculate_request_consumer_index(&config, request.info_hash);
if let Err(err) = request_senders.try_send_to(
request_consumer_index,
(response_consumer_index, request, src),
(
response_consumer_index,
ConnectedRequest::Announce(request),
src,
),
) {
::log::warn!("request_sender.try_send failed: {:?}", err)
}
@ -127,14 +268,51 @@ async fn read_requests(
}
}
}
Ok(Request::Scrape(request)) => {
if connections.contains(request.connection_id, src) {
let response = Response::Error(ErrorResponse {
transaction_id: request.transaction_id,
message: "Scrape requests not supported".into(),
});
Ok(Request::Scrape(ScrapeRequest {
transaction_id,
connection_id,
info_hashes,
})) => {
if connections.borrow().contains(connection_id, src) {
let mut consumer_requests: HashMap<usize, (ScrapeRequest, Vec<usize>)> =
HashMap::new();
local_sender.try_send((response, src)).unwrap();
for (i, info_hash) in info_hashes.into_iter().enumerate() {
let (req, indices) = consumer_requests
.entry(calculate_request_consumer_index(&config, info_hash))
.or_insert_with(|| {
let request = ScrapeRequest {
transaction_id: transaction_id,
connection_id: connection_id,
info_hashes: Vec::new(),
};
(request, Vec::new())
});
req.info_hashes.push(info_hash);
indices.push(i);
}
pending_scrape_responses.borrow_mut().prepare(
transaction_id,
consumer_requests.len(),
pending_scrape_valid_until.borrow().to_owned(),
);
for (consumer_index, (request, original_indices)) in consumer_requests {
let request = ConnectedRequest::Scrape {
request,
original_indices,
};
if let Err(err) = request_senders.try_send_to(
consumer_index,
(response_consumer_index, request, src),
) {
::log::warn!("request_sender.try_send failed: {:?}", err)
}
}
}
}
Err(err) => {
@ -146,7 +324,7 @@ async fn read_requests(
err,
} = err
{
if connections.contains(connection_id, src) {
if connections.borrow().contains(connection_id, src) {
let response = ErrorResponse {
transaction_id,
message: err.right_or("Parse error").into(),
@ -167,32 +345,75 @@ async fn read_requests(
}
}
async fn send_responses<S>(socket: Rc<UdpSocket>, mut stream: S)
/// Forward responses arriving from request workers over the channel mesh.
///
/// Announce responses are sent to the client immediately; scrape
/// responses are partial (the request was split across workers) and are
/// merged via `PendingScrapeResponses`, with a packet sent only once the
/// final part arrives.
async fn handle_shared_responses<S>(
    socket: Rc<UdpSocket>,
    pending_scrape_responses: Rc<RefCell<PendingScrapeResponses>>,
    mut stream: S,
) where
    S: Stream<Item = (ConnectedResponse, SocketAddr)> + ::std::marker::Unpin,
{
    // One reusable serialization buffer; the shadowing borrow keeps the
    // backing array alive for the lifetime of the cursor.
    let mut buf = [0u8; MAX_PACKET_SIZE];
    let mut buf = Cursor::new(&mut buf[..]);

    while let Some((response, addr)) = stream.next().await {
        let opt_response = match response {
            ConnectedResponse::Announce(response) => Some((Response::Announce(response), addr)),
            ConnectedResponse::Scrape {
                response,
                original_indices,
            } => pending_scrape_responses
                .borrow_mut()
                .add_and_get_finished(response, original_indices)
                .map(|response| (Response::Scrape(response), addr)),
        };

        if let Some((response, addr)) = opt_response {
            write_response_to_socket(&socket, &mut buf, addr, response).await;
        }

        // Cooperatively yield so one busy stream can't starve the executor.
        yield_if_needed().await;
    }
}
async fn send_local_responses<S>(socket: Rc<UdpSocket>, mut stream: S)
where
S: Stream<Item = (Response, SocketAddr)> + ::std::marker::Unpin,
{
let mut buf = [0u8; MAX_PACKET_SIZE];
let mut buf = Cursor::new(&mut buf[..]);
while let Some((response, src)) = stream.next().await {
buf.set_position(0);
::log::debug!("preparing to send response: {:?}", response.clone());
response
.write(&mut buf, ip_version_from_ip(src.ip()))
.expect("write response");
let position = buf.position() as usize;
if let Err(err) = socket.send_to(&buf.get_ref()[..position], src).await {
::log::info!("send_to failed: {:?}", err);
}
while let Some((response, addr)) = stream.next().await {
write_response_to_socket(&socket, &mut buf, addr, response).await;
yield_if_needed().await;
}
}
/// Serialize `response` into the reusable cursor buffer and send it to
/// `addr`. Serialization failure is a bug (buffer is MAX_PACKET_SIZE);
/// send errors are logged at info level and otherwise ignored, as usual
/// for a UDP tracker.
async fn write_response_to_socket(
    socket: &Rc<UdpSocket>,
    buf: &mut Cursor<&mut [u8]>,
    addr: SocketAddr,
    response: Response,
) {
    buf.set_position(0);

    // Debug-formatting borrows `response`; the previous `response.clone()`
    // here was a redundant allocation per packet.
    ::log::debug!("preparing to send response: {:?}", response);

    response
        .write(buf, ip_version_from_ip(addr.ip()))
        .expect("write response");

    let position = buf.position() as usize;

    if let Err(err) = socket.send_to(&buf.get_ref()[..position], addr).await {
        ::log::info!("send_to failed: {:?}", err);
    }
}
/// Pick the request worker responsible for an info hash: the hash's
/// first byte modulo the number of request workers, so a given torrent
/// always lands on the same worker.
fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize {
    let first_byte = info_hash.0[0];

    usize::from(first_byte) % config.request_workers
}
fn ip_version_from_ip(ip: IpAddr) -> IpVersion {
match ip {
IpAddr::V4(_) => IpVersion::IPv4,

View file

@ -1,12 +1,22 @@
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use cfg_if::cfg_if;
pub mod common;
pub mod config;
#[cfg(all(feature = "with-glommio", target_os = "linux"))]
pub mod glommio;
#[cfg(feature = "with-mio")]
pub mod mio;
use config::Config;
use privdrop::PrivDrop;
pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker";
@ -19,3 +29,35 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
}
}
}
/// Wait until every socket worker has bound its socket, then drop root
/// privileges (chroot + setuid) as configured. Does nothing unless
/// `privileges.drop_privileges` is enabled.
///
/// Panics if the sockets aren't all bound within ~5 seconds (500 polls
/// at 10 ms), since continuing to run privileged would be unsafe.
fn drop_privileges_after_socket_binding(
    config: &Config,
    num_bound_sockets: Arc<AtomicUsize>,
) -> anyhow::Result<()> {
    if !config.privileges.drop_privileges {
        return Ok(());
    }

    for _ in 0..500usize {
        if num_bound_sockets.load(Ordering::SeqCst) == config.socket_workers {
            PrivDrop::default()
                .chroot(config.privileges.chroot_path.clone())
                .user(config.privileges.user.clone())
                .apply()?;

            return Ok(());
        }

        ::std::thread::sleep(Duration::from_millis(10));
    }

    panic!("Sockets didn't bind in time for privilege drop.");
}

View file

@ -4,25 +4,6 @@ use std::sync::{atomic::AtomicUsize, Arc};
use crate::common::*;
pub enum ConnectedRequest {
Announce(AnnounceRequest),
Scrape(ScrapeRequest),
}
pub enum ConnectedResponse {
Announce(AnnounceResponse),
Scrape(ScrapeResponse),
}
impl Into<Response> for ConnectedResponse {
fn into(self) -> Response {
match self {
Self::Announce(response) => Response::Announce(response),
Self::Scrape(response) => Response::Scrape(response),
}
}
}
#[derive(Default)]
pub struct Statistics {
pub requests_received: AtomicUsize,

View file

@ -1,20 +1,16 @@
use std::net::SocketAddr;
use std::time::Duration;
use aquatic_common::ValidUntil;
use crossbeam_channel::{Receiver, Sender};
use rand::{rngs::SmallRng, SeedableRng};
use aquatic_udp_protocol::*;
use crate::common::handlers::*;
use crate::config::Config;
use crate::mio::common::*;
mod announce;
mod scrape;
use announce::handle_announce_requests;
use scrape::handle_scrape_requests;
pub fn run_request_worker(
state: State,
config: Config,
@ -59,8 +55,8 @@ pub fn run_request_worker(
};
match request {
ConnectedRequest::Announce(r) => announce_requests.push((r, src)),
ConnectedRequest::Scrape(r) => scrape_requests.push((r, src)),
ConnectedRequest::Announce(request) => announce_requests.push((request, src)),
ConnectedRequest::Scrape { request, .. } => scrape_requests.push((request, src)),
}
}
@ -68,15 +64,29 @@ pub fn run_request_worker(
{
let mut torrents = opt_torrents.unwrap_or_else(|| state.torrents.lock());
handle_announce_requests(
&config,
&mut torrents,
&mut small_rng,
announce_requests.drain(..),
&mut responses,
);
let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age);
handle_scrape_requests(&mut torrents, scrape_requests.drain(..), &mut responses);
responses.extend(announce_requests.drain(..).map(|(request, src)| {
let response = handle_announce_request(
&config,
&mut small_rng,
&mut torrents,
request,
src,
peer_valid_until,
);
(ConnectedResponse::Announce(response), src)
}));
responses.extend(scrape_requests.drain(..).map(|(request, src)| {
let response = ConnectedResponse::Scrape {
response: handle_scrape_request(&mut torrents, src, request),
original_indices: Vec::new(),
};
(response, src)
}));
}
for r in responses.drain(..) {

View file

@ -1,49 +0,0 @@
use std::net::{IpAddr, SocketAddr};
use std::vec::Drain;
use parking_lot::MutexGuard;
use rand::rngs::SmallRng;
use aquatic_common::convert_ipv4_mapped_ipv6;
use aquatic_udp_protocol::*;
use crate::common::announce::handle_announce_request;
use crate::common::*;
use crate::config::Config;
use crate::mio::common::*;
/// Process a batch of announce requests against the shared torrent maps,
/// producing one `ConnectedResponse::Announce` per request.
///
/// Peers are inserted with a validity deadline derived from the configured
/// maximum peer age, into the IPv4 or IPv6 map depending on the source
/// address.
#[inline]
pub fn handle_announce_requests(
    config: &Config,
    torrents: &mut MutexGuard<TorrentMaps>,
    rng: &mut SmallRng,
    requests: Drain<(AnnounceRequest, SocketAddr)>,
    responses: &mut Vec<(ConnectedResponse, SocketAddr)>,
) {
    let valid_until = ValidUntil::new(config.cleaning.max_peer_age);

    for (request, src) in requests {
        // Treat IPv4-mapped IPv6 source addresses as plain IPv4 peers
        let response = match convert_ipv4_mapped_ipv6(src.ip()) {
            IpAddr::V4(ip) => handle_announce_request(
                config,
                rng,
                &mut torrents.ipv4,
                request,
                ip,
                valid_until,
            ),
            IpAddr::V6(ip) => handle_announce_request(
                config,
                rng,
                &mut torrents.ipv6,
                request,
                ip,
                valid_until,
            ),
        };

        responses.push((ConnectedResponse::Announce(response), src));
    }
}

View file

@ -1,66 +0,0 @@
use std::net::SocketAddr;
use std::vec::Drain;
use parking_lot::MutexGuard;
use aquatic_common::convert_ipv4_mapped_ipv6;
use aquatic_udp_protocol::*;
use crate::mio::common::*;
use crate::common::*;
/// Process a batch of scrape requests, producing one
/// `ConnectedResponse::Scrape` per request with per-torrent statistics.
///
/// Statistics are read from the IPv4 or IPv6 torrent map depending on the
/// source address; unknown info hashes get zeroed statistics.
#[inline]
pub fn handle_scrape_requests(
    torrents: &mut MutexGuard<TorrentMaps>,
    requests: Drain<(ScrapeRequest, SocketAddr)>,
    responses: &mut Vec<(ConnectedResponse, SocketAddr)>,
) {
    let empty_stats = create_torrent_scrape_statistics(0, 0);

    for (request, src) in requests {
        // Treat IPv4-mapped IPv6 source addresses as plain IPv4 peers
        let peer_ip = convert_ipv4_mapped_ipv6(src.ip());

        let torrent_stats: Vec<TorrentScrapeStatistics> = if peer_ip.is_ipv4() {
            request
                .info_hashes
                .iter()
                .map(|info_hash| match torrents.ipv4.get(info_hash) {
                    Some(torrent_data) => create_torrent_scrape_statistics(
                        torrent_data.num_seeders as i32,
                        torrent_data.num_leechers as i32,
                    ),
                    None => empty_stats,
                })
                .collect()
        } else {
            request
                .info_hashes
                .iter()
                .map(|info_hash| match torrents.ipv6.get(info_hash) {
                    Some(torrent_data) => create_torrent_scrape_statistics(
                        torrent_data.num_seeders as i32,
                        torrent_data.num_leechers as i32,
                    ),
                    None => empty_stats,
                })
                .collect()
        };

        let response = ConnectedResponse::Scrape(ScrapeResponse {
            transaction_id: request.transaction_id,
            torrent_stats,
        });

        responses.push((response, src));
    }
}
/// Build a `TorrentScrapeStatistics` value from raw seeder/leecher counts.
#[inline(always)]
fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics {
    let seeders = NumberOfPeers(seeders);
    let leechers = NumberOfPeers(leechers);

    TorrentScrapeStatistics {
        seeders,
        leechers,
        completed: NumberOfDownloads(0), // No implementation planned
    }
}

View file

@ -2,15 +2,11 @@ use std::thread::Builder;
use std::time::Duration;
use std::{
ops::Deref,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
sync::{atomic::AtomicUsize, Arc},
};
use anyhow::Context;
use crossbeam_channel::unbounded;
use privdrop::PrivDrop;
pub mod common;
pub mod handlers;
@ -20,14 +16,15 @@ pub mod tasks;
use aquatic_common::access_list::{AccessListArcSwap, AccessListMode, AccessListQuery};
use crate::config::Config;
use crate::drop_privileges_after_socket_binding;
use common::State;
pub fn run(config: Config) -> ::anyhow::Result<()> {
if config.core_affinity.set_affinities {
core_affinity::set_for_current(
core_affinity::CoreId { id: config.core_affinity.offset }
);
core_affinity::set_for_current(core_affinity::CoreId {
id: config.core_affinity.offset,
});
}
let state = State::default();
@ -38,30 +35,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
start_workers(config.clone(), state.clone(), num_bound_sockets.clone())?;
if config.privileges.drop_privileges {
let mut counter = 0usize;
loop {
let sockets = num_bound_sockets.load(Ordering::SeqCst);
if sockets == config.socket_workers {
PrivDrop::default()
.chroot(config.privileges.chroot_path.clone())
.user(config.privileges.user.clone())
.apply()?;
break;
}
::std::thread::sleep(Duration::from_millis(10));
counter += 1;
if counter == 500 {
panic!("Sockets didn't bind in time for privilege drop.");
}
}
}
drop_privileges_after_socket_binding(&config, num_bound_sockets).unwrap();
loop {
::std::thread::sleep(Duration::from_secs(config.cleaning.interval));
@ -93,9 +67,9 @@ pub fn start_workers(
.name(format!("request-{:02}", i + 1))
.spawn(move || {
if config.core_affinity.set_affinities {
core_affinity::set_for_current(
core_affinity::CoreId { id: config.core_affinity.offset + 1 + i }
);
core_affinity::set_for_current(core_affinity::CoreId {
id: config.core_affinity.offset + 1 + i,
});
}
handlers::run_request_worker(state, config, request_receiver, response_sender)
@ -114,9 +88,9 @@ pub fn start_workers(
.name(format!("socket-{:02}", i + 1))
.spawn(move || {
if config.core_affinity.set_affinities {
core_affinity::set_for_current(
core_affinity::CoreId { id: config.core_affinity.offset + 1 + config.request_workers + i }
);
core_affinity::set_for_current(core_affinity::CoreId {
id: config.core_affinity.offset + 1 + config.request_workers + i,
});
}
network::run_socket_worker(
@ -139,9 +113,9 @@ pub fn start_workers(
.name("statistics-collector".to_string())
.spawn(move || {
if config.core_affinity.set_affinities {
core_affinity::set_for_current(
core_affinity::CoreId { id: config.core_affinity.offset }
);
core_affinity::set_for_current(core_affinity::CoreId {
id: config.core_affinity.offset,
});
}
loop {

View file

@ -16,6 +16,7 @@ use socket2::{Domain, Protocol, Socket, Type};
use aquatic_udp_protocol::{IpVersion, Request, Response};
use crate::common::handlers::*;
use crate::common::network::ConnectionMap;
use crate::common::*;
use crate::config::Config;
@ -191,9 +192,12 @@ fn read_requests(
}
Ok(Request::Scrape(request)) => {
if connections.contains(request.connection_id, src) {
if let Err(err) =
request_sender.try_send((ConnectedRequest::Scrape(request), src))
{
let request = ConnectedRequest::Scrape {
request,
original_indices: Vec::new(),
};
if let Err(err) = request_sender.try_send((request, src)) {
::log::warn!("request_sender.try_send failed: {:?}", err)
}
}

View file

@ -6,9 +6,9 @@ use indicatif::ProgressIterator;
use rand::Rng;
use rand_distr::Pareto;
use aquatic_udp::common::handlers::*;
use aquatic_udp::common::*;
use aquatic_udp::config::Config;
use aquatic_udp::mio::common::*;
use crate::common::*;
use crate::config::BenchConfig;

View file

@ -6,9 +6,9 @@ use indicatif::ProgressIterator;
use rand::Rng;
use rand_distr::Pareto;
use aquatic_udp::common::handlers::*;
use aquatic_udp::common::*;
use aquatic_udp::config::Config;
use aquatic_udp::mio::common::*;
use crate::common::*;
use crate::config::BenchConfig;
@ -42,15 +42,20 @@ pub fn bench_scrape_handler(
for round in (0..bench_config.num_rounds).progress_with(pb) {
for request_chunk in requests.chunks(p) {
for (request, src) in request_chunk {
request_sender
.send((ConnectedRequest::Scrape(request.clone()), *src))
.unwrap();
let request = ConnectedRequest::Scrape {
request: request.clone(),
original_indices: Vec::new(),
};
request_sender.send((request, *src)).unwrap();
}
while let Ok((ConnectedResponse::Scrape(r), _)) = response_receiver.try_recv() {
while let Ok((ConnectedResponse::Scrape { response, .. }, _)) =
response_receiver.try_recv()
{
num_responses += 1;
if let Some(stat) = r.torrent_stats.last() {
if let Some(stat) = response.torrent_stats.last() {
dummy ^= stat.leechers.0;
}
}
@ -59,10 +64,10 @@ pub fn bench_scrape_handler(
let total = bench_config.num_scrape_requests * (round + 1);
while num_responses < total {
if let Ok((ConnectedResponse::Scrape(r), _)) = response_receiver.recv() {
if let Ok((ConnectedResponse::Scrape { response, .. }, _)) = response_receiver.recv() {
num_responses += 1;
if let Some(stat) = r.torrent_stats.last() {
if let Some(stat) = response.torrent_stats.last() {
dummy ^= stat.leechers.0;
}
}

View file

@ -34,13 +34,12 @@ impl aquatic_cli_helpers::Config for Config {}
fn run(config: Config) -> ::anyhow::Result<()> {
let affinity_max = core_affinity::get_core_ids()
.map(|ids| ids.iter().map(|id| id.id ).max())
.flatten().unwrap_or(0);
.map(|ids| ids.iter().map(|id| id.id).max())
.flatten()
.unwrap_or(0);
if config.core_affinity.set_affinities {
core_affinity::set_for_current(
core_affinity::CoreId { id: affinity_max }
);
core_affinity::set_for_current(core_affinity::CoreId { id: affinity_max });
}
if config.handler.weight_announce + config.handler.weight_connect + config.handler.weight_scrape
@ -103,9 +102,9 @@ fn run(config: Config) -> ::anyhow::Result<()> {
thread::spawn(move || {
if config.core_affinity.set_affinities {
core_affinity::set_for_current(
core_affinity::CoreId { id: affinity_max - 1 - i as usize }
);
core_affinity::set_for_current(core_affinity::CoreId {
id: affinity_max - 1 - i as usize,
});
}
run_socket_thread(state, response_sender, receiver, &config, addr, thread_id)
@ -120,9 +119,9 @@ fn run(config: Config) -> ::anyhow::Result<()> {
thread::spawn(move || {
if config.core_affinity.set_affinities {
core_affinity::set_for_current(
core_affinity::CoreId { id: affinity_max - config.num_socket_workers as usize - 1 - i as usize }
);
core_affinity::set_for_current(core_affinity::CoreId {
id: affinity_max - config.num_socket_workers as usize - 1 - i as usize,
});
}
run_handler_thread(&config, state, pareto, request_senders, response_receiver)
});