diff --git a/crates/ws/src/workers/socket/connection.rs b/crates/ws/src/workers/socket/connection.rs index 1232808..b4fbc54 100644 --- a/crates/ws/src/workers/socket/connection.rs +++ b/crates/ws/src/workers/socket/connection.rs @@ -229,11 +229,14 @@ impl ConnectionReader { match &message { tungstenite::Message::Text(_) | tungstenite::Message::Binary(_) => { match InMessage::from_ws_message(message) { - Ok(in_message) => { - self.handle_in_message(in_message).await?; + Ok(InMessage::AnnounceRequest(request)) => { + self.handle_announce_request(request).await?; + } + Ok(InMessage::ScrapeRequest(request)) => { + self.handle_scrape_request(request).await?; } Err(err) => { - ::log::debug!("Couldn't parse in_message: {:?}", err); + ::log::debug!("Couldn't parse in_message: {:#}", err); self.send_error_response("Invalid request".into(), None, None) .await?; @@ -261,171 +264,167 @@ impl ConnectionReader { } } - async fn handle_in_message(&mut self, in_message: InMessage) -> anyhow::Result<()> { - match in_message { - InMessage::AnnounceRequest(announce_request) => { - #[cfg(feature = "metrics")] - ::metrics::increment_counter!( - "aquatic_requests_total", - "type" => "announce", - "ip_version" => ip_version_to_metrics_str(self.ip_version), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + async fn handle_announce_request(&mut self, request: AnnounceRequest) -> anyhow::Result<()> { + #[cfg(feature = "metrics")] + ::metrics::increment_counter!( + "aquatic_requests_total", + "type" => "announce", + "ip_version" => ip_version_to_metrics_str(self.ip_version), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); - let info_hash = announce_request.info_hash; + let info_hash = request.info_hash; - if self - .access_list_cache - .load() - .allows(self.config.access_list.mode, &info_hash.0) - { - let mut announced_info_hashes = - self.clean_up_data.announced_info_hashes.borrow_mut(); + if self + .access_list_cache + .load() + .allows(self.config.access_list.mode, &info_hash.0) + { + let mut announced_info_hashes = self.clean_up_data.announced_info_hashes.borrow_mut(); - // Store peer id / check if stored peer id matches - match announced_info_hashes.entry(announce_request.info_hash) { - Entry::Occupied(entry) => { - if *entry.get() != announce_request.peer_id { - // Drop Rc borrow before awaiting - drop(announced_info_hashes); + // Store peer id / check if stored peer id matches + match announced_info_hashes.entry(request.info_hash) { + Entry::Occupied(entry) => { + if *entry.get() != request.peer_id { + // Drop Rc borrow before awaiting + drop(announced_info_hashes); - self.send_error_response( - "Only one peer id can be used per torrent".into(), - Some(ErrorResponseAction::Announce), - Some(info_hash), - ) - .await?; - - return Err(anyhow::anyhow!( - "Peer used more than one PeerId for a single torrent" - )); - } - } - Entry::Vacant(entry) => { - entry.insert(announce_request.peer_id); - - // Set peer client info if not set - #[cfg(feature = "metrics")] - if self.config.metrics.run_prometheus_endpoint - && self.config.metrics.peer_clients - && self.clean_up_data.opt_peer_client.borrow().is_none() - { - let peer_id = aquatic_peer_id::PeerId(announce_request.peer_id.0); - let client = peer_id.client(); - let prefix = peer_id.first_8_bytes_hex().to_string(); - - ::metrics::increment_gauge!( - "aquatic_peer_clients", - 1.0, - "client" => client.to_string(), - ); - - if self.config.metrics.peer_id_prefixes { - ::metrics::increment_gauge!( - "aquatic_peer_id_prefixes", - 1.0, - "prefix_hex" => prefix.to_string(), - ); - } - - *self.clean_up_data.opt_peer_client.borrow_mut() = - Some((client, prefix)); - }; - } - } - - if let Some(AnnounceEvent::Stopped) = announce_request.event { - announced_info_hashes.remove(&announce_request.info_hash); - } - - // Drop Rc borrow before awaiting - drop(announced_info_hashes); - - let in_message = InMessage::AnnounceRequest(announce_request); - - let consumer_index = - calculate_in_message_consumer_index(&self.config, info_hash); - - // Only fails when receiver is closed - self.in_message_senders - .send_to( - consumer_index, - (self.make_connection_meta(None), in_message), + self.send_error_response( + "Only one peer id can be used per torrent".into(), + Some(ErrorResponseAction::Announce), + Some(info_hash), ) - .await - .unwrap(); - } else { - self.send_error_response( - "Info hash not allowed".into(), - Some(ErrorResponseAction::Announce), - Some(info_hash), - ) - .await?; + .await?; + + return Err(anyhow::anyhow!( + "Peer used more than one PeerId for a single torrent" + )); + } + } + Entry::Vacant(entry) => { + entry.insert(request.peer_id); + + // Set peer client info if not set + #[cfg(feature = "metrics")] + if self.config.metrics.run_prometheus_endpoint + && self.config.metrics.peer_clients + && self.clean_up_data.opt_peer_client.borrow().is_none() + { + let peer_id = aquatic_peer_id::PeerId(request.peer_id.0); + let client = peer_id.client(); + let prefix = peer_id.first_8_bytes_hex().to_string(); + + ::metrics::increment_gauge!( + "aquatic_peer_clients", + 1.0, + "client" => client.to_string(), + ); + + if self.config.metrics.peer_id_prefixes { + ::metrics::increment_gauge!( + "aquatic_peer_id_prefixes", + 1.0, + "prefix_hex" => prefix.to_string(), + ); + } + + *self.clean_up_data.opt_peer_client.borrow_mut() = Some((client, prefix)); + }; } } - InMessage::ScrapeRequest(ScrapeRequest { info_hashes, .. }) => { - #[cfg(feature = "metrics")] - ::metrics::increment_counter!( - "aquatic_requests_total", - "type" => "scrape", - "ip_version" => ip_version_to_metrics_str(self.ip_version), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); - let info_hashes = if let Some(info_hashes) = info_hashes { - info_hashes - } else { - // If request.info_hashes is empty, don't return scrape for all - // torrents, even though reference server does it. It is too expensive. - self.send_error_response( - "Full scrapes are not allowed".into(), - Some(ErrorResponseAction::Scrape), - None, - ) - .await?; - - return Ok(()); - }; - - let mut info_hashes_by_worker: BTreeMap> = BTreeMap::new(); - - for info_hash in info_hashes.as_vec() { - let info_hashes = info_hashes_by_worker - .entry(calculate_in_message_consumer_index(&self.config, info_hash)) - .or_default(); - - info_hashes.push(info_hash); - } - - let pending_worker_out_messages = info_hashes_by_worker.len(); - - let pending_scrape_response = PendingScrapeResponse { - pending_worker_out_messages, - stats: Default::default(), - }; - - let pending_scrape_id: u8 = self - .pending_scrape_slab - .borrow_mut() - .insert(pending_scrape_response) - .try_into() - .with_context(|| "Reached 256 pending scrape responses")?; - - let meta = self.make_connection_meta(Some(PendingScrapeId(pending_scrape_id))); - - for (consumer_index, info_hashes) in info_hashes_by_worker { - let in_message = InMessage::ScrapeRequest(ScrapeRequest { - action: ScrapeAction::Scrape, - info_hashes: Some(ScrapeRequestInfoHashes::Multiple(info_hashes)), - }); - - // Only fails when receiver is closed - self.in_message_senders - .send_to(consumer_index, (meta, in_message)) - .await - .unwrap(); - } + if let Some(AnnounceEvent::Stopped) = request.event { + announced_info_hashes.remove(&request.info_hash); } + + // Drop Rc borrow before awaiting + drop(announced_info_hashes); + + let in_message = InMessage::AnnounceRequest(request); + + let consumer_index = calculate_in_message_consumer_index(&self.config, info_hash); + + // Only fails when receiver is closed + self.in_message_senders + .send_to( + consumer_index, + (self.make_connection_meta(None), in_message), + ) + .await + .unwrap(); + } else { + self.send_error_response( + "Info hash not allowed".into(), + Some(ErrorResponseAction::Announce), + Some(info_hash), + ) + .await?; + } + + Ok(()) + } + + async fn handle_scrape_request(&mut self, request: ScrapeRequest) -> anyhow::Result<()> { + #[cfg(feature = "metrics")] + ::metrics::increment_counter!( + "aquatic_requests_total", + "type" => "scrape", + "ip_version" => ip_version_to_metrics_str(self.ip_version), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); + + let info_hashes = if let Some(info_hashes) = request.info_hashes { + info_hashes + } else { + // If request.info_hashes is empty, don't return scrape for all + // torrents, even though reference server does it. It is too expensive. + self.send_error_response( + "Full scrapes are not allowed".into(), + Some(ErrorResponseAction::Scrape), + None, + ) + .await?; + + return Ok(()); + }; + + let mut info_hashes_by_worker: BTreeMap> = BTreeMap::new(); + + for info_hash in info_hashes.as_vec() { + let info_hashes = info_hashes_by_worker + .entry(calculate_in_message_consumer_index(&self.config, info_hash)) + .or_default(); + + info_hashes.push(info_hash); + } + + let pending_worker_out_messages = info_hashes_by_worker.len(); + + let pending_scrape_response = PendingScrapeResponse { + pending_worker_out_messages, + stats: Default::default(), + }; + + let pending_scrape_id: u8 = self + .pending_scrape_slab + .borrow_mut() + .insert(pending_scrape_response) + .try_into() + .with_context(|| "Reached 256 pending scrape responses")?; + + let meta = self.make_connection_meta(Some(PendingScrapeId(pending_scrape_id))); + + for (consumer_index, info_hashes) in info_hashes_by_worker { + let in_message = InMessage::ScrapeRequest(ScrapeRequest { + action: ScrapeAction::Scrape, + info_hashes: Some(ScrapeRequestInfoHashes::Multiple(info_hashes)), + }); + + // Only fails when receiver is closed + self.in_message_senders + .send_to(consumer_index, (meta, in_message)) + .await + .unwrap(); } Ok(()) @@ -485,31 +484,28 @@ impl ConnectionWriter { .pending_scrape_id .expect("meta.pending_scrape_id not set"); - let finished = if let Some(pending) = Slab::get_mut( - &mut RefCell::borrow_mut(&self.pending_scrape_slab), - pending_scrape_id.0 as usize, - ) { - pending.stats.extend(out_message.files); - pending.pending_worker_out_messages -= 1; + let mut pending_responses = self.pending_scrape_slab.borrow_mut(); - pending.pending_worker_out_messages == 0 - } else { - return Err(anyhow::anyhow!("pending scrape not found in slab")); - }; + let pending_response = pending_responses + .get_mut(pending_scrape_id.0 as usize) + .ok_or(anyhow::anyhow!("pending scrape not found in slab"))?; - if finished { - let out_message = { - let mut slab = RefCell::borrow_mut(&self.pending_scrape_slab); + pending_response.stats.extend(out_message.files); + pending_response.pending_worker_out_messages -= 1; - let pending = slab.remove(pending_scrape_id.0 as usize); + if pending_response.pending_worker_out_messages == 0 { + let pending_response = + pending_responses.remove(pending_scrape_id.0 as usize); - slab.shrink_to_fit(); + pending_responses.shrink_to_fit(); - OutMessage::ScrapeResponse(ScrapeResponse { - action: ScrapeAction::Scrape, - files: pending.stats, - }) - }; + let out_message = OutMessage::ScrapeResponse(ScrapeResponse { + action: ScrapeAction::Scrape, + files: pending_response.stats, + }); + + // Drop Rc borrow before awaiting + drop(pending_responses); self.send_out_message(&out_message).await?; } @@ -522,72 +518,63 @@ impl ConnectionWriter { } async fn send_out_message(&mut self, out_message: &OutMessage) -> anyhow::Result<()> { - let result = timeout(Duration::from_secs(10), async { - let result = - futures::SinkExt::send(&mut self.ws_out, out_message.to_ws_message()).await; - - Ok(result) + timeout(Duration::from_secs(10), async { + Ok(futures::SinkExt::send(&mut self.ws_out, out_message.to_ws_message()).await) }) - .await; + .await + .map_err(|err| { + anyhow::anyhow!("send_out_message: sending to peer took too long: {:#}", err) + })? + .with_context(|| "send_out_message")?; - match result { - Ok(Ok(())) => { - if let OutMessage::AnnounceResponse(_) | OutMessage::ScrapeResponse(_) = out_message - { - *self.connection_valid_until.borrow_mut() = ValidUntil::new( - self.server_start_instant, - self.config.cleaning.max_connection_idle, - ); - } - - #[cfg(feature = "metrics")] - { - let out_message_type = match &out_message { - OutMessage::OfferOutMessage(_) => "offer", - OutMessage::AnswerOutMessage(_) => "offer_answer", - OutMessage::AnnounceResponse(_) => "announce", - OutMessage::ScrapeResponse(_) => "scrape", - OutMessage::ErrorResponse(_) => "error", - }; - - ::metrics::increment_counter!( - "aquatic_responses_total", - "type" => out_message_type, - "ip_version" => ip_version_to_metrics_str(self.ip_version), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); - - if let Some((peer_client, prefix)) = - self.clean_up_data.opt_peer_client.borrow().as_ref() - { - // As long as connection is still alive, increment peer client - // gauges by zero to prevent them from being removed due to - // idleness - - ::metrics::increment_gauge!( - "aquatic_peer_clients", - 0.0, - "client" => peer_client.to_string(), - ); - - if self.config.metrics.peer_id_prefixes { - ::metrics::increment_gauge!( - "aquatic_peer_id_prefixes", - 0.0, - "prefix_hex" => prefix.to_string(), - ); - } - } - } - - Ok(()) - } - Ok(Err(err)) => Err(err.into()), - Err(err) => Err(anyhow::anyhow!( - "send_out_message: sending to peer took too long: {}", - err - )), + if let OutMessage::AnnounceResponse(_) | OutMessage::ScrapeResponse(_) = out_message { + *self.connection_valid_until.borrow_mut() = ValidUntil::new( + self.server_start_instant, + self.config.cleaning.max_connection_idle, + ); } + + #[cfg(feature = "metrics")] + { + let out_message_type = match &out_message { + OutMessage::OfferOutMessage(_) => "offer", + OutMessage::AnswerOutMessage(_) => "offer_answer", + OutMessage::AnnounceResponse(_) => "announce", + OutMessage::ScrapeResponse(_) => "scrape", + OutMessage::ErrorResponse(_) => "error", + }; + + ::metrics::increment_counter!( + "aquatic_responses_total", + "type" => out_message_type, + "ip_version" => ip_version_to_metrics_str(self.ip_version), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); + + if let Some((peer_client, prefix)) = + self.clean_up_data.opt_peer_client.borrow().as_ref() + { + // As long as connection is still alive, increment peer client + // gauges by zero to prevent them from being removed due to + // idleness + + ::metrics::increment_gauge!( + "aquatic_peer_clients", + 0.0, + "client" => peer_client.to_string(), + ); + + if self.config.metrics.peer_id_prefixes { + ::metrics::increment_gauge!( + "aquatic_peer_id_prefixes", + 0.0, + "prefix_hex" => prefix.to_string(), + ); + } + } + } + + Ok(()) } }