udp: include ConnectionId in PendingScrapeResponseMap key (#42)

This commit is contained in:
Joakim Frostegård 2021-12-28 03:16:15 +01:00 committed by GitHub
parent a208775104
commit e5a1461613
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 15 additions and 4 deletions

View file

@@ -31,12 +31,14 @@ impl Ip for Ipv6Addr {
 #[derive(Debug)]
 pub struct PendingScrapeRequest {
+    pub connection_id: ConnectionId,
     pub transaction_id: TransactionId,
     pub info_hashes: BTreeMap<usize, InfoHash>,
 }

 #[derive(Debug)]
 pub struct PendingScrapeResponse {
+    pub connection_id: ConnectionId,
     pub transaction_id: TransactionId,
     pub torrent_stats: BTreeMap<usize, TorrentScrapeStatistics>,
 }

View file

@@ -391,6 +391,7 @@ fn handle_scrape_request(
     }

     PendingScrapeResponse {
+        connection_id: request.connection_id,
         transaction_id: request.transaction_id,
         torrent_stats,
     }

View file

@@ -55,12 +55,13 @@ pub struct PendingScrapeResponseMeta {
 #[derive(Default)]
 pub struct PendingScrapeResponseMap(
-    AHashIndexMap<TransactionId, (PendingScrapeResponseMeta, PendingScrapeResponse)>,
+    AHashIndexMap<(ConnectionId, TransactionId), (PendingScrapeResponseMeta, PendingScrapeResponse)>,
 );

 impl PendingScrapeResponseMap {
     pub fn prepare(
         &mut self,
+        connection_id: ConnectionId,
         transaction_id: TransactionId,
         num_pending: usize,
         valid_until: ValidUntil,
@@ -70,15 +71,18 @@ impl PendingScrapeResponseMap {
             valid_until,
         };

         let response = PendingScrapeResponse {
+            connection_id,
             transaction_id,
             torrent_stats: BTreeMap::new(),
         };

-        self.0.insert(transaction_id, (meta, response));
+        self.0.insert((connection_id, transaction_id), (meta, response));
     }

     pub fn add_and_get_finished(&mut self, response: PendingScrapeResponse) -> Option<Response> {
-        let finished = if let Some(r) = self.0.get_mut(&response.transaction_id) {
+        let key = (response.connection_id, response.transaction_id);
+
+        let finished = if let Some(r) = self.0.get_mut(&key) {
             r.0.num_pending -= 1;

             r.1.torrent_stats.extend(response.torrent_stats.into_iter());
@@ -91,7 +95,7 @@ impl PendingScrapeResponseMap {
         };

         if finished {
-            let response = self.0.remove(&response.transaction_id).unwrap().1;
+            let response = self.0.remove(&key).unwrap().1;

             Some(Response::Scrape(ScrapeResponse {
                 transaction_id: response.transaction_id,
@@ -370,12 +374,14 @@ pub fn handle_request(
             let mut requests: AHashIndexMap<RequestWorkerIndex, PendingScrapeRequest> =
                 Default::default();

+            let connection_id = request.connection_id;
             let transaction_id = request.transaction_id;

             for (i, info_hash) in request.info_hashes.into_iter().enumerate() {
                 let pending = requests
                     .entry(RequestWorkerIndex::from_info_hash(&config, info_hash))
                     .or_insert_with(|| PendingScrapeRequest {
+                        connection_id,
                         transaction_id,
                         info_hashes: BTreeMap::new(),
                     });
@@ -384,6 +390,7 @@ pub fn handle_request(
             }

             pending_scrape_responses.prepare(
+                connection_id,
                 transaction_id,
                 requests.len(),
                 pending_scrape_valid_until,

View file

@@ -41,6 +41,7 @@ pub fn bench_scrape_handler(
         for request_chunk in requests.chunks(p) {
             for (request, src) in request_chunk {
                 let request = ConnectedRequest::Scrape(PendingScrapeRequest {
+                    connection_id: request.connection_id,
                     transaction_id: request.transaction_id,
                     info_hashes: request
                         .info_hashes