udp: remove glommio implementation

This commit is contained in:
Joakim Frostegård 2021-11-14 21:59:06 +01:00
parent 7b20942d0f
commit 4641dd29f2
22 changed files with 465 additions and 1202 deletions

3
Cargo.lock generated
View file

@ -182,8 +182,6 @@ dependencies = [
"bytemuck",
"cfg-if",
"crossbeam-channel",
"futures-lite",
"glommio",
"hex",
"histogram",
"io-uring",
@ -208,6 +206,7 @@ dependencies = [
"anyhow",
"aquatic_cli_helpers",
"aquatic_udp",
"aquatic_udp_protocol",
"crossbeam-channel",
"indicatif",
"mimalloc",

View file

@ -17,9 +17,8 @@ name = "aquatic_udp"
[features]
default = ["with-mio"]
cpu-pinning = ["aquatic_common/cpu-pinning"]
with-glommio = ["cpu-pinning", "glommio", "futures-lite"]
with-mio = ["crossbeam-channel", "histogram", "mio", "socket2"]
with-io-uring = ["crossbeam-channel", "histogram", "socket2", "io-uring", "libc", "bytemuck"]
with-mio = ["mio"]
with-io-uring = ["io-uring", "libc", "bytemuck"]
[dependencies]
anyhow = "1"
@ -27,7 +26,9 @@ aquatic_cli_helpers = "0.1.0"
aquatic_common = "0.1.0"
aquatic_udp_protocol = "0.1.0"
cfg-if = "1"
crossbeam-channel = "0.5"
hex = "0.4"
histogram = "0.6"
log = "0.4"
mimalloc = { version = "0.1", default-features = false }
parking_lot = "0.11"
@ -35,11 +36,7 @@ rand = { version = "0.8", features = ["small_rng"] }
serde = { version = "1", features = ["derive"] }
slab = "0.4"
signal-hook = { version = "0.3" }
# mio / io-uring
crossbeam-channel = { version = "0.5", optional = true }
histogram = { version = "0.6", optional = true }
socket2 = { version = "0.4.1", features = ["all"], optional = true }
socket2 = { version = "0.4.1", features = ["all"] }
# mio
mio = { version = "0.7", features = ["udp", "os-poll", "os-util"], optional = true }
@ -49,10 +46,6 @@ io-uring = { version = "0.5", optional = true }
libc = { version = "0.2", optional = true }
bytemuck = { version = "1", optional = true }
# glommio
glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5", optional = true }
futures-lite = { version = "1", optional = true }
[dev-dependencies]
quickcheck = "1.0"
quickcheck_macros = "1.0"

View file

@ -1,21 +1,52 @@
use std::hash::Hash;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Instant;
use parking_lot::Mutex;
use socket2::{Domain, Protocol, Socket, Type};
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap};
use aquatic_common::AHashIndexMap;
pub use aquatic_common::{access_list::AccessList, ValidUntil};
pub use aquatic_udp_protocol::*;
use aquatic_common::ValidUntil;
use aquatic_udp_protocol::*;
use crate::config::Config;
pub mod handlers;
pub mod network;
pub const MAX_PACKET_SIZE: usize = 8192;
/// A request that has already passed the connection-id check in a socket
/// worker and is forwarded to a request worker for processing.
#[derive(Debug)]
pub enum ConnectedRequest {
    Announce(AnnounceRequest),
    Scrape {
        request: ScrapeRequest,
        /// Positions of these info hashes in the peer's original scrape
        /// request, used to reassemble responses when one request is split
        /// across several workers. Currently only used by glommio
        /// implementation; the mio path passes an empty `Vec`.
        original_indices: Vec<usize>,
    },
}
/// A response produced by a request worker, sent back to the socket worker
/// that owns the peer connection.
#[derive(Debug)]
pub enum ConnectedResponse {
    Announce(AnnounceResponse),
    Scrape {
        response: ScrapeResponse,
        /// Mirrors `ConnectedRequest::Scrape::original_indices`; lets the
        /// socket worker restore the peer's original torrent order.
        /// Currently only used by glommio implementation.
        original_indices: Vec<usize>,
    },
}
impl Into<Response> for ConnectedResponse {
fn into(self) -> Response {
match self {
Self::Announce(response) => Response::Announce(response),
Self::Scrape { response, .. } => Response::Scrape(response),
}
}
}
/// A copyable, hashable IP address key that can be converted back into a
/// generic [`IpAddr`]. NOTE(review): appears to let peer/torrent maps be
/// generic over address family — confirm against the map definitions.
pub trait Ip: Hash + PartialEq + Eq + Clone + Copy {
    fn ip_addr(self) -> IpAddr;
}
@ -160,6 +191,43 @@ impl TorrentMaps {
}
}
/// Global counters updated atomically by socket workers; read periodically
/// by the statistics task when statistics reporting is enabled.
#[derive(Default)]
pub struct Statistics {
    pub requests_received: AtomicUsize,
    pub responses_sent: AtomicUsize,
    pub bytes_received: AtomicUsize,
    pub bytes_sent: AtomicUsize,
}
/// Shared tracker state. Cloning is cheap: only the `Arc` handles are
/// cloned, so all workers observe the same underlying data.
#[derive(Clone)]
pub struct State {
    pub access_list: Arc<AccessListArcSwap>,
    /// All torrent/peer data, behind a single mutex shared by the
    /// request workers.
    pub torrents: Arc<Mutex<TorrentMaps>>,
    pub statistics: Arc<Statistics>,
}
impl Default for State {
fn default() -> Self {
Self {
access_list: Arc::new(AccessListArcSwap::default()),
torrents: Arc::new(Mutex::new(TorrentMaps::default())),
statistics: Arc::new(Statistics::default()),
}
}
}
/// Classify an address as IPv4 or IPv6 for response serialization.
///
/// IPv4-mapped IPv6 addresses (`::ffff:a.b.c.d`) are reported as IPv4,
/// since such peers speak IPv4 on the wire.
pub fn ip_version_from_ip(ip: IpAddr) -> IpVersion {
    match ip {
        IpAddr::V4(_) => IpVersion::IPv4,
        IpAddr::V6(ip) => {
            // The `::ffff:0:0/96` prefix marks an IPv4-mapped address
            if let [0, 0, 0, 0, 0, 0xffff, ..] = ip.segments() {
                IpVersion::IPv4
            } else {
                IpVersion::IPv6
            }
        }
    }
}
#[cfg(test)]
mod tests {
use std::net::{IpAddr, Ipv6Addr};

View file

@ -1,8 +1,13 @@
use std::{net::SocketAddr, time::Instant};
use aquatic_common::access_list::AccessListCache;
use aquatic_common::AHashIndexMap;
pub use aquatic_common::{access_list::AccessList, ValidUntil};
pub use aquatic_udp_protocol::*;
use aquatic_common::ValidUntil;
use aquatic_udp_protocol::*;
use crossbeam_channel::Sender;
use rand::{prelude::StdRng, Rng};
use crate::common::*;
/// Connection ids handed out to peers, each with an expiry time. Keyed on
/// (id, peer address) so an id issued to one host is not valid from another.
#[derive(Default)]
pub struct ConnectionMap(AHashIndexMap<(ConnectionId, SocketAddr), ValidUntil>);
@ -28,3 +33,117 @@ impl ConnectionMap {
self.0.shrink_to_fit();
}
}
/// Process one parsed (or unparseable) request from a peer.
///
/// * Connect requests are answered locally with a freshly issued
///   connection id, pushed onto `local_responses`.
/// * Announce/scrape requests carrying a valid connection id are
///   forwarded to a request worker via `request_sender`; `try_send` is
///   used so a full channel drops the request with a warning instead of
///   blocking the socket worker.
/// * Sendable parse errors are answered locally, but only for peers that
///   hold a valid connection id. NOTE(review): presumably to avoid
///   responding to spoofed source addresses — confirm.
pub fn handle_request(
    config: &Config,
    connections: &mut ConnectionMap,
    access_list_cache: &mut AccessListCache,
    rng: &mut StdRng,
    request_sender: &Sender<(ConnectedRequest, SocketAddr)>,
    local_responses: &mut Vec<(Response, SocketAddr)>,
    valid_until: ValidUntil,
    res_request: Result<Request, RequestParseError>,
    src: SocketAddr,
) {
    let access_list_mode = config.access_list.mode;

    match res_request {
        Ok(Request::Connect(request)) => {
            let connection_id = ConnectionId(rng.gen());

            connections.insert(connection_id, src, valid_until);

            let response = Response::Connect(ConnectResponse {
                connection_id,
                transaction_id: request.transaction_id,
            });

            local_responses.push((response, src))
        }
        Ok(Request::Announce(request)) => {
            if connections.contains(request.connection_id, src) {
                // Only forward announces for info hashes the access list allows
                if access_list_cache
                    .load()
                    .allows(access_list_mode, &request.info_hash.0)
                {
                    if let Err(err) =
                        request_sender.try_send((ConnectedRequest::Announce(request), src))
                    {
                        ::log::warn!("request_sender.try_send failed: {:?}", err)
                    }
                } else {
                    let response = Response::Error(ErrorResponse {
                        transaction_id: request.transaction_id,
                        message: "Info hash not allowed".into(),
                    });

                    local_responses.push((response, src))
                }
            }
        }
        Ok(Request::Scrape(request)) => {
            if connections.contains(request.connection_id, src) {
                // original_indices is only meaningful in the glommio
                // implementation; empty here
                let request = ConnectedRequest::Scrape {
                    request,
                    original_indices: Vec::new(),
                };

                if let Err(err) = request_sender.try_send((request, src)) {
                    ::log::warn!("request_sender.try_send failed: {:?}", err)
                }
            }
        }
        Err(err) => {
            ::log::debug!("Request::from_bytes error: {:?}", err);

            if let RequestParseError::Sendable {
                connection_id,
                transaction_id,
                err,
            } = err
            {
                if connections.contains(connection_id, src) {
                    let response = ErrorResponse {
                        transaction_id,
                        message: err.right_or("Parse error").into(),
                    };

                    local_responses.push((response.into(), src));
                }
            }
        }
    }
}
/// Create and bind the UDP socket configured in `config.network.address`,
/// with SO_REUSEPORT and non-blocking mode set, optionally resizing the
/// receive buffer. Panics on socket creation/bind failure.
pub fn create_socket(config: &Config) -> ::std::net::UdpSocket {
    let domain = if config.network.address.is_ipv4() {
        Domain::IPV4
    } else {
        Domain::IPV6
    };

    let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP)).expect("create socket");

    // Reuse-port must be set before bind; lets multiple socket workers
    // bind the same address
    socket.set_reuse_port(true).expect("socket: set reuse port");

    socket
        .set_nonblocking(true)
        .expect("socket: set nonblocking");

    socket
        .bind(&config.network.address.into())
        .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err));

    let recv_buffer_size = config.network.socket_recv_buffer_size;

    // 0 means "leave the OS default"; a failed resize is logged, not fatal
    if recv_buffer_size != 0 {
        if let Err(err) = socket.set_recv_buffer_size(recv_buffer_size) {
            ::log::error!(
                "socket: failed setting recv buffer to {}: {:?}",
                recv_buffer_size,
                err
            );
        }
    }

    socket.into()
}

View file

@ -18,9 +18,7 @@ pub struct Config {
pub log_level: LogLevel,
pub network: NetworkConfig,
pub protocol: ProtocolConfig,
#[cfg(any(feature = "with-mio", feature = "with-io-uring"))]
pub handlers: HandlerConfig,
#[cfg(any(feature = "with-mio", feature = "with-io-uring"))]
pub statistics: StatisticsConfig,
pub cleaning: CleaningConfig,
pub privileges: PrivilegeConfig,
@ -70,7 +68,6 @@ pub struct ProtocolConfig {
pub peer_announce_interval: i32,
}
#[cfg(any(feature = "with-mio", feature = "with-io-uring"))]
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct HandlerConfig {
@ -80,7 +77,6 @@ pub struct HandlerConfig {
pub channel_recv_timeout_microseconds: u64,
}
#[cfg(any(feature = "with-mio", feature = "with-io-uring"))]
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct StatisticsConfig {
@ -109,9 +105,7 @@ impl Default for Config {
log_level: LogLevel::Error,
network: NetworkConfig::default(),
protocol: ProtocolConfig::default(),
#[cfg(any(feature = "with-mio", feature = "with-io-uring"))]
handlers: HandlerConfig::default(),
#[cfg(any(feature = "with-mio", feature = "with-io-uring"))]
statistics: StatisticsConfig::default(),
cleaning: CleaningConfig::default(),
privileges: PrivilegeConfig::default(),
@ -143,7 +137,6 @@ impl Default for ProtocolConfig {
}
}
#[cfg(any(feature = "with-mio", feature = "with-io-uring"))]
impl Default for HandlerConfig {
fn default() -> Self {
Self {
@ -153,7 +146,6 @@ impl Default for HandlerConfig {
}
}
#[cfg(any(feature = "with-mio", feature = "with-io-uring"))]
impl Default for StatisticsConfig {
fn default() -> Self {
Self { interval: 0 }

View file

@ -1,8 +0,0 @@
use std::sync::Arc;
use aquatic_common::access_list::AccessListArcSwap;
/// Shared state for the glommio implementation. Only the access list is
/// shared across executors; torrent data is kept executor-local in the
/// request workers instead.
#[derive(Default, Clone)]
pub struct State {
    pub access_list: Arc<AccessListArcSwap>,
}

View file

@ -1,117 +0,0 @@
use std::cell::RefCell;
use std::net::SocketAddr;
use std::rc::Rc;
use std::time::Duration;
use futures_lite::{Stream, StreamExt};
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::timer::TimerActionRepeat;
use glommio::{enclose, prelude::*};
use rand::prelude::SmallRng;
use rand::SeedableRng;
use crate::common::handlers::handle_announce_request;
use crate::common::handlers::*;
use crate::common::*;
use crate::config::Config;
use super::common::State;
/// Glommio request worker: joins the request mesh as consumer and the
/// response mesh as producer, then serves announce/scrape requests from
/// all socket workers against an executor-local `TorrentMaps`.
pub async fn run_request_worker(
    config: Config,
    state: State,
    request_mesh_builder: MeshBuilder<(usize, ConnectedRequest, SocketAddr), Partial>,
    response_mesh_builder: MeshBuilder<(ConnectedResponse, SocketAddr), Partial>,
) {
    let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap();
    let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap();

    let response_senders = Rc::new(response_senders);

    // Torrent state is local to this executor, so no cross-thread locking
    let torrents = Rc::new(RefCell::new(TorrentMaps::default()));

    // Periodically clean torrents
    TimerActionRepeat::repeat(enclose!((config, torrents, state) move || {
        enclose!((config, torrents, state) move || async move {
            torrents.borrow_mut().clean(&config, &state.access_list);

            Some(Duration::from_secs(config.cleaning.torrent_cleaning_interval))
        })()
    }));

    // One task per incoming socket-worker stream, all sharing `torrents`
    let mut handles = Vec::new();

    for (_, receiver) in request_receivers.streams() {
        let handle = spawn_local(handle_request_stream(
            config.clone(),
            torrents.clone(),
            response_senders.clone(),
            receiver,
        ))
        .detach();

        handles.push(handle);
    }

    for handle in handles {
        handle.await;
    }
}
/// Serve one socket worker's request stream: handle each announce/scrape
/// against the shared-within-this-executor torrent maps and send the
/// response back to the producing socket worker.
async fn handle_request_stream<S>(
    config: Config,
    torrents: Rc<RefCell<TorrentMaps>>,
    response_senders: Rc<Senders<(ConnectedResponse, SocketAddr)>>,
    mut stream: S,
) where
    S: Stream<Item = (usize, ConnectedRequest, SocketAddr)> + ::std::marker::Unpin,
{
    let mut rng = SmallRng::from_entropy();

    let max_peer_age = config.cleaning.max_peer_age;
    // Refreshed once per second by the timer below, so each request
    // doesn't need to fetch the current time
    let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_peer_age)));

    TimerActionRepeat::repeat(enclose!((peer_valid_until) move || {
        enclose!((peer_valid_until) move || async move {
            *peer_valid_until.borrow_mut() = ValidUntil::new(max_peer_age);

            Some(Duration::from_secs(1))
        })()
    }));

    while let Some((producer_index, request, src)) = stream.next().await {
        let response = match request {
            ConnectedRequest::Announce(request) => {
                ConnectedResponse::Announce(handle_announce_request(
                    &config,
                    &mut rng,
                    &mut torrents.borrow_mut(),
                    request,
                    src,
                    peer_valid_until.borrow().to_owned(),
                ))
            }
            ConnectedRequest::Scrape {
                request,
                original_indices,
            } => {
                let response = handle_scrape_request(&mut torrents.borrow_mut(), src, request);

                // Indices are passed back so the socket worker can
                // reassemble split scrape responses
                ConnectedResponse::Scrape {
                    response,
                    original_indices,
                }
            }
        };

        ::log::debug!("preparing to send response to channel: {:?}", response);

        // Respond to the same socket worker that produced the request
        if let Err(err) = response_senders
            .send_to(producer_index, (response, src))
            .await
        {
            ::log::error!("response_sender.send: {:?}", err);
        }

        yield_if_needed().await;
    }
}

View file

@ -1,135 +0,0 @@
use std::sync::{atomic::AtomicUsize, Arc};
use aquatic_common::access_list::update_access_list;
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
use aquatic_common::privileges::drop_privileges_after_socket_binding;
use glommio::channels::channel_mesh::MeshBuilder;
use glommio::prelude::*;
use signal_hook::consts::SIGUSR1;
use signal_hook::iterator::Signals;
use crate::config::Config;
use self::common::State;
mod common;
pub mod handlers;
pub mod network;
pub const SHARED_CHANNEL_SIZE: usize = 4096;
/// Entry point for the glommio implementation: spawns the worker threads
/// via `run_inner`, then parks the calling thread in a signal loop that
/// reloads the access list from disk whenever SIGUSR1 arrives.
pub fn run(config: Config) -> ::anyhow::Result<()> {
    let state = State::default();

    update_access_list(&config.access_list, &state.access_list)?;

    let mut signals = Signals::new(::std::iter::once(SIGUSR1))?;

    {
        let config = config.clone();
        let state = state.clone();

        ::std::thread::spawn(move || run_inner(config, state));
    }

    pin_current_if_configured_to(
        &config.cpu_pinning,
        config.socket_workers,
        WorkerIndex::Other,
    );

    // Blocks forever handling signals; Ok(()) below is effectively unreachable
    for signal in &mut signals {
        match signal {
            SIGUSR1 => {
                let _ = update_access_list(&config.access_list, &state.access_list);
            }
            _ => unreachable!(),
        }
    }

    Ok(())
}
/// Spawn one glommio executor per socket worker and per request worker,
/// all joined over partial channel meshes, then drop privileges once every
/// socket worker has bound its socket. Joins the executors (blocking).
pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
    let num_peers = config.socket_workers + config.request_workers;

    let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
    let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);

    // Incremented by each socket worker after bind; used to time the
    // privilege drop below
    let num_bound_sockets = Arc::new(AtomicUsize::new(0));

    let mut executors = Vec::new();

    for i in 0..(config.socket_workers) {
        let config = config.clone();
        let state = state.clone();
        let request_mesh_builder = request_mesh_builder.clone();
        let response_mesh_builder = response_mesh_builder.clone();
        let num_bound_sockets = num_bound_sockets.clone();

        let builder = LocalExecutorBuilder::default().name("socket");

        let executor = builder.spawn(move || async move {
            pin_current_if_configured_to(
                &config.cpu_pinning,
                config.socket_workers,
                WorkerIndex::SocketWorker(i),
            );

            network::run_socket_worker(
                config,
                state,
                request_mesh_builder,
                response_mesh_builder,
                num_bound_sockets,
            )
            .await
        });

        executors.push(executor);
    }

    for i in 0..(config.request_workers) {
        let config = config.clone();
        let state = state.clone();
        let request_mesh_builder = request_mesh_builder.clone();
        let response_mesh_builder = response_mesh_builder.clone();

        let builder = LocalExecutorBuilder::default().name("request");

        let executor = builder.spawn(move || async move {
            pin_current_if_configured_to(
                &config.cpu_pinning,
                config.socket_workers,
                WorkerIndex::RequestWorker(i),
            );

            handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder)
                .await
        });

        executors.push(executor);
    }

    // Waits until all sockets are bound, then drops root privileges
    drop_privileges_after_socket_binding(
        &config.privileges,
        num_bound_sockets,
        config.socket_workers,
    )
    .unwrap();

    pin_current_if_configured_to(
        &config.cpu_pinning,
        config.socket_workers,
        WorkerIndex::Other,
    );

    for executor in executors {
        executor
            .expect("failed to spawn local executor")
            .join()
            .unwrap();
    }

    Ok(())
}

View file

@ -1,428 +0,0 @@
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::io::Cursor;
use std::net::{IpAddr, SocketAddr};
use std::rc::Rc;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::time::{Duration, Instant};
use aquatic_common::access_list::create_access_list_cache;
use aquatic_common::AHashIndexMap;
use futures_lite::{Stream, StreamExt};
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::channels::local_channel::{new_unbounded, LocalSender};
use glommio::enclose;
use glommio::net::UdpSocket;
use glommio::prelude::*;
use glommio::timer::TimerActionRepeat;
use rand::prelude::{Rng, SeedableRng, StdRng};
use aquatic_udp_protocol::{IpVersion, Request, Response};
use super::common::State;
use crate::common::handlers::*;
use crate::common::network::ConnectionMap;
use crate::common::*;
use crate::config::Config;
const PENDING_SCRAPE_MAX_WAIT: u64 = 30;
/// Book-keeping for a scrape request that was split across several request
/// workers: counts outstanding worker responses and accumulates per-torrent
/// stats keyed by their index in the peer's original request.
struct PendingScrapeResponse {
    pending_worker_responses: usize,
    /// Deadline after which the entry is discarded by `clean`
    valid_until: ValidUntil,
    stats: BTreeMap<usize, TorrentScrapeStatistics>,
}
/// In-flight split scrape requests, keyed by transaction id.
#[derive(Default)]
struct PendingScrapeResponses(AHashIndexMap<TransactionId, PendingScrapeResponse>);
impl PendingScrapeResponses {
    /// Register a scrape transaction that was split over
    /// `pending_worker_responses` request workers.
    fn prepare(
        &mut self,
        transaction_id: TransactionId,
        pending_worker_responses: usize,
        valid_until: ValidUntil,
    ) {
        let pending = PendingScrapeResponse {
            pending_worker_responses,
            valid_until,
            stats: BTreeMap::new(),
        };

        self.0.insert(transaction_id, pending);
    }

    /// Merge one worker's partial response into the pending entry.
    /// Returns the complete response — with stats restored to the peer's
    /// original order via the index-keyed `BTreeMap` — once all workers
    /// have reported; `None` otherwise (including the already-cleaned
    /// case, which is only logged).
    fn add_and_get_finished(
        &mut self,
        mut response: ScrapeResponse,
        mut original_indices: Vec<usize>,
    ) -> Option<ScrapeResponse> {
        let finished = if let Some(r) = self.0.get_mut(&response.transaction_id) {
            r.pending_worker_responses -= 1;

            // Pair each stat with its position in the original request
            r.stats.extend(
                original_indices
                    .drain(..)
                    .zip(response.torrent_stats.drain(..)),
            );

            r.pending_worker_responses == 0
        } else {
            ::log::warn!("PendingScrapeResponses.add didn't find PendingScrapeResponse in map");

            false
        };

        if finished {
            let PendingScrapeResponse { stats, .. } =
                self.0.remove(&response.transaction_id).unwrap();

            Some(ScrapeResponse {
                transaction_id: response.transaction_id,
                torrent_stats: stats.into_values().collect(),
            })
        } else {
            None
        }
    }

    /// Drop entries whose wait deadline has passed and release excess
    /// capacity.
    fn clean(&mut self) {
        let now = Instant::now();

        self.0.retain(|_, v| v.valid_until.0 > now);
        self.0.shrink_to_fit();
    }
}
/// Glommio socket worker: binds a UDP socket, joins the meshes (producer
/// of requests, consumer of responses), then runs three kinds of tasks on
/// this executor: reading requests, forwarding shared responses, and
/// sending locally-generated responses (the latter driven on this task).
pub async fn run_socket_worker(
    config: Config,
    state: State,
    request_mesh_builder: MeshBuilder<(usize, ConnectedRequest, SocketAddr), Partial>,
    response_mesh_builder: MeshBuilder<(ConnectedResponse, SocketAddr), Partial>,
    num_bound_sockets: Arc<AtomicUsize>,
) {
    // Channel for responses that can be answered without a request worker
    // (connect responses, local errors)
    let (local_sender, local_receiver) = new_unbounded();

    let mut socket = UdpSocket::bind(config.network.address).unwrap();

    let recv_buffer_size = config.network.socket_recv_buffer_size;

    if recv_buffer_size != 0 {
        socket.set_buffer_size(recv_buffer_size);
    }

    let socket = Rc::new(socket);

    // Signals the privilege-drop logic that this socket is bound
    num_bound_sockets.fetch_add(1, Ordering::SeqCst);

    let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap();
    let (_, mut response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap();
    let response_consumer_index = response_receivers.consumer_id().unwrap();

    let pending_scrape_responses = Rc::new(RefCell::new(PendingScrapeResponses::default()));

    // Periodically clean pending_scrape_responses
    TimerActionRepeat::repeat(enclose!((pending_scrape_responses) move || {
        enclose!((pending_scrape_responses) move || async move {
            pending_scrape_responses.borrow_mut().clean();

            Some(Duration::from_secs(120))
        })()
    }));

    spawn_local(enclose!((pending_scrape_responses) read_requests(
        config.clone(),
        state,
        request_senders,
        response_consumer_index,
        local_sender,
        socket.clone(),
        pending_scrape_responses,
    )))
    .detach();

    for (_, receiver) in response_receivers.streams().into_iter() {
        spawn_local(enclose!((pending_scrape_responses) handle_shared_responses(
            socket.clone(),
            pending_scrape_responses,
            receiver,
        )))
        .detach();
    }

    send_local_responses(socket, local_receiver.stream()).await;
}
/// Read loop of a glommio socket worker: parse each incoming datagram and
/// either answer locally (connect, access-list rejections, parse errors)
/// via `local_sender`, or forward it over the request mesh.
///
/// Scrape requests are split per request worker (by info hash) and
/// registered in `pending_scrape_responses` so the partial responses can
/// be merged later.
async fn read_requests(
    config: Config,
    state: State,
    request_senders: Senders<(usize, ConnectedRequest, SocketAddr)>,
    response_consumer_index: usize,
    local_sender: LocalSender<(Response, SocketAddr)>,
    socket: Rc<UdpSocket>,
    pending_scrape_responses: Rc<RefCell<PendingScrapeResponses>>,
) {
    let mut rng = StdRng::from_entropy();

    let access_list_mode = config.access_list.mode;
    let max_connection_age = config.cleaning.max_connection_age;

    // Cached deadlines, refreshed by the timers below so the hot loop
    // never needs to fetch the current time
    let connection_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_connection_age)));
    let pending_scrape_valid_until =
        Rc::new(RefCell::new(ValidUntil::new(PENDING_SCRAPE_MAX_WAIT)));

    let connections = Rc::new(RefCell::new(ConnectionMap::default()));

    let mut access_list_cache = create_access_list_cache(&state.access_list);

    // Periodically update connection_valid_until
    TimerActionRepeat::repeat(enclose!((connection_valid_until) move || {
        enclose!((connection_valid_until) move || async move {
            *connection_valid_until.borrow_mut() = ValidUntil::new(max_connection_age);

            Some(Duration::from_secs(1))
        })()
    }));

    // Periodically update pending_scrape_valid_until
    TimerActionRepeat::repeat(enclose!((pending_scrape_valid_until) move || {
        enclose!((pending_scrape_valid_until) move || async move {
            *pending_scrape_valid_until.borrow_mut() = ValidUntil::new(PENDING_SCRAPE_MAX_WAIT);

            Some(Duration::from_secs(10))
        })()
    }));

    // Periodically clean connections
    TimerActionRepeat::repeat(enclose!((config, connections) move || {
        enclose!((config, connections) move || async move {
            connections.borrow_mut().clean();

            Some(Duration::from_secs(config.cleaning.connection_cleaning_interval))
        })()
    }));

    let mut buf = [0u8; MAX_PACKET_SIZE];

    loop {
        match socket.recv_from(&mut buf).await {
            Ok((amt, src)) => {
                let request = Request::from_bytes(&buf[..amt], config.protocol.max_scrape_torrents);

                ::log::debug!("read request: {:?}", request);

                match request {
                    Ok(Request::Connect(request)) => {
                        // Connect requests are answered locally, never forwarded
                        let connection_id = ConnectionId(rng.gen());

                        connections.borrow_mut().insert(
                            connection_id,
                            src,
                            connection_valid_until.borrow().to_owned(),
                        );

                        let response = Response::Connect(ConnectResponse {
                            connection_id,
                            transaction_id: request.transaction_id,
                        });

                        local_sender.try_send((response, src)).unwrap();
                    }
                    Ok(Request::Announce(request)) => {
                        if connections.borrow().contains(request.connection_id, src) {
                            if access_list_cache
                                .load()
                                .allows(access_list_mode, &request.info_hash.0)
                            {
                                let request_consumer_index =
                                    calculate_request_consumer_index(&config, request.info_hash);

                                if let Err(err) = request_senders
                                    .send_to(
                                        request_consumer_index,
                                        (
                                            response_consumer_index,
                                            ConnectedRequest::Announce(request),
                                            src,
                                        ),
                                    )
                                    .await
                                {
                                    ::log::error!("request_sender.try_send failed: {:?}", err)
                                }
                            } else {
                                let response = Response::Error(ErrorResponse {
                                    transaction_id: request.transaction_id,
                                    message: "Info hash not allowed".into(),
                                });

                                local_sender.try_send((response, src)).unwrap();
                            }
                        }
                    }
                    Ok(Request::Scrape(ScrapeRequest {
                        transaction_id,
                        connection_id,
                        info_hashes,
                    })) => {
                        if connections.borrow().contains(connection_id, src) {
                            // Group the info hashes by target request worker,
                            // remembering each hash's original position
                            let mut consumer_requests: AHashIndexMap<
                                usize,
                                (ScrapeRequest, Vec<usize>),
                            > = Default::default();

                            for (i, info_hash) in info_hashes.into_iter().enumerate() {
                                let (req, indices) = consumer_requests
                                    .entry(calculate_request_consumer_index(&config, info_hash))
                                    .or_insert_with(|| {
                                        let request = ScrapeRequest {
                                            transaction_id: transaction_id,
                                            connection_id: connection_id,
                                            info_hashes: Vec::new(),
                                        };

                                        (request, Vec::new())
                                    });

                                req.info_hashes.push(info_hash);
                                indices.push(i);
                            }

                            // Register the split so partial responses can
                            // be merged in handle_shared_responses
                            pending_scrape_responses.borrow_mut().prepare(
                                transaction_id,
                                consumer_requests.len(),
                                pending_scrape_valid_until.borrow().to_owned(),
                            );

                            for (consumer_index, (request, original_indices)) in consumer_requests {
                                let request = ConnectedRequest::Scrape {
                                    request,
                                    original_indices,
                                };

                                if let Err(err) = request_senders
                                    .send_to(
                                        consumer_index,
                                        (response_consumer_index, request, src),
                                    )
                                    .await
                                {
                                    ::log::error!("request_sender.send failed: {:?}", err)
                                }
                            }
                        }
                    }
                    Err(err) => {
                        ::log::debug!("Request::from_bytes error: {:?}", err);

                        if let RequestParseError::Sendable {
                            connection_id,
                            transaction_id,
                            err,
                        } = err
                        {
                            if connections.borrow().contains(connection_id, src) {
                                let response = ErrorResponse {
                                    transaction_id,
                                    message: err.right_or("Parse error").into(),
                                };

                                local_sender.try_send((response.into(), src)).unwrap();
                            }
                        }
                    }
                }
            }
            Err(err) => {
                ::log::error!("recv_from: {:?}", err);
            }
        }

        yield_if_needed().await;
    }
}
/// Forward responses arriving from one request worker to peers. Announce
/// responses go straight to the socket; scrape parts are merged via
/// `pending_scrape_responses` and only sent once complete.
async fn handle_shared_responses<S>(
    socket: Rc<UdpSocket>,
    pending_scrape_responses: Rc<RefCell<PendingScrapeResponses>>,
    mut stream: S,
) where
    S: Stream<Item = (ConnectedResponse, SocketAddr)> + ::std::marker::Unpin,
{
    // Serialization buffer, reused across all responses on this stream
    let mut buf = [0u8; MAX_PACKET_SIZE];
    let mut buf = Cursor::new(&mut buf[..]);

    while let Some((response, addr)) = stream.next().await {
        let opt_response = match response {
            ConnectedResponse::Announce(response) => Some((Response::Announce(response), addr)),
            ConnectedResponse::Scrape {
                response,
                original_indices,
            } => pending_scrape_responses
                .borrow_mut()
                .add_and_get_finished(response, original_indices)
                .map(|response| (Response::Scrape(response), addr)),
        };

        if let Some((response, addr)) = opt_response {
            write_response_to_socket(&socket, &mut buf, addr, response).await;
        }

        yield_if_needed().await;
    }
}
/// Drain the local-response channel (connect responses and local errors)
/// and write each response to the peer.
async fn send_local_responses<S>(socket: Rc<UdpSocket>, mut stream: S)
where
    S: Stream<Item = (Response, SocketAddr)> + ::std::marker::Unpin,
{
    // Serialization buffer, reused for every response
    let mut buf = [0u8; MAX_PACKET_SIZE];
    let mut buf = Cursor::new(&mut buf[..]);

    while let Some((response, addr)) = stream.next().await {
        write_response_to_socket(&socket, &mut buf, addr, response).await;

        yield_if_needed().await;
    }
}
/// Serialize `response` into the reusable cursor buffer and send it to
/// `addr`. Serialization failure is a bug (buffer is MAX_PACKET_SIZE) and
/// panics; send failures are only logged.
async fn write_response_to_socket(
    socket: &Rc<UdpSocket>,
    buf: &mut Cursor<&mut [u8]>,
    addr: SocketAddr,
    response: Response,
) {
    buf.set_position(0);

    // `{:?}` formats by reference, so no clone of the response is needed
    ::log::debug!("preparing to send response: {:?}", response);

    response
        .write(buf, ip_version_from_ip(addr.ip()))
        .expect("write response");

    let position = buf.position() as usize;

    if let Err(err) = socket.send_to(&buf.get_ref()[..position], addr).await {
        ::log::info!("send_to failed: {:?}", err);
    }
}
/// Pick the request worker responsible for an info hash, sharding by the
/// hash's first byte so the same torrent always lands on the same worker.
fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize {
    let first_byte = usize::from(info_hash.0[0]);

    first_byte % config.request_workers
}
/// Classify an address as IPv4 or IPv6 for response serialization;
/// IPv4-mapped IPv6 addresses (`::ffff:a.b.c.d`) count as IPv4.
/// NOTE(review): duplicates `crate::common::ip_version_from_ip`.
fn ip_version_from_ip(ip: IpAddr) -> IpVersion {
    match ip {
        IpAddr::V4(_) => IpVersion::IPv4,
        IpAddr::V6(ip) => {
            // The `::ffff:0:0/96` prefix marks an IPv4-mapped address
            if let [0, 0, 0, 0, 0, 0xffff, ..] = ip.segments() {
                IpVersion::IPv4
            } else {
                IpVersion::IPv6
            }
        }
    }
}

View file

@ -1,37 +1,101 @@
use std::net::IpAddr;
use std::net::SocketAddr;
use std::time::Duration;
use rand::rngs::SmallRng;
use aquatic_common::ValidUntil;
use crossbeam_channel::{Receiver, Sender};
use rand::{rngs::SmallRng, SeedableRng};
use aquatic_common::convert_ipv4_mapped_ipv6;
use aquatic_common::extract_response_peers;
use aquatic_udp_protocol::*;
use crate::common::*;
use crate::config::Config;
#[derive(Debug)]
pub enum ConnectedRequest {
Announce(AnnounceRequest),
Scrape {
request: ScrapeRequest,
/// Currently only used by glommio implementation
original_indices: Vec<usize>,
},
pub fn run_request_worker(
state: State,
config: Config,
request_receiver: Receiver<(ConnectedRequest, SocketAddr)>,
response_sender: Sender<(ConnectedResponse, SocketAddr)>,
) {
let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new();
let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::new();
let mut responses: Vec<(ConnectedResponse, SocketAddr)> = Vec::new();
let mut small_rng = SmallRng::from_entropy();
let timeout = Duration::from_micros(config.handlers.channel_recv_timeout_microseconds);
loop {
let mut opt_torrents = None;
// Collect requests from channel, divide them by type
//
// Collect a maximum number of request. Stop collecting before that
// number is reached if having waited for too long for a request, but
// only if TorrentMaps mutex isn't locked.
for i in 0..config.handlers.max_requests_per_iter {
let (request, src): (ConnectedRequest, SocketAddr) = if i == 0 {
match request_receiver.recv() {
Ok(r) => r,
Err(_) => break, // Really shouldn't happen
}
} else {
match request_receiver.recv_timeout(timeout) {
Ok(r) => r,
Err(_) => {
if let Some(guard) = state.torrents.try_lock() {
opt_torrents = Some(guard);
break;
} else {
continue;
}
}
}
};
match request {
ConnectedRequest::Announce(request) => announce_requests.push((request, src)),
ConnectedRequest::Scrape { request, .. } => scrape_requests.push((request, src)),
}
}
#[derive(Debug)]
pub enum ConnectedResponse {
Announce(AnnounceResponse),
Scrape {
response: ScrapeResponse,
/// Currently only used by glommio implementation
original_indices: Vec<usize>,
},
// Generate responses for announce and scrape requests, then drop MutexGuard.
{
let mut torrents = opt_torrents.unwrap_or_else(|| state.torrents.lock());
let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age);
responses.extend(announce_requests.drain(..).map(|(request, src)| {
let response = handle_announce_request(
&config,
&mut small_rng,
&mut torrents,
request,
src,
peer_valid_until,
);
(ConnectedResponse::Announce(response), src)
}));
responses.extend(scrape_requests.drain(..).map(|(request, src)| {
let response = ConnectedResponse::Scrape {
response: handle_scrape_request(&mut torrents, src, request),
original_indices: Vec::new(),
};
(response, src)
}));
}
impl Into<Response> for ConnectedResponse {
fn into(self) -> Response {
match self {
Self::Announce(response) => Response::Announce(response),
Self::Scrape { response, .. } => Response::Scrape(response),
for r in responses.drain(..) {
if let Err(err) = response_sender.send(r) {
::log::error!("error sending response to channel: {}", err);
}
}
}
}
@ -143,7 +207,6 @@ fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize {
}
}
#[inline]
pub fn handle_scrape_request(
torrents: &mut TorrentMaps,
src: SocketAddr,

View file

@ -1,22 +1,175 @@
use cfg_if::cfg_if;
pub mod common;
pub mod config;
#[cfg(all(feature = "with-glommio", target_os = "linux"))]
pub mod glommio;
#[cfg(any(feature = "with-mio", feature = "with-io-uring"))]
pub mod other;
pub mod handlers;
#[cfg(feature = "with-mio")]
pub mod network_mio;
#[cfg(feature = "with-io-uring")]
pub mod network_uring;
pub mod tasks;
use config::Config;
use std::sync::{atomic::AtomicUsize, Arc};
use std::thread::Builder;
use std::time::Duration;
use anyhow::Context;
#[cfg(feature = "cpu-pinning")]
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
use aquatic_common::privileges::drop_privileges_after_socket_binding;
use crossbeam_channel::unbounded;
use aquatic_common::access_list::update_access_list;
use signal_hook::consts::SIGUSR1;
use signal_hook::iterator::Signals;
use common::State;
pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker";
pub fn run(config: Config) -> ::anyhow::Result<()> {
cfg_if! {
if #[cfg(all(feature = "with-glommio", target_os = "linux"))] {
glommio::run(config)
let state = State::default();
update_access_list(&config.access_list, &state.access_list)?;
let mut signals = Signals::new(::std::iter::once(SIGUSR1))?;
{
let config = config.clone();
let state = state.clone();
::std::thread::spawn(move || run_inner(config, state));
}
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
for signal in &mut signals {
match signal {
SIGUSR1 => {
let _ = update_access_list(&config.access_list, &state.access_list);
}
_ => unreachable!(),
}
}
Ok(())
}
pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> {
let num_bound_sockets = Arc::new(AtomicUsize::new(0));
let (request_sender, request_receiver) = unbounded();
let (response_sender, response_receiver) = unbounded();
for i in 0..config.request_workers {
let state = state.clone();
let config = config.clone();
let request_receiver = request_receiver.clone();
let response_sender = response_sender.clone();
Builder::new()
.name(format!("request-{:02}", i + 1))
.spawn(move || {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::RequestWorker(i),
);
handlers::run_request_worker(state, config, request_receiver, response_sender)
})
.with_context(|| "spawn request worker")?;
}
for i in 0..config.socket_workers {
let state = state.clone();
let config = config.clone();
let request_sender = request_sender.clone();
let response_receiver = response_receiver.clone();
let num_bound_sockets = num_bound_sockets.clone();
Builder::new()
.name(format!("socket-{:02}", i + 1))
.spawn(move || {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::SocketWorker(i),
);
cfg_if::cfg_if!(
if #[cfg(feature = "with-io-uring")] {
network_uring::run_socket_worker(
state,
config,
request_sender,
response_receiver,
num_bound_sockets,
);
} else {
other::run(config)
}
network_mio::run_socket_worker(
state,
config,
i,
request_sender,
response_receiver,
num_bound_sockets,
);
}
);
})
.with_context(|| "spawn socket worker")?;
}
if config.statistics.interval != 0 {
let state = state.clone();
let config = config.clone();
Builder::new()
.name("statistics-collector".to_string())
.spawn(move || {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
loop {
::std::thread::sleep(Duration::from_secs(config.statistics.interval));
tasks::gather_and_print_statistics(&state, &config);
}
})
.with_context(|| "spawn statistics worker")?;
}
drop_privileges_after_socket_binding(
&config.privileges,
num_bound_sockets,
config.socket_workers,
)
.unwrap();
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
loop {
::std::thread::sleep(Duration::from_secs(
config.cleaning.torrent_cleaning_interval,
));
state.torrents.lock().clean(&config, &state.access_list);
}
}

View file

@ -8,6 +8,7 @@ use std::time::{Duration, Instant};
use std::vec::Drain;
use aquatic_common::access_list::create_access_list_cache;
use aquatic_common::ValidUntil;
use crossbeam_channel::{Receiver, Sender};
use mio::net::UdpSocket;
use mio::{Events, Interest, Poll, Token};
@ -15,13 +16,10 @@ use rand::prelude::{SeedableRng, StdRng};
use aquatic_udp_protocol::{Request, Response};
use crate::common::handlers::*;
use crate::common::network::ConnectionMap;
use crate::common::network::*;
use crate::common::*;
use crate::config::Config;
use super::common::*;
pub fn run_socket_worker(
state: State,
config: Config,

View file

@ -10,6 +10,7 @@ use std::sync::{
use std::time::{Duration, Instant};
use aquatic_common::access_list::create_access_list_cache;
use aquatic_common::ValidUntil;
use crossbeam_channel::{Receiver, Sender};
use io_uring::types::{Fixed, Timespec};
use io_uring::SubmissionQueue;
@ -21,13 +22,11 @@ use slab::Slab;
use aquatic_udp_protocol::{Request, Response};
use crate::common::handlers::*;
use crate::common::network::ConnectionMap;
use crate::common::network::*;
use crate::common::*;
use crate::config::Config;
use super::common::*;
const RING_SIZE: usize = 128;
const MAX_RECV_EVENTS: usize = 1;
const MAX_SEND_EVENTS: usize = RING_SIZE - MAX_RECV_EVENTS - 1;

View file

@ -1,166 +0,0 @@
use aquatic_common::access_list::{AccessListArcSwap, AccessListCache};
use aquatic_udp_protocol::*;
use crossbeam_channel::Sender;
use parking_lot::Mutex;
use rand::{prelude::StdRng, Rng};
use socket2::{Domain, Protocol, Socket, Type};
use std::{
net::{IpAddr, SocketAddr},
sync::{atomic::AtomicUsize, Arc},
};
use crate::common::*;
use crate::common::{handlers::ConnectedRequest, network::ConnectionMap};
use crate::config::Config;
/// Cumulative traffic counters shared between worker threads.
///
/// Stored behind an `Arc` in [`State`]; workers update the atomics and
/// the statistics task reads them periodically for reporting.
#[derive(Default)]
pub struct Statistics {
    // Count of requests received
    pub requests_received: AtomicUsize,
    // Count of responses sent
    pub responses_sent: AtomicUsize,
    // Total bytes received — presumably UDP payload bytes; confirm at call sites
    pub bytes_received: AtomicUsize,
    // Total bytes sent — presumably UDP payload bytes; confirm at call sites
    pub bytes_sent: AtomicUsize,
}
/// Shared tracker state. Every field is an `Arc`, so cloning is cheap
/// and a copy is handed to each worker thread.
#[derive(Clone)]
pub struct State {
    // Info-hash allow/deny list; swappable at runtime (reloaded on SIGUSR1)
    pub access_list: Arc<AccessListArcSwap>,
    // All torrent/peer data, guarded by a mutex
    pub torrents: Arc<Mutex<TorrentMaps>>,
    // Cumulative counters; see Statistics
    pub statistics: Arc<Statistics>,
}
impl Default for State {
fn default() -> Self {
Self {
access_list: Arc::new(AccessListArcSwap::default()),
torrents: Arc::new(Mutex::new(TorrentMaps::default())),
statistics: Arc::new(Statistics::default()),
}
}
}
pub fn handle_request(
config: &Config,
connections: &mut ConnectionMap,
access_list_cache: &mut AccessListCache,
rng: &mut StdRng,
request_sender: &Sender<(ConnectedRequest, SocketAddr)>,
local_responses: &mut Vec<(Response, SocketAddr)>,
valid_until: ValidUntil,
res_request: Result<Request, RequestParseError>,
src: SocketAddr,
) {
let access_list_mode = config.access_list.mode;
match res_request {
Ok(Request::Connect(request)) => {
let connection_id = ConnectionId(rng.gen());
connections.insert(connection_id, src, valid_until);
let response = Response::Connect(ConnectResponse {
connection_id,
transaction_id: request.transaction_id,
});
local_responses.push((response, src))
}
Ok(Request::Announce(request)) => {
if connections.contains(request.connection_id, src) {
if access_list_cache
.load()
.allows(access_list_mode, &request.info_hash.0)
{
if let Err(err) =
request_sender.try_send((ConnectedRequest::Announce(request), src))
{
::log::warn!("request_sender.try_send failed: {:?}", err)
}
} else {
let response = Response::Error(ErrorResponse {
transaction_id: request.transaction_id,
message: "Info hash not allowed".into(),
});
local_responses.push((response, src))
}
}
}
Ok(Request::Scrape(request)) => {
if connections.contains(request.connection_id, src) {
let request = ConnectedRequest::Scrape {
request,
original_indices: Vec::new(),
};
if let Err(err) = request_sender.try_send((request, src)) {
::log::warn!("request_sender.try_send failed: {:?}", err)
}
}
}
Err(err) => {
::log::debug!("Request::from_bytes error: {:?}", err);
if let RequestParseError::Sendable {
connection_id,
transaction_id,
err,
} = err
{
if connections.contains(connection_id, src) {
let response = ErrorResponse {
transaction_id,
message: err.right_or("Parse error").into(),
};
local_responses.push((response.into(), src));
}
}
}
}
}
/// Classify an address as IPv4 or IPv6 for protocol purposes.
///
/// IPv4-mapped IPv6 addresses (`::ffff:a.b.c.d`) count as IPv4, since
/// they originate from IPv4 peers reaching a dual-stack socket.
pub fn ip_version_from_ip(ip: IpAddr) -> IpVersion {
    match ip {
        IpAddr::V4(_) => IpVersion::IPv4,
        IpAddr::V6(v6) => match v6.segments() {
            // ::ffff:0:0/96 — IPv4-mapped address
            [0, 0, 0, 0, 0, 0xffff, ..] => IpVersion::IPv4,
            _ => IpVersion::IPv6,
        },
    }
}
/// Create a non-blocking UDP socket bound to the configured address.
///
/// SO_REUSEPORT is set so that multiple socket workers can bind the same
/// address and share incoming traffic.
///
/// # Panics
///
/// Panics if the socket cannot be created, configured or bound — the
/// tracker cannot run without it. A failed receive-buffer resize is only
/// logged, not fatal.
pub fn create_socket(config: &Config) -> ::std::net::UdpSocket {
    let socket = if config.network.address.is_ipv4() {
        Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))
    } else {
        Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))
    }
    .expect("create socket");

    // Must be set before bind() for all workers to share the port.
    socket.set_reuse_port(true).expect("socket: set reuse port");
    socket
        .set_nonblocking(true)
        .expect("socket: set nonblocking");
    socket
        .bind(&config.network.address.into())
        .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err));

    let recv_buffer_size = config.network.socket_recv_buffer_size;

    // 0 means "leave the OS default"; otherwise try to enlarge the buffer
    // (may be clamped or rejected by the kernel — failure is non-fatal).
    if recv_buffer_size != 0 {
        if let Err(err) = socket.set_recv_buffer_size(recv_buffer_size) {
            ::log::error!(
                "socket: failed setting recv buffer to {}: {:?}",
                recv_buffer_size,
                err
            );
        }
    }

    socket.into()
}

View file

@ -1,98 +0,0 @@
use std::net::SocketAddr;
use std::time::Duration;
use aquatic_common::ValidUntil;
use crossbeam_channel::{Receiver, Sender};
use rand::{rngs::SmallRng, SeedableRng};
use aquatic_udp_protocol::*;
use crate::common::handlers::*;
use crate::config::Config;
use crate::other::common::*;
/// Long-running request worker loop.
///
/// Receives connected announce/scrape requests from socket workers over
/// `request_receiver`, applies them in batches to the shared torrent
/// state, and sends the resulting responses back over `response_sender`.
/// Batching amortizes the cost of taking the `TorrentMaps` mutex.
pub fn run_request_worker(
    state: State,
    config: Config,
    request_receiver: Receiver<(ConnectedRequest, SocketAddr)>,
    response_sender: Sender<(ConnectedResponse, SocketAddr)>,
) {
    // Reused across iterations; drained each round, keeping capacity.
    let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new();
    let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::new();
    let mut responses: Vec<(ConnectedResponse, SocketAddr)> = Vec::new();

    let mut small_rng = SmallRng::from_entropy();

    let timeout = Duration::from_micros(config.handlers.channel_recv_timeout_microseconds);

    loop {
        // If the batch loop ends early because the torrent mutex was free,
        // the acquired guard is stashed here and reused below.
        let mut opt_torrents = None;

        // Collect requests from channel, divide them by type
        //
        // Collect a maximum number of request. Stop collecting before that
        // number is reached if having waited for too long for a request, but
        // only if TorrentMaps mutex isn't locked.
        for i in 0..config.handlers.max_requests_per_iter {
            let (request, src): (ConnectedRequest, SocketAddr) = if i == 0 {
                // Block indefinitely for the first request of a batch.
                match request_receiver.recv() {
                    Ok(r) => r,
                    Err(_) => break, // Really shouldn't happen
                }
            } else {
                match request_receiver.recv_timeout(timeout) {
                    Ok(r) => r,
                    Err(_) => {
                        // Timed out: cut the batch short only if the mutex
                        // can be taken without waiting; otherwise keep
                        // collecting while another worker holds it.
                        if let Some(guard) = state.torrents.try_lock() {
                            opt_torrents = Some(guard);

                            break;
                        } else {
                            continue;
                        }
                    }
                }
            };

            match request {
                ConnectedRequest::Announce(request) => announce_requests.push((request, src)),
                ConnectedRequest::Scrape { request, .. } => scrape_requests.push((request, src)),
            }
        }

        // Generate responses for announce and scrape requests, then drop MutexGuard.
        {
            // Reuse the guard acquired in the timeout branch, if any.
            let mut torrents = opt_torrents.unwrap_or_else(|| state.torrents.lock());

            let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age);

            responses.extend(announce_requests.drain(..).map(|(request, src)| {
                let response = handle_announce_request(
                    &config,
                    &mut small_rng,
                    &mut torrents,
                    request,
                    src,
                    peer_valid_until,
                );

                (ConnectedResponse::Announce(response), src)
            }));

            responses.extend(scrape_requests.drain(..).map(|(request, src)| {
                let response = ConnectedResponse::Scrape {
                    response: handle_scrape_request(&mut torrents, src, request),
                    // NOTE(review): indices appear to be managed on the
                    // socket-worker side — confirm against callers.
                    original_indices: Vec::new(),
                };

                (response, src)
            }));
        }

        // Send after the mutex is released so slow consumers can't extend
        // the time the torrent state is locked.
        for r in responses.drain(..) {
            if let Err(err) = response_sender.send(r) {
                ::log::error!("error sending response to channel: {}", err);
            }
        }
    }
}

View file

@ -1,172 +0,0 @@
use std::sync::{atomic::AtomicUsize, Arc};
use std::thread::Builder;
use std::time::Duration;
use anyhow::Context;
#[cfg(feature = "cpu-pinning")]
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
use aquatic_common::privileges::drop_privileges_after_socket_binding;
use crossbeam_channel::unbounded;
use aquatic_common::access_list::update_access_list;
use signal_hook::consts::SIGUSR1;
use signal_hook::iterator::Signals;
use crate::config::Config;
pub mod common;
pub mod handlers;
#[cfg(feature = "with-mio")]
pub mod network_mio;
#[cfg(feature = "with-io-uring")]
pub mod network_uring;
pub mod tasks;
use common::State;
/// Entry point: set up shared state, spawn the worker threads via
/// `run_inner` on a separate thread, then block in a signal loop,
/// reloading the access list whenever SIGUSR1 is received.
pub fn run(config: Config) -> ::anyhow::Result<()> {
    let state = State::default();

    // Load the access list once up front; later reloads are signal-driven.
    update_access_list(&config.access_list, &state.access_list)?;

    let mut signals = Signals::new(::std::iter::once(SIGUSR1))?;

    {
        let config = config.clone();
        let state = state.clone();

        // Worker orchestration runs on its own thread so this thread can
        // stay parked in the signal iterator below.
        ::std::thread::spawn(move || run_inner(config, state));
    }

    #[cfg(feature = "cpu-pinning")]
    pin_current_if_configured_to(
        &config.cpu_pinning,
        config.socket_workers,
        WorkerIndex::Other,
    );

    for signal in &mut signals {
        match signal {
            SIGUSR1 => {
                // Best effort: keep serving with the old list if reload fails.
                let _ = update_access_list(&config.access_list, &state.access_list);
            }
            _ => unreachable!(),
        }
    }

    Ok(())
}
/// Spawn request workers, socket workers and the optional statistics
/// collector, drop privileges once sockets are bound, then run the
/// periodic torrent-cleaning loop on the current thread forever.
pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> {
    // Incremented by each socket worker once its socket is bound, so the
    // privilege-drop helper below knows when binding is complete.
    let num_bound_sockets = Arc::new(AtomicUsize::new(0));

    // Unbounded channels: socket workers -> request workers, and back.
    let (request_sender, request_receiver) = unbounded();
    let (response_sender, response_receiver) = unbounded();

    for i in 0..config.request_workers {
        let state = state.clone();
        let config = config.clone();
        let request_receiver = request_receiver.clone();
        let response_sender = response_sender.clone();

        Builder::new()
            .name(format!("request-{:02}", i + 1))
            .spawn(move || {
                #[cfg(feature = "cpu-pinning")]
                pin_current_if_configured_to(
                    &config.cpu_pinning,
                    config.socket_workers,
                    WorkerIndex::RequestWorker(i),
                );

                handlers::run_request_worker(state, config, request_receiver, response_sender)
            })
            .with_context(|| "spawn request worker")?;
    }

    for i in 0..config.socket_workers {
        let state = state.clone();
        let config = config.clone();
        let request_sender = request_sender.clone();
        let response_receiver = response_receiver.clone();
        let num_bound_sockets = num_bound_sockets.clone();

        Builder::new()
            .name(format!("socket-{:02}", i + 1))
            .spawn(move || {
                #[cfg(feature = "cpu-pinning")]
                pin_current_if_configured_to(
                    &config.cpu_pinning,
                    config.socket_workers,
                    WorkerIndex::SocketWorker(i),
                );

                // Network backend is selected at compile time: io_uring
                // when its feature is enabled, mio otherwise.
                cfg_if::cfg_if!(
                    if #[cfg(feature = "with-io-uring")] {
                        network_uring::run_socket_worker(
                            state,
                            config,
                            request_sender,
                            response_receiver,
                            num_bound_sockets,
                        );
                    } else {
                        network_mio::run_socket_worker(
                            state,
                            config,
                            i,
                            request_sender,
                            response_receiver,
                            num_bound_sockets,
                        );
                    }
                );
            })
            .with_context(|| "spawn socket worker")?;
    }

    // interval == 0 disables statistics collection entirely.
    if config.statistics.interval != 0 {
        let state = state.clone();
        let config = config.clone();

        Builder::new()
            .name("statistics-collector".to_string())
            .spawn(move || {
                #[cfg(feature = "cpu-pinning")]
                pin_current_if_configured_to(
                    &config.cpu_pinning,
                    config.socket_workers,
                    WorkerIndex::Other,
                );

                loop {
                    ::std::thread::sleep(Duration::from_secs(config.statistics.interval));

                    tasks::gather_and_print_statistics(&state, &config);
                }
            })
            .with_context(|| "spawn statistics worker")?;
    }

    // Drops privileges after sockets are bound (helper in aquatic_common;
    // presumably waits for num_bound_sockets to reach socket_workers).
    drop_privileges_after_socket_binding(
        &config.privileges,
        num_bound_sockets,
        config.socket_workers,
    )
    .unwrap();

    #[cfg(feature = "cpu-pinning")]
    pin_current_if_configured_to(
        &config.cpu_pinning,
        config.socket_workers,
        WorkerIndex::Other,
    );

    // Periodically evict expired peers/torrents on this thread forever.
    loop {
        ::std::thread::sleep(Duration::from_secs(
            config.cleaning.torrent_cleaning_interval,
        ));

        state.torrents.lock().clean(&config, &state.access_list);
    }
}

View file

@ -13,6 +13,7 @@ name = "aquatic_udp_bench"
anyhow = "1"
aquatic_cli_helpers = "0.1.0"
aquatic_udp = "0.1.0"
aquatic_udp_protocol = "0.1.0"
crossbeam-channel = "0.5"
indicatif = "0.16.2"
mimalloc = { version = "0.1", default-features = false }

View file

@ -6,9 +6,9 @@ use indicatif::ProgressIterator;
use rand::Rng;
use rand_distr::Pareto;
use aquatic_udp::common::handlers::*;
use aquatic_udp::common::*;
use aquatic_udp::config::Config;
use aquatic_udp_protocol::*;
use crate::common::*;
use crate::config::BenchConfig;

View file

@ -7,6 +7,7 @@
//! Scrape: 1 873 545 requests/second, 533.75 ns/request
//! ```
use aquatic_udp::handlers::run_request_worker;
use crossbeam_channel::unbounded;
use num_format::{Locale, ToFormattedString};
use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng};
@ -15,8 +16,7 @@ use std::time::Duration;
use aquatic_cli_helpers::run_app_with_cli_and_config;
use aquatic_udp::common::*;
use aquatic_udp::config::Config;
use aquatic_udp::other::common::*;
use aquatic_udp::other::handlers;
use aquatic_udp_protocol::*;
use config::BenchConfig;
@ -52,7 +52,7 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> {
let response_sender = response_sender.clone();
::std::thread::spawn(move || {
handlers::run_request_worker(state, config, request_receiver, response_sender)
run_request_worker(state, config, request_receiver, response_sender)
});
}

View file

@ -6,9 +6,9 @@ use indicatif::ProgressIterator;
use rand::Rng;
use rand_distr::Pareto;
use aquatic_udp::common::handlers::*;
use aquatic_udp::common::*;
use aquatic_udp::config::Config;
use aquatic_udp_protocol::*;
use crate::common::*;
use crate::config::BenchConfig;

View file

@ -2,14 +2,16 @@
. ./scripts/env-native-cpu-without-avx-512

USAGE="Usage: $0 [mio|io-uring] [ARGS]"

# Diff residue from the removed glommio backend (duplicated usage string,
# dangling glommio cargo invocation) is removed here; only the mio and
# io-uring backends remain.
if [ "$1" != "mio" ] && [ "$1" != "io-uring" ]; then
    echo "$USAGE"
else
    if [ "$1" = "mio" ]; then
        # mio is part of the default feature set
        cargo run --release --bin aquatic_udp -- "${@:2}"
    elif [ "$1" = "io-uring" ]; then
        cargo run --release --features "with-io-uring" --no-default-features --bin aquatic_udp -- "${@:2}"
    else
        # Unreachable given the guard above; kept as a safety net
        echo "$USAGE"
    fi
fi