aquatic_udp: glommio: return scrape stats in correct order

This commit is contained in:
Joakim Frostegård 2021-10-23 16:00:41 +02:00
parent 0e58347ac4
commit 821608ab50
6 changed files with 108 additions and 59 deletions

View file

@ -10,20 +10,28 @@ use crate::common::*;
#[derive(Debug)]
pub enum ConnectedRequest {
Announce(AnnounceRequest),
Scrape(ScrapeRequest),
Scrape {
request: ScrapeRequest,
/// Currently only used by glommio implementation
original_indices: Vec<usize>,
},
}
#[derive(Debug)]
pub enum ConnectedResponse {
Announce(AnnounceResponse),
Scrape(ScrapeResponse),
Scrape {
response: ScrapeResponse,
/// Currently only used by glommio implementation
original_indices: Vec<usize>,
},
}
impl Into<Response> for ConnectedResponse {
fn into(self) -> Response {
match self {
Self::Announce(response) => Response::Announce(response),
Self::Scrape(response) => Response::Scrape(response),
Self::Scrape { response, .. } => Response::Scrape(response),
}
}
}

View file

@ -82,22 +82,29 @@ async fn handle_request_stream<S>(
}));
while let Some((producer_index, request, src)) = stream.next().await {
let response =
match request {
ConnectedRequest::Announce(request) => {
ConnectedResponse::Announce(handle_announce_request(
&config,
&mut rng,
&mut torrents.borrow_mut(),
request,
src,
peer_valid_until.borrow().to_owned(),
))
let response = match request {
ConnectedRequest::Announce(request) => {
ConnectedResponse::Announce(handle_announce_request(
&config,
&mut rng,
&mut torrents.borrow_mut(),
request,
src,
peer_valid_until.borrow().to_owned(),
))
}
ConnectedRequest::Scrape {
request,
original_indices,
} => {
let response = handle_scrape_request(&mut torrents.borrow_mut(), src, request);
ConnectedResponse::Scrape {
response,
original_indices,
}
ConnectedRequest::Scrape(request) => ConnectedResponse::Scrape(
handle_scrape_request(&mut torrents.borrow_mut(), src, request),
),
};
}
};
::log::debug!("preparing to send response to channel: {:?}", response);

View file

@ -1,4 +1,5 @@
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::io::Cursor;
use std::net::{IpAddr, SocketAddr};
use std::rc::Rc;
@ -32,7 +33,7 @@ const PENDING_SCRAPE_MAX_WAIT: u64 = 30;
struct PendingScrapeResponse {
pending_worker_responses: usize,
valid_until: ValidUntil,
stats: Vec<TorrentScrapeStatistics>,
stats: BTreeMap<usize, TorrentScrapeStatistics>,
}
#[derive(Default)]
@ -48,16 +49,25 @@ impl PendingScrapeResponses {
let pending = PendingScrapeResponse {
pending_worker_responses,
valid_until,
stats: Vec::new(),
stats: BTreeMap::new(),
};
self.0.insert(transaction_id, pending);
}
fn add_and_get_finished(&mut self, mut response: ScrapeResponse) -> Option<ScrapeResponse> {
fn add_and_get_finished(
&mut self,
mut response: ScrapeResponse,
mut original_indices: Vec<usize>,
) -> Option<ScrapeResponse> {
let finished = if let Some(r) = self.0.get_mut(&response.transaction_id) {
r.pending_worker_responses -= 1;
r.stats.append(&mut response.torrent_stats);
r.stats.extend(
original_indices
.drain(..)
.zip(response.torrent_stats.drain(..)),
);
r.pending_worker_responses == 0
} else {
@ -67,11 +77,12 @@ impl PendingScrapeResponses {
};
if finished {
let r = self.0.remove(&response.transaction_id).unwrap();
let PendingScrapeResponse { stats, .. } =
self.0.remove(&response.transaction_id).unwrap();
Some(ScrapeResponse {
transaction_id: response.transaction_id,
torrent_stats: r.stats,
torrent_stats: stats.into_values().collect(),
})
} else {
None
@ -258,37 +269,47 @@ async fn read_requests(
}
}
}
Ok(Request::Scrape(request)) => {
if connections.borrow().contains(request.connection_id, src) {
let mut consumer_requests: HashMap<usize, ScrapeRequest> =
Ok(Request::Scrape(ScrapeRequest {
transaction_id,
connection_id,
info_hashes,
})) => {
if connections.borrow().contains(connection_id, src) {
let mut consumer_requests: HashMap<usize, (ScrapeRequest, Vec<usize>)> =
HashMap::new();
for info_hash in request.info_hashes {
consumer_requests
for (i, info_hash) in info_hashes.into_iter().enumerate() {
let (req, indices) = consumer_requests
.entry(calculate_request_consumer_index(&config, info_hash))
.or_insert(ScrapeRequest {
transaction_id: request.transaction_id,
connection_id: request.connection_id,
info_hashes: Vec::new(),
})
.info_hashes
.push(info_hash);
.or_insert_with(|| {
let request = ScrapeRequest {
transaction_id: transaction_id,
connection_id: connection_id,
info_hashes: Vec::new(),
};
(request, Vec::new())
});
req.info_hashes.push(info_hash);
indices.push(i);
}
pending_scrape_responses.borrow_mut().prepare(
request.transaction_id,
transaction_id,
consumer_requests.len(),
pending_scrape_valid_until.borrow().to_owned(),
);
for (consumer_index, request) in consumer_requests {
for (consumer_index, (request, original_indices)) in consumer_requests {
let request = ConnectedRequest::Scrape {
request,
original_indices,
};
if let Err(err) = request_senders.try_send_to(
consumer_index,
(
response_consumer_index,
ConnectedRequest::Scrape(request),
src,
),
(response_consumer_index, request, src),
) {
::log::warn!("request_sender.try_send failed: {:?}", err)
}
@ -338,9 +359,12 @@ async fn handle_shared_responses<S>(
while let Some((response, addr)) = stream.next().await {
let opt_response = match response {
ConnectedResponse::Announce(response) => Some((Response::Announce(response), addr)),
ConnectedResponse::Scrape(response) => pending_scrape_responses
ConnectedResponse::Scrape {
response,
original_indices,
} => pending_scrape_responses
.borrow_mut()
.add_and_get_finished(response)
.add_and_get_finished(response, original_indices)
.map(|response| (Response::Scrape(response), addr)),
};

View file

@ -55,8 +55,8 @@ pub fn run_request_worker(
};
match request {
ConnectedRequest::Announce(r) => announce_requests.push((r, src)),
ConnectedRequest::Scrape(r) => scrape_requests.push((r, src)),
ConnectedRequest::Announce(request) => announce_requests.push((request, src)),
ConnectedRequest::Scrape { request, .. } => scrape_requests.push((request, src)),
}
}
@ -80,8 +80,10 @@ pub fn run_request_worker(
}));
responses.extend(scrape_requests.drain(..).map(|(request, src)| {
let response =
ConnectedResponse::Scrape(handle_scrape_request(&mut torrents, src, request));
let response = ConnectedResponse::Scrape {
response: handle_scrape_request(&mut torrents, src, request),
original_indices: Vec::new(),
};
(response, src)
}));

View file

@ -192,9 +192,12 @@ fn read_requests(
}
Ok(Request::Scrape(request)) => {
if connections.contains(request.connection_id, src) {
if let Err(err) =
request_sender.try_send((ConnectedRequest::Scrape(request), src))
{
let request = ConnectedRequest::Scrape {
request,
original_indices: Vec::new(),
};
if let Err(err) = request_sender.try_send((request, src)) {
::log::warn!("request_sender.try_send failed: {:?}", err)
}
}

View file

@ -42,15 +42,20 @@ pub fn bench_scrape_handler(
for round in (0..bench_config.num_rounds).progress_with(pb) {
for request_chunk in requests.chunks(p) {
for (request, src) in request_chunk {
request_sender
.send((ConnectedRequest::Scrape(request.clone()), *src))
.unwrap();
let request = ConnectedRequest::Scrape {
request: request.clone(),
original_indices: Vec::new(),
};
request_sender.send((request, *src)).unwrap();
}
while let Ok((ConnectedResponse::Scrape(r), _)) = response_receiver.try_recv() {
while let Ok((ConnectedResponse::Scrape { response, .. }, _)) =
response_receiver.try_recv()
{
num_responses += 1;
if let Some(stat) = r.torrent_stats.last() {
if let Some(stat) = response.torrent_stats.last() {
dummy ^= stat.leechers.0;
}
}
@ -59,10 +64,10 @@ pub fn bench_scrape_handler(
let total = bench_config.num_scrape_requests * (round + 1);
while num_responses < total {
if let Ok((ConnectedResponse::Scrape(r), _)) = response_receiver.recv() {
if let Ok((ConnectedResponse::Scrape { response, .. }, _)) = response_receiver.recv() {
num_responses += 1;
if let Some(stat) = r.torrent_stats.last() {
if let Some(stat) = response.torrent_stats.last() {
dummy ^= stat.leechers.0;
}
}