mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-04-02 02:35:31 +00:00
Merge pull request #44 from greatest-ape/udp-2022-01-13
udp: use slab for pending scrape responses, split response statistics by type
This commit is contained in:
commit
391c55a19e
7 changed files with 178 additions and 145 deletions
|
|
@ -31,15 +31,13 @@ impl Ip for Ipv6Addr {
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct PendingScrapeRequest {
|
pub struct PendingScrapeRequest {
|
||||||
pub connection_id: ConnectionId,
|
pub slab_key: usize,
|
||||||
pub transaction_id: TransactionId,
|
|
||||||
pub info_hashes: BTreeMap<usize, InfoHash>,
|
pub info_hashes: BTreeMap<usize, InfoHash>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct PendingScrapeResponse {
|
pub struct PendingScrapeResponse {
|
||||||
pub connection_id: ConnectionId,
|
pub slab_key: usize,
|
||||||
pub transaction_id: TransactionId,
|
|
||||||
pub torrent_stats: BTreeMap<usize, TorrentScrapeStatistics>,
|
pub torrent_stats: BTreeMap<usize, TorrentScrapeStatistics>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -151,7 +149,10 @@ impl PeerStatus {
|
||||||
|
|
||||||
pub struct Statistics {
|
pub struct Statistics {
|
||||||
pub requests_received: AtomicUsize,
|
pub requests_received: AtomicUsize,
|
||||||
pub responses_sent: AtomicUsize,
|
pub responses_sent_connect: AtomicUsize,
|
||||||
|
pub responses_sent_announce: AtomicUsize,
|
||||||
|
pub responses_sent_scrape: AtomicUsize,
|
||||||
|
pub responses_sent_error: AtomicUsize,
|
||||||
pub bytes_received: AtomicUsize,
|
pub bytes_received: AtomicUsize,
|
||||||
pub bytes_sent: AtomicUsize,
|
pub bytes_sent: AtomicUsize,
|
||||||
pub torrents: Vec<AtomicUsize>,
|
pub torrents: Vec<AtomicUsize>,
|
||||||
|
|
@ -162,7 +163,10 @@ impl Statistics {
|
||||||
pub fn new(num_request_workers: usize) -> Self {
|
pub fn new(num_request_workers: usize) -> Self {
|
||||||
Self {
|
Self {
|
||||||
requests_received: Default::default(),
|
requests_received: Default::default(),
|
||||||
responses_sent: Default::default(),
|
responses_sent_connect: Default::default(),
|
||||||
|
responses_sent_announce: Default::default(),
|
||||||
|
responses_sent_scrape: Default::default(),
|
||||||
|
responses_sent_error: Default::default(),
|
||||||
bytes_received: Default::default(),
|
bytes_received: Default::default(),
|
||||||
bytes_sent: Default::default(),
|
bytes_sent: Default::default(),
|
||||||
torrents: Self::create_atomic_usize_vec(num_request_workers),
|
torrents: Self::create_atomic_usize_vec(num_request_workers),
|
||||||
|
|
|
||||||
|
|
@ -391,8 +391,7 @@ fn handle_scrape_request(
|
||||||
}
|
}
|
||||||
|
|
||||||
PendingScrapeResponse {
|
PendingScrapeResponse {
|
||||||
connection_id: request.connection_id,
|
slab_key: request.slab_key,
|
||||||
transaction_id: request.transaction_id,
|
|
||||||
torrent_stats,
|
torrent_stats,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,7 @@ use crossbeam_channel::Receiver;
|
||||||
use mio::net::UdpSocket;
|
use mio::net::UdpSocket;
|
||||||
use mio::{Events, Interest, Poll, Token};
|
use mio::{Events, Interest, Poll, Token};
|
||||||
use rand::prelude::{Rng, SeedableRng, StdRng};
|
use rand::prelude::{Rng, SeedableRng, StdRng};
|
||||||
|
use slab::Slab;
|
||||||
|
|
||||||
use aquatic_common::access_list::create_access_list_cache;
|
use aquatic_common::access_list::create_access_list_cache;
|
||||||
use aquatic_common::access_list::AccessListCache;
|
use aquatic_common::access_list::AccessListCache;
|
||||||
|
|
@ -49,23 +50,21 @@ impl ConnectionMap {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct PendingScrapeResponseMapEntry {
|
pub struct PendingScrapeResponseSlabEntry {
|
||||||
num_pending: usize,
|
num_pending: usize,
|
||||||
valid_until: ValidUntil,
|
valid_until: ValidUntil,
|
||||||
torrent_stats: BTreeMap<usize, TorrentScrapeStatistics>,
|
torrent_stats: BTreeMap<usize, TorrentScrapeStatistics>,
|
||||||
|
transaction_id: TransactionId,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct PendingScrapeResponseMap(
|
pub struct PendingScrapeResponseSlab(Slab<PendingScrapeResponseSlabEntry>);
|
||||||
AHashIndexMap<(ConnectionId, TransactionId, SocketAddr), PendingScrapeResponseMapEntry>,
|
|
||||||
);
|
|
||||||
|
|
||||||
impl PendingScrapeResponseMap {
|
impl PendingScrapeResponseSlab {
|
||||||
pub fn prepare_split_requests(
|
pub fn prepare_split_requests(
|
||||||
&mut self,
|
&mut self,
|
||||||
config: &Config,
|
config: &Config,
|
||||||
request: ScrapeRequest,
|
request: ScrapeRequest,
|
||||||
addr: SocketAddr,
|
|
||||||
valid_until: ValidUntil,
|
valid_until: ValidUntil,
|
||||||
) -> impl IntoIterator<Item = (RequestWorkerIndex, PendingScrapeRequest)> {
|
) -> impl IntoIterator<Item = (RequestWorkerIndex, PendingScrapeRequest)> {
|
||||||
let mut split_requests: AHashIndexMap<RequestWorkerIndex, PendingScrapeRequest> =
|
let mut split_requests: AHashIndexMap<RequestWorkerIndex, PendingScrapeRequest> =
|
||||||
|
|
@ -73,54 +72,38 @@ impl PendingScrapeResponseMap {
|
||||||
|
|
||||||
if request.info_hashes.is_empty() {
|
if request.info_hashes.is_empty() {
|
||||||
::log::warn!(
|
::log::warn!(
|
||||||
"Attempted to prepare PendingScrapeResponseMap entry with zero info hashes"
|
"Attempted to prepare PendingScrapeResponseSlab entry with zero info hashes"
|
||||||
);
|
);
|
||||||
|
|
||||||
return split_requests;
|
return split_requests;
|
||||||
}
|
}
|
||||||
|
|
||||||
let connection_id = request.connection_id;
|
let vacant_entry = self.0.vacant_entry();
|
||||||
let transaction_id = request.transaction_id;
|
let slab_key = vacant_entry.key();
|
||||||
|
|
||||||
for (i, info_hash) in request.info_hashes.into_iter().enumerate() {
|
for (i, info_hash) in request.info_hashes.into_iter().enumerate() {
|
||||||
let split_request = split_requests
|
let split_request = split_requests
|
||||||
.entry(RequestWorkerIndex::from_info_hash(&config, info_hash))
|
.entry(RequestWorkerIndex::from_info_hash(&config, info_hash))
|
||||||
.or_insert_with(|| PendingScrapeRequest {
|
.or_insert_with(|| PendingScrapeRequest {
|
||||||
connection_id,
|
slab_key,
|
||||||
transaction_id,
|
|
||||||
info_hashes: BTreeMap::new(),
|
info_hashes: BTreeMap::new(),
|
||||||
});
|
});
|
||||||
|
|
||||||
split_request.info_hashes.insert(i, info_hash);
|
split_request.info_hashes.insert(i, info_hash);
|
||||||
}
|
}
|
||||||
|
|
||||||
let key = (connection_id, transaction_id, addr);
|
vacant_entry.insert(PendingScrapeResponseSlabEntry {
|
||||||
|
|
||||||
let entry = PendingScrapeResponseMapEntry {
|
|
||||||
num_pending: split_requests.len(),
|
num_pending: split_requests.len(),
|
||||||
valid_until,
|
valid_until,
|
||||||
torrent_stats: Default::default(),
|
torrent_stats: Default::default(),
|
||||||
};
|
transaction_id: request.transaction_id,
|
||||||
|
});
|
||||||
if let Some(previous_entry) = self.0.insert(key, entry) {
|
|
||||||
::log::warn!(
|
|
||||||
"PendingScrapeResponseMap.prepare replaced previous entry {:?} for key {:?}",
|
|
||||||
previous_entry,
|
|
||||||
key
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
split_requests
|
split_requests
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_and_get_finished(
|
pub fn add_and_get_finished(&mut self, response: PendingScrapeResponse) -> Option<Response> {
|
||||||
&mut self,
|
let finished = if let Some(entry) = self.0.get_mut(response.slab_key) {
|
||||||
response: PendingScrapeResponse,
|
|
||||||
addr: SocketAddr,
|
|
||||||
) -> Option<Response> {
|
|
||||||
let key = (response.connection_id, response.transaction_id, addr);
|
|
||||||
|
|
||||||
let finished = if let Some(entry) = self.0.get_mut(&key) {
|
|
||||||
entry.num_pending -= 1;
|
entry.num_pending -= 1;
|
||||||
|
|
||||||
entry
|
entry
|
||||||
|
|
@ -130,18 +113,18 @@ impl PendingScrapeResponseMap {
|
||||||
entry.num_pending == 0
|
entry.num_pending == 0
|
||||||
} else {
|
} else {
|
||||||
::log::warn!(
|
::log::warn!(
|
||||||
"PendingScrapeResponseMap.add didn't find entry for key {:?}",
|
"PendingScrapeResponseSlab.add didn't find entry for key {:?}",
|
||||||
key
|
response.slab_key
|
||||||
);
|
);
|
||||||
|
|
||||||
false
|
false
|
||||||
};
|
};
|
||||||
|
|
||||||
if finished {
|
if finished {
|
||||||
let entry = self.0.remove(&key).unwrap();
|
let entry = self.0.remove(response.slab_key);
|
||||||
|
|
||||||
Some(Response::Scrape(ScrapeResponse {
|
Some(Response::Scrape(ScrapeResponse {
|
||||||
transaction_id: response.transaction_id,
|
transaction_id: entry.transaction_id,
|
||||||
torrent_stats: entry.torrent_stats.into_values().collect(),
|
torrent_stats: entry.torrent_stats.into_values().collect(),
|
||||||
}))
|
}))
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -157,7 +140,7 @@ impl PendingScrapeResponseMap {
|
||||||
|
|
||||||
if !keep {
|
if !keep {
|
||||||
::log::warn!(
|
::log::warn!(
|
||||||
"Removing PendingScrapeResponseMap entry while cleaning. {:?}: {:?}",
|
"Removing PendingScrapeResponseSlab entry while cleaning. {:?}: {:?}",
|
||||||
k,
|
k,
|
||||||
v
|
v
|
||||||
);
|
);
|
||||||
|
|
@ -193,7 +176,7 @@ pub fn run_socket_worker(
|
||||||
|
|
||||||
let mut events = Events::with_capacity(config.network.poll_event_capacity);
|
let mut events = Events::with_capacity(config.network.poll_event_capacity);
|
||||||
let mut connections = ConnectionMap::default();
|
let mut connections = ConnectionMap::default();
|
||||||
let mut pending_scrape_responses = PendingScrapeResponseMap::default();
|
let mut pending_scrape_responses = PendingScrapeResponseSlab::default();
|
||||||
let mut access_list_cache = create_access_list_cache(&state.access_list);
|
let mut access_list_cache = create_access_list_cache(&state.access_list);
|
||||||
|
|
||||||
let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new();
|
let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new();
|
||||||
|
|
@ -278,7 +261,7 @@ fn read_requests(
|
||||||
config: &Config,
|
config: &Config,
|
||||||
state: &State,
|
state: &State,
|
||||||
connections: &mut ConnectionMap,
|
connections: &mut ConnectionMap,
|
||||||
pending_scrape_responses: &mut PendingScrapeResponseMap,
|
pending_scrape_responses: &mut PendingScrapeResponseSlab,
|
||||||
access_list_cache: &mut AccessListCache,
|
access_list_cache: &mut AccessListCache,
|
||||||
rng: &mut StdRng,
|
rng: &mut StdRng,
|
||||||
socket: &mut UdpSocket,
|
socket: &mut UdpSocket,
|
||||||
|
|
@ -375,7 +358,7 @@ fn read_requests(
|
||||||
pub fn handle_request(
|
pub fn handle_request(
|
||||||
config: &Config,
|
config: &Config,
|
||||||
connections: &mut ConnectionMap,
|
connections: &mut ConnectionMap,
|
||||||
pending_scrape_responses: &mut PendingScrapeResponseMap,
|
pending_scrape_responses: &mut PendingScrapeResponseSlab,
|
||||||
access_list_cache: &mut AccessListCache,
|
access_list_cache: &mut AccessListCache,
|
||||||
rng: &mut StdRng,
|
rng: &mut StdRng,
|
||||||
request_sender: &ConnectedRequestSender,
|
request_sender: &ConnectedRequestSender,
|
||||||
|
|
@ -429,7 +412,6 @@ pub fn handle_request(
|
||||||
let split_requests = pending_scrape_responses.prepare_split_requests(
|
let split_requests = pending_scrape_responses.prepare_split_requests(
|
||||||
config,
|
config,
|
||||||
request,
|
request,
|
||||||
src,
|
|
||||||
pending_scrape_valid_until,
|
pending_scrape_valid_until,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
@ -471,78 +453,31 @@ fn send_responses(
|
||||||
socket: &mut UdpSocket,
|
socket: &mut UdpSocket,
|
||||||
buffer: &mut [u8],
|
buffer: &mut [u8],
|
||||||
response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>,
|
response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>,
|
||||||
pending_scrape_responses: &mut PendingScrapeResponseMap,
|
pending_scrape_responses: &mut PendingScrapeResponseSlab,
|
||||||
local_responses: Drain<(Response, SocketAddr)>,
|
local_responses: Drain<(Response, SocketAddr)>,
|
||||||
) {
|
) {
|
||||||
let mut responses_sent_ipv4: usize = 0;
|
|
||||||
let mut responses_sent_ipv6: usize = 0;
|
|
||||||
let mut bytes_sent_ipv4: usize = 0;
|
|
||||||
let mut bytes_sent_ipv6: usize = 0;
|
|
||||||
|
|
||||||
for (response, addr) in local_responses {
|
for (response, addr) in local_responses {
|
||||||
send_response(
|
send_response(state, config, socket, buffer, response, addr);
|
||||||
config,
|
|
||||||
socket,
|
|
||||||
buffer,
|
|
||||||
&mut responses_sent_ipv4,
|
|
||||||
&mut responses_sent_ipv6,
|
|
||||||
&mut bytes_sent_ipv4,
|
|
||||||
&mut bytes_sent_ipv6,
|
|
||||||
response,
|
|
||||||
addr,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (response, addr) in response_receiver.try_iter() {
|
for (response, addr) in response_receiver.try_iter() {
|
||||||
let opt_response = match response {
|
let opt_response = match response {
|
||||||
ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r, addr),
|
ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r),
|
||||||
ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)),
|
ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)),
|
||||||
ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)),
|
ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)),
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(response) = opt_response {
|
if let Some(response) = opt_response {
|
||||||
send_response(
|
send_response(state, config, socket, buffer, response, addr);
|
||||||
config,
|
|
||||||
socket,
|
|
||||||
buffer,
|
|
||||||
&mut responses_sent_ipv4,
|
|
||||||
&mut responses_sent_ipv6,
|
|
||||||
&mut bytes_sent_ipv4,
|
|
||||||
&mut bytes_sent_ipv6,
|
|
||||||
response,
|
|
||||||
addr,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.statistics.active() {
|
|
||||||
state
|
|
||||||
.statistics_ipv4
|
|
||||||
.responses_sent
|
|
||||||
.fetch_add(responses_sent_ipv4, Ordering::Release);
|
|
||||||
state
|
|
||||||
.statistics_ipv6
|
|
||||||
.responses_sent
|
|
||||||
.fetch_add(responses_sent_ipv6, Ordering::Release);
|
|
||||||
state
|
|
||||||
.statistics_ipv4
|
|
||||||
.bytes_sent
|
|
||||||
.fetch_add(bytes_sent_ipv4, Ordering::Release);
|
|
||||||
state
|
|
||||||
.statistics_ipv6
|
|
||||||
.bytes_sent
|
|
||||||
.fetch_add(bytes_sent_ipv6, Ordering::Release);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_response(
|
fn send_response(
|
||||||
|
state: &State,
|
||||||
config: &Config,
|
config: &Config,
|
||||||
socket: &mut UdpSocket,
|
socket: &mut UdpSocket,
|
||||||
buffer: &mut [u8],
|
buffer: &mut [u8],
|
||||||
responses_sent_ipv4: &mut usize,
|
|
||||||
responses_sent_ipv6: &mut usize,
|
|
||||||
bytes_sent_ipv4: &mut usize,
|
|
||||||
bytes_sent_ipv6: &mut usize,
|
|
||||||
response: Response,
|
response: Response,
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
) {
|
) {
|
||||||
|
|
@ -572,15 +507,33 @@ fn send_response(
|
||||||
let amt = cursor.position() as usize;
|
let amt = cursor.position() as usize;
|
||||||
|
|
||||||
match socket.send_to(&cursor.get_ref()[..amt], addr) {
|
match socket.send_to(&cursor.get_ref()[..amt], addr) {
|
||||||
Ok(amt) => {
|
Ok(amt) if config.statistics.active() => {
|
||||||
if addr_is_ipv4 {
|
let stats = if addr_is_ipv4 {
|
||||||
*responses_sent_ipv4 += 1;
|
&state.statistics_ipv4
|
||||||
*bytes_sent_ipv4 += amt;
|
|
||||||
} else {
|
} else {
|
||||||
*responses_sent_ipv6 += 1;
|
&state.statistics_ipv6
|
||||||
*bytes_sent_ipv6 += amt;
|
};
|
||||||
|
|
||||||
|
stats.bytes_sent.fetch_add(amt, Ordering::Relaxed);
|
||||||
|
|
||||||
|
match response {
|
||||||
|
Response::Connect(_) => {
|
||||||
|
stats.responses_sent_connect.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => {
|
||||||
|
stats
|
||||||
|
.responses_sent_announce
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
Response::Scrape(_) => {
|
||||||
|
stats.responses_sent_scrape.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
Response::Error(_) => {
|
||||||
|
stats.responses_sent_error.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(_) => {}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
::log::info!("send_to error: {}", err);
|
::log::info!("send_to error: {}", err);
|
||||||
}
|
}
|
||||||
|
|
@ -638,7 +591,7 @@ mod tests {
|
||||||
|
|
||||||
#[quickcheck]
|
#[quickcheck]
|
||||||
fn test_pending_scrape_response_map(
|
fn test_pending_scrape_response_map(
|
||||||
request_data: Vec<(i32, i64, SocketAddr, u8)>,
|
request_data: Vec<(i32, i64, u8)>,
|
||||||
request_workers: u8,
|
request_workers: u8,
|
||||||
) -> TestResult {
|
) -> TestResult {
|
||||||
if request_workers == 0 {
|
if request_workers == 0 {
|
||||||
|
|
@ -651,11 +604,11 @@ mod tests {
|
||||||
|
|
||||||
let valid_until = ValidUntil::new(1);
|
let valid_until = ValidUntil::new(1);
|
||||||
|
|
||||||
let mut map = PendingScrapeResponseMap::default();
|
let mut map = PendingScrapeResponseSlab::default();
|
||||||
|
|
||||||
let mut requests = Vec::new();
|
let mut requests = Vec::new();
|
||||||
|
|
||||||
for (t, c, a, b) in request_data {
|
for (t, c, b) in request_data {
|
||||||
if b == 0 {
|
if b == 0 {
|
||||||
return TestResult::discard();
|
return TestResult::discard();
|
||||||
}
|
}
|
||||||
|
|
@ -674,32 +627,27 @@ mod tests {
|
||||||
info_hashes,
|
info_hashes,
|
||||||
};
|
};
|
||||||
|
|
||||||
requests.push((request, a));
|
requests.push(request);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut all_split_requests = Vec::new();
|
let mut all_split_requests = Vec::new();
|
||||||
|
|
||||||
for (request, addr) in requests.iter() {
|
for request in requests.iter() {
|
||||||
let split_requests = map.prepare_split_requests(
|
let split_requests =
|
||||||
&config,
|
map.prepare_split_requests(&config, request.to_owned(), valid_until);
|
||||||
request.to_owned(),
|
|
||||||
addr.to_owned(),
|
|
||||||
valid_until,
|
|
||||||
);
|
|
||||||
|
|
||||||
all_split_requests.push((
|
all_split_requests.push(
|
||||||
addr,
|
|
||||||
split_requests
|
split_requests
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.collect::<Vec<(RequestWorkerIndex, PendingScrapeRequest)>>(),
|
.collect::<Vec<(RequestWorkerIndex, PendingScrapeRequest)>>(),
|
||||||
));
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
assert_eq!(map.0.len(), requests.len());
|
assert_eq!(map.0.len(), requests.len());
|
||||||
|
|
||||||
let mut responses = Vec::new();
|
let mut responses = Vec::new();
|
||||||
|
|
||||||
for (addr, split_requests) in all_split_requests {
|
for split_requests in all_split_requests {
|
||||||
for (worker_index, split_request) in split_requests {
|
for (worker_index, split_request) in split_requests {
|
||||||
assert!(worker_index.0 < request_workers as usize);
|
assert!(worker_index.0 < request_workers as usize);
|
||||||
|
|
||||||
|
|
@ -719,12 +667,11 @@ mod tests {
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let response = PendingScrapeResponse {
|
let response = PendingScrapeResponse {
|
||||||
transaction_id: split_request.transaction_id,
|
slab_key: split_request.slab_key,
|
||||||
connection_id: split_request.connection_id,
|
|
||||||
torrent_stats,
|
torrent_stats,
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(response) = map.add_and_get_finished(response, addr.to_owned()) {
|
if let Some(response) = map.add_and_get_finished(response) {
|
||||||
responses.push(response);
|
responses.push(response);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,10 @@ const STYLESHEET_CONTENTS: &str = concat!(
|
||||||
#[derive(Clone, Copy, Debug)]
|
#[derive(Clone, Copy, Debug)]
|
||||||
struct CollectedStatistics {
|
struct CollectedStatistics {
|
||||||
requests_per_second: f64,
|
requests_per_second: f64,
|
||||||
responses_per_second: f64,
|
responses_per_second_connect: f64,
|
||||||
|
responses_per_second_announce: f64,
|
||||||
|
responses_per_second_scrape: f64,
|
||||||
|
responses_per_second_error: f64,
|
||||||
bytes_received_per_second: f64,
|
bytes_received_per_second: f64,
|
||||||
bytes_sent_per_second: f64,
|
bytes_sent_per_second: f64,
|
||||||
num_torrents: usize,
|
num_torrents: usize,
|
||||||
|
|
@ -33,10 +36,21 @@ struct CollectedStatistics {
|
||||||
|
|
||||||
impl CollectedStatistics {
|
impl CollectedStatistics {
|
||||||
fn from_shared(statistics: &Arc<Statistics>, last: &mut Instant) -> Self {
|
fn from_shared(statistics: &Arc<Statistics>, last: &mut Instant) -> Self {
|
||||||
let requests_received = statistics.requests_received.fetch_and(0, Ordering::AcqRel) as f64;
|
let requests_received = statistics.requests_received.fetch_and(0, Ordering::Relaxed) as f64;
|
||||||
let responses_sent = statistics.responses_sent.fetch_and(0, Ordering::AcqRel) as f64;
|
let responses_sent_connect = statistics
|
||||||
let bytes_received = statistics.bytes_received.fetch_and(0, Ordering::AcqRel) as f64;
|
.responses_sent_connect
|
||||||
let bytes_sent = statistics.bytes_sent.fetch_and(0, Ordering::AcqRel) as f64;
|
.fetch_and(0, Ordering::Relaxed) as f64;
|
||||||
|
let responses_sent_announce = statistics
|
||||||
|
.responses_sent_announce
|
||||||
|
.fetch_and(0, Ordering::Relaxed) as f64;
|
||||||
|
let responses_sent_scrape = statistics
|
||||||
|
.responses_sent_scrape
|
||||||
|
.fetch_and(0, Ordering::Relaxed) as f64;
|
||||||
|
let responses_sent_error = statistics
|
||||||
|
.responses_sent_error
|
||||||
|
.fetch_and(0, Ordering::Relaxed) as f64;
|
||||||
|
let bytes_received = statistics.bytes_received.fetch_and(0, Ordering::Relaxed) as f64;
|
||||||
|
let bytes_sent = statistics.bytes_sent.fetch_and(0, Ordering::Relaxed) as f64;
|
||||||
let num_torrents = Self::sum_atomic_usizes(&statistics.torrents);
|
let num_torrents = Self::sum_atomic_usizes(&statistics.torrents);
|
||||||
let num_peers = Self::sum_atomic_usizes(&statistics.peers);
|
let num_peers = Self::sum_atomic_usizes(&statistics.peers);
|
||||||
|
|
||||||
|
|
@ -48,7 +62,10 @@ impl CollectedStatistics {
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
requests_per_second: requests_received / elapsed,
|
requests_per_second: requests_received / elapsed,
|
||||||
responses_per_second: responses_sent / elapsed,
|
responses_per_second_connect: responses_sent_connect / elapsed,
|
||||||
|
responses_per_second_announce: responses_sent_announce / elapsed,
|
||||||
|
responses_per_second_scrape: responses_sent_scrape / elapsed,
|
||||||
|
responses_per_second_error: responses_sent_error / elapsed,
|
||||||
bytes_received_per_second: bytes_received / elapsed,
|
bytes_received_per_second: bytes_received / elapsed,
|
||||||
bytes_sent_per_second: bytes_sent / elapsed,
|
bytes_sent_per_second: bytes_sent / elapsed,
|
||||||
num_torrents,
|
num_torrents,
|
||||||
|
|
@ -57,7 +74,7 @@ impl CollectedStatistics {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn sum_atomic_usizes(values: &[AtomicUsize]) -> usize {
|
fn sum_atomic_usizes(values: &[AtomicUsize]) -> usize {
|
||||||
values.iter().map(|n| n.load(Ordering::Acquire)).sum()
|
values.iter().map(|n| n.load(Ordering::Relaxed)).sum()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -66,10 +83,23 @@ impl Into<FormattedStatistics> for CollectedStatistics {
|
||||||
let rx_mbits = self.bytes_received_per_second * 8.0 / 1_000_000.0;
|
let rx_mbits = self.bytes_received_per_second * 8.0 / 1_000_000.0;
|
||||||
let tx_mbits = self.bytes_sent_per_second * 8.0 / 1_000_000.0;
|
let tx_mbits = self.bytes_sent_per_second * 8.0 / 1_000_000.0;
|
||||||
|
|
||||||
|
let responses_per_second_total = self.responses_per_second_connect
|
||||||
|
+ self.responses_per_second_announce
|
||||||
|
+ self.responses_per_second_scrape
|
||||||
|
+ self.responses_per_second_error;
|
||||||
|
|
||||||
FormattedStatistics {
|
FormattedStatistics {
|
||||||
requests_per_second: (self.requests_per_second as usize)
|
requests_per_second: (self.requests_per_second as usize)
|
||||||
.to_formatted_string(&Locale::en),
|
.to_formatted_string(&Locale::en),
|
||||||
responses_per_second: (self.responses_per_second as usize)
|
responses_per_second_total: (responses_per_second_total as usize)
|
||||||
|
.to_formatted_string(&Locale::en),
|
||||||
|
responses_per_second_connect: (self.responses_per_second_connect as usize)
|
||||||
|
.to_formatted_string(&Locale::en),
|
||||||
|
responses_per_second_announce: (self.responses_per_second_announce as usize)
|
||||||
|
.to_formatted_string(&Locale::en),
|
||||||
|
responses_per_second_scrape: (self.responses_per_second_scrape as usize)
|
||||||
|
.to_formatted_string(&Locale::en),
|
||||||
|
responses_per_second_error: (self.responses_per_second_error as usize)
|
||||||
.to_formatted_string(&Locale::en),
|
.to_formatted_string(&Locale::en),
|
||||||
rx_mbits: format!("{:.2}", rx_mbits),
|
rx_mbits: format!("{:.2}", rx_mbits),
|
||||||
tx_mbits: format!("{:.2}", tx_mbits),
|
tx_mbits: format!("{:.2}", tx_mbits),
|
||||||
|
|
@ -82,7 +112,11 @@ impl Into<FormattedStatistics> for CollectedStatistics {
|
||||||
#[derive(Clone, Debug, Serialize)]
|
#[derive(Clone, Debug, Serialize)]
|
||||||
struct FormattedStatistics {
|
struct FormattedStatistics {
|
||||||
requests_per_second: String,
|
requests_per_second: String,
|
||||||
responses_per_second: String,
|
responses_per_second_total: String,
|
||||||
|
responses_per_second_connect: String,
|
||||||
|
responses_per_second_announce: String,
|
||||||
|
responses_per_second_scrape: String,
|
||||||
|
responses_per_second_error: String,
|
||||||
rx_mbits: String,
|
rx_mbits: String,
|
||||||
tx_mbits: String,
|
tx_mbits: String,
|
||||||
num_torrents: String,
|
num_torrents: String,
|
||||||
|
|
@ -161,9 +195,27 @@ pub fn run_statistics_worker(config: Config, state: State) {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn print_to_stdout(config: &Config, statistics: &FormattedStatistics) {
|
fn print_to_stdout(config: &Config, statistics: &FormattedStatistics) {
|
||||||
|
println!(" requests/second: {:>10}", statistics.requests_per_second);
|
||||||
|
println!(" responses/second");
|
||||||
println!(
|
println!(
|
||||||
" requests/second: {:>10}, responses/second: {:>10}",
|
" total: {:>10}",
|
||||||
statistics.requests_per_second, statistics.responses_per_second
|
statistics.responses_per_second_total
|
||||||
|
);
|
||||||
|
println!(
|
||||||
|
" connect: {:>10}",
|
||||||
|
statistics.responses_per_second_connect
|
||||||
|
);
|
||||||
|
println!(
|
||||||
|
" announce: {:>10}",
|
||||||
|
statistics.responses_per_second_announce
|
||||||
|
);
|
||||||
|
println!(
|
||||||
|
" scrape: {:>10}",
|
||||||
|
statistics.responses_per_second_scrape
|
||||||
|
);
|
||||||
|
println!(
|
||||||
|
" error: {:>10}",
|
||||||
|
statistics.responses_per_second_error
|
||||||
);
|
);
|
||||||
println!(
|
println!(
|
||||||
" bandwidth: {:>7} Mbit/s in, {:7} Mbit/s out",
|
" bandwidth: {:>7} Mbit/s in, {:7} Mbit/s out",
|
||||||
|
|
|
||||||
|
|
@ -39,8 +39,24 @@
|
||||||
<td>{ ipv4.requests_per_second }</td>
|
<td>{ ipv4.requests_per_second }</td>
|
||||||
</tr>
|
</tr>
|
||||||
<tr>
|
<tr>
|
||||||
<th scope="row">Responses / second</th>
|
<th scope="row">Total responses / second</th>
|
||||||
<td>{ ipv4.responses_per_second }</td>
|
<td>{ ipv4.responses_per_second_total }</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<th scope="row">Connect responses / second</th>
|
||||||
|
<td>{ ipv4.responses_per_second_connect }</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<th scope="row">Announce responses / second</th>
|
||||||
|
<td>{ ipv4.responses_per_second_announce }</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<th scope="row">Scrape responses / second</th>
|
||||||
|
<td>{ ipv4.responses_per_second_scrape }</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<th scope="row">Error responses / second</th>
|
||||||
|
<td>{ ipv4.responses_per_second_error }</td>
|
||||||
</tr>
|
</tr>
|
||||||
<tr>
|
<tr>
|
||||||
<th scope="row">Bandwidth (RX)</th>
|
<th scope="row">Bandwidth (RX)</th>
|
||||||
|
|
@ -73,8 +89,24 @@
|
||||||
<td>{ ipv6.requests_per_second }</td>
|
<td>{ ipv6.requests_per_second }</td>
|
||||||
</tr>
|
</tr>
|
||||||
<tr>
|
<tr>
|
||||||
<th scope="row">Responses / second</th>
|
<th scope="row">Total responses / second</th>
|
||||||
<td>{ ipv6.responses_per_second }</td>
|
<td>{ ipv6.responses_per_second_total }</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<th scope="row">Connect responses / second</th>
|
||||||
|
<td>{ ipv6.responses_per_second_connect }</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<th scope="row">Announce responses / second</th>
|
||||||
|
<td>{ ipv6.responses_per_second_announce }</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<th scope="row">Scrape responses / second</th>
|
||||||
|
<td>{ ipv6.responses_per_second_scrape }</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<th scope="row">Error responses / second</th>
|
||||||
|
<td>{ ipv6.responses_per_second_error }</td>
|
||||||
</tr>
|
</tr>
|
||||||
<tr>
|
<tr>
|
||||||
<th scope="row">Bandwidth (RX)</th>
|
<th scope="row">Bandwidth (RX)</th>
|
||||||
|
|
|
||||||
|
|
@ -41,8 +41,7 @@ pub fn bench_scrape_handler(
|
||||||
for request_chunk in requests.chunks(p) {
|
for request_chunk in requests.chunks(p) {
|
||||||
for (request, src) in request_chunk {
|
for (request, src) in request_chunk {
|
||||||
let request = ConnectedRequest::Scrape(PendingScrapeRequest {
|
let request = ConnectedRequest::Scrape(PendingScrapeRequest {
|
||||||
connection_id: request.connection_id,
|
slab_key: 0,
|
||||||
transaction_id: request.transaction_id,
|
|
||||||
info_hashes: request
|
info_hashes: request
|
||||||
.info_hashes
|
.info_hashes
|
||||||
.clone()
|
.clone()
|
||||||
|
|
|
||||||
|
|
@ -97,7 +97,7 @@ impl Response {
|
||||||
/// addresses. Clients seem not to support it very well, but due to a lack
|
/// addresses. Clients seem not to support it very well, but due to a lack
|
||||||
/// of alternative solutions, it is implemented here.
|
/// of alternative solutions, it is implemented here.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn write(self, bytes: &mut impl Write) -> Result<(), io::Error> {
|
pub fn write(&self, bytes: &mut impl Write) -> Result<(), io::Error> {
|
||||||
match self {
|
match self {
|
||||||
Response::Connect(r) => {
|
Response::Connect(r) => {
|
||||||
bytes.write_i32::<NetworkEndian>(0)?;
|
bytes.write_i32::<NetworkEndian>(0)?;
|
||||||
|
|
@ -111,7 +111,7 @@ impl Response {
|
||||||
bytes.write_i32::<NetworkEndian>(r.leechers.0)?;
|
bytes.write_i32::<NetworkEndian>(r.leechers.0)?;
|
||||||
bytes.write_i32::<NetworkEndian>(r.seeders.0)?;
|
bytes.write_i32::<NetworkEndian>(r.seeders.0)?;
|
||||||
|
|
||||||
for peer in r.peers {
|
for peer in r.peers.iter() {
|
||||||
bytes.write_all(&peer.ip_address.octets())?;
|
bytes.write_all(&peer.ip_address.octets())?;
|
||||||
bytes.write_u16::<NetworkEndian>(peer.port.0)?;
|
bytes.write_u16::<NetworkEndian>(peer.port.0)?;
|
||||||
}
|
}
|
||||||
|
|
@ -120,7 +120,7 @@ impl Response {
|
||||||
bytes.write_i32::<NetworkEndian>(2)?;
|
bytes.write_i32::<NetworkEndian>(2)?;
|
||||||
bytes.write_i32::<NetworkEndian>(r.transaction_id.0)?;
|
bytes.write_i32::<NetworkEndian>(r.transaction_id.0)?;
|
||||||
|
|
||||||
for torrent_stat in r.torrent_stats {
|
for torrent_stat in r.torrent_stats.iter() {
|
||||||
bytes.write_i32::<NetworkEndian>(torrent_stat.seeders.0)?;
|
bytes.write_i32::<NetworkEndian>(torrent_stat.seeders.0)?;
|
||||||
bytes.write_i32::<NetworkEndian>(torrent_stat.completed.0)?;
|
bytes.write_i32::<NetworkEndian>(torrent_stat.completed.0)?;
|
||||||
bytes.write_i32::<NetworkEndian>(torrent_stat.leechers.0)?;
|
bytes.write_i32::<NetworkEndian>(torrent_stat.leechers.0)?;
|
||||||
|
|
@ -139,7 +139,7 @@ impl Response {
|
||||||
bytes.write_i32::<NetworkEndian>(r.leechers.0)?;
|
bytes.write_i32::<NetworkEndian>(r.leechers.0)?;
|
||||||
bytes.write_i32::<NetworkEndian>(r.seeders.0)?;
|
bytes.write_i32::<NetworkEndian>(r.seeders.0)?;
|
||||||
|
|
||||||
for peer in r.peers {
|
for peer in r.peers.iter() {
|
||||||
bytes.write_all(&peer.ip_address.octets())?;
|
bytes.write_all(&peer.ip_address.octets())?;
|
||||||
bytes.write_u16::<NetworkEndian>(peer.port.0)?;
|
bytes.write_u16::<NetworkEndian>(peer.port.0)?;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue