From 00c4e74374a929b9575b0b6f96745b151fa792a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 13 Jan 2022 18:35:41 +0100 Subject: [PATCH] udp: use slab for pending scrape responses to fix potential issue Peers sometimes send multiple scrape requests with the same transaction id, which would previously cause warnings due to replacing the PendingScrapeMapEntry and later not finding it. --- aquatic_udp/src/common.rs | 6 +- aquatic_udp/src/workers/request.rs | 3 +- aquatic_udp/src/workers/socket.rs | 89 ++++++++++++------------------ aquatic_udp_bench/src/scrape.rs | 3 +- 4 files changed, 39 insertions(+), 62 deletions(-) diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index 0b56c17..df26a70 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -31,15 +31,13 @@ impl Ip for Ipv6Addr { #[derive(Debug)] pub struct PendingScrapeRequest { - pub connection_id: ConnectionId, - pub transaction_id: TransactionId, + pub slab_key: usize, pub info_hashes: BTreeMap, } #[derive(Debug)] pub struct PendingScrapeResponse { - pub connection_id: ConnectionId, - pub transaction_id: TransactionId, + pub slab_key: usize, pub torrent_stats: BTreeMap, } diff --git a/aquatic_udp/src/workers/request.rs b/aquatic_udp/src/workers/request.rs index d3158c8..92f4319 100644 --- a/aquatic_udp/src/workers/request.rs +++ b/aquatic_udp/src/workers/request.rs @@ -391,8 +391,7 @@ fn handle_scrape_request( } PendingScrapeResponse { - connection_id: request.connection_id, - transaction_id: request.transaction_id, + slab_key: request.slab_key, torrent_stats, } } diff --git a/aquatic_udp/src/workers/socket.rs b/aquatic_udp/src/workers/socket.rs index 2ba7a23..730c44f 100644 --- a/aquatic_udp/src/workers/socket.rs +++ b/aquatic_udp/src/workers/socket.rs @@ -12,6 +12,7 @@ use crossbeam_channel::Receiver; use mio::net::UdpSocket; use mio::{Events, Interest, Poll, Token}; use rand::prelude::{Rng, SeedableRng, StdRng}; +use slab::Slab; use aquatic_common::access_list::create_access_list_cache; use aquatic_common::access_list::AccessListCache; @@ -49,23 +50,23 @@ impl ConnectionMap { } #[derive(Debug)] -pub struct PendingScrapeResponseMapEntry { +pub struct PendingScrapeResponseSlabEntry { num_pending: usize, valid_until: ValidUntil, torrent_stats: BTreeMap, + transaction_id: TransactionId, } #[derive(Default)] -pub struct PendingScrapeResponseMap( - AHashIndexMap<(ConnectionId, TransactionId, SocketAddr), PendingScrapeResponseMapEntry>, +pub struct PendingScrapeResponseSlab( + Slab, ); -impl PendingScrapeResponseMap { +impl PendingScrapeResponseSlab { pub fn prepare_split_requests( &mut self, config: &Config, request: ScrapeRequest, - addr: SocketAddr, valid_until: ValidUntil, ) -> impl IntoIterator { let mut split_requests: AHashIndexMap = @@ -73,54 +74,38 @@ impl PendingScrapeResponseMap { if request.info_hashes.is_empty() { ::log::warn!( - "Attempted to prepare PendingScrapeResponseMap entry with zero info hashes" + "Attempted to prepare PendingScrapeResponseSlab entry with zero info hashes" ); return split_requests; } - let connection_id = request.connection_id; - let transaction_id = request.transaction_id; + let vacant_entry = self.0.vacant_entry(); + let slab_key = vacant_entry.key(); for (i, info_hash) in request.info_hashes.into_iter().enumerate() { let split_request = split_requests .entry(RequestWorkerIndex::from_info_hash(&config, info_hash)) .or_insert_with(|| PendingScrapeRequest { - connection_id, - transaction_id, + slab_key, info_hashes: BTreeMap::new(), }); split_request.info_hashes.insert(i, info_hash); } - let key = (connection_id, transaction_id, addr); - - let entry = PendingScrapeResponseMapEntry { + vacant_entry.insert(PendingScrapeResponseSlabEntry { num_pending: split_requests.len(), valid_until, torrent_stats: Default::default(), - }; - - if let Some(previous_entry) = self.0.insert(key, entry) { - ::log::warn!( - "PendingScrapeResponseMap.prepare replaced previous entry {:?} for key {:?}", - previous_entry, - key - ); - } + transaction_id: request.transaction_id, + }); split_requests } - pub fn add_and_get_finished( - &mut self, - response: PendingScrapeResponse, - addr: SocketAddr, - ) -> Option { - let key = (response.connection_id, response.transaction_id, addr); - - let finished = if let Some(entry) = self.0.get_mut(&key) { + pub fn add_and_get_finished(&mut self, response: PendingScrapeResponse) -> Option { + let finished = if let Some(entry) = self.0.get_mut(response.slab_key) { entry.num_pending -= 1; entry @@ -130,18 +115,18 @@ impl PendingScrapeResponseMap { entry.num_pending == 0 } else { ::log::warn!( - "PendingScrapeResponseMap.add didn't find entry for key {:?}", - key + "PendingScrapeResponseSlab.add didn't find entry for key {:?}", + response.slab_key ); false }; if finished { - let entry = self.0.remove(&key).unwrap(); + let entry = self.0.remove(response.slab_key); Some(Response::Scrape(ScrapeResponse { - transaction_id: response.transaction_id, + transaction_id: entry.transaction_id, torrent_stats: entry.torrent_stats.into_values().collect(), })) } else { @@ -157,7 +142,7 @@ impl PendingScrapeResponseMap { if !keep { ::log::warn!( - "Removing PendingScrapeResponseMap entry while cleaning. {:?}: {:?}", + "Removing PendingScrapeResponseSlab entry while cleaning. {:?}: {:?}", k, v ); @@ -193,7 +178,7 @@ pub fn run_socket_worker( let mut events = Events::with_capacity(config.network.poll_event_capacity); let mut connections = ConnectionMap::default(); - let mut pending_scrape_responses = PendingScrapeResponseMap::default(); + let mut pending_scrape_responses = PendingScrapeResponseSlab::default(); let mut access_list_cache = create_access_list_cache(&state.access_list); let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new(); @@ -278,7 +263,7 @@ fn read_requests( config: &Config, state: &State, connections: &mut ConnectionMap, - pending_scrape_responses: &mut PendingScrapeResponseMap, + pending_scrape_responses: &mut PendingScrapeResponseSlab, access_list_cache: &mut AccessListCache, rng: &mut StdRng, socket: &mut UdpSocket, @@ -375,7 +360,7 @@ fn read_requests( pub fn handle_request( config: &Config, connections: &mut ConnectionMap, - pending_scrape_responses: &mut PendingScrapeResponseMap, + pending_scrape_responses: &mut PendingScrapeResponseSlab, access_list_cache: &mut AccessListCache, rng: &mut StdRng, request_sender: &ConnectedRequestSender, @@ -429,7 +414,6 @@ pub fn handle_request( let split_requests = pending_scrape_responses.prepare_split_requests( config, request, - src, pending_scrape_valid_until, ); @@ -471,7 +455,7 @@ fn send_responses( socket: &mut UdpSocket, buffer: &mut [u8], response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>, - pending_scrape_responses: &mut PendingScrapeResponseMap, + pending_scrape_responses: &mut PendingScrapeResponseSlab, local_responses: Drain<(Response, SocketAddr)>, ) { let mut responses_sent_ipv4: usize = 0; @@ -495,7 +479,7 @@ fn send_responses( for (response, addr) in response_receiver.try_iter() { let opt_response = match response { - ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r, addr), + ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r), ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)), ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)), }; @@ -638,7 +622,7 @@ mod tests { #[quickcheck] fn test_pending_scrape_response_map( - request_data: Vec<(i32, i64, SocketAddr, u8)>, + request_data: Vec<(i32, i64, u8)>, request_workers: u8, ) -> TestResult { if request_workers == 0 { @@ -651,11 +635,11 @@ mod tests { let valid_until = ValidUntil::new(1); - let mut map = PendingScrapeResponseMap::default(); + let mut map = PendingScrapeResponseSlab::default(); let mut requests = Vec::new(); - for (t, c, a, b) in request_data { + for (t, c, b) in request_data { if b == 0 { return TestResult::discard(); } @@ -674,32 +658,30 @@ mod tests { info_hashes, }; - requests.push((request, a)); + requests.push(request); } let mut all_split_requests = Vec::new(); - for (request, addr) in requests.iter() { + for request in requests.iter() { let split_requests = map.prepare_split_requests( &config, request.to_owned(), - addr.to_owned(), valid_until, ); - all_split_requests.push(( - addr, + all_split_requests.push( split_requests .into_iter() .collect::>(), - )); + ); } assert_eq!(map.0.len(), requests.len()); let mut responses = Vec::new(); - for (addr, split_requests) in all_split_requests { + for split_requests in all_split_requests { for (worker_index, split_request) in split_requests { assert!(worker_index.0 < request_workers as usize); @@ -719,12 +701,11 @@ mod tests { .collect(); let response = PendingScrapeResponse { - transaction_id: split_request.transaction_id, - connection_id: split_request.connection_id, + slab_key: split_request.slab_key, torrent_stats, }; - if let Some(response) = map.add_and_get_finished(response, addr.to_owned()) { + if let Some(response) = map.add_and_get_finished(response) { responses.push(response); } } diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index 09fe3e5..1062ade 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -41,8 +41,7 @@ pub fn bench_scrape_handler( for request_chunk in requests.chunks(p) { for (request, src) in request_chunk { let request = ConnectedRequest::Scrape(PendingScrapeRequest { - connection_id: request.connection_id, - transaction_id: request.transaction_id, + slab_key: 0, info_hashes: request .info_hashes .clone()