udp: use slab for pending scrape responses to fix potential issue

Peers sometimes send multiple scrape requests with the same transaction
id, which would previously cause warnings due to replacing the
PendingScrapeMapEntry and later not finding it.
This commit is contained in:
Joakim Frostegård 2022-01-13 18:35:41 +01:00
parent 700dd68d2c
commit 00c4e74374
4 changed files with 39 additions and 62 deletions

View file

@ -31,15 +31,13 @@ impl Ip for Ipv6Addr {
#[derive(Debug)]
pub struct PendingScrapeRequest {
pub connection_id: ConnectionId,
pub transaction_id: TransactionId,
pub slab_key: usize,
pub info_hashes: BTreeMap<usize, InfoHash>,
}
#[derive(Debug)]
pub struct PendingScrapeResponse {
pub connection_id: ConnectionId,
pub transaction_id: TransactionId,
pub slab_key: usize,
pub torrent_stats: BTreeMap<usize, TorrentScrapeStatistics>,
}

View file

@ -391,8 +391,7 @@ fn handle_scrape_request(
}
PendingScrapeResponse {
connection_id: request.connection_id,
transaction_id: request.transaction_id,
slab_key: request.slab_key,
torrent_stats,
}
}

View file

@ -12,6 +12,7 @@ use crossbeam_channel::Receiver;
use mio::net::UdpSocket;
use mio::{Events, Interest, Poll, Token};
use rand::prelude::{Rng, SeedableRng, StdRng};
use slab::Slab;
use aquatic_common::access_list::create_access_list_cache;
use aquatic_common::access_list::AccessListCache;
@ -49,23 +50,23 @@ impl ConnectionMap {
}
#[derive(Debug)]
pub struct PendingScrapeResponseMapEntry {
pub struct PendingScrapeResponseSlabEntry {
num_pending: usize,
valid_until: ValidUntil,
torrent_stats: BTreeMap<usize, TorrentScrapeStatistics>,
transaction_id: TransactionId,
}
#[derive(Default)]
pub struct PendingScrapeResponseMap(
AHashIndexMap<(ConnectionId, TransactionId, SocketAddr), PendingScrapeResponseMapEntry>,
pub struct PendingScrapeResponseSlab(
Slab<PendingScrapeResponseSlabEntry>,
);
impl PendingScrapeResponseMap {
impl PendingScrapeResponseSlab {
pub fn prepare_split_requests(
&mut self,
config: &Config,
request: ScrapeRequest,
addr: SocketAddr,
valid_until: ValidUntil,
) -> impl IntoIterator<Item = (RequestWorkerIndex, PendingScrapeRequest)> {
let mut split_requests: AHashIndexMap<RequestWorkerIndex, PendingScrapeRequest> =
@ -73,54 +74,38 @@ impl PendingScrapeResponseMap {
if request.info_hashes.is_empty() {
::log::warn!(
"Attempted to prepare PendingScrapeResponseMap entry with zero info hashes"
"Attempted to prepare PendingScrapeResponseSlab entry with zero info hashes"
);
return split_requests;
}
let connection_id = request.connection_id;
let transaction_id = request.transaction_id;
let vacant_entry = self.0.vacant_entry();
let slab_key = vacant_entry.key();
for (i, info_hash) in request.info_hashes.into_iter().enumerate() {
let split_request = split_requests
.entry(RequestWorkerIndex::from_info_hash(&config, info_hash))
.or_insert_with(|| PendingScrapeRequest {
connection_id,
transaction_id,
slab_key,
info_hashes: BTreeMap::new(),
});
split_request.info_hashes.insert(i, info_hash);
}
let key = (connection_id, transaction_id, addr);
let entry = PendingScrapeResponseMapEntry {
vacant_entry.insert(PendingScrapeResponseSlabEntry {
num_pending: split_requests.len(),
valid_until,
torrent_stats: Default::default(),
};
if let Some(previous_entry) = self.0.insert(key, entry) {
::log::warn!(
"PendingScrapeResponseMap.prepare replaced previous entry {:?} for key {:?}",
previous_entry,
key
);
}
transaction_id: request.transaction_id,
});
split_requests
}
pub fn add_and_get_finished(
&mut self,
response: PendingScrapeResponse,
addr: SocketAddr,
) -> Option<Response> {
let key = (response.connection_id, response.transaction_id, addr);
let finished = if let Some(entry) = self.0.get_mut(&key) {
pub fn add_and_get_finished(&mut self, response: PendingScrapeResponse) -> Option<Response> {
let finished = if let Some(entry) = self.0.get_mut(response.slab_key) {
entry.num_pending -= 1;
entry
@ -130,18 +115,18 @@ impl PendingScrapeResponseMap {
entry.num_pending == 0
} else {
::log::warn!(
"PendingScrapeResponseMap.add didn't find entry for key {:?}",
key
"PendingScrapeResponseSlab.add didn't find entry for key {:?}",
response.slab_key
);
false
};
if finished {
let entry = self.0.remove(&key).unwrap();
let entry = self.0.remove(response.slab_key);
Some(Response::Scrape(ScrapeResponse {
transaction_id: response.transaction_id,
transaction_id: entry.transaction_id,
torrent_stats: entry.torrent_stats.into_values().collect(),
}))
} else {
@ -157,7 +142,7 @@ impl PendingScrapeResponseMap {
if !keep {
::log::warn!(
"Removing PendingScrapeResponseMap entry while cleaning. {:?}: {:?}",
"Removing PendingScrapeResponseSlab entry while cleaning. {:?}: {:?}",
k,
v
);
@ -193,7 +178,7 @@ pub fn run_socket_worker(
let mut events = Events::with_capacity(config.network.poll_event_capacity);
let mut connections = ConnectionMap::default();
let mut pending_scrape_responses = PendingScrapeResponseMap::default();
let mut pending_scrape_responses = PendingScrapeResponseSlab::default();
let mut access_list_cache = create_access_list_cache(&state.access_list);
let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new();
@ -278,7 +263,7 @@ fn read_requests(
config: &Config,
state: &State,
connections: &mut ConnectionMap,
pending_scrape_responses: &mut PendingScrapeResponseMap,
pending_scrape_responses: &mut PendingScrapeResponseSlab,
access_list_cache: &mut AccessListCache,
rng: &mut StdRng,
socket: &mut UdpSocket,
@ -375,7 +360,7 @@ fn read_requests(
pub fn handle_request(
config: &Config,
connections: &mut ConnectionMap,
pending_scrape_responses: &mut PendingScrapeResponseMap,
pending_scrape_responses: &mut PendingScrapeResponseSlab,
access_list_cache: &mut AccessListCache,
rng: &mut StdRng,
request_sender: &ConnectedRequestSender,
@ -429,7 +414,6 @@ pub fn handle_request(
let split_requests = pending_scrape_responses.prepare_split_requests(
config,
request,
src,
pending_scrape_valid_until,
);
@ -471,7 +455,7 @@ fn send_responses(
socket: &mut UdpSocket,
buffer: &mut [u8],
response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>,
pending_scrape_responses: &mut PendingScrapeResponseMap,
pending_scrape_responses: &mut PendingScrapeResponseSlab,
local_responses: Drain<(Response, SocketAddr)>,
) {
let mut responses_sent_ipv4: usize = 0;
@ -495,7 +479,7 @@ fn send_responses(
for (response, addr) in response_receiver.try_iter() {
let opt_response = match response {
ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r, addr),
ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r),
ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)),
ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)),
};
@ -638,7 +622,7 @@ mod tests {
#[quickcheck]
fn test_pending_scrape_response_map(
request_data: Vec<(i32, i64, SocketAddr, u8)>,
request_data: Vec<(i32, i64, u8)>,
request_workers: u8,
) -> TestResult {
if request_workers == 0 {
@ -651,11 +635,11 @@ mod tests {
let valid_until = ValidUntil::new(1);
let mut map = PendingScrapeResponseMap::default();
let mut map = PendingScrapeResponseSlab::default();
let mut requests = Vec::new();
for (t, c, a, b) in request_data {
for (t, c, b) in request_data {
if b == 0 {
return TestResult::discard();
}
@ -674,32 +658,30 @@ mod tests {
info_hashes,
};
requests.push((request, a));
requests.push(request);
}
let mut all_split_requests = Vec::new();
for (request, addr) in requests.iter() {
for request in requests.iter() {
let split_requests = map.prepare_split_requests(
&config,
request.to_owned(),
addr.to_owned(),
valid_until,
);
all_split_requests.push((
addr,
all_split_requests.push(
split_requests
.into_iter()
.collect::<Vec<(RequestWorkerIndex, PendingScrapeRequest)>>(),
));
);
}
assert_eq!(map.0.len(), requests.len());
let mut responses = Vec::new();
for (addr, split_requests) in all_split_requests {
for split_requests in all_split_requests {
for (worker_index, split_request) in split_requests {
assert!(worker_index.0 < request_workers as usize);
@ -719,12 +701,11 @@ mod tests {
.collect();
let response = PendingScrapeResponse {
transaction_id: split_request.transaction_id,
connection_id: split_request.connection_id,
slab_key: split_request.slab_key,
torrent_stats,
};
if let Some(response) = map.add_and_get_finished(response, addr.to_owned()) {
if let Some(response) = map.add_and_get_finished(response) {
responses.push(response);
}
}