mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
ws: further refactor of swarm worker to improve readability
This commit is contained in:
parent
e6e663761c
commit
f050467c2a
2 changed files with 236 additions and 186 deletions
|
|
@ -1,7 +1,9 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
|
||||
use aquatic_ws_protocol::incoming::{AnnounceEvent, AnnounceRequest, ScrapeRequest};
|
||||
use aquatic_ws_protocol::incoming::{
|
||||
AnnounceEvent, AnnounceRequest, AnnounceRequestOffer, ScrapeRequest,
|
||||
};
|
||||
use aquatic_ws_protocol::outgoing::{
|
||||
AnnounceResponse, AnswerOutMessage, ErrorResponse, ErrorResponseAction, OfferOutMessage,
|
||||
OutMessage, ScrapeResponse, ScrapeStatistics,
|
||||
|
|
@ -154,8 +156,6 @@ impl TorrentMap {
|
|||
) {
|
||||
let torrent_data = self.torrents.entry(request.info_hash).or_default();
|
||||
|
||||
let valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age);
|
||||
|
||||
// If there is already a peer with this peer_id, check that connection id
|
||||
// is same as that of request sender. Otherwise, ignore request. Since
|
||||
// peers have access to each others peer_id's, they could send requests
|
||||
|
|
@ -168,194 +168,48 @@ impl TorrentMap {
|
|||
|
||||
::log::trace!("received request from {:?}", request_sender_meta);
|
||||
|
||||
// Insert/update/remove peer who sent this request
|
||||
{
|
||||
let peer_status = PeerStatus::from_event_and_bytes_left(
|
||||
request.event.unwrap_or_default(),
|
||||
request.bytes_left,
|
||||
);
|
||||
let peer_status = torrent_data.insert_or_update_peer(
|
||||
config,
|
||||
server_start_instant,
|
||||
request_sender_meta,
|
||||
&request,
|
||||
&self.peer_gauge,
|
||||
);
|
||||
|
||||
match torrent_data.peers.entry(request.peer_id) {
|
||||
::indexmap::map::Entry::Occupied(mut entry) => match peer_status {
|
||||
PeerStatus::Leeching => {
|
||||
let peer = entry.get_mut();
|
||||
|
||||
if peer.seeder {
|
||||
torrent_data.num_seeders -= 1;
|
||||
}
|
||||
|
||||
peer.seeder = false;
|
||||
peer.valid_until = valid_until;
|
||||
}
|
||||
PeerStatus::Seeding => {
|
||||
let peer = entry.get_mut();
|
||||
|
||||
if !peer.seeder {
|
||||
torrent_data.num_seeders += 1;
|
||||
}
|
||||
|
||||
peer.seeder = true;
|
||||
peer.valid_until = valid_until;
|
||||
}
|
||||
PeerStatus::Stopped => {
|
||||
let peer = entry.remove();
|
||||
|
||||
if peer.seeder {
|
||||
torrent_data.num_seeders -= 1;
|
||||
}
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
self.peer_gauge.decrement(1.0);
|
||||
|
||||
return;
|
||||
}
|
||||
},
|
||||
::indexmap::map::Entry::Vacant(entry) => match peer_status {
|
||||
PeerStatus::Leeching => {
|
||||
let peer = Peer {
|
||||
connection_id: request_sender_meta.connection_id,
|
||||
consumer_id: request_sender_meta.out_message_consumer_id,
|
||||
seeder: false,
|
||||
valid_until,
|
||||
expecting_answers: Default::default(),
|
||||
};
|
||||
|
||||
entry.insert(peer);
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
self.peer_gauge.increment(1.0)
|
||||
}
|
||||
PeerStatus::Seeding => {
|
||||
torrent_data.num_seeders += 1;
|
||||
|
||||
let peer = Peer {
|
||||
connection_id: request_sender_meta.connection_id,
|
||||
consumer_id: request_sender_meta.out_message_consumer_id,
|
||||
seeder: true,
|
||||
valid_until,
|
||||
expecting_answers: Default::default(),
|
||||
};
|
||||
|
||||
entry.insert(peer);
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
self.peer_gauge.increment(1.0);
|
||||
}
|
||||
PeerStatus::Stopped => return,
|
||||
},
|
||||
}
|
||||
};
|
||||
|
||||
// If peer sent offers, send them on to random peers
|
||||
if let Some(offers) = request.offers {
|
||||
// FIXME: config: also maybe check this when parsing request
|
||||
let max_num_peers_to_take = offers.len().min(config.protocol.max_offers);
|
||||
|
||||
#[inline]
|
||||
fn convert_offer_receiver_peer(
|
||||
peer_id: &PeerId,
|
||||
peer: &Peer,
|
||||
) -> (PeerId, ConnectionId, ConsumerId) {
|
||||
(*peer_id, peer.connection_id, peer.consumer_id)
|
||||
if peer_status != PeerStatus::Stopped {
|
||||
if let Some(offers) = request.offers {
|
||||
torrent_data.handle_offers(
|
||||
config,
|
||||
rng,
|
||||
server_start_instant,
|
||||
request.info_hash,
|
||||
request.peer_id,
|
||||
offers,
|
||||
out_messages,
|
||||
);
|
||||
}
|
||||
|
||||
let offer_receivers: Vec<(PeerId, ConnectionId, ConsumerId)> = extract_response_peers(
|
||||
rng,
|
||||
&torrent_data.peers,
|
||||
max_num_peers_to_take,
|
||||
request.peer_id,
|
||||
convert_offer_receiver_peer,
|
||||
);
|
||||
if let (Some(answer), Some(answer_receiver_id), Some(offer_id)) = (
|
||||
request.answer,
|
||||
request.answer_to_peer_id,
|
||||
request.answer_offer_id,
|
||||
) {
|
||||
let opt_out_message = torrent_data.handle_answer(
|
||||
request_sender_meta,
|
||||
request.info_hash,
|
||||
request.peer_id,
|
||||
answer_receiver_id,
|
||||
offer_id,
|
||||
answer,
|
||||
);
|
||||
|
||||
if let Some(peer) = torrent_data.peers.get_mut(&request.peer_id) {
|
||||
for (
|
||||
offer,
|
||||
(
|
||||
offer_receiver_peer_id,
|
||||
offer_receiver_connection_id,
|
||||
offer_receiver_consumer_id,
|
||||
),
|
||||
) in offers.into_iter().zip(offer_receivers)
|
||||
{
|
||||
let offer_out_message = OfferOutMessage {
|
||||
action: AnnounceAction::Announce,
|
||||
info_hash: request.info_hash,
|
||||
peer_id: request.peer_id,
|
||||
offer: offer.offer,
|
||||
offer_id: offer.offer_id,
|
||||
};
|
||||
|
||||
let meta = OutMessageMeta {
|
||||
out_message_consumer_id: offer_receiver_consumer_id,
|
||||
connection_id: offer_receiver_connection_id,
|
||||
pending_scrape_id: None,
|
||||
};
|
||||
|
||||
out_messages.push((meta, OutMessage::OfferOutMessage(offer_out_message)));
|
||||
::log::trace!("sending OfferOutMessage to {:?}", meta);
|
||||
|
||||
peer.expecting_answers.insert(
|
||||
ExpectingAnswer {
|
||||
from_peer_id: offer_receiver_peer_id,
|
||||
regarding_offer_id: offer.offer_id,
|
||||
},
|
||||
ValidUntil::new(server_start_instant, config.cleaning.max_offer_age),
|
||||
);
|
||||
if let Some(out_message) = opt_out_message {
|
||||
out_messages.push(out_message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If peer sent answer, send it on to relevant peer
|
||||
if let (Some(answer), Some(answer_receiver_id), Some(offer_id)) = (
|
||||
request.answer,
|
||||
request.answer_to_peer_id,
|
||||
request.answer_offer_id,
|
||||
) {
|
||||
if let Some(answer_receiver) = torrent_data.peers.get_mut(&answer_receiver_id) {
|
||||
let expecting_answer = ExpectingAnswer {
|
||||
from_peer_id: request.peer_id,
|
||||
regarding_offer_id: offer_id,
|
||||
};
|
||||
|
||||
if answer_receiver
|
||||
.expecting_answers
|
||||
.remove(&expecting_answer)
|
||||
.is_some()
|
||||
{
|
||||
let answer_out_message = AnswerOutMessage {
|
||||
action: AnnounceAction::Announce,
|
||||
peer_id: request.peer_id,
|
||||
info_hash: request.info_hash,
|
||||
answer,
|
||||
offer_id,
|
||||
};
|
||||
|
||||
let meta = OutMessageMeta {
|
||||
out_message_consumer_id: answer_receiver.consumer_id,
|
||||
connection_id: answer_receiver.connection_id,
|
||||
pending_scrape_id: None,
|
||||
};
|
||||
|
||||
out_messages.push((meta, OutMessage::AnswerOutMessage(answer_out_message)));
|
||||
::log::trace!("sending AnswerOutMessage to {:?}", meta);
|
||||
} else {
|
||||
let error_message = ErrorResponse {
|
||||
action: Some(ErrorResponseAction::Announce),
|
||||
info_hash: Some(request.info_hash),
|
||||
failure_reason:
|
||||
"Could not find the offer corresponding to your answer. It may have expired."
|
||||
.into(),
|
||||
};
|
||||
|
||||
out_messages.push((
|
||||
request_sender_meta.into(),
|
||||
OutMessage::ErrorResponse(error_message),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let out_message = OutMessage::AnnounceResponse(AnnounceResponse {
|
||||
let response = OutMessage::AnnounceResponse(AnnounceResponse {
|
||||
action: AnnounceAction::Announce,
|
||||
info_hash: request.info_hash,
|
||||
complete: torrent_data.num_seeders,
|
||||
|
|
@ -363,7 +217,7 @@ impl TorrentMap {
|
|||
announce_interval: config.protocol.peer_announce_interval,
|
||||
});
|
||||
|
||||
out_messages.push((request_sender_meta.into(), out_message));
|
||||
out_messages.push((request_sender_meta.into(), response));
|
||||
}
|
||||
|
||||
pub fn handle_scrape_request(
|
||||
|
|
@ -478,6 +332,202 @@ impl TorrentData {
|
|||
fn num_leechers(&self) -> usize {
|
||||
self.peers.len() - self.num_seeders
|
||||
}
|
||||
|
||||
pub fn insert_or_update_peer(
|
||||
&mut self,
|
||||
config: &Config,
|
||||
server_start_instant: ServerStartInstant,
|
||||
request_sender_meta: InMessageMeta,
|
||||
request: &AnnounceRequest,
|
||||
#[cfg(feature = "metrics")] peer_gauge: &::metrics::Gauge,
|
||||
) -> PeerStatus {
|
||||
let valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age);
|
||||
|
||||
let peer_status = PeerStatus::from_event_and_bytes_left(
|
||||
request.event.unwrap_or_default(),
|
||||
request.bytes_left,
|
||||
);
|
||||
|
||||
match self.peers.entry(request.peer_id) {
|
||||
::indexmap::map::Entry::Occupied(mut entry) => match peer_status {
|
||||
PeerStatus::Leeching => {
|
||||
let peer = entry.get_mut();
|
||||
|
||||
if peer.seeder {
|
||||
self.num_seeders -= 1;
|
||||
}
|
||||
|
||||
peer.seeder = false;
|
||||
peer.valid_until = valid_until;
|
||||
}
|
||||
PeerStatus::Seeding => {
|
||||
let peer = entry.get_mut();
|
||||
|
||||
if !peer.seeder {
|
||||
self.num_seeders += 1;
|
||||
}
|
||||
|
||||
peer.seeder = true;
|
||||
peer.valid_until = valid_until;
|
||||
}
|
||||
PeerStatus::Stopped => {
|
||||
let peer = entry.remove();
|
||||
|
||||
if peer.seeder {
|
||||
self.num_seeders -= 1;
|
||||
}
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
peer_gauge.decrement(1.0);
|
||||
}
|
||||
},
|
||||
::indexmap::map::Entry::Vacant(entry) => match peer_status {
|
||||
PeerStatus::Leeching => {
|
||||
let peer = Peer {
|
||||
connection_id: request_sender_meta.connection_id,
|
||||
consumer_id: request_sender_meta.out_message_consumer_id,
|
||||
seeder: false,
|
||||
valid_until,
|
||||
expecting_answers: Default::default(),
|
||||
};
|
||||
|
||||
entry.insert(peer);
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
peer_gauge.increment(1.0)
|
||||
}
|
||||
PeerStatus::Seeding => {
|
||||
self.num_seeders += 1;
|
||||
|
||||
let peer = Peer {
|
||||
connection_id: request_sender_meta.connection_id,
|
||||
consumer_id: request_sender_meta.out_message_consumer_id,
|
||||
seeder: true,
|
||||
valid_until,
|
||||
expecting_answers: Default::default(),
|
||||
};
|
||||
|
||||
entry.insert(peer);
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
peer_gauge.increment(1.0);
|
||||
}
|
||||
PeerStatus::Stopped => (),
|
||||
},
|
||||
}
|
||||
|
||||
peer_status
|
||||
}
|
||||
|
||||
/// Pass on offers to random peers
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn handle_offers(
|
||||
&mut self,
|
||||
config: &Config,
|
||||
rng: &mut SmallRng,
|
||||
server_start_instant: ServerStartInstant,
|
||||
info_hash: InfoHash,
|
||||
sender_peer_id: PeerId,
|
||||
offers: Vec<AnnounceRequestOffer>,
|
||||
out_messages: &mut Vec<(OutMessageMeta, OutMessage)>,
|
||||
) {
|
||||
let max_num_peers_to_take = offers.len().min(config.protocol.max_offers);
|
||||
|
||||
let offer_receivers: Vec<(PeerId, ConnectionId, ConsumerId)> = extract_response_peers(
|
||||
rng,
|
||||
&self.peers,
|
||||
max_num_peers_to_take,
|
||||
sender_peer_id,
|
||||
|peer_id, peer| (*peer_id, peer.connection_id, peer.consumer_id),
|
||||
);
|
||||
|
||||
if let Some(peer) = self.peers.get_mut(&sender_peer_id) {
|
||||
for (
|
||||
offer,
|
||||
(offer_receiver_peer_id, offer_receiver_connection_id, offer_receiver_consumer_id),
|
||||
) in offers.into_iter().zip(offer_receivers)
|
||||
{
|
||||
peer.expecting_answers.insert(
|
||||
ExpectingAnswer {
|
||||
from_peer_id: offer_receiver_peer_id,
|
||||
regarding_offer_id: offer.offer_id,
|
||||
},
|
||||
ValidUntil::new(server_start_instant, config.cleaning.max_offer_age),
|
||||
);
|
||||
|
||||
let offer_out_message = OfferOutMessage {
|
||||
action: AnnounceAction::Announce,
|
||||
info_hash,
|
||||
peer_id: sender_peer_id,
|
||||
offer: offer.offer,
|
||||
offer_id: offer.offer_id,
|
||||
};
|
||||
|
||||
let meta = OutMessageMeta {
|
||||
out_message_consumer_id: offer_receiver_consumer_id,
|
||||
connection_id: offer_receiver_connection_id,
|
||||
pending_scrape_id: None,
|
||||
};
|
||||
|
||||
out_messages.push((meta, OutMessage::OfferOutMessage(offer_out_message)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Pass on answer to relevant peer
|
||||
fn handle_answer(
|
||||
&mut self,
|
||||
request_sender_meta: InMessageMeta,
|
||||
info_hash: InfoHash,
|
||||
peer_id: PeerId,
|
||||
answer_receiver_id: PeerId,
|
||||
offer_id: OfferId,
|
||||
answer: RtcAnswer,
|
||||
) -> Option<(OutMessageMeta, OutMessage)> {
|
||||
if let Some(answer_receiver) = self.peers.get_mut(&answer_receiver_id) {
|
||||
let expecting_answer = ExpectingAnswer {
|
||||
from_peer_id: peer_id,
|
||||
regarding_offer_id: offer_id,
|
||||
};
|
||||
|
||||
if answer_receiver
|
||||
.expecting_answers
|
||||
.remove(&expecting_answer)
|
||||
.is_some()
|
||||
{
|
||||
let answer_out_message = AnswerOutMessage {
|
||||
action: AnnounceAction::Announce,
|
||||
peer_id,
|
||||
info_hash,
|
||||
answer,
|
||||
offer_id,
|
||||
};
|
||||
|
||||
let meta = OutMessageMeta {
|
||||
out_message_consumer_id: answer_receiver.consumer_id,
|
||||
connection_id: answer_receiver.connection_id,
|
||||
pending_scrape_id: None,
|
||||
};
|
||||
|
||||
Some((meta, OutMessage::AnswerOutMessage(answer_out_message)))
|
||||
} else {
|
||||
let error_message = ErrorResponse {
|
||||
action: Some(ErrorResponseAction::Announce),
|
||||
info_hash: Some(info_hash),
|
||||
failure_reason:
|
||||
"Could not find the offer corresponding to your answer. It may have expired."
|
||||
.into(),
|
||||
};
|
||||
|
||||
Some((
|
||||
request_sender_meta.into(),
|
||||
OutMessage::ErrorResponse(error_message),
|
||||
))
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
|
|
@ -523,7 +573,7 @@ impl PeerStatus {
|
|||
/// If there are more peers in map than `max_num_peers_to_take`, do a random
|
||||
/// selection of peers from first and second halves of map in order to avoid
|
||||
/// returning too homogeneous peers.
|
||||
///
|
||||
///
|
||||
/// Filters out announcing peer.
|
||||
#[inline]
|
||||
pub fn extract_response_peers<K, V, R, F>(
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ pub struct AnnounceRequest {
|
|||
pub answer_offer_id: Option<OfferId>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum AnnounceEvent {
|
||||
Started,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue