From 700dd68d2c38dcef21f63ea4ae1bcd4ced4abc12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 6 Jan 2022 11:48:16 +0100 Subject: [PATCH] udp scrape improvements (#43) * udp_protocol: forbid full scrapes * udp: improve PendingScrapeResponseMap logging * udp: PendingScrapeResponseMap: store less data, improve logging * udp: PendingScrapeResponseMap: log if replacing entry on insert * udp: PendingScrapeResponseMap: use remote addr in key * Run cargo fmt * README: update copyright end year * udp: move scrape request splitting logic into PendingScrapeResponseMap * udp: add quickcheck test test_pending_scrape_response_map * udp protocol: fix failing test_scrape_request_convert_identity --- README.md | 2 +- aquatic_udp/src/workers/socket.rs | 243 ++++++++++++++++++++++------ aquatic_udp_protocol/src/request.rs | 31 ++-- 3 files changed, 217 insertions(+), 59 deletions(-) diff --git a/README.md b/README.md index c16974f..c297948 100644 --- a/README.md +++ b/README.md @@ -174,7 +174,7 @@ To fairly compare HTTP performance to opentracker, set keepalive to false in ## Copyright and license -Copyright (c) 2020-2021 Joakim Frostegård +Copyright (c) 2020-2022 Joakim Frostegård Distributed under Apache 2.0 license (details in `LICENSE` file.) 
diff --git a/aquatic_udp/src/workers/socket.rs b/aquatic_udp/src/workers/socket.rs index 10ef04c..2ba7a23 100644 --- a/aquatic_udp/src/workers/socket.rs +++ b/aquatic_udp/src/workers/socket.rs @@ -48,58 +48,101 @@ impl ConnectionMap { } } -pub struct PendingScrapeResponseMeta { +#[derive(Debug)] +pub struct PendingScrapeResponseMapEntry { num_pending: usize, valid_until: ValidUntil, + torrent_stats: BTreeMap, } #[derive(Default)] pub struct PendingScrapeResponseMap( - AHashIndexMap<(ConnectionId, TransactionId), (PendingScrapeResponseMeta, PendingScrapeResponse)>, + AHashIndexMap<(ConnectionId, TransactionId, SocketAddr), PendingScrapeResponseMapEntry>, ); impl PendingScrapeResponseMap { - pub fn prepare( + pub fn prepare_split_requests( &mut self, - connection_id: ConnectionId, - transaction_id: TransactionId, - num_pending: usize, + config: &Config, + request: ScrapeRequest, + addr: SocketAddr, valid_until: ValidUntil, - ) { - let meta = PendingScrapeResponseMeta { - num_pending, + ) -> impl IntoIterator { + let mut split_requests: AHashIndexMap = + Default::default(); + + if request.info_hashes.is_empty() { + ::log::warn!( + "Attempted to prepare PendingScrapeResponseMap entry with zero info hashes" + ); + + return split_requests; + } + + let connection_id = request.connection_id; + let transaction_id = request.transaction_id; + + for (i, info_hash) in request.info_hashes.into_iter().enumerate() { + let split_request = split_requests + .entry(RequestWorkerIndex::from_info_hash(&config, info_hash)) + .or_insert_with(|| PendingScrapeRequest { + connection_id, + transaction_id, + info_hashes: BTreeMap::new(), + }); + + split_request.info_hashes.insert(i, info_hash); + } + + let key = (connection_id, transaction_id, addr); + + let entry = PendingScrapeResponseMapEntry { + num_pending: split_requests.len(), valid_until, - }; - let response = PendingScrapeResponse { - connection_id, - transaction_id, - torrent_stats: BTreeMap::new(), + torrent_stats: 
Default::default(), }; - self.0.insert((connection_id, transaction_id), (meta, response)); + if let Some(previous_entry) = self.0.insert(key, entry) { + ::log::warn!( + "PendingScrapeResponseMap.prepare replaced previous entry {:?} for key {:?}", + previous_entry, + key + ); + } + + split_requests } - pub fn add_and_get_finished(&mut self, response: PendingScrapeResponse) -> Option { - let key = (response.connection_id, response.transaction_id); + pub fn add_and_get_finished( + &mut self, + response: PendingScrapeResponse, + addr: SocketAddr, + ) -> Option { + let key = (response.connection_id, response.transaction_id, addr); - let finished = if let Some(r) = self.0.get_mut(&key) { - r.0.num_pending -= 1; + let finished = if let Some(entry) = self.0.get_mut(&key) { + entry.num_pending -= 1; - r.1.torrent_stats.extend(response.torrent_stats.into_iter()); + entry + .torrent_stats + .extend(response.torrent_stats.into_iter()); - r.0.num_pending == 0 + entry.num_pending == 0 } else { - ::log::warn!("PendingScrapeResponses.add didn't find PendingScrapeResponse in map"); + ::log::warn!( + "PendingScrapeResponseMap.add didn't find entry for key {:?}", + key + ); false }; if finished { - let response = self.0.remove(&key).unwrap().1; + let entry = self.0.remove(&key).unwrap(); Some(Response::Scrape(ScrapeResponse { transaction_id: response.transaction_id, - torrent_stats: response.torrent_stats.into_values().collect(), + torrent_stats: entry.torrent_stats.into_values().collect(), })) } else { None @@ -109,7 +152,19 @@ impl PendingScrapeResponseMap { pub fn clean(&mut self) { let now = Instant::now(); - self.0.retain(|_, v| v.0.valid_until.0 > now); + self.0.retain(|k, v| { + let keep = v.valid_until.0 > now; + + if !keep { + ::log::warn!( + "Removing PendingScrapeResponseMap entry while cleaning. 
{:?}: {:?}", + k, + v + ); + } + + keep + }); self.0.shrink_to_fit(); } } @@ -371,32 +426,14 @@ pub fn handle_request( } Ok(Request::Scrape(request)) => { if connections.contains(request.connection_id, src) { - let mut requests: AHashIndexMap = - Default::default(); - - let connection_id = request.connection_id; - let transaction_id = request.transaction_id; - - for (i, info_hash) in request.info_hashes.into_iter().enumerate() { - let pending = requests - .entry(RequestWorkerIndex::from_info_hash(&config, info_hash)) - .or_insert_with(|| PendingScrapeRequest { - connection_id, - transaction_id, - info_hashes: BTreeMap::new(), - }); - - pending.info_hashes.insert(i, info_hash); - } - - pending_scrape_responses.prepare( - connection_id, - transaction_id, - requests.len(), + let split_requests = pending_scrape_responses.prepare_split_requests( + config, + request, + src, pending_scrape_valid_until, ); - for (request_worker_index, request) in requests { + for (request_worker_index, request) in split_requests { request_sender.try_send_to( request_worker_index, ConnectedRequest::Scrape(request), @@ -458,7 +495,7 @@ fn send_responses( for (response, addr) in response_receiver.try_iter() { let opt_response = match response { - ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r), + ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r, addr), ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)), ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)), }; @@ -591,3 +628,111 @@ pub fn create_socket(config: &Config) -> ::std::net::UdpSocket { socket.into() } + +#[cfg(test)] +mod tests { + use quickcheck::TestResult; + use quickcheck_macros::quickcheck; + + use super::*; + + #[quickcheck] + fn test_pending_scrape_response_map( + request_data: Vec<(i32, i64, SocketAddr, u8)>, + request_workers: u8, + ) -> TestResult { + if request_workers == 0 { + return TestResult::discard(); + } + + let mut 
config = Config::default(); + + config.request_workers = request_workers as usize; + + let valid_until = ValidUntil::new(1); + + let mut map = PendingScrapeResponseMap::default(); + + let mut requests = Vec::new(); + + for (t, c, a, b) in request_data { + if b == 0 { + return TestResult::discard(); + } + + let mut info_hashes = Vec::new(); + + for i in 0..b { + let info_hash = InfoHash([i; 20]); + + info_hashes.push(info_hash); + } + + let request = ScrapeRequest { + transaction_id: TransactionId(t), + connection_id: ConnectionId(c), + info_hashes, + }; + + requests.push((request, a)); + } + + let mut all_split_requests = Vec::new(); + + for (request, addr) in requests.iter() { + let split_requests = map.prepare_split_requests( + &config, + request.to_owned(), + addr.to_owned(), + valid_until, + ); + + all_split_requests.push(( + addr, + split_requests + .into_iter() + .collect::>(), + )); + } + + assert_eq!(map.0.len(), requests.len()); + + let mut responses = Vec::new(); + + for (addr, split_requests) in all_split_requests { + for (worker_index, split_request) in split_requests { + assert!(worker_index.0 < request_workers as usize); + + let torrent_stats = split_request + .info_hashes + .into_iter() + .map(|(i, info_hash)| { + ( + i, + TorrentScrapeStatistics { + seeders: NumberOfPeers((info_hash.0[0]) as i32), + leechers: NumberOfPeers(0), + completed: NumberOfDownloads(0), + }, + ) + }) + .collect(); + + let response = PendingScrapeResponse { + transaction_id: split_request.transaction_id, + connection_id: split_request.connection_id, + torrent_stats, + }; + + if let Some(response) = map.add_and_get_finished(response, addr.to_owned()) { + responses.push(response); + } + } + } + + assert!(map.0.is_empty()); + assert_eq!(responses.len(), requests.len()); + + TestResult::from_bool(true) + } +} diff --git a/aquatic_udp_protocol/src/request.rs b/aquatic_udp_protocol/src/request.rs index 8d74196..16be9c8 100644 --- a/aquatic_udp_protocol/src/request.rs +++ 
b/aquatic_udp_protocol/src/request.rs @@ -271,18 +271,26 @@ impl Request { let position = cursor.position() as usize; let inner = cursor.into_inner(); - let info_hashes = (&inner[position..]) + let info_hashes: Vec = (&inner[position..]) .chunks_exact(20) .take(max_scrape_torrents as usize) .map(|chunk| InfoHash(chunk.try_into().unwrap())) .collect(); - Ok((ScrapeRequest { - connection_id: ConnectionId(connection_id), - transaction_id: TransactionId(transaction_id), - info_hashes, - }) - .into()) + if info_hashes.is_empty() { + Err(RequestParseError::sendable_text( + "Full scrapes are not allowed", + connection_id, + transaction_id, + )) + } else { + Ok((ScrapeRequest { + connection_id: ConnectionId(connection_id), + transaction_id: TransactionId(transaction_id), + info_hashes, + }) + .into()) + } } _ => Err(RequestParseError::sendable_text( @@ -296,6 +304,7 @@ impl Request { #[cfg(test)] mod tests { + use quickcheck::TestResult; use quickcheck_macros::quickcheck; use super::*; @@ -378,7 +387,11 @@ mod tests { } #[quickcheck] - fn test_scrape_request_convert_identity(request: ScrapeRequest) -> bool { - same_after_conversion(request.into()) + fn test_scrape_request_convert_identity(request: ScrapeRequest) -> TestResult { + if request.info_hashes.is_empty() { + return TestResult::discard(); + } + + TestResult::from_bool(same_after_conversion(request.into())) } }