diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index 941dab0..0b56c17 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -31,12 +31,14 @@ impl Ip for Ipv6Addr { #[derive(Debug)] pub struct PendingScrapeRequest { + pub connection_id: ConnectionId, pub transaction_id: TransactionId, pub info_hashes: BTreeMap, } #[derive(Debug)] pub struct PendingScrapeResponse { + pub connection_id: ConnectionId, pub transaction_id: TransactionId, pub torrent_stats: BTreeMap, } diff --git a/aquatic_udp/src/workers/request.rs b/aquatic_udp/src/workers/request.rs index 44163ab..d3158c8 100644 --- a/aquatic_udp/src/workers/request.rs +++ b/aquatic_udp/src/workers/request.rs @@ -391,6 +391,7 @@ fn handle_scrape_request( } PendingScrapeResponse { + connection_id: request.connection_id, transaction_id: request.transaction_id, torrent_stats, } diff --git a/aquatic_udp/src/workers/socket.rs b/aquatic_udp/src/workers/socket.rs index 0fbc6ea..10ef04c 100644 --- a/aquatic_udp/src/workers/socket.rs +++ b/aquatic_udp/src/workers/socket.rs @@ -55,12 +55,13 @@ pub struct PendingScrapeResponseMeta { #[derive(Default)] pub struct PendingScrapeResponseMap( - AHashIndexMap, + AHashIndexMap<(ConnectionId, TransactionId), (PendingScrapeResponseMeta, PendingScrapeResponse)>, ); impl PendingScrapeResponseMap { pub fn prepare( &mut self, + connection_id: ConnectionId, transaction_id: TransactionId, num_pending: usize, valid_until: ValidUntil, @@ -70,15 +71,18 @@ impl PendingScrapeResponseMap { valid_until, }; let response = PendingScrapeResponse { + connection_id, transaction_id, torrent_stats: BTreeMap::new(), }; - self.0.insert(transaction_id, (meta, response)); + self.0.insert((connection_id, transaction_id), (meta, response)); } pub fn add_and_get_finished(&mut self, response: PendingScrapeResponse) -> Option { - let finished = if let Some(r) = self.0.get_mut(&response.transaction_id) { + let key = (response.connection_id, response.transaction_id); + + let finished = if let Some(r) = self.0.get_mut(&key) { r.0.num_pending -= 1; r.1.torrent_stats.extend(response.torrent_stats.into_iter()); @@ -91,7 +95,7 @@ impl PendingScrapeResponseMap { }; if finished { - let response = self.0.remove(&response.transaction_id).unwrap().1; + let response = self.0.remove(&key).unwrap().1; Some(Response::Scrape(ScrapeResponse { transaction_id: response.transaction_id, @@ -370,12 +374,14 @@ pub fn handle_request( let mut requests: AHashIndexMap = Default::default(); + let connection_id = request.connection_id; let transaction_id = request.transaction_id; for (i, info_hash) in request.info_hashes.into_iter().enumerate() { let pending = requests .entry(RequestWorkerIndex::from_info_hash(&config, info_hash)) .or_insert_with(|| PendingScrapeRequest { + connection_id, transaction_id, info_hashes: BTreeMap::new(), }); @@ -384,6 +390,7 @@ pub fn handle_request( } pending_scrape_responses.prepare( + connection_id, transaction_id, requests.len(), pending_scrape_valid_until, diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index fc058cb..09fe3e5 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -41,6 +41,7 @@ pub fn bench_scrape_handler( for request_chunk in requests.chunks(p) { for (request, src) in request_chunk { let request = ConnectedRequest::Scrape(PendingScrapeRequest { + connection_id: request.connection_id, transaction_id: request.transaction_id, info_hashes: request .info_hashes