From 821608ab503bbe6fb6f42fec2d2d0881368a376d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 16:00:41 +0200 Subject: [PATCH] aquatic_udp: glommio: return scrape stats in correct order --- aquatic_udp/src/lib/common/handlers.rs | 14 ++++- aquatic_udp/src/lib/glommio/handlers.rs | 37 +++++++----- aquatic_udp/src/lib/glommio/network.rs | 78 ++++++++++++++++--------- aquatic_udp/src/lib/mio/handlers.rs | 10 ++-- aquatic_udp/src/lib/mio/network.rs | 9 ++- aquatic_udp_bench/src/scrape.rs | 19 +++--- 6 files changed, 108 insertions(+), 59 deletions(-) diff --git a/aquatic_udp/src/lib/common/handlers.rs b/aquatic_udp/src/lib/common/handlers.rs index 773d268..380616a 100644 --- a/aquatic_udp/src/lib/common/handlers.rs +++ b/aquatic_udp/src/lib/common/handlers.rs @@ -10,20 +10,28 @@ use crate::common::*; #[derive(Debug)] pub enum ConnectedRequest { Announce(AnnounceRequest), - Scrape(ScrapeRequest), + Scrape { + request: ScrapeRequest, + /// Currently only used by glommio implementation + original_indices: Vec<usize>, + }, } #[derive(Debug)] pub enum ConnectedResponse { Announce(AnnounceResponse), - Scrape(ScrapeResponse), + Scrape { + response: ScrapeResponse, + /// Currently only used by glommio implementation + original_indices: Vec<usize>, + }, } impl Into<Response> for ConnectedResponse { fn into(self) -> Response { match self { Self::Announce(response) => Response::Announce(response), - Self::Scrape(response) => Response::Scrape(response), + Self::Scrape { response, .. 
} => Response::Scrape(response), } } } diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index a288755..3bfbc0d 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -82,22 +82,29 @@ async fn handle_request_stream( })); while let Some((producer_index, request, src)) = stream.next().await { - let response = - match request { - ConnectedRequest::Announce(request) => { - ConnectedResponse::Announce(handle_announce_request( - &config, - &mut rng, - &mut torrents.borrow_mut(), - request, - src, - peer_valid_until.borrow().to_owned(), - )) + let response = match request { + ConnectedRequest::Announce(request) => { + ConnectedResponse::Announce(handle_announce_request( + &config, + &mut rng, + &mut torrents.borrow_mut(), + request, + src, + peer_valid_until.borrow().to_owned(), + )) + } + ConnectedRequest::Scrape { + request, + original_indices, + } => { + let response = handle_scrape_request(&mut torrents.borrow_mut(), src, request); + + ConnectedResponse::Scrape { + response, + original_indices, } - ConnectedRequest::Scrape(request) => ConnectedResponse::Scrape( - handle_scrape_request(&mut torrents.borrow_mut(), src, request), - ), - }; + } + }; ::log::debug!("preparing to send response to channel: {:?}", response); diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index df35f00..feea9a1 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -1,4 +1,5 @@ use std::cell::RefCell; +use std::collections::BTreeMap; use std::io::Cursor; use std::net::{IpAddr, SocketAddr}; use std::rc::Rc; @@ -32,7 +33,7 @@ const PENDING_SCRAPE_MAX_WAIT: u64 = 30; struct PendingScrapeResponse { pending_worker_responses: usize, valid_until: ValidUntil, - stats: Vec<TorrentScrapeStatistics>, + stats: BTreeMap<usize, TorrentScrapeStatistics>, } #[derive(Default)] @@ -48,16 +49,25 @@ impl PendingScrapeResponses { let pending = PendingScrapeResponse { 
pending_worker_responses, valid_until, - stats: Vec::new(), + stats: BTreeMap::new(), }; self.0.insert(transaction_id, pending); } - fn add_and_get_finished(&mut self, mut response: ScrapeResponse) -> Option<ScrapeResponse> { + fn add_and_get_finished( + &mut self, + mut response: ScrapeResponse, + mut original_indices: Vec<usize>, + ) -> Option<ScrapeResponse> { let finished = if let Some(r) = self.0.get_mut(&response.transaction_id) { r.pending_worker_responses -= 1; - r.stats.append(&mut response.torrent_stats); + + r.stats.extend( + original_indices + .drain(..) + .zip(response.torrent_stats.drain(..)), + ); r.pending_worker_responses == 0 } else { @@ -67,11 +77,12 @@ impl PendingScrapeResponses { }; if finished { - let r = self.0.remove(&response.transaction_id).unwrap(); + let PendingScrapeResponse { stats, .. } = + self.0.remove(&response.transaction_id).unwrap(); Some(ScrapeResponse { transaction_id: response.transaction_id, - torrent_stats: r.stats, + torrent_stats: stats.into_values().collect(), }) } else { None @@ -258,37 +269,47 @@ async fn read_requests( } } } - Ok(Request::Scrape(request)) => { - if connections.borrow().contains(request.connection_id, src) { - let mut consumer_requests: HashMap<usize, ScrapeRequest> = + Ok(Request::Scrape(ScrapeRequest { + transaction_id, + connection_id, + info_hashes, + })) => { + if connections.borrow().contains(connection_id, src) { + let mut consumer_requests: HashMap<usize, (ScrapeRequest, Vec<usize>)> = HashMap::new(); - for info_hash in request.info_hashes { - consumer_requests + for (i, info_hash) in info_hashes.into_iter().enumerate() { + let (req, indices) = consumer_requests .entry(calculate_request_consumer_index(&config, info_hash)) - .or_insert(ScrapeRequest { - transaction_id: request.transaction_id, - connection_id: request.connection_id, - info_hashes: Vec::new(), - }) - .info_hashes - .push(info_hash); + .or_insert_with(|| { + let request = ScrapeRequest { + transaction_id: transaction_id, + connection_id: connection_id, + info_hashes: Vec::new(), + }; + + (request, Vec::new()) + }); + + 
req.info_hashes.push(info_hash); + indices.push(i); } pending_scrape_responses.borrow_mut().prepare( - request.transaction_id, + transaction_id, consumer_requests.len(), pending_scrape_valid_until.borrow().to_owned(), ); - for (consumer_index, request) in consumer_requests { + for (consumer_index, (request, original_indices)) in consumer_requests { + let request = ConnectedRequest::Scrape { + request, + original_indices, + }; + if let Err(err) = request_senders.try_send_to( consumer_index, - ( - response_consumer_index, - ConnectedRequest::Scrape(request), - src, - ), + (response_consumer_index, request, src), ) { ::log::warn!("request_sender.try_send failed: {:?}", err) } @@ -338,9 +359,12 @@ async fn handle_shared_responses( while let Some((response, addr)) = stream.next().await { let opt_response = match response { ConnectedResponse::Announce(response) => Some((Response::Announce(response), addr)), - ConnectedResponse::Scrape(response) => pending_scrape_responses + ConnectedResponse::Scrape { + response, + original_indices, + } => pending_scrape_responses .borrow_mut() - .add_and_get_finished(response) + .add_and_get_finished(response, original_indices) .map(|response| (Response::Scrape(response), addr)), }; diff --git a/aquatic_udp/src/lib/mio/handlers.rs b/aquatic_udp/src/lib/mio/handlers.rs index 99023fa..7019b98 100644 --- a/aquatic_udp/src/lib/mio/handlers.rs +++ b/aquatic_udp/src/lib/mio/handlers.rs @@ -55,8 +55,8 @@ pub fn run_request_worker( }; match request { - ConnectedRequest::Announce(r) => announce_requests.push((r, src)), - ConnectedRequest::Scrape(r) => scrape_requests.push((r, src)), + ConnectedRequest::Announce(request) => announce_requests.push((request, src)), + ConnectedRequest::Scrape { request, .. 
} => scrape_requests.push((request, src)), } } @@ -80,8 +80,10 @@ pub fn run_request_worker( })); responses.extend(scrape_requests.drain(..).map(|(request, src)| { - let response = - ConnectedResponse::Scrape(handle_scrape_request(&mut torrents, src, request)); + let response = ConnectedResponse::Scrape { + response: handle_scrape_request(&mut torrents, src, request), + original_indices: Vec::new(), + }; (response, src) })); diff --git a/aquatic_udp/src/lib/mio/network.rs b/aquatic_udp/src/lib/mio/network.rs index a510fca..73dacfc 100644 --- a/aquatic_udp/src/lib/mio/network.rs +++ b/aquatic_udp/src/lib/mio/network.rs @@ -192,9 +192,12 @@ fn read_requests( } Ok(Request::Scrape(request)) => { if connections.contains(request.connection_id, src) { - if let Err(err) = - request_sender.try_send((ConnectedRequest::Scrape(request), src)) - { + let request = ConnectedRequest::Scrape { + request, + original_indices: Vec::new(), + }; + + if let Err(err) = request_sender.try_send((request, src)) { ::log::warn!("request_sender.try_send failed: {:?}", err) } } diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index d5eee1c..f718753 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -42,15 +42,20 @@ pub fn bench_scrape_handler( for round in (0..bench_config.num_rounds).progress_with(pb) { for request_chunk in requests.chunks(p) { for (request, src) in request_chunk { - request_sender - .send((ConnectedRequest::Scrape(request.clone()), *src)) - .unwrap(); + let request = ConnectedRequest::Scrape { + request: request.clone(), + original_indices: Vec::new(), + }; + + request_sender.send((request, *src)).unwrap(); } - while let Ok((ConnectedResponse::Scrape(r), _)) = response_receiver.try_recv() { + while let Ok((ConnectedResponse::Scrape { response, .. 
}, _)) = + response_receiver.try_recv() + { num_responses += 1; - if let Some(stat) = r.torrent_stats.last() { + if let Some(stat) = response.torrent_stats.last() { dummy ^= stat.leechers.0; } } @@ -59,10 +64,10 @@ pub fn bench_scrape_handler( let total = bench_config.num_scrape_requests * (round + 1); while num_responses < total { - if let Ok((ConnectedResponse::Scrape(r), _)) = response_receiver.recv() { + if let Ok((ConnectedResponse::Scrape { response, .. }, _)) = response_receiver.recv() { num_responses += 1; - if let Some(stat) = r.torrent_stats.last() { + if let Some(stat) = response.torrent_stats.last() { dummy ^= stat.leechers.0; } }