From f050467c2a3d13efbe537acadf06d43fa8ca12b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 25 Jan 2024 23:24:10 +0100 Subject: [PATCH] ws: further refactor of swarm worker to improve readability --- crates/ws/src/workers/swarm/storage.rs | 420 +++++++++++--------- crates/ws_protocol/src/incoming/announce.rs | 2 +- 2 files changed, 236 insertions(+), 186 deletions(-) diff --git a/crates/ws/src/workers/swarm/storage.rs b/crates/ws/src/workers/swarm/storage.rs index 5ef51b5..2566abb 100644 --- a/crates/ws/src/workers/swarm/storage.rs +++ b/crates/ws/src/workers/swarm/storage.rs @@ -1,7 +1,9 @@ use std::sync::Arc; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; -use aquatic_ws_protocol::incoming::{AnnounceEvent, AnnounceRequest, ScrapeRequest}; +use aquatic_ws_protocol::incoming::{ + AnnounceEvent, AnnounceRequest, AnnounceRequestOffer, ScrapeRequest, +}; use aquatic_ws_protocol::outgoing::{ AnnounceResponse, AnswerOutMessage, ErrorResponse, ErrorResponseAction, OfferOutMessage, OutMessage, ScrapeResponse, ScrapeStatistics, @@ -154,8 +156,6 @@ impl TorrentMap { ) { let torrent_data = self.torrents.entry(request.info_hash).or_default(); - let valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); - // If there is already a peer with this peer_id, check that connection id // is same as that of request sender. Otherwise, ignore request. Since // peers have access to each others peer_id's, they could send requests @@ -168,194 +168,48 @@ impl TorrentMap { ::log::trace!("received request from {:?}", request_sender_meta); - // Insert/update/remove peer who sent this request - { - let peer_status = PeerStatus::from_event_and_bytes_left( - request.event.unwrap_or_default(), - request.bytes_left, - ); + let peer_status = torrent_data.insert_or_update_peer( + config, + server_start_instant, + request_sender_meta, + &request, + &self.peer_gauge, + ); - match torrent_data.peers.entry(request.peer_id) { - ::indexmap::map::Entry::Occupied(mut entry) => match peer_status { - PeerStatus::Leeching => { - let peer = entry.get_mut(); - - if peer.seeder { - torrent_data.num_seeders -= 1; - } - - peer.seeder = false; - peer.valid_until = valid_until; - } - PeerStatus::Seeding => { - let peer = entry.get_mut(); - - if !peer.seeder { - torrent_data.num_seeders += 1; - } - - peer.seeder = true; - peer.valid_until = valid_until; - } - PeerStatus::Stopped => { - let peer = entry.remove(); - - if peer.seeder { - torrent_data.num_seeders -= 1; - } - - #[cfg(feature = "metrics")] - self.peer_gauge.decrement(1.0); - - return; - } - }, - ::indexmap::map::Entry::Vacant(entry) => match peer_status { - PeerStatus::Leeching => { - let peer = Peer { - connection_id: request_sender_meta.connection_id, - consumer_id: request_sender_meta.out_message_consumer_id, - seeder: false, - valid_until, - expecting_answers: Default::default(), - }; - - entry.insert(peer); - - #[cfg(feature = "metrics")] - self.peer_gauge.increment(1.0) - } - PeerStatus::Seeding => { - torrent_data.num_seeders += 1; - - let peer = Peer { - connection_id: request_sender_meta.connection_id, - consumer_id: request_sender_meta.out_message_consumer_id, - seeder: true, - valid_until, - expecting_answers: Default::default(), - }; - - entry.insert(peer); - - #[cfg(feature = "metrics")] - self.peer_gauge.increment(1.0); - } - PeerStatus::Stopped => return, - }, - } - }; - - // If peer sent offers, send them on to random peers - if let Some(offers) = request.offers { - // FIXME: config: also maybe check this when parsing request - let max_num_peers_to_take = offers.len().min(config.protocol.max_offers); - - #[inline] - fn convert_offer_receiver_peer( - peer_id: &PeerId, - peer: &Peer, - ) -> (PeerId, ConnectionId, ConsumerId) { - (*peer_id, peer.connection_id, peer.consumer_id) + if peer_status != PeerStatus::Stopped { + if let Some(offers) = request.offers { + torrent_data.handle_offers( + config, + rng, + server_start_instant, + request.info_hash, + request.peer_id, + offers, + out_messages, + ); } - let offer_receivers: Vec<(PeerId, ConnectionId, ConsumerId)> = extract_response_peers( - rng, - &torrent_data.peers, - max_num_peers_to_take, - request.peer_id, - convert_offer_receiver_peer, - ); + if let (Some(answer), Some(answer_receiver_id), Some(offer_id)) = ( + request.answer, + request.answer_to_peer_id, + request.answer_offer_id, + ) { + let opt_out_message = torrent_data.handle_answer( + request_sender_meta, + request.info_hash, + request.peer_id, + answer_receiver_id, + offer_id, + answer, + ); - if let Some(peer) = torrent_data.peers.get_mut(&request.peer_id) { - for ( - offer, - ( - offer_receiver_peer_id, - offer_receiver_connection_id, - offer_receiver_consumer_id, - ), - ) in offers.into_iter().zip(offer_receivers) - { - let offer_out_message = OfferOutMessage { - action: AnnounceAction::Announce, - info_hash: request.info_hash, - peer_id: request.peer_id, - offer: offer.offer, - offer_id: offer.offer_id, - }; - - let meta = OutMessageMeta { - out_message_consumer_id: offer_receiver_consumer_id, - connection_id: offer_receiver_connection_id, - pending_scrape_id: None, - }; - - out_messages.push((meta, OutMessage::OfferOutMessage(offer_out_message))); - ::log::trace!("sending OfferOutMessage to {:?}", meta); - - peer.expecting_answers.insert( - ExpectingAnswer { - from_peer_id: offer_receiver_peer_id, - regarding_offer_id: offer.offer_id, - }, - ValidUntil::new(server_start_instant, config.cleaning.max_offer_age), - ); + if let Some(out_message) = opt_out_message { + out_messages.push(out_message); } } } - // If peer sent answer, send it on to relevant peer - if let (Some(answer), Some(answer_receiver_id), Some(offer_id)) = ( - request.answer, - request.answer_to_peer_id, - request.answer_offer_id, - ) { - if let Some(answer_receiver) = torrent_data.peers.get_mut(&answer_receiver_id) { - let expecting_answer = ExpectingAnswer { - from_peer_id: request.peer_id, - regarding_offer_id: offer_id, - }; - - if answer_receiver - .expecting_answers - .remove(&expecting_answer) - .is_some() - { - let answer_out_message = AnswerOutMessage { - action: AnnounceAction::Announce, - peer_id: request.peer_id, - info_hash: request.info_hash, - answer, - offer_id, - }; - - let meta = OutMessageMeta { - out_message_consumer_id: answer_receiver.consumer_id, - connection_id: answer_receiver.connection_id, - pending_scrape_id: None, - }; - - out_messages.push((meta, OutMessage::AnswerOutMessage(answer_out_message))); - ::log::trace!("sending AnswerOutMessage to {:?}", meta); - } else { - let error_message = ErrorResponse { - action: Some(ErrorResponseAction::Announce), - info_hash: Some(request.info_hash), - failure_reason: - "Could not find the offer corresponding to your answer. It may have expired." - .into(), - }; - - out_messages.push(( - request_sender_meta.into(), - OutMessage::ErrorResponse(error_message), - )); - } - } - } - - let out_message = OutMessage::AnnounceResponse(AnnounceResponse { + let response = OutMessage::AnnounceResponse(AnnounceResponse { action: AnnounceAction::Announce, info_hash: request.info_hash, complete: torrent_data.num_seeders, @@ -363,7 +217,7 @@ impl TorrentMap { announce_interval: config.protocol.peer_announce_interval, }); - out_messages.push((request_sender_meta.into(), out_message)); + out_messages.push((request_sender_meta.into(), response)); } pub fn handle_scrape_request( @@ -478,6 +332,202 @@ impl TorrentData { fn num_leechers(&self) -> usize { self.peers.len() - self.num_seeders } + + pub fn insert_or_update_peer( + &mut self, + config: &Config, + server_start_instant: ServerStartInstant, + request_sender_meta: InMessageMeta, + request: &AnnounceRequest, + #[cfg(feature = "metrics")] peer_gauge: &::metrics::Gauge, + ) -> PeerStatus { + let valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age); + + let peer_status = PeerStatus::from_event_and_bytes_left( + request.event.unwrap_or_default(), + request.bytes_left, + ); + + match self.peers.entry(request.peer_id) { + ::indexmap::map::Entry::Occupied(mut entry) => match peer_status { + PeerStatus::Leeching => { + let peer = entry.get_mut(); + + if peer.seeder { + self.num_seeders -= 1; + } + + peer.seeder = false; + peer.valid_until = valid_until; + } + PeerStatus::Seeding => { + let peer = entry.get_mut(); + + if !peer.seeder { + self.num_seeders += 1; + } + + peer.seeder = true; + peer.valid_until = valid_until; + } + PeerStatus::Stopped => { + let peer = entry.remove(); + + if peer.seeder { + self.num_seeders -= 1; + } + + #[cfg(feature = "metrics")] + peer_gauge.decrement(1.0); + } + }, + ::indexmap::map::Entry::Vacant(entry) => match peer_status { + PeerStatus::Leeching => { + let peer = Peer { + connection_id: request_sender_meta.connection_id, + consumer_id: request_sender_meta.out_message_consumer_id, + seeder: false, + valid_until, + expecting_answers: Default::default(), + }; + + entry.insert(peer); + + #[cfg(feature = "metrics")] + peer_gauge.increment(1.0) + } + PeerStatus::Seeding => { + self.num_seeders += 1; + + let peer = Peer { + connection_id: request_sender_meta.connection_id, + consumer_id: request_sender_meta.out_message_consumer_id, + seeder: true, + valid_until, + expecting_answers: Default::default(), + }; + + entry.insert(peer); + + #[cfg(feature = "metrics")] + peer_gauge.increment(1.0); + } + PeerStatus::Stopped => (), + }, + } + + peer_status + } + + /// Pass on offers to random peers + #[allow(clippy::too_many_arguments)] + pub fn handle_offers( + &mut self, + config: &Config, + rng: &mut SmallRng, + server_start_instant: ServerStartInstant, + info_hash: InfoHash, + sender_peer_id: PeerId, + offers: Vec, + out_messages: &mut Vec<(OutMessageMeta, OutMessage)>, + ) { + let max_num_peers_to_take = offers.len().min(config.protocol.max_offers); + + let offer_receivers: Vec<(PeerId, ConnectionId, ConsumerId)> = extract_response_peers( + rng, + &self.peers, + max_num_peers_to_take, + sender_peer_id, + |peer_id, peer| (*peer_id, peer.connection_id, peer.consumer_id), + ); + + if let Some(peer) = self.peers.get_mut(&sender_peer_id) { + for ( + offer, + (offer_receiver_peer_id, offer_receiver_connection_id, offer_receiver_consumer_id), + ) in offers.into_iter().zip(offer_receivers) + { + peer.expecting_answers.insert( + ExpectingAnswer { + from_peer_id: offer_receiver_peer_id, + regarding_offer_id: offer.offer_id, + }, + ValidUntil::new(server_start_instant, config.cleaning.max_offer_age), + ); + + let offer_out_message = OfferOutMessage { + action: AnnounceAction::Announce, + info_hash, + peer_id: sender_peer_id, + offer: offer.offer, + offer_id: offer.offer_id, + }; + + let meta = OutMessageMeta { + out_message_consumer_id: offer_receiver_consumer_id, + connection_id: offer_receiver_connection_id, + pending_scrape_id: None, + }; + + out_messages.push((meta, OutMessage::OfferOutMessage(offer_out_message))); + } + } + } + + /// Pass on answer to relevant peer + fn handle_answer( + &mut self, + request_sender_meta: InMessageMeta, + info_hash: InfoHash, + peer_id: PeerId, + answer_receiver_id: PeerId, + offer_id: OfferId, + answer: RtcAnswer, + ) -> Option<(OutMessageMeta, OutMessage)> { + if let Some(answer_receiver) = self.peers.get_mut(&answer_receiver_id) { + let expecting_answer = ExpectingAnswer { + from_peer_id: peer_id, + regarding_offer_id: offer_id, + }; + + if answer_receiver + .expecting_answers + .remove(&expecting_answer) + .is_some() + { + let answer_out_message = AnswerOutMessage { + action: AnnounceAction::Announce, + peer_id, + info_hash, + answer, + offer_id, + }; + + let meta = OutMessageMeta { + out_message_consumer_id: answer_receiver.consumer_id, + connection_id: answer_receiver.connection_id, + pending_scrape_id: None, + }; + + Some((meta, OutMessage::AnswerOutMessage(answer_out_message))) + } else { + let error_message = ErrorResponse { + action: Some(ErrorResponseAction::Announce), + info_hash: Some(info_hash), + failure_reason: + "Could not find the offer corresponding to your answer. It may have expired." + .into(), + }; + + Some(( + request_sender_meta.into(), + OutMessage::ErrorResponse(error_message), + )) + } + } else { + None + } + } } #[derive(Clone, Debug)] @@ -523,7 +573,7 @@ impl PeerStatus { /// If there are more peers in map than `max_num_peers_to_take`, do a random /// selection of peers from first and second halves of map in order to avoid /// returning too homogeneous peers. -/// +/// /// Filters out announcing peer. #[inline] pub fn extract_response_peers( diff --git a/crates/ws_protocol/src/incoming/announce.rs b/crates/ws_protocol/src/incoming/announce.rs index 3a93eb9..36aef31 100644 --- a/crates/ws_protocol/src/incoming/announce.rs +++ b/crates/ws_protocol/src/incoming/announce.rs @@ -57,7 +57,7 @@ pub struct AnnounceRequest { pub answer_offer_id: Option, } -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum AnnounceEvent { Started,