udp: shard request workers by info hash

This commit is contained in:
Joakim Frostegård 2021-11-16 01:03:29 +01:00
parent 4addb0de49
commit b617ff9d09
9 changed files with 378 additions and 218 deletions

View file

@ -1,9 +1,11 @@
use std::collections::BTreeMap;
use std::hash::Hash; use std::hash::Hash;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use crossbeam_channel::Sender;
use parking_lot::Mutex; use parking_lot::Mutex;
use socket2::{Domain, Protocol, Socket, Type}; use socket2::{Domain, Protocol, Socket, Type};
@ -34,35 +36,29 @@ impl Ip for Ipv6Addr {
} }
} }
#[derive(Debug)]
pub struct PendingScrapeRequest {
pub transaction_id: TransactionId,
pub info_hashes: BTreeMap<usize, InfoHash>,
}
#[derive(Debug)]
pub struct PendingScrapeResponse {
pub transaction_id: TransactionId,
pub torrent_stats: BTreeMap<usize, TorrentScrapeStatistics>,
}
#[derive(Debug)] #[derive(Debug)]
pub enum ConnectedRequest { pub enum ConnectedRequest {
Announce(AnnounceRequest), Announce(AnnounceRequest),
Scrape { Scrape(PendingScrapeRequest),
request: ScrapeRequest,
/// Currently only used by glommio implementation
original_indices: Vec<usize>,
},
} }
#[derive(Debug)] #[derive(Debug)]
pub enum ConnectedResponse { pub enum ConnectedResponse {
AnnounceIpv4(AnnounceResponseIpv4), AnnounceIpv4(AnnounceResponseIpv4),
AnnounceIpv6(AnnounceResponseIpv6), AnnounceIpv6(AnnounceResponseIpv6),
Scrape { Scrape(PendingScrapeResponse),
response: ScrapeResponse,
/// Currently only used by glommio implementation
original_indices: Vec<usize>,
},
}
impl Into<Response> for ConnectedResponse {
fn into(self) -> Response {
match self {
Self::AnnounceIpv4(response) => Response::AnnounceIpv4(response),
Self::AnnounceIpv6(response) => Response::AnnounceIpv6(response),
Self::Scrape { response, .. } => Response::Scrape(response),
}
}
} }
#[derive(Clone, PartialEq, Debug)] #[derive(Clone, PartialEq, Debug)]
@ -117,6 +113,64 @@ impl Into<ConnectedResponse> for ProtocolAnnounceResponse<Ipv6Addr> {
} }
} }
#[derive(Clone, Copy, Debug)]
pub struct SocketWorkerIndex(pub usize);
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct RequestWorkerIndex(pub usize);
impl RequestWorkerIndex {
fn from_info_hash(config: &Config, info_hash: InfoHash) -> Self {
Self(info_hash.0[0] as usize % config.request_workers)
}
}
pub struct ConnectedRequestSender {
index: SocketWorkerIndex,
senders: Vec<Sender<(SocketWorkerIndex, ConnectedRequest, SocketAddr)>>,
}
impl ConnectedRequestSender {
pub fn new(
index: SocketWorkerIndex,
senders: Vec<Sender<(SocketWorkerIndex, ConnectedRequest, SocketAddr)>>,
) -> Self {
Self { index, senders }
}
pub fn try_send_to(
&self,
index: RequestWorkerIndex,
request: ConnectedRequest,
addr: SocketAddr,
) {
if let Err(err) = self.senders[index.0].try_send((self.index, request, addr)) {
::log::warn!("request_sender.try_send failed: {:?}", err)
}
}
}
pub struct ConnectedResponseSender {
senders: Vec<Sender<(ConnectedResponse, SocketAddr)>>,
}
impl ConnectedResponseSender {
pub fn new(senders: Vec<Sender<(ConnectedResponse, SocketAddr)>>) -> Self {
Self { senders }
}
pub fn try_send_to(
&self,
index: SocketWorkerIndex,
response: ConnectedResponse,
addr: SocketAddr,
) {
if let Err(err) = self.senders[index.0].try_send((response, addr)) {
::log::warn!("request_sender.try_send failed: {:?}", err)
}
}
}
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub enum PeerStatus { pub enum PeerStatus {
Seeding, Seeding,

View file

@ -4,7 +4,6 @@ use aquatic_common::access_list::AccessListCache;
use aquatic_common::AHashIndexMap; use aquatic_common::AHashIndexMap;
use aquatic_common::ValidUntil; use aquatic_common::ValidUntil;
use aquatic_udp_protocol::*; use aquatic_udp_protocol::*;
use crossbeam_channel::Sender;
use rand::{prelude::StdRng, Rng}; use rand::{prelude::StdRng, Rng};
use crate::common::*; use crate::common::*;
@ -34,12 +33,75 @@ impl ConnectionMap {
} }
} }
pub struct PendingScrapeResponseMeta {
num_pending: usize,
valid_until: ValidUntil,
}
#[derive(Default)]
pub struct PendingScrapeResponseMap(
AHashIndexMap<TransactionId, (PendingScrapeResponseMeta, PendingScrapeResponse)>,
);
impl PendingScrapeResponseMap {
pub fn prepare(
&mut self,
transaction_id: TransactionId,
num_pending: usize,
valid_until: ValidUntil,
) {
let meta = PendingScrapeResponseMeta {
num_pending,
valid_until,
};
let response = PendingScrapeResponse {
transaction_id,
torrent_stats: BTreeMap::new(),
};
self.0.insert(transaction_id, (meta, response));
}
pub fn add_and_get_finished(&mut self, response: PendingScrapeResponse) -> Option<Response> {
let finished = if let Some(r) = self.0.get_mut(&response.transaction_id) {
r.0.num_pending -= 1;
r.1.torrent_stats.extend(response.torrent_stats.into_iter());
r.0.num_pending == 0
} else {
::log::warn!("PendingScrapeResponses.add didn't find PendingScrapeResponse in map");
false
};
if finished {
let response = self.0.remove(&response.transaction_id).unwrap().1;
Some(Response::Scrape(ScrapeResponse {
transaction_id: response.transaction_id,
torrent_stats: response.torrent_stats.into_values().collect(),
}))
} else {
None
}
}
pub fn clean(&mut self) {
let now = Instant::now();
self.0.retain(|_, v| v.0.valid_until.0 > now);
self.0.shrink_to_fit();
}
}
pub fn handle_request( pub fn handle_request(
config: &Config, config: &Config,
connections: &mut ConnectionMap, connections: &mut ConnectionMap,
pending_scrape_responses: &mut PendingScrapeResponseMap,
access_list_cache: &mut AccessListCache, access_list_cache: &mut AccessListCache,
rng: &mut StdRng, rng: &mut StdRng,
request_sender: &Sender<(ConnectedRequest, SocketAddr)>, request_sender: &ConnectedRequestSender,
local_responses: &mut Vec<(Response, SocketAddr)>, local_responses: &mut Vec<(Response, SocketAddr)>,
valid_until: ValidUntil, valid_until: ValidUntil,
res_request: Result<Request, RequestParseError>, res_request: Result<Request, RequestParseError>,
@ -66,11 +128,14 @@ pub fn handle_request(
.load() .load()
.allows(access_list_mode, &request.info_hash.0) .allows(access_list_mode, &request.info_hash.0)
{ {
if let Err(err) = let worker_index =
request_sender.try_send((ConnectedRequest::Announce(request), src)) RequestWorkerIndex::from_info_hash(config, request.info_hash);
{
::log::warn!("request_sender.try_send failed: {:?}", err) request_sender.try_send_to(
} worker_index,
ConnectedRequest::Announce(request),
src,
);
} else { } else {
let response = Response::Error(ErrorResponse { let response = Response::Error(ErrorResponse {
transaction_id: request.transaction_id, transaction_id: request.transaction_id,
@ -83,13 +148,30 @@ pub fn handle_request(
} }
Ok(Request::Scrape(request)) => { Ok(Request::Scrape(request)) => {
if connections.contains(request.connection_id, src) { if connections.contains(request.connection_id, src) {
let request = ConnectedRequest::Scrape { let mut requests: AHashIndexMap<RequestWorkerIndex, PendingScrapeRequest> =
request, Default::default();
original_indices: Vec::new(),
};
if let Err(err) = request_sender.try_send((request, src)) { let transaction_id = request.transaction_id;
::log::warn!("request_sender.try_send failed: {:?}", err)
for (i, info_hash) in request.info_hashes.into_iter().enumerate() {
let pending = requests
.entry(RequestWorkerIndex::from_info_hash(&config, info_hash))
.or_insert_with(|| PendingScrapeRequest {
transaction_id,
info_hashes: BTreeMap::new(),
});
pending.info_hashes.insert(i, info_hash);
}
pending_scrape_responses.prepare(transaction_id, requests.len(), valid_until);
for (request_worker_index, request) in requests {
request_sender.try_send_to(
request_worker_index,
ConnectedRequest::Scrape(request),
src,
);
} }
} }
} }

View file

@ -1,12 +1,12 @@
use std::collections::BTreeMap;
use std::net::IpAddr; use std::net::IpAddr;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::time::Duration; use std::time::Duration;
use aquatic_common::ValidUntil; use aquatic_common::ValidUntil;
use crossbeam_channel::{Receiver, Sender}; use crossbeam_channel::Receiver;
use rand::{rngs::SmallRng, SeedableRng}; use rand::{rngs::SmallRng, SeedableRng};
use aquatic_common::convert_ipv4_mapped_ipv6;
use aquatic_common::extract_response_peers; use aquatic_common::extract_response_peers;
use aquatic_udp_protocol::*; use aquatic_udp_protocol::*;
@ -15,88 +15,37 @@ use crate::common::*;
use crate::config::Config; use crate::config::Config;
pub fn run_request_worker( pub fn run_request_worker(
state: State,
config: Config, config: Config,
request_receiver: Receiver<(ConnectedRequest, SocketAddr)>, request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, SocketAddr)>,
response_sender: Sender<(ConnectedResponse, SocketAddr)>, response_sender: ConnectedResponseSender,
) { ) {
let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new(); let mut torrents = TorrentMaps::default();
let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::new();
let mut responses: Vec<(ConnectedResponse, SocketAddr)> = Vec::new();
let mut small_rng = SmallRng::from_entropy(); let mut small_rng = SmallRng::from_entropy();
let timeout = Duration::from_micros(config.handlers.channel_recv_timeout_microseconds); let timeout = Duration::from_micros(config.handlers.channel_recv_timeout_microseconds);
loop { loop {
let mut opt_torrents = None; if let Ok((sender_index, request, src)) = request_receiver.recv_timeout(timeout) {
// Collect requests from channel, divide them by type
//
// Collect a maximum number of request. Stop collecting before that
// number is reached if having waited for too long for a request, but
// only if TorrentMaps mutex isn't locked.
for i in 0..config.handlers.max_requests_per_iter {
let (request, src): (ConnectedRequest, SocketAddr) = if i == 0 {
match request_receiver.recv() {
Ok(r) => r,
Err(_) => break, // Really shouldn't happen
}
} else {
match request_receiver.recv_timeout(timeout) {
Ok(r) => r,
Err(_) => {
if let Some(guard) = state.torrents.try_lock() {
opt_torrents = Some(guard);
break;
} else {
continue;
}
}
}
};
match request {
ConnectedRequest::Announce(request) => announce_requests.push((request, src)),
ConnectedRequest::Scrape { request, .. } => scrape_requests.push((request, src)),
}
}
// Generate responses for announce and scrape requests, then drop MutexGuard.
{
let mut torrents = opt_torrents.unwrap_or_else(|| state.torrents.lock());
let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age);
responses.extend(announce_requests.drain(..).map(|(request, src)| { let response = match request {
let response = handle_announce_request( ConnectedRequest::Announce(request) => handle_announce_request(
&config, &config,
&mut small_rng, &mut small_rng,
&mut torrents, &mut torrents,
request, request,
src, src,
peer_valid_until, peer_valid_until,
); ),
ConnectedRequest::Scrape(request) => {
(response, src) ConnectedResponse::Scrape(handle_scrape_request(&mut torrents, src, request))
})); }
responses.extend(scrape_requests.drain(..).map(|(request, src)| {
let response = ConnectedResponse::Scrape {
response: handle_scrape_request(&mut torrents, src, request),
original_indices: Vec::new(),
}; };
(response, src) response_sender.try_send_to(sender_index, response, src);
}));
} }
for r in responses.drain(..) { // TODO: clean torrent map, update peer_valid_until
if let Err(err) = response_sender.send(r) {
::log::error!("error sending response to channel: {}", err);
}
}
} }
} }
@ -207,41 +156,43 @@ fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize {
pub fn handle_scrape_request( pub fn handle_scrape_request(
torrents: &mut TorrentMaps, torrents: &mut TorrentMaps,
src: SocketAddr, src: SocketAddr,
request: ScrapeRequest, request: PendingScrapeRequest,
) -> ScrapeResponse { ) -> PendingScrapeResponse {
const EMPTY_STATS: TorrentScrapeStatistics = create_torrent_scrape_statistics(0, 0); const EMPTY_STATS: TorrentScrapeStatistics = create_torrent_scrape_statistics(0, 0);
let mut stats: Vec<TorrentScrapeStatistics> = Vec::with_capacity(request.info_hashes.len()); let mut torrent_stats: BTreeMap<usize, TorrentScrapeStatistics> = BTreeMap::new();
let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); if src.ip().is_ipv4() {
torrent_stats.extend(request.info_hashes.into_iter().map(|(i, info_hash)| {
if peer_ip.is_ipv4() { let s = if let Some(torrent_data) = torrents.ipv4.get(&info_hash) {
for info_hash in request.info_hashes.iter() { create_torrent_scrape_statistics(
if let Some(torrent_data) = torrents.ipv4.get(info_hash) {
stats.push(create_torrent_scrape_statistics(
torrent_data.num_seeders as i32, torrent_data.num_seeders as i32,
torrent_data.num_leechers as i32, torrent_data.num_leechers as i32,
)); )
} else { } else {
stats.push(EMPTY_STATS); EMPTY_STATS
} };
}
(i, s)
}));
} else { } else {
for info_hash in request.info_hashes.iter() { torrent_stats.extend(request.info_hashes.into_iter().map(|(i, info_hash)| {
if let Some(torrent_data) = torrents.ipv6.get(info_hash) { let s = if let Some(torrent_data) = torrents.ipv6.get(&info_hash) {
stats.push(create_torrent_scrape_statistics( create_torrent_scrape_statistics(
torrent_data.num_seeders as i32, torrent_data.num_seeders as i32,
torrent_data.num_leechers as i32, torrent_data.num_leechers as i32,
)); )
} else { } else {
stats.push(EMPTY_STATS); EMPTY_STATS
} };
}
(i, s)
}));
} }
ScrapeResponse { PendingScrapeResponse {
transaction_id: request.transaction_id, transaction_id: request.transaction_id,
torrent_stats: stats, torrent_stats,
} }
} }

View file

@ -9,6 +9,7 @@ pub mod tasks;
use config::Config; use config::Config;
use std::collections::BTreeMap;
use std::sync::{atomic::AtomicUsize, Arc}; use std::sync::{atomic::AtomicUsize, Arc};
use std::thread::Builder; use std::thread::Builder;
use std::time::Duration; use std::time::Duration;
@ -23,7 +24,7 @@ use aquatic_common::access_list::update_access_list;
use signal_hook::consts::SIGUSR1; use signal_hook::consts::SIGUSR1;
use signal_hook::iterator::Signals; use signal_hook::iterator::Signals;
use common::State; use common::{ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State};
pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker";
@ -63,14 +64,30 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> {
let num_bound_sockets = Arc::new(AtomicUsize::new(0)); let num_bound_sockets = Arc::new(AtomicUsize::new(0));
let (request_sender, request_receiver) = unbounded(); let mut request_senders = Vec::new();
let (response_sender, response_receiver) = unbounded(); let mut request_receivers = BTreeMap::new();
let mut response_senders = Vec::new();
let mut response_receivers = BTreeMap::new();
for i in 0..config.request_workers {
let (request_sender, request_receiver) = unbounded();
request_senders.push(request_sender);
request_receivers.insert(i, request_receiver);
}
for i in 0..config.socket_workers {
let (response_sender, response_receiver) = unbounded();
response_senders.push(response_sender);
response_receivers.insert(i, response_receiver);
}
for i in 0..config.request_workers { for i in 0..config.request_workers {
let state = state.clone();
let config = config.clone(); let config = config.clone();
let request_receiver = request_receiver.clone(); let request_receiver = request_receivers.remove(&i).unwrap().clone();
let response_sender = response_sender.clone(); let response_sender = ConnectedResponseSender::new(response_senders.clone());
Builder::new() Builder::new()
.name(format!("request-{:02}", i + 1)) .name(format!("request-{:02}", i + 1))
@ -82,7 +99,7 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> {
WorkerIndex::RequestWorker(i), WorkerIndex::RequestWorker(i),
); );
handlers::run_request_worker(state, config, request_receiver, response_sender) handlers::run_request_worker(config, request_receiver, response_sender)
}) })
.with_context(|| "spawn request worker")?; .with_context(|| "spawn request worker")?;
} }
@ -90,8 +107,9 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> {
for i in 0..config.socket_workers { for i in 0..config.socket_workers {
let state = state.clone(); let state = state.clone();
let config = config.clone(); let config = config.clone();
let request_sender = request_sender.clone(); let request_sender =
let response_receiver = response_receiver.clone(); ConnectedRequestSender::new(SocketWorkerIndex(i), request_senders.clone());
let response_receiver = response_receivers.remove(&i).unwrap();
let num_bound_sockets = num_bound_sockets.clone(); let num_bound_sockets = num_bound_sockets.clone();
Builder::new() Builder::new()
@ -128,6 +146,12 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> {
.with_context(|| "spawn socket worker")?; .with_context(|| "spawn socket worker")?;
} }
::std::mem::drop(request_senders);
::std::mem::drop(request_receivers);
::std::mem::drop(response_senders);
::std::mem::drop(response_receivers);
if config.statistics.interval != 0 { if config.statistics.interval != 0 {
let state = state.clone(); let state = state.clone();
let config = config.clone(); let config = config.clone();

View file

@ -9,7 +9,7 @@ use std::vec::Drain;
use aquatic_common::access_list::create_access_list_cache; use aquatic_common::access_list::create_access_list_cache;
use aquatic_common::ValidUntil; use aquatic_common::ValidUntil;
use crossbeam_channel::{Receiver, Sender}; use crossbeam_channel::Receiver;
use mio::net::UdpSocket; use mio::net::UdpSocket;
use mio::{Events, Interest, Poll, Token}; use mio::{Events, Interest, Poll, Token};
use rand::prelude::{SeedableRng, StdRng}; use rand::prelude::{SeedableRng, StdRng};
@ -24,7 +24,7 @@ pub fn run_socket_worker(
state: State, state: State,
config: Config, config: Config,
token_num: usize, token_num: usize,
request_sender: Sender<(ConnectedRequest, SocketAddr)>, request_sender: ConnectedRequestSender,
response_receiver: Receiver<(ConnectedResponse, SocketAddr)>, response_receiver: Receiver<(ConnectedResponse, SocketAddr)>,
num_bound_sockets: Arc<AtomicUsize>, num_bound_sockets: Arc<AtomicUsize>,
) { ) {
@ -44,6 +44,7 @@ pub fn run_socket_worker(
let mut events = Events::with_capacity(config.network.poll_event_capacity); let mut events = Events::with_capacity(config.network.poll_event_capacity);
let mut connections = ConnectionMap::default(); let mut connections = ConnectionMap::default();
let mut pending_scrape_responses = PendingScrapeResponseMap::default();
let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new(); let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new();
@ -66,6 +67,7 @@ pub fn run_socket_worker(
&config, &config,
&state, &state,
&mut connections, &mut connections,
&mut pending_scrape_responses,
&mut rng, &mut rng,
&mut socket, &mut socket,
&mut buffer, &mut buffer,
@ -81,6 +83,7 @@ pub fn run_socket_worker(
&mut socket, &mut socket,
&mut buffer, &mut buffer,
&response_receiver, &response_receiver,
&mut pending_scrape_responses,
local_responses.drain(..), local_responses.drain(..),
); );
@ -103,10 +106,11 @@ fn read_requests(
config: &Config, config: &Config,
state: &State, state: &State,
connections: &mut ConnectionMap, connections: &mut ConnectionMap,
pending_scrape_responses: &mut PendingScrapeResponseMap,
rng: &mut StdRng, rng: &mut StdRng,
socket: &mut UdpSocket, socket: &mut UdpSocket,
buffer: &mut [u8], buffer: &mut [u8],
request_sender: &Sender<(ConnectedRequest, SocketAddr)>, request_sender: &ConnectedRequestSender,
local_responses: &mut Vec<(Response, SocketAddr)>, local_responses: &mut Vec<(Response, SocketAddr)>,
) { ) {
let mut requests_received: usize = 0; let mut requests_received: usize = 0;
@ -147,6 +151,7 @@ fn read_requests(
handle_request( handle_request(
config, config,
connections, connections,
pending_scrape_responses,
&mut access_list_cache, &mut access_list_cache,
rng, rng,
request_sender, request_sender,
@ -185,21 +190,66 @@ fn send_responses(
socket: &mut UdpSocket, socket: &mut UdpSocket,
buffer: &mut [u8], buffer: &mut [u8],
response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>, response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>,
pending_scrape_responses: &mut PendingScrapeResponseMap,
local_responses: Drain<(Response, SocketAddr)>, local_responses: Drain<(Response, SocketAddr)>,
) { ) {
let mut responses_sent: usize = 0; let mut responses_sent: usize = 0;
let mut bytes_sent: usize = 0; let mut bytes_sent: usize = 0;
let mut cursor = Cursor::new(buffer); for (response, addr) in local_responses {
send_response(
let response_iterator = local_responses.into_iter().chain( config,
response_receiver socket,
.try_iter() buffer,
.map(|(response, addr)| (response.into(), addr)), &mut responses_sent,
&mut bytes_sent,
response,
addr,
); );
}
for (response, addr) in response_iterator { for (response, addr) in response_receiver.try_iter() {
cursor.set_position(0); let opt_response = match response {
ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r),
ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)),
ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)),
};
if let Some(response) = opt_response {
send_response(
config,
socket,
buffer,
&mut responses_sent,
&mut bytes_sent,
response,
addr,
);
}
}
if config.statistics.interval != 0 {
state
.statistics
.responses_sent
.fetch_add(responses_sent, Ordering::SeqCst);
state
.statistics
.bytes_sent
.fetch_add(bytes_sent, Ordering::SeqCst);
}
}
fn send_response(
config: &Config,
socket: &mut UdpSocket,
buffer: &mut [u8],
responses_sent: &mut usize,
bytes_sent: &mut usize,
response: Response,
addr: SocketAddr,
) {
let mut cursor = Cursor::new(buffer);
let addr = if config.network.address.is_ipv4() { let addr = if config.network.address.is_ipv4() {
if let SocketAddr::V4(addr) = addr { if let SocketAddr::V4(addr) = addr {
@ -224,14 +274,10 @@ fn send_responses(
match socket.send_to(&cursor.get_ref()[..amt], addr) { match socket.send_to(&cursor.get_ref()[..amt], addr) {
Ok(amt) => { Ok(amt) => {
responses_sent += 1; *responses_sent += 1;
bytes_sent += amt; *bytes_sent += amt;
} }
Err(err) => { Err(err) => {
if err.kind() == ErrorKind::WouldBlock {
break;
}
::log::info!("send_to error: {}", err); ::log::info!("send_to error: {}", err);
} }
} }
@ -240,16 +286,4 @@ fn send_responses(
::log::error!("Response::write error: {:?}", err); ::log::error!("Response::write error: {:?}", err);
} }
} }
}
if config.statistics.interval != 0 {
state
.statistics
.responses_sent
.fetch_add(responses_sent, Ordering::SeqCst);
state
.statistics
.bytes_sent
.fetch_add(bytes_sent, Ordering::SeqCst);
}
} }

View file

@ -11,7 +11,7 @@ use std::time::{Duration, Instant};
use aquatic_common::access_list::create_access_list_cache; use aquatic_common::access_list::create_access_list_cache;
use aquatic_common::ValidUntil; use aquatic_common::ValidUntil;
use crossbeam_channel::{Receiver, Sender}; use crossbeam_channel::Receiver;
use io_uring::types::{Fixed, Timespec}; use io_uring::types::{Fixed, Timespec};
use io_uring::SubmissionQueue; use io_uring::SubmissionQueue;
use libc::{ use libc::{
@ -103,7 +103,7 @@ impl Into<u64> for UserData {
pub fn run_socket_worker( pub fn run_socket_worker(
state: State, state: State,
config: Config, config: Config,
request_sender: Sender<(ConnectedRequest, SocketAddr)>, request_sender: ConnectedRequestSender,
response_receiver: Receiver<(ConnectedResponse, SocketAddr)>, response_receiver: Receiver<(ConnectedResponse, SocketAddr)>,
num_bound_sockets: Arc<AtomicUsize>, num_bound_sockets: Arc<AtomicUsize>,
) { ) {
@ -114,6 +114,7 @@ pub fn run_socket_worker(
num_bound_sockets.fetch_add(1, Ordering::SeqCst); num_bound_sockets.fetch_add(1, Ordering::SeqCst);
let mut connections = ConnectionMap::default(); let mut connections = ConnectionMap::default();
let mut pending_scrape_responses = PendingScrapeResponseMap::default();
let mut access_list_cache = create_access_list_cache(&state.access_list); let mut access_list_cache = create_access_list_cache(&state.access_list);
let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new(); let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new();
@ -252,6 +253,7 @@ pub fn run_socket_worker(
handle_request( handle_request(
&config, &config,
&mut connections, &mut connections,
&mut pending_scrape_responses,
&mut access_list_cache, &mut access_list_cache,
&mut rng, &mut rng,
&request_sender, &request_sender,
@ -333,6 +335,13 @@ pub fn run_socket_worker(
.try_iter() .try_iter()
.take(MAX_SEND_EVENTS - send_entries.len()) .take(MAX_SEND_EVENTS - send_entries.len())
{ {
let opt_response = match response {
ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r),
ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)),
ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)),
};
if let Some(response) = opt_response {
queue_response( queue_response(
&config, &config,
&mut sq, &mut sq,
@ -343,10 +352,11 @@ pub fn run_socket_worker(
&mut sockaddrs_ipv4, &mut sockaddrs_ipv4,
&mut sockaddrs_ipv6, &mut sockaddrs_ipv6,
&mut msghdrs, &mut msghdrs,
response.into(), response,
addr, addr,
); );
} }
}
if iter_counter % 32 == 0 { if iter_counter % 32 == 0 {
let now = Instant::now(); let now = Instant::now();

View file

@ -16,7 +16,7 @@ use crate::config::BenchConfig;
pub fn bench_announce_handler( pub fn bench_announce_handler(
bench_config: &BenchConfig, bench_config: &BenchConfig,
aquatic_config: &Config, aquatic_config: &Config,
request_sender: &Sender<(ConnectedRequest, SocketAddr)>, request_sender: &Sender<(SocketWorkerIndex, ConnectedRequest, SocketAddr)>,
response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>, response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>,
rng: &mut impl Rng, rng: &mut impl Rng,
info_hashes: &[InfoHash], info_hashes: &[InfoHash],
@ -38,7 +38,11 @@ pub fn bench_announce_handler(
for request_chunk in requests.chunks(p) { for request_chunk in requests.chunks(p) {
for (request, src) in request_chunk { for (request, src) in request_chunk {
request_sender request_sender
.send((ConnectedRequest::Announce(request.clone()), *src)) .send((
SocketWorkerIndex(0),
ConnectedRequest::Announce(request.clone()),
*src,
))
.unwrap(); .unwrap();
} }

View file

@ -39,21 +39,17 @@ fn main() {
pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> {
// Setup common state, spawn request handlers // Setup common state, spawn request handlers
let state = State::default();
let aquatic_config = Config::default(); let aquatic_config = Config::default();
let (request_sender, request_receiver) = unbounded(); let (request_sender, request_receiver) = unbounded();
let (response_sender, response_receiver) = unbounded(); let (response_sender, response_receiver) = unbounded();
for _ in 0..bench_config.num_threads { let response_sender = ConnectedResponseSender::new(vec![response_sender]);
let state = state.clone();
let config = aquatic_config.clone();
let request_receiver = request_receiver.clone();
let response_sender = response_sender.clone();
::std::thread::spawn(move || { {
run_request_worker(state, config, request_receiver, response_sender) let config = aquatic_config.clone();
});
::std::thread::spawn(move || run_request_worker(config, request_receiver, response_sender));
} }
// Run benchmarks // Run benchmarks

View file

@ -16,7 +16,7 @@ use crate::config::BenchConfig;
pub fn bench_scrape_handler( pub fn bench_scrape_handler(
bench_config: &BenchConfig, bench_config: &BenchConfig,
aquatic_config: &Config, aquatic_config: &Config,
request_sender: &Sender<(ConnectedRequest, SocketAddr)>, request_sender: &Sender<(SocketWorkerIndex, ConnectedRequest, SocketAddr)>,
response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>, response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>,
rng: &mut impl Rng, rng: &mut impl Rng,
info_hashes: &[InfoHash], info_hashes: &[InfoHash],
@ -42,20 +42,25 @@ pub fn bench_scrape_handler(
for round in (0..bench_config.num_rounds).progress_with(pb) { for round in (0..bench_config.num_rounds).progress_with(pb) {
for request_chunk in requests.chunks(p) { for request_chunk in requests.chunks(p) {
for (request, src) in request_chunk { for (request, src) in request_chunk {
let request = ConnectedRequest::Scrape { let request = ConnectedRequest::Scrape(PendingScrapeRequest {
request: request.clone(), transaction_id: request.transaction_id,
original_indices: Vec::new(), info_hashes: request
}; .info_hashes
.clone()
.into_iter()
.enumerate()
.collect(),
});
request_sender.send((request, *src)).unwrap(); request_sender
.send((SocketWorkerIndex(0), request, *src))
.unwrap();
} }
while let Ok((ConnectedResponse::Scrape { response, .. }, _)) = while let Ok((ConnectedResponse::Scrape(response), _)) = response_receiver.try_recv() {
response_receiver.try_recv()
{
num_responses += 1; num_responses += 1;
if let Some(stat) = response.torrent_stats.last() { if let Some(stat) = response.torrent_stats.values().last() {
dummy ^= stat.leechers.0; dummy ^= stat.leechers.0;
} }
} }
@ -64,10 +69,10 @@ pub fn bench_scrape_handler(
let total = bench_config.num_scrape_requests * (round + 1); let total = bench_config.num_scrape_requests * (round + 1);
while num_responses < total { while num_responses < total {
if let Ok((ConnectedResponse::Scrape { response, .. }, _)) = response_receiver.recv() { if let Ok((ConnectedResponse::Scrape(response), _)) = response_receiver.recv() {
num_responses += 1; num_responses += 1;
if let Some(stat) = response.torrent_stats.last() { if let Some(stat) = response.torrent_stats.values().last() {
dummy ^= stat.leechers.0; dummy ^= stat.leechers.0;
} }
} }