udp scrape improvements (#43)

* udp_protocol: forbid full scrapes

* udp: improve PendingScrapeResponseMap logging

* udp: PendingScrapeResponseMap: store less data, improve logging

* udp: PendingScrapeResponseMap: log if replacing entry on insert

* udp: PendingScrapeResponseMap: use remote addr in key

* Run cargo fmt

* README: update copyright end year

* udp: move scrape request splitting logic into PendingScrapeResponseMap

* udp: add quickcheck test test_pending_scrape_response_map

* udp protocol: fix failing test_scrape_request_convert_identity
This commit is contained in:
Joakim Frostegård 2022-01-06 11:48:16 +01:00 committed by GitHub
parent e5a1461613
commit 700dd68d2c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 217 additions and 59 deletions

View file

@ -48,58 +48,101 @@ impl ConnectionMap {
}
}
// NOTE(review): this is a rendered diff. The next line is the pre-change
// struct header; the `#[derive(Debug)]` entry type below it is the post-change
// definition.
pub struct PendingScrapeResponseMeta {
/// Bookkeeping stored in `PendingScrapeResponseMap` for one scrape request
/// that was split across request workers.
#[derive(Debug)]
pub struct PendingScrapeResponseMapEntry {
/// Number of partial responses still outstanding. Set to the number of split
/// requests on insert and decremented by `add_and_get_finished`.
num_pending: usize,
/// Expiry marker; `clean` removes the entry once `valid_until.0` is no longer
/// later than the current instant.
valid_until: ValidUntil,
/// Statistics gathered so far from partial responses. The `usize` key is
/// presumably the info hash's position in the original request (verify
/// against the request worker); the finished response emits the values in
/// key order.
torrent_stats: BTreeMap<usize, TorrentScrapeStatistics>,
}
/// Map of scrape requests that have been split across request workers and are
/// waiting for all partial responses to arrive.
#[derive(Default)]
pub struct PendingScrapeResponseMap(
// Pre-change field type (diff residue): key without the remote address.
AHashIndexMap<(ConnectionId, TransactionId), (PendingScrapeResponseMeta, PendingScrapeResponse)>,
// Post-change field type: the key additionally contains the remote
// `SocketAddr`, and the value is a single entry struct.
AHashIndexMap<(ConnectionId, TransactionId, SocketAddr), PendingScrapeResponseMapEntry>,
);
impl PendingScrapeResponseMap {
// NOTE(review): this is a rendered diff. Lines of the pre-change `prepare`
// and the post-change `prepare_split_requests` are interleaved below without
// +/- markers.
pub fn prepare(
/// Splits `request` into one `PendingScrapeRequest` per request worker and
/// registers a pending entry for it under `(connection_id, transaction_id, addr)`.
///
/// Returns the split requests keyed by worker index. If `request` contains no
/// info hashes, a warning is logged, nothing is inserted into the map, and the
/// returned collection is empty.
pub fn prepare_split_requests(
&mut self,
connection_id: ConnectionId,
transaction_id: TransactionId,
num_pending: usize,
config: &Config,
request: ScrapeRequest,
addr: SocketAddr,
valid_until: ValidUntil,
) {
let meta = PendingScrapeResponseMeta {
num_pending,
) -> impl IntoIterator<Item = (RequestWorkerIndex, PendingScrapeRequest)> {
let mut split_requests: AHashIndexMap<RequestWorkerIndex, PendingScrapeRequest> =
Default::default();
// Nothing to wait for: bail out before an entry with `num_pending == 0`
// can be inserted.
if request.info_hashes.is_empty() {
::log::warn!(
"Attempted to prepare PendingScrapeResponseMap entry with zero info hashes"
);
return split_requests;
}
let connection_id = request.connection_id;
let transaction_id = request.transaction_id;
// Group info hashes by the worker chosen by `RequestWorkerIndex::from_info_hash`,
// remembering each hash's position `i` in the original request.
for (i, info_hash) in request.info_hashes.into_iter().enumerate() {
let split_request = split_requests
.entry(RequestWorkerIndex::from_info_hash(&config, info_hash))
.or_insert_with(|| PendingScrapeRequest {
connection_id,
transaction_id,
info_hashes: BTreeMap::new(),
});
split_request.info_hashes.insert(i, info_hash);
}
let key = (connection_id, transaction_id, addr);
// One partial response is expected per distinct worker.
let entry = PendingScrapeResponseMapEntry {
num_pending: split_requests.len(),
valid_until,
};
let response = PendingScrapeResponse {
connection_id,
transaction_id,
torrent_stats: BTreeMap::new(),
torrent_stats: Default::default(),
};
self.0.insert((connection_id, transaction_id), (meta, response));
// `insert` returns the entry it displaced, if any; log it so a replaced
// in-flight request is visible.
if let Some(previous_entry) = self.0.insert(key, entry) {
::log::warn!(
"PendingScrapeResponseMap.prepare replaced previous entry {:?} for key {:?}",
previous_entry,
key
);
}
split_requests
}
pub fn add_and_get_finished(&mut self, response: PendingScrapeResponse) -> Option<Response> {
let key = (response.connection_id, response.transaction_id);
pub fn add_and_get_finished(
&mut self,
response: PendingScrapeResponse,
addr: SocketAddr,
) -> Option<Response> {
let key = (response.connection_id, response.transaction_id, addr);
let finished = if let Some(r) = self.0.get_mut(&key) {
r.0.num_pending -= 1;
let finished = if let Some(entry) = self.0.get_mut(&key) {
entry.num_pending -= 1;
r.1.torrent_stats.extend(response.torrent_stats.into_iter());
entry
.torrent_stats
.extend(response.torrent_stats.into_iter());
r.0.num_pending == 0
entry.num_pending == 0
} else {
::log::warn!("PendingScrapeResponses.add didn't find PendingScrapeResponse in map");
::log::warn!(
"PendingScrapeResponseMap.add didn't find entry for key {:?}",
key
);
false
};
if finished {
let response = self.0.remove(&key).unwrap().1;
let entry = self.0.remove(&key).unwrap();
Some(Response::Scrape(ScrapeResponse {
transaction_id: response.transaction_id,
torrent_stats: response.torrent_stats.into_values().collect(),
torrent_stats: entry.torrent_stats.into_values().collect(),
}))
} else {
None
@ -109,7 +152,19 @@ impl PendingScrapeResponseMap {
/// Removes every entry whose `valid_until` is not later than the current
/// instant, logging a warning for each removed entry, then shrinks the map's
/// allocation.
pub fn clean(&mut self) {
let now = Instant::now();
// Pre-change line (diff residue): silent `retain` on the old tuple value.
self.0.retain(|_, v| v.0.valid_until.0 > now);
// Post-change version: same expiry condition, but each dropped entry is
// logged together with its key.
self.0.retain(|k, v| {
let keep = v.valid_until.0 > now;
if !keep {
::log::warn!(
"Removing PendingScrapeResponseMap entry while cleaning. {:?}: {:?}",
k,
v
);
}
keep
});
self.0.shrink_to_fit();
}
}
@ -371,32 +426,14 @@ pub fn handle_request(
}
Ok(Request::Scrape(request)) => {
if connections.contains(request.connection_id, src) {
let mut requests: AHashIndexMap<RequestWorkerIndex, PendingScrapeRequest> =
Default::default();
let connection_id = request.connection_id;
let transaction_id = request.transaction_id;
for (i, info_hash) in request.info_hashes.into_iter().enumerate() {
let pending = requests
.entry(RequestWorkerIndex::from_info_hash(&config, info_hash))
.or_insert_with(|| PendingScrapeRequest {
connection_id,
transaction_id,
info_hashes: BTreeMap::new(),
});
pending.info_hashes.insert(i, info_hash);
}
pending_scrape_responses.prepare(
connection_id,
transaction_id,
requests.len(),
let split_requests = pending_scrape_responses.prepare_split_requests(
config,
request,
src,
pending_scrape_valid_until,
);
for (request_worker_index, request) in requests {
for (request_worker_index, request) in split_requests {
request_sender.try_send_to(
request_worker_index,
ConnectedRequest::Scrape(request),
@ -458,7 +495,7 @@ fn send_responses(
for (response, addr) in response_receiver.try_iter() {
let opt_response = match response {
ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r),
ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r, addr),
ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)),
ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)),
};
@ -591,3 +628,111 @@ pub fn create_socket(config: &Config) -> ::std::net::UdpSocket {
socket.into()
}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use quickcheck::TestResult;
    use quickcheck_macros::quickcheck;

    use super::*;

    /// Property test for `PendingScrapeResponseMap`.
    ///
    /// For arbitrary requests (transaction id, connection id, remote address and
    /// number of info hashes) and an arbitrary non-zero number of request
    /// workers, checks that:
    ///
    /// * every prepared request creates exactly one map entry,
    /// * every split request targets a valid worker index,
    /// * feeding back one partial response per split request yields exactly one
    ///   finished response per original request and leaves the map empty.
    ///
    /// Cases with zero workers or with a request containing zero info hashes are
    /// discarded.
    #[quickcheck]
    fn test_pending_scrape_response_map(
        request_data: Vec<(i32, i64, SocketAddr, u8)>,
        request_workers: u8,
    ) -> TestResult {
        if request_workers == 0 {
            return TestResult::discard();
        }

        let mut config = Config::default();

        config.request_workers = request_workers as usize;

        let valid_until = ValidUntil::new(1);

        let mut map = PendingScrapeResponseMap::default();

        let mut requests = Vec::new();

        // The map is keyed by (connection id, transaction id, remote address).
        // If the generator produces the same key twice, the second insert
        // replaces the first entry and the length/response-count assertions
        // below fail even though the map behaves as designed. Keep only the
        // first request for each key.
        let mut seen_keys: HashSet<(i64, i32, SocketAddr)> = HashSet::new();

        for (t, c, a, b) in request_data {
            if b == 0 {
                return TestResult::discard();
            }

            if !seen_keys.insert((c, t, a)) {
                continue;
            }

            let mut info_hashes = Vec::new();

            for i in 0..b {
                let info_hash = InfoHash([i; 20]);

                info_hashes.push(info_hash);
            }

            let request = ScrapeRequest {
                transaction_id: TransactionId(t),
                connection_id: ConnectionId(c),
                info_hashes,
            };

            requests.push((request, a));
        }

        let mut all_split_requests = Vec::new();

        for (request, addr) in requests.iter() {
            let split_requests = map.prepare_split_requests(
                &config,
                request.to_owned(),
                addr.to_owned(),
                valid_until,
            );

            all_split_requests.push((
                addr,
                split_requests
                    .into_iter()
                    .collect::<Vec<(RequestWorkerIndex, PendingScrapeRequest)>>(),
            ));
        }

        // One entry per (unique) request.
        assert_eq!(map.0.len(), requests.len());

        let mut responses = Vec::new();

        for (addr, split_requests) in all_split_requests {
            for (worker_index, split_request) in split_requests {
                assert!(worker_index.0 < request_workers as usize);

                // Fabricate the partial response a worker would send back,
                // echoing the first byte of each info hash as the seeder count.
                let torrent_stats = split_request
                    .info_hashes
                    .into_iter()
                    .map(|(i, info_hash)| {
                        (
                            i,
                            TorrentScrapeStatistics {
                                seeders: NumberOfPeers((info_hash.0[0]) as i32),
                                leechers: NumberOfPeers(0),
                                completed: NumberOfDownloads(0),
                            },
                        )
                    })
                    .collect();

                let response = PendingScrapeResponse {
                    transaction_id: split_request.transaction_id,
                    connection_id: split_request.connection_id,
                    torrent_stats,
                };

                if let Some(response) = map.add_and_get_finished(response, addr.to_owned()) {
                    responses.push(response);
                }
            }
        }

        // Every request must have completed and been removed from the map.
        assert!(map.0.is_empty());
        assert_eq!(responses.len(), requests.len());

        TestResult::from_bool(true)
    }
}