diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index 0b56c17..df26a70 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -31,15 +31,13 @@ impl Ip for Ipv6Addr { #[derive(Debug)] pub struct PendingScrapeRequest { - pub connection_id: ConnectionId, - pub transaction_id: TransactionId, + pub slab_key: usize, pub info_hashes: BTreeMap, } #[derive(Debug)] pub struct PendingScrapeResponse { - pub connection_id: ConnectionId, - pub transaction_id: TransactionId, + pub slab_key: usize, pub torrent_stats: BTreeMap, } diff --git a/aquatic_udp/src/workers/request.rs b/aquatic_udp/src/workers/request.rs index d3158c8..92f4319 100644 --- a/aquatic_udp/src/workers/request.rs +++ b/aquatic_udp/src/workers/request.rs @@ -391,8 +391,7 @@ fn handle_scrape_request( } PendingScrapeResponse { - connection_id: request.connection_id, - transaction_id: request.transaction_id, + slab_key: request.slab_key, torrent_stats, } } diff --git a/aquatic_udp/src/workers/socket.rs b/aquatic_udp/src/workers/socket.rs index 2ba7a23..730c44f 100644 --- a/aquatic_udp/src/workers/socket.rs +++ b/aquatic_udp/src/workers/socket.rs @@ -12,6 +12,7 @@ use crossbeam_channel::Receiver; use mio::net::UdpSocket; use mio::{Events, Interest, Poll, Token}; use rand::prelude::{Rng, SeedableRng, StdRng}; +use slab::Slab; use aquatic_common::access_list::create_access_list_cache; use aquatic_common::access_list::AccessListCache; @@ -49,23 +50,23 @@ impl ConnectionMap { } #[derive(Debug)] -pub struct PendingScrapeResponseMapEntry { +pub struct PendingScrapeResponseSlabEntry { num_pending: usize, valid_until: ValidUntil, torrent_stats: BTreeMap, + transaction_id: TransactionId, } #[derive(Default)] -pub struct PendingScrapeResponseMap( - AHashIndexMap<(ConnectionId, TransactionId, SocketAddr), PendingScrapeResponseMapEntry>, +pub struct PendingScrapeResponseSlab( + Slab, ); -impl PendingScrapeResponseMap { +impl PendingScrapeResponseSlab { pub fn prepare_split_requests( &mut self, config: &Config, request: ScrapeRequest, - addr: SocketAddr, valid_until: ValidUntil, ) -> impl IntoIterator { let mut split_requests: AHashIndexMap = @@ -73,54 +74,38 @@ impl PendingScrapeResponseMap { if request.info_hashes.is_empty() { ::log::warn!( - "Attempted to prepare PendingScrapeResponseMap entry with zero info hashes" + "Attempted to prepare PendingScrapeResponseSlab entry with zero info hashes" ); return split_requests; } - let connection_id = request.connection_id; - let transaction_id = request.transaction_id; + let vacant_entry = self.0.vacant_entry(); + let slab_key = vacant_entry.key(); for (i, info_hash) in request.info_hashes.into_iter().enumerate() { let split_request = split_requests .entry(RequestWorkerIndex::from_info_hash(&config, info_hash)) .or_insert_with(|| PendingScrapeRequest { - connection_id, - transaction_id, + slab_key, info_hashes: BTreeMap::new(), }); split_request.info_hashes.insert(i, info_hash); } - let key = (connection_id, transaction_id, addr); - - let entry = PendingScrapeResponseMapEntry { + vacant_entry.insert(PendingScrapeResponseSlabEntry { num_pending: split_requests.len(), valid_until, torrent_stats: Default::default(), - }; - - if let Some(previous_entry) = self.0.insert(key, entry) { - ::log::warn!( - "PendingScrapeResponseMap.prepare replaced previous entry {:?} for key {:?}", - previous_entry, - key - ); - } + transaction_id: request.transaction_id, + }); split_requests } - pub fn add_and_get_finished( - &mut self, - response: PendingScrapeResponse, - addr: SocketAddr, - ) -> Option { - let key = (response.connection_id, response.transaction_id, addr); - - let finished = if let Some(entry) = self.0.get_mut(&key) { + pub fn add_and_get_finished(&mut self, response: PendingScrapeResponse) -> Option { + let finished = if let Some(entry) = self.0.get_mut(response.slab_key) { entry.num_pending -= 1; entry @@ -130,18 +115,18 @@ impl PendingScrapeResponseMap { entry.num_pending == 0 } else { ::log::warn!( - "PendingScrapeResponseMap.add didn't find entry for key {:?}", - key + "PendingScrapeResponseSlab.add didn't find entry for key {:?}", + response.slab_key ); false }; if finished { - let entry = self.0.remove(&key).unwrap(); + let entry = self.0.remove(response.slab_key); Some(Response::Scrape(ScrapeResponse { - transaction_id: response.transaction_id, + transaction_id: entry.transaction_id, torrent_stats: entry.torrent_stats.into_values().collect(), })) } else { @@ -157,7 +142,7 @@ impl PendingScrapeResponseMap { if !keep { ::log::warn!( - "Removing PendingScrapeResponseMap entry while cleaning. {:?}: {:?}", + "Removing PendingScrapeResponseSlab entry while cleaning. {:?}: {:?}", k, v ); @@ -193,7 +178,7 @@ pub fn run_socket_worker( let mut events = Events::with_capacity(config.network.poll_event_capacity); let mut connections = ConnectionMap::default(); - let mut pending_scrape_responses = PendingScrapeResponseMap::default(); + let mut pending_scrape_responses = PendingScrapeResponseSlab::default(); let mut access_list_cache = create_access_list_cache(&state.access_list); let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new(); @@ -278,7 +263,7 @@ fn read_requests( config: &Config, state: &State, connections: &mut ConnectionMap, - pending_scrape_responses: &mut PendingScrapeResponseMap, + pending_scrape_responses: &mut PendingScrapeResponseSlab, access_list_cache: &mut AccessListCache, rng: &mut StdRng, socket: &mut UdpSocket, @@ -375,7 +360,7 @@ fn read_requests( pub fn handle_request( config: &Config, connections: &mut ConnectionMap, - pending_scrape_responses: &mut PendingScrapeResponseMap, + pending_scrape_responses: &mut PendingScrapeResponseSlab, access_list_cache: &mut AccessListCache, rng: &mut StdRng, request_sender: &ConnectedRequestSender, @@ -429,7 +414,6 @@ pub fn handle_request( let split_requests = pending_scrape_responses.prepare_split_requests( config, request, - src, pending_scrape_valid_until, ); @@ -471,7 +455,7 @@ fn send_responses( socket: &mut UdpSocket, buffer: &mut [u8], response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>, - pending_scrape_responses: &mut PendingScrapeResponseMap, + pending_scrape_responses: &mut PendingScrapeResponseSlab, local_responses: Drain<(Response, SocketAddr)>, ) { let mut responses_sent_ipv4: usize = 0; @@ -495,7 +479,7 @@ fn send_responses( for (response, addr) in response_receiver.try_iter() { let opt_response = match response { - ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r, addr), + ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r), ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)), ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)), }; @@ -638,7 +622,7 @@ mod tests { #[quickcheck] fn test_pending_scrape_response_map( - request_data: Vec<(i32, i64, SocketAddr, u8)>, + request_data: Vec<(i32, i64, u8)>, request_workers: u8, ) -> TestResult { if request_workers == 0 { @@ -651,11 +635,11 @@ mod tests { let valid_until = ValidUntil::new(1); - let mut map = PendingScrapeResponseMap::default(); + let mut map = PendingScrapeResponseSlab::default(); let mut requests = Vec::new(); - for (t, c, a, b) in request_data { + for (t, c, b) in request_data { if b == 0 { return TestResult::discard(); } @@ -674,32 +658,30 @@ mod tests { info_hashes, }; - requests.push((request, a)); + requests.push(request); } let mut all_split_requests = Vec::new(); - for (request, addr) in requests.iter() { + for request in requests.iter() { let split_requests = map.prepare_split_requests( &config, request.to_owned(), - addr.to_owned(), valid_until, ); - all_split_requests.push(( - addr, + all_split_requests.push( split_requests .into_iter() .collect::>(), - )); + ); } assert_eq!(map.0.len(), requests.len()); let mut responses = Vec::new(); - for (addr, split_requests) in all_split_requests { + for split_requests in all_split_requests { for (worker_index, split_request) in split_requests { assert!(worker_index.0 < request_workers as usize); @@ -719,12 +701,11 @@ mod tests { .collect(); let response = PendingScrapeResponse { - transaction_id: split_request.transaction_id, - connection_id: split_request.connection_id, + slab_key: split_request.slab_key, torrent_stats, }; - if let Some(response) = map.add_and_get_finished(response, addr.to_owned()) { + if let Some(response) = map.add_and_get_finished(response) { responses.push(response); } } diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index 09fe3e5..1062ade 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -41,8 +41,7 @@ pub fn bench_scrape_handler( for request_chunk in requests.chunks(p) { for (request, src) in request_chunk { let request = ConnectedRequest::Scrape(PendingScrapeRequest { - connection_id: request.connection_id, - transaction_id: request.transaction_id, + slab_key: 0, info_hashes: request .info_hashes .clone()